# 位移提交
aka. Consumer offset
* Consumer 需要向 Kafka 匯報自己的位移數據,這個匯報過程稱為`提交位移(Committing Offsets)`
* Consumer 需要為分配給它的每個分區提交各自的位移數據
* 位移提交的語義保障是 Consumer 端負責的,Kafka 只會無腦接受
* 從開發者角度,位移提交分為自動提交 & 手動提交
* 從 Consumer 端的角度,位移提交分為同步提交 & 異步提交
## 自動提交
* Kafka Consumer 后臺提交
* 開啟自動提交:`enable.auto.commit=true`
* 配置自動提交間隔:Consumer 端:`auto.commit.interval.ms`,默認 5s
``` Java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
```
* 自動提交位移的順序
* 配置 enable.auto.commit = true
* Kafka 會保證在開始調用 poll 方法時,提交上次 poll 返回的所有消息
* 因此 poll 方法是先提交上一批消息的 offset,再處理下一批消息
* 因此自動提交不會出現消費丟失,但會`重復消費`
* 重復消費舉例
* Consumer 每 5s 提交 offset
* 假設提交 offset 后的 3s 發生了 Rebalance
* Rebalance 之后的所有 Consumer 從上一次提交的 offset 處繼續消費
* 因此 Rebalance 發生前 3s 的消息會被重復消費
* 這是機制缺陷
## 手動提交
* 使用 KafkaConsumer#commitSync():會提交 KafkaConsumer#poll() 返回的最新 offset
* 該方法為同步操作,等待直到 offset 被成功提交才返回
* 示例如下
``` Java
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 處理消息
try {
consumer.commitSync();
} catch (CommitFailedException e) {
handle(e); // 處理提交失敗異常
}
}
```
* 因此 commitSync 在處理完所有消息之后
* 手動提交優勢
* 能夠把控 offset 的時機和頻率
* 手動提交缺陷
* 調用 commitSync 時,Consumer 處于阻塞狀態,直到 Broker 返回結果
* 會影響 TPS
* 可以選擇拉長提交間隔,但有以下問題
* 會導致 Consumer 的提交頻率下降
* Consumer 重啟后,會有`更多`的消息被消費
## 同步提交
* commitSync
## 異步提交
* 鑒于手動同步提交的問題,Kafka 提供另一個 API
* KafkaConsumer#commitAsync()
* 該 API 是異步的,提供 callback
``` Java
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 處理消息
consumer.commitAsync((offsets, exception) -> {
if (exception != null)
handle(exception);
});
}
```
* commitAsync 的問題
* 出現問題不會自動重試
* 因為異步重試導致 offset 會過期,重試是沒有意義的
## 最佳實踐
* 手動提交組合 commitSync & commitAsync
* 原因
* 利用 commitSync 的自動重試規避瞬時錯誤,e.g. 網絡抖動、Broker GC
* 不想總處于阻塞狀態影響 TPS
* 最佳實踐 Code
``` Java
try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
process(records); // 處理消息
commitAysnc(); // 使用異步提交規避阻塞
}
} catch(Exception e) {
handle(e); // 處理異常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}
```
## 更精細管理 offset
* KafkaConsumerAPI 有一組更方便的 API
* i.e. 直接提交最新一條消息的 offset
如何更加細粒度化地提交 offset?
* Background
* poll 返回的是 5k 條消息
* 期望一個大事務分成若干小事務提交
* KafkaConsumerAPI 新方法
* commitSync(Map<TopicPartition, OffsetAndMetadata>)
* commitAsync(Map<TopicPartition, OffsetAndMetadata>)
* TopicPartition i.e. 消費分區
* OffsetAndMetadata i.e. 位移數據
* 該方法能夠實現更細粒度更新 offset,而不受限于 poll 的消息數量
```
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
……
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
process(record); // 處理消息
offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1);
if(count % 100 == 0)
consumer.commitAsync(offsets, null); // 回調處理邏輯是null
count++;
}
}
```
- 概覽
- 入門
- 1. 消息引擎系統
- 2. Kafka 術語
- 3. 分布式流處理平臺
- 4. Kafka “發行版”
- 5. Kafka 版本號
- 基本使用
- 6. 生產集群部署
- 7. 集群參數配置
- 客戶端實踐與原理
- 9. Consumer 分區機制
- 10. Consumer 壓縮算法
- 11. 無消息丟失配置
- 12. 客戶端高級功能
- 13. Producer 管理 TCP
- 14. 冪等生產者和事務生產者
- 15. 消費者組
- 16. 位移主題
- 17. 消費者組重平衡(TODO)
- 18. 位移提交
- 19. CommitFailedException
- 20. 多線程開發者實例
- 21. Consumer 管理 TCP
- 22. 消費者組消費進度監控
- Kafka 內核
- 23. 副本機制
- 24. 請求處理
- 25. Rebalance 全流程
- 26. Kafka 控制器
- 27. 高水位和 Leader Epoch
- 管理與監控
- 28. Topic 管理
- 29. Kafka 動態配置
- 30. 重設消費者組位移
- 31. 工具腳本
- 32. KafkaAdminClient
- 33. 認證機制
- 34. 云下授權
- 35. 跨集群備份 MirrorMaker
- 36. 監控 Kafka
- 37. Kafka 監控框架
- 38. 調優 Kafka
- 39. 實時日志流處理平臺
- 流處理
- 40. Kafka Streams
- 41. Kafka Streams DSL
- 42. Kafka Streams 金融
- Q&A