# Trident State
Trident 擁有一流的 abstractions (抽象)用于 reading from (讀取)和 writing to (寫入) stateful sources (狀態源). state (狀態)可以是 internal to the topology (拓撲內部), 例如保存在內存中并由 HDFS 支持 - 或者 externally stored (外部存儲)在像 Memcached 或者 Cassandra 這樣的數據庫中. Trident API 在任何一種情況下都沒有區別.
Trident 以 fault-tolerant (容錯方式)來管理 state (狀態), 以便在 retries (重試)和 failures (失敗)時 state updates (狀態更新)是 idempotent (冪等)的. 這可以讓您理解 Trident topologies (Trident 拓撲結構), 就好像每個消息都被 exactly-once (精確處理一次).
在進行 state updates (狀態更新)時可能會有各種級別的 fault-tolerance (容錯能力). 在得到這些之前, 我們來看一個例子來說明實現 exactly-once semantics (完全一次性語義)所必需的技巧. 假設您正在對 stream (流)進行 count aggregation (計數聚合), 并希望將 running count (運行的計數)存儲在數據庫中. 現在假設您在數據庫中存儲一個 single value representing the count (表示計數的值), 并且每次處理 new tuple (新的元組)時, 都會增加 count (計數).
發生故障時, 將 replayed tuples (元組). 這會導致在執行 state updates (狀態更新)時出現問題(或任何帶有副作用的東西) - 您不知道如果您曾經成功根據此 tuple (元組)更新了 state (狀態). 也許你從來沒有處理過 tuple (元組), 在這種情況下你應該增加 count (計數). 也許你已經處理了 tuple , 并成功地增加了 count (計數), 但是 tuple (元組)在另一個步驟中處理失敗. 在這種情況下, 您不應該增加 count (計數). 或者也許你看到了 tuple , 但更新數據庫時出錯. 在這種情況下, 您 _應該_ update the database (更新數據庫).
通過將 count 存儲在數據庫中, 您不知道這個 tuple 是否被處理過. 所以你需要更多的信息才能做出正確的決定. Trident 提供以下 semantics (語義), 足以實現 exactly-once (完全一次性)處理語義:
1. Tuples (元組)被 small batches (小批)處理(參見 [the tutorial](Trident-tutorial.html))
2. Each batch of tuples (每批元組)給出一個唯一的id, 稱為 "transaction id" (txid) . 如果 batch (批次)被 replayed , 則給出完全相同的 txid .
3. State updates (狀態更新)跟隨 batches (批次)的順序. 也就是說, 在 batch 2 的狀態更新成功之前, batch 3 的狀態更新將不會被應用.
使用這些 primitives (原語), 您的 State implementation (State 實現)可以檢測該 batch (批次)的 tuples (元組)是否已被處理, 并采取適當的操作以一致的方式更新狀態. 您所采取的操作取決于您的 input spouts (輸入端口)提供的關于每 batch 中的內容的 exact semantics (確切語義). 有關 fault-tolerance (容錯)的 spouts 可能性有三種:"non-transactional (非事務性)", "transactional (事務)" 和 "opaque transactional (不透明事務)" . 同樣, 在 fault-tolerance (容錯)方面有三種可能 state : "non-transactional (非事務性)" , "transactional (事務)" 和 "opaque transactional (不透明事務)" . 我們來看看每個 spout 類型, 看看你可以實現什么樣的容錯.
## Transactional spouts
請記住, Trident 將 tuples (元組)作為 small batches (小批量)進行處理, 每個批處理都被賦予一個唯一的 transaction id (事務 ID). spouts 的屬性根據他們可以提供的每 batch (批次)中的內容的 guarantees (保證)而有所不同. transactional spout 具有以下屬性:
1. 給定 txid 的 Batches (批次)總是相同的. txid 的批次的 Replays 將與該 txid 的第一次發出批次時的一組元組完全相同.
2. batches of tuples (批量的元組)之間沒有 overlap (重疊)( tuples (元組)在一個批次或另一個批次中, 而不是在多個批次中).
3. 每個元組都是批量的(沒有元組被跳過).
這是一個理解起來非常簡單的類型的 spout , stream 被分為 fixed batches (固定批次), 從不改變. storm-contrib 具有 [一個 transactional spout 的實現](http://github.com/apache/storm/tree/master%0A/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java) 針對于 Kafka .
你可能會想 - 為什么你不總是使用一個 transactional spout ?它們簡單易懂. 你不能使用它的一個原因是因為它們不一定非常 fault-tolerant (容錯). 例如, TransactionalTridentKafkaSpout 的工作原理是 txid 的批處理將包含來自所有 Kafka partitions 的元組. 一旦批次被發出, 在未來的任何時候批次被重新發出, 必須發出完全相同的元組集合才能滿足 transactional spouts 的語義. 現在假設一個批處理從 TransactionalTridentKafkaSpout 發出, batch 無法處理, 同時一個 Kafka 節點宕機. 您現在無法像以前一樣 replaying 同一批次(因為節點關閉, topic 的某些 partitions 不可用), 并且處理將 halt (停止).
這就是為什么存在 "opaque transactional (不透明事務)" spout - 它們容許這樣的錯誤, 丟失 source nodes (源節點), 同時仍允許您實現 exactly-once (一次)處理語義. 我們將在下一節中介紹這些 spouts .
(一方面注意 - 一旦 Kafka 支持 replication (備份), 就有可能擁有對節點故障容錯的 transactional spouts , 但該功能尚不存在. )
在我們介紹 "opaque transactional (不透明事務)" spouts 之前, 我們來看看如何設計一個具有 exactly-once semantics (完全一致的語義的) transactional spouts 的 State implementation (狀態實現). 這種狀態稱為 "transactional state (事務狀態)" , 并且利用了任何給定的 txid 始終與完全相同的元組集合相關聯的事實.
假設您的 topology 計算 word count , 并且您想將 word counts 存儲在 key/value database 中. key 將是這個 word , value 將包含 count . 您已經看到, 僅將 count 存儲為 value 不足以知道是否已經處理了一批元組. 相反, 您可以做的是將 transaction id 存儲在數據庫中的 count 作為 atomic value (原子值). 然后, 當更新 count 時, 可以將數據庫中的 transaction id 與當前批處理的 transaction id 進行比較. 如果它們相同, 則跳過該更新 - 由于強大的排序, 您可以確定數據庫中的 value 包含當前 batch . 如果它們不同, 你會增加 count . 此邏輯工作原理是因為 txid 的批次不會更改, Trident 可確保 state updates 跟隨 batches 的順序.
考慮這個為什么它起作用的例子. 假設您正在處理由以下批次元組組成的 txid 3 :
```
["man"]
["man"]
["dog"]
```
假設數據庫當前持有以下 key/value 對:
```
man => [count=3, txid=1]
dog => [count=4, txid=3]
apple => [count=10, txid=2]
```
與 "man" 相關聯的 txid 為 txid 1 .由于當前的 txid 為 3 , 因此您可以肯定地知道這批元組在該 count 中未被顯示. 所以你可以繼續遞增 count 2 并更新 txid . 另一方面, "dog" 的 txid 與當前的 txid 相同. 所以你確定當前批次的增量已經在數據庫中被顯示為 "dog" key . 所以你可以跳過更新. 完成更新后, 數據庫如下所示:
```
man => [count=5, txid=3]
dog => [count=4, txid=3]
apple => [count=10, txid=2]
```
現在我們來看看 opaque transactional spouts (不透明事務 spouts), 以及如何設計這種 spout 的 states.
## Opaque transactional spouts
如前所述, opaque transactional spout 不能保證 txid 的元組的批次保持不變. opaque transactional spout 具有以下屬性:
1. 每個元組都被 exactly one batch (正確的一個批次中) _successfully_ 處理. 但是, 一個元組可能無法在一個批處理中處理, 然后在后續批處理中成功處理.
[OpaqueTridentKafkaSpout](http://github.com/apache/storm/tree/master%0A/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java) 是一個具有此屬性并且是對丟失 Kafka 節點有容錯性的 spout . 無論何時 OpaqueTridentKafkaSpout emit a batch (發出批次), 它將從最后一批完成發出的位置開始發出元組. 這就確保了永遠沒有任何一個 tuple 會被跳過或者被放在多個 batch 中被多次成功處理的情況.
使用 opaque transactional spouts , 如果數據庫中的 transaction id 與當前批處理的 transaction id 相同, 則不再可能使用 trick of skipping state updates (跳過狀態更新的技巧). 這是因為在 state updates (狀態更新)之間批處理可能已更改.
你可以做的是在數據庫中存儲更多的 state . 而不是在數據庫中存儲 value 和 transaction id , 而是將 value , transaction id 和上一個 value 存儲在數據庫中. 我們再次使用在數據庫中存儲 count 的示例. 假設您的批次的部分計數是 "2" , 現在是應用 state update (狀態更新)的時間. 假設數據庫中的 value 如下所示:
```
{ value = 4, prevValue = 1, txid = 2 }
```
假設你當前的 txid 是 3 , 與數據庫不同. 在這種情況下, 您將 "prevValue" 設置為 "value" , 通過 partial count 增加 "value" , 并更新 txid . 新的數據庫值將如下所示:
```
{ value = 6, prevValue = 4, txid = 3 }
```
現在假設你當前的 txid 是 2 , 等于數據庫中的內容. 現在, 您知道數據庫中的 "value" 包含來自當前 txid 的上一批次的更新, 但該批次可能已經不同, 因此您必須忽略它. 在這種情況下, 您的 partial count 將增加 "prevValue" , 以計算新的 "value" . 然后將數據庫中的 value 設置為:
```
{ value = 3, prevValue = 1, txid = 2 }
```
這是因為 Trident 提供的批次的 strong ordering (強大順序). 一旦 Trident 移動到新的批次進行狀態更新, 它將永遠不會返回到上一批. 而且由于 opaque transactional spouts 保證批次之間不 overlap (重疊) - 每個元組都被一個批次成功處理 - 您可以根據先前的 value 安全地進行更新.
## Non-transactional spouts
Non-transactional spouts 不對每批中的內容提供任何保證. 所以它可能是 at-most-once (最多一次)的處理, 在這種情況下, 元組不會在失敗的批次后重試. 或者它可能 at-least-once (至少處理一次), 其中可以通過多個批次成功處理元組. 沒有辦法為這種 spout 實現 exactly-once semantics (完全一次性語義).
## spout 和 state type 的匯總
此圖顯示了 spouts / states 的哪些組合可以實現一次消息傳遞語義:

Opaque transactional states 具有最強的 fault-tolerance (容錯能力), 但這需要以 txid 和兩個 values 存儲在數據庫中為代價. Transactional states 在數據庫中需要較少的 state , 但僅適用于 transactional spouts . 最后, non-transactional states 在數據庫中需要最少的 state , 但不能實現 exactly-once semantics (一次性語義).
您選擇的 state 和 spout types 是容錯和存儲成本之間的折中, 最終您的應用程序要求將決定哪種組合適合您.
## State APIs
您已經看到了完成 exactly-once semantics (完全一次語義)所需要的復雜性. Trident 的好處是它將所有容錯邏輯內部化 - 作為一個用戶, 您不必處理比較 txids , 在數據庫中存儲多個值或類似的內容. 你可以這樣編寫代碼:
```
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count"))
.parallelismHint(6);
```
管理 opaque transactional state logic 所需的所有 logic 都內在于 MemcachedState.opaque 調用. 此外, 自動批量更新以最小化到數據庫的 roundtrips (往返行程).
基本 State interface (狀態接口)只有兩種方法:
```
public interface State {
void beginCommit(Long txid); // can be null for things like partitionPersist occurring off a DRPC stream
void commit(Long txid);
}
```
當狀態更新開始時, 您被告知, 當狀態更新結束時, 在每種情況下都被給予了 txid . Trident 對于你的 state 如何工作, 什么樣的方法有更新, 以及從中讀取什么樣的方法呢?
假設您有一個 home-grown database (本地生成的數據庫), 其中包含用戶位置信息, 并且您希望能夠從 Trident 訪問它. 您的 State implementation 將具有獲取和設置用戶信息的方法:
```
public class LocationDB implements State {
public void beginCommit(Long txid) {
}
public void commit(Long txid) {
}
public void setLocation(long userId, String location) {
// code to access database and set location
}
public String getLocation(long userId) {
// code to get location from database
}
}
```
然后, 您可以向 Trident 提供一個 StateFactory , 它可以在 Trident tasks 中創建 State 對象的實例. 您的 LocationDB 的 StateFactory 可能看起來像這樣:
```
public class LocationDBFactory implements StateFactory {
public State makeState(Map conf, int partitionIndex, int numPartitions) {
return new LocationDB();
}
}
```
Trident 提供 QueryFunction interface , 用于編寫查詢 source of state (狀態源)的Trident 操作, 以及 StateUpdater 接口, 用于編寫更新 source of state (狀態源)的 Trident 操作. 例如, 我們來寫一個操作 "QueryLocation" , 它查詢 LocationDB 的用戶位置. 我們先來看看如何在 topology 中使用它. 假設這種 topology 消耗了 userids 的 input stream:
```
TridentTopology topology = new TridentTopology();
TridentState locations = topology.newStaticState(new LocationDBFactory());
topology.newStream("myspout", spout)
.stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"))
```
現在我們來看看 QueryLocation 的實現如何:
```
public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
List<String> ret = new ArrayList();
for(TridentTuple input: inputs) {
ret.add(state.getLocation(input.getLong(0)));
}
return ret;
}
public void execute(TridentTuple tuple, String location, TridentCollector collector) {
collector.emit(new Values(location));
}
}
```
QueryFunction 的兩個步驟執行. 首先, Trident 將一批讀取合并在一起, 并將它們傳遞給 batchRetrieve . 在這種情況下, batchRetrieve 將接收 multiple user ids (多個用戶 ID ). batchRetrieve 預期返回與輸入元組列表大小相同的結果列表. 結果列表的第一個元素對應于第一個輸入元組的結果, 第二個是第二個輸入元組的結果, 依此類推.
你可以看到, 這個代碼沒有利用 Trident 的批處理, 因為它只是一次查詢一個 LocationDB . 所以寫一個更好的方法來編寫 LocationDB 就是這樣的:
```
public class LocationDB implements State {
public void beginCommit(Long txid) {
}
public void commit(Long txid) {
}
public void setLocationsBulk(List<Long> userIds, List<String> locations) {
// set locations in bulk
}
public List<String> bulkGetLocations(List<Long> userIds) {
// get locations in bulk
}
}
```
然后, 您可以像這樣編寫 QueryLocation 函數:
```
public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
List<Long> userIds = new ArrayList<Long>();
for(TridentTuple input: inputs) {
userIds.add(input.getLong(0));
}
return state.bulkGetLocations(userIds);
}
public void execute(TridentTuple tuple, String location, TridentCollector collector) {
collector.emit(new Values(location));
}
}
```
通過減少到數據庫的 roundtrips (往返行程), 此代碼將更加高效.
要 update state , 可以使用 StateUpdater interface . 這是一個 StateUpdater , 它使用新的位置信息來更新 LocationDB :
```
public class LocationUpdater extends BaseStateUpdater<LocationDB> {
public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {
List<Long> ids = new ArrayList<Long>();
List<String> locations = new ArrayList<String>();
for(TridentTuple t: tuples) {
ids.add(t.getLong(0));
locations.add(t.getString(1));
}
state.setLocationsBulk(ids, locations);
}
}
```
以下是在 Trident topology 中使用此操作的方法:
```
TridentTopology topology = new TridentTopology();
TridentState locations =
topology.newStream("locations", locationsSpout)
.partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater())
```
partitionPersist 操作更新 source of state (狀態源). StateUpdater 收到該 State 和一批具有該 State 更新的元組. 該代碼只是從輸入元組中獲取用戶名和位置, 并將批量集合放入 States .
partitionPersist 返回表示由 Trident topology 更新的位置數據塊的 TridentState 對象. 然后, 您可以在 topology 中的其他地方的 stateQuery 操作中使用此 state .
您還可以看到 StateUpdaters 被賦予了 TridentCollector . 發送到這個 collector 的元組轉到 "new values stream" . 在這種情況下, 沒有什么有趣的可以發送到該 stream , 但是如果您在數據庫中進行更新 counts , 則可以將更新的 counts 發送到該 stream . 然后, 您可以通過 TridentState#newValuesStream 方法訪問 new values stream 以進一步處理.
## persistentAggregate
Trident 有另外一種更新 State 的方法叫做 persistentAggregate . 你在之前的 streaming word count 例子中應該已經見過了, 如下:
```
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
```
persistentAggregate 是在 partitionPersist 之上的另外一層抽象,它知道怎么去使用一個 Trident aggregator (Trident 聚合器)來更新 State . 在這個例子當中, 因為這是一個 grouped stream (分組流), Trident 會期待你提供的 state 是實現了 "MapState" 接口的. 用來進行 group 的字段會以 key 的形式存在于 State 當中, 聚合后的結果會以 value 的形式存儲在 State 當中. "MapState" 接口看上去如下所示:
```
public interface MapState<T> extends State {
List<T> multiGet(List<List<Object>> keys);
List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);
void multiPut(List<List<Object>> keys, List<T> vals);
}
```
當你在一個 non-grouped streams 上面進行 aggregations (聚合)的話, Trident 會期待你的 State 對象實現 "Snapshottable" 接口:
```
public interface Snapshottable<T> extends State {
T get();
T update(ValueUpdater updater);
void set(T o);
}
```
[MemoryMapState](http://github.com/apache/storm/blob/master%0A/storm-core/src/jvm/org/apache/storm/trident/testing/MemoryMapState.java) 和 [MemcachedState](https://github.com/nathanmarz/trident-memcached/blob//src/jvm/trident/memcached/MemcachedState.java) 分別實現了上面的 2 個接口.
## Implementing Map States
在 Trident 中實現 MapState 是非常簡單的, 它幾乎幫你做了所有的事情. OpaqueMap , TransactionalMap , 和 NonTransactionalMap 類實現了所有相關的邏輯, 包括容錯的邏輯. 你只需要將一個知道如何執行相應 key/values 的 multiGet 和 multiPuts 的 IBackingMap 的實現提供給這些類就可以了. IBackingMap 接口看上去如下所示:
```
public interface IBackingMap<T> {
List<T> multiGet(List<List<Object>> keys);
void multiPut(List<List<Object>> keys, List<T> vals);
}
```
OpaqueMap 會用 [OpaqueValue](http://github.com/apache/storm/blob/master%0A/storm-core/src/jvm/org/apache/storm/trident/state/OpaqueValue.java) 的 value 來調用 multiPut 方法, TransactionalMap 會提供 [TransactionalValue](http://github.com/apache/storm/blob/master%0A/storm-core/src/jvm/org/apache/storm/trident/state/TransactionalValue.java) 中的 value , 而 NonTransactionalMaps 只是簡單的把從 Topology 獲取的 object 傳遞給 multiPut .
Trident 還提供了一種 [CachedMap](http://github.com/apache/storm/blob/master%0A/storm-core/src/jvm/org/apache/storm/trident/state/map/CachedMap.java) 類來進行自動的LRU cache (緩存) map key/vals .
最后, Trident 提供了 [SnapshottableMap](http://github.com/apache/storm/blob/master%0A/storm-core/src/jvm/org/apache/storm/trident/state/map/SnapshottableMap.java) 類, 通過將 global aggregations (全局聚合)存儲到 fixed key (固定密鑰)中將一個 MapState 轉換成一個 Snapshottable 對象.
大家可以看看 [MemcachedState](https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java) 的實現, 從而學習一下怎樣將這些工具組合在一起形成一個高性能的 MapState 實現. MemcachedState 是允許你選擇使用 opaque transactional , transactional , 還是 non-transactional 語義的.
- Storm 基礎
- 概念
- Scheduler(調度器)
- Configuration
- Guaranteeing Message Processing
- 守護進程容錯
- 命令行客戶端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 綜述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 語言參考
- Storm SQL 內部實現
- Flux
- Storm 安裝和部署
- 設置Storm集群
- 本地模式
- 疑難解答
- 在生產集群上運行 Topology
- Maven
- 安全地運行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 資源感知調度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各種內部行為的 Metrics
- Windows 用戶指南
- Storm 中級
- 序列化
- 常見 Topology 模式
- Clojure DSL
- 使用沒有jvm的語言編輯storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 狀態管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 調試
- 動態日志級別設置
- Storm Logs
- 動態員工分析
- 拓撲事件檢查器
- Storm 與外部系統, 以及其它庫的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息隊列遙測傳輸) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高級
- 針對 Storm 定義一個不是 JVM 的 DSL
- 多語言協議
- Storm 內部實現
- 翻譯進度