<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                [TOC] # Storm通信機制 Worker間的通信經常需要通過網絡跨節點進行,Storm使用ZeroMQ或Netty(0.9以后默認使用)作為進程間通信的消息框架。 Worker進程內部通信:不同worker的thread通信使用LMAX Disruptor來完成。 ? 不同topologey之間的通信,Storm不負責,需要自己想辦法實現,例如使用kafka等 ![](https://box.kancloud.cn/00da445922f2896f67ba0c697479e0c8_1632x590.png) ## worker進程間通信 worker進程間消息傳遞機制,消息的接收和處理的大概流程見下圖 ![](https://box.kancloud.cn/6686ae0be5c3974901eeca654ecd4a75_1026x800.png) * 對于worker進程來說,為了管理流入和傳出的消息,每個worker進程有一個獨立的接收線程(一個worker進程運行一個專用的接收線程來負責將外部發送過來的消息移動到對應的executor線程的incoming-queue中)(對配置的TCP端口supervisor.slots.ports進行監聽); 對應Worker接收線程,每個worker存在一個獨立的發送線程(transfer-queue的大小由參數topology.transfer.buffer.size來設置。transfer-queue的每個元素實際上代表一個tuple的集合),它負責從worker的transfer-queue(transfer-queue的大小由參數topology.transfer.buffer.size來設置)中讀取消息,并通過網絡發送給其他worker * 每個executor有自己的incoming-queue(executor的incoming-queue的大小用戶可以自定義配置)和outgoing-queue(executor的outgoing-queue的大小用戶可以自定義配置)。 ![](https://box.kancloud.cn/10a430a4aa1b239f15279e93dbb67b9a_432x14.png)Worker接收線程將收到的消息通過task編號傳遞給對應的executor(一個或多個)的incoming-queues; 每個executor有單獨的線程分別來處理spout/bolt的業務邏輯,業務邏輯輸出的中間數據會存放在outgoing-queue中,當executor的outgoing-queue中的tuple達到一定的閥值,executor的發送線程將批量獲取outgoing-queue中的tuple,并發送到transfer-queue中。 * 每個worker進程控制一個或多個executor線程,用戶可在代碼中進行配置。**其實就是我們在代碼中設置的并發度個數** ## worker進程間通信分析 1. Worker接受線程通過網絡接受數據,并根據Tuple中包含的taskId,匹配到對應的executor;然后根據executor找到對應的incoming-queue,將數據存發送到incoming-queue隊列中。 2. 業務邏輯執行現成消費incoming-queue的數據,通過調用Bolt的execute(xxxx)方法,將Tuple作為參數傳輸給用戶自定義的方法 3. 業務邏輯執行完畢之后,將計算的中間數據發送給outgoing-queue隊列,當outgoing-queue中的tuple達到一定的閥值,executor的發送線程將批量獲取outgoing-queue中的tuple,并發送到Worker的transfer-queue中 4. Worker發送線程消費transfer-queue中數據,計算Tuple的目的地,連接不同的node+port將數據通過網絡傳輸的方式傳送給另一個的Worker。 ![](https://box.kancloud.cn/08d141fd8cc8a561b66557b67b0663e5_949x233.png)另一個worker執行以上步驟1的操作 ## worker進程間技術 ### Netty Netty是一個NIO client-server(客戶端服務器)框架,使用Netty可以快速開發網絡應用,例如服務器和客戶端協議。Netty提供了一種新的方式來使開發網絡應用程序,這種新的方式使得它很容易使用和有很強的擴展性。Netty的內部實現時很復雜的,但是Netty提供了簡單易用的api從網絡處理代碼中解耦業務邏輯。Netty是完全基于NIO實現的,所以整個Netty都是異步的。 書籍:Netty權威指南 ### ZeroMQ ZeroMQ是一種基于消息隊列的多線程網絡庫,其對套接字類型、連接處理、幀、甚至路由的底層細節進行抽象,提供跨越多種傳輸協議的套接字。ZeroMQ是網絡通信中新的一層,介于應用層和傳輸層之間(按照TCP/IP劃分),其是一個可伸縮層,可并行運行,分散在分布式系統間。 ZeroMQ定位為:一個簡單好用的傳輸層,像框架一樣的一個socket library,他使得Socket編程更加簡單、簡潔和性能更高。是一個消息處理隊列庫,可在多個線程、內核和主機盒之間彈性伸縮。ZMQ的明確目標是“成為標準網絡協議棧的一部分,之后進入Linux內核”。 ## worker內部通信技術(Disruptor) ### Disruptor是什么 1. 簡單理解:Disruptor是一個Queue。Disruptor是實現了“隊列”的功能,而且是一個有界隊列。而隊列的應用場景自然就是“生產者-消費者”模型。 2. 在JDK中Queue有很多實現類,包括不限于ArrayBlockingQueue、LinkBlockingQueue,這兩個底層的數據結構分別是數組和鏈表。數組查詢快,鏈表增刪快,能夠適應大多數應用場景。 3. 但是ArrayBlockingQueue、LinkBlockingQueue都是線程安全的。涉及到線程安全,就會有synchronized、lock等關鍵字,這就意味著CPU會打架。 4. Disruptor一種線程之間信息無鎖的交換方式(使用CAS(Compare And Swap/Set)操作) ### Disruptor主要特點 1. 沒有競爭=沒有鎖=非常快。 2. 所有訪問者都記錄自己的序號的實現方式,允許多個生產者與多個消費者共享相同的數據結構。 3. 在每個對象中都能跟蹤序列號(ring buffer,claim Strategy,生產者和消費者),加上神奇的cache line padding,就意味著沒有為偽共享和非預期的競爭 ### Disruptor核心技術點 Disruptor可以看成一個事件監聽或消息機制,在隊列中一邊生產者放入消息,另外一邊消費者并行取出處理. 底層是單個數據結構:一個ring buffer。 每個生產者和消費者都有一個次序計算器,以顯示當前緩沖工作方式。 每個生產者消費者能夠操作自己的次序計數器的能夠讀取對方的計數器,生產者能夠讀取消費者的計算器確保其在沒有鎖的情況下是可寫的。 核心組件 * Ring Buffer 環形的緩沖區,負責對通過 Disruptor 進行交換的數據(事件)進行存儲和更新。 * Sequence 通過順序遞增的序號來編號管理通過其進行交換的數據(事件),對數據(事件)的處理過程總是沿著序號逐個遞增處理。 * RingBuffer底層是個數組,次序計算器是一個64bit long 整數型,平滑增長 ![](https://box.kancloud.cn/a398f9c13a4872c2b3d3a4f28ef9f462_904x690.png) 1. 接受數據并寫入到腳標31的位置,之后會沿著序號一直寫入,但是不會繞過消費者所在的腳標。 2. Joumaler和replicator同時讀到24的位置,他們可以批量讀取數據到30 3. 消費邏輯線程讀到了14的位置,但是沒法繼續讀下去,因為他的sequence暫停在15的位置上,需要等到他的sequence給他序號。如果sequence能正常工作,就能讀取到30的數據
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看