# Kafka系統工具
### 前言
Kafka為我們提供了很多有用的系統工具,這些工具都放在kafka.tools包中,具體的類如下圖:

### 工具介紹
下面簡要的介紹一下一些比較常用的工具:
**1. Consumer Offset Checker**
用來展示消費者組、話題、分區、指針偏移量(offset)等值。
可選描述:
~~~
--broker-info 打印broker的信息
--group 消費者組
--help 打印幫助信息
--retry.backoff.ms <Integer> 錯誤查詢重新嘗試間隔(默認為3000)
--socket.timeout.ms <Integer> 連接超時時間 (默認為6000)
--topic 消費者的某一個話題,缺失時默認包含所有的話題
--zookeeper zookeeper的地址(默認為localhost:2181)
~~~
**2. Dump Log Segments**
從日志文件中打印消息或者驗證日志下標是否正確。
可選描述:
~~~
--deep-iteration 使用深迭代而不是淺迭代
--files <file1, file2, ...> 輸入的文件
--key-decoder-class 用自己定義的反序列化方式反序列化關鍵字
--max-message-size <Integer: size> 消息最大的字節數(默認為5242880)
--print-data-log 同時打印出日志消息
--value-decoder-class 用自己定義的序列化方式來序列化關鍵字
--verify-index-only 只是認證下標
~~~
**3. Export Zookeeper Offsets**
把不同Kafka節點分區中的指針偏移量輸出到如下格式的文件中:
> /consumers/group1/offsets/topic1/1-0:286894308
/consumers/group1/offsets/topic1/2-0:284803985
可選描述:
~~~
--group 消費者組
--help 打印幫助信息
--output-file 導出的文件名
--zkconnect zookeeper的地址(默認為localhost:2181)
~~~
**4. Get Offset Shell**
獲得某一個消息的指針偏移量。
可選描述:
~~~
--broker-list <hostname:port,..., 每個broker的主機名和端口號
hostname:port>
--max-wait-ms <Integer: ms> 每次抓取最長的等待時間 (默認為1000)
--offsets <Integer: count> 返回的偏移量的數目(默認為1)
--partitions <partition ids> 需要在哪些分區中查詢,默認為所有的分區
--time <Long: timestamp/-1(latest)/-2 設定時間區間
(earliest)>
--topic <topic> 設定特定的話題
~~~
**5. Import Zookeeper Offsets**
把導出的指針偏移量文件再倒入并放到對應的分區中
必須的參數:
group
可選描述:
~~~
--help 打印幫助信息
--input-file 需要導入的文件
--zkconnect zookeeper的地址(默認為localhost:2181)
~~~
**6. JMX Tool**
通過JMX管理來打印metrics
可選描述:
~~~
--attributes <name> 需要查詢的屬性的白名單
--date-format <format> 用來格式化時間信息
--help 打印幫助信息
--jmx-url <service-url> 獲取JMX信息的URL
--object-name <name> 查詢特定的JMX對象信息,可以設置多個值,
如果不設置會查詢所有的JMX對象
--reporting-interval <Integer: ms> 多久獲取一次JMX信息 (默認為2000)
~~~
**7. Kafka Migration**
可以將Kafka從0.7版本遷移到0.8版本
可選描述:
~~~
--blacklist <Java regex (String)> 不需要復制的話題的黑名單
--consumer.config <config file> 消費者配置文件
--help 打印幫助信息
--kafka.07.jar <kafka 0.7 jar> kafka 0.7版本的壓縮包
--num.producers <Integer: Number of 生產者實例數目(默認為1)
producers>
--num.streams <Integer: Number of 消費者實例數目(默認為1)
consumer threads>
--producer.config <config file> 生產者配置文件
--queue.size <Integer: Queue size in 在版本間遷移時消息的緩沖數目
terms of number of messages>
--whitelist <Java regex (String)> 需要從舊集群復制過去的話題的白名單
--zkclient.01.jar <zkClient 0.1 jar zookeeper 0.1版本壓縮包
file required by Kafka 0.7>
~~~
**8. Mirror Maker**
提供Kafka集群之間的映射關系,實現跨集群的同步。
可選描述:
~~~
--abort.on.send.failure <Stop the 出錯時是否終止操作(默認為true)
entire mirror maker when a send
failure occurs>
--blacklist <Java regex (String)> 不需要同步的話題的黑名單
--consumer.config <config file> 消費者配置文件
--consumer.rebalance.listener <A 消費者復雜均衡監聽器
custom rebalance listener of type
ConsumerRebalanceListener>
--help 打印幫助信息
--message.handler <A custom message 在生產者和消費者之間來處理消息的處理器
handler of type
MirrorMakerMessageHandler>
--message.handler.args <Arguments 消費者負載均衡參數
passed to message handler
constructor.>
--new.consumer 在mirror maker時使用新的消費者
--num.streams <Integer: Number of 指定消費者的線程數(默認為1)
threads>
--offset.commit.interval.ms <Integer: 偏移量提交間隔(默認為60000)
offset commit interval in
millisecond>
--producer.config <config file> 生產者配置文件
--rebalance.listener.args <Arguments 消費者負載均衡參數
passed to custom rebalance listener
constructor as a string.>
--whitelist <Java regex (String)> 需要同步的話題的白名單
~~~
**9. State Change Log Merger**
狀態轉變日志整合工具,可以把來自不同節點不同時間的日志進行整合。
可選描述:
~~~
--end-time <end timestamp in the 需要整合的日志的截止時間,在這之前的
format java.text. 日志都要整合(默認為 9999-12-31 23:59:59,999)
SimpleDateFormat@f17a63e7>
--logs <file1,file2,...> 需要整合的日志
--logs-regex <for example: /tmp/state- 日志名的正則表達式
change.log*>
--partitions <0,1,2,...> 哪些分區的日志需要整合
--start-time <start timestamp in the 需要整合的日志的開始時間,在這之后的
format java.text. 日志都要整合(默認為 0000-00-00 00:00:00,000)
SimpleDateFormat@f17a63e7>
--topic <topic> 哪些話題的日志需要整合
~~~
**10. Verify Consumer Rebalance**
確認消費者是否平衡,確保每個分區只有一個消費者,因為Kafka不支持多個消費者同時對一個分區進行讀寫。
可選描述:
~~~
--group 消費者組
--help 打印幫助信息
--zookeeper.connect zookeeper的地址(默認為localhost:2181)
~~~
### 使用方式
### 腳本命令
在${KAFKA_HOME}/bin目錄下(windows用戶可以在${KAFKA_HOME}/bin/windows中找到對應的bat腳本)為我們封裝好了很多基本的命令腳本,可以直接調用,如:
~~~
drfish@kafka:~/kafka_2.11-0.9.0.0$ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group group-1
~~~
### 類調用
在${KAFKA_HOME}/bin目錄下還有一個特殊的腳本kafka-run-class.sh,它可以調用所需的類來運行類中的方法,具體方法如下:
~~~
drfish@kafka:~/kafka_2.11-0.9.0.0$ bin/kfka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group=group-1
~~~
需要注意的是,當通過調用類來使用工具時,需要使用完整的類名,而且有些類名使用了縮寫,如kafka.tools.ImportZkOffsets。
### Java代碼
同樣我們可以用Java代碼直接通過類來調用相應的工具:
~~~
String[] arg = new String[] { "--zookeeper=10.64.253.238:2181",
"--group=group-1" };
ConsumerOffsetChecker.main(arg);
~~~
### 運行結果
上面三種不同的調用方式,最后都會返回類似如下的結果:
~~~
Group Topic Pid Offset LogSize Lag Owner
group-1 test 0 255229 255229 0 none
~~~
### 總結
本文介紹了一下Kafka提供的系統工具的常用工具,并給出了通過不同方式來運用工具的方法,能夠幫助大家更好地來管理Kafka集群。