# Trident 教程
Trident 是在 Storm 基礎上, 一個以實時計算為目標的 high-level abstraction (高度抽象). 它在提供處理大吞吐量數據能力(每秒百萬次消息)的同時, 也提供了低延時分布式查詢和 stateful stream processing (有狀態流式處理)的能力. 如果你對 Pig 和 Cascading 這種高級批處理工具很了解的話, 那么應該很容易理解 Trident , 因為他們之間很多的概念和思想都是類似的. Trident 提供了 joins , aggregations, grouping, functions, 以及 filters 等能力. 除此之外, Trident 還提供了一些專門的 primitives (原語), 從而在基于數據庫或者其他存儲的前提下來應付有狀態的遞增式處理. Trident 也提供一致性(consistent)、有且僅有一次(exactly-once)等語義, 這使得我們在使用 trident toplogy 時變得容易.
## 舉例說明
讓我們一起來看一個 Trident 的例子. 在這個例子中, 我們主要做了兩件事情:
1. 從一個 input stream 中讀取語句并計算每個單詞的個數
2. 提供查詢給定單詞列表中每個單詞當前總數的功能
為了說明的目的, 本例將從如下這樣一個無限的輸入流中讀取語句作為輸入:
```
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
new Values("the cow jumped over the moon"),
new Values("the man went to the store and bought some candy"),
new Values("four score and seven years ago"),
new Values("how many apples can you eat"));
spout.setCycle(true);
```
這個 spout 會循環 sentences 數據集,不斷輸出 sentence stream , 下面的代碼會以這個 stream 作為輸入并計算每個單詞的個數
```
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
.parallelismHint(6);
```
在這段代碼中, 我們首先創建了一個 TridentTopology 對象, 該對象提供了相應的接口去 constructing Trident computations (構造 Trident 計算過程). TridentTopology 類中的 newStream 方法從 input source (輸入源)中讀取數據, 并在 topology 中創建一個新的數據流. 在這個例子中, 我們使用了上面定義的 FixedBatchSpout 對象作為 input source (輸入源). Input sources (輸入數據源)同樣也可以是如 Kestrel 或者 Kafka 這樣的隊列代理. Trident 會在 Zookeeper 中保存每個 input source(輸入源)一小部分 state 信息(關于它已經消費的數據的 metadata 信息)來追蹤數據的處理情況, "spout1" 字符串指定 Trident 應保留 metadata 信息到 Zookeeper 的哪個節點.
Trident 在處理輸入 stream 的時候會轉換成 batch (包含若干個 tuple )來處理. 比如說, 輸入的 sentence stream 可能會被拆分成如下的 batch :

一般來說, 這些小的 batch 中的 tuple 可能會在數千或者數百萬這樣的數量級, 這完全取決于你的 incoming throughput (輸入的吞吐量).
Trident 提供了一系列非常成熟的批處理 API 來處理這些小 batches . 這些 API 和你在 Pig 或者 Cascading 中看到的非常類似, 你可以做 groupby, join, aggregation, 執行 function 和 filter 等等. 當然, 獨立的處理每個小的 batch 并不是非常有趣的事情, 所以 Trident 提供了功能來實現 batch 之間的聚合,并可以將這些聚合的結果存儲到內存,Memcached, Cassandra 或者是一些其他的存儲中. 同時, Trident 還提供了非常好的功能來查詢這些數據源的實時狀態, 這些實時狀態可以被 Trident 更新, 同時這些狀態也可以是一個 independent source of state (獨立的狀態源).
回到我們的這個例子中來, spout 輸出了一包含單一字段 "sentence" stream . 在下一行, topology 使用了 Split 函數將 stream 拆分成一個個 tuple , Split 函數讀取 stream (輸入流)中的 "sentence" 字段并將其拆分成若干個 word tuple . 每一個 sentence tuple 可能會被轉換成多個 word tuple, 比如說 "the cow jumped over the moon" 會被轉換成 6 個 "word" tuples. 下面是 Split 的定義:
```
public class Split extends BaseFunction {
public void execute(TridentTuple tuple, TridentCollector collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
collector.emit(new Values(word));
}
}
}
```
如你所見, 真的很簡單. 它只是簡單的根據空格拆分 sentence , 并將拆分出的每個單詞作為一個 tuple 輸出.
topology 計算完成后,會將計算結果持久化保存. 首先, word stream被根據 "word" 字段進行 group 操作, 然后每一個 group 使用 Count 聚合器進行持久化聚合. persistentAggregate 方法會幫助你把一個聚合的結果保存或者更新到狀態源中. 在這個例子中, 單詞的計數結果保存在內存中, 不過我們可以很簡單的把結果保存到其他的存儲類型中, 如 Memcached, Cassandra 等持久化存儲. 如果我們要把結果存儲到 Memcached 中, 只是簡單的使用下面這句話替換掉 persistentAggregate 這一行就可以(使用 [trident-memcached](https://github.com/nathanmarz/trident-memcached) ), 這當中的 "serverLocations" 是 Memcached cluster 的 host/ports (主機和端口號)列表:
```
.persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count"))
MemcachedState.transactional()
```
persistentAggregate 存儲的數據就是 stream 輸出的所有 batches 聚合的結果.
Trident 非常酷的一點就是它提供 fully fault-tolerant (完整的容錯性), exactly-once (處理一次且僅一次)的語義. 這就讓你可以很輕松的使用 Trident 來進行實時數據處理. Trident 會把狀態以某種形式持久化, 以至于錯誤發生時 retries is necessary, 但是Trident 不會對相同的源數據多次 update 數據庫.
persistentAggregate 方法會把數據流轉換成一個 TridentState 對象. 在這個例子當中, TridentState 對象代表了所有的單詞計數結果. 我們會使用這個 TridentState 對象來實現在計算過程中的分布式查詢部分.
上面的是 topology 中的第一部分, topology 的第二部分實現了一個低延時的單詞數量的分布式查詢. 這個查詢以一個用空格分割的單詞列表為輸入, 并返回這些單詞的總個數. 這些查詢就像普通的 RPC 調用那樣被執行的, 要說不同的話, 那就是他們在后臺是并行執行的. 下面是執行查詢的一個例子:
```
DRPCClient client = new DRPCClient("drpc.server.location", 3772);
System.out.println(client.execute("words", "cat dog the man");
// prints the JSON-encoded result, e.g.: "[[5078]]"
```
如你所見, 除了在 storm cluster 上并行執行之外, 這個查詢看上去就是一個普通的 RPC 調用. 這樣的簡單查詢的延時通常在 10 毫秒左右. 當然, 更復雜的 DRPC 調用可能會占用更長的時間, 但是延時很大程度上是取決于你給計算分配了多少資源.
Topology 中的分布式查詢部分實現如下所示:
```
topology.newDRPCStream("words")
.each(new Fields("args"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
.each(new Fields("count"), new FilterNull())
.aggregate(new Fields("count"), new Sum(), new Fields("sum"));
```
我們仍然是使用 TridentTopology 對象來創建 DRPC stream , 并且我們將這個函數命名為 "words" . 這個函數名會作為第一個參數在使用 DRPC Client 來執行查詢的時候用到.
每個 DRPC 請求會被當做處理 little batch 的 job, 這個 job 輸入一個代表請求的單一 tuple. 這個 tuple 包含了一個叫做 "args" 的字段, 在這個字段中保存了客戶端提供的查詢參數. 在這個例子中, 這個參數是一個以空格分割的單詞列表.
首先, 我們使用 Split 函數把傳入的請求參數拆分成獨立的單詞. 然后按照 word 對 stream 進行 group 操作, 之后就可以使用 stateQuery 操作查詢 topology 在第一部分生成的 TridentState對象. stateQuery 接受一個 source of state (state 源)(在這個例子中, 就是我們的 topology 所計算的單詞的個數的結果)以及一個用于查詢的函數作為輸入. 在這個例子中, 我們使用了 MapGet 函數來獲取每個單詞的出現個數. 由于 DRPC stream 是使用跟 TridentState 完全同樣的 group 方式(按照 "word" 字段進行 group by ), 每個單詞的查詢會被路由到 TridentState 對象的分區,TridentState 對象是用來管理和更新 word 計數結果的.
接下來, 我們用 FilterNull 這個過濾器將沒有 count 結果的 words 過濾掉, 并使用 Sum 這個聚合器將這些 count 累加起來得到結果. 最終, Trident 會自動把這個結果發送回等待的客戶端.
Trident 在如何最大程度地保證執行 topology(拓撲) 性能方面是非常智能的. 在 topology 中會自動的發生兩件非常有意思的事情:
1. 讀取和寫入狀態的操作 (比如說 stateQuery 和 persistentAggregate ) 會自動地批量處理到該狀態. 如果當前處理的 batch 中有 20 次 updates 需要被同步到存儲中.Trident 會自動的把這些操作匯總到一起, 只做一次讀和一次寫(在許多情況下, 您可以在 State implementation 中使用緩存來減少讀請求), 而不是進行 20 次讀 20 次寫的操作. 因此你可以在很方便的執行計算的同時, 保證了非常好的性能.
2. Trident 的聚合器已經是被優化的非常好了的. Trident 并不是簡單的把一個 group 中所有的 tuples 都發送到同一個機器上面進行聚合, 而是在發送之前已經進行過一次部分的聚合. 打個比方, Count 聚合器會先在每個 partition 上面進行 count , 然后把每個分片 count 匯總到一起就得到了最終的 count . 這個技術其實就跟 MapReduce 里面的 combiner 是一個思想.
讓我們再來看一下 Trident 的另外一個例子.
## Reach
這個例子是一個純粹的 DRPC topology , 這個 topology 會計算一個給定 URL 的 reach 值, reach 值是該 URL 對應頁面的推文能夠 Reach (送達)的用戶數量, 那么我們就把這個數量叫做這個 URL 的 reach . 要計算 reach , 你需要獲取轉發過這個推文的所有人, 然后找到所有該轉發者的粉絲, 并將這些粉絲去重, 最后就得到了去重后的用戶的數量. 如果把計算 reach 的整個過程都放在一臺機器上面, 就太困難了, 因為這會需要數千次數據庫調用以及千萬級別數量的 tuple . 如果使用 Storm 和 Trident , 你就可以把這些計算步驟在整個 cluster 中并行進行(具體哪些步驟, 可以參考 DRPC 介紹一文, 該文有介紹過 Reach 值的計算方法).
這個 topology 會讀取兩個 sources of state (state 源): 這是兩個 map 集合,一個將該 URL 映射到所有轉發該推文的用戶列表, 還有一個將用戶映射到該用戶的粉絲列表. topology 的定義如下:
```
TridentState urlToTweeters =
topology.newStaticState(getUrlToTweetersState());
TridentState tweetersToFollowers =
topology.newStaticState(getTweeterToFollowersState());
topology.newDRPCStream("reach")
.stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
.each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
.shuffle()
.stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
.parallelismHint(200)
.each(new Fields("followers"), new ExpandList(), new Fields("follower"))
.groupBy(new Fields("follower"))
.aggregate(new One(), new Fields("one"))
.parallelismHint(20)
.aggregate(new Count(), new Fields("reach"));
```
這個 topology 使用 newStaticState 方法創建了 TridentState 對象來代表一個外部數據庫. 使用這個 TridentState 對象, 我們就可以在這個 topology 上面進行動態查詢了. 和所有的 sources of state 一樣, 在這些數據庫上面的查找會自動被批量執行, 從而最大程度的提升效率.
這個 topology 的定義是非常簡單的 – 它僅是一個 simple batch processing job (簡單的批處理的作業). 首先, 查詢 urlToTweeters 數據庫來得到轉發過這個 URL 的用戶列表. 這個查詢會返回一個 tweeter 列表, 因此我們使用 ExpandList 函數來把其中的每一個 tweeter 轉換成一個 tuple .
接下來, 我們來獲取每個 tweeter 的 follower . 我們使用 shuffle 來把要處理的 tweeter 均勻地分配到 topology 運行的每一個 worker 中并發去處理. 然后查詢 tweetersToFollowers 數據庫從而的到每個轉發者的 followers. 你可以看到我們為 topology 的這部分分配了很大的并行度, 這是因為這部分是整個 topology 中最耗資源的計算部分.
然后, 我們對這些粉絲進行去重和計數. 這分為如下兩步:首先, 通過 "follower" 字段對流進行 "group by" 分組, 并對每個 group 執行 "One" 聚合器. "One" 聚合器對每個 group 簡單的發送一個 tuple, 該 tuple 僅包含一個數字 "1". 然后, 將這些 "1" 加到一起, 得到去重后的粉絲集中的粉絲數. "One" 聚合器的定義如下:
```
public class One implements CombinerAggregator<Integer> {
public Integer init(TridentTuple tuple) {
return 1;
}
public Integer combine(Integer val1, Integer val2) {
return 1;
}
public Integer zero() {
return 1;
}
}
```
這是一個 "combiner aggregator (匯總聚合器)" , 它會在傳送結果到其他 worker 匯總之前進行局部匯總, 從而使性能最優. 同樣, Sum 被定義成一個匯總聚合器, 在 topology 的最后部分進行全局求和是高效的.
接下來讓我們一起來看看 Trident 的一些細節.
## Fields and tuples
Trident 的數據模型是 TridentTuple,它是一個 list. 在一個 topology 中, tuple 是在一系列的 operation (操作)中增量生成的. operation 一般以一組字段作為輸入并輸出一組 function fields (功能字段). Operation 的輸入字段經常是輸入 tuple 的一個子集, 而功能字段則是 operation 的輸出.
看下面這個例子. 假定你有一個叫做 "stream" 的 stream, 它包含了 "x" ,"y" 和 "z" 三個字段. 為了運行一個讀取 "y" 作為輸入的 MyFilter 過濾器 , 你可以這樣寫:
```
stream.each(new Fields("y"), new MyFilter())
```
假設 MyFilter 的實現是這樣的:
```
public class MyFilter extends BaseFilter {
public boolean isKeep(TridentTuple tuple) {
return tuple.getInteger(0) < 10;
}
}
```
這會保留所有 "y" 字段小于 10 的 tuples . 傳給 MyFilter 的 TridentTuple 輸入將只包含字段 "y" . 這里需要注意的是, 當選擇輸入字段時, Trident 只發送 tuple 的一個子集, 這個操作是非常高效的.
讓我們一起看一下 "function fields (功能字段)" 是怎樣工作的. 假定你有如下這個函數:
```
public class AddAndMultiply extends BaseFunction {
public void execute(TridentTuple tuple, TridentCollector collector) {
int i1 = tuple.getInteger(0);
int i2 = tuple.getInteger(1);
collector.emit(new Values(i1 + i2, i1 * i2));
}
}
```
這個函數接收兩個數作為輸入并輸出兩個新的值: 這兩個數的和與乘積. 假定你有一個 stream, 其中包含 "x","y" 和 "z" 三個字段. 你可以這樣使用這個函數:
```
stream.each(new Fields("x", "y"), new AddAndMultiply(), new Fields("added", "multiplied"));
```
輸出的功能字段被添加到輸入 tuple 后面, 因此這個時候, 每個 tuple 中將會有5個字段 "x", "y", "z", "added", 和 "multiplied". "added" 和 "multiplied" 對應于 AddAndMultiply 輸出的第一和第二個字段.
另外, 我們可以使用聚合器來用輸出字段來替換輸入 tuple . 如果你有一個 stream 包含字段 "val1" 和 "val2", 你可以這樣做:
```
stream.aggregate(new Fields("val2"), new Sum(), new Fields("sum"))
```
輸出流將會僅包含一個 tuple, 該 tuple 有一個 "sum" 字段, 這個 sum 字段就是一批 tuple 中 "val2" 字段的累積和.
但是若對 group by 之后的流進行該聚合操作, 則輸出 tuple 中包含分組字段和聚合器輸出的字段, 例如:
```
stream.groupBy(new Fields("val1"))
.aggregate(new Fields("val2"), new Sum(), new Fields("sum"))
```
這個例子中的輸出包含 "val1" 字段和 "sum" 字段.
## State
在實時計算領域的一個主要問題就是怎么樣來管理狀態,在面對錯誤和重試的時候,更新是冪等的. 消除錯誤的是不可能的, 當一個節點死掉, 或者一些其他的問題出現時, 這些 batch 需要被重新處理. 問題是-你怎樣做狀態更新(無論是外部數據庫還是 topology 內部的 State)來保證每一個消息被處理且只被處理一次?
這是一個很棘手的問題, 我們可以用接下來的例子進一步說明. 假定你在做一個你的 stream 的計數聚合, 并且你想要存儲運行時的 count 到一個數據庫中去. 如果你只是存儲這個 count 到數據庫中, 并且想要進行一次更新, 我們是沒有辦法知道同樣的狀態是不是以前已經被update過了的. 這次更新可能在之前就嘗試過, 并且已經成功的更新到了數據庫中, 不過在后續的步驟中失敗了. 還有可能是在上次更新數據庫的過程中失敗的, 這些你都不知道.
Trident 通過做下面兩件事情來解決這個問題:
1. 每一個 batch 被賦予一個唯一標識 id "transaction id". 如果一個 batch 被重試, 它將會擁有和之前同樣的 transaction id .
2. State updates (狀態更新)是按照 batch 的順序進行的(強順序). 也就是說, batch 3 的狀態更新必須等到 batch 2 的狀態更新成功之后才可以進行.
有了這 2 個原則, 你就可以達到有且只有一次更新的目標. 此時, 不是只將 count 存到數據庫中, 而是將 transaction id 和 count 作為原子值存到數據庫中. 當更新一個 count 的時候, 需要比較數據庫中 transaction id 和當前 batch 的 ransaction id . 如果相同, 就跳過這次更新. 如果不同, 就更新這個 count .
當然, 你不需要在 topology 中手動處理這些邏輯, 這些邏輯已經被封裝在 State 的抽象中并自動進行. 你的 State object 也不需要自己去實現 transaction id 的跟蹤操作. 如果你想了解更多的關于如何實現一個 State 以及在容錯過程中的一些取舍問題, 可以參照 [這個文檔](/documentation/Trident-state.html).
一個 State 可以采用任何策略來存儲狀態, 它可以存儲到一個外部的數據庫, 也可以在內存中保持狀態并備份到 HDFS 中. State 并不需要永久的保持狀態. 比如說, 你有一個內存版的 State 實現, 它保存最近 X 個小時的數據并丟棄老的數據. 可以把 [Memcached integration](https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java) 作為例子來看看 State 的實現.
## Trident topologies 的執行
Trident 的 topology 會被編譯成盡可能高效的 Storm topology . 只有在需要對數據進行 repartition (重新分配)的時候(如 groupby 或者 shuffle )才會把 tuple 通過 network 發送出去, 如果你有一個 trident topology 如下:

它將會被編譯成如下的 Storm spouts/bolts:

## 小結
Trident 使得實時計算更加優雅. 你已經看到了如何使用 Trident 的 API 來完成大吞吐量的流式計算, 狀態維護, 低延時查詢等等功能. Trident 讓你在獲取最大性能的同時, 以更自然的一種方式進行實時計算.
- Storm 基礎
- 概念
- Scheduler(調度器)
- Configuration
- Guaranteeing Message Processing
- 守護進程容錯
- 命令行客戶端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 綜述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 語言參考
- Storm SQL 內部實現
- Flux
- Storm 安裝和部署
- 設置Storm集群
- 本地模式
- 疑難解答
- 在生產集群上運行 Topology
- Maven
- 安全地運行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 資源感知調度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各種內部行為的 Metrics
- Windows 用戶指南
- Storm 中級
- 序列化
- 常見 Topology 模式
- Clojure DSL
- 使用沒有jvm的語言編輯storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 狀態管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 調試
- 動態日志級別設置
- Storm Logs
- 動態員工分析
- 拓撲事件檢查器
- Storm 與外部系統, 以及其它庫的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息隊列遙測傳輸) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高級
- 針對 Storm 定義一個不是 JVM 的 DSL
- 多語言協議
- Storm 內部實現
- 翻譯進度