<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                [TOC] # 消費模型 消息是由生產者發布到kafka集群后,會被消費者消費.消息的消費模型有兩種:推送模型(push)和拉取模型(pull). 基于推送模型(push)的消息系統,由消息代理記錄消費者的消費狀態.消息代理在將消息推送到消費者后,標記這條消息已經消費,但這種方式無法很好保證消息被處理.比如,消息代理把消息發送出去后,當消費進程掛掉或者由于網絡原因沒有收到這條消息時,就有可能造成消息丟失(因為消息代理已經把這條消息標記為已消費了,但實際上這條消息并沒有被實際處理).如果要保證消息被處理,消息代理發送完消息后,要設置狀態為"已發送".只有收到消費者的確認請求后才更新為"已消費",這就需要消息代理中記錄所有的消費狀態,這種做法顯然是不可取的. kafka采用拉取模型,由消費者自己記錄消費狀態,每個消費者互相獨立的順序讀取每個分區的消息. 如下圖所示,有兩個消費者(不同消費者組)拉取同一個主題的消息,消費者A的消費進度是3,消費者B的消費進度是6.消費者拉取的最大上限通過最高水位(watermark)控制,生產者最新寫入的消息如果還沒有達到備份數量,對消費者是不可見的.這種由消費者控制偏移量的優點是:消費者可以按照任意的順序消費消息. 比如,消費者可以重置到舊的偏移量,重新處理之前已經消費過的消息.或者直接跳到最近的位置,從當前的時刻開始消費. ![](https://box.kancloud.cn/6fb9dd2f5a3d96b307979aeb5ca9b08d_743x236.png) 在一些消息系統中,消息代理會在消息被消費之后立即刪除消息.如果有不同類型的消費者訂閱同有一個主題,消息代理可能需要冗余地存儲同一消息.或者等所有消費者都消費完才刪除,這就需要消息代理跟蹤每個消費者的消費狀態,這種設計很大程度上限制了消息系統的整體吞吐量和處理延遲.kafka的做法是生產者發布的所有消息會一致保存在kafka集群中,不管消息有沒有被消費.用戶可以通過設置保留時間來清理過期的數據.比如,設置保留策略為兩天.那么,在消息發布后,它可以被不同的消費者消費,在兩天之后,過期的消息就會自動清理掉 # 高級API **優點** 高級API寫起來簡單 **不需要自行去管理offset,系統通過zookeeper自行管理** **不需要管理分區,副本等情況.系統自動管理.** 消費者斷線會自動根據上一次記錄在zookeeper中的offset去接著獲取數據(默認設置1分鐘更新一下zookeeper中存的offset) 可以使用group來區分對同一個topic的不同程序范圍內離開來(不同的group記錄不同的offset,這樣不同程序讀取同一個topic才不會因為offset互相影響). **缺點** **不能自行控制offset**(對于某些特殊需求來說) 不能細化控制如分區,副本,zk等 # 低級API **優點** **能夠讓開發者自己控制offset,想從哪里讀取就從哪里讀取.** 自行控制連接分區,對分區自定義進行負載均衡 對zookeeper的依賴性降低(如:offset不一定非要靠zk存儲,自行存儲offset即可,比如存在文件或者內存中) **缺點** **太過復雜**,需要自行控制offset,連接那個分區,找到分區leader等 # 消費者組 ![](https://box.kancloud.cn/ba1a4b29dd8e2c68c8953ac160f4f370_718x270.png) 消費者是以consumer group消費者組的方式工作,由一個或者多個消費者組成一個組,共同消費一個topic.每個分區在同一時間只能由group中的一個消費者讀取,但是多個group可以同時消費這個partition. 在圖中,有一個由三個消費者組成的group,有一個消費者讀取主題中的兩個分區,另外兩個分別讀取一個分區.某個消費者讀取某個分區,也可以叫做某個消費者是某個分區的擁有者 在這種情況下,消費者可以通過水平擴展的方式同時讀取大量的消息.另外,如果一個消費者失敗了,那么其他的group成員會自動負載均衡之前失敗的消費者讀取的分區 # 消費方式 consumer采用pull(拉)模式從broker中讀取數據. push(推)模式很難適應消費速率不同的消費者,因為消費發送速率是由broker決定的. 它的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞.而pull模式可以根據consumer的消費能力以適當的速率消費消息. 對于kafka而言,pull模式更合適,它可以簡化broker的設計,consumer可自主控制消費消息的速率,同時consumer可以自己控制消費方式-即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現不同的傳輸語義. pull模式不足之處是,如果kafka沒有數據,消費者可能會陷入循環中,一直等待數據到達.為了避免這種情況,我們在拉請求中有參數,允許消費者請求在等待數據到達的"長輪詢"中進行阻塞(并且可選的等待到給定的字節數,以確保大的傳輸大小). # 消費組案例 需求:測試同一個消費者組中的消費者,同一時刻只能有一個消費者消費。 1. 在hadoop102、hadoop103上修改/opt/module/kafka/config/consumer.properties配置文件中的group.id屬性為任意組名。 ~~~ $ vi consumer.properties group.id=console ~~~ 2. 在master,slave1,slave2上分別啟動消費者 ~~~ $ bin/kafka-console-consumer.sh --zookeeper master:2181 --topic first --consumer.config config/consumer.properties $ bin/kafka-console-consumer.sh --zookeeper master:2181 --topic first --consumer.config config/consumer.properties ~~~ 3. 在hadoop104上啟動生產者 ~~~ $ bin/kafka-console-producer.sh --broker-list master:9092 --topic first >hello world ~~~ 4. 查看hadoop102和hadoop103的接收者 同一時刻只有一個消費者接收到消息 # Consumer和ConsumerGroup ![](https://box.kancloud.cn/269d0a648b8ac08135252b70c7938c06_1828x734.png) 1. consumerGroup之間消費的數據互不干擾 2. 每個consumer對應一個或多個分片,一個分片只能屬于一個consumer 3. 多于分片數的consumer消費者,是沒有分片的數據可以被消費的 4. 多于consumer只能得到其中一個或多個consumer掛掉的時候才能有機會消費到數據.這個過程叫做rebalnace
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看