<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                # 位移主題 * Kafka 中的內部主題(Internal Topic) aka. __consumer_offsets * __consumer_offsets 在 Kafka 源碼中正式名字:位移主題。i.e. Offsets Topic ## 背景 * 老版本 * Consumer 重啟后根據 ZK 中 offset 從上次位置繼續消費 * 但是 ZK 不適合高頻寫操作 * 新版本 * 將 Consumer 的位移數據作為一條條普通的 Kafka 消息,提交到 __consumer_offsets 中 * 因此該 Topic 的主要作用:保存 Kafka Consumer offset ## 消息格式 * 其消息格式是 Kafka 自定義的,即用戶不可修改格式,不可隨意寫 * 有 Consumer API 可以去寫,自定義的 Producer 不能往這個 Topic 去寫 * 消息格式就是 KV pair * 消息格式 Key:<GroupID, Topic, Partition> * 標識這個 offset 數據是哪個 Consumer 的 * 除了 Consumer Group,Kafka 還支持 Standalone Consumer。運行機制不同,但位移管理相同。 * Consumer offset 在 Partition 層面,因此還需要有 Topic、Partition * 消息題 Value:保存 offset(簡單來看) * 另外兩種消息格式 * 用于保存 Consumer Group 信息的消息,i.e. 目的是注冊 Consumer Group 的。 * 用于刪除 Group 過期位移甚至是刪除 Group 的消息 * i.e. tombstone 消息,墓碑消息,aka. delete mark。 * 消息體是 null * 何時 * 一旦某個 Consumer Group 下所有的 Consumer Instance 都停止了 * 它們的位移數據已被刪除 * Kafka 會向位移主題的對應分區寫入 tombstone 消息,表示徹底刪除這個 Group 的信息 ## 位移主題創建 * 當 Kafka 集群中的第一個 Consumer 程序啟動時,Kafka 會自動創建位移主題 * Kafka 自動創建,分區數配置根據 Broker 參數 `offsets.topic.num.partitions`,默認是 50,i.e. Kafka 會創建一個 50 分區的位移主題。 * 位移主題的副本數取決于 Broker 參數 `offsets.topic.replication.factor`,默認是 3。 * 可以選擇手動創建位移主題 * e.g. 使用 Kafka API 創建 Kafka Consumer 提交位移會寫到位移主題,提交方式 * 自動提交位移 * Consumer 端參數 `enable.auto.commit=true` * Consumer 在后臺默默定期提交位移 * 提交間隔控制參數:`auto.commit.innterval.ms` * Spark、Flink 等與 Kafka 集成的大數據框架是禁用自動提交位移的 * 手動提交位移 * Consumer 端參數:`enable.auto.commit=false` * Kafka Consumer API 提供了位移提交的方法,e.g. consumer.commitSync * 自動提交位移的一個問題 * 只要 Consumer 一直啟動,就會無限期地向位移主題寫入消息 * 極端例子: * 假設 Consumer 消費到某個 Topic的最新一條消息,位移是 100 * 之后該 Topic 沒有產生新消息 * Consumer offset 永遠保持在 100 * 由于自動提交 offset,offset topic 會不停寫入 offset=100 的消息 * 這要求 Kafka 必須要有這對 offset topic 的消息刪除策略,否則會占滿存儲 * Kafka 刪除 offset topic 中過期消息的方法 * 使用 Compaction,aka. 整理(參考 JVM GC) * Compact 策略中的過期定義 * 同一個 Key 的兩條消息 M1、M2,如果 M1 的發送時間早于 M2,則 M1 是過期消息 * Compact:掃描日志的所有消息,刪除過期消息,剩下的消息整理。 ![](https://img.kancloud.cn/86/a4/86a44073aa60ac33e0833e6a9bfd9ae7_681x397.jpeg) * Log Cleaner * Kafka 后臺定期巡檢待 Compact 的 Topic 的線程
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看