# **生產者配置參數解釋:**
## **bootstrap.servers**?:kafka集群broker的地址
## **key.serializer**:關鍵字的序列化方式
## **value.serializer**:消息值的序列化方式
## **acks**:指定必須要有多少個分區的副本接收到該消息,服務端才會向生產者發送響應,可選值為:0,1,2,…,all,如果設置為0,producter就只管發出不管kafka server有沒有確認收到。設置all則表示kafka所有的分區副本全部確認接收到才返回。
## **buffer.memory**:生產者的內存緩沖區大小。如果生產者發送消息的速度 > 消息發送到kafka的速度,那么消息就會在緩沖區堆積,導致緩沖區不足。這個時候,send()方法要么阻塞,要么拋出異常。
## **max.block.ms**:表示send()方法在拋出異常之前可以阻塞多久的時間,默認是60s
## **compression.type**:消息在發往kafka之前可以進行壓縮處理,以此來降低存儲開銷和網絡帶寬。默認值是null,可選的壓縮算法有snappy、gzip和lz4
## **retries**:生產者向kafka發送消息可能會發生錯誤,有的是臨時性的錯誤,比如網絡突然阻塞了一會兒,有的不是臨時的錯誤,比如“消息太大了”,對于出現的臨時錯誤,可以通過重試機制來重新發送
## **retry.backoff.ms**:每次重試之間間隔的時間,第一次失敗了,那么休息一會再重試,休息多久,可以通過這個參數來調節
## **batch.size**:生產者在發送消息時,可以將即將發往同一個分區的消息放在一個批次里,然后將這個批次整體進行發送,這樣可以節約網絡帶寬,提升性能。該參數就是用來規約一個批次的大小的。但是生產者并不是說要等到一個批次裝滿之后,才會發送,不是這樣的,有時候半滿,甚至只有一個消息的時候,也可能會發送,具體怎么選擇,我們不知道,但是不是說非要等裝滿才發。因此,如果把該參數調的比較大的話,是不會造成消息發送延遲的,但是會占用比較大的內存。但是如果設置的太小,會造成消息發送次數增加,會有額外的IO開銷
## **linger.ms**:生產者在發送一個批次之前,可以適當的等一小會,這樣可以讓更多的消息加入到該批次。這樣會造成延時增加,但是降低了IO開銷,增加了吞吐量
## **client.id**:服務器用來標志消息的來源,是一個任意的字符串
## **max.in.flight.requests.per.connection**:一個消息發送給kafka集群,在收到服務端的響應之前的這段時間里,生產者還可以發n-1個消息。這個參數配置retries,可以保證消息的順序,后面會介紹
## **request.timeout.ms**:生產者在發送消息之后,到收到服務端響應時,等待的時間限制
## **max.request.size**:生產者發送消息的大小。可以是一個消息的大小,也可以發送的一個批次的消息大小
## **receive.buffer.bytes和send.buffer.bytes**:tcp socket接收和發送消息的緩沖區大小,其實指的就是ByteBuffer的大小
# **消費者配置參數解釋:**
## **groupid**:一個字符串用來指示一組consumer所在的組群。實現同一個topic可由不同的組群消費
## **auto.offset.reset:可選三個參數**
earliest ---當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
latest---當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
none---topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常
## **socket.timeout.ms**:默認值:3000,socket超時時間。
## **socket.buffersize**:?默認值:64\*1024,socket receive buffer。
## **fetch.size**?:默認值:300 \* 1024,控制在一個請求中獲取的消息的字節數。 這個參數在0.8.x中由fetch.message.max.bytes,fetch.min.bytes取代。
## **backoff.increment.ms**:默認值:1000,這個參數避免在沒有新數據的情況下重復頻繁的拉數據。 如果拉到空數據,則多推后這個時間。
## **queued.max.message.chunks**:默認值:2,consumer內部緩存拉回來的消息到一個隊列中。 這個值控制這個隊列的大小。
## **auto.commit.enable**:默認值:true,如果true,consumer定期地往zookeeper寫入每個分區的offset。
## **auto.commit.interval.ms**:默認值:10000,往zookeeper上寫offset的頻率。
## **auto.offset.reset**:默認值:largest,如果offset出了返回,則 smallest: 自動設置reset到最小的offset. largest : 自動設置offset到最大的offset. 其它值不允許,會拋出異常。
## **consumer.timeout.ms**:默認值:\-1,默認-1,consumer在沒有新消息時無限期的block。如果設置一個正值, 一個超時異常會拋出。
## **rebalance.retries.max**:默認值:4,rebalance時的最大嘗試次數。
max.poll.interval.ms:拉取的最大時間間隔,如果你一次拉取的比較多,建議加大這個值,長時間沒有調用poll,且間隔超過這個值時,就會認為這個consumer失敗了
## **max.poll.records**:默認值:500,Consumer每次調用poll()時取到的records的最大數。
# **kafka常用命令:**
## 1、啟動kafka服務
bin/kafka-server-start.sh?config/server.properties &
## 2、停止kafka服務
./kafka-server-stop.sh
## 3、查看所有的話題
./kafka-topics.sh?--list --zookeeper localhost:2181
或:
./kafka-topics.sh--list \--bootstrap-server?localhost:9092
## 4、查看所有話題的詳細信息
./kafka-topics.sh?--zookeeper localhost:2181?--describe
## 5、列出指定話題的詳細信息
./kafka-topics.sh?--zookeeper localhost:2181--describe ?--topic test-topic
## 6、刪除一個話題,開啟刪除權限[delete.topic.enable](#brokerconfigs_delete.topic.enable)\=true,且最好關閉自動創建auto.create.topics.enable=false。并保證沒有生產者與消費者此時作用于該主題,手動清除日志或設置檢查log.retention.check.interval.ms,默認5min
./kafka-topics.sh?--zookeeper localhost:2181?--delete ?--topic test\-topic
## 7、創建一個叫test的話題,有兩個分區,每個分區3個副本
./kafka-topics.sh?--zookeeper localhost:2181--create --topic test --replication-factor 3--partitions 2
?## 8、測試kafka發送和接收消息(啟動兩個終端)
**發送消息(注意端口號為配置文件里面的端口號)**
./kafka-console-producer.sh?--broker-list localhost:9092?--topic test
**消費消息(可能端口號與配置文件保持一致,或與發送端口保持一致)**
./kafka-console-consumer.sh?--bootstrap-server localhost:9092?--topic test --from-beginning ??#加了\--from-beginning 重頭消費所有的消息
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test ????????#不加\--from-beginning 從最新的一條消息開始消費
**老版本消費者命令需要指定 \--zookeeper localhost:2181 具體在0.9等之前版本, 0.10版本過度的可以指定zookeeper也可以標明是 \--new-consumer:**
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test?\--new-consumer
## 9、查看某個topic對應的消息數量
./kafka-run-class.sh??kafka.tools.GetOffsetShell --broker-list localhost:9092?--topic test --time?-1
## 10、顯示所有消費者
./kafka-consumer-groups.sh?--bootstrap-server localhost:9092?--list
## 11、獲取正在消費的topic(console-consumer-63307)的group的offset
./kafka-consumer-groups.sh?--describe --group console-consumer-63307?--bootstrap-server localhost:9092
## 12、顯示消費者
./kafka-consumer-groups.sh?--bootstrap-server localhost:9092?--list
# **kafka****集群配置****:**
[https://blog.csdn.net/xuesp/article/details/88094326](https://blog.csdn.net/xuesp/article/details/88094326)
**(注意zookeeper端口2181為常用即可)**
[https://kafka.apache.org/documentation/#brokerconfigs](https://kafka.apache.org/documentation/#brokerconfigs)