```java
DataStreamSource<Long> streamSource = env.fromParallelCollection(new NumberSequenceIterator(1, 10), Long.class);
```
```java
DataStreamSource<Long> streamSource = env.fromParallelCollection(new NumberSequenceIterator(1, 10), TypeInformation.of(Long.TYPE));
```
查看并行度
```java
System.out.println(streamSource.getParallelism()); // 并行度是 12
```
可以設置并行度
```java
streamSource.setParallelism(4);
```
1到100的序列
```java
DataStreamSource<Long> streamSource = env.generateSequence(1, 100);
```
generateSequence得到的DataStreamSource的并行度是12。
以上的Source都是玩具,實際上不會使用的。
```java
env.readTextFile("C:\\Users\\Administrator\\Desktop\\data");
```
readTextFile的并行度是12。
```java
public class WordCountStreamingJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> streamSource = env.readTextFile("C:\\Users\\Administrator\\Desktop\\data");
System.out.println(streamSource.getParallelism());
SingleOutputStreamOperator<Tuple2> wordAndOne = streamSource.flatMap((FlatMapFunction<String, Tuple2>) (line, out) -> {
for (String word : line.split(" ")) {
out.collect(Tuple2.of(word, 1));
}
}).returns(Types.TUPLE(Types.STRING, Types.INT));
System.out.println(wordAndOne.getParallelism());
wordAndOne.keyBy(0).sum(1).print();
env.execute("WordCountStreamingJob");
}
}
```
綜上所述:
readTextFile、fromParallelCollection、generateSequence它們的并行度是跟可用的邏輯核數是相關的。可以多并行的。
readTextFile也不是可以一直運行的。socketTextStream是可以一直運行的,但是并行度是1。
- Flink簡介
- flink搭建standalone模式與測試
- flink提交任務(界面方式)
- Flink項目初始化
- Java版WordCount(匿名類)
- Java版WordCount(lambda)
- Scala版WordCount
- Java版WordCount[批處理]
- Scala版WordCount[批處理]
- 流處理非并行的Source
- 流處理可并行的Source
- kafka的Source
- Flink算子(Map,FlatMap,Filter)
- Flink算子KeyBy
- Flink算子Reduce和Max與Min
- addSink自定義Sink
- startNewChain和disableChaining
- 資源槽slotSharingGroup
- 計數窗口
- 滾動窗口
- 滑動窗口
- Session窗口
- 按照EventTime作為標準