<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                # **生產者配置參數解釋:** ## **bootstrap.servers**?:kafka集群broker的地址 ## **key.serializer**:關鍵字的序列化方式 ## **value.serializer**:消息值的序列化方式 ## **acks**:指定必須要有多少個分區的副本接收到該消息,服務端才會向生產者發送響應,可選值為:0,1,2,…,all,如果設置為0,producter就只管發出不管kafka server有沒有確認收到。設置all則表示kafka所有的分區副本全部確認接收到才返回。 ## **buffer.memory**:生產者的內存緩沖區大小。如果生產者發送消息的速度 > 消息發送到kafka的速度,那么消息就會在緩沖區堆積,導致緩沖區不足。這個時候,send()方法要么阻塞,要么拋出異常。 ## **max.block.ms**:表示send()方法在拋出異常之前可以阻塞多久的時間,默認是60s ## **compression.type**:消息在發往kafka之前可以進行壓縮處理,以此來降低存儲開銷和網絡帶寬。默認值是null,可選的壓縮算法有snappy、gzip和lz4 ## **retries**:生產者向kafka發送消息可能會發生錯誤,有的是臨時性的錯誤,比如網絡突然阻塞了一會兒,有的不是臨時的錯誤,比如“消息太大了”,對于出現的臨時錯誤,可以通過重試機制來重新發送 ## **retry.backoff.ms**:每次重試之間間隔的時間,第一次失敗了,那么休息一會再重試,休息多久,可以通過這個參數來調節 ## **batch.size**:生產者在發送消息時,可以將即將發往同一個分區的消息放在一個批次里,然后將這個批次整體進行發送,這樣可以節約網絡帶寬,提升性能。該參數就是用來規約一個批次的大小的。但是生產者并不是說要等到一個批次裝滿之后,才會發送,不是這樣的,有時候半滿,甚至只有一個消息的時候,也可能會發送,具體怎么選擇,我們不知道,但是不是說非要等裝滿才發。因此,如果把該參數調的比較大的話,是不會造成消息發送延遲的,但是會占用比較大的內存。但是如果設置的太小,會造成消息發送次數增加,會有額外的IO開銷 ## **linger.ms**:生產者在發送一個批次之前,可以適當的等一小會,這樣可以讓更多的消息加入到該批次。這樣會造成延時增加,但是降低了IO開銷,增加了吞吐量 ## **client.id**:服務器用來標志消息的來源,是一個任意的字符串 ## **max.in.flight.requests.per.connection**:一個消息發送給kafka集群,在收到服務端的響應之前的這段時間里,生產者還可以發n-1個消息。這個參數配置retries,可以保證消息的順序,后面會介紹 ## **request.timeout.ms**:生產者在發送消息之后,到收到服務端響應時,等待的時間限制 ## **max.request.size**:生產者發送消息的大小。可以是一個消息的大小,也可以發送的一個批次的消息大小 ## **receive.buffer.bytes和send.buffer.bytes**:tcp socket接收和發送消息的緩沖區大小,其實指的就是ByteBuffer的大小 # **消費者配置參數解釋:** ## **groupid**:一個字符串用來指示一組consumer所在的組群。實現同一個topic可由不同的組群消費 ## **auto.offset.reset:可選三個參數** earliest ---當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 latest---當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據 none---topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常 ## **socket.timeout.ms**:默認值:3000,socket超時時間。 ## **socket.buffersize**:?默認值:64\*1024,socket receive buffer。 ## **fetch.size**?:默認值:300 \* 1024,控制在一個請求中獲取的消息的字節數。 這個參數在0.8.x中由fetch.message.max.bytes,fetch.min.bytes取代。 ## **backoff.increment.ms**:默認值:1000,這個參數避免在沒有新數據的情況下重復頻繁的拉數據。 如果拉到空數據,則多推后這個時間。 ## **queued.max.message.chunks**:默認值:2,consumer內部緩存拉回來的消息到一個隊列中。 這個值控制這個隊列的大小。 ## **auto.commit.enable**:默認值:true,如果true,consumer定期地往zookeeper寫入每個分區的offset。 ## **auto.commit.interval.ms**:默認值:10000,往zookeeper上寫offset的頻率。 ## **auto.offset.reset**:默認值:largest,如果offset出了返回,則 smallest: 自動設置reset到最小的offset. largest : 自動設置offset到最大的offset. 其它值不允許,會拋出異常。 ## **consumer.timeout.ms**:默認值:\-1,默認-1,consumer在沒有新消息時無限期的block。如果設置一個正值, 一個超時異常會拋出。 ## **rebalance.retries.max**:默認值:4,rebalance時的最大嘗試次數。 max.poll.interval.ms:拉取的最大時間間隔,如果你一次拉取的比較多,建議加大這個值,長時間沒有調用poll,且間隔超過這個值時,就會認為這個consumer失敗了 ## **max.poll.records**:默認值:500,Consumer每次調用poll()時取到的records的最大數。 # **kafka常用命令:** ## 1、啟動kafka服務 bin/kafka-server-start.sh?config/server.properties & ## 2、停止kafka服務 ./kafka-server-stop.sh ## 3、查看所有的話題 ./kafka-topics.sh?--list --zookeeper localhost:2181 或: ./kafka-topics.sh--list \--bootstrap-server?localhost:9092 ## 4、查看所有話題的詳細信息 ./kafka-topics.sh?--zookeeper localhost:2181?--describe ## 5、列出指定話題的詳細信息 ./kafka-topics.sh?--zookeeper localhost:2181--describe ?--topic test-topic ## 6、刪除一個話題,開啟刪除權限[delete.topic.enable](#brokerconfigs_delete.topic.enable)\=true,且最好關閉自動創建auto.create.topics.enable=false。并保證沒有生產者與消費者此時作用于該主題,手動清除日志或設置檢查log.retention.check.interval.ms,默認5min ./kafka-topics.sh?--zookeeper localhost:2181?--delete ?--topic test\-topic ## 7、創建一個叫test的話題,有兩個分區,每個分區3個副本 ./kafka-topics.sh?--zookeeper localhost:2181--create --topic test --replication-factor 3--partitions 2 ?## 8、測試kafka發送和接收消息(啟動兩個終端) **發送消息(注意端口號為配置文件里面的端口號)** ./kafka-console-producer.sh?--broker-list localhost:9092?--topic test **消費消息(可能端口號與配置文件保持一致,或與發送端口保持一致)** ./kafka-console-consumer.sh?--bootstrap-server localhost:9092?--topic test --from-beginning ??#加了\--from-beginning 重頭消費所有的消息 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test ????????#不加\--from-beginning 從最新的一條消息開始消費 **老版本消費者命令需要指定 \--zookeeper localhost:2181 具體在0.9等之前版本, 0.10版本過度的可以指定zookeeper也可以標明是 \--new-consumer:** ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test?\--new-consumer ## 9、查看某個topic對應的消息數量 ./kafka-run-class.sh??kafka.tools.GetOffsetShell --broker-list localhost:9092?--topic test --time?-1 ## 10、顯示所有消費者 ./kafka-consumer-groups.sh?--bootstrap-server localhost:9092?--list ## 11、獲取正在消費的topic(console-consumer-63307)的group的offset ./kafka-consumer-groups.sh?--describe --group console-consumer-63307?--bootstrap-server localhost:9092 ## 12、顯示消費者 ./kafka-consumer-groups.sh?--bootstrap-server localhost:9092?--list # **kafka****集群配置****:** [https://blog.csdn.net/xuesp/article/details/88094326](https://blog.csdn.net/xuesp/article/details/88094326) **(注意zookeeper端口2181為常用即可)** [https://kafka.apache.org/documentation/#brokerconfigs](https://kafka.apache.org/documentation/#brokerconfigs)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看