<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                [**可靠的消息 VS 不可靠的消息**](http://ifeve.com/getting-started-with-storm-4/#reliable-versus-unreliable-messages) 在設計拓撲結構時,始終在頭腦中記著的一件重要事情就是消息的可靠性。當有無法處理的消息時,你就要決定該怎么辦,以及作為一個整體的拓撲結構該做些什么。舉個例子,在處理銀行存款時,不要丟失任何事務報文就是很重要的事情。但是如果你要統計分析數以百萬的tweeter消息,即使有一條丟失了,仍然可以認為你的結果是準確的。 對于Storm來說,根據每個拓撲的需要擔保消息的可靠性是開發者的責任。這就涉及到消息可靠性和資源消耗之間的權衡。高可靠性的拓撲必須管理丟失的消息,必然消耗更多資源;可靠性較低的拓撲可能會丟失一些消息,占用的資源也相應更少。不論選擇什么樣的可靠性策略,Storm都提供了不同的工具來實現它。 要在*spout*中管理可靠性,你可以在分發時包含一個元組的消息ID(**collector.emit(new Values(…),tupleId)**)。在一個元組被正確的處理時調用**ack**方法,而在失敗時調用**fail**方法。當一個元組被所有的靶*bolt*和錨*bolt*處理過,即可判定元組處理成功(你將在[第5章](http://ifeve.com/getting-started-with-storm-5)學到更多錨*bolt*知識)。 發生下列情況之一時為元組處理失敗: * 提供數據的*spout*調用**collector.fail(tuple)** * 處理時間超過配置的超時時間 讓我們來看一個例子。想象你正在處理銀行事務,需求如下: * 如果事務失敗了,重新發送消息 * 如果失敗了太多次,終結拓撲運行 創建一個*spout*和一個*bolt*,*spout*隨機發送100個事務ID,有80%的元組不會被*bolt*收到(你可以在[例子ch04-spout](https://github.com/storm-book/examples-ch04-spouts/)查看完整代碼)。實現*spout*時利用**Map**分發事務消息元組,這樣就比較容易實現重發消息。 ~~~ public void nextTuple() { if(!toSend.isEmpty()){ for(Map.Entry<Integer, String> transactionEntry : toSend.entrySet()){ Integer transactionId = transactionEntry.getKey(); String transactionMessage = transactionEntry.getValue(); collector.emit(new Values(transactionMessage),transactionId); } toSend.clear(); } } ~~~ 如果有未發送的消息,得到每條事務消息和它的關聯ID,把它們作為一個元組發送出去,最后清空消息隊列。值得一提的是,調用map的**clear**是安全的,因為**nextTuple**失敗時,只有**ack**方法會修改map,而它們都運行在一個線程內。 維護兩個map用來跟蹤待發送的事務消息和每個事務的失敗次數。**ack**方法只是簡單的把事務從每個列表中刪除。 ~~~ public void ack(Object msgId) { messages.remove(msgId); failCounterMessages.remove(msgId); } ~~~ **fail**方法決定應該重新發送一條消息,還是已經失敗太多次而放棄它。 **NOTE:**如果你使用全部數據流組,而拓撲里的所有*bolt*都失敗了,*spout*的**fail**方法才會被調用。 ~~~ public void fail(Object msgId) { Integer transactionId = (Integer) msgId; //檢查事務失敗次數 Integer failures = transactionFailureCount.get(transactionId) + 1; if(failes >= MAX_FAILS){ //失敗數太高了,終止拓撲 throw new RuntimeException("錯誤, transaction id 【"+ transactionId+"】 已失敗太多次了 【"+failures+"】"); } //失敗次數沒有達到最大數,保存這個數字并重發此消息 transactionFailureCount.put(transactionId, failures); toSend.put(transactionId, messages.get(transactionId)); LOG.info("重發消息【"+msgId+"】"); } ~~~ 首先,檢查事務失敗次數。如果一個事務失敗次數太多,通過拋出**RuntimeException**終止發送此條消息的工人。否則,保存失敗次數,并把消息放入待發送隊列(**toSend**),它就會再次調用**nextTuple**時得以重新發送。 **NOTE:**Storm節點不維護狀態,因此如果你在內存保存信息(就像本例做的那樣),而節點又不幸掛了,你就會丟失所有緩存的消息。 Storm是一個快速失敗的系統。拓撲會在拋出異常時掛掉,然后再由Storm重啟,恢復到拋出異常前的狀態。 ## **獲取數據** 接下來你會了解到一些設計*spout*的技巧,幫助你從多數據源獲取數據。 ### **直接連接** 在一個直接連接的架構中,spout直接與一個消息分發器連接(見圖4-1)。 [![圖4-1直接連接的spout](https://box.kancloud.cn/2015-09-21_55ffedd410f5d.png)](http://ifeve.com/%e7%ac%ac%e5%9b%9b%e7%ab%a0spouts/%e5%9b%be4-1%e7%9b%b4%e6%8e%a5%e8%bf%9e%e6%8e%a5%e7%9a%84spout/) 圖4-1 直接連接的*spout* 這個架構很容易實現,尤其是在消息分發器是已知設備或已知設備組時。已知設備滿足:拓撲從啟動時就已知道該設備,并貫穿拓撲的整個生命周期保持不變。未知設備就是在拓撲運行期添加進來的。已知設備組就是從拓撲啟動時組內所有設備都是已知的。 下面舉個例子說明這一點。創建一個spout使用[Twitter流API](https://dev.twitter.com/docs/streaming-api)讀取twitter數據流。spout把API當作消息分發器直接連接。從數據流中得到符合**track**參數的公共tweets(參考twitter開發頁面)。完整的例子可以在鏈接[https://github.com/storm-book/examples-ch04-spouts/](https://github.com/storm-book/examples-ch04-spouts/)找到。 *spout*從配置對象得到連接參數(**track,user,password**),并連接到API(在這個例子中使用[Apache](http://apache.org/)的[DefaultHttpClient](http://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/org/apache/http/impl/client/DefaultHttpClient.html))。它一次讀一行數據,并把數據從JSON轉化成Java對象,然后發布它。 ~~~ public void nextTuple() { //創建http客戶端 client = new DefaultHttpClient(); client.setCredentialsProvider(credentialProvider); HttpGet get = new HttpGet(STREAMING_API_URL+track); HttpResponse response; try { //執行http訪問 response = client.execute(get); StatusLine status = response.getStatusLine(); if(status.getStatusCode() == 200){ InputStream inputStream = response.getEntity().getContent(); BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); String in; //逐行讀取數據 while((in = reader.readLine())!=null){ try{ //轉化并發布消息 Object json = jsonParser.parse(in); collector.emit(new Values(track,json)); }catch (ParseException e) { LOG.error("Error parsing message from twitter",e); } } } } catch (IOException e) { LOG.error("Error in communication with twitter api ["+get.getURI().toString()+"], sleeping 10s"); try { Thread.sleep(10000); } catch (InterruptedException e1) {} } } ~~~ **NOTE:**在這里你鎖定了**nextTuple**方法,所以你永遠也不會執行**ack**和**fail**方法。在真實的應用中,我們推薦你在一個單獨的線程中執行鎖定,并維持一個內部隊列用來交換數據(你會在下一個例子中學到如何實現這一點:[消息隊列](http://ifeve.com/getting-started-with-storm-4/#enqueued-messages))。 棒極了! 現在你用一個*spout*讀取Twitter數據。一個明智的做法是,采用拓撲并行化,多個*spout*從同一個流讀取數據的不同部分。那么如果你有多個流要讀取,你該怎么做呢?Storm的第二個有趣的特性(譯者注:第一個有趣的特性已經出現過,這句話原文都是一樣的,不過按照中文的行文習慣還是不重復使用措詞了)是,你可以在任意組件內(*spouts/bolts*)訪問**TopologyContext**。利用這一特性,你能夠把流劃分到多個*spouts*讀取。 ~~~ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { //從context對象獲取spout大小 int spoutsSize = context.getComponentTasks(context.getThisComponentId()).size(); //從這個spout得到任務id int myIdx = context.getThisTaskIndex(); String[] tracks = ((String) conf.get("track")).split(","); StringBuffer tracksBuffer = new StringBuffer(); for(int i=0; i< tracks.length;i++){ //Check if this spout must read the track word if( i % spoutsSize == myIdx){ tracksBuffer.append(","); tracksBuffer.append(tracks[i]); } } if(tracksBuffer.length() == 0) { throw new RuntimeException("沒有為spout得到track配置" + " [spouts大小:"+spoutsSize+", tracks:"+tracks.length+"] tracks的數量必須高于spout的數量"); this.track =tracksBuffer.substring(1).toString(); } ... } ~~~ 利用這一技巧,你可以把collector對象均勻的分配給多個數據源,當然也可以應用到其它的情形。比如說,從web服務器收集日志文件見圖4-2 [![圖4-2直連hash](https://box.kancloud.cn/2015-09-21_55ffedd45d1fe.png)](http://ifeve.com/%e7%ac%ac%e5%9b%9b%e7%ab%a0spouts/%e5%9b%be4-2%e7%9b%b4%e8%bf%9ehash-2/) 圖4-2 直連hash 通過上一個例子,你學會了從一個*spout*連接到已知設備。你也可以使用相同的方法連接未知設備,不過這時你需要借助于一個協同系統維護的設備列表。協同系統負責探察列表的變化,并根據變化創建或銷毀連接。比如,從web服務器收集日志文件時,web服務器列表可能隨著時間變化。當添加一臺web服務器時,協同系統探查到變化并為它創建一個新的*spout*。見圖4-3 [![圖4-3直連協同](https://box.kancloud.cn/2015-09-21_55ffedd51b19b.png)](http://ifeve.com/%e7%ac%ac%e5%9b%9b%e7%ab%a0spouts/%e5%9b%be4-3%e7%9b%b4%e8%bf%9e%e5%8d%8f%e5%90%8c/) 圖4-3 直連協同 [**消息隊列**](http://ifeve.com/getting-started-with-storm-4/#enqueued-messages) 第二種方法是,通過一個隊列系統接收來自消息分發器的消息,并把消息轉發給*spout*。更進一步的做法是,把隊列系統作為*spout*和數據源之間的中間件,在許多情況下,你可以利用多隊列系統的重播能力增強隊列可靠性。這意味著你不需要知道有關消息分發器的任何事情,而且添加或移除分發器的操作比直接連接簡單的多。這個架構的問題在于隊列是一個故障點,另外你還要為處理流程引入新的環節。 圖4-4展示了這一架構模型 [![圖4-4使用隊列系統](https://box.kancloud.cn/2015-09-21_55ffedd64c311.png)](http://ifeve.com/%e7%ac%ac%e5%9b%9b%e7%ab%a0spouts/%e5%9b%be4-4%e4%bd%bf%e7%94%a8%e9%98%9f%e5%88%97%e7%b3%bb%e7%bb%9f/) 圖4-4 使用隊列系統 **NOTE:**你可以通過輪詢隊列或哈希隊列(把隊列消息通過哈希發送給*spouts*或創建多個隊列使隊列*spouts*一一對應)在多個*spouts*之間實現并行性。 接下來我們利用[Redis](http://redis.io/)和它的java庫[Jedis](https://github.com/xetorthio/jedis)創建一個隊列系統。在這個例子中,我們創建一個日志處理器從一個未知的來源收集日志,利用**lpush**命令把消息插入隊列,利用**blpop**命令等待消息。如果你有很多處理過程,blpop命令采用了輪詢方式獲取消息。 我們在*spout*的**open**方法創建一個線程,用來獲取消息(使用線程是為了避免鎖定**nextTuple**在主循環的調用): ~~~ new Thread(new Runnable() { @Override public void run() { try{ Jedis client= new Jedis(redisHost, redisPort); List res = client.blpop(Integer.MAX_VALUE, queues); messages.offer(res.get(1)); }catch(Exception e){ LOG.error("從redis讀取隊列出錯",e); try { Thread.sleep(100); }catch(InterruptedException e1){} } } }).start(); ~~~ 這個線程的惟一目的就是,創建redis連接,然后執行**blpop**命令。每當收到了一個消息,它就被添加到一個內部消息隊列,然后會被**nextTuple**消費。對于*spout*來說數據源就是redis隊列,它不知道消息分發者在哪里也不知道消息的數量。 **NOTE:**我們不推薦你在*spout*創建太多線程,因為每個spout都運行在不同的線程。一個更好的替代方案是增加拓撲并行性,也就是通過Storm集群在分布式環境創建更多線程。 在**nextTuple**方法中,要做的惟一的事情就是從內部消息隊列獲取消息并再次分發它們。 ~~~ public void nextTuple(){ while(!messages.isEmpty()){ collector.emit(new Values(messages.poll())); } } ~~~ **NOTE:**你還可以借助redis在*spout*實現消息重發,從而實現可靠的拓撲。(譯者注:這里是相對于開頭的**可靠的消息VS不可靠的消息**講的) **DRPC** DRPCSpout從DRPC服務器接收一個函數調用,并執行它(見[第三章的例子](http://ifeve.com/getting-started-with-storm-3#drpc-topology))。對于最常見的情況,使用[backtype.storm.drpc.DRPCSpout](http://nathanmarz.github.io/storm/doc/backtype/storm/drpc/DRPCSpout.html)就足夠了,不過仍然有可能利用Storm包內的DRPC類創建自己的實現。 **小結** 現在你已經學習了常見的*spout*實現模式,它們的優勢,以及如何確保消息可靠性。不存在適用于所有拓撲的架構模式。如果你知道數據源,并且能夠控制它們,你就可以使用直接連接;然而如果你需要添加未知數據源或從多種數據源接收數據,就最好使用消息隊列。如果你要執行在線過程,你可以使用DRPCSpout或類似的實現。 你已經學習了三種常見連接方式,不過依賴于你的需求仍然有無限的可能。 **原創文章,轉載請注明:**?轉載自[并發編程網 – ifeve.com](http://ifeve.com/) **本文鏈接地址:**?[Storm入門之第四章Spouts](http://ifeve.com/getting-started-with-storm-4/)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看