[TOC]
# kafka配置詳解
## 1 server.properties
~~~
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181
############################# Socket Server Settings #############################
#listeners=PLAINTEXT://:9092
# The port the socket server listens on
port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=192.168.1.31
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=192.168.1.31
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
advertised.port=9092
# The number of threads handling network requests
num.network.threads=3
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma seperated list of directories under which to store log files
log.dirs=/root/soft/logs/kafka
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
~~~
## 2 producer.properties
~~~
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.producer.ProducerConfig for more details
############################# Producer Basics #############################
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
metadata.broker.list=localhost:9092
# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=
# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync
# specify the compression codec for all data generated: none, gzip, snappy, lz4.
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
compression.codec=none
# message encoder
serializer.class=kafka.serializer.DefaultEncoder
# allow topic level compression
#compressed.topics=
############################# Async Producer #############################
# maximum time, in milliseconds, for buffering data on the producer queue
#queue.buffering.max.ms=
# the maximum size of the blocking queue for buffering on the producer
#queue.buffering.max.messages=
# Timeout for event enqueue:
# 0: events will be enqueued immediately or dropped if the queue is full
# -ve: enqueue will block indefinitely if the queue is full
# +ve: enqueue will block up to this many milliseconds if the queue is full
#queue.enqueue.timeout.ms=
# the number of messages batched at the producer
#batch.num.messages=
~~~
## 3 kafka broker配置詳解
| Property| Default | Description |
| --- | --- | --- |
| broker.id | | 每個broker都可以用一個唯一的非負整數id進行標識;這個id可以作為broker的“名字”,并且它的存在使得broker無須混淆consumers就可以遷移到不同的host/port上。你可以選擇任意你喜歡的數字作為id,只要id是唯一的即可。|
| log.dirs | /tmp/kafka-logs | kafka存放數據的路徑。這個路徑并不是唯一的,可以是多個,路徑之間只需要使用逗號分隔即可;每當創建新partition時,都會選擇在包含最少partitions的路徑下進行。
| port | 6667 | server接受客戶端連接的端口 |
| zookeeper.connect| null | ZooKeeper連接字符串的格式為:hostname:port,此處hostname和port分別是ZooKeeper集群中某個節點的host和port;為了當某個host宕掉之后你能通過其他ZooKeeper節點進行連接,你可以按照一下方式制定多個hosts:hostname1:port1, hostname2:port2, hostname3:port3. ZooKeeper 允許你增加一個“chroot”路徑,將集群中所有kafka數據存放在特定的路徑下。當多個Kafka集群或者其他應用使用相同ZooKeeper集群時,可以使用這個方式設置數據存放路徑。這種方式的實現可以通過這樣設置連接字符串格式,如下所示: hostname1:port1,hostname2:port2,hostname3:port3/chroot/path 這樣設置就將所有kafka集群數據存放在/chroot/path路徑下。注意,在你啟動broker之前,你必須創建這個路徑,并且consumers必須使用相同的連接格式。|
| message.max.bytes | 1000000| server可以接收的消息最大尺寸。重要的是,consumer和producer有關這個屬性的設置必須同步,否則producer發布的消息對consumer來說太大。 |
| num.network.threads| 3 | server用來處理網絡請求的網絡線程數目;一般你不需要更改這個屬性。|
| num.io.threads | 8| server用來處理請求的I/O線程的數目;這個線程數目至少要等于硬盤的個數。|
| background.threads | 4 | 用于后臺處理的線程數目,例如文件刪除;你不需要更改這個屬性。|
| queued.max.requests | 500| 在網絡線程停止讀取新請求之前,可以排隊等待I/O線程處理的最大請求個數。 |
| host.name | null | broker的hostname;如果hostname已經設置的話,broker將只會綁定到這個地址上;如果沒有設置,它將綁定到所有接口,并發布一份到ZK |
| advertised.host.name| null | 如果設置,則就作為broker 的hostname發往producer、consumers以及其他brokers |
| advertised.port| null | 此端口將給與producers、consumers、以及其他brokers,它會在建立連接時用到; 它僅在實際端口和server需要綁定的端口不一樣時才需要設置。 |
| socket.send.buffer.bytes | 100 * 1024 | SO_SNDBUFF 緩存大小,server進行socket 連接所用 |
| socket.receive.buffer.bytes | 100 * 1024| SO_RCVBUFF緩存大小,server進行socket連接時所用|
| socket.request.max.bytes| 100 * 1024 * 1024 | server允許的最大請求尺寸; 這將避免server溢出,它應該小于Java heap size |
| num.partitions | 1 | 如果創建topic時沒有給出劃分partitions個數,這個數字將是topic下partitions數目的默認數值。 |
| log.segment.bytes | 1014*1024*1024 | topic partition的日志存放在某個目錄下諸多文件中,這些文件將partition的日志切分成一段一段的;這個屬性就是每個文件的最大尺寸;當尺寸達到這個數值時,就會創建新文件。此設置可以由每個topic基礎設置時進行覆蓋。查看 the per-topic ?configuration section|
| log.roll.hours| 24 * 7| 即使文件沒有到達log.segment.bytes,只要文件創建時間到達此屬性,就會創建新文件。這個設置也可以有topic層面的設置進行覆蓋;查看the per-topic ?configuration section|
| log.cleanup.policy| delete | |
| log.retention.minutes和log.retention.hours| 7 days | 每個日志文件刪除之前保存的時間。默認數據保存時間對所有topic都一樣。 log.retention.minutes 和 log.retention.bytes 都是用來設置刪除日志文件的,無論哪個屬性已經溢出。 這個屬性設置可以在topic基本設置時進行覆蓋。查看the per-topic ?configuration section|
| log.retention.bytes | -1 | 每個topic下每個partition保存數據的總量;注意,這是每個partitions的上限,因此這個數值乘以partitions的個數就是每個topic保存的數據總量。同時注意:如果log.retention.hours和log.retention.bytes都設置了,則超過了任何一個限制都會造成刪除一個段文件注意,這項設置可以由每個topic設置時進行覆蓋。查看the per-topic ?configuration section|
| log.retention.check.interval.ms| 5 minutes | 檢查日志分段文件的間隔時間,以確定是否文件屬性是否到達刪除要求。 |
| log.cleaner.enable | false | 當這個屬性設置為false時,一旦日志的保存時間或者大小達到上限時,就會被刪除;如果設置為true,則當保存屬性達到上限時,就會進行log compaction。|
| log.cleaner.threads | 1 | 進行日志壓縮的線程數 |
| log.cleaner.io.max.bytes.per.second | None | 進行log compaction時,log cleaner可以擁有的最大I/O數目。這項設置限制了cleaner,以避免干擾活動的請求服務。|
| log.cleaner.io.buffer.size | 500*1024*1024| log cleaner清除過程中針對日志進行索引化以及精簡化所用到的緩存大小。最好設置大點,以提供充足的內存。|
| log.cleaner.io.buffer.load.factor | 512*1024 | 進行log cleaning時所需要的I/O chunk尺寸。你不需要更改這項設置。 |
| log.cleaner.io.buffer.load.factor| 0.9| log cleaning中所使用的hash表的負載因子;你不需要更改這個選項。 |
| log.cleaner.backoff.ms | 15000 | 進行日志是否清理檢查的時間間隔 |
| log.cleaner.min.cleanable.ratio | 0.5| 這項配置控制log compactor試圖清理日志的頻率(假定log compaction是打開的)。默認避免清理壓縮超過50%的日志。這個比率綁定了備份日志所消耗的最大空間(50%的日志備份時壓縮率為50%)。更高的比率則意味著浪費消耗更少,也就可以更有效的清理更多的空間。這項設置在每個topic設置中可以覆蓋。查看the per-topic ?configuration section。 |
| log.cleaner.delete.retention.ms | 1day | 保存時間;保存壓縮日志的最長時間;也是客戶端消費消息的最長時間,榮log.retention.minutes的區別在于一個控制未壓縮數據,一個控制壓縮后的數據;會被topic創建時的指定時間覆蓋。 |
| log.index.size.max.bytes | 10*1024*1024 | 每個log segment的最大尺寸。注意,如果log尺寸達到這個數值,即使尺寸沒有超過log.segment.bytes限制,也需要產生新的log segment。|
| log.index.interval.bytes | 4096 | 當執行一次fetch后,需要一定的空間掃描最近的offset,設置的越大越好,一般使用默認值就可以 |
| log.flush.interval.messages | Long.MaxValue | log文件“sync”到磁盤之前累積的消息條數。因為磁盤IO操作是一個慢操作,但又是一個“數據可靠性”的必要手段,所以檢查是否需要固化到硬盤的時間間隔。需要在“數據可靠性”與“性能”之間做必要的權衡,如果此值過大,將會導致每次“發sync”的時間過長(IO阻塞),如果此值過小,將會導致“fsync”的時間較長(IO阻塞),如果此值過小,將會導致”發sync“的次數較多,這也就意味著整體的client請求有一定的延遲,物理server故障,將會導致沒有fsync的消息丟失。|
| log.flush.scheduler.interval.ms| Long.MaxValue| 檢查是否需要fsync的時間間隔|
| log.flush.interval.ms | Long.MaxValue | 僅僅通過interval來控制消息的磁盤寫入時機,是不足的,這個數用來控制”fsync“的時間間隔,如果消息量始終沒有達到固化到磁盤的消息數,但是離上次磁盤同步的時間間隔達到閾值,也將觸發磁盤同步。|
| log.delete.delay.ms | 60000 | 文件在索引中清除后的保留時間,一般不需要修改 |
| auto.create.topics.enable | true | 是否允許自動創建topic。如果是真的,則produce或者fetch 不存在的topic時,會自動創建這個topic。否則需要使用命令行創建topic |
| controller.socket.timeout.ms | 30000 | partition管理控制器進行備份時,socket的超時時間。|
| controller.message.queue.size | Int.MaxValue | controller-to-broker-channles的buffer 尺寸 |
| default.replication.factor | 1 | 默認備份份數,僅指自動創建的topics |
| replica.lag.time.max.ms | 10000 | 如果一個follower在這個時間內沒有發送fetch請求,leader將從ISR重移除這個follower,并認為這個follower已經掛了 |
| replica.lag.max.messages | 4000| 如果一個replica沒有備份的條數超過這個數值,則leader將移除這個follower,并認為這個follower已經掛了 |
| replica.socket.timeout.ms| 30*1000| leader 備份數據時的socket網絡請求的超時時間|
| replica.socket.receive.buffer.bytes| 64*1024| 備份時向leader發送網絡請求時的socket receive buffer |
| replica.fetch.max.bytes | 1024*1024 | 備份時每次fetch的最大值 |
| replica.fetch.min.bytes | 500| leader發出備份請求時,數據到達leader的最長等待時間 |
| replica.fetch.min.bytes | 1| 備份時每次fetch之后回應的最小尺寸 |
| num.replica.fetchers | 1 | 從leader備份數據的線程數 |
| replica.high.watermark.checkpoint.interval.ms | 5000 | 每個replica檢查是否將最高水位進行固化的頻率 |
| fetch.purgatory.purge.interval.requests| 1000 | fetch 請求清除時的清除間隔 |
| producer.purgatory.purge.interval.requests | 1000| producer請求清除時的清除間隔 |
| zookeeper.session.timeout.ms | 6000| zookeeper會話超時時間。 |
| zookeeper.connection.timeout.ms | 6000 | 客戶端等待和zookeeper建立連接的最大時間 |
| zookeeper.sync.time.ms | 2000| zk follower落后于zk leader的最長時間 |
| controlled.shutdown.enable | true| 是否能夠控制broker的關閉。如果能夠,broker將可以移動所有leaders到其他的broker上,在關閉之前。這減少了不可用性在關機過程中。|
| controlled.shutdown.max.retries | 3 | 在執行不徹底的關機之前,可以成功執行關機的命令數。
| controlled.shutdown.retry.backoff.ms | 5000 | 在關機之間的backoff時間 |
| auto.leader.rebalance.enable | true | 如果這是true,控制者將會自動平衡brokers對于partitions的leadership |
| leader.imbalance.per.broker.percentage | 10 | 每個broker所允許的leader最大不平衡比率 |
| leader.imbalance.check.interval.seconds | 300| 檢查leader不平衡的頻率 |
| offset.metadata.max.bytes | 4096 | 允許客戶端保存他們offsets的最大個數 |
| max.connections.per.ip| Int.MaxValue | | 每個ip地址上每個broker可以被連接的最大數目 |
| max.connections.per.ip.overrides | | 每個ip或者hostname默認的連接的最大覆蓋 |
| connections.max.idle.ms | 600000| 空連接的超時限制 |
| log.roll.jitter.{ms,hours} |0 |從logRollTimeMillis抽離的jitter最大數目 |
| num.recovery.threads.per.data.dir |1 |每個數據目錄用來日志恢復的線程數目 |
| unclean.leader.election.enable |true | 指明了是否能夠使不在ISR中replicas設置用來作為leader |
| delete.topic.enable |false |能夠刪除topic |
| offsets.topic.num.partitions | 50 |The number of partitions for the offset commit topic. Since changing this after deployment is currently unsupported, we recommend using a higher setting for production (e.g., 100-200). |
| offsets.topic.retention.minutes |1440 | 存在時間超過這個時間限制的offsets都將被標記為待刪除
| offsets.retention.check.interval.ms |600000 |offset管理器檢查陳舊offsets的頻率
| offsets.topic.replication.factor |3 |topic的offset的備份份數。建議設置更高的數字保證更高的可用性 |
| offset.topic.segment.bytes |104857600 | offsets topic的segment尺寸。 |
| offsets.load.buffer.size |5242880 |這項設置與批量尺寸相關,當從offsets segment中讀取時使用。 |
| offsets.commit.required.acks |-1 | 在offset commit可以接受之前,需要設置確認的數目,一般不需要更改 |
## 4 kafka producer配置詳解
| Name| Type| Default | Importance | Description|
| --- | --- | --- |--- |--- |
| boostrap.servers | list | | high | 用于建立與kafka集群連接的host/port組。數據將會在所有servers上均衡加載,不管哪些server是指定用于bootstrapping。這個列表僅僅影響初始化的hosts(用于發現全部的servers)。這個列表格式:host1:port1,host2:port2,...因為這些server僅僅是用于初始化的連接,以發現集群所有成員關系(可能會動態的變化),這個列表不需要包含所有的servers(你可能想要不止一個server,盡管這樣,可能某個server宕機了)。如果沒有server在這個列表出現,則發送數據會一直失敗,直到列表可用。|
| acks | string | 1 |high | producer需要server接收到數據之后發出的確認接收的信號,此項配置就是指procuder需要多少個這樣的確認信號。此配置實際上代表了數據備份的可用性。以下設置為常用選項(1)acks=0: 設置為0表示producer不需要等待任何確認收到的信息。副本將立即加到socket buffer并認為已經發送。沒有任何保障可以保證此種情況下server已經成功接收數據,同時重試配置不會發生作用(因為客戶端不知道是否失敗)回饋的offset會總是設置為-1;(2)acks=1: 這意味著至少要等待leader已經成功將數據寫入本地log,但是并沒有等待所有follower是否成功寫入。這種情況下,如果follower沒有成功備份數據,而此時leader又掛掉,則消息會丟失。(3)acks=all: 這意味著leader需要等待所有備份都成功寫入日志,這種策略會保證只要有一個備份存活就不會丟失數據。這是最強的保(4)其他的設置,例如acks=2也是可以的,這將需要給定的acks數量,但是這種策略一般很少用 |
|buffer.memory | long |33554432 | high |producer可以用來緩存數據的內存大小。如果數據產生速度大于向broker發送的速度,producer會阻塞或者拋出異常,以“block.on.buffer.full”來表明。 這項設置將和producer能夠使用的總內存相關,但并不是一個硬性的限制,因為不是producer使用的所有內存都是用于緩存。一些額外的內存會用于壓縮(如果引入壓縮機制),同樣還有一些用于維護請求。 | |compression.type |string | none | high |producer用于壓縮數據的壓縮類型。默認是無壓縮。正確的選項值是none、gzip、snappy。 壓縮最好用于批量處理,批量處理消息越多,壓縮性能越好。 |
|retries |int | 0 |high |設置大于0的值將使客戶端重新發送任何數據,一旦這些數據發送失敗。注意,這些重試與客戶端接收到發送錯誤時的重試沒有什么不同。允許重試將潛在的改變數據的順序,如果這兩個消息記錄都是發送到同一個partition,則第一個消息失敗第二個發送成功,則第二條消息會比第一條消息出現要早。 |
|batch.size | int |16384 | medium | producer將試圖批處理消息記錄,以減少請求次數。這將改善client與server之間的性能。這項配置控制默認的批量處理消息字節數。不會試圖處理大于這個字節數的消息字節數。發送到brokers的請求將包含多個批量處理,其中會包含對每個partition的一個請求。較小的批量處理數值比較少用,并且可能降低吞吐量(0則會僅用批量處理)。較大的批量處理數值將會浪費更多內存空間,這樣就需要分配特定批量處理數值的內存大小。 |
|client.id | string | | medium |當向server發出請求時,這個字符串會發送給server。目的是能夠追蹤請求源頭,以此來允許ip/port許可列表之外的一些應用可以發送信息。這項應用可以設置任意字符串,因為沒有任何功能性的目的,除了記錄和跟蹤 |
|linger.ms | long | 0 |medium | producer組將會匯總任何在請求與發送之間到達的消息記錄一個單獨批量的請求。通常來說,這只有在記錄產生速度大于發送速度的時候才能發生。然而,在某些條件下,客戶端將希望降低請求的數量,甚至降低到中等負載一下。這項設置將通過增加小的延遲來完成--即,不是立即發送一條記錄,producer將會等待給定的延遲時間以允許其他消息記錄發送,這些消息記錄可以批量處理。這可以認為是TCP種Nagle的算法類似。這項設置設定了批量處理的更高的延遲邊界:一旦我們獲得某個partition的batch.size,他將會立即發送而不顧這項設置,然而如果我們獲得消息字節數比這項設置要小的多,我們需要“linger”特定的時間以獲取更多的消息。 這個設置默認為0,即沒有延遲。設定linger.ms=5,例如,將會減少請求數目,但是同時會增加5ms的延遲。 |
|max.request.size |int |1028576 |medium |請求的最大字節數。這也是對最大記錄尺寸的有效覆蓋。注意:server具有自己對消息記錄尺寸的覆蓋,這些尺寸和這個設置不同。此項設置將會限制producer每次批量發送請求的數目,以防發出巨量的請求。 |
|receive.buffer.bytes |int |32768 | medium | TCP receive緩存大小,當閱讀數據時使用 |
|send.buffer.bytes | int |131072 |medium | TCP send緩存大小,當發送數據時使用 |
|timeout.ms |int |30000 | medium | 此配置選項控制server等待來自followers的確認的最大時間。如果確認的請求數目在此時間內沒有實現,則會返回一個錯誤。這個超時限制是以server端度量的,沒有包含請求的網絡延遲 |
|block.on.buffer.full |boolean |true |low |當我們內存緩存用盡時,必須停止接收新消息記錄或者拋出錯誤。默認情況下,這個設置為真,然而某些阻塞可能不值得期待,因此立即拋出錯誤更好。設置為false則會這樣:producer會拋出一個異常錯誤:BufferExhaustedException, 如果記錄已經發送同時緩存已滿 |
|metadata.fetch.timeout.ms | long | 60000 |low |是指我們所獲取的一些元素據的第一個時間數據。元素據包含:topic,host,partitions。此項配置是指當等待元素據fetch成功完成所需要的時間,否則會跑出異常給客戶端。 |
|metadata.max.age.ms |long | 300000 | low | 以微秒為單位的時間,是在我們強制更新metadata的時間間隔。即使我們沒有看到任何partition leadership改變。 |
|metric.reporters |list | [] |low | 類的列表,用于衡量指標。實現MetricReporter接口,將允許增加一些類,這些類在新的衡量指標產生時就會改變。JmxReporter總會包含用于注冊JMX統計 |
|metrics.num.samples |int |2 | low | 用于維護metrics的樣本數 |
|metrics.sample.window.ms | long | 30000 |low |metrics系統維護可配置的樣本數量,在一個可修正的window size。這項配置配置了窗口大小,例如。我們可能在30s的期間維護兩個樣本。當一個窗口推出后,我們會擦除并重寫最老的窗口 |
|recoonect.backoff.ms |long | 10 | low |連接失敗時,當我們重新連接時的等待時間。這避免了客戶端反復重連 |
|retry.backoff.ms | long |100 |low | 在試圖重試失敗的produce請求之前的等待時間。避免陷入發送-失敗的死循環中。 |
## 5 kafka consumer配置詳解
|Property |Default |Description |
| --- | --- | --- |
|group.id | |用來唯一標識consumer進程所在組的字符串,如果設置同樣的group id,表示這些processes都是屬于同一個consumer group |
|zookeeper.connect | |指定zookeeper的連接的字符串,格式是hostname:port,此處host和port都是zookeeper server的host和port,為避免某個zookeeper 機器宕機之后失聯,你可以指定多個 hostname:port,使用逗號作為分隔:hostname1:port1,hostname2:port2,hostname3:port3可以在zookeeper連接字符串中加入zookeeper的chroot路徑,此路徑用于存放他自己的數據,方式:hostname1:port1,hostname2:port2,hostname3:port3/chroot/path |
|consumer.id |null | 不需要設置,一般自動產生 |
|socket.timeout.ms |30*100 |網絡請求的超時限制。真實的超時限制是 max.fetch.wait+socket.timeout.ms socket.receive.buffer.bytes 64*1024 socket用于接收網絡請求的緩存大小 fetch.message.max.bytes 1024*1024 每次fetch請求中,針對每次fetch消息的最大字節數。這些字節將會督導用于每個partition的內存中,因此,此設置將會控制consumer所使用的memory大小。這個fetch請求尺寸必須至少和server允許的最大消息尺寸相等,否則,producer可能發送的消息尺寸大于consumer所能消耗的尺寸。 |
|num.consumer.fetchers | 1 |用于fetch數據的fetcher線程數 |
|auto.commit.enable |true |如果為真,consumer所fetch的消息的offset將會自動的同步到 zookeeper。這項提交的offset將在進程掛掉時,由新的consumer使用 |
|auto.commit.interval.ms |60*1000 | consumer向zookeeper提交offset的頻率,單位是秒 |
|queued.max.message.chunks | 2 | 用于緩存消息的最大數目,以供consumption。每個chunk必須和fetch.message.max.bytes相同 |
|rebalance.max.retries | 4 | 當新的consumer加入到consumer group時,consumers集合試圖重新平衡分配到每個consumer的partitions數目。如果consumers集合改變了,當分配正在執行時,這個重新平衡會失敗并重入 |
|fetch.min.bytes |1 |每次fetch請求時,server應該返回的最小字節數。如果沒有足夠的數據返回,請求會等待,直到足夠的數據才會返回。 |
|fetch.wait.max.ms |100 | 如果沒有足夠的數據能夠滿足fetch.min.bytes,則此項配置是指在應答fetch請求之前,server會阻塞的最大時間。 |
|rebalance.backoff.ms |2000 |在重試reblance之前backoff時間 |
|refresh.leader.backoff.ms | 200 |在試圖確定某個partition的leader是否失去他的leader地位之前,需要等待的backoff時間 |
|auto.offset.reset | largest |zookeeper中沒有初始化的offset時,如果offset是以下值的回應: smallest:自動復位offset為smallest的offset,largest:自動復位offset為largest的offset,anything else:向consumer拋出異常 consumer.timeout.ms -1 如果沒有消息可用,即使等待特定的時間之后也沒有,則拋出超時異常 |
|exclude.internal.topics | true |是否將內部topics的消息暴露給consumer |
|paritition.assignment.strategy | range |選擇向consumer 流分配partitions的策略,可選值:range,roundrobin |
|client.id | group id | value 是用戶特定的字符串,用來在每次請求中幫助跟蹤調用。它應該可以邏輯上確認產生這個請求的應用 |
|zookeeper.session.timeout.ms |6000 | zookeeper 會話的超時限制。如果consumer在這段時間內沒有向zookeeper發送心跳信息,則它會被認為掛掉了,并且reblance將會產生 |
|zookeeper.connection.timeout.ms | 6000 | 客戶端在建立通zookeeper連接中的最大等待時間 |
|zookeeper.sync.time.ms |2000 |ZK follower可以落后ZK leader的最大時間 |
|offsets.storage zookeeper | |用于存放offsets的地點: zookeeper或者kafka |
|offset.channel.backoff.ms | 1000 | 重新連接offsets channel或者是重試失敗的offset的fetch/commit請求的backoff時間 |
|offsets.channel.socket.timeout.ms | 10000 | 當讀取offset的fetch/commit請求回應的socket 超時限制。此超時限制是被consumerMetadata請求用來請求offset管理 |
|offsets.commit.max.retries |5 |重試offset commit的次數。這個重試只應用于offset commits在shut-down之間。他 |
|dual.commit.enabled |true |如果使用“kafka”作為offsets.storage,你可以二次提交offset到zookeeper(還有一次是提交到kafka)。在zookeeper-based的offset storage到kafka-based的offset storage遷移時,這是必須的。對任意給定的consumer group來說,比較安全的建議是當完成遷移之后就關閉這個選項 |
|partition.assignment.strategy |range | 在“range”和“roundrobin”策略之間選擇一種作為分配partitions給consumer 數據流的策略; 循環的partition分配器分配所有可用的partitions以及所有可用consumer 線程。它會將partition循環的分配到consumer線程上。如果所有consumer實例的訂閱都是確定的,則partitions的劃分是確定的分布。循環分配策略只有在以下條件滿足時才可以:(1)每個topic在每個consumer實力上都有同樣數量的數據流。(2)訂閱的topic的集合對于consumer group中每個consumer實例來說都是確定的。 |
- hadoop
- linux基礎
- Linux入門
- Linux進階
- shell
- Zookeeper
- Zookeeper簡介及部署
- Zookeeper使用及API
- Redis
- Redis簡介安裝部署
- Redis使用及API
- Java高級增強
- Java多線程增強
- Maven簡介及搭建
- Hive
- Hive簡介及安裝
- Hive操作
- HIve常用函數
- Hive數據類型
- Flume
- Flume簡介及安裝
- flume 攔截器(interceptor)
- azkaban
- azKaban簡介及安裝
- Sqoop
- Sqoop簡介及安裝
- HDFS
- HDFS原理
- HDFS操作API
- MAPREDUCE原理
- MAPREDUCE圖片資源
- MAPREDUCE加強
- HBASE
- HBASE簡介及安裝
- HBASE操作及API
- HBASE內部原理
- Storm
- Storm簡介及安裝
- Storm原理
- kafka
- kafka簡介及安裝
- kafka常用操作及API
- kafka原理
- kafka配置詳解
- Scala
- Scala簡介及安裝
- Scala基礎語法
- Scala實戰