# 消費者組消費進度監控
aka. 消費者 Lag(Consumer Lag)
* 滯后成都:消費者當前落后于生產者的程度
* Lag 的單位:消息數
* Kafka 監控 Lag 的層級是 Partition
* 計算 Topic 級別的 Lag:需要自己匯總
* 如果 Consumer 速度無法匹及 Producer,會導致消費數據不在 OS 的 Page Cache,導致失去 Zero-copy 特性
* 最好的 Lag 應趨近于 0
* 因此,需要時刻關注消費進度
* 監控 Lag 的方法
* Kafka 自帶的命令行工具 `kafka-consumer-groups` 腳本
* Kafka Java Consumer API
* Kafka 自帶的 JMX 監控指標
## Kafka shell cmd
* 能夠監控獨立消費者(Standalone Consumer) Lag
* Standalone Consumer 調用 KafkaConsumer.assign() 直接消費指定 Partition
* 查看 Lag
```
$ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker連接信息> --describe --group <group名稱>
```
## Kafka Java Consumer API
```
public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (AdminClient client = AdminClient.create(props)) {
ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
try {
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自動提交位移
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 處理中斷異常
// ...
return Collections.emptyMap();
} catch (ExecutionException e) {
// 處理ExecutionException
// ...
return Collections.emptyMap();
} catch (TimeoutException e) {
throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
}
}
}
```
* 調用 AdminClient.listConsumerGroupOffsets 方法獲取給定消費者組的最新消費消息的位移
* 獲取訂閱分區的最新消息位移
* 執行相應的減法操作,獲取 Lag 值并封裝進一個 Map 對象
## Kafka JMX
* Kafka 提供了 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”的 JMX 指標
* records-lag-max
* records-lead-min
* Lead 值:消費者最新消費消息的 offset 與 Partition 當前第一條消息 offset 的差值
* Lag 越大的話,Lead 就越小,反之同理
* 當 Lead 越來越小,快接近于 0,有可能 Consumer 要丟消息
* 因為 Kafka 的消息有留存時間,默認 1 周
* 如果 Consumer 足夠慢到要消費的數據會被 Kafka 刪除
* 此時會造成丟消息假象
* Kafka 消費者還在分區級別提供了額外的 JMX 指標,用于單獨監控分區級別的 Lag 和 Lead 值。JMX 名稱為:kafka.consumer:type=consumer-fetch-manager-metrics,partition=“{partition}”,topic=“{topic}”,client-id=“{client-id}”。
## 總結
* 生產環境推薦使用 Kafka JMX
- 概覽
- 入門
- 1. 消息引擎系統
- 2. Kafka 術語
- 3. 分布式流處理平臺
- 4. Kafka “發行版”
- 5. Kafka 版本號
- 基本使用
- 6. 生產集群部署
- 7. 集群參數配置
- 客戶端實踐與原理
- 9. Consumer 分區機制
- 10. Consumer 壓縮算法
- 11. 無消息丟失配置
- 12. 客戶端高級功能
- 13. Producer 管理 TCP
- 14. 冪等生產者和事務生產者
- 15. 消費者組
- 16. 位移主題
- 17. 消費者組重平衡(TODO)
- 18. 位移提交
- 19. CommitFailedException
- 20. 多線程開發者實例
- 21. Consumer 管理 TCP
- 22. 消費者組消費進度監控
- Kafka 內核
- 23. 副本機制
- 24. 請求處理
- 25. Rebalance 全流程
- 26. Kafka 控制器
- 27. 高水位和 Leader Epoch
- 管理與監控
- 28. Topic 管理
- 29. Kafka 動態配置
- 30. 重設消費者組位移
- 31. 工具腳本
- 32. KafkaAdminClient
- 33. 認證機制
- 34. 云下授權
- 35. 跨集群備份 MirrorMaker
- 36. 監控 Kafka
- 37. Kafka 監控框架
- 38. 調優 Kafka
- 39. 實時日志流處理平臺
- 流處理
- 40. Kafka Streams
- 41. Kafka Streams DSL
- 42. Kafka Streams 金融
- Q&A