<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                # 位移提交 aka. Consumer offset * Consumer 需要向 Kafka 匯報自己的位移數據,這個匯報過程稱為`提交位移(Committing Offsets)` * Consumer 需要為分配給它的每個分區提交各自的位移數據 * 位移提交的語義保障是 Consumer 端負責的,Kafka 只會無腦接受 * 從開發者角度,位移提交分為自動提交 & 手動提交 * 從 Consumer 端的角度,位移提交分為同步提交 & 異步提交 ## 自動提交 * Kafka Consumer 后臺提交 * 開啟自動提交:`enable.auto.commit=true` * 配置自動提交間隔:Consumer 端:`auto.commit.interval.ms`,默認 5s ``` Java Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "2000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } ``` * 自動提交位移的順序 * 配置 enable.auto.commit = true * Kafka 會保證在開始調用 poll 方法時,提交上次 poll 返回的所有消息 * 因此 poll 方法是先提交上一批消息的 offset,再處理下一批消息 * 因此自動提交不會出現消費丟失,但會`重復消費` * 重復消費舉例 * Consumer 每 5s 提交 offset * 假設提交 offset 后的 3s 發生了 Rebalance * Rebalance 之后的所有 Consumer 從上一次提交的 offset 處繼續消費 * 因此 Rebalance 發生前 3s 的消息會被重復消費 * 這是機制缺陷 ## 手動提交 * 使用 KafkaConsumer#commitSync():會提交 KafkaConsumer#poll() 返回的最新 offset * 該方法為同步操作,等待直到 offset 被成功提交才返回 * 示例如下 ``` Java while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); process(records); // 處理消息 try { consumer.commitSync(); } catch (CommitFailedException e) { handle(e); // 處理提交失敗異常 } } ``` * 因此 commitSync 在處理完所有消息之后 * 手動提交優勢 * 能夠把控 offset 的時機和頻率 * 手動提交缺陷 * 調用 commitSync 時,Consumer 處于阻塞狀態,直到 Broker 返回結果 * 會影響 TPS * 可以選擇拉長提交間隔,但有以下問題 * 會導致 Consumer 的提交頻率下降 * Consumer 重啟后,會有`更多`的消息被消費 ## 同步提交 * commitSync ## 異步提交 * 鑒于手動同步提交的問題,Kafka 提供另一個 API * KafkaConsumer#commitAsync() * 該 API 是異步的,提供 callback ``` Java while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); process(records); // 處理消息 consumer.commitAsync((offsets, exception) -> { if (exception != null) handle(exception); }); } ``` * commitAsync 的問題 * 出現問題不會自動重試 * 因為異步重試導致 offset 會過期,重試是沒有意義的 ## 最佳實踐 * 手動提交組合 commitSync & commitAsync * 原因 * 利用 commitSync 的自動重試規避瞬時錯誤,e.g. 網絡抖動、Broker GC * 不想總處于阻塞狀態影響 TPS * 最佳實踐 Code ``` Java try { while(true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); process(records); // 處理消息 commitAysnc(); // 使用異步提交規避阻塞 } } catch(Exception e) { handle(e); // 處理異常 } finally { try { consumer.commitSync(); // 最后一次提交使用同步阻塞式提交 } finally { consumer.close(); } } ``` ## 更精細管理 offset * KafkaConsumerAPI 有一組更方便的 API * i.e. 直接提交最新一條消息的 offset 如何更加細粒度化地提交 offset? * Background * poll 返回的是 5k 條消息 * 期望一個大事務分成若干小事務提交 * KafkaConsumerAPI 新方法 * commitSync(Map<TopicPartition, OffsetAndMetadata>) * commitAsync(Map<TopicPartition, OffsetAndMetadata>) * TopicPartition i.e. 消費分區 * OffsetAndMetadata i.e. 位移數據 * 該方法能夠實現更細粒度更新 offset,而不受限于 poll 的消息數量 ``` private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); int count = 0; …… while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> record: records) { process(record); // 處理消息 offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1); if(count % 100 == 0) consumer.commitAsync(offsets, null); // 回調處理邏輯是null count++; } } ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看