<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                正如書中之前所提到的,使用Storm編程,可以通過調用ack和fail方法來確保一條消息的處理成功或失敗。不過當元組被重發時,會發生什么呢?你又該如何砍不會重復計算? *Storm0.7.0*實現了一個新特性——事務性拓撲,這一特性使消息在語義上確保你可以安全的方式重發消息,并保證它們只會被處理一次。在不支持事務性拓撲的情況下,你無法在準確性,可擴展性,以空錯性上得到保證的前提下完成計算。 **NOTE:**事務性拓撲是一個構建于標準Storm?*spout*和*bolt*之上的抽象概念。 **設計** 在事務性拓撲中,Storm以并行和順序處理混合的方式處理元組。*spout*并行分批創建供*bolt*處理的元組(譯者注:下文將這種分批創建、分批處理的元組稱做批次)。其中一些*bolt*作為提交者以嚴格有序的方式提交處理過的批次。這意味著如果你有每批五個元組的兩個批次,將有兩個元組被*bolt*并行處理,但是直到提交者成功提交了第一個元組之后,才會提交第二個元組。?**NOTE:**?使用事務性拓撲時,數據源要能夠重發批次,有時候甚至要重復多次。因此確認你的數據源——你連接到的那個*spout*——具備這個能力。 這個過程可以被描述為兩個階段:?*處理階段*?純并行階段,許多批次同時處理。?*提交階段*?嚴格有序階段,直到批次一成功提交之后,才會提交批次二。 這兩個階段合起來稱為一個Storm事務。**NOTE:**?Storm使用zookeeper儲存事務元數據,默認情況下就是拓撲使用的那個zookeeper。你可以修改以下兩個配置參數鍵指定其它的zookeeper——transactional.zookeeper.servers和transactional.zookeeper.port。 **事務實踐** 下面我們要創建一個Twitter分析工具來了解事務的工作方式。我們從一個Redis數據庫讀取tweets,通過幾個*bolt*處理它們,最后把結果保存在另一個Redis數據庫的列表中。處理結果就是所有話題和它們的在tweets中出現的次數列表,所有用戶和他們在tweets中出現的次數列表,還有一個包含發起話題和頻率的用戶列表。 這個工具的拓撲見圖8-1。[![拓撲概覽](https://box.kancloud.cn/2015-09-21_55ffef14a3838.png)](http://ifeve.com/getting-started-of-storm8-2/figure8-1/)? 圖8-1 拓撲概覽 正如你看到的,**TweetsTransactionalSpout**會連接你的tweet數據庫并向拓撲分發批次。**UserSplitterBolt**和**HashTagSplitterBolt**兩個*bolt*,從*spout*接收元組。**UserSplitterBolt**解析tweets并查找用戶——以@開頭的單詞——然后把這些單詞分發到名為*users*的自定義數據流組。**HashtagSplitterBolt**從tweet查找**#**開頭的單詞,并把它們分發到名為*hashtags*的自定義數據流組。第三個*bolt*,**UserHashtagJoinBolt**,接收前面提到的兩個數據流組,并計算具名用戶的一條tweet內的話題數量。為了計數并分發計算結果,這是個**BaseBatchBolt**(稍后有更多介紹)。 最后一個bolt——**RedisCommitterBolt**——接收以上三個*bolt*的數據流組。它為每樣東西計數,并在對一個批次完成處理時,把所有結果保存到redis。這是一種特殊的*bolt*,叫做提交者,在本章后面做更多講解。 用**TransactionalTopologyBuilder**構建拓撲,代碼如下: | `01` | `TransactionalTopologyBuilder builder=` | | `02` | `new`?`TransactionalTopologyBuilder(``"test"``,?``"spout"``,?``new`?`TweetsTransactionalSpout());` | | `03` | ? | | `04` | `builder.setBolt(``"users-splitter"``,?``new`?`UserSplitterBolt(),?``4``).shuffleGrouping(``"spout"``);` | | `05` | `buildeer.setBolt(``"hashtag-splitter"``,?``new`?`HashtagSplitterBolt(),?``4``).shuffleGrouping(``"spout"``);` | | `06` | ? | | `07` | `builder.setBolt(``"users-hashtag-manager"``,?``new`?`UserHashtagJoinBolt(), r)` | | `08` | `.fieldsGrouping(``"users-splitter"``,?``"users"``,?``new`?`Fields(``"tweet_id"``))` | | `09` | `.fieldsGrouping(``"hashtag-splitter"``,?``"hashtags"``,?``new`?`Fields(``"tweet_id"``));` | | `10` | ? | | `11` | `builder.setBolt(``"redis-commiter"``,?``new`?`RedisCommiterBolt())` | | `12` | `.globalGrouping(``"users-splitter"``,?``"users"``)` | | `13` | `.globalGrouping(``"hashtag-splitter"``,?``"hashtags"``)` | | `14` | `.globalGrouping(``"user-hashtag-merger"``);` | 接下來就看看如何在一個事務性拓撲中實現*spout*。 ***Spout*** 一個事務性拓撲的*spout*與標準*spout*完全不同。 | `1` | `public`?`class`?`TweetsTransactionalSpout?``extends`?`BaseTransactionalSpout<TransactionMetadata>{` | 正如你在這個類定義中看到的,TweetsTransactionalSpout繼承了帶范型的**BaseTransactionalSpout**。指定的范型類型的對象是事務元數據集合。它將在后面的代碼中用于從數據源分發批次。 在這個例子中,**TransactionMetadata**定義如下: | `01` | `public`?`class`?`TransactionMetadata?``implements`?`Serializable {` | | `02` | `private`?`static`?`final`?`long`?`serialVersionUID = 1L;` | | `03` | `long`?`from;` | | `04` | `int`?`quantity;` | | `05` | ? | | `06` | `public`?`TransactionMetadata(``long`?`from,?``int`?`quantity) {` | | `07` | `this``.from = from;` | | `08` | `this``.quantity = quantity;` | | `09` | `}` | | `10` | `}` | 該類的對象維護著兩個屬性**from**和**quantity**,它們用來生成批次。 *spout*的最后需要實現下面的三個方法: | `01` | `@Override` | | `02` | `public`?`ITransactionalSpout.Coordinator<TransactionMetadata> getCoordinator(` | | `03` | `Map conf, TopologyContext context) {` | | `04` | `return`?`new`?`TweetsTransactionalSpoutCoordinator();` | | `05` | `}` | | `06` | ? | | `07` | `@Override` | | `08` | `public`?`backtype.storm.transactional.ITransactionalSpout.Emitter<TransactionMetadata> getEmitter(Map conf, TopologyContext contest) {` | | `09` | `return`?`new`?`TweetsTransactionalSpoutEmitter();` | | `10` | `}` | | `11` | ? | | `12` | `@Override` | | `13` | `public`?`void`?`declareOutputFields(OuputFieldsDeclarer declarer) {` | | `14` | `declarer.declare(``new`?`Fields(``"txid"``,?``"tweet_id"``,?``"tweet"``));` | | `15` | `}` | **getCoordinator**方法,告訴Storm用來協調生成批次的類。**getEmitter**,負責讀取批次并把它們分發到拓撲中的數據流組。最后,就像之前做過的,需要聲明要分發的域。 **RQ類 **為了讓例子簡單點,我們決定用一個類封裝所有對Redis的操作。 | `01` | `public`?`class`?`RQ {` | | `02` | `public`?`static`?`final`?`String NEXT_READ =?``"NEXT_READ"``;` | | `03` | `public`?`static`?`final`?`String NEXT_WRITE =?``"NEXT_WRITE"``;` | | `04` | ? | | `05` | `Jedis jedis;` | | `06` | ? | | `07` | `public`?`RQ() {` | | `08` | `jedis =?``new`?`Jedis(``"localhost"``);` | | `09` | `}` | | `10` | ? | | `11` | `public`?`long`?`getavailableToRead(``long`?`current) {` | | `12` | `return`?`getNextWrite() - current;` | | `13` | `}` | | `14` | ? | | `15` | `public`?`long`?`getNextRead() {` | | `16` | `String sNextRead = jedis.get(NEXT_READ);` | | `17` | `if``(sNextRead ==?``null``) {` | | `18` | `return`?`1``;` | | `19` | `}` | | `20` | `return`?`Long.valueOf(sNextRead);` | | `21` | `}` | | `22` | ? | | `23` | `public`?`long`?`getNextWrite() {` | | `24` | `return`?`Long.valueOf(jedis.get(NEXT_WRITE));` | | `25` | `}` | | `26` | ? | | `27` | `public`?`void`?`close() {` | | `28` | `jedis.disconnect();` | | `29` | `}` | | `30` | ? | | `31` | `public`?`void`?`setNextRead(``long`?`nextRead) {` | | `32` | `jedis.set(NEXT_READ,?``""``+nextRead);` | | `33` | `}` | | `34` | ? | | `35` | `public`?`List<String> getMessages(``long`?`from,?``int`?`quantity) {` | | `36` | `String[] keys =?``new`?`String[quantity];` | | `37` | `for`?`(``int`?`i =?``0``; i < quantity; i++) {` | | `38` | `keys[i] =?``""``+(i+from);` | | `39` | `}` | | `40` | `return`?`jedis.mget(keys);` | | `41` | `}` | | `42` | `}` | 仔細閱讀每個方法,確保自己理解了它們的用處。 **協調者Coordinator **下面是本例的協調者實現。 | `01` | `public`?`static`?`class`?`TweetsTransactionalSpoutCoordinator?``implements``ITransactionalSpout.Coordinator<TransactionMetadata> {` | | `02` | `TransactionMetadata lastTransactionMetadata;` | | `03` | `RQ rq =?``new`?`RQ();` | | `04` | `long`?`nextRead =?``0``;` | | `05` | ? | | `06` | `public`?`TweetsTransactionalSpoutCoordinator() {` | | `07` | `nextRead = rq.getNextRead();` | | `08` | `}` | | `09` | ? | | `10` | `@Override` | | `11` | `public`?`TransactionMetadata initializeTransaction(BigInteger txid, TransactionMetadata prevMetadata) {` | | `12` | `long`?`quantity = rq.getAvailableToRead(nextRead);` | | `13` | `quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;` | | `14` | `TransactionMetadata ret =?``new`?`TransactionMetadata(nextRead, (``int``)quantity);` | | `15` | `nextRead += quantity;` | | `16` | `return`?`ret;` | | `17` | `}` | | `18` | ? | | `19` | `@Override` | | `20` | `public`?`boolean`?`isReady() {` | | `21` | `return`?`rq.getAvailableToRead(nextRead) >?``0``;` | | `22` | `}` | | `23` | ? | | `24` | `@Override` | | `25` | `public`?`void`?`close() {` | | `26` | `rq.close();` | | `27` | `}` | | `28` | `}` | 值得一提的是,*在整個拓撲中只會有一個提交者實例*。創建提交者實例時,它會從redis讀取一個從1開始的序列號,這個序列號標識要讀取的tweet下一條。 第一個方法是**isReady**。在**initializeTransaction**之前調用它確認數據源已就緒并可讀取。此方法應當相應的返回**true**或**false**。在此例中,讀取tweets數量并與已讀數量比較。它們之間的不同就在于可讀tweets數。如果它大于0,就意味著還有tweets未讀。 最后,執行**initializeTransaction**。正如你看到的,它接收**txid**和**prevMetadata**作為參數。第一個參數是Storm生成的事務ID,作為批次的惟一性標識。**prevMetadata**是協調器生成的前一個事務元數據對象。 在這個例子中,首先確認有多少tweets可讀。只要確認了這一點,就創建一個TransactionMetadata對象,標識讀取的第一個tweet(譯者注:對象屬性**from**),以及讀取的tweets數量(譯者注:對象屬性**quantity**)。 元數據對象一經返回,Storm把它跟**txid**一起保存在zookeeper。這樣就確保了一旦發生故障,Storm可以利用分發器(譯者注:**Emitter**,見下文)重新發送批次。 **Emitter** 創建事務性*spout*的最后一步是實現分發器(Emitter)。實現如下: | `01` | `public`?`static`?`class`?`TweetsTransactionalSpoutEmitter``implements`?`ITransactionalSpout.Emitter<TransactionMetadata> {` | | `02` | ? | | `03` | `</pre>` | | `04` | `<pre>????RQ rq =?``new`?`RQ();</pre>` | | `05` | `<pre>????``public`?`TweetsTransactionalSpoutEmitter() {}</pre>` | | `06` | `<pre>????``@Override` | | `07` | `public`?`void`?`emitBatch(TransactionAttempt tx, TransactionMetadata coordinatorMeta, BatchOutputCollector collector) {` | | `08` | `rq.setNextRead(coordinatorMeta.from+coordinatorMeta.quantity);` | | `09` | `List<String> messages = rq.getMessages(coordinatorMeta.from, <span style=``"font-family: Georgia, 'Times New Roman', 'Bitstream Charter', Times, serif; font-size: 13px; line-height: 19px;"``>coordinatorMeta.quantity);` | | `10` | `</span>????????``long`?`tweetId = coordinatorMeta.from;` | | `11` | `for`?`(String message : messages) {` | | `12` | `collector.emit(``new`?`Values(tx,?``""``+tweetId, message));` | | `13` | `tweetId++;` | | `14` | `}` | | `15` | `}` | | `16` | ? | | `17` | `@Override` | | `18` | `public`?`void`?`cleanupBefore(BigInteger txid) {}` | | `19` | ? | | `20` | `@Override` | | `21` | `public`?`void`?`close() {` | | `22` | `rq.close();` | | `23` | `}</pre>` | | `24` | `<pre>` | | `25` | `}` | 分發器從數據源讀取數據并從數據流組發送數據。分發器應當問題能夠為相同的事務id和事務元數據發送相同的批次。這樣,如果在處理批次的過程中發生了故障,Storm就能夠利用分發器重復相同的事務id和事務元數據,并確保批次已經重復過了。Storm會在**TransactionAttempt**對象里為嘗試次數增加計數(譯者注:**attempt id**)。這樣就能知道批次已經重復過了。 在這里**emitBatch**是個重要方法。在這個方法中,使用傳入的元數據對象從redis得到tweets,同時增加redis維持的已讀tweets數。當然它還會把讀到的tweets分發到拓撲。 ***Bolts*** 首先看一下這個拓撲中的標準*bolt*: | `01` | `public`?`class`?`UserSplitterBolt?``implements`?`IBasicBolt{` | | `02` | `private`?`static`?`final`?`long`?`serialVersionUID = 1L;` | | `03` | ? | | `04` | `@Override` | | `05` | `public`?`void`?`declareOutputFields(OutputFieldsDeclarer declarer) {` | | `06` | `declarer.declareStream(``"users"``,?``new`?`Fields(``"txid"``,``"tweet_id"``,``"user"``));` | | `07` | `}` | | `08` | ? | | `09` | `@Override` | | `10` | `public`?`Map<String, Object> getComponentConfiguration() {` | | `11` | `return`?`null``;` | | `12` | `}` | | `13` | ? | | `14` | `@Override` | | `15` | `public`?`void`?`prepare(Map stormConf, TopologyContext context) {}` | | `16` | ? | | `17` | `@Override` | | `18` | `public`?`void`?`execute(Tuple input, BasicOutputCollector collector) {` | | `19` | `String tweet = input.getStringByField(``"tweet"``);` | | `20` | `String tweetId = input.getStringByField(``"tweet_id"``);` | | `21` | `StringTokenizer strTok =?``new`?`StringTokenizer(tweet,?``" "``);` | | `22` | `HashSet<String> users =?``new`?`HashSet<String>();` | | `23` | ? | | `24` | `while``(strTok.hasMoreTokens()) {` | | `25` | `String user = strTok.nextToken();` | | `26` | ? | | `27` | `//確保這是個真實的用戶,并且在這個tweet中沒有重復` | | `28` | `if``(user.startsWith(``"@"``) && !users.contains(user)) {` | | `29` | `collector.emit(``"users"``,?``new`?`Values(tx, tweetId, user));` | | `30` | `users.add(user);` | | `31` | `}` | | `32` | `}` | | `33` | `}` | | `34` | ? | | `35` | `@Override` | | `36` | `public`?`void`?`cleanup(){}` | | `37` | `}` | 正如本章前面提到的,**UserSplitterBolt**接收元組,解析tweet文本,分發@開頭的單詞————tweeter用戶。**HashtagSplitterBolt**的實現也非常相似。 | `01` | `public`?`class`?`HashtagSplitterBolt?``implements`?`IBasicBolt{` | | `02` | `private`?`static`?`final`?`long`?`serialVersionUID = 1L;` | | `03` | ? | | `04` | `@Override` | | `05` | `public`?`void`?`declareOutputFields(OutputFieldsDeclarer declarer) {` | | `06` | `declarer.declareStream(``"hashtags"``,?``new`?`Fields(``"txid"``,``"tweet_id"``,``"hashtag"``));` | | `07` | `}` | | `08` | ? | | `09` | `@Override` | | `10` | `public`?`Map<String, Object> getComponentConfiguration() {` | | `11` | `return`?`null``;` | | `12` | `}` | | `13` | ? | | `14` | `@Override` | | `15` | `public`?`void`?`prepare(Map stormConf, TopologyContext context) {}` | | `16` | ? | | `17` | `@Oerride` | | `18` | `public`?`void`?`execute(Tuple input, BasicOutputCollector collector) {` | | `19` | `String tweet = input.getStringByField(``"tweet"``);` | | `20` | `String tweetId = input.getStringByField(``"tweet_id"``);` | | `21` | `StringTokenizer strTok =?``new`?`StringTokenizer(tweet,?``" "``);` | | `22` | `TransactionAttempt tx = (TransactionAttempt)input.getValueByField(``"txid"``);` | | `23` | `HashSet<String> words =?``new`?`HashSet<String>();` | | `24` | ? | | `25` | `while``(strTok.hasMoreTokens()) {` | | `26` | `String word = strTok.nextToken();` | | `27` | ? | | `28` | `if``(word.startsWith(``"#"``) && !words.contains(word)){` | | `29` | `collector.emit(``"hashtags"``,?``new`?`Values(tx, tweetId, word));` | | `30` | `words.add(word);` | | `31` | `}` | | `32` | `}` | | `33` | `}` | | `34` | ? | | `35` | `@Override` | | `36` | `public`?`void`?`cleanup(){}` | | `37` | `}` | 現在看看**UserHashTagJoinBolt**的實現。首先要注意的是它是一個**BaseBatchBolt**。這意味著,**execute**方法會操作接收到的元組,但是不會分發新的元組。批次完成時,Storm會調用**finishBatch**方法。 | `01` | `public`?`void`?`execute(Tuple tuple) {` | | `02` | `String source = tuple.getSourceStreamId();` | | `03` | `String tweetId = tuple.getStringByField(``"tweet_id"``);` | | `04` | ? | | `05` | `if``(``"hashtags"``.equals(source)) {` | | `06` | `String hashtag = tuple.getStringByField(``"hashtag"``);` | | `07` | `add(tweetHashtags, tweetId, hashtag);` | | `08` | `}?``else`?`if``(``"users"``.equals(source)) {` | | `09` | `String user = tuple.getStringByField(``"user"``);` | | `10` | `add(userTweets, user, tweetId);` | | `11` | `}` | | `12` | `}` | 既然要結合tweet中提到的用戶為出現的所有話題計數,就需要加入前面的*bolts*創建的兩個數據流組。這件事要以批次為單位進程,在批次處理完成時,調用**finishBatch**方法。 | `01` | `@Override` | | `02` | `public`?`void`?`finishBatch() {` | | `03` | `for``(String user:userTweets.keySet()){` | | `04` | `Set<String> tweets = getUserTweets(user);` | | `05` | `HashMap<String, Integer> hashtagsCounter =?``new`?`HashMap<String, Integer>();` | | `06` | `for``(String tweet:tweets){` | | `07` | `Set<String> hashtags=getTweetHashtags(tweet);` | | `08` | `if``(hashtags!=``null``){` | | `09` | `for``(String hashtag:hashtags){` | | `10` | `Integer count=hashtagsCounter.get(hashtag);` | | `11` | `if``(count==``null``){count=``0``;}` | | `12` | `count++;` | | `13` | `hashtagsCounter.put(hashtag,count);` | | `14` | `}` | | `15` | `}` | | `16` | `}` | | `17` | `for``(String hashtag:hashtagsCounter.keySet()){` | | `18` | `int`?`count=hashtagsCounter.get(hashtag);` | | `19` | `collector.emit(``new`?`Values(id,user,hashtag,count));` | | `20` | `}` | | `21` | `}` | | `22` | `}` | 這個方法計算每對用戶-話題出現的次數,并為之生成和分發元組。 你可以在GitHub上找到并下載完整代碼。(譯者注:https://github.com/storm-book/examples-ch08-transactional-topologies這個倉庫里沒有代碼,誰知道哪里有代碼麻煩說一聲。) **提交者*bolts*** 我們已經學習了,批次通過協調器和分發器怎樣在拓撲中傳遞。在拓撲中,這些批次中的元組以并行的,沒有特定次序的方式處理。 *協調者bolts*是一類特殊的批處理*bolts*,它們實現了**IComh mitter**或者通過**TransactionalTopologyBuilder**調用**setCommiterBolt**設置了提交者*bolt*。它們與其它的批處理*bolts*最大的不同在于,提交者*bolts*的**finishBatch**方法在提交就緒時執行。這一點發生在之前所有事務都已成功提交之后。另外,**finishBatch**方法是順序執行的。因此如果同時有事務ID1和事務ID2兩個事務同時執行,只有在ID1沒有任何差錯的執行了**finishBatch**方法之后,ID2才會執行該方法。 下面是這個類的實現 | `01` | `public`?`class`?`RedisCommiterCommiterBolt?``extends`?`BaseTransactionalBolt?``implements`?`ICommitter {` | | `02` | `public`?`static`?`final`?`String LAST_COMMITED_TRANSACTION_FIELD =?``"LAST_COMMIT"``;` | | `03` | `TransactionAttempt id;` | | `04` | `BatchOutputCollector collector;` | | `05` | `Jedis jedis;` | | `06` | ? | | `07` | `@Override` | | `08` | `public`?`void`?`prepare(Map conf, TopologyContext context,` | | `09` | `BatchOutputCollector collector, TransactionAttempt id) {` | | `10` | `this``.id = id;` | | `11` | `this``.collector = collector;` | | `12` | `this``.jedis =?``new`?`Jedis(``"localhost"``);` | | `13` | `}` | | `14` | ? | | `15` | `HashMap<String, Long> hashtags =?``new`?`HashMap<String,Long>();` | | `16` | `HashMap<String, Long> users =?``new`?`HashMap<String, Long>();` | | `17` | `HashMap<String, Long> usersHashtags =?``new`?`HashMap<String, Long>();` | | `18` | ? | | `19` | `private`?`void`?`count(HashMap<String, Long> map, String key,?``int`?`count) {` | | `20` | `Long value = map.get(key);` | | `21` | `if``(value ==?``null``){value = (``long``)``0``;}` | | `22` | `value += count;` | | `23` | `map.put(key,value);` | | `24` | `}` | | `25` | ? | | `26` | `@Override` | | `27` | `public`?`void`?`execute(Tuple tuple) {` | | `28` | `String origin = tuple. getSourceComponent();` | | `29` | `if``(``"sers-splitter"``.equals(origin)) {` | | `30` | `String user = tuple.getStringByField(``"user"``);` | | `31` | `count(users, user,?``1``);` | | `32` | `}?``else`?`if``(``"hashtag-splitter"``.equals(origin)) {` | | `33` | `String hashtag = tuple.getStringByField(``"hashtag"``);` | | `34` | `count(hashtags, hashtag,?``1``);` | | `35` | `}?``else`?`if``(``"user-hashtag-merger"``.quals(origin)) {` | | `36` | `String hashtag = tuple.getStringByField(``"hashtag"``);` | | `37` | `String user = tuple.getStringByField(``"user"``);` | | `38` | `String key = user +?``":"`?`+ hashtag;` | | `39` | `Integer count = tuple.getIntegerByField(``"count"``);` | | `40` | `count(usersHashtags, key, count);` | | `41` | `}` | | `42` | `}` | | `43` | ? | | `44` | `@Override` | | `45` | `public`?`void`?`finishBatch() {` | | `46` | `String lastCommitedTransaction = jedis.get(LAST_COMMITED_TRANSACTION_FIELD);` | | `47` | `String currentTransaction =?``""``+id.getTransactionId();` | | `48` | ? | | `49` | `if``(currentTransaction.equals(lastCommitedTransaction)) {``return``;}` | | `50` | ? | | `51` | `Transaction multi = jedis.multi();` | | `52` | ? | | `53` | `multi.set(LAST_COMMITED_TRANSACTION_FIELD, currentTransaction);` | | `54` | ? | | `55` | `Set<String> keys = hashtags.keySet();` | | `56` | `for`?`(String hashtag : keys) {` | | `57` | `Long count = hashtags.get(hashtag);` | | `58` | `multi.hincrBy(``"hashtags"``, hashtag, count);` | | `59` | `}` | | `60` | ? | | `61` | `keys = users.keySet();` | | `62` | `for`?`(String user : keys) {` | | `63` | `Long count =users.get(user);` | | `64` | `multi.hincrBy(``"users"``,user,count);` | | `65` | `}` | | `66` | ? | | `67` | `keys = usersHashtags.keySet();` | | `68` | `for`?`(String key : keys) {` | | `69` | `Long count = usersHashtags.get(key);` | | `70` | `multi.hincrBy(``"users_hashtags"``, key, count);` | | `71` | `}` | | `72` | ? | | `73` | `multi.exec();` | | `74` | `}` | | `75` | ? | | `76` | `@Override` | | `77` | `public`?`void`?`declareOutputFields(OutputFieldsDeclarer declarer) {}` | | `78` | `}` | 這個實現很簡單,但是在**finishBatch**有一個細節。 | `1` | `...` | | `2` | `multi.set(LAST_COMMITED_TRANSACTION_FIELD, currentTransaction);` | | `3` | `...` | 在這里向數據庫保存提交的最后一個事務ID。為什么要這樣做?記住,如果事務失敗了,Storm將會盡可能多的重復必要的次數。如果你不確定已經處理了這個事務,你就會多算,事務拓撲也就沒有用了。所以請記住:保存最后提交的事務ID,并在提交前檢查。 **分區的事務*Spouts ***對一個*spout*來說,從一個分區集合中讀取批次是很普通的。接著這個例子,你可能有很多redis數據庫,而tweets可能會分別保存在這些redis數據庫里。通過實現**IPartitionedTransactionalSpout**,Storm提供了一些工具用來管理每個分區的狀態并保證重播的能力。 下面我們修改**TweetsTransactionalSpout**,使它可以處理數據分區。 首先,繼承**BasePartitionedTransactionalSpout**,它實現了**IPartitionedTransactionalSpout**。 | `1` | `public`?`class`?`TweetsPartitionedTransactionalSpout?``extends` | | `2` | `BasePartitionedTransactionalSpout<TransactionMetadata> {` | | `3` | `...` | | `4` | `}` | 然后告訴Storm誰是你的協調器。 | `01` | `public`?`static`?`class`?`TweetsPartitionedTransactionalCoordinator?``implements`?`Coordinator {` | | `02` | `@Override` | | `03` | `public`?`int`?`numPartitions() {` | | `04` | `return`?`4``;` | | `05` | `}` | | `06` | ? | | `07` | `@Override` | | `08` | `public`?`boolean`?`isReady() {` | | `09` | `return`?`true``;` | | `10` | `}` | | `11` | ? | | `12` | `@Override` | | `13` | `public`?`void`?`close() {}` | | `14` | `}` | 在這個例子里,協調器很簡單。numPartitions方法,告訴Storm一共有多少分區。而且你要注意,不要返回任何元數據。對于**IPartitionedTransactionalSpout**,元數據由分發器直接管理。 下面是分發器的實現: | `01` | `public`?`static`?`class`?`TweetsPartitionedTransactionalEmitter` | | `02` | `implements`?`Emitter<TransactionMetadata> {` | | `03` | `PartitionedRQ rq =?``new`?`ParttionedRQ();` | | `04` | ? | | `05` | `@Override` | | `06` | `public`?`TransactionMetadata emitPartitionBatchNew(TransactionAttempt tx,` | | `07` | `BatchOutputCollector collector,?``int`?`partition,` | | `08` | `TransactionMetadata lastPartitioonMeta) {` | | `09` | `long`?`nextRead;` | | `10` | ? | | `11` | `if``(lastPartitionMeta ==?``null``) {` | | `12` | `nextRead = rq.getNextRead(partition);` | | `13` | `}``else``{` | | `14` | `nextRead = lastPartitionMeta.from + lastPartitionMeta.quantity;` | | `15` | `rq.setNextRead(partition, nextRead);?``//移動游標` | | `16` | `}` | | `17` | ? | | `18` | `long`?`quantity = rq.getAvailableToRead(partition, nextRead);` | | `19` | `quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;` | | `20` | `TransactionMetadata metadata =?``new`?`TransactionMetadata(nextRead, (``int``)quantity);` | | `21` | ? | | `22` | `emitPartitionBatch(tx, collector, partition, metadata);` | | `23` | `return`?`metadata;` | | `24` | `}` | | `25` | ? | | `26` | `@Override` | | `27` | `public`?`void`?`emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector,` | | `28` | `int`?`partition, TransactionMetadata partitionMeta) {` | | `29` | `if``(partitionMeta.quantity <=?``0``){` | | `30` | `return``;` | | `31` | `}` | | `32` | ? | | `33` | `List<String> messages = rq.getMessages(partition, partitionMeta.from,` | | `34` | `partitionMeta.quantity);` | | `35` | ? | | `36` | `long`?`tweetId = partitionMeta.from;` | | `37` | `for`?`(String msg : messages) {` | | `38` | `collector.emit(``new`?`Values(tx,?``""``+tweetId, msg));` | | `39` | `tweetId++;` | | `40` | `}` | | `41` | `}` | | `42` | ? | | `43` | `@Override` | | `44` | `public`?`void`?`close() {}` | | `45` | `}` | 這里有兩個重要的方法,**emitPartitionBatchNew**,和**emitPartitionBatch**。對于**emitPartitionBatchNew**,從Storm接收分區參數,該參數決定應該從哪個分區讀取批次。在這個方法中,決定獲取哪些tweets,生成相應的元數據對象,調用**emitPartitionBatch**,返回元數據對象,并且元數據對象會在方法返回時立即保存到zookeeper。 Storm會為每一個分區發送相同的事務ID,表示一個事務貫穿了所有數據分區。通過**emitPartitionBatch**讀取分區中的tweets,并向拓撲分發批次。如果批次處理失敗了,Storm將會調用**emitPartitionBatch**利用保存下來的元數據重復這個批次。 **NOTE:**?完整的源碼請見:[https://github.com/storm-book/examples-ch08-transactional-topologies](https://github.com/storm-book/examples-ch08-transactional-topologies)(譯者注:原文如此,實際上這個倉庫里什么也沒有) **模糊的事務性拓撲** 到目前為止,你可能已經學會了如何讓擁有相同事務ID的批次在出錯時重播。但是在有些場景下這樣做可能就不太合適了。然后會發生什么呢? 事實證明,你仍然可以實現在語義上精確的事務,不過這需要更多的開發工作,你要記錄由Storm重復的事務之前的狀態。既然能在不同時刻為相同的事務ID得到不同的元組,你就需要把事務重置到之前的狀態,并從那里繼續。 比如說,如果你為收到的所有tweets計數,你已數到5,而最后的事務ID是321,這時你多數了8個。你要維護以下三個值——previousCount=5,currentCount=13,以及lastTransactionId=321。假設事物ID321又發分了一次,而你又得到了4個元組,而不是之前的8個,提交器會探測到這是相同的事務ID,它將會把結果重置到**previousCount**的值5,并在此基礎上加4,然后更新**currentCount**為9。 另外,在之前的一個事務被取消時,每個并行處理的事務都要被取消。這是為了確保你沒有丟失任何數據。 你的*spout*可以實現**IOpaquePartitionedTransactionalSpout**,而且正如你看到的,協調器和分發器也很簡單。 | `01` | `public`?`static`?`class`?`TweetsOpaquePartitionedTransactionalSpoutCoordinator?``implements``IOpaquePartitionedTransactionalSpout.Coordinator {` | | `02` | `@Override` | | `03` | `public`?`boolean`?`isReady() {` | | `04` | `return`?`true``;` | | `05` | `}` | | `06` | `}` | | `07` | ? | | `08` | `public`?`static`?`class`?`TweetsOpaquePartitionedTransactionalSpoutEmitter` | | `09` | `implements`?`IOpaquePartitionedTransactionalSpout.Emitter<TransactionMetadata> {` | | `10` | `PartitionedRQ rq? =?``new`?`PartitionedRQ();` | | `11` | ? | | `12` | `@Override` | | `13` | `public`?`TransactionMetadata emitPartitionBatch(TransactionAttempt tx,` | | `14` | `BatchOutputCollector collector,?``int`?`partion,` | | `15` | `TransactionMetadata lastPartitonMeta) {` | | `16` | `long`?`nextRead;` | | `17` | ? | | `18` | `if``(lastPartitionMeta ==?``null``) {` | | `19` | `nextRead = rq.getNextRead(partition);` | | `20` | `}``else``{` | | `21` | `nextRead = lastPartitionMeta.from + lastPartitionMeta.quantity;` | | `22` | `rq.setNextRead(partition, nextRead);``//移動游標` | | `23` | `}` | | `24` | ? | | `25` | `long`?`quantity = rq.getAvailabletoRead(partition, nextRead);` | | `26` | `quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;` | | `27` | `TransactionMetadata metadata =?``new`?`TransactionMetadata(nextRead, (``int``)quantity);` | | `28` | `emitMessages(tx, collector, partition, metadata);` | | `29` | `return`?`metadata;` | | `30` | `}` | | `31` | ? | | `32` | `private`?`void`?`emitMessage(TransactionAttempt tx, BatchOutputCollector collector,` | | `33` | `int`?`partition, TransactionMetadata partitionMeta) {` | | `34` | `if``(partitionMeta.quantity <=?``0``){``return``;}` | | `35` | ? | | `36` | `List<String> messages = rq.getMessages(partition, partitionMeta.from, partitionMeta.quantity);` | | `37` | `long`?`tweetId = partitionMeta.from;` | | `38` | `for``(String msg : messages) {` | | `39` | `collector.emit(``new`?`Values(tx,?``""``+tweetId, msg));` | | `40` | `tweetId++;` | | `41` | `}` | | `42` | `}` | | `43` | ? | | `44` | `@Override` | | `45` | `public`?`int`?`numPartitions() {` | | `46` | `return`?`4``;` | | `47` | `}` | | `48` | ? | | `49` | `@Override` | | `50` | `public`?`void`?`close() {}` | | `51` | `}` | 最有趣的方法是**emit**本文翻譯自《[Getting Started With Storm](http://ifeve.com/wp-content/uploads/2014/03/Getting-Started-With-Storm-Jonathan-Leibiusky-Gabriel-E_1276.pdf)》譯者:吳京潤 ? ?編輯:郭蕾 方騰飛 正如書中之前所提到的,使用Storm編程,可以通過調用ack和fail方法來確保一條消息的處理成功或失敗。不過當元組被重發時,會發生什么呢?你又該如何砍不會重復計算?[![](https://box.kancloud.cn/2015-09-21_55ffef13584f9.jpg)](http://ifeve.com/getting-started-with-storm-1/storm/) *Storm0.7.0*實現了一個新特性——事務性拓撲,這一特性使消息在語義上確保你可以安全的方式重發消息,并保證它們只會被處理一次。在不支持事務性拓撲的情況下,你無法在準確性,可擴展性,以空錯性上得到保證的前提下完成計算。 **NOTE:**事務性拓撲是一個構建于標準Storm?*spout*和*bolt*之上的抽象概念。 **設計** 在事務性拓撲中,Storm以并行和順序處理混合的方式處理元組。*spout*并行分批創建供*bolt*處理的元組(譯者注:下文將這種分批創建、分批處理的元組稱做批次)。其中一些*bolt*作為提交者以嚴格有序的方式提交處理過的批次。這意味著如果你有每批五個元組的兩個批次,將有兩個元組被*bolt*并行處理,但是直到提交者成功提交了第一個元組之后,才會提交第二個元組。?**NOTE:**?使用事務性拓撲時,數據源要能夠重發批次,有時候甚至要重復多次。因此確認你的數據源——你連接到的那個*spout*——具備這個能力。 這個過程可以被描述為兩個階段:?*處理階段*?純并行階段,許多批次同時處理。?*提交階段*?嚴格有序階段,直到批次一成功提交之后,才會提交批次二。 這兩個階段合起來稱為一個Storm事務。**NOTE:**?Storm使用zookeeper儲存事務元數據,默認情況下就是拓撲使用的那個zookeeper。你可以修改以下兩個配置參數鍵指定其它的zookeeper——transactional.zookeeper.servers和transactional.zookeeper.port。 **事務實踐** 下面我們要創建一個Twitter分析工具來了解事務的工作方式。我們從一個Redis數據庫讀取tweets,通過幾個*bolt*處理它們,最后把結果保存在另一個Redis數據庫的列表中。處理結果就是所有話題和它們的在tweets中出現的次數列表,所有用戶和他們在tweets中出現的次數列表,還有一個包含發起話題和頻率的用戶列表。 這個工具的拓撲見圖8-1。[![拓撲概覽](https://box.kancloud.cn/2015-09-21_55ffef14a3838.png)](http://ifeve.com/getting-started-of-storm8-2/figure8-1/)? 圖8-1 拓撲概覽 正如你看到的,**TweetsTransactionalSpout**會連接你的tweet數據庫并向拓撲分發批次。**UserSplitterBolt**和**HashTagSplitterBolt**兩個*bolt*,從*spout*接收元組。**UserSplitterBolt**解析tweets并查找用戶——以@開頭的單詞——然后把這些單詞分發到名為*users*的自定義數據流組。**HashtagSplitterBolt**從tweet查找**#**開頭的單詞,并把它們分發到名為*hashtags*的自定義數據流組。第三個*bolt*,**UserHashtagJoinBolt**,接收前面提到的兩個數據流組,并計算具名用戶的一條tweet內的話題數量。為了計數并分發計算結果,這是個**BaseBatchBolt**(稍后有更多介紹)。 最后一個bolt——**RedisCommitterBolt**——接收以上三個*bolt*的數據流組。它為每樣東西計數,并在對一個批次完成處理時,把所有結果保存到redis。這是一種特殊的*bolt*,叫做提交者,在本章后面做更多講解。 用**TransactionalTopologyBuilder**構建拓撲,代碼如下: | `01` | `TransactionalTopologyBuilder builder=` | | `02` | `new`?`TransactionalTopologyBuilder(``"test"``,?``"spout"``,?``new`?`TweetsTransactionalSpout());` | | `03` | ? | | `04` | `builder.setBolt(``"users-splitter"``,?``new`?`UserSplitterBolt(),?``4``).shuffleGrouping(``"spout"``);` | | `05` | `buildeer.setBolt(``"hashtag-splitter"``,?``new`?`HashtagSplitterBolt(),?``4``).shuffleGrouping(``"spout"``);` | | `06` | ? | | `07` | `builder.setBolt(``"users-hashtag-manager"``,?``new`?`UserHashtagJoinBolt(), r)` | | `08` | `.fieldsGrouping(``"users-splitter"``,?``"users"``,?``new`?`Fields(``"tweet_id"``))` | | `09` | `.fieldsGrouping(``"hashtag-splitter"``,?``"hashtags"``,?``new`?`Fields(``"tweet_id"``));` | | `10` | ? | | `11` | `builder.setBolt(``"redis-commiter"``,?``new`?`RedisCommiterBolt())` | | `12` | `.globalGrouping(``"users-splitter"``,?``"users"``)` | | `13` | `.globalGrouping(``"hashtag-splitter"``,?``"hashtags"``)` | | `14` | `.globalGrouping(``"user-hashtag-merger"``);` | 接下來就看看如何在一個事務性拓撲中實現*spout*。 ***Spout*** 一個事務性拓撲的*spout*與標準*spout*完全不同。 | `1` | `public`?`class`?`TweetsTransactionalSpout?``extends`?`BaseTransactionalSpout<TransactionMetadata>{` | 正如你在這個類定義中看到的,TweetsTransactionalSpout繼承了帶范型的**BaseTransactionalSpout**。指定的范型類型的對象是事務元數據集合。它將在后面的代碼中用于從數據源分發批次。 在這個例子中,**TransactionMetadata**定義如下: | `01` | `public`?`class`?`TransactionMetadata?``implements`?`Serializable {` | | `02` | `private`?`static`?`final`?`long`?`serialVersionUID = 1L;` | | `03` | `long`?`from;` | | `04` | `int`?`quantity;` | | `05` | ? | | `06` | `public`?`TransactionMetadata(``long`?`from,?``int`?`quantity) {` | | `07` | `this``.from = from;` | | `08` | `this``.quantity = quantity;` | | `09` | `}` | | `10` | `}` | 該類的對象維護著兩個屬性**from**和**quantity**,它們用來生成批次。 *spout*的最后需要實現下面的三個方法: | `01` | `@Override` | | `02` | `public`?`ITransactionalSpout.Coordinator<TransactionMetadata> getCoordinator(` | | `03` | `Map conf, TopologyContext context) {` | | `04` | `return`?`new`?`TweetsTransactionalSpoutCoordinator();` | | `05` | `}` | | `06` | ? | | `07` | `@Override` | | `08` | `public`?`backtype.storm.transactional.ITransactionalSpout.Emitter<TransactionMetadata> getEmitter(Map conf, TopologyContext contest) {` | | `09` | `return`?`new`?`TweetsTransactionalSpoutEmitter();` | | `10` | `}` | | `11` | ? | | `12` | `@Override` | | `13` | `public`?`void`?`declareOutputFields(OuputFieldsDeclarer declarer) {` | | `14` | `declarer.declare(``new`?`Fields(``"txid"``,?``"tweet_id"``,?``"tweet"``));` | | `15` | `}` | **getCoordinator**方法,告訴Storm用來協調生成批次的類。**getEmitter**,負責讀取批次并把它們分發到拓撲中的數據流組。最后,就像之前做過的,需要聲明要分發的域。 **RQ類 **為了讓例子簡單點,我們決定用一個類封裝所有對Redis的操作。 | `01` | `public`?`class`?`RQ {` | | `02` | `public`?`static`?`final`?`String NEXT_READ =?``"NEXT_READ"``;` | | `03` | `public`?`static`?`final`?`String NEXT_WRITE =?``"NEXT_WRITE"``;` | | `04` | ? | | `05` | `Jedis jedis;` | | `06` | ? | | `07` | `public`?`RQ() {` | | `08` | `jedis =?``new`?`Jedis(``"localhost"``);` | | `09` | `}` | | `10` | ? | | `11` | `public`?`long`?`getavailableToRead(``long`?`current) {` | | `12` | `return`?`getNextWrite() - current;` | | `13` | `}` | | `14` | ? | | `15` | `public`?`long`?`getNextRead() {` | | `16` | `String sNextRead = jedis.get(NEXT_READ);` | | `17` | `if``(sNextRead ==?``null``) {` | | `18` | `return`?`1``;` | | `19` | `}` | | `20` | `return`?`Long.valueOf(sNextRead);` | | `21` | `}` | | `22` | ? | | `23` | `public`?`long`?`getNextWrite() {` | | `24` | `return`?`Long.valueOf(jedis.get(NEXT_WRITE));` | | `25` | `}` | | `26` | ? | | `27` | `public`?`void`?`close() {` | | `28` | `jedis.disconnect();` | | `29` | `}` | | `30` | ? | | `31` | `public`?`void`?`setNextRead(``long`?`nextRead) {` | | `32` | `jedis.set(NEXT_READ,?``""``+nextRead);` | | `33` | `}` | | `34` | ? | | `35` | `public`?`List<String> getMessages(``long`?`from,?``int`?`quantity) {` | | `36` | `String[] keys =?``new`?`String[quantity];` | | `37` | `for`?`(``int`?`i =?``0``; i < quantity; i++) {` | | `38` | `keys[i] =?``""``+(i+from);` | | `39` | `}` | | `40` | `return`?`jedis.mget(keys);` | | `41` | `}` | | `42` | `}` | 仔細閱讀每個方法,確保自己理解了它們的用處。 **協調者Coordinator **下面是本例的協調者實現。 | `01` | `public`?`static`?`class`?`TweetsTransactionalSpoutCoordinator?``implements``ITransactionalSpout.Coordinator<TransactionMetadata> {` | | `02` | `TransactionMetadata lastTransactionMetadata;` | | `03` | `RQ rq =?``new`?`RQ();` | | `04` | `long`?`nextRead =?``0``;` | | `05` | ? | | `06` | `public`?`TweetsTransactionalSpoutCoordinator() {` | | `07` | `nextRead = rq.getNextRead();` | | `08` | `}` | | `09` | ? | | `10` | `@Override` | | `11` | `public`?`TransactionMetadata initializeTransaction(BigInteger txid, TransactionMetadata prevMetadata) {` | | `12` | `long`?`quantity = rq.getAvailableToRead(nextRead);` | | `13` | `quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;` | | `14` | `TransactionMetadata ret =?``new`?`TransactionMetadata(nextRead, (``int``)quantity);` | | `15` | `nextRead += quantity;` | | `16` | `return`?`ret;` | | `17` | `}` | | `18` | ? | | `19` | `@Override` | | `20` | `public`?`boolean`?`isReady() {` | | `21` | `return`?`rq.getAvailableToRead(nextRead) >?``0``;` | | `22` | `}` | | `23` | ? | | `24` | `@Override` | | `25` | `public`?`void`?`close() {` | | `26` | `rq.close();` | | `27` | `}` | | `28` | `}` | 值得一提的是,*在整個拓撲中只會有一個提交者實例*。創建提交者實例時,它會從redis讀取一個從1開始的序列號,這個序列號標識要讀取的tweet下一條。 第一個方法是**isReady**。在**initializeTransaction**之前調用它確認數據源已就緒并可讀取。此方法應當相應的返回**true**或**false**。在此例中,讀取tweets數量并與已讀數量比較。它們之間的不同就在于可讀tweets數。如果它大于0,就意味著還有tweets未讀。 最后,執行**initializeTransaction**。正如你看到的,它接收**txid**和**prevMetadata**作為參數。第一個參數是Storm生成的事務ID,作為批次的惟一性標識。**prevMetadata**是協調器生成的前一個事務元數據對象。 在這個例子中,首先確認有多少tweets可讀。只要確認了這一點,就創建一個TransactionMetadata對象,標識讀取的第一個tweet(譯者注:對象屬性**from**),以及讀取的tweets數量(譯者注:對象屬性**quantity**)。 元數據對象一經返回,Storm把它跟**txid**一起保存在zookeeper。這樣就確保了一旦發生故障,Storm可以利用分發器(譯者注:**Emitter**,見下文)重新發送批次。 **Emitter** 創建事務性*spout*的最后一步是實現分發器(Emitter)。實現如下: | `01` | `public`?`static`?`class`?`TweetsTransactionalSpoutEmitter``implements`?`ITransactionalSpout.Emitter<TransactionMetadata> {` | | `02` | ? | | `03` | `</pre>` | | `04` | `<pre>????RQ rq =?``new`?`RQ();</pre>` | | `05` | `<pre>????``public`?`TweetsTransactionalSpoutEmitter() {}</pre>` | | `06` | `<pre>????``@Override` | | `07` | `public`?`void`?`emitBatch(TransactionAttempt tx, TransactionMetadata coordinatorMeta, BatchOutputCollector collector) {` | | `08` | `rq.setNextRead(coordinatorMeta.from+coordinatorMeta.quantity);` | | `09` | `List<String> messages = rq.getMessages(coordinatorMeta.from, <span style=``"font-family: Georgia, 'Times New Roman', 'Bitstream Charter', Times, serif; font-size: 13px; line-height: 19px;"``>coordinatorMeta.quantity);` | | `10` | `</span>????????``long`?`tweetId = coordinatorMeta.from;` | | `11` | `for`?`(String message : messages) {` | | `12` | `collector.emit(``new`?`Values(tx,?``""``+tweetId, message));` | | `13` | `tweetId++;` | | `14` | `}` | | `15` | `}` | | `16` | ? | | `17` | `@Override` | | `18` | `public`?`void`?`cleanupBefore(BigInteger txid) {}` | | `19` | ? | | `20` | `@Override` | | `21` | `public`?`void`?`close() {` | | `22` | `rq.close();` | | `23` | `}</pre>` | | `24` | `<pre>` | | `25` | `}` | 分發器從數據源讀取數據并從數據流組發送數據。分發器應當問題能夠為相同的事務id和事務元數據發送相同的批次。這樣,如果在處理批次的過程中發生了故障,Storm就能夠利用分發器重復相同的事務id和事務元數據,并確保批次已經重復過了。Storm會在**TransactionAttempt**對象里為嘗試次數增加計數(譯者注:**attempt id**)。這樣就能知道批次已經重復過了。 在這里**emitBatch**是個重要方法。在這個方法中,使用傳入的元數據對象從redis得到tweets,同時增加redis維持的已讀tweets數。當然它還會把讀到的tweets分發到拓撲。 ***Bolts*** 首先看一下這個拓撲中的標準*bolt*: | `01` | `public`?`class`?`UserSplitterBolt?``implements`?`IBasicBolt{` | | `02` | `private`?`static`?`final`?`long`?`serialVersionUID = 1L;` | | `03` | ? | | `04` | `@Override` | | `05` | `public`?`void`?`declareOutputFields(OutputFieldsDeclarer declarer) {` | | `06` | `declarer.declareStream(``"users"``,?``new`?`Fields(``"txid"``,``"tweet_id"``,``"user"``));` | | `07` | `}` | | `08` | ? | | `09` | `@Override` | | `10` | `public`?`Map<String, Object> getComponentConfiguration() {` | | `11` | `return`?`null``;` | | `12` | `}` | | `13` | ? | | `14` | `@Override` | | `15` | `public`?`void`?`prepare(Map stormConf, TopologyContext context) {}` | | `16` | ? | | `17` | `@Override` | | `18` | `public`?`void`?`execute(Tuple input, BasicOutputCollector collector) {` | | `19` | `String tweet = input.getStringByField(``"tweet"``);` | | `20` | `String tweetId = input.getStringByField(``"tweet_id"``);` | | `21` | `StringTokenizer strTok =?``new`?`StringTokenizer(tweet,?``" "``);` | | `22` | `HashSet<String> users =?``new`?`HashSet<String>();` | | `23` | ? | | `24` | `while``(strTok.hasMoreTokens()) {` | | `25` | `String user = strTok.nextToken();` | | `26` | ? | | `27` | `//確保這是個真實的用戶,并且在這個tweet中沒有重復` | | `28` | `if``(user.startsWith(``"@"``) && !users.contains(user)) {` | | `29` | `collector.emit(``"users"``,?``new`?`Values(tx, tweetId, user));` | | `30` | `users.add(user);` | | `31` | `}` | | `32` | `}` | | `33` | `}` | | `34` | ? | | `35` | `@Override` | | `36` | `public`?`void`?`cleanup(){}` | | `37` | `}` | 正如本章前面提到的,**UserSplitterBolt**接收元組,解析tweet文本,分發@開頭的單詞————tweeter用戶。**HashtagSplitterBolt**的實現也非常相似。 | `01` | `public`?`class`?`HashtagSplitterBolt?``implements`?`IBasicBolt{` | | `02` | `private`?`static`?`final`?`long`?`serialVersionUID = 1L;` | | `03` | ? | | `04` | `@Override` | | `05` | `public`?`void`?`declareOutputFields(OutputFieldsDeclarer declarer) {` | | `06` | `declarer.declareStream(``"hashtags"``,?``new`?`Fields(``"txid"``,``"tweet_id"``,``"hashtag"``));` | | `07` | `}` | | `08` | ? | | `09` | `@Override` | | `10` | `public`?`Map<String, Object> getComponentConfiguration() {` | | `11` | `return`?`null``;` | | `12` | `}` | | `13` | ? | | `14` | `@Override` | | `15` | `public`?`void`?`prepare(Map stormConf, TopologyContext context) {}` | | `16` | ? | | `17` | `@Oerride` | | `18` | `public`?`void`?`execute(Tuple input, BasicOutputCollector collector) {` | | `19` | `String tweet = input.getStringByField(``"tweet"``);` | | `20` | `String tweetId = input.getStringByField(``"tweet_id"``);` | | `21` | `StringTokenizer strTok =?``new`?`StringTokenizer(tweet,?``" "``);` | | `22` | `TransactionAttempt tx = (TransactionAttempt)input.getValueByField(``"txid"``);` | | `23` | `HashSet<String> words =?``new`?`HashSet<String>();` | | `24` | ? | | `25` | `while``(strTok.hasMoreTokens()) {` | | `26` | `String word = strTok.nextToken();` | | `27` | ? | | `28` | `if``(word.startsWith(``"#"``) && !words.contains(word)){` | | `29` | `collector.emit(``"hashtags"``,?``new`?`Values(tx, tweetId, word));` | | `30` | `words.add(word);` | | `31` | `}` | | `32` | `}` | | `33` | `}` | | `34` | ? | | `35` | `@Override` | | `36` | `public`?`void`?`cleanup(){}` | | `37` | `}` | 現在看看**UserHashTagJoinBolt**的實現。首先要注意的是它是一個**BaseBatchBolt**。這意味著,**execute**方法會操作接收到的元組,但是不會分發新的元組。批次完成時,Storm會調用**finishBatch**方法。 | `01` | `public`?`void`?`execute(Tuple tuple) {` | | `02` | `String source = tuple.getSourceStreamId();` | | `03` | `String tweetId = tuple.getStringByField(``"tweet_id"``);` | | `04` | ? | | `05` | `if``(``"hashtags"``.equals(source)) {` | | `06` | `String hashtag = tuple.getStringByField(``"hashtag"``);` | | `07` | `add(tweetHashtags, tweetId, hashtag);` | | `08` | `}?``else`?`if``(``"users"``.equals(source)) {` | | `09` | `String user = tuple.getStringByField(``"user"``);` | | `10` | `add(userTweets, user, tweetId);` | | `11` | `}` | | `12` | `}` | 既然要結合tweet中提到的用戶為出現的所有話題計數,就需要加入前面的*bolts*創建的兩個數據流組。這件事要以批次為單位進程,在批次處理完成時,調用**finishBatch**方法。 | `01` | `@Override` | | `02` | `public`?`void`?`finishBatch() {` | | `03` | `for``(String user:userTweets.keySet()){` | | `04` | `Set<String> tweets = getUserTweets(user);` | | `05` | `HashMap<String, Integer> hashtagsCounter =?``new`?`HashMap<String, Integer>();` | | `06` | `for``(String tweet:tweets){` | | `07` | `Set<String> hashtags=getTweetHashtags(tweet);` | | `08` | `if``(hashtags!=``null``){` | | `09` | `for``(String hashtag:hashtags){` | | `10` | `Integer count=hashtagsCounter.get(hashtag);` | | `11` | `if``(count==``null``){count=``0``;}` | | `12` | `count++;` | | `13` | `hashtagsCounter.put(hashtag,count);` | | `14` | `}` | | `15` | `}` | | `16` | `}` | | `17` | `for``(String hashtag:hashtagsCounter.keySet()){` | | `18` | `int`?`count=hashtagsCounter.get(hashtag);` | | `19` | `collector.emit(``new`?`Values(id,user,hashtag,count));` | | `20` | `}` | | `21` | `}` | | `22` | `}` | 這個方法計算每對用戶-話題出現的次數,并為之生成和分發元組。 你可以在GitHub上找到并下載完整代碼。(譯者注:https://github.com/storm-book/examples-ch08-transactional-topologies這個倉庫里沒有代碼,誰知道哪里有代碼麻煩說一聲。) **提交者*bolts*** 我們已經學習了,批次通過協調器和分發器怎樣在拓撲中傳遞。在拓撲中,這些批次中的元組以并行的,沒有特定次序的方式處理。 *協調者bolts*是一類特殊的批處理*bolts*,它們實現了**IComh mitter**或者通過**TransactionalTopologyBuilder**調用**setCommiterBolt**設置了提交者*bolt*。它們與其它的批處理*bolts*最大的不同在于,提交者*bolts*的**finishBatch**方法在提交就緒時執行。這一點發生在之前所有事務都已成功提交之后。另外,**finishBatch**方法是順序執行的。因此如果同時有事務ID1和事務ID2兩個事務同時執行,只有在ID1沒有任何差錯的執行了**finishBatch**方法之后,ID2才會執行該方法。 下面是這個類的實現 | `01` | `public`?`class`?`RedisCommiterCommiterBolt?``extends`?`BaseTransactionalBolt?``implements`?`ICommitter {` | | `02` | `public`?`static`?`final`?`String LAST_COMMITED_TRANSACTION_FIELD =?``"LAST_COMMIT"``;` | | `03` | `TransactionAttempt id;` | | `04` | `BatchOutputCollector collector;` | | `05` | `Jedis jedis;` | | `06` | ? | | `07` | `@Override` | | `08` | `public`?`void`?`prepare(Map conf, TopologyContext context,` | | `09` | `BatchOutputCollector collector, TransactionAttempt id) {` | | `10` | `this``.id = id;` | | `11` | `this``.collector = collector;` | | `12` | `this``.jedis =?``new`?`Jedis(``"localhost"``);` | | `13` | `}` | | `14` | ? | | `15` | `HashMap<String, Long> hashtags =?``new`?`HashMap<String,Long>();` | | `16` | `HashMap<String, Long> users =?``new`?`HashMap<String, Long>();` | | `17` | `HashMap<String, Long> usersHashtags =?``new`?`HashMap<String, Long>();` | | `18` | ? | | `19` | `private`?`void`?`count(HashMap<String, Long> map, String key,?``int`?`count) {` | | `20` | `Long value = map.get(key);` | | `21` | `if``(value ==?``null``){value = (``long``)``0``;}` | | `22` | `value += count;` | | `23` | `map.put(key,value);` | | `24` | `}` | | `25` | ? | | `26` | `@Override` | | `27` | `public`?`void`?`execute(Tuple tuple) {` | | `28` | `String origin = tuple. getSourceComponent();` | | `29` | `if``(``"sers-splitter"``.equals(origin)) {` | | `30` | `String user = tuple.getStringByField(``"user"``);` | | `31` | `count(users, user,?``1``);` | | `32` | `}?``else`?`if``(``"hashtag-splitter"``.equals(origin)) {` | | `33` | `String hashtag = tuple.getStringByField(``"hashtag"``);` | | `34` | `count(hashtags, hashtag,?``1``);` | | `35` | `}?``else`?`if``(``"user-hashtag-merger"``.quals(origin)) {` | | `36` | `String hashtag = tuple.getStringByField(``"hashtag"``);` | | `37` | `String user = tuple.getStringByField(``"user"``);` | | `38` | `String key = user +?``":"`?`+ hashtag;` | | `39` | `Integer count = tuple.getIntegerByField(``"count"``);` | | `40` | `count(usersHashtags, key, count);` | | `41` | `}` | | `42` | `}` | | `43` | ? | | `44` | `@Override` | | `45` | `public`?`void`?`finishBatch() {` | | `46` | `String lastCommitedTransaction = jedis.get(LAST_COMMITED_TRANSACTION_FIELD);` | | `47` | `String currentTransaction =?``""``+id.getTransactionId();` | | `48` | ? | | `49` | `if``(currentTransaction.equals(lastCommitedTransaction)) {``return``;}` | | `50` | ? | | `51` | `Transaction multi = jedis.multi();` | | `52` | ? | | `53` | `multi.set(LAST_COMMITED_TRANSACTION_FIELD, currentTransaction);` | | `54` | ? | | `55` | `Set<String> keys = hashtags.keySet();` | | `56` | `for`?`(String hashtag : keys) {` | | `57` | `Long count = hashtags.get(hashtag);` | | `58` | `multi.hincrBy(``"hashtags"``, hashtag, count);` | | `59` | `}` | | `60` | ? | | `61` | `keys = users.keySet();` | | `62` | `for`?`(String user : keys) {` | | `63` | `Long count =users.get(user);` | | `64` | `multi.hincrBy(``"users"``,user,count);` | | `65` | `}` | | `66` | ? | | `67` | `keys = usersHashtags.keySet();` | | `68` | `for`?`(String key : keys) {` | | `69` | `Long count = usersHashtags.get(key);` | | `70` | `multi.hincrBy(``"users_hashtags"``, key, count);` | | `71` | `}` | | `72` | ? | | `73` | `multi.exec();` | | `74` | `}` | | `75` | ? | | `76` | `@Override` | | `77` | `public`?`void`?`declareOutputFields(OutputFieldsDeclarer declarer) {}` | | `78` | `}` | 這個實現很簡單,但是在**finishBatch**有一個細節。 | `1` | `...` | | `2` | `multi.set(LAST_COMMITED_TRANSACTION_FIELD, currentTransaction);` | | `3` | `...` | 在這里向數據庫保存提交的最后一個事務ID。為什么要這樣做?記住,如果事務失敗了,Storm將會盡可能多的重復必要的次數。如果你不確定已經處理了這個事務,你就會多算,事務拓撲也就沒有用了。所以請記住:保存最后提交的事務ID,并在提交前檢查。 **分區的事務*Spouts ***對一個*spout*來說,從一個分區集合中讀取批次是很普通的。接著這個例子,你可能有很多redis數據庫,而tweets可能會分別保存在這些redis數據庫里。通過實現**IPartitionedTransactionalSpout**,Storm提供了一些工具用來管理每個分區的狀態并保證重播的能力。 下面我們修改**TweetsTransactionalSpout**,使它可以處理數據分區。 首先,繼承**BasePartitionedTransactionalSpout**,它實現了**IPartitionedTransactionalSpout**。 | `1` | `public`?`class`?`TweetsPartitionedTransactionalSpout?``extends` | | `2` | `BasePartitionedTransactionalSpout<TransactionMetadata> {` | | `3` | `...` | | `4` | `}` | 然后告訴Storm誰是你的協調器。 | `01` | `public`?`static`?`class`?`TweetsPartitionedTransactionalCoordinator?``implements`?`Coordinator {` | | `02` | `@Override` | | `03` | `public`?`int`?`numPartitions() {` | | `04` | `return`?`4``;` | | `05` | `}` | | `06` | ? | | `07` | `@Override` | | `08` | `public`?`boolean`?`isReady() {` | | `09` | `return`?`true``;` | | `10` | `}` | | `11` | ? | | `12` | `@Override` | | `13` | `public`?`void`?`close() {}` | | `14` | `}` | 在這個例子里,協調器很簡單。numPartitions方法,告訴Storm一共有多少分區。而且你要注意,不要返回任何元數據。對于**IPartitionedTransactionalSpout**,元數據由分發器直接管理。 下面是分發器的實現: | `01` | `public`?`static`?`class`?`TweetsPartitionedTransactionalEmitter` | | `02` | `implements`?`Emitter<TransactionMetadata> {` | | `03` | `PartitionedRQ rq =?``new`?`ParttionedRQ();` | | `04` | ? | | `05` | `@Override` | | `06` | `public`?`TransactionMetadata emitPartitionBatchNew(TransactionAttempt tx,` | | `07` | `BatchOutputCollector collector,?``int`?`partition,` | | `08` | `TransactionMetadata lastPartitioonMeta) {` | | `09` | `long`?`nextRead;` | | `10` | ? | | `11` | `if``(lastPartitionMeta ==?``null``) {` | | `12` | `nextRead = rq.getNextRead(partition);` | | `13` | `}``else``{` | | `14` | `nextRead = lastPartitionMeta.from + lastPartitionMeta.quantity;` | | `15` | `rq.setNextRead(partition, nextRead);?``//移動游標` | | `16` | `}` | | `17` | ? | | `18` | `long`?`quantity = rq.getAvailableToRead(partition, nextRead);` | | `19` | `quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;` | | `20` | `TransactionMetadata metadata =?``new`?`TransactionMetadata(nextRead, (``int``)quantity);` | | `21` | ? | | `22` | `emitPartitionBatch(tx, collector, partition, metadata);` | | `23` | `return`?`metadata;` | | `24` | `}` | | `25` | ? | | `26` | `@Override` | | `27` | `public`?`void`?`emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector,` | | `28` | `int`?`partition, TransactionMetadata partitionMeta) {` | | `29` | `if``(partitionMeta.quantity <=?``0``){` | | `30` | `return``;` | | `31` | `}` | | `32` | ? | | `33` | `List<String> messages = rq.getMessages(partition, partitionMeta.from,` | | `34` | `partitionMeta.quantity);` | | `35` | ? | | `36` | `long`?`tweetId = partitionMeta.from;` | | `37` | `for`?`(String msg : messages) {` | | `38` | `collector.emit(``new`?`Values(tx,?``""``+tweetId, msg));` | | `39` | `tweetId++;` | | `40` | `}` | | `41` | `}` | | `42` | ? | | `43` | `@Override` | | `44` | `public`?`void`?`close() {}` | | `45` | `}` | 這里有兩個重要的方法,**emitPartitionBatchNew**,和**emitPartitionBatch**。對于**emitPartitionBatchNew**,從Storm接收分區參數,該參數決定應該從哪個分區讀取批次。在這個方法中,決定獲取哪些tweets,生成相應的元數據對象,調用**emitPartitionBatch**,返回元數據對象,并且元數據對象會在方法返回時立即保存到zookeeper。 Storm會為每一個分區發送相同的事務ID,表示一個事務貫穿了所有數據分區。通過**emitPartitionBatch**讀取分區中的tweets,并向拓撲分發批次。如果批次處理失敗了,Storm將會調用**emitPartitionBatch**利用保存下來的元數據重復這個批次。 **NOTE:**?完整的源碼請見:[https://github.com/storm-book/examples-ch08-transactional-topologies](https://github.com/storm-book/examples-ch08-transactional-topologies)(譯者注:原文如此,實際上這個倉庫里什么也沒有) **模糊的事務性拓撲** 到目前為止,你可能已經學會了如何讓擁有相同事務ID的批次在出錯時重播。但是在有些場景下這樣做可能就不太合適了。然后會發生什么呢? 事實證明,你仍然可以實現在語義上精確的事務,不過這需要更多的開發工作,你要記錄由Storm重復的事務之前的狀態。既然能在不同時刻為相同的事務ID得到不同的元組,你就需要把事務重置到之前的狀態,并從那里繼續。 比如說,如果你為收到的所有tweets計數,你已數到5,而最后的事務ID是321,這時你多數了8個。你要維護以下三個值——previousCount=5,currentCount=13,以及lastTransactionId=321。假設事物ID321又發分了一次,而你又得到了4個元組,而不是之前的8個,提交器會探測到這是相同的事務ID,它將會把結果重置到**previousCount**的值5,并在此基礎上加4,然后更新**currentCount**為9。 另外,在之前的一個事務被取消時,每個并行處理的事務都要被取消。這是為了確保你沒有丟失任何數據。 你的*spout*可以實現**IOpaquePartitionedTransactionalSpout**,而且正如你看到的,協調器和分發器也很簡單。 | `01` | `public`?`static`?`class`?`TweetsOpaquePartitionedTransactionalSpoutCoordinator?``implements``IOpaquePartitionedTransactionalSpout.Coordinator {` | | `02` | `@Override` | | `03` | `public`?`boolean`?`isReady() {` | | `04` | `return`?`true``;` | | `05` | `}` | | `06` | `}` | | `07` | ? | | `08` | `public`?`static`?`class`?`TweetsOpaquePartitionedTransactionalSpoutEmitter` | | `09` | `implements`?`IOpaquePartitionedTransactionalSpout.Emitter<TransactionMetadata> {` | | `10` | `PartitionedRQ rq? =?``new`?`PartitionedRQ();` | | `11` | ? | | `12` | `@Override` | | `13` | `public`?`TransactionMetadata emitPartitionBatch(TransactionAttempt tx,` | | `14` | `BatchOutputCollector collector,?``int`?`partion,` | | `15` | `TransactionMetadata lastPartitonMeta) {` | | `16` | `long`?`nextRead;` | | `17` | ? | | `18` | `if``(lastPartitionMeta ==?``null``) {` | | `19` | `nextRead = rq.getNextRead(partition);` | | `20` | `}``else``{` | | `21` | `nextRead = lastPartitionMeta.from + lastPartitionMeta.quantity;` | | `22` | `rq.setNextRead(partition, nextRead);``//移動游標` | | `23` | `}` | | `24` | ? | | `25` | `long`?`quantity = rq.getAvailabletoRead(partition, nextRead);` | | `26` | `quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;` | | `27` | `TransactionMetadata metadata =?``new`?`TransactionMetadata(nextRead, (``int``)quantity);` | | `28` | `emitMessages(tx, collector, partition, metadata);` | | `29` | `return`?`metadata;` | | `30` | `}` | | `31` | ? | | `32` | `private`?`void`?`emitMessage(TransactionAttempt tx, BatchOutputCollector collector,` | | `33` | `int`?`partition, TransactionMetadata partitionMeta) {` | | `34` | `if``(partitionMeta.quantity <=?``0``){``return``;}` | | `35` | ? | | `36` | `List<String> messages = rq.getMessages(partition, partitionMeta.from, partitionMeta.quantity);` | | `37` | `long`?`tweetId = partitionMeta.from;` | | `38` | `for``(String msg : messages) {` | | `39` | `collector.emit(``new`?`Values(tx,?``""``+tweetId, msg));` | | `40` | `tweetId++;` | | `41` | `}` | | `42` | `}` | | `43` | ? | | `44` | `@Override` | | `45` | `public`?`int`?`numPartitions() {` | | `46` | `return`?`4``;` | | `47` | `}` | | `48` | ? | | `49` | `@Override` | | `50` | `public`?`void`?`close() {}` | | `51` | `}` | 最有趣的方法是**emitPartitionBatch**,它獲取之前提交的元數據。你要用它生成批次。這個批次不需要與之前的那個一致,你可能根本無法創建完全一樣的批次。剩余的工作由提交器*bolts*借助之前的狀態完成。 **原創文章,轉載請注明:**?轉載自[并發編程網 – ifeve.com](http://ifeve.com/) **本文鏈接地址:**?[Storm入門之第8章事務性拓撲](http://ifeve.com/getting-started-of-storm8/)**PartitionBatch**,它獲取之前提交的元數據。你要用它生成批次。這個批次不需要與之前的那個一致,你可能根本無法創建完全一樣的批次。剩余的工作由提交器*bolts*借助之前的狀態完成。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看