# Trident API 綜述
"Stream" 是 Trident 中的核心數據模型, 它被當做一系列的 batch 來處理.在 Storm 集群的節點之間, 一個 stream 被劃分成很多 partition (分區), 對流的 operation (操作)是在每個 partition 上并行進行的.
注: 1\. "Stream" 是 Trident 中的核心數據模型:有些地方也說是 TridentTuple , 沒有個標準的說法. 2\. 一個 stream 被劃分成很多 partition : partition 是 stream 的一個子集, 里面可能有多個 batch , 一個 batch 也可能位于不同的 partition 上.
Trident 有 5 類操作:
1. Partition-local operations , 對每個 partition 的局部操作, 不產生網絡傳輸
2. Repartitioning operations: 對 stream (數據流)的重新劃分(僅僅是劃分, 但不改變內容), 產生網絡傳輸
3. 作為 operation (操作)的一部分進行網絡傳輸的 Aggregation operations (聚合操作).
4. Operations on grouped streams (作用在分組流上的操作)
5. Merges 和 joins 操作
## Partition-local operations
Partition-local operations (分區本地操作)不涉及網絡傳輸, 并且獨立地應用于每個 batch partition (批處理分區).
### Functions
一個 function 收到一個輸入 tuple 后可以輸出 0 或多個 tuple , 輸出 tuple 的字段被追加到接收到的輸入 tuple 后面.如果對某個 tuple 執行 function 后沒有輸出 tuple, 則該 tuple 被 filter(過濾), 否則, 就會為每個輸出 tuple 復制一份輸入 tuple 的副本.假設有如下的 function :
```
public class MyFunction extends BaseFunction {
public void execute(TridentTuple tuple, TridentCollector collector) {
for(int i=0; i < tuple.getInteger(0); i++) {
collector.emit(new Values(i));
}
}
}
```
假設有個叫 "mystream" 的 stream (流), 該流中有如下 tuple ( tuple 的字段為["a", "b", "c"] ):
```
[1, 2, 3]
[4, 1, 6]
[3, 0, 8]
```
如果您運行下面的代碼:
```
mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))
```
則 resulting tuples (輸出 tuple )中的字段為 ["a", "b", "c", "d"], 如下所示:
```
[1, 2, 3, 0]
[1, 2, 3, 1]
[4, 1, 6, 0]
```
### Filters
Filters 收到一個輸入 tuple , 并決定是否保留該 tuple .假設nin擁有這個 filters:
```
public class MyFilter extends BaseFilter {
public boolean isKeep(TridentTuple tuple) {
return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;
}
}
```
現在, 假設您有如下這些 tuple , 包含字段 ["a", "b", "c"]:
```
[1, 2, 3]
[2, 1, 1]
[2, 3, 4]
```
如果您運行如下代碼:
```
mystream.filter(new MyFilter())
```
則得到的 resulting tuples (結果 tuples)為:
```
[1, 2, 3]
```
### map and flatMap
`map` 返回一個 stream , 它包含將給定的 mapping function (映射函數)應用到 stream 的 tuples 的結果. 這個可以用來對 tuples 應用 one-one transformation (一一變換).
例如, 如果有一個 stream of words (單詞流), 并且您想將其轉換為 stream of upper case words (大寫字母的流), 你可以定義一個 mapping function (映射函數)如下,
```
public class UpperCase extends MapFunction {
@Override
public Values execute(TridentTuple input) {
return new Values(input.getString(0).toUpperCase());
}
}
```
然后可以將 mapping function (映射函數)應用于 stream 以產生 stream of uppercase words (大寫字的流).
```
mystream.map(new UpperCase())
```
`flatMap` 類似于 `map` , 但具有將 one-to-many transformation (一對多變換)應用于 values of the stream (流的值)的效果, 然后將所得到的元素 flattening (平坦化)為新的 stream .
例如, 如果有 stream of sentences (句子流), 并且您想將其轉換成 stream of words (單詞流), 你可以定義一個 flatMap 函數如下,
```
public class Split extends FlatMapFunction {
@Override
public Iterable<Values> execute(TridentTuple input) {
List<Values> valuesList = new ArrayList<>();
for (String word : input.getString(0).split(" ")) {
valuesList.add(new Values(word));
}
return valuesList;
}
}
```
然后可以將 flatMap 函數應用于 stream of sentences (句子流)以產生一個 stream of words (單詞流),
```
mystream.flatMap(new Split())
```
當然這些操作可以被 chained (鏈接), 因此可以從如下的 stream of sentences (句子流)中獲得 stream of uppercase words (大寫字的流),
```
mystream.flatMap(new Split()).map(new UpperCase())
```
如果不將 output fields (輸出字段)作為 parameter (參數)傳遞, 則 map 和 flatMap 會將 input fields (輸入字段)保留為 output fields (輸出字段).
如果要使用 MapFunction 或 FlatMapFunction 使用 new output fields (新的輸出字段)替換 old fields (舊字段), 您可以使用附加的 Fields 參數調用 map/flatMap , 如下所示,
```
mystream.map(new UpperCase(), new Fields("uppercased"))
```
Output stream (輸出流)只有一個 output field (輸出字段) "uppercased" , 而不管以前的流有什么輸出字段. 同樣的事情適用于 flatMap, 所以以下是有效的,
```
mystream.flatMap(new Split(), new Fields("word"))
```
### peek
`peek` 可用于在每個 trident tuple 流過 stream 時對其執行 additional action (附加操作). ? 這可能對于在流經 pipeline 中 certain point (某一點)的元組來 debugging (調試) tuples 是有用的.
例如, 下面的代碼將打印在將這些單詞轉換為 `groupBy` 之前將單詞轉換為大寫的結果 `java mystream.flatMap(new Split()).map(new UpperCase()) .peek(new Consumer() { @Override public void accept(TridentTuple input) { System.out.println(input.getString(0)); } }) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))`
### min and minBy
`min` 和 `minBy` operations (操作)在 trident stream 中的 a batch of tuples (一批元組)的每個 partition (分區)上返回 minimum value (最小值).
假設 trident stream 包含字段 ["device-id", "count"] 和 partitions of tuples (元組的以下分區)
```
Partition 0:
[123, 2]
[113, 54]
[23, 28]
[237, 37]
[12, 23]
[62, 17]
[98, 42]
Partition 1:
[64, 18]
[72, 54]
[2, 28]
[742, 71]
[98, 45]
[62, 12]
[19, 174]
Partition 2:
[27, 94]
[82, 23]
[9, 86]
[53, 71]
[74, 37]
[51, 49]
[37, 98]
```
`minBy` operation (操作)可以應用在上面的 stream of tuples (元組流)中, 如下所示, 這導致在每個 partition (分區)中以最小值 `count` field (字段)發出 tuples .
```
mystream.minBy(new Fields("count"))
```
上述代碼在上述 partitions (分區)上的結果是:
```
Partition 0:
[123, 2]
Partition 1:
[62, 12]
Partition 2:
[82, 23]
```
您可以在 Stream 上查看其他 `min` 和 `minBy` 操作 `java public <T> Stream minBy(String inputFieldName, Comparator<T> comparator) public Stream min(Comparator<TridentTuple> comparator)` 下面的示例顯示了如何使用這些 API 來使用 tuple 上的 respective Comparators (相應比較器)來找到 minimum (最小值).
```
FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, Vehicle.generateVehicles(20));
TridentTopology topology = new TridentTopology();
Stream vehiclesStream = topology.newStream("spout1", spout).
each(allFields, new Debug("##### vehicles"));
Stream slowVehiclesStream =
vehiclesStream
.min(new SpeedComparator()) // Comparator w.r.t speed on received tuple.
.each(vehicleField, new Debug("#### slowest vehicle"));
vehiclesStream
.minBy(Vehicle.FIELD_NAME, new EfficiencyComparator()) // Comparator w.r.t efficiency on received tuple.
.each(vehicleField, new Debug("#### least efficient vehicle"));
```
這些 API 的示例應用程序可以位于 [TridentMinMaxOfDevicesTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java) 和 [TridentMinMaxOfVehiclesTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java) .
### max and maxBy
`max` 和 `maxBy` operations (操作)在 trident stream 中的一 batch of tuples (批元組)的每個 partition (分區)上返回 maximum (最大值).
假設 trident stream 包含上述部分所述的字段 ["device-id", "count"] .
`max` 和 `maxBy` operations (操作)可以應用于上面的 stream of tuples (元組流), 如下所示, 這導致每個分區的最大值為 `count` 字段的元組.
```
mystream.maxBy(new Fields("count"))
```
上述代碼在上述 partitions (分區)上的結果是:
```
Partition 0:
[113, 54]
Partition 1:
[19, 174]
Partition 2:
[37, 98]
```
您可以在 Stream 上查看其他 `max` 和 `maxBy` 函數
```
public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator)
public Stream max(Comparator<TridentTuple> comparator)
```
下面的示例顯示了如何使用這些 API 來使用元組上的 respective Comparators (相應比較器)來找到 maximum (最大值).
```
FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, Vehicle.generateVehicles(20));
TridentTopology topology = new TridentTopology();
Stream vehiclesStream = topology.newStream("spout1", spout).
each(allFields, new Debug("##### vehicles"));
vehiclesStream
.max(new SpeedComparator()) // Comparator w.r.t speed on received tuple.
.each(vehicleField, new Debug("#### fastest vehicle"))
.project(driverField)
.each(driverField, new Debug("##### fastest driver"));
vehiclesStream
.maxBy(Vehicle.FIELD_NAME, new EfficiencyComparator()) // Comparator w.r.t efficiency on received tuple.
.each(vehicleField, new Debug("#### most efficient vehicle"));
```
這些 API 的示例應用程序可以位于 [TridentMinMaxOfDevicesTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java) 和 [TridentMinMaxOfVehiclesTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java)
### Windowing
Trident streams 可以 batches (批處理)同一個 windowing (窗口)的元組, 并將 aggregated result (聚合結果)發送到下一個 operation (操作). 有 2 種支持的 windowing (窗口)是基于 processing time (處理時間)或 tuples count (元組數): 1\. Tumbling window 2\. Sliding window
#### Tumbling window
基于 processing time (處理時間)或 count (計數), 元組在 single window (單個窗口)中分組. 任何 tuple (元組)只屬于其中一個 windows (窗口).
```
/**
* Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
*/
public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
Fields inputFields, Aggregator aggregator, Fields functionFields);
/**
* Returns a stream of tuples which are aggregated results of a window that tumbles at duration of {@code windowDuration}
*/
public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
Fields inputFields, Aggregator aggregator, Fields functionFields);
```
#### Sliding window
每個 sliding interval (滑動間隔), Tuples (元組)被分組在 windows (窗口)和 window slides 中.元組可以屬于多個 window (窗口).
```
/**
* Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
* and slides the window after {@code slideCount}.
*/
public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
Fields inputFields, Aggregator aggregator, Fields functionFields);
/**
* Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slidingInterval}
* and completes a window at {@code windowDuration}
*/
public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval,
WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
```
tumbling 和 sliding windows 的示例可以在 [這里](Windowing.html) 被找到.
#### 通用 windowing API
以下是通用的 windowing API, 它為任何支持的 windowing 配置提供了 `WindowConfig` .
```
public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields,
Aggregator aggregator, Fields functionFields)
```
`windowConfig` 可以是下面的任何一個. - `SlidingCountWindow.of(int windowCount, int slidingCount)` - `SlidingDurationWindow.of(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingDuration)` - `TumblingCountWindow.of(int windowLength)` - `TumblingDurationWindow.of(BaseWindowedBolt.Duration windowLength)`
Trident windowing APIs 需要 `WindowsStoreFactory` 來存儲接收的 tuples 和 aggregated values (聚合值). 目前, HBase 的基本實現由 `HBaseWindowsStoreFactory` 提供. 可以進一步擴展以解決各自的用途. 使用 `HBaseWindowStoreFactory` 進行 windowing 的例子可以在下面看到.
```
// window-state table should already be created with cf:tuples column
HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
new Values("how many apples can you eat"), new Values("to be or not to be the person"));
spout.setCycle(true);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
new Split(), new Fields("word"))
.window(TumblingCountWindow.of(1000), windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))
.peek(new Consumer() {
@Override
public void accept(TridentTuple input) {
LOG.info("Received tuple: [{}]", input);
}
});
StormTopology stormTopology = topology.build();
```
可以在 [這里](javadocs/org/apache/storm/trident/Stream.html) 中找到本節中所有上述 API 的詳細說明.
#### 示例應用程序
這些 API 的示例應用程序位于 [TridentHBaseWindowingStoreTopology](http://github.com/apache/storm/blob/master%0A/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java) 和 [TridentWindowingInmemoryStoreTopology](http://github.com/apache/storm/blob/master%0A/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java)
### partitionAggregate
partitionAggregate 在每個 batch of tuples (批量元組) partition 上執行一個 function 操作(實際上是聚合操作), 但它又不同于上面的 functions 操作, partitionAggregate 的輸出 tuple 將會取代收到的輸入 tuple , 如下面的例子:
```
mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
```
假設 input stream 包括字段 ["a", "b"] , 并有下面的 partitions of tuples (元組 partitions ):
```
Partition 0:
["a", 1]
["b", 2]
Partition 1:
["a", 3]
["c", 8]
Partition 2:
["e", 1]
["d", 9]
["d", 10]
```
則這段代碼的 output stream 包含如下 tuple , 且只有一個 "sum" 的字段:
```
Partition 0:
[3]
Partition 1:
[11]
Partition 2:
[20]
```
上面代碼中的 new Sum() 實際上是一個 aggregator (聚合器), 定義一個聚合器有三種不同的接口:CombinerAggregator, ReducerAggregator 和 Aggregator .
下面是 CombinerAggregator 接口:
```
public interface CombinerAggregator<T> extends Serializable {
T init(TridentTuple tuple);
T combine(T val1, T val2);
T zero();
}
```
一個 CombinerAggregator 僅輸出一個 tuple (該 tuple 也只有一個字段).每收到一個輸入 tuple, CombinerAggregator 就會執行 init() 方法(該方法返回一個初始值), 并且用 combine() 方法匯總這些值, 直到剩下一個值為止(聚合值).如果 partition 中沒有 tuple, CombinerAggregator 會發送 zero() 的返回值.下面是聚合器 Count 的實現:
```
public class Count implements CombinerAggregator<Long> {
public Long init(TridentTuple tuple) {
return 1L;
}
public Long combine(Long val1, Long val2) {
return val1 + val2;
}
public Long zero() {
return 0L;
}
}
```
當使用 aggregate() 方法代替 partitionAggregate() 方法時, 就能看到 CombinerAggregation 帶來的好處.這種情況下, Trident 會自動優化計算:先做局部聚合操作, 然后再通過網絡傳輸 tuple 進行全局聚合.
ReducerAggregator 接口如下:
```
public interface ReducerAggregator<T> extends Serializable {
T init();
T reduce(T curr, TridentTuple tuple);
}
```
ReducerAggregator 使用 init() 方法產生一個初始值, 對于每個輸入 tuple , 依次迭代這個初始值, 最終產生一個單值輸出 tuple .下面示例了如何將 Count 定義為 ReducerAggregator:
```
public class Count implements ReducerAggregator<Long> {
public Long init() {
return 0L;
}
public Long reduce(Long curr, TridentTuple tuple) {
return curr + 1;
}
}
```
ReducerAggregator 也可以與 persistentAggregate 一起使用, 稍后你會看到的.
用于 performing aggregations (執行聚合)的最通用的接口是 Aggregator , 如下所示:
```
public interface Aggregator<T> extends Operation {
T init(Object batchId, TridentCollector collector);
void aggregate(T state, TridentTuple tuple, TridentCollector collector);
void complete(T state, TridentCollector collector);
}
```
Aggregator 可以輸出任意數量的 tuple , 且這些 tuple 的字段也可以有多個.執行過程中的任何時候都可以輸出 tuple (三個方法的參數中都有 collector ). Aggregator 的執行方式如下:
1. 處理每個 batch 之前調用一次 init() 方法, 該方法的返回值是一個對象, 代表 aggregation 的狀態, 并且會傳遞給下面的 aggregate() 和 complete() 方法.
2. 每個收到一個該 batch 中的輸入 tuple 就會調用一次 aggregate , 該方法中可以 update the state (更新狀態)(第一點中 init() 方法的返回值)并 optionally emit tuples (可選地發出元組).
3. 當該 batch partition 中的所有 tuple 都被 aggregate() 方法處理完之后調用 complete 方法.
注:理解 batch, partition 之間的區別將會更好的理解上面的幾個方法.
下面的代碼將 Count 作為 Aggregator 實現:
```
public class CountAgg extends BaseAggregator<CountState> {
static class CountState {
long count = 0;
}
public CountState init(Object batchId, TridentCollector collector) {
return new CountState();
}
public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
state.count+=1;
}
public void complete(CountState state, TridentCollector collector) {
collector.emit(new Values(state.count));
}
}
```
有時需要同時執行 multiple aggregators (多個聚合)操作, 這個可以使用 chaining (鏈式)操作完成:
```
mystream.chainedAgg()
.partitionAggregate(new Count(), new Fields("count"))
.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
.chainEnd()
```
這段代碼將會對每個 partition 執行 Count 和 Sum aggregators (聚合器), 并輸出一個tuple 字段 ["count", "sum"].
### stateQuery and partitionPersist
stateQuery 和 partitionPersist 分別 query (查詢)和 update (更新) sources of state (狀態源). 您可以在 [Trident state doc](Trident-state.html) 上閱讀有關如何使用它們.
### projection
經 Stream 中的 project 方法處理后的 tuple 僅保持指定字段(相當于過濾字段).如果你有一個包含字段 ["a", "b", "c", "d"] 的 stream , 執行下面代碼:
```
mystream.project(new Fields("b", "d"))
```
則 output stream 將僅包含 ["b", "d"] 字段.
## Repartitioning operations
Repartitioning operations (重新分區操作)運行一個函數來 change how the tuples are partitioned across tasks (更改元組在任務之間的分區). number of partitions (分區的數量)也可以由于 repartitioning (重新分區)而改變(例如, 如果并行提示在 repartioning (重新分配)后更大). Repartitioning (重新分區)需要 network transfer (網絡傳輸). 以下是 repartitioning functions (重新分區功能):
1. shuffle: 隨機將 tuple 均勻地分發到目標 partition 里.
2. broadcast: 每個 tuple 被復制到所有的目標 partition 里, 在 DRPC 中有用 — 你可以在每個 partition 上使用 stateQuery .
3. partitionBy: 對每個 tuple 選擇 partition 的方法是:(該 tuple 指定字段的 hash 值) mod (目標 partition 的個數), 該方法確保指定字段相同的 tuple 能夠被發送到同一個 partition .(但同一個 partition 里可能有字段不同的 tuple ).
4. global: 所有的 tuple 都被發送到同一個 partition .
5. batchGlobal: 確保同一個 batch 中的 tuple 被發送到相同的 partition 中.
6. partition: 此方法采用實現 org.apache.storm.grouping.CustomStreamGrouping 的自定義分區函數.
## Aggregation operations
Trident 中有 aggregate() 和 persistentAggregate() 方法對流進行聚合操作. aggregate() 在每個 batch 上獨立的執行, persistemAggregate() 對所有 batch 中的所有 tuple 進行聚合, 并將結果存入 state 源中.
aggregate() 對 Stream 做全局聚合, 當使用 ReduceAggregator 或者 Aggregator 聚合器時, 流先被重新劃分成一個大分區(僅有一個 partition ), 然后對這個 partition 做聚合操作;另外, 當使用 CombinerAggregator 時, Trident 首先對每個 partition 局部聚合, 然后將所有這些 partition 重新劃分到一個 partition 中, 完成全局聚合.相比而言, CombinerAggregator 更高效, 推薦使用.
下面的例子使用 aggregate() 對一個 batch 操作得到一個全局的 count:
```
mystream.aggregate(new Count(), new Fields("count"))
```
同在 partitionAggregate 中一樣, aggregate 中的聚合器也可以使用鏈式用法.但是, 如果你將一個 CombinerAggregator 鏈到一個非 CombinerAggregator 后面, Trident 就不能做局部聚合優化.
關于 persistentAggregate 的用法請參見 [Trident state doc](Trident-state.html) 一文.
## Operations on grouped streams
groupBy 操作先對流中的指定字段做 partitionBy 操作, 讓指定字段相同的 tuple 能被發送到同一個 partition 里.然后在每個 partition 里根據指定字段值對該分區里的 tuple 進行分組.下面演示了 groupBy 操作的過程:

如果你在一個 grouped stream 上做聚合操作, 聚合操作將會在每個 group (分組)內進行, 而不是整個 batch 上. GroupStream 類中也有 persistentAggregate 方法, 該方法聚合的結果將會存儲在一個 key 值為分組字段(即 groupBy 中指定的字段)的 [MapState](http://github.com/apache/storm/blob/master%0A/storm-core/src/jvm/org/apache/storm/trident/state/map/MapState.java) 中, 這些還是在 [Trident state doc](Trident-state.html) 一文中講解.
和普通的 stream 一樣, groupstream 上的聚合操作也可以使用 chained (鏈式語法).
## Merges and joins
最后一部分 API 內容是關于將幾個 stream 匯總到一起, 最簡單的匯總方法是將他們合并成一個 stream , 這個可以通過 TridentTopology 中的 merge 方法完成, 就像這樣:
```
topology.merge(stream1, stream2, stream3);
```
Trident 將把新的 merged stream 的 output fields 命名為第一個 stream 的 output fields (輸出字段).
另一種 combine streams (匯總方法)是使用 join (連接, 類似于 sql 中的連接操作, 需要有限的輸入).所以, 它們對于 infinite streams (無限流)是沒有意義的. Joins in Trident 僅適用于從 spout 發出的每個 small batch 中.
以下是包含字段 ["key", "val1", "val2"] 的 stream 和包含 ["x", "val1"] 的另一個 stream 之間的 join 示例:
```
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
```
使用 "key" 和 "x" 作為每個相應流的連接字段將 stream1 和 stream2 join 在一起.然后, Trident 要求命名 new stream 的所有 output fields , 因為 input streams 可能具有 overlapping field names (重疊的字段名稱).從 join 發出的 tuples 將包含:
1. list of join fields (連接字段列表).在這種情況下, "key" 對應于 stream1 的 "key" , stream2 對應于 "x" .
2. 接下來, 按照 streams 如何傳遞到 join 方法的順序, 所有流中的所有 non-join fields (非連接字段)的列表.在這種情況下, "a" 和 "b" 對應于來自 stream1 的 "val1" 和 "val2" , "c" 對應于來自 stream2 的 "val1" .
當來自不同 spouts 的 stream 之間發生 join 時, 這些 spouts 將與它們如何 emit batches (發出批次)同步.也就是說, 一批處理將包括 tuples from each spout (每個 spout 的元組).
你可能會想知道 - 你如何做一些像 "windowed join" 這樣的事情, 其中從 join 的一邊的 tuples 連接 join 另一邊的最后一個小時的 tuples .
為此, 您將使用 partitionPersist 和 stateQuery .join 一端的元組的最后一小時將被存儲并在 source of state (狀態源)中旋轉, 并由 join 字段鍵入.然后 stateQuery 將通過連接字段進行查找以執行 "join".
- Storm 基礎
- 概念
- Scheduler(調度器)
- Configuration
- Guaranteeing Message Processing
- 守護進程容錯
- 命令行客戶端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 綜述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 語言參考
- Storm SQL 內部實現
- Flux
- Storm 安裝和部署
- 設置Storm集群
- 本地模式
- 疑難解答
- 在生產集群上運行 Topology
- Maven
- 安全地運行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 資源感知調度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各種內部行為的 Metrics
- Windows 用戶指南
- Storm 中級
- 序列化
- 常見 Topology 模式
- Clojure DSL
- 使用沒有jvm的語言編輯storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 狀態管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 調試
- 動態日志級別設置
- Storm Logs
- 動態員工分析
- 拓撲事件檢查器
- Storm 與外部系統, 以及其它庫的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息隊列遙測傳輸) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高級
- 針對 Storm 定義一個不是 JVM 的 DSL
- 多語言協議
- Storm 內部實現
- 翻譯進度