```java
public class TumblingWindowAll {
private static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
private static final DataStreamSource<String> stream = env.socketTextStream("192.168.8.111", 8888);
public static void main(String[] args) throws Exception {
SingleOutputStreamOperator<Integer> mapped = stream.map((MapFunction<String, Integer>) Integer::valueOf).returns(Types.INT);
AllWindowedStream<Integer, TimeWindow> timeWindowAll = mapped.timeWindowAll(Time.seconds(5));
SingleOutputStreamOperator<Integer> summed = timeWindowAll.sum(0);
summed.print();
env.execute("TumblingWindowAll");
}
}
```
窗口大小是5秒,時間間隔是5秒。換句話說,就是每隔五秒,統計五秒內的結果。
```java
public class TumblingWindow {
private static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
private static final DataStreamSource<String> stream = env.socketTextStream("192.168.8.111", 8888);
public static void main(String[] args) throws Exception {
SingleOutputStreamOperator<Tuple2> mapped = stream.map((MapFunction<String, Tuple2>) item -> {
String[] data = item.split(" ");
return Tuple2.of(data[0], Integer.valueOf(data[1]));
}).returns(Types.TUPLE(Types.STRING, Types.INT));
KeyedStream<Tuple2, Tuple> keyed = mapped.keyBy(0);
WindowedStream<Tuple2, Tuple, TimeWindow> timeWindow = keyed.timeWindow(Time.seconds(5));
SingleOutputStreamOperator<Tuple2> summed = timeWindow.sum(1);
summed.print();
env.execute("TumblingWindow");
}
}
```
分組后,每隔五秒,統計一次各個分組的情況。若某些分組數據無變化,則不打印無變化的分組。
```java
public class TumblingProcessingTimeWindowsTest {
private static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
private static final DataStreamSource<String> stream = env.socketTextStream("192.168.8.111", 8888);
public static void main(String[] args) throws Exception {
SingleOutputStreamOperator<Tuple2> mapped = stream.map((MapFunction<String, Tuple2>) item -> {
String[] data = item.split(" ");
return Tuple2.of(data[0], Integer.valueOf(data[1]));
}).returns(Types.TUPLE(Types.STRING, Types.INT));
KeyedStream<Tuple2, Tuple> keyed = mapped.keyBy(0);
WindowedStream<Tuple2, Tuple, TimeWindow> timeWindow = keyed.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
SingleOutputStreamOperator<Tuple2> summed = timeWindow.sum(1);
summed.print();
env.execute("TumblingWindow");
}
}
```
- Flink簡介
- flink搭建standalone模式與測試
- flink提交任務(界面方式)
- Flink項目初始化
- Java版WordCount(匿名類)
- Java版WordCount(lambda)
- Scala版WordCount
- Java版WordCount[批處理]
- Scala版WordCount[批處理]
- 流處理非并行的Source
- 流處理可并行的Source
- kafka的Source
- Flink算子(Map,FlatMap,Filter)
- Flink算子KeyBy
- Flink算子Reduce和Max與Min
- addSink自定義Sink
- startNewChain和disableChaining
- 資源槽slotSharingGroup
- 計數窗口
- 滾動窗口
- 滑動窗口
- Session窗口
- 按照EventTime作為標準