正如書中之前所提到的,使用Storm編程,可以通過調用ack和fail方法來確保一條消息的處理成功或失敗。不過當元組被重發時,會發生什么呢?你又該如何砍不會重復計算?
*Storm0.7.0*實現了一個新特性——事務性拓撲,這一特性使消息在語義上確保你可以安全的方式重發消息,并保證它們只會被處理一次。在不支持事務性拓撲的情況下,你無法在準確性,可擴展性,以空錯性上得到保證的前提下完成計算。
**NOTE:**事務性拓撲是一個構建于標準Storm?*spout*和*bolt*之上的抽象概念。
**設計**
在事務性拓撲中,Storm以并行和順序處理混合的方式處理元組。*spout*并行分批創建供*bolt*處理的元組(譯者注:下文將這種分批創建、分批處理的元組稱做批次)。其中一些*bolt*作為提交者以嚴格有序的方式提交處理過的批次。這意味著如果你有每批五個元組的兩個批次,將有兩個元組被*bolt*并行處理,但是直到提交者成功提交了第一個元組之后,才會提交第二個元組。?**NOTE:**?使用事務性拓撲時,數據源要能夠重發批次,有時候甚至要重復多次。因此確認你的數據源——你連接到的那個*spout*——具備這個能力。 這個過程可以被描述為兩個階段:?*處理階段*?純并行階段,許多批次同時處理。?*提交階段*?嚴格有序階段,直到批次一成功提交之后,才會提交批次二。 這兩個階段合起來稱為一個Storm事務。**NOTE:**?Storm使用zookeeper儲存事務元數據,默認情況下就是拓撲使用的那個zookeeper。你可以修改以下兩個配置參數鍵指定其它的zookeeper——transactional.zookeeper.servers和transactional.zookeeper.port。
**事務實踐**
下面我們要創建一個Twitter分析工具來了解事務的工作方式。我們從一個Redis數據庫讀取tweets,通過幾個*bolt*處理它們,最后把結果保存在另一個Redis數據庫的列表中。處理結果就是所有話題和它們的在tweets中出現的次數列表,所有用戶和他們在tweets中出現的次數列表,還有一個包含發起話題和頻率的用戶列表。 這個工具的拓撲見圖8-1。[](http://ifeve.com/getting-started-of-storm8-2/figure8-1/)? 圖8-1 拓撲概覽
正如你看到的,**TweetsTransactionalSpout**會連接你的tweet數據庫并向拓撲分發批次。**UserSplitterBolt**和**HashTagSplitterBolt**兩個*bolt*,從*spout*接收元組。**UserSplitterBolt**解析tweets并查找用戶——以@開頭的單詞——然后把這些單詞分發到名為*users*的自定義數據流組。**HashtagSplitterBolt**從tweet查找**#**開頭的單詞,并把它們分發到名為*hashtags*的自定義數據流組。第三個*bolt*,**UserHashtagJoinBolt**,接收前面提到的兩個數據流組,并計算具名用戶的一條tweet內的話題數量。為了計數并分發計算結果,這是個**BaseBatchBolt**(稍后有更多介紹)。
最后一個bolt——**RedisCommitterBolt**——接收以上三個*bolt*的數據流組。它為每樣東西計數,并在對一個批次完成處理時,把所有結果保存到redis。這是一種特殊的*bolt*,叫做提交者,在本章后面做更多講解。
用**TransactionalTopologyBuilder**構建拓撲,代碼如下:
| `01` | `TransactionalTopologyBuilder builder=` |
| `02` | `new`?`TransactionalTopologyBuilder(``"test"``,?``"spout"``,?``new`?`TweetsTransactionalSpout());` |
| `03` | ? |
| `04` | `builder.setBolt(``"users-splitter"``,?``new`?`UserSplitterBolt(),?``4``).shuffleGrouping(``"spout"``);` |
| `05` | `buildeer.setBolt(``"hashtag-splitter"``,?``new`?`HashtagSplitterBolt(),?``4``).shuffleGrouping(``"spout"``);` |
| `06` | ? |
| `07` | `builder.setBolt(``"users-hashtag-manager"``,?``new`?`UserHashtagJoinBolt(), r)` |
| `08` | `.fieldsGrouping(``"users-splitter"``,?``"users"``,?``new`?`Fields(``"tweet_id"``))` |
| `09` | `.fieldsGrouping(``"hashtag-splitter"``,?``"hashtags"``,?``new`?`Fields(``"tweet_id"``));` |
| `10` | ? |
| `11` | `builder.setBolt(``"redis-commiter"``,?``new`?`RedisCommiterBolt())` |
| `12` | `.globalGrouping(``"users-splitter"``,?``"users"``)` |
| `13` | `.globalGrouping(``"hashtag-splitter"``,?``"hashtags"``)` |
| `14` | `.globalGrouping(``"user-hashtag-merger"``);` |
接下來就看看如何在一個事務性拓撲中實現*spout*。
***Spout***
一個事務性拓撲的*spout*與標準*spout*完全不同。
| `1` | `public`?`class`?`TweetsTransactionalSpout?``extends`?`BaseTransactionalSpout<TransactionMetadata>{` |
正如你在這個類定義中看到的,TweetsTransactionalSpout繼承了帶范型的**BaseTransactionalSpout**。指定的范型類型的對象是事務元數據集合。它將在后面的代碼中用于從數據源分發批次。
在這個例子中,**TransactionMetadata**定義如下:
| `01` | `public`?`class`?`TransactionMetadata?``implements`?`Serializable {` |
| `02` | `private`?`static`?`final`?`long`?`serialVersionUID = 1L;` |
| `03` | `long`?`from;` |
| `04` | `int`?`quantity;` |
| `05` | ? |
| `06` | `public`?`TransactionMetadata(``long`?`from,?``int`?`quantity) {` |
| `07` | `this``.from = from;` |
| `08` | `this``.quantity = quantity;` |
| `09` | `}` |
| `10` | `}` |
該類的對象維護著兩個屬性**from**和**quantity**,它們用來生成批次。
*spout*的最后需要實現下面的三個方法:
| `01` | `@Override` |
| `02` | `public`?`ITransactionalSpout.Coordinator<TransactionMetadata> getCoordinator(` |
| `03` | `Map conf, TopologyContext context) {` |
| `04` | `return`?`new`?`TweetsTransactionalSpoutCoordinator();` |
| `05` | `}` |
| `06` | ? |
| `07` | `@Override` |
| `08` | `public`?`backtype.storm.transactional.ITransactionalSpout.Emitter<TransactionMetadata> getEmitter(Map conf, TopologyContext contest) {` |
| `09` | `return`?`new`?`TweetsTransactionalSpoutEmitter();` |
| `10` | `}` |
| `11` | ? |
| `12` | `@Override` |
| `13` | `public`?`void`?`declareOutputFields(OuputFieldsDeclarer declarer) {` |
| `14` | `declarer.declare(``new`?`Fields(``"txid"``,?``"tweet_id"``,?``"tweet"``));` |
| `15` | `}` |
**getCoordinator**方法,告訴Storm用來協調生成批次的類。**getEmitter**,負責讀取批次并把它們分發到拓撲中的數據流組。最后,就像之前做過的,需要聲明要分發的域。
**RQ類
**為了讓例子簡單點,我們決定用一個類封裝所有對Redis的操作。
| `01` | `public`?`class`?`RQ {` |
| `02` | `public`?`static`?`final`?`String NEXT_READ =?``"NEXT_READ"``;` |
| `03` | `public`?`static`?`final`?`String NEXT_WRITE =?``"NEXT_WRITE"``;` |
| `04` | ? |
| `05` | `Jedis jedis;` |
| `06` | ? |
| `07` | `public`?`RQ() {` |
| `08` | `jedis =?``new`?`Jedis(``"localhost"``);` |
| `09` | `}` |
| `10` | ? |
| `11` | `public`?`long`?`getavailableToRead(``long`?`current) {` |
| `12` | `return`?`getNextWrite() - current;` |
| `13` | `}` |
| `14` | ? |
| `15` | `public`?`long`?`getNextRead() {` |
| `16` | `String sNextRead = jedis.get(NEXT_READ);` |
| `17` | `if``(sNextRead ==?``null``) {` |
| `18` | `return`?`1``;` |
| `19` | `}` |
| `20` | `return`?`Long.valueOf(sNextRead);` |
| `21` | `}` |
| `22` | ? |
| `23` | `public`?`long`?`getNextWrite() {` |
| `24` | `return`?`Long.valueOf(jedis.get(NEXT_WRITE));` |
| `25` | `}` |
| `26` | ? |
| `27` | `public`?`void`?`close() {` |
| `28` | `jedis.disconnect();` |
| `29` | `}` |
| `30` | ? |
| `31` | `public`?`void`?`setNextRead(``long`?`nextRead) {` |
| `32` | `jedis.set(NEXT_READ,?``""``+nextRead);` |
| `33` | `}` |
| `34` | ? |
| `35` | `public`?`List<String> getMessages(``long`?`from,?``int`?`quantity) {` |
| `36` | `String[] keys =?``new`?`String[quantity];` |
| `37` | `for`?`(``int`?`i =?``0``; i < quantity; i++) {` |
| `38` | `keys[i] =?``""``+(i+from);` |
| `39` | `}` |
| `40` | `return`?`jedis.mget(keys);` |
| `41` | `}` |
| `42` | `}` |
仔細閱讀每個方法,確保自己理解了它們的用處。
**協調者Coordinator
**下面是本例的協調者實現。
| `01` | `public`?`static`?`class`?`TweetsTransactionalSpoutCoordinator?``implements``ITransactionalSpout.Coordinator<TransactionMetadata> {` |
| `02` | `TransactionMetadata lastTransactionMetadata;` |
| `03` | `RQ rq =?``new`?`RQ();` |
| `04` | `long`?`nextRead =?``0``;` |
| `05` | ? |
| `06` | `public`?`TweetsTransactionalSpoutCoordinator() {` |
| `07` | `nextRead = rq.getNextRead();` |
| `08` | `}` |
| `09` | ? |
| `10` | `@Override` |
| `11` | `public`?`TransactionMetadata initializeTransaction(BigInteger txid, TransactionMetadata prevMetadata) {` |
| `12` | `long`?`quantity = rq.getAvailableToRead(nextRead);` |
| `13` | `quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;` |
| `14` | `TransactionMetadata ret =?``new`?`TransactionMetadata(nextRead, (``int``)quantity);` |
| `15` | `nextRead += quantity;` |
| `16` | `return`?`ret;` |
| `17` | `}` |
| `18` | ? |
| `19` | `@Override` |
| `20` | `public`?`boolean`?`isReady() {` |
| `21` | `return`?`rq.getAvailableToRead(nextRead) >?``0``;` |
| `22` | `}` |
| `23` | ? |
| `24` | `@Override` |
| `25` | `public`?`void`?`close() {` |
| `26` | `rq.close();` |
| `27` | `}` |
| `28` | `}` |
值得一提的是,*在整個拓撲中只會有一個提交者實例*。創建提交者實例時,它會從redis讀取一個從1開始的序列號,這個序列號標識要讀取的tweet下一條。
第一個方法是**isReady**。在**initializeTransaction**之前調用它確認數據源已就緒并可讀取。此方法應當相應的返回**true**或**false**。在此例中,讀取tweets數量并與已讀數量比較。它們之間的不同就在于可讀tweets數。如果它大于0,就意味著還有tweets未讀。
最后,執行**initializeTransaction**。正如你看到的,它接收**txid**和**prevMetadata**作為參數。第一個參數是Storm生成的事務ID,作為批次的惟一性標識。**prevMetadata**是協調器生成的前一個事務元數據對象。
在這個例子中,首先確認有多少tweets可讀。只要確認了這一點,就創建一個TransactionMetadata對象,標識讀取的第一個tweet(譯者注:對象屬性**from**),以及讀取的tweets數量(譯者注:對象屬性**quantity**)。
元數據對象一經返回,Storm把它跟**txid**一起保存在zookeeper。這樣就確保了一旦發生故障,Storm可以利用分發器(譯者注:**Emitter**,見下文)重新發送批次。
**Emitter**
創建事務性*spout*的最后一步是實現分發器(Emitter)。實現如下:
| `01` | `public`?`static`?`class`?`TweetsTransactionalSpoutEmitter``implements`?`ITransactionalSpout.Emitter<TransactionMetadata> {` |
| `02` | ? |
| `03` | `</pre>` |
| `04` | `<pre>????RQ rq =?``new`?`RQ();</pre>` |
| `05` | `<pre>????``public`?`TweetsTransactionalSpoutEmitter() {}</pre>` |
| `06` | `<pre>????``@Override` |
| `07` | `public`?`void`?`emitBatch(TransactionAttempt tx, TransactionMetadata coordinatorMeta, BatchOutputCollector collector) {` |
| `08` | `rq.setNextRead(coordinatorMeta.from+coordinatorMeta.quantity);` |
| `09` | `List<String> messages = rq.getMessages(coordinatorMeta.from, <span style=``"font-family: Georgia, 'Times New Roman', 'Bitstream Charter', Times, serif; font-size: 13px; line-height: 19px;"``>coordinatorMeta.quantity);` |
| `10` | `</span>????????``long`?`tweetId = coordinatorMeta.from;` |
| `11` | `for`?`(String message : messages) {` |
| `12` | `collector.emit(``new`?`Values(tx,?``""``+tweetId, message));` |
| `13` | `tweetId++;` |
| `14` | `}` |
| `15` | `}` |
| `16` | ? |
| `17` | `@Override` |
| `18` | `public`?`void`?`cleanupBefore(BigInteger txid) {}` |
| `19` | ? |
| `20` | `@Override` |
| `21` | `public`?`void`?`close() {` |
| `22` | `rq.close();` |
| `23` | `}</pre>` |
| `24` | `<pre>` |
| `25` | `}` |
分發器從數據源讀取數據并從數據流組發送數據。分發器應當問題能夠為相同的事務id和事務元數據發送相同的批次。這樣,如果在處理批次的過程中發生了故障,Storm就能夠利用分發器重復相同的事務id和事務元數據,并確保批次已經重復過了。Storm會在**TransactionAttempt**對象里為嘗試次數增加計數(譯者注:**attempt id**)。這樣就能知道批次已經重復過了。
在這里**emitBatch**是個重要方法。在這個方法中,使用傳入的元數據對象從redis得到tweets,同時增加redis維持的已讀tweets數。當然它還會把讀到的tweets分發到拓撲。
***Bolts***
首先看一下這個拓撲中的標準*bolt*:
| `01` | `public`?`class`?`UserSplitterBolt?``implements`?`IBasicBolt{` |
| `02` | `private`?`static`?`final`?`long`?`serialVersionUID = 1L;` |
| `03` | ? |
| `04` | `@Override` |
| `05` | `public`?`void`?`declareOutputFields(OutputFieldsDeclarer declarer) {` |
| `06` | `declarer.declareStream(``"users"``,?``new`?`Fields(``"txid"``,``"tweet_id"``,``"user"``));` |
| `07` | `}` |
| `08` | ? |
| `09` | `@Override` |
| `10` | `public`?`Map<String, Object> getComponentConfiguration() {` |
| `11` | `return`?`null``;` |
| `12` | `}` |
| `13` | ? |
| `14` | `@Override` |
| `15` | `public`?`void`?`prepare(Map stormConf, TopologyContext context) {}` |
| `16` | ? |
| `17` | `@Override` |
| `18` | `public`?`void`?`execute(Tuple input, BasicOutputCollector collector) {` |
| `19` | `String tweet = input.getStringByField(``"tweet"``);` |
| `20` | `String tweetId = input.getStringByField(``"tweet_id"``);` |
| `21` | `StringTokenizer strTok =?``new`?`StringTokenizer(tweet,?``" "``);` |
| `22` | `HashSet<String> users =?``new`?`HashSet<String>();` |
| `23` | ? |
| `24` | `while``(strTok.hasMoreTokens()) {` |
| `25` | `String user = strTok.nextToken();` |
| `26` | ? |
| `27` | `//確保這是個真實的用戶,并且在這個tweet中沒有重復` |
| `28` | `if``(user.startsWith(``"@"``) && !users.contains(user)) {` |
| `29` | `collector.emit(``"users"``,?``new`?`Values(tx, tweetId, user));` |
| `30` | `users.add(user);` |
| `31` | `}` |
| `32` | `}` |
| `33` | `}` |
| `34` | ? |
| `35` | `@Override` |
| `36` | `public`?`void`?`cleanup(){}` |
| `37` | `}` |
正如本章前面提到的,**UserSplitterBolt**接收元組,解析tweet文本,分發@開頭的單詞————tweeter用戶。**HashtagSplitterBolt**的實現也非常相似。
| `01` | `public`?`class`?`HashtagSplitterBolt?``implements`?`IBasicBolt{` |
| `02` | `private`?`static`?`final`?`long`?`serialVersionUID = 1L;` |
| `03` | ? |
| `04` | `@Override` |
| `05` | `public`?`void`?`declareOutputFields(OutputFieldsDeclarer declarer) {` |
| `06` | `declarer.declareStream(``"hashtags"``,?``new`?`Fields(``"txid"``,``"tweet_id"``,``"hashtag"``));` |
| `07` | `}` |
| `08` | ? |
| `09` | `@Override` |
| `10` | `public`?`Map<String, Object> getComponentConfiguration() {` |
| `11` | `return`?`null``;` |
| `12` | `}` |
| `13` | ? |
| `14` | `@Override` |
| `15` | `public`?`void`?`prepare(Map stormConf, TopologyContext context) {}` |
| `16` | ? |
| `17` | `@Oerride` |
| `18` | `public`?`void`?`execute(Tuple input, BasicOutputCollector collector) {` |
| `19` | `String tweet = input.getStringByField(``"tweet"``);` |
| `20` | `String tweetId = input.getStringByField(``"tweet_id"``);` |
| `21` | `StringTokenizer strTok =?``new`?`StringTokenizer(tweet,?``" "``);` |
| `22` | `TransactionAttempt tx = (TransactionAttempt)input.getValueByField(``"txid"``);` |
| `23` | `HashSet<String> words =?``new`?`HashSet<String>();` |
| `24` | ? |
| `25` | `while``(strTok.hasMoreTokens()) {` |
| `26` | `String word = strTok.nextToken();` |
| `27` | ? |
| `28` | `if``(word.startsWith(``"#"``) && !words.contains(word)){` |
| `29` | `collector.emit(``"hashtags"``,?``new`?`Values(tx, tweetId, word));` |
| `30` | `words.add(word);` |
| `31` | `}` |
| `32` | `}` |
| `33` | `}` |
| `34` | ? |
| `35` | `@Override` |
| `36` | `public`?`void`?`cleanup(){}` |
| `37` | `}` |
現在看看**UserHashTagJoinBolt**的實現。首先要注意的是它是一個**BaseBatchBolt**。這意味著,**execute**方法會操作接收到的元組,但是不會分發新的元組。批次完成時,Storm會調用**finishBatch**方法。
| `01` | `public`?`void`?`execute(Tuple tuple) {` |
| `02` | `String source = tuple.getSourceStreamId();` |
| `03` | `String tweetId = tuple.getStringByField(``"tweet_id"``);` |
| `04` | ? |
| `05` | `if``(``"hashtags"``.equals(source)) {` |
| `06` | `String hashtag = tuple.getStringByField(``"hashtag"``);` |
| `07` | `add(tweetHashtags, tweetId, hashtag);` |
| `08` | `}?``else`?`if``(``"users"``.equals(source)) {` |
| `09` | `String user = tuple.getStringByField(``"user"``);` |
| `10` | `add(userTweets, user, tweetId);` |
| `11` | `}` |
| `12` | `}` |
既然要結合tweet中提到的用戶為出現的所有話題計數,就需要加入前面的*bolts*創建的兩個數據流組。這件事要以批次為單位進程,在批次處理完成時,調用**finishBatch**方法。
| `01` | `@Override` |
| `02` | `public`?`void`?`finishBatch() {` |
| `03` | `for``(String user:userTweets.keySet()){` |
| `04` | `Set<String> tweets = getUserTweets(user);` |
| `05` | `HashMap<String, Integer> hashtagsCounter =?``new`?`HashMap<String, Integer>();` |
| `06` | `for``(String tweet:tweets){` |
| `07` | `Set<String> hashtags=getTweetHashtags(tweet);` |
| `08` | `if``(hashtags!=``null``){` |
| `09` | `for``(String hashtag:hashtags){` |
| `10` | `Integer count=hashtagsCounter.get(hashtag);` |
| `11` | `if``(count==``null``){count=``0``;}` |
| `12` | `count++;` |
| `13` | `hashtagsCounter.put(hashtag,count);` |
| `14` | `}` |
| `15` | `}` |
| `16` | `}` |
| `17` | `for``(String hashtag:hashtagsCounter.keySet()){` |
| `18` | `int`?`count=hashtagsCounter.get(hashtag);` |
| `19` | `collector.emit(``new`?`Values(id,user,hashtag,count));` |
| `20` | `}` |
| `21` | `}` |
| `22` | `}` |
這個方法計算每對用戶-話題出現的次數,并為之生成和分發元組。
你可以在GitHub上找到并下載完整代碼。(譯者注:https://github.com/storm-book/examples-ch08-transactional-topologies這個倉庫里沒有代碼,誰知道哪里有代碼麻煩說一聲。)
**提交者*bolts***
我們已經學習了,批次通過協調器和分發器怎樣在拓撲中傳遞。在拓撲中,這些批次中的元組以并行的,沒有特定次序的方式處理。
*協調者bolts*是一類特殊的批處理*bolts*,它們實現了**IComh mitter**或者通過**TransactionalTopologyBuilder**調用**setCommiterBolt**設置了提交者*bolt*。它們與其它的批處理*bolts*最大的不同在于,提交者*bolts*的**finishBatch**方法在提交就緒時執行。這一點發生在之前所有事務都已成功提交之后。另外,**finishBatch**方法是順序執行的。因此如果同時有事務ID1和事務ID2兩個事務同時執行,只有在ID1沒有任何差錯的執行了**finishBatch**方法之后,ID2才會執行該方法。
下面是這個類的實現
| `01` | `public`?`class`?`RedisCommiterCommiterBolt?``extends`?`BaseTransactionalBolt?``implements`?`ICommitter {` |
| `02` | `public`?`static`?`final`?`String LAST_COMMITED_TRANSACTION_FIELD =?``"LAST_COMMIT"``;` |
| `03` | `TransactionAttempt id;` |
| `04` | `BatchOutputCollector collector;` |
| `05` | `Jedis jedis;` |
| `06` | ? |
| `07` | `@Override` |
| `08` | `public`?`void`?`prepare(Map conf, TopologyContext context,` |
| `09` | `BatchOutputCollector collector, TransactionAttempt id) {` |
| `10` | `this``.id = id;` |
| `11` | `this``.collector = collector;` |
| `12` | `this``.jedis =?``new`?`Jedis(``"localhost"``);` |
| `13` | `}` |
| `14` | ? |
| `15` | `HashMap<String, Long> hashtags =?``new`?`HashMap<String,Long>();` |
| `16` | `HashMap<String, Long> users =?``new`?`HashMap<String, Long>();` |
| `17` | `HashMap<String, Long> usersHashtags =?``new`?`HashMap<String, Long>();` |
| `18` | ? |
| `19` | `private`?`void`?`count(HashMap<String, Long> map, String key,?``int`?`count) {` |
| `20` | `Long value = map.get(key);` |
| `21` | `if``(value ==?``null``){value = (``long``)``0``;}` |
| `22` | `value += count;` |
| `23` | `map.put(key,value);` |
| `24` | `}` |
| `25` | ? |
| `26` | `@Override` |
| `27` | `public`?`void`?`execute(Tuple tuple) {` |
| `28` | `String origin = tuple. getSourceComponent();` |
| `29` | `if``(``"sers-splitter"``.equals(origin)) {` |
| `30` | `String user = tuple.getStringByField(``"user"``);` |
| `31` | `count(users, user,?``1``);` |
| `32` | `}?``else`?`if``(``"hashtag-splitter"``.equals(origin)) {` |
| `33` | `String hashtag = tuple.getStringByField(``"hashtag"``);` |
| `34` | `count(hashtags, hashtag,?``1``);` |
| `35` | `}?``else`?`if``(``"user-hashtag-merger"``.quals(origin)) {` |
| `36` | `String hashtag = tuple.getStringByField(``"hashtag"``);` |
| `37` | `String user = tuple.getStringByField(``"user"``);` |
| `38` | `String key = user +?``":"`?`+ hashtag;` |
| `39` | `Integer count = tuple.getIntegerByField(``"count"``);` |
| `40` | `count(usersHashtags, key, count);` |
| `41` | `}` |
| `42` | `}` |
| `43` | ? |
| `44` | `@Override` |
| `45` | `public`?`void`?`finishBatch() {` |
| `46` | `String lastCommitedTransaction = jedis.get(LAST_COMMITED_TRANSACTION_FIELD);` |
| `47` | `String currentTransaction =?``""``+id.getTransactionId();` |
| `48` | ? |
| `49` | `if``(currentTransaction.equals(lastCommitedTransaction)) {``return``;}` |
| `50` | ? |
| `51` | `Transaction multi = jedis.multi();` |
| `52` | ? |
| `53` | `multi.set(LAST_COMMITED_TRANSACTION_FIELD, currentTransaction);` |
| `54` | ? |
| `55` | `Set<String> keys = hashtags.keySet();` |
| `56` | `for`?`(String hashtag : keys) {` |
| `57` | `Long count = hashtags.get(hashtag);` |
| `58` | `multi.hincrBy(``"hashtags"``, hashtag, count);` |
| `59` | `}` |
| `60` | ? |
| `61` | `keys = users.keySet();` |
| `62` | `for`?`(String user : keys) {` |
| `63` | `Long count =users.get(user);` |
| `64` | `multi.hincrBy(``"users"``,user,count);` |
| `65` | `}` |
| `66` | ? |
| `67` | `keys = usersHashtags.keySet();` |
| `68` | `for`?`(String key : keys) {` |
| `69` | `Long count = usersHashtags.get(key);` |
| `70` | `multi.hincrBy(``"users_hashtags"``, key, count);` |
| `71` | `}` |
| `72` | ? |
| `73` | `multi.exec();` |
| `74` | `}` |
| `75` | ? |
| `76` | `@Override` |
| `77` | `public`?`void`?`declareOutputFields(OutputFieldsDeclarer declarer) {}` |
| `78` | `}` |
這個實現很簡單,但是在**finishBatch**有一個細節。
| `1` | `...` |
| `2` | `multi.set(LAST_COMMITED_TRANSACTION_FIELD, currentTransaction);` |
| `3` | `...` |
在這里向數據庫保存提交的最后一個事務ID。為什么要這樣做?記住,如果事務失敗了,Storm將會盡可能多的重復必要的次數。如果你不確定已經處理了這個事務,你就會多算,事務拓撲也就沒有用了。所以請記住:保存最后提交的事務ID,并在提交前檢查。
**分區的事務*Spouts
***對一個*spout*來說,從一個分區集合中讀取批次是很普通的。接著這個例子,你可能有很多redis數據庫,而tweets可能會分別保存在這些redis數據庫里。通過實現**IPartitionedTransactionalSpout**,Storm提供了一些工具用來管理每個分區的狀態并保證重播的能力。
下面我們修改**TweetsTransactionalSpout**,使它可以處理數據分區。
首先,繼承**BasePartitionedTransactionalSpout**,它實現了**IPartitionedTransactionalSpout**。
| `1` | `public`?`class`?`TweetsPartitionedTransactionalSpout?``extends` |
| `2` | `BasePartitionedTransactionalSpout<TransactionMetadata> {` |
| `3` | `...` |
| `4` | `}` |
然后告訴Storm誰是你的協調器。
| `01` | `public`?`static`?`class`?`TweetsPartitionedTransactionalCoordinator?``implements`?`Coordinator {` |
| `02` | `@Override` |
| `03` | `public`?`int`?`numPartitions() {` |
| `04` | `return`?`4``;` |
| `05` | `}` |
| `06` | ? |
| `07` | `@Override` |
| `08` | `public`?`boolean`?`isReady() {` |
| `09` | `return`?`true``;` |
| `10` | `}` |
| `11` | ? |
| `12` | `@Override` |
| `13` | `public`?`void`?`close() {}` |
| `14` | `}` |
在這個例子里,協調器很簡單。numPartitions方法,告訴Storm一共有多少分區。而且你要注意,不要返回任何元數據。對于**IPartitionedTransactionalSpout**,元數據由分發器直接管理。
下面是分發器的實現:
| `01` | `public`?`static`?`class`?`TweetsPartitionedTransactionalEmitter` |
| `02` | `implements`?`Emitter<TransactionMetadata> {` |
| `03` | `PartitionedRQ rq =?``new`?`ParttionedRQ();` |
| `04` | ? |
| `05` | `@Override` |
| `06` | `public`?`TransactionMetadata emitPartitionBatchNew(TransactionAttempt tx,` |
| `07` | `BatchOutputCollector collector,?``int`?`partition,` |
| `08` | `TransactionMetadata lastPartitioonMeta) {` |
| `09` | `long`?`nextRead;` |
| `10` | ? |
| `11` | `if``(lastPartitionMeta ==?``null``) {` |
| `12` | `nextRead = rq.getNextRead(partition);` |
| `13` | `}``else``{` |
| `14` | `nextRead = lastPartitionMeta.from + lastPartitionMeta.quantity;` |
| `15` | `rq.setNextRead(partition, nextRead);?``//移動游標` |
| `16` | `}` |
| `17` | ? |
| `18` | `long`?`quantity = rq.getAvailableToRead(partition, nextRead);` |
| `19` | `quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;` |
| `20` | `TransactionMetadata metadata =?``new`?`TransactionMetadata(nextRead, (``int``)quantity);` |
| `21` | ? |
| `22` | `emitPartitionBatch(tx, collector, partition, metadata);` |
| `23` | `return`?`metadata;` |
| `24` | `}` |
| `25` | ? |
| `26` | `@Override` |
| `27` | `public`?`void`?`emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector,` |
| `28` | `int`?`partition, TransactionMetadata partitionMeta) {` |
| `29` | `if``(partitionMeta.quantity <=?``0``){` |
| `30` | `return``;` |
| `31` | `}` |
| `32` | ? |
| `33` | `List<String> messages = rq.getMessages(partition, partitionMeta.from,` |
| `34` | `partitionMeta.quantity);` |
| `35` | ? |
| `36` | `long`?`tweetId = partitionMeta.from;` |
| `37` | `for`?`(String msg : messages) {` |
| `38` | `collector.emit(``new`?`Values(tx,?``""``+tweetId, msg));` |
| `39` | `tweetId++;` |
| `40` | `}` |
| `41` | `}` |
| `42` | ? |
| `43` | `@Override` |
| `44` | `public`?`void`?`close() {}` |
| `45` | `}` |
這里有兩個重要的方法,**emitPartitionBatchNew**,和**emitPartitionBatch**。對于**emitPartitionBatchNew**,從Storm接收分區參數,該參數決定應該從哪個分區讀取批次。在這個方法中,決定獲取哪些tweets,生成相應的元數據對象,調用**emitPartitionBatch**,返回元數據對象,并且元數據對象會在方法返回時立即保存到zookeeper。
Storm會為每一個分區發送相同的事務ID,表示一個事務貫穿了所有數據分區。通過**emitPartitionBatch**讀取分區中的tweets,并向拓撲分發批次。如果批次處理失敗了,Storm將會調用**emitPartitionBatch**利用保存下來的元數據重復這個批次。
**NOTE:**?完整的源碼請見:[https://github.com/storm-book/examples-ch08-transactional-topologies](https://github.com/storm-book/examples-ch08-transactional-topologies)(譯者注:原文如此,實際上這個倉庫里什么也沒有)
**模糊的事務性拓撲**
到目前為止,你可能已經學會了如何讓擁有相同事務ID的批次在出錯時重播。但是在有些場景下這樣做可能就不太合適了。然后會發生什么呢?
事實證明,你仍然可以實現在語義上精確的事務,不過這需要更多的開發工作,你要記錄由Storm重復的事務之前的狀態。既然能在不同時刻為相同的事務ID得到不同的元組,你就需要把事務重置到之前的狀態,并從那里繼續。
比如說,如果你為收到的所有tweets計數,你已數到5,而最后的事務ID是321,這時你多數了8個。你要維護以下三個值——previousCount=5,currentCount=13,以及lastTransactionId=321。假設事物ID321又發分了一次,而你又得到了4個元組,而不是之前的8個,提交器會探測到這是相同的事務ID,它將會把結果重置到**previousCount**的值5,并在此基礎上加4,然后更新**currentCount**為9。
另外,在之前的一個事務被取消時,每個并行處理的事務都要被取消。這是為了確保你沒有丟失任何數據。
你的*spout*可以實現**IOpaquePartitionedTransactionalSpout**,而且正如你看到的,協調器和分發器也很簡單。
| `01` | `public`?`static`?`class`?`TweetsOpaquePartitionedTransactionalSpoutCoordinator?``implements``IOpaquePartitionedTransactionalSpout.Coordinator {` |
| `02` | `@Override` |
| `03` | `public`?`boolean`?`isReady() {` |
| `04` | `return`?`true``;` |
| `05` | `}` |
| `06` | `}` |
| `07` | ? |
| `08` | `public`?`static`?`class`?`TweetsOpaquePartitionedTransactionalSpoutEmitter` |
| `09` | `implements`?`IOpaquePartitionedTransactionalSpout.Emitter<TransactionMetadata> {` |
| `10` | `PartitionedRQ rq? =?``new`?`PartitionedRQ();` |
| `11` | ? |
| `12` | `@Override` |
| `13` | `public`?`TransactionMetadata emitPartitionBatch(TransactionAttempt tx,` |
| `14` | `BatchOutputCollector collector,?``int`?`partion,` |
| `15` | `TransactionMetadata lastPartitonMeta) {` |
| `16` | `long`?`nextRead;` |
| `17` | ? |
| `18` | `if``(lastPartitionMeta ==?``null``) {` |
| `19` | `nextRead = rq.getNextRead(partition);` |
| `20` | `}``else``{` |
| `21` | `nextRead = lastPartitionMeta.from + lastPartitionMeta.quantity;` |
| `22` | `rq.setNextRead(partition, nextRead);``//移動游標` |
| `23` | `}` |
| `24` | ? |
| `25` | `long`?`quantity = rq.getAvailabletoRead(partition, nextRead);` |
| `26` | `quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;` |
| `27` | `TransactionMetadata metadata =?``new`?`TransactionMetadata(nextRead, (``int``)quantity);` |
| `28` | `emitMessages(tx, collector, partition, metadata);` |
| `29` | `return`?`metadata;` |
| `30` | `}` |
| `31` | ? |
| `32` | `private`?`void`?`emitMessage(TransactionAttempt tx, BatchOutputCollector collector,` |
| `33` | `int`?`partition, TransactionMetadata partitionMeta) {` |
| `34` | `if``(partitionMeta.quantity <=?``0``){``return``;}` |
| `35` | ? |
| `36` | `List<String> messages = rq.getMessages(partition, partitionMeta.from, partitionMeta.quantity);` |
| `37` | `long`?`tweetId = partitionMeta.from;` |
| `38` | `for``(String msg : messages) {` |
| `39` | `collector.emit(``new`?`Values(tx,?``""``+tweetId, msg));` |
| `40` | `tweetId++;` |
| `41` | `}` |
| `42` | `}` |
| `43` | ? |
| `44` | `@Override` |
| `45` | `public`?`int`?`numPartitions() {` |
| `46` | `return`?`4``;` |
| `47` | `}` |
| `48` | ? |
| `49` | `@Override` |
| `50` | `public`?`void`?`close() {}` |
| `51` | `}` |
最有趣的方法是**emit**本文翻譯自《[Getting Started With Storm](http://ifeve.com/wp-content/uploads/2014/03/Getting-Started-With-Storm-Jonathan-Leibiusky-Gabriel-E_1276.pdf)》譯者:吳京潤 ? ?編輯:郭蕾 方騰飛
正如書中之前所提到的,使用Storm編程,可以通過調用ack和fail方法來確保一條消息的處理成功或失敗。不過當元組被重發時,會發生什么呢?你又該如何砍不會重復計算?[](http://ifeve.com/getting-started-with-storm-1/storm/)
*Storm0.7.0*實現了一個新特性——事務性拓撲,這一特性使消息在語義上確保你可以安全的方式重發消息,并保證它們只會被處理一次。在不支持事務性拓撲的情況下,你無法在準確性,可擴展性,以空錯性上得到保證的前提下完成計算。
**NOTE:**事務性拓撲是一個構建于標準Storm?*spout*和*bolt*之上的抽象概念。
**設計**
在事務性拓撲中,Storm以并行和順序處理混合的方式處理元組。*spout*并行分批創建供*bolt*處理的元組(譯者注:下文將這種分批創建、分批處理的元組稱做批次)。其中一些*bolt*作為提交者以嚴格有序的方式提交處理過的批次。這意味著如果你有每批五個元組的兩個批次,將有兩個元組被*bolt*并行處理,但是直到提交者成功提交了第一個元組之后,才會提交第二個元組。?**NOTE:**?使用事務性拓撲時,數據源要能夠重發批次,有時候甚至要重復多次。因此確認你的數據源——你連接到的那個*spout*——具備這個能力。 這個過程可以被描述為兩個階段:?*處理階段*?純并行階段,許多批次同時處理。?*提交階段*?嚴格有序階段,直到批次一成功提交之后,才會提交批次二。 這兩個階段合起來稱為一個Storm事務。**NOTE:**?Storm使用zookeeper儲存事務元數據,默認情況下就是拓撲使用的那個zookeeper。你可以修改以下兩個配置參數鍵指定其它的zookeeper——transactional.zookeeper.servers和transactional.zookeeper.port。
**事務實踐**
下面我們要創建一個Twitter分析工具來了解事務的工作方式。我們從一個Redis數據庫讀取tweets,通過幾個*bolt*處理它們,最后把結果保存在另一個Redis數據庫的列表中。處理結果就是所有話題和它們的在tweets中出現的次數列表,所有用戶和他們在tweets中出現的次數列表,還有一個包含發起話題和頻率的用戶列表。 這個工具的拓撲見圖8-1。[](http://ifeve.com/getting-started-of-storm8-2/figure8-1/)? 圖8-1 拓撲概覽
正如你看到的,**TweetsTransactionalSpout**會連接你的tweet數據庫并向拓撲分發批次。**UserSplitterBolt**和**HashTagSplitterBolt**兩個*bolt*,從*spout*接收元組。**UserSplitterBolt**解析tweets并查找用戶——以@開頭的單詞——然后把這些單詞分發到名為*users*的自定義數據流組。**HashtagSplitterBolt**從tweet查找**#**開頭的單詞,并把它們分發到名為*hashtags*的自定義數據流組。第三個*bolt*,**UserHashtagJoinBolt**,接收前面提到的兩個數據流組,并計算具名用戶的一條tweet內的話題數量。為了計數并分發計算結果,這是個**BaseBatchBolt**(稍后有更多介紹)。
最后一個bolt——**RedisCommitterBolt**——接收以上三個*bolt*的數據流組。它為每樣東西計數,并在對一個批次完成處理時,把所有結果保存到redis。這是一種特殊的*bolt*,叫做提交者,在本章后面做更多講解。
用**TransactionalTopologyBuilder**構建拓撲,代碼如下:
| `01` | `TransactionalTopologyBuilder builder=` |
| `02` | `new`?`TransactionalTopologyBuilder(``"test"``,?``"spout"``,?``new`?`TweetsTransactionalSpout());` |
| `03` | ? |
| `04` | `builder.setBolt(``"users-splitter"``,?``new`?`UserSplitterBolt(),?``4``).shuffleGrouping(``"spout"``);` |
| `05` | `buildeer.setBolt(``"hashtag-splitter"``,?``new`?`HashtagSplitterBolt(),?``4``).shuffleGrouping(``"spout"``);` |
| `06` | ? |
| `07` | `builder.setBolt(``"users-hashtag-manager"``,?``new`?`UserHashtagJoinBolt(), r)` |
| `08` | `.fieldsGrouping(``"users-splitter"``,?``"users"``,?``new`?`Fields(``"tweet_id"``))` |
| `09` | `.fieldsGrouping(``"hashtag-splitter"``,?``"hashtags"``,?``new`?`Fields(``"tweet_id"``));` |
| `10` | ? |
| `11` | `builder.setBolt(``"redis-commiter"``,?``new`?`RedisCommiterBolt())` |
| `12` | `.globalGrouping(``"users-splitter"``,?``"users"``)` |
| `13` | `.globalGrouping(``"hashtag-splitter"``,?``"hashtags"``)` |
| `14` | `.globalGrouping(``"user-hashtag-merger"``);` |
接下來就看看如何在一個事務性拓撲中實現*spout*。
***Spout***
一個事務性拓撲的*spout*與標準*spout*完全不同。
| `1` | `public`?`class`?`TweetsTransactionalSpout?``extends`?`BaseTransactionalSpout<TransactionMetadata>{` |
正如你在這個類定義中看到的,TweetsTransactionalSpout繼承了帶范型的**BaseTransactionalSpout**。指定的范型類型的對象是事務元數據集合。它將在后面的代碼中用于從數據源分發批次。
在這個例子中,**TransactionMetadata**定義如下:
| `01` | `public`?`class`?`TransactionMetadata?``implements`?`Serializable {` |
| `02` | `private`?`static`?`final`?`long`?`serialVersionUID = 1L;` |
| `03` | `long`?`from;` |
| `04` | `int`?`quantity;` |
| `05` | ? |
| `06` | `public`?`TransactionMetadata(``long`?`from,?``int`?`quantity) {` |
| `07` | `this``.from = from;` |
| `08` | `this``.quantity = quantity;` |
| `09` | `}` |
| `10` | `}` |
該類的對象維護著兩個屬性**from**和**quantity**,它們用來生成批次。
*spout*的最后需要實現下面的三個方法:
| `01` | `@Override` |
| `02` | `public`?`ITransactionalSpout.Coordinator<TransactionMetadata> getCoordinator(` |
| `03` | `Map conf, TopologyContext context) {` |
| `04` | `return`?`new`?`TweetsTransactionalSpoutCoordinator();` |
| `05` | `}` |
| `06` | ? |
| `07` | `@Override` |
| `08` | `public`?`backtype.storm.transactional.ITransactionalSpout.Emitter<TransactionMetadata> getEmitter(Map conf, TopologyContext contest) {` |
| `09` | `return`?`new`?`TweetsTransactionalSpoutEmitter();` |
| `10` | `}` |
| `11` | ? |
| `12` | `@Override` |
| `13` | `public`?`void`?`declareOutputFields(OuputFieldsDeclarer declarer) {` |
| `14` | `declarer.declare(``new`?`Fields(``"txid"``,?``"tweet_id"``,?``"tweet"``));` |
| `15` | `}` |
**getCoordinator**方法,告訴Storm用來協調生成批次的類。**getEmitter**,負責讀取批次并把它們分發到拓撲中的數據流組。最后,就像之前做過的,需要聲明要分發的域。
**RQ類
**為了讓例子簡單點,我們決定用一個類封裝所有對Redis的操作。
| `01` | `public`?`class`?`RQ {` |
| `02` | `public`?`static`?`final`?`String NEXT_READ =?``"NEXT_READ"``;` |
| `03` | `public`?`static`?`final`?`String NEXT_WRITE =?``"NEXT_WRITE"``;` |
| `04` | ? |
| `05` | `Jedis jedis;` |
| `06` | ? |
| `07` | `public`?`RQ() {` |
| `08` | `jedis =?``new`?`Jedis(``"localhost"``);` |
| `09` | `}` |
| `10` | ? |
| `11` | `public`?`long`?`getavailableToRead(``long`?`current) {` |
| `12` | `return`?`getNextWrite() - current;` |
| `13` | `}` |
| `14` | ? |
| `15` | `public`?`long`?`getNextRead() {` |
| `16` | `String sNextRead = jedis.get(NEXT_READ);` |
| `17` | `if``(sNextRead ==?``null``) {` |
| `18` | `return`?`1``;` |
| `19` | `}` |
| `20` | `return`?`Long.valueOf(sNextRead);` |
| `21` | `}` |
| `22` | ? |
| `23` | `public`?`long`?`getNextWrite() {` |
| `24` | `return`?`Long.valueOf(jedis.get(NEXT_WRITE));` |
| `25` | `}` |
| `26` | ? |
| `27` | `public`?`void`?`close() {` |
| `28` | `jedis.disconnect();` |
| `29` | `}` |
| `30` | ? |
| `31` | `public`?`void`?`setNextRead(``long`?`nextRead) {` |
| `32` | `jedis.set(NEXT_READ,?``""``+nextRead);` |
| `33` | `}` |
| `34` | ? |
| `35` | `public`?`List<String> getMessages(``long`?`from,?``int`?`quantity) {` |
| `36` | `String[] keys =?``new`?`String[quantity];` |
| `37` | `for`?`(``int`?`i =?``0``; i < quantity; i++) {` |
| `38` | `keys[i] =?``""``+(i+from);` |
| `39` | `}` |
| `40` | `return`?`jedis.mget(keys);` |
| `41` | `}` |
| `42` | `}` |
仔細閱讀每個方法,確保自己理解了它們的用處。
**協調者Coordinator
**下面是本例的協調者實現。
| `01` | `public`?`static`?`class`?`TweetsTransactionalSpoutCoordinator?``implements``ITransactionalSpout.Coordinator<TransactionMetadata> {` |
| `02` | `TransactionMetadata lastTransactionMetadata;` |
| `03` | `RQ rq =?``new`?`RQ();` |
| `04` | `long`?`nextRead =?``0``;` |
| `05` | ? |
| `06` | `public`?`TweetsTransactionalSpoutCoordinator() {` |
| `07` | `nextRead = rq.getNextRead();` |
| `08` | `}` |
| `09` | ? |
| `10` | `@Override` |
| `11` | `public`?`TransactionMetadata initializeTransaction(BigInteger txid, TransactionMetadata prevMetadata) {` |
| `12` | `long`?`quantity = rq.getAvailableToRead(nextRead);` |
| `13` | `quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;` |
| `14` | `TransactionMetadata ret =?``new`?`TransactionMetadata(nextRead, (``int``)quantity);` |
| `15` | `nextRead += quantity;` |
| `16` | `return`?`ret;` |
| `17` | `}` |
| `18` | ? |
| `19` | `@Override` |
| `20` | `public`?`boolean`?`isReady() {` |
| `21` | `return`?`rq.getAvailableToRead(nextRead) >?``0``;` |
| `22` | `}` |
| `23` | ? |
| `24` | `@Override` |
| `25` | `public`?`void`?`close() {` |
| `26` | `rq.close();` |
| `27` | `}` |
| `28` | `}` |
值得一提的是,*在整個拓撲中只會有一個提交者實例*。創建提交者實例時,它會從redis讀取一個從1開始的序列號,這個序列號標識要讀取的tweet下一條。
第一個方法是**isReady**。在**initializeTransaction**之前調用它確認數據源已就緒并可讀取。此方法應當相應的返回**true**或**false**。在此例中,讀取tweets數量并與已讀數量比較。它們之間的不同就在于可讀tweets數。如果它大于0,就意味著還有tweets未讀。
最后,執行**initializeTransaction**。正如你看到的,它接收**txid**和**prevMetadata**作為參數。第一個參數是Storm生成的事務ID,作為批次的惟一性標識。**prevMetadata**是協調器生成的前一個事務元數據對象。
在這個例子中,首先確認有多少tweets可讀。只要確認了這一點,就創建一個TransactionMetadata對象,標識讀取的第一個tweet(譯者注:對象屬性**from**),以及讀取的tweets數量(譯者注:對象屬性**quantity**)。
元數據對象一經返回,Storm把它跟**txid**一起保存在zookeeper。這樣就確保了一旦發生故障,Storm可以利用分發器(譯者注:**Emitter**,見下文)重新發送批次。
**Emitter**
創建事務性*spout*的最后一步是實現分發器(Emitter)。實現如下:
| `01` | `public`?`static`?`class`?`TweetsTransactionalSpoutEmitter``implements`?`ITransactionalSpout.Emitter<TransactionMetadata> {` |
| `02` | ? |
| `03` | `</pre>` |
| `04` | `<pre>????RQ rq =?``new`?`RQ();</pre>` |
| `05` | `<pre>????``public`?`TweetsTransactionalSpoutEmitter() {}</pre>` |
| `06` | `<pre>????``@Override` |
| `07` | `public`?`void`?`emitBatch(TransactionAttempt tx, TransactionMetadata coordinatorMeta, BatchOutputCollector collector) {` |
| `08` | `rq.setNextRead(coordinatorMeta.from+coordinatorMeta.quantity);` |
| `09` | `List<String> messages = rq.getMessages(coordinatorMeta.from, <span style=``"font-family: Georgia, 'Times New Roman', 'Bitstream Charter', Times, serif; font-size: 13px; line-height: 19px;"``>coordinatorMeta.quantity);` |
| `10` | `</span>????????``long`?`tweetId = coordinatorMeta.from;` |
| `11` | `for`?`(String message : messages) {` |
| `12` | `collector.emit(``new`?`Values(tx,?``""``+tweetId, message));` |
| `13` | `tweetId++;` |
| `14` | `}` |
| `15` | `}` |
| `16` | ? |
| `17` | `@Override` |
| `18` | `public`?`void`?`cleanupBefore(BigInteger txid) {}` |
| `19` | ? |
| `20` | `@Override` |
| `21` | `public`?`void`?`close() {` |
| `22` | `rq.close();` |
| `23` | `}</pre>` |
| `24` | `<pre>` |
| `25` | `}` |
分發器從數據源讀取數據并從數據流組發送數據。分發器應當問題能夠為相同的事務id和事務元數據發送相同的批次。這樣,如果在處理批次的過程中發生了故障,Storm就能夠利用分發器重復相同的事務id和事務元數據,并確保批次已經重復過了。Storm會在**TransactionAttempt**對象里為嘗試次數增加計數(譯者注:**attempt id**)。這樣就能知道批次已經重復過了。
在這里**emitBatch**是個重要方法。在這個方法中,使用傳入的元數據對象從redis得到tweets,同時增加redis維持的已讀tweets數。當然它還會把讀到的tweets分發到拓撲。
***Bolts***
首先看一下這個拓撲中的標準*bolt*:
| `01` | `public`?`class`?`UserSplitterBolt?``implements`?`IBasicBolt{` |
| `02` | `private`?`static`?`final`?`long`?`serialVersionUID = 1L;` |
| `03` | ? |
| `04` | `@Override` |
| `05` | `public`?`void`?`declareOutputFields(OutputFieldsDeclarer declarer) {` |
| `06` | `declarer.declareStream(``"users"``,?``new`?`Fields(``"txid"``,``"tweet_id"``,``"user"``));` |
| `07` | `}` |
| `08` | ? |
| `09` | `@Override` |
| `10` | `public`?`Map<String, Object> getComponentConfiguration() {` |
| `11` | `return`?`null``;` |
| `12` | `}` |
| `13` | ? |
| `14` | `@Override` |
| `15` | `public`?`void`?`prepare(Map stormConf, TopologyContext context) {}` |
| `16` | ? |
| `17` | `@Override` |
| `18` | `public`?`void`?`execute(Tuple input, BasicOutputCollector collector) {` |
| `19` | `String tweet = input.getStringByField(``"tweet"``);` |
| `20` | `String tweetId = input.getStringByField(``"tweet_id"``);` |
| `21` | `StringTokenizer strTok =?``new`?`StringTokenizer(tweet,?``" "``);` |
| `22` | `HashSet<String> users =?``new`?`HashSet<String>();` |
| `23` | ? |
| `24` | `while``(strTok.hasMoreTokens()) {` |
| `25` | `String user = strTok.nextToken();` |
| `26` | ? |
| `27` | `//確保這是個真實的用戶,并且在這個tweet中沒有重復` |
| `28` | `if``(user.startsWith(``"@"``) && !users.contains(user)) {` |
| `29` | `collector.emit(``"users"``,?``new`?`Values(tx, tweetId, user));` |
| `30` | `users.add(user);` |
| `31` | `}` |
| `32` | `}` |
| `33` | `}` |
| `34` | ? |
| `35` | `@Override` |
| `36` | `public`?`void`?`cleanup(){}` |
| `37` | `}` |
正如本章前面提到的,**UserSplitterBolt**接收元組,解析tweet文本,分發@開頭的單詞————tweeter用戶。**HashtagSplitterBolt**的實現也非常相似。
| `01` | `public`?`class`?`HashtagSplitterBolt?``implements`?`IBasicBolt{` |
| `02` | `private`?`static`?`final`?`long`?`serialVersionUID = 1L;` |
| `03` | ? |
| `04` | `@Override` |
| `05` | `public`?`void`?`declareOutputFields(OutputFieldsDeclarer declarer) {` |
| `06` | `declarer.declareStream(``"hashtags"``,?``new`?`Fields(``"txid"``,``"tweet_id"``,``"hashtag"``));` |
| `07` | `}` |
| `08` | ? |
| `09` | `@Override` |
| `10` | `public`?`Map<String, Object> getComponentConfiguration() {` |
| `11` | `return`?`null``;` |
| `12` | `}` |
| `13` | ? |
| `14` | `@Override` |
| `15` | `public`?`void`?`prepare(Map stormConf, TopologyContext context) {}` |
| `16` | ? |
| `17` | `@Oerride` |
| `18` | `public`?`void`?`execute(Tuple input, BasicOutputCollector collector) {` |
| `19` | `String tweet = input.getStringByField(``"tweet"``);` |
| `20` | `String tweetId = input.getStringByField(``"tweet_id"``);` |
| `21` | `StringTokenizer strTok =?``new`?`StringTokenizer(tweet,?``" "``);` |
| `22` | `TransactionAttempt tx = (TransactionAttempt)input.getValueByField(``"txid"``);` |
| `23` | `HashSet<String> words =?``new`?`HashSet<String>();` |
| `24` | ? |
| `25` | `while``(strTok.hasMoreTokens()) {` |
| `26` | `String word = strTok.nextToken();` |
| `27` | ? |
| `28` | `if``(word.startsWith(``"#"``) && !words.contains(word)){` |
| `29` | `collector.emit(``"hashtags"``,?``new`?`Values(tx, tweetId, word));` |
| `30` | `words.add(word);` |
| `31` | `}` |
| `32` | `}` |
| `33` | `}` |
| `34` | ? |
| `35` | `@Override` |
| `36` | `public`?`void`?`cleanup(){}` |
| `37` | `}` |
現在看看**UserHashTagJoinBolt**的實現。首先要注意的是它是一個**BaseBatchBolt**。這意味著,**execute**方法會操作接收到的元組,但是不會分發新的元組。批次完成時,Storm會調用**finishBatch**方法。
| `01` | `public`?`void`?`execute(Tuple tuple) {` |
| `02` | `String source = tuple.getSourceStreamId();` |
| `03` | `String tweetId = tuple.getStringByField(``"tweet_id"``);` |
| `04` | ? |
| `05` | `if``(``"hashtags"``.equals(source)) {` |
| `06` | `String hashtag = tuple.getStringByField(``"hashtag"``);` |
| `07` | `add(tweetHashtags, tweetId, hashtag);` |
| `08` | `}?``else`?`if``(``"users"``.equals(source)) {` |
| `09` | `String user = tuple.getStringByField(``"user"``);` |
| `10` | `add(userTweets, user, tweetId);` |
| `11` | `}` |
| `12` | `}` |
既然要結合tweet中提到的用戶為出現的所有話題計數,就需要加入前面的*bolts*創建的兩個數據流組。這件事要以批次為單位進程,在批次處理完成時,調用**finishBatch**方法。
| `01` | `@Override` |
| `02` | `public`?`void`?`finishBatch() {` |
| `03` | `for``(String user:userTweets.keySet()){` |
| `04` | `Set<String> tweets = getUserTweets(user);` |
| `05` | `HashMap<String, Integer> hashtagsCounter =?``new`?`HashMap<String, Integer>();` |
| `06` | `for``(String tweet:tweets){` |
| `07` | `Set<String> hashtags=getTweetHashtags(tweet);` |
| `08` | `if``(hashtags!=``null``){` |
| `09` | `for``(String hashtag:hashtags){` |
| `10` | `Integer count=hashtagsCounter.get(hashtag);` |
| `11` | `if``(count==``null``){count=``0``;}` |
| `12` | `count++;` |
| `13` | `hashtagsCounter.put(hashtag,count);` |
| `14` | `}` |
| `15` | `}` |
| `16` | `}` |
| `17` | `for``(String hashtag:hashtagsCounter.keySet()){` |
| `18` | `int`?`count=hashtagsCounter.get(hashtag);` |
| `19` | `collector.emit(``new`?`Values(id,user,hashtag,count));` |
| `20` | `}` |
| `21` | `}` |
| `22` | `}` |
這個方法計算每對用戶-話題出現的次數,并為之生成和分發元組。
你可以在GitHub上找到并下載完整代碼。(譯者注:https://github.com/storm-book/examples-ch08-transactional-topologies這個倉庫里沒有代碼,誰知道哪里有代碼麻煩說一聲。)
**提交者*bolts***
我們已經學習了,批次通過協調器和分發器怎樣在拓撲中傳遞。在拓撲中,這些批次中的元組以并行的,沒有特定次序的方式處理。
*協調者bolts*是一類特殊的批處理*bolts*,它們實現了**IComh mitter**或者通過**TransactionalTopologyBuilder**調用**setCommiterBolt**設置了提交者*bolt*。它們與其它的批處理*bolts*最大的不同在于,提交者*bolts*的**finishBatch**方法在提交就緒時執行。這一點發生在之前所有事務都已成功提交之后。另外,**finishBatch**方法是順序執行的。因此如果同時有事務ID1和事務ID2兩個事務同時執行,只有在ID1沒有任何差錯的執行了**finishBatch**方法之后,ID2才會執行該方法。
下面是這個類的實現
| `01` | `public`?`class`?`RedisCommiterCommiterBolt?``extends`?`BaseTransactionalBolt?``implements`?`ICommitter {` |
| `02` | `public`?`static`?`final`?`String LAST_COMMITED_TRANSACTION_FIELD =?``"LAST_COMMIT"``;` |
| `03` | `TransactionAttempt id;` |
| `04` | `BatchOutputCollector collector;` |
| `05` | `Jedis jedis;` |
| `06` | ? |
| `07` | `@Override` |
| `08` | `public`?`void`?`prepare(Map conf, TopologyContext context,` |
| `09` | `BatchOutputCollector collector, TransactionAttempt id) {` |
| `10` | `this``.id = id;` |
| `11` | `this``.collector = collector;` |
| `12` | `this``.jedis =?``new`?`Jedis(``"localhost"``);` |
| `13` | `}` |
| `14` | ? |
| `15` | `HashMap<String, Long> hashtags =?``new`?`HashMap<String,Long>();` |
| `16` | `HashMap<String, Long> users =?``new`?`HashMap<String, Long>();` |
| `17` | `HashMap<String, Long> usersHashtags =?``new`?`HashMap<String, Long>();` |
| `18` | ? |
| `19` | `private`?`void`?`count(HashMap<String, Long> map, String key,?``int`?`count) {` |
| `20` | `Long value = map.get(key);` |
| `21` | `if``(value ==?``null``){value = (``long``)``0``;}` |
| `22` | `value += count;` |
| `23` | `map.put(key,value);` |
| `24` | `}` |
| `25` | ? |
| `26` | `@Override` |
| `27` | `public`?`void`?`execute(Tuple tuple) {` |
| `28` | `String origin = tuple. getSourceComponent();` |
| `29` | `if``(``"sers-splitter"``.equals(origin)) {` |
| `30` | `String user = tuple.getStringByField(``"user"``);` |
| `31` | `count(users, user,?``1``);` |
| `32` | `}?``else`?`if``(``"hashtag-splitter"``.equals(origin)) {` |
| `33` | `String hashtag = tuple.getStringByField(``"hashtag"``);` |
| `34` | `count(hashtags, hashtag,?``1``);` |
| `35` | `}?``else`?`if``(``"user-hashtag-merger"``.quals(origin)) {` |
| `36` | `String hashtag = tuple.getStringByField(``"hashtag"``);` |
| `37` | `String user = tuple.getStringByField(``"user"``);` |
| `38` | `String key = user +?``":"`?`+ hashtag;` |
| `39` | `Integer count = tuple.getIntegerByField(``"count"``);` |
| `40` | `count(usersHashtags, key, count);` |
| `41` | `}` |
| `42` | `}` |
| `43` | ? |
| `44` | `@Override` |
| `45` | `public`?`void`?`finishBatch() {` |
| `46` | `String lastCommitedTransaction = jedis.get(LAST_COMMITED_TRANSACTION_FIELD);` |
| `47` | `String currentTransaction =?``""``+id.getTransactionId();` |
| `48` | ? |
| `49` | `if``(currentTransaction.equals(lastCommitedTransaction)) {``return``;}` |
| `50` | ? |
| `51` | `Transaction multi = jedis.multi();` |
| `52` | ? |
| `53` | `multi.set(LAST_COMMITED_TRANSACTION_FIELD, currentTransaction);` |
| `54` | ? |
| `55` | `Set<String> keys = hashtags.keySet();` |
| `56` | `for`?`(String hashtag : keys) {` |
| `57` | `Long count = hashtags.get(hashtag);` |
| `58` | `multi.hincrBy(``"hashtags"``, hashtag, count);` |
| `59` | `}` |
| `60` | ? |
| `61` | `keys = users.keySet();` |
| `62` | `for`?`(String user : keys) {` |
| `63` | `Long count =users.get(user);` |
| `64` | `multi.hincrBy(``"users"``,user,count);` |
| `65` | `}` |
| `66` | ? |
| `67` | `keys = usersHashtags.keySet();` |
| `68` | `for`?`(String key : keys) {` |
| `69` | `Long count = usersHashtags.get(key);` |
| `70` | `multi.hincrBy(``"users_hashtags"``, key, count);` |
| `71` | `}` |
| `72` | ? |
| `73` | `multi.exec();` |
| `74` | `}` |
| `75` | ? |
| `76` | `@Override` |
| `77` | `public`?`void`?`declareOutputFields(OutputFieldsDeclarer declarer) {}` |
| `78` | `}` |
這個實現很簡單,但是在**finishBatch**有一個細節。
| `1` | `...` |
| `2` | `multi.set(LAST_COMMITED_TRANSACTION_FIELD, currentTransaction);` |
| `3` | `...` |
在這里向數據庫保存提交的最后一個事務ID。為什么要這樣做?記住,如果事務失敗了,Storm將會盡可能多的重復必要的次數。如果你不確定已經處理了這個事務,你就會多算,事務拓撲也就沒有用了。所以請記住:保存最后提交的事務ID,并在提交前檢查。
**分區的事務*Spouts
***對一個*spout*來說,從一個分區集合中讀取批次是很普通的。接著這個例子,你可能有很多redis數據庫,而tweets可能會分別保存在這些redis數據庫里。通過實現**IPartitionedTransactionalSpout**,Storm提供了一些工具用來管理每個分區的狀態并保證重播的能力。
下面我們修改**TweetsTransactionalSpout**,使它可以處理數據分區。
首先,繼承**BasePartitionedTransactionalSpout**,它實現了**IPartitionedTransactionalSpout**。
| `1` | `public`?`class`?`TweetsPartitionedTransactionalSpout?``extends` |
| `2` | `BasePartitionedTransactionalSpout<TransactionMetadata> {` |
| `3` | `...` |
| `4` | `}` |
然后告訴Storm誰是你的協調器。
| `01` | `public`?`static`?`class`?`TweetsPartitionedTransactionalCoordinator?``implements`?`Coordinator {` |
| `02` | `@Override` |
| `03` | `public`?`int`?`numPartitions() {` |
| `04` | `return`?`4``;` |
| `05` | `}` |
| `06` | ? |
| `07` | `@Override` |
| `08` | `public`?`boolean`?`isReady() {` |
| `09` | `return`?`true``;` |
| `10` | `}` |
| `11` | ? |
| `12` | `@Override` |
| `13` | `public`?`void`?`close() {}` |
| `14` | `}` |
在這個例子里,協調器很簡單。numPartitions方法,告訴Storm一共有多少分區。而且你要注意,不要返回任何元數據。對于**IPartitionedTransactionalSpout**,元數據由分發器直接管理。
下面是分發器的實現:
| `01` | `public`?`static`?`class`?`TweetsPartitionedTransactionalEmitter` |
| `02` | `implements`?`Emitter<TransactionMetadata> {` |
| `03` | `PartitionedRQ rq =?``new`?`ParttionedRQ();` |
| `04` | ? |
| `05` | `@Override` |
| `06` | `public`?`TransactionMetadata emitPartitionBatchNew(TransactionAttempt tx,` |
| `07` | `BatchOutputCollector collector,?``int`?`partition,` |
| `08` | `TransactionMetadata lastPartitioonMeta) {` |
| `09` | `long`?`nextRead;` |
| `10` | ? |
| `11` | `if``(lastPartitionMeta ==?``null``) {` |
| `12` | `nextRead = rq.getNextRead(partition);` |
| `13` | `}``else``{` |
| `14` | `nextRead = lastPartitionMeta.from + lastPartitionMeta.quantity;` |
| `15` | `rq.setNextRead(partition, nextRead);?``//移動游標` |
| `16` | `}` |
| `17` | ? |
| `18` | `long`?`quantity = rq.getAvailableToRead(partition, nextRead);` |
| `19` | `quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;` |
| `20` | `TransactionMetadata metadata =?``new`?`TransactionMetadata(nextRead, (``int``)quantity);` |
| `21` | ? |
| `22` | `emitPartitionBatch(tx, collector, partition, metadata);` |
| `23` | `return`?`metadata;` |
| `24` | `}` |
| `25` | ? |
| `26` | `@Override` |
| `27` | `public`?`void`?`emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector,` |
| `28` | `int`?`partition, TransactionMetadata partitionMeta) {` |
| `29` | `if``(partitionMeta.quantity <=?``0``){` |
| `30` | `return``;` |
| `31` | `}` |
| `32` | ? |
| `33` | `List<String> messages = rq.getMessages(partition, partitionMeta.from,` |
| `34` | `partitionMeta.quantity);` |
| `35` | ? |
| `36` | `long`?`tweetId = partitionMeta.from;` |
| `37` | `for`?`(String msg : messages) {` |
| `38` | `collector.emit(``new`?`Values(tx,?``""``+tweetId, msg));` |
| `39` | `tweetId++;` |
| `40` | `}` |
| `41` | `}` |
| `42` | ? |
| `43` | `@Override` |
| `44` | `public`?`void`?`close() {}` |
| `45` | `}` |
這里有兩個重要的方法,**emitPartitionBatchNew**,和**emitPartitionBatch**。對于**emitPartitionBatchNew**,從Storm接收分區參數,該參數決定應該從哪個分區讀取批次。在這個方法中,決定獲取哪些tweets,生成相應的元數據對象,調用**emitPartitionBatch**,返回元數據對象,并且元數據對象會在方法返回時立即保存到zookeeper。
Storm會為每一個分區發送相同的事務ID,表示一個事務貫穿了所有數據分區。通過**emitPartitionBatch**讀取分區中的tweets,并向拓撲分發批次。如果批次處理失敗了,Storm將會調用**emitPartitionBatch**利用保存下來的元數據重復這個批次。
**NOTE:**?完整的源碼請見:[https://github.com/storm-book/examples-ch08-transactional-topologies](https://github.com/storm-book/examples-ch08-transactional-topologies)(譯者注:原文如此,實際上這個倉庫里什么也沒有)
**模糊的事務性拓撲**
到目前為止,你可能已經學會了如何讓擁有相同事務ID的批次在出錯時重播。但是在有些場景下這樣做可能就不太合適了。然后會發生什么呢?
事實證明,你仍然可以實現在語義上精確的事務,不過這需要更多的開發工作,你要記錄由Storm重復的事務之前的狀態。既然能在不同時刻為相同的事務ID得到不同的元組,你就需要把事務重置到之前的狀態,并從那里繼續。
比如說,如果你為收到的所有tweets計數,你已數到5,而最后的事務ID是321,這時你多數了8個。你要維護以下三個值——previousCount=5,currentCount=13,以及lastTransactionId=321。假設事物ID321又發分了一次,而你又得到了4個元組,而不是之前的8個,提交器會探測到這是相同的事務ID,它將會把結果重置到**previousCount**的值5,并在此基礎上加4,然后更新**currentCount**為9。
另外,在之前的一個事務被取消時,每個并行處理的事務都要被取消。這是為了確保你沒有丟失任何數據。
你的*spout*可以實現**IOpaquePartitionedTransactionalSpout**,而且正如你看到的,協調器和分發器也很簡單。
| `01` | `public`?`static`?`class`?`TweetsOpaquePartitionedTransactionalSpoutCoordinator?``implements``IOpaquePartitionedTransactionalSpout.Coordinator {` |
| `02` | `@Override` |
| `03` | `public`?`boolean`?`isReady() {` |
| `04` | `return`?`true``;` |
| `05` | `}` |
| `06` | `}` |
| `07` | ? |
| `08` | `public`?`static`?`class`?`TweetsOpaquePartitionedTransactionalSpoutEmitter` |
| `09` | `implements`?`IOpaquePartitionedTransactionalSpout.Emitter<TransactionMetadata> {` |
| `10` | `PartitionedRQ rq? =?``new`?`PartitionedRQ();` |
| `11` | ? |
| `12` | `@Override` |
| `13` | `public`?`TransactionMetadata emitPartitionBatch(TransactionAttempt tx,` |
| `14` | `BatchOutputCollector collector,?``int`?`partion,` |
| `15` | `TransactionMetadata lastPartitonMeta) {` |
| `16` | `long`?`nextRead;` |
| `17` | ? |
| `18` | `if``(lastPartitionMeta ==?``null``) {` |
| `19` | `nextRead = rq.getNextRead(partition);` |
| `20` | `}``else``{` |
| `21` | `nextRead = lastPartitionMeta.from + lastPartitionMeta.quantity;` |
| `22` | `rq.setNextRead(partition, nextRead);``//移動游標` |
| `23` | `}` |
| `24` | ? |
| `25` | `long`?`quantity = rq.getAvailabletoRead(partition, nextRead);` |
| `26` | `quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;` |
| `27` | `TransactionMetadata metadata =?``new`?`TransactionMetadata(nextRead, (``int``)quantity);` |
| `28` | `emitMessages(tx, collector, partition, metadata);` |
| `29` | `return`?`metadata;` |
| `30` | `}` |
| `31` | ? |
| `32` | `private`?`void`?`emitMessage(TransactionAttempt tx, BatchOutputCollector collector,` |
| `33` | `int`?`partition, TransactionMetadata partitionMeta) {` |
| `34` | `if``(partitionMeta.quantity <=?``0``){``return``;}` |
| `35` | ? |
| `36` | `List<String> messages = rq.getMessages(partition, partitionMeta.from, partitionMeta.quantity);` |
| `37` | `long`?`tweetId = partitionMeta.from;` |
| `38` | `for``(String msg : messages) {` |
| `39` | `collector.emit(``new`?`Values(tx,?``""``+tweetId, msg));` |
| `40` | `tweetId++;` |
| `41` | `}` |
| `42` | `}` |
| `43` | ? |
| `44` | `@Override` |
| `45` | `public`?`int`?`numPartitions() {` |
| `46` | `return`?`4``;` |
| `47` | `}` |
| `48` | ? |
| `49` | `@Override` |
| `50` | `public`?`void`?`close() {}` |
| `51` | `}` |
最有趣的方法是**emitPartitionBatch**,它獲取之前提交的元數據。你要用它生成批次。這個批次不需要與之前的那個一致,你可能根本無法創建完全一樣的批次。剩余的工作由提交器*bolts*借助之前的狀態完成。
**原創文章,轉載請注明:**?轉載自[并發編程網 – ifeve.com](http://ifeve.com/)
**本文鏈接地址:**?[Storm入門之第8章事務性拓撲](http://ifeve.com/getting-started-of-storm8/)**PartitionBatch**,它獲取之前提交的元數據。你要用它生成批次。這個批次不需要與之前的那個一致,你可能根本無法創建完全一樣的批次。剩余的工作由提交器*bolts*借助之前的狀態完成。