<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                # 消費者組重平衡全流程解析 * 重平衡流程:讓組內的所有消費者實例就消費哪些 topic partition 達成一致 * 重平衡需要 Broker 端的 Coordinator 組件 * 以下基于 Kafka V2.3 版本 ## 觸發與通知 Rebalance 3 個觸發條件 * 組成員數量變化 * 訂閱 Topic 數量變化 * 訂閱 Topic 的 partition 數量變化 生產環境中常因第一個而 Rebalance。 每次消費者組重啟時,必然會 Rebalance。 Q:Rebalance 是如何通知到其他 Consumer Instance? A:依靠 Consumer 端的心跳線程(Heartbeat Thread) * Consumer 需要定期發送心跳到 Broker 的協調者以表明其存活 * Kafka V0.10.1.0 版本之前 * 心跳是在消費者主線程完成的,即調用的 KafkaConsumer.poll 方法的線程 * 問題 * 消息處理邏輯也在這個線程 * 如果消息處理耗時較長,心跳則無法及時發送到協調者 * 0.10.1.0 開始引入單獨的心跳線程 * Rebalance 的通知機制依靠心跳線程完成 * 當協調者決定 Rebalance 后,會將 `REBALANCE_IN_PROGRESS` 封裝進心跳請求響應中,返回給 Consumer * Consumer 端參數 `heartbeat.interval.ms`:心跳間隔,i.e. 控制重平衡通知頻率 ## 消費者組狀態機 * Rebalance 開始,協調者組件開始控制 Consumer Group 的狀態流轉 * Kafka 設計了`消費者組狀態機`,幫助協調者完成 Rebalance * Kafka 為消費者組定義了 5 個狀態 ![](https://img.kancloud.cn/3c/28/3c281189cfb1d87173bc2d4b8149f38b_529x414.jpeg) ![](https://img.kancloud.cn/f1/6f/f16fbcb798a53c21c3bf1bcd5b72b006_892x343.png) * 一個消費者組最開始是 Empty * 重平衡開啟后,會置于 PreparingRebalance 等待成員加入 * 之后變更到 CompletingRebalance 等待分配方案 * 最后流轉到 Stable 完成 Rebalance * 當有成員變動時,消費者組狀態從 Stable 變為 PreparingRebalance * 此時所有現存成員需要重新申請加入組 * 當所有組成員都退出組后,消費者組狀態為 Empty * 消費者組處于 Empty 狀態,Kafka 會定期自動刪除過期 offset ## 消費者端重平衡流程 * Rebalance 完整流程需要 Consumer & Coordinator 共同完成 * Consumer 端 Rebalance 步驟 * 加入組 * 對應 JoinGroup 請求 * 等待 Leader Consumer 分配方案 * 對應 SyncGroup 請求 * 當組內成員加入組時,Consumer 向協調者發送 JoinGroup 請求 * 每個 Consumer 會上報自己訂閱的 topic * Coordinator 收集到所有 JoinGroup 請求后,從這些成員中選擇一個擔任消費者組的 Leader * 通常第一個發送 JoinGroup 請求的自動成為 Leader * Leader Consumer 的任務是收集所有成員的 topic,根據信息制定具體的 partition consumer 分配方案 * 選出 Leader 后,協調者把所有 topic 信息封裝到 JoinGroup Response 中,發送給 Leader * Leader Consumer 做出統一分配方案,進入到 SyncGroup 請求 * Leader Consumer 向協調者發送 SyncGroup,將分配方案發給協調者 * 其他成員也會發出 SyncGroup 請求 * 協調者以 SyncGroup Response 的方式將方案下發給所有成員 ![](https://img.kancloud.cn/e7/d4/e7d40ce1c34d66ec36bfdaaa3ec9611f_1950x780.png) ![](https://img.kancloud.cn/62/52/6252b051450c32c143f03410f6c2b75d_1950x696.png) * 所有成員成功接收到分配方案,消費者組進入 Stable 狀態,開始正常消費 ## Broker 端重平衡場景 ## 新成員加入 * 消費者組處于 Stable 之后有新成員加入 ![](https://img.kancloud.cn/62/f8/62f85fb0b0f06989dd5a6f133599ca33_1950x1066.png) ### 組成員主動離開 * 主動離開:Consumer Instance 通過調用 close() 方法通知協調者退出 * 該場景涉及第三個請求:LeaveGroup 請求 ![](https://img.kancloud.cn/86/72/867245cbf6cfd26573aba1816516b26b_1950x1118.png) ### 組成員崩潰離開 * 協調者需要等待一段時間才能感知 * 這個時間段由 Consumer 端參數 `sessionn.timeout.ms` 控制 * Kafka 不會超過上述參數時間感知崩潰 * 處理流程相同 ![](https://img.kancloud.cn/bc/00/bc00d35060e1a4216e177e5b361ad40c_1950x1147.png) ### Rebalance 時組成員提交 offset * Rebalance 開啟時,協調者會給予成員一段緩沖時間,要求每個成員在這段時間內快速上報自己的 offset * 再開啟正常的 JoinGroup/SyncGroup 請求 ![](https://img.kancloud.cn/83/b7/83b77094d4170b9057cedfed9cdb33be_1950x1144.png)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看