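### reduce: to reduce, decrease, lower, shrink, lessen, cut, cut back, compress, simplify, trim, streamline, pare down ...
In short, reduce means turning many into few.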
```java
// Flink's org.apache.flink.api.common.functions.ReduceFunction (annotations omitted)
public interface ReduceFunction<T> extends Function, Serializable {
    T reduce(T value1, T value2) throws Exception;
}
```
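The `reduce` method takes two values of the same type, `value1` and `value2`, and returns a single value of that same type. More values go in than come out, which is exactly the "reduce" in its name.
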
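The JDK has an interface with the same "two values of type T in, one value of type T out" shape: `java.util.function.BinaryOperator<T>`. The plain-Java sketch below uses it as a stand-in (it is not Flink's API; `ReduceShapeSketch` and `reduceAll` are hypothetical names) to show how applying such a function over and over shrinks a list down to a single value.

```java
import java.util.List;
import java.util.function.BinaryOperator;

public class ReduceShapeSketch {
    // Same shape as Flink's ReduceFunction<T>: two T values in, one T value out.
    // BinaryOperator<T> from the JDK is used here only as a stand-in.
    static <T> T reduceAll(List<T> values, BinaryOperator<T> reducer) {
        if (values.isEmpty()) {
            throw new IllegalArgumentException("need at least one value");
        }
        T acc = values.get(0);
        // Combine the running result with each next value: many values shrink to one
        for (int i = 1; i < values.size(); i++) {
            acc = reducer.apply(acc, values.get(i));
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Integer> counts = List.of(1, 1, 1, 1);
        System.out.println(reduceAll(counts, (a, b) -> a + b)); // prints 4
    }
}
```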
---
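```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Map each word to (word, 1); returns(...) restores the tuple type that the lambda erases
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapped = env.fromElements("Apollo", "Paul", "Tom", "Paul", "Apollo", "Tom", "Marry")
                .map((MapFunction<String, Tuple2<String, Integer>>) item -> Tuple2.of(item, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        KeyedStream<Tuple2<String, Integer>, String> keyed = mapped.keyBy(t -> t.f0); // group by word (f0)
        // Two tuples for the same word in, one tuple out: keep f0, add the f1 counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> counts = keyed.reduce(
                (ReduceFunction<Tuple2<String, Integer>>) (value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1));
        counts.print();
        env.execute("WordCountStreamingJob");
    }
}
```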
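In this example, `reduce` receives two elements of type `Tuple2<String, Integer>` and returns one element of type `Tuple2<String, Integer>`.
Both inputs have the same `f0` (the word), so it is kept as-is; the two `f1` counts are added together and the combined tuple is returned. Because the stream is keyed by word, this happens separately for each word.
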
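A minimal sketch, in plain Java rather than Flink, of what the keyed reduce does with these seven words. It assumes a single, sequential partition and uses a hypothetical `Pair` record in place of `Tuple2` (`RollingReduceSketch` and `rollingReduce` are also hypothetical names). The point it illustrates is that every incoming record produces an output, namely the running count for its word, rather than one final total per word. In a real Flink job with parallelism above 1, `print()` prefixes each line with the subtask index, and records for different words may come out in a different order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class RollingReduceSketch {
    // Hypothetical stand-in for Flink's Tuple2<String, Integer> (f0 = word, f1 = count)
    record Pair(String f0, int f1) {
        @Override
        public String toString() {
            return "(" + f0 + "," + f1 + ")";
        }
    }

    // Sequential simulation of a keyed rolling reduce: each record is combined with the
    // stored result for its key, and the updated result is emitted immediately.
    static List<Pair> rollingReduce(List<Pair> input, BinaryOperator<Pair> reducer) {
        Map<String, Pair> state = new HashMap<>();
        List<Pair> emitted = new ArrayList<>();
        for (Pair record : input) {
            // merge() stores the record if the key is new, otherwise applies the reducer
            emitted.add(state.merge(record.f0(), record, reducer));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Pair> mapped = new ArrayList<>();
        for (String word : List.of("Apollo", "Paul", "Tom", "Paul", "Apollo", "Tom", "Marry")) {
            mapped.add(new Pair(word, 1)); // the map step: word -> (word, 1)
        }
        // Same logic as the ReduceFunction above: keep f0, add the f1 fields
        List<Pair> out = rollingReduce(mapped, (v1, v2) -> new Pair(v1.f0(), v1.f1() + v2.f1()));
        // prints (Apollo,1) (Paul,1) (Tom,1) (Paul,2) (Apollo,2) (Tom,2) (Marry,1), one per line
        out.forEach(System.out::println);
    }
}
```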
---
### Max and Min: aggregation functions applied after grouping
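```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.fromElements("Apollo 1", "Paul 3", "Tom 2", "Paul 1", "Apollo 2", "Tom 4", "Marry 6");
        // Parse "name count" into (name, count), splitting each line only once
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapped = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String item) throws Exception {
                String[] parts = item.split(" ");
                return Tuple2.of(parts[0], Integer.valueOf(parts[1]));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.INT));
        KeyedStream<Tuple2<String, Integer>, String> keyed = mapped.keyBy(t -> t.f0); // group by name (f0)
        // Maximum of field position 1 (f1) per key
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.max(1);
        result.print();
        env.execute("WordCountStreamingJob");
    }
}
```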
`keyed.max(1)` **computes the maximum of the second field (position 1, i.e. `f1`) within each key.** `keyed.min(1)` does the same for the minimum.
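Like `reduce` on a keyed stream, `max(1)` is a rolling aggregation: each incoming record emits the current maximum for its key. The plain-Java sketch below simulates that for the input above, and `min(1)` by flipping the comparison. It is a sequential, single-partition approximation, not Flink code; `RollingMaxMinSketch`, `Pair`, `parse`, and `rolling` are hypothetical names. One detail the sketch glosses over: Flink documents that `max`/`min` only guarantee the aggregated field, whereas `maxBy`/`minBy` return the whole element holding that value. Here `f0` is the key, so both variants would yield the same tuples.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class RollingMaxMinSketch {
    // Hypothetical stand-in for Flink's Tuple2<String, Integer>
    record Pair(String f0, int f1) {
        @Override
        public String toString() {
            return "(" + f0 + "," + f1 + ")";
        }
    }

    // Parse "Apollo 1" into (Apollo, 1), splitting the line once
    static Pair parse(String line) {
        String[] parts = line.split(" ");
        return new Pair(parts[0], Integer.parseInt(parts[1]));
    }

    // Sequential simulation of a keyed rolling aggregate: for every record,
    // update the stored result for its key and emit the updated result.
    static List<Pair> rolling(List<String> lines, BinaryOperator<Pair> pick) {
        Map<String, Pair> state = new HashMap<>();
        List<Pair> emitted = new ArrayList<>();
        for (String line : lines) {
            Pair record = parse(line);
            emitted.add(state.merge(record.f0(), record, pick));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("Apollo 1", "Paul 3", "Tom 2", "Paul 1", "Apollo 2", "Tom 4", "Marry 6");
        // Like max(1): keep the record with the larger f1 (f0 is the key, so it never differs)
        // prints [(Apollo,1), (Paul,3), (Tom,2), (Paul,3), (Apollo,2), (Tom,4), (Marry,6)]
        System.out.println(rolling(lines, (a, b) -> b.f1() > a.f1() ? b : a));
        // Like min(1): keep the record with the smaller f1
        // prints [(Apollo,1), (Paul,3), (Tom,2), (Paul,1), (Apollo,1), (Tom,2), (Marry,6)]
        System.out.println(rolling(lines, (a, b) -> b.f1() < a.f1() ? b : a));
    }
}
```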