<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                # 消費者組消費進度監控 aka. 消費者 Lag(Consumer Lag) * 滯后成都:消費者當前落后于生產者的程度 * Lag 的單位:消息數 * Kafka 監控 Lag 的層級是 Partition * 計算 Topic 級別的 Lag:需要自己匯總 * 如果 Consumer 速度無法匹及 Producer,會導致消費數據不在 OS 的 Page Cache,導致失去 Zero-copy 特性 * 最好的 Lag 應趨近于 0 * 因此,需要時刻關注消費進度 * 監控 Lag 的方法 * Kafka 自帶的命令行工具 `kafka-consumer-groups` 腳本 * Kafka Java Consumer API * Kafka 自帶的 JMX 監控指標 ## Kafka shell cmd * 能夠監控獨立消費者(Standalone Consumer) Lag * Standalone Consumer 調用 KafkaConsumer.assign() 直接消費指定 Partition * 查看 Lag ``` $ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker連接信息> --describe --group <group名稱> ``` ## Kafka Java Consumer API ``` public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException { Properties props = new Properties(); props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); try (AdminClient client = AdminClient.create(props)) { ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID); try { Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自動提交位移 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) { Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet()); return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset())); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 處理中斷異常 // ... return Collections.emptyMap(); } catch (ExecutionException e) { // 處理ExecutionException // ... return Collections.emptyMap(); } catch (TimeoutException e) { throw new TimeoutException("Timed out when getting lag for consumer group " + groupID); } } } ``` * 調用 AdminClient.listConsumerGroupOffsets 方法獲取給定消費者組的最新消費消息的位移 * 獲取訂閱分區的最新消息位移 * 執行相應的減法操作,獲取 Lag 值并封裝進一個 Map 對象 ## Kafka JMX * Kafka 提供了 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”的 JMX 指標 * records-lag-max * records-lead-min * Lead 值:消費者最新消費消息的 offset 與 Partition 當前第一條消息 offset 的差值 * Lag 越大的話,Lead 就越小,反之同理 * 當 Lead 越來越小,快接近于 0,有可能 Consumer 要丟消息 * 因為 Kafka 的消息有留存時間,默認 1 周 * 如果 Consumer 足夠慢到要消費的數據會被 Kafka 刪除 * 此時會造成丟消息假象 * Kafka 消費者還在分區級別提供了額外的 JMX 指標,用于單獨監控分區級別的 Lag 和 Lead 值。JMX 名稱為:kafka.consumer:type=consumer-fetch-manager-metrics,partition=“{partition}”,topic=“{topic}”,client-id=“{client-id}”。 ## 總結 * 生產環境推薦使用 Kafka JMX
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看