Windows are divided according to the time carried in the data itself (event time), rather than the time at which records are processed.
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EventTimeSessionWindowAll {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = env.socketTextStream("192.168.8.111", 8888);

        // Use EventTime as the time characteristic.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Extract the timestamp field from each record as its EventTime;
        // the records themselves are not modified.
        SingleOutputStreamOperator<String> dataStream = stream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String item) {
                        // Input format: 1000,spark,1
                        String[] data = item.split(",");
                        return Long.parseLong(data[0]);
                    }
                });

        SingleOutputStreamOperator<Tuple3<Long, String, Integer>> mapped = dataStream
                .map((MapFunction<String, Tuple3<Long, String, Integer>>) item -> {
                    String[] data = item.split(",");
                    return Tuple3.of(Long.parseLong(data[0]), data[1], Integer.parseInt(data[2]));
                })
                .returns(Types.TUPLE(Types.LONG, Types.STRING, Types.INT));

        // Session window over the whole stream: a window closes once there is a
        // 5-second gap (in event time) with no incoming data.
        AllWindowedStream<Tuple3<Long, String, Integer>, TimeWindow> eventTimeSessionWindowAll =
                mapped.windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)));

        eventTimeSessionWindowAll.sum(2).print();

        env.execute("EventTimeSessionWindowAll");
    }
}
```
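For reference, here is a minimal sketch of the same pipeline on the newer `WatermarkStrategy` API (assuming Flink 1.12+, where `setStreamTimeCharacteristic` and `BoundedOutOfOrdernessTimestampExtractor` are deprecated and event time is the default). The class name `EventTimeSessionWindowAllNewApi` is just an illustrative placeholder; the host, port, and `timestamp,word,count` input format are carried over from the example above.

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeSessionWindowAllNewApi {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("192.168.8.111", 8888)
           // Take the first field of each line as the event time; allow no out-of-orderness.
           .assignTimestampsAndWatermarks(
               WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                   .withTimestampAssigner((line, ts) -> Long.parseLong(line.split(",")[0])))
           .map(line -> {
               String[] data = line.split(",");
               return Tuple3.of(Long.parseLong(data[0]), data[1], Integer.parseInt(data[2]));
           })
           .returns(Types.TUPLE(Types.LONG, Types.STRING, Types.INT))
           // Close a session window after a 5-second gap in event time.
           .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)))
           .sum(2)
           .print();

        env.execute("EventTimeSessionWindowAllNewApi");
    }
}
```

With zero allowed out-of-orderness, the periodic watermark tracks the largest timestamp seen so far (minus 1 ms), so a session containing only `1000,spark,1` is emitted once a later record such as `7000,spark,1` pushes the watermark past `1000 + 5000`.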
- Flink introduction
- Setting up Flink in standalone mode and testing it
- Submitting Flink jobs (via the web UI)
- Flink project initialization
- Java WordCount (anonymous class)
- Java WordCount (lambda)
- Scala WordCount
- Java WordCount [batch]
- Scala WordCount [batch]
- Non-parallel streaming Source
- Parallel streaming Source
- Kafka Source
- Flink operators (Map, FlatMap, Filter)
- Flink operator KeyBy
- Flink operators Reduce, Max, and Min
- Custom Sink with addSink
- startNewChain and disableChaining
- Slot sharing with slotSharingGroup
- Count windows
- Tumbling windows
- Sliding windows
- Session windows
- Using EventTime as the time standard