<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
# keyBy

### 元組方式

```java
public class WordCountStreamingJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 1, 2, 3, 1, 2, 3));

        SingleOutputStreamOperator<Tuple2> mapped = streamSource.map(new MapFunction<Integer, Tuple2>() {
            @Override
            public Tuple2 map(Integer item) throws Exception {
                return Tuple2.of(item, 1);
            }
        }).returns(Types.TUPLE(Types.INT, Types.INT));

        KeyedStream<Tuple2, Tuple> stream = mapped.keyBy(0);
        stream.print();

        env.execute("WordCountStreamingJob");
    }
}
```

### 自定義Bean方式

##### 定義一個`Bean`名稱為`WordCount`

```java
package com.gosuncn;

public class WordCount {

    public Integer word;
    public Integer count;

    public WordCount() {
    }

    public WordCount(Integer word, Integer count) {
        this.word = word;
        this.count = count;
    }

    public static WordCount of(Integer word, Integer count) {
        return new WordCount(word, count);
    }

    @Override
    public String toString() {
        return "WordCount{" + "word=" + word + ", count=" + count + '}';
    }
}
```

##### 通過`WordCount`中的`word`字段進行分組,不能使用腳標`0 1 2 3 ...`了。

```java
public class WordCountStreamingJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 1, 2, 3, 1, 2, 3));

        SingleOutputStreamOperator<WordCount> mapped = streamSource.map(new MapFunction<Integer, WordCount>() {
            @Override
            public WordCount map(Integer item) throws Exception {
                return WordCount.of(item, 1);
            }
        }).returns(WordCount.class);

        KeyedStream<WordCount, Tuple> stream = mapped.keyBy("word");
        stream.print();

        env.execute("WordCountStreamingJob");
    }
}
```

### 多字段分組

```java
public class WordCountStreamingJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> streamSource = env.fromCollection(Arrays.asList("江蘇 南京", "江蘇 徐州", "湖南 長沙", "廣東 廣州", "遼寧 沈陽", "廣東 廣州", "湖南 長沙", "江蘇 南京", "遼寧 沈陽", "湖南 張家界"));

        SingleOutputStreamOperator<Tuple3> mapped = streamSource.map(new MapFunction<String, Tuple3>() {
            @Override
            public Tuple3 map(String item) throws Exception {
                return Tuple3.of(item.split(" ")[0], item.split(" ")[1], 1);
            }
        }).returns(Types.TUPLE(Types.STRING, Types.STRING, Types.INT));

        KeyedStream<Tuple3/*分組后的類型*/, Tuple/*分組的key的類型*/> keyed = mapped.keyBy(0, 1);
        SingleOutputStreamOperator<Tuple3> summed = keyed.sum(2);
        summed.print();

        env.execute("WordCountStreamingJob");
    }
}
```

**同理,如果自定義Bean的話,不能用腳標了,要用Bean的字段名稱。**
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看