```java
StreamExecutionEnvironment.getExecutionEnvironment();
```
根據環境判斷是本地環境還是集群環境,來創建運行環境。
```java
DataStream<String> lines = env.socketTextStream("192.168.8.111", 8888);
```
DataStreamSource是DataStream的實現類。

DataStream是抽象的數據集,不實際裝數據,只是數據集的描述。
通過轉換方法可以被轉換成其他的DataStream。
```java
DataStreamSource<String> lines = env.fromElements();
```
fromElements方法,通常用來做實驗的。(這只是一個玩具 ^_^ )
```java
DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
```
同理,fromCollection和fromElements方法類似,只不過它是個集合。
```java
DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
```
獲取并行度 [getParallelism方法]
```java
streamSource.getParallelism()
```
```java
public class WordCountStreamingJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14));
/**
* fromCollection返回的DataStreamSource并行度為1
*/
System.out.println(streamSource.getParallelism());
SingleOutputStreamOperator<Integer> filtered = streamSource.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value % 2 == 0;
}
});
/**
* filter返回的DataStreamSource并行度為12
*/
System.out.println(filtered.getParallelism());
filtered.print();
env.execute("WordCountStreamingJob");
}
}
```
并行度在程序執行前,程序已經知道了,以為它只是一個描述信息,已經知道了又幾個并行。
```java
env.socketTextStream("192.168.8.111", 8888); // 并行度也為1
```
---
綜上所述:
socketTextStream、fromElements、fromCollection返回DataStream的并行度默認均為1。
可以通過<u>setParallelism</u>方法進行設置并行度。
```java
filtered.setParallelism(6);
```
- Flink簡介
- flink搭建standalone模式與測試
- flink提交任務(界面方式)
- Flink項目初始化
- Java版WordCount(匿名類)
- Java版WordCount(lambda)
- Scala版WordCount
- Java版WordCount[批處理]
- Scala版WordCount[批處理]
- 流處理非并行的Source
- 流處理可并行的Source
- kafka的Source
- Flink算子(Map,FlatMap,Filter)
- Flink算子KeyBy
- Flink算子Reduce和Max與Min
- addSink自定義Sink
- startNewChain和disableChaining
- 資源槽slotSharingGroup
- 計數窗口
- 滾動窗口
- 滑動窗口
- Session窗口
- 按照EventTime作為標準