# Transformation
## Map
DataStreamSource<T> 轉換 SingleOutputStreamOperator<T>
```java
public class WordCountStreamingJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9));
SingleOutputStreamOperator<Integer> result = streamSource.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer item) throws Exception {
return item * 1;
}
});
result.print();
env.execute("WordCountStreamingJob");
}
}
```
比如,傳入的是一個整型的集合,傳出的是另一個整型的集合。一比一對應。
簡化版:
```java
public class WordCountStreamingJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9));
SingleOutputStreamOperator<Integer> result = streamSource.map((MapFunction<Integer, Integer>) item -> item * 1);
result.print();
env.execute("WordCountStreamingJob");
}
}
```
## FlatMap
```java
public class WordCountStreamingJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9));
streamSource.flatMap(new FlatMapFunction<Integer, Integer>() {
@Override
public void flatMap(Integer item, Collector<Integer> out) throws Exception {
// 復制十份
for (int i = 0; i < 10; i++) {
out.collect(item);
}
}
}).print();
env.execute("WordCountStreamingJob");
}
}
```
傳入一個集合,把每個集合復制十份,再輸出一個集合。
## Filter
```java
public class WordCountStreamingJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9));
streamSource.filter((FilterFunction<Integer>) item -> item > 5).print();
env.execute("WordCountStreamingJob");
}
}
```
返回一個布爾值,來過濾數據。
- Flink簡介
- flink搭建standalone模式與測試
- flink提交任務(界面方式)
- Flink項目初始化
- Java版WordCount(匿名類)
- Java版WordCount(lambda)
- Scala版WordCount
- Java版WordCount[批處理]
- Scala版WordCount[批處理]
- 流處理非并行的Source
- 流處理可并行的Source
- kafka的Source
- Flink算子(Map,FlatMap,Filter)
- Flink算子KeyBy
- Flink算子Reduce和Max與Min
- addSink自定義Sink
- startNewChain和disableChaining
- 資源槽slotSharingGroup
- 計數窗口
- 滾動窗口
- 滑動窗口
- Session窗口
- 按照EventTime作為標準