<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                ### reduce:減少、降低、縮小、減低、削減、縮減、壓縮、簡化、裁減、精簡、簡約 ... 總之,reduce就是由多變少的意思。 ```java public interface ReduceFunction<T> extends Function, Serializable { T reduce(T value1, T value2) throws Exception; } ``` `reduce`方法,傳入`value1`和`value2`兩個相同類型的值,傳出去一個同類型的值。所以,輸入的多,輸出的少。就是`reduce減少`的意思。 --- ```java public class WordCountStreamingJob { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStreamSource = env.fromElements("Apollo", "Paul", "Tom", "Paul", "Apollo", "Tom", "Marry"); SingleOutputStreamOperator<Tuple2> mapped = dataStreamSource.map((MapFunction<String, Tuple2>) item -> Tuple2.of(item, 1)).returns(Types.TUPLE(Types.STRING, Types.INT)); KeyedStream<Tuple2, Tuple> keyed = mapped.keyBy(0); SingleOutputStreamOperator<Tuple2> returns = keyed.reduce((ReduceFunction<Tuple2>) (value1, value2) -> Tuple2.of(value1.f0, (Integer) value1.f1 + (Integer) value2.f1)).returns(Types.TUPLE(Types.STRING, Types.INT)); returns.print(); env.execute("WordCountStreamingJob"); } } ``` 示例中,傳入兩個Tuple2<String,Integer>類型的元素,傳出一個Tuple2<String,Integer>類型的元素。 其中,Tuple2.f0都一樣,Tuple2.f1進行了相加,然后返回。 --- ### Max與Min,分組后聚合函數。 ```java public class WordCountStreamingJob { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStreamSource = env.fromElements("Apollo 1", "Paul 3", "Tom 2", "Paul 1", "Apollo 2", "Tom 4", "Marry 6"); SingleOutputStreamOperator<Tuple2> mapped = dataStreamSource.map(new MapFunction<String, Tuple2>() { @Override public Tuple2 map(String item) throws Exception { return Tuple2.of(item.split(" ")[0], Integer.valueOf(item.split(" ")[1])); } }).returns(Types.TUPLE(Types.STRING, Types.INT)); KeyedStream<Tuple2, Tuple> keyed = mapped.keyBy(0); SingleOutputStreamOperator<Tuple2> result = keyed.max(1); result.print(); env.execute("WordCountStreamingJob"); } } ``` SingleOutputStreamOperator<Tuple2> result = keyed.max(1); **根據第二個字段求最大值。**
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看