<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                正如你已經看到的,*bolts*是一個Storm集群中的關鍵組件。你將在這一章學到*bolt*生命周期,一些*bolt*設計策略,以及幾個有關這些內容的例子。 ## ***Bolt*生命周期** *Bolt*是這樣一種組件,它把元組作為輸入,然后產生新的元組作為輸出。實現一個*bolt*時,通常需要實現**IRichBolt**接口。*Bolts*對象由客戶端機器創建,序列化為拓撲,并提交給集群中的主機。然后集群啟動工人進程反序列化*bolt*,調用**prepare**,最后開始處理元組。 **NOTE:**要創建一個bolt對象,它通過構造器參數初始化成員屬性,*bolt*被提交到集群時,這些屬性值會隨著一起序列化。 ## ***Bolt*結構** *Bolts*擁有如下方法: ~~~ declareOutputFields(OutputFieldsDeclarer declarer) 為bolt聲明輸出模式 prepare(java.util.Map stormConf, TopologyContext context, OutputCollector collector) 僅在bolt開始處理元組之前調用 execute(Tuple input) 處理輸入的單個元組 cleanup() 在bolt即將關閉時調用 ~~~ 下面看一個例子,在這個例子中*bolt*把一句話分割成單詞列表: ~~~ class SplitSentence implements IRichBolt { private OutputCollector collector; publlic void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple tuple) { String sentence = tuple.getString(0); for(String word : sentence.split(" ")) { collector.emit(new Values(word)); } } public void cleanup(){} public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } ~~~ 正如你所看到的,這是一個很簡單的*bolt*。值得一提的是在這個例子里,沒有消息擔保。這就意味著,如果bolt因為某些原因丟棄了一些消息——不論是因為*bolt*掛了,還是因為程序故意丟棄的——生成這條消息的*spout*不會收到任何通知,任何其它的*spouts*和*bolts*也不會收到。 然而在許多情況下,你想確保消息在整個拓撲范圍內都被處理過了。 ## **可靠的*bolts*和不可靠的*bolts*** 正如前面所說的,Storm保證通過*spout*發送的每條消息會得到所有*bolt*的全面處理。基于設計上的考慮,這意味著你要自己決定你的*bolts*是否保證這一點。 拓撲是一個樹型結構,消息(元組)穿過其中一條或多條分支。樹上的每個節點都會調用**ack(tuple)**或**fail(tuple)**,Storm因此知道一條消息是否失敗了,并通知那個/那些制造了這些消息的*spout(s)*。既然一個Storm拓撲運行在高度并行化的環境里,跟蹤始發*spout*實例的最好方法就是在消息元組內包含一個始發spout引用。這一技巧稱做*錨定(譯者注:原文為**Anchoring*)。修改一下剛剛講過的*SplitSentence*,使它能夠確保消息都被處理了。 ~~~ class SplitSentence implenents IRichBolt { private OutputCollector collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple tuple) { String sentence = tuple.getString(0); for(String word : sentence.split(" ")) { collector.emit(tuple, new Values(word)); } collector.ack(tuple); } public void cleanup(){} public void declareOutputFields(OutputFieldsDeclarer declarer){ declar.declare(new Fields("word")); } } ~~~ 錨定發生在調用**collector.emit()**時。正如前面提到的,Storm可以沿著元組追蹤到始發*spout*。**collector.ack(tuple)**和**collector.fail(tuple)**會告知spout每條消息都發生了什么。當樹上的每條消息都已被處理了,Storm就認為來自*spout*的元組被全面的處理了。如果一個元組沒有在設置的超時時間內完成對消息樹的處理,就認為這個元組處理失敗。默認超時時間為30秒。 **NOTE:**你可以通過修改Config.TOPOLOGY_MESSAGE_TIMEOUT修改拓撲的超時時間。 當然了spout需要考慮消息的失敗情況,并相應的重試或丟棄消息。 **NOTE:**你處理的每條消息要么是確認的(譯者注:collector.ack())要么是失敗的(譯者注:collector.fail())。Storm使用內存跟蹤每個元組,所以如果你不調用這兩個方法,該任務最終將耗盡內存。 ## **多數據流** 一個*bolt*可以使用**emit(streamId, tuple)**把元組分發到多個流,其中參數**streamId**是一個用來標識流的字符串。然后,你可以在**TopologyBuilder**決定由哪個流訂閱它。 ## **多錨定** 為了用*bolt*連接或聚合數據流,你需要借助內存緩沖元組。為了在這一場景下確保消息完成,你不得不把流錨定到多個元組上。可以向**emit**方法傳入一個元組列表來達成目的。 ~~~ ... List anchors = new ArrayList(); anchors.add(tuple1); anchors.add(tuple2); collector.emit(anchors, values); ... ~~~ 通過這種方式,bolt在任意時刻調用**ack**或**fail**方法,都會通知消息樹,而且由于流錨定了多個元組,所有相關的*spout*都會收到通知。 ## **使用IBasicBolt自動確認** 你可能已經注意到了,在許多情況下都需要消息確認。簡單起見,Storm提供了另一個用來實現*bolt*的接口,**IBasicBolt**。對于該接口的實現類的對象,會在執行**execute**方法之后自動調用**ack**方法。 ~~~ class SplitSentence extends BaseBasicBolt { public void execute(Tuple tuple, BasicOutputCollector collector) { String sentence = tuple.getString(0); for(String word : sentence.split(" ")) { collector.emit(new Values(word)); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看