# Joining Streams in Storm Core
Storm 支持通過 JoinBolt 來 join 多個 data streams 變成一個 stream. `JoinBolt`是一個 Windowed bolt。`JoinBolt` 會等待配置的窗口時間來匹配被join 的streams的tuples。這有助于通過窗口邊界生成streams.
`JoinBolt` 每個進來的 data streams 必須基于一個字段進行 Field Group。stream只能使用被 FieldsGrouped的字段 join 其他stream。
## Performing Joins
考慮下面的 SQL join,設計四張表:
```
select userId, key4, key2, key3
from table1
inner join table2 on table2.userId = table1.key1
inner join table3 on table3.key3 = table2.userId
left join table4 on table4.key4 = table3.key3
```
相同的可以使用`JoinBolt`,join四個spouts,生成想要的tuples:
```
JoinBolt jbolt = new JoinBolt("spout1", "key1") // from spout1
.join ("spout2", "userId", "spout1") // inner join spout2 on spout2.userId = spout1.key1
.join ("spout3", "key3", "spout2") // inner join spout3 on spout3.key3 = spout2.userId
.leftJoin ("spout4", "key4", "spout3") // left join spout4 on spout4.key4 = spout3.key3
.select ("userId, key4, key2, spout3:key3") // chose output fields
.withTumblingWindow( new Duration(10, TimeUnit.MINUTES) ) ;
topoBuilder.setBolt("joiner", jbolt, 1)
.fieldsGrouping("spout1", new Fields("key1") )
.fieldsGrouping("spout2", new Fields("userId") )
.fieldsGrouping("spout3", new Fields("key3") )
.fieldsGrouping("spout4", new Fields("key4") );
```
bolt 構造器需要兩個參數.第一個參數介紹了第一個stream來自于 `spout1`,并指定了通過key1來和其他 streams 連接.組件的名稱必須根據直接連接 Join bolt的 `spout` 或者bolt來設置.這里,來自于spout1的數據必須根據 `key1` 來 field group。同樣的,調用 `leftJoin()` 和 `join()` 方法的時候,也會通過這個字段來join.根據上面的例子,FieldGrouping 要求也適用于其他spout 的streams。第三個參數表示streams要和哪個spout的streams連接.
`select()` 方法用來指定 output fields。`select` 參數是逗號分隔的字段列表。單個字段可以通過 stream名稱作為前綴,來區別不同streams中相同的字段,像這樣:`.select("spout3:key3, spout4:key3")`.嵌套的tuple 類型是支持的.例如,`outer.inner.innermost` 就是一個字段嵌套三層深度,`outer` 和 `inner` 是 `Map` 的類型.
join 參數中的字段不允許用 stream 名稱作為前綴,但是支持嵌套字段.
上面調用 `withTumblingWindow()`方法,將join 窗口配置成10分鐘的翻滾窗口.由于 `JoinBolt` 是一個窗口 spout,我們還可以使用 `withWindow` 方法將其配置為滑動窗口(參考下面的提示部分).
## Stream names and Join order
* Streams name 在引用之前必須聲明,在構造函數和 join方法的第一個參數都需要 Streams name,join方法的第三個參數會用到Stream name.像下面這樣引用stream name是不允許的:
```
new JoinBolt( "spout1", "key1")
.join ( "spout2", "userId", "spout3") //not allowed. spout3 not yet introduced
.join ( "spout3", "key3", "spout1")
```
* 在內部,join將按照用戶所表示的順序執行。
## Joining based on Stream names
為了簡單起見,Storm topology(拓撲)經常使用 `default` stream 。拓撲也可以使用命名的stream 而不是`default` stream。為了支持這種 topology(拓撲),可以通過構造函數的第一個參數將 `JoinBolt` 配置為使用 stream name 而不是源組件(spout / bolt)名稱:
```
new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")
.join("stream2", "key2")
...
```
第一個參數 `JoinBolt.Selector.STREAM` 通知 bolt `stream1/2/3/4` 引用 named stream (而不是上游 spouts/bolts的名稱)。
以下示例從四個 spouts 連接兩個命名流:
```
new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")
.join ("stream2", "userId", "stream1" )
.select ("userId, key1, key2")
.withTumblingWindow( new Duration(10, TimeUnit.MINUTES) ) ;
topoBuilder.setBolt("joiner", jbolt, 1)
.fieldsGrouping("bolt1", "stream1", new Fields("key1") )
.fieldsGrouping("bolt2", "stream1", new Fields("key1") )
.fieldsGrouping("bolt3", "stream2", new Fields("userId") )
.fieldsGrouping("bolt4", "stream1", new Fields("key1") );
```
在上述示例中,例如,spout1也可能發送其他 stream。但是連接 bolt 只是從不同的 bolts 訂閱了`stream1`&stream2。來自`bolt1`,`bolt2`和`bolt4`的`stream1`被視為單個 stream,并且與`bolt3`相連接。
## Limitations:
1.當前值支持 INNER 和LEFT join.
1. 不同于SQL,它允許通過不同的 keys 將相同的表和不同的表連接,這里必須在stream 上使用相同的字段. Fields Grouping 保證tuples被正確連接到 JoinBolt的實例.因此,FieldsGrouping字段必須與 join 字段相同,以獲得正確的結果.要在多個字段上執行 join,可以將這些字段組著成一個字段,然后發送到 Join Bolt。
## Tips:
1. Join 可以是CPU和內存密集型.當前窗口中積累的數據越大(與窗口長度成正比),join所需要的時間就越長。滑動間隔很短(例如幾秒鐘)會觸發頻繁的連接.因此,如果使用大的窗口長度或者小的滑動間隔,則性能可能受損.
2. 使用滑動窗口時,跨窗口重復 join 記錄。這是因為使用滑動窗口時,tuples 在多個窗口中繼續存在。
3. 如果啟用了消息超時,請確保超時設置(topology.message.timeout.secs)足夠大以舒適地適應窗口大小,以及其他 spouts 和 bolts 的附加處理。
4. 在最壞的情況下,連接一個具有M和N個元素的兩個 streams 的窗口,可以產生每個輸出元組的MxN元素,每個輸出 tuple 從每個輸入流錨定到一個 tuple 。這可能意味著來自JoinBolt的大量輸出元組和甚至更多的ACK用于下游 spout 發出。這可能會對消息傳遞系統造成重大壓力,如果不小心,則會大大減緩 topology(拓撲)結構。要管理消息傳遞子系統上的負載,建議:
* 增加 worker 堆(topology.worker.max.heap.size.mb)。
* 如果您的 topology(拓撲)不需要ACK,則禁用ACKers(topology.acker.executors = 0)。
* 禁用事件記錄器(topology.eventlogger.executors = 0)。
* 打開拓撲調試(topology.debug = false)。
* 將topology.max.spout設置為一個足夠大的值,以容納估計的全窗口值的 tuple 加上一些更多的余量。這有助于減少在消息傳遞子系統遇到過載時發出過多元組的端口的可能性。當它的值設置為null時,可能會發生這種情況。
* 最后,將窗口大小保持在解決手頭問題所需的最小值。
- Storm 基礎
- 概念
- Scheduler(調度器)
- Configuration
- Guaranteeing Message Processing
- 守護進程容錯
- 命令行客戶端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 綜述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 語言參考
- Storm SQL 內部實現
- Flux
- Storm 安裝和部署
- 設置Storm集群
- 本地模式
- 疑難解答
- 在生產集群上運行 Topology
- Maven
- 安全地運行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 資源感知調度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各種內部行為的 Metrics
- Windows 用戶指南
- Storm 中級
- 序列化
- 常見 Topology 模式
- Clojure DSL
- 使用沒有jvm的語言編輯storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 狀態管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 調試
- 動態日志級別設置
- Storm Logs
- 動態員工分析
- 拓撲事件檢查器
- Storm 與外部系統, 以及其它庫的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息隊列遙測傳輸) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高級
- 針對 Storm 定義一個不是 JVM 的 DSL
- 多語言協議
- Storm 內部實現
- 翻譯進度