<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                [TOC] # 安裝 ## 下載 安裝jdk,并配置好環境變量 下載kafka到/tmp下 ~~~ wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.11.0.0/kafka_2.12-0.11.0.0.tgz -P /tmp ~~~ 你可以登錄Apache kafka 官方下載。 http://kafka.apache.org/downloads.html 注意:別下成源文件了! 帶src的是源文件,如: ~~~ Source download: kafka-0.10.1.0-src.tgz (asc, md5) ~~~ 你應該下的是: ~~~ Scala 2.11 - kafka_2.11-0.10.1.0.tgz (asc, md5) ~~~ 推薦下載scala 2.11版本的 ## server.properties配置文件 在kafak目錄下創建個logs文件夾 解壓去config文件夾下,編輯server.properties ~~~ # broker的全局唯一編號,不能重復 broker.id=0 # 用來監聽鏈接的端口,producer或consumer將在此端口建立連接 port=9092 #刪除topic功能使能 delete.topic.enable=true # 處理網絡請求的線程數量 num.network.threads=3 # 用來處理磁盤IO線程數量 num.io.threads=8 # 發送套接字的緩沖區大小 socket.send.buffer.bytes=102400 # 接受套接字的緩沖區大小 socket.receive.buffer.bytes=102400 # 請求套接字的緩沖區大小 socket.request.max.bytes=104857600 # kafka運行日志存放的路徑 log.dirs=/root/tools/kafka/logs # topic在當前broker上的分片個數 num.partitions=2 # 用來恢復和清理data下數據 num.recovery.threads.per.data.dir=1 # segment文件保留的最長時間,超時將被刪除,和下面參數配合,達到這個時間生成新的 log.retention.hours=168 # 滾動生成新的segment文件的最大時間 log.roll.hours=168 # 日志文件中每個segment的大小,默認為1G log.segment.bytes=1073741824 # 周期性檢查文件大小的時間,檢查上面一個參數 log.retention.check.interval.ms=300000 # 消息保存的最大值5M message.max.byte=5242880 #kafka保存消息的副本數,如果一個副本失效了,另一個還可以繼續提供服務 default.replication.factor=2 # 日志清理是否打開 log.cleaner.enable=true # broker需要使用zookeeper保存meta數據 zookeeper.connect=master:2181,slave1:2181,slave2:2181 # zookeeper鏈接超時時間 zookeeper.connection.timeout.ms=6000 # partion buffer中,消息的條數達到閾值,將觸發flush到磁盤 log.flush.interval.messages=10000 # 消息buffer的時間,達到閾值,將觸發flush到磁盤 log.flush.interval.ms=3000 # 刪除topic需要server.properties中設置delete.topic.enable=true否則只是標記刪除 delete.topic.enable=true # 此處的host.name為本機ip,如果不改,則客戶端會拋出:Producer connection to localhost:9092 unsuccessful錯誤 host.name=master advertised.host.name=192.168.33.70 ~~~ 分發到各個機器上 **`改下broker.id還有host.name還有advertised.host.name還有log的位置`** 配置下kafka的環境變量 ## producer.proerties配置文件 ~~~ #指定kafka節點列表,用于獲取metadata,不必全部指定 metadata.broker.list=master:9092,slave1:9092,slave2:9092 #指定分區處理類.,默認kafka.producer.DefaultPartitioner,表通過key哈希到對應分區 #partitioner.class=kafka.producer.DefaultPartitioner #是否壓縮,默認0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮.壓縮后消息中會有頭來指明消息壓縮的類型,故在消費者端消息解壓是透明的無需指定 compression.codec=none #指定序列化處理類,數據傳輸需要序列化 serializer.class=kafka.serializer.DefaultEncoder #如果要壓縮消息,這里指定哪些topic要壓縮消息,默認empty,表示不壓縮 #compressed.topic= #設置發送數據是否需要服務端的反饋,有三個值0,-1,1 # 0:producer不會等待broker發送ack # 1:當leader接收到消息之后發送ack # -1:當所有的follower都同步消息成功后發送ack request.required.acks=0 #在向producer發送ack之前,broker允許等待的最大時間,如果超時,broker將會向producer發送一個error ACK.意味著上一次消息因為某種原因未能成功(比如follower未能同步成功) request.timeout.ms=10000 #同步還是異步發送消息,默認sync表示同步,async表示異步. #異步可以提高發送吞吐量,也意味著消息將會在本地buffer中,并適時批量發送,但是也可能導致丟失未發送過去的消息 producer.type=sync #在async模式下,當message被緩存的時間超過此值后,將會批量發送給broker,默認為5000ms #在值和batch.num.message協同工作 queue.buffering.max.ms=5000 #在async模式下,producer端允許buffer的最大消息量 #無論如何,producer都無法盡快的將消息發送給broker,從而導致消息在producer端大量沉積 #此時,如果消息的條數達到閾值,將會導致producer端阻塞或者消息被拋棄,默認為10000 queue.buffering.max.messages=20000 #如果是異步,指定每次批量發送數據量,默認為200 batch.num.messages=500 #當消息在producer端沉積的條數達到"queue.buffering.max.messages"后 #阻塞一定時間后,隊列仍然沒有enqueue(producer仍然沒有發送任何消息) #此時producer可以繼續阻塞或者將消息拋棄,此timeout值用于控制"阻塞"的時間 #-1:無阻塞超時限制,消息不會被拋棄 #0:立即清空隊列,消息被拋棄 queue.enqueue.timeout.ms=-1 #當producer接收到error ACK,或者沒有接收到ACK時,允許消息重發的次數 #因為broker并沒有完整的機制來避免消息重復,所以當網絡異常時(比如ACK丟失) #有可能導致broker接收到重復的消息,默認值為3 message.send.max.retries=3 #producer刷新topic metadata的時間間隔,producer需要知道partition leader的位置,以及當前topic的情況 #因此producer需要一個機制來獲取最新的metadata,當producer遇到特定錯誤時,將會立即刷新 #(比如topic失效,partition丟失,leader失效等),此外也可以通過此參數來配置額外的刷新機制,默認值600000 topic.metadata.refresh.interval.ms=60000 ~~~ **`metadata.broker.list`要修改地址** ## consumer.properties配置文件 ~~~ #zookeeper連接服務器地址 zookeeper.connect=master:2181,slave1:2181,slave2:2181 #zookeeper的session過期時間,默認5000ms,用于檢測消費者是否掛掉 zookeeper.session.timeout.ms=5000 #當消費者掛掉,其他消費者要等該指定時間才能檢查到并且觸發重新負載均衡 zookeeper.connection.timeout.ms=10000 #指定多久消費者更新offset到zookeeper中.注意offset更新時基于time而不是每次獲得的消息.一旦在更新zookeeper發生異常并重啟,將可能拿到已拿到過的消息 zookeeper.sync.time.ms=2000 #指定消費 #group.id=master #當consumer消費一定量的消息之后,將會自動向zookeeper提交offset信息 #注意offset信息并不是每消費一次消息就向zk提交一次,而先在本地保存(內存),并定期提交,默認為true auto.commit.enable=true #自動更新時間.默認60*1000 auto.commit.interval.ms=1000 #當前consumer的標識,可以設定,也可以有系統生成,主要用來跟蹤消息消費情況,便于觀察 consumer.id=xx #消費者客戶端編號,用戶區分不同客戶端,默認客戶端程序自動產生 client.id=xxx #最大取多少塊緩存到消費者(默認10) queued.max.message.chunks=50 #當有新的consumer加入到group時,將會reblance,此后將會有partitions的消費端遷移到新的consumer上. #如果一個consumer獲得了某個partition的消費權限,那么它將會向zk注冊"Partition Owner registry"節點信息, #但是有可能此時舊的consumer尚沒有釋放此節點,此值用于控制,注冊節點的重試次數 rebalance.max.retries=5 #獲取消息的最大尺寸,broker不會像consumer輸出大于此值的消息chunk.每次feth將得到多條消息,此值為總大小,提示此值,將會消耗更多的consumer端內存 fetch.min.bytes=6553600 #當消息的尺寸不足時,server阻塞的時間.如果超時,消息將立即發送給consumer #就是consumer拉取消息的時候,消息比如生產了一半,這時候會等待,等待的時間超過這個時候就不管了,發給consumer fetch.wait.max.ms=5000 socket.receive.buffer.bytes=655360 #如果zookeeper沒有offset值或者offset值超出范圍.那么就給個初始的offset. #有smallest,largest,anything可選,分別表示給當前最小的offset,當前最大的offset,拋異常.默認largest auto.offset.reset=smallest #指定序列化處理類 derializer.class=kafka.serializer.DefaultDecoder ~~~ ## 啟動 然后各個節點啟動kafka ~~~ kafka-server-start.sh config/server.properties ~~~ 啟動成功,jps查看會出現kafka 如果報這種錯誤表示jdk版本要java8的 ~~~ kafka/Kafka : Unsupported major.minor version 52.0 ~~~ # 日志 ~~~ server.log #kafka的運行日志 state-change.log #kafka他是用zookeeper來保存狀態,所以他可能會進行切換,切換的日志就保存在這里 controller.log #kafka選擇一個節點作為“controller”,當發現有節點down掉的時候它負責在游泳分區的所有節點中選擇新的leader,這使得Kafka可以批量的高效的管理所有分區節點的主從關系。如果controller down掉了,活著的節點中的一個會備切換為新的controller. ~~~ # zk中查看狀態 ~~~ #查看目錄情況 執行“ls /” [zk: 127.0.0.1:2181(CONNECTED) 0] ls / #顯示結果:[consumers, config, controller, isr_change_notification, admin, brokers, zookeeper, controller_epoch] ''' 上面的顯示結果中:只有zookeeper是原生的,其他都是Kafka創建的 ''' #標注一個重要的 [zk: 127.0.0.1:12181(CONNECTED) 1] get /brokers/ids/0 {"jmx_port":-1,"timestamp":"1456125963355","endpoints":["PLAINTEXT://192.168.7.100:19092"],"host":"192.168.7.100","version":2,"port":19092} cZxid = 0x1000001c1 ctime = Mon Feb 22 15:26:03 CST 2016 mZxid = 0x1000001c1 mtime = Mon Feb 22 15:26:03 CST 2016 pZxid = 0x1000001c1 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x152e40aead20016 dataLength = 139 numChildren = 0 [zk: 127.0.0.1:12181(CONNECTED) 2] #還有一個是查看partion [zk: 127.0.0.1:12181(CONNECTED) 7] get /brokers/topics/shuaige/partitions/0 null cZxid = 0x100000029 ctime = Mon Feb 22 10:05:11 CST 2016 mZxid = 0x100000029 mtime = Mon Feb 22 10:05:11 CST 2016 pZxid = 0x10000002a cversion = 1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 1 [zk: 127.0.0.1:12181(CONNECTED) 8] ~~~ # 常見問題 ## 啟動advertised.listeners配置異常 ~~~ java.lang.IllegalArgumentException: requirement failed: advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address. at scala.Predef$.require(Predef.scala:277) at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1203) at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1170) at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:881) at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:878) at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28) at kafka.Kafka$.main(Kafka.scala:82) at kafka.Kafka.main(Kafka.scala) ~~~ **解決方法:修改server.properties** ~~~ advertised.listeners=PLAINTEXT://{ip}:9092 # ip可以內網、外網ip、127.0.0.1 或域名 ~~~ **解析** server.properties中有兩個listeners。 listeners:啟動kafka服務監聽的ip和端口,可以監聽內網ip和0.0.0.0(不能為外網ip),默認為java.net.InetAddress.getCanonicalHostName()獲取的ip。advertised.listeners:生產者和消費者連接的地址,kafka會把該地址注冊到zookeeper中,所以只能為除0.0.0.0之外的合法ip或域名 ,默認和listeners的配置一致 ## 啟動PrintGCDateStamps異常 ~~~ [0.004s][warning][gc] -Xloggc is deprecated. Will use -Xlog:gc:/data/service/kafka_2.11-0.11.0.2/bin/../logs/kafkaServer-gc.log instead. Unrecognized VM option 'PrintGCDateStamps' Error: Could not create the Java Virtual Machine. Error: A fatal exception has occurred. Program will exit. ~~~ **解決方法: 更換jdk1.8.x版本或者使用>=kafka1.0.x的版本** **解析** 只有在jdk1.9并且kafka版本在1.0.x之前的版本才會出現 ## 生成者發送message失敗或消費者不能消費(kafka1.0.1) ~~~ #(java)org.apache.kafka警告 Connection to node 0 could not be established. Broker may not be available. # (nodejs) kafka-node異常 (執行producer.send后的異常) { TimeoutError: Request timed out after 30000ms at new TimeoutError (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\errors\TimeoutError.js:6:9) at Timeout.setTimeout [as _onTimeout] (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\kafkaClient.js:737:14) at ontimeout (timers.js:466:11) at tryOnTimeout (timers.js:304:5) at Timer.listOnTimeout (timers.js:264:5) message: 'Request timed out after 30000ms' } ~~~ **解決方法**: 檢查advertised.listeners的配置(如果有多個Broker可根據java版本的對應的node號檢查配置),判斷當前的網絡是否可以連接到地址(telnet等) ## partitions配置的值過小造成錯誤(kafka1.0.1) ~~~ #(java)org.apache.kafka(執行producer.send) Exception in thread "main" org.apache.kafka.common.KafkaException: Invalid partition given with record: 1 is not in the range [0...1). at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:908) at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:778) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:768) at com.wenshao.dal.TestProducer.main(TestProducer.java:36) # (nodejs) kafka-node異常 (執行producer.send后的異常) { BrokerNotAvailableError: Could not find the leader at new BrokerNotAvailableError (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\errors\BrokerNotAvailableError.js:11:9) at refreshMetadata.error (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\kafkaClient.js:831:16) at D:\project\node\kafka-test\src\node_modules\kafka-node\lib\client.js:514:9 at KafkaClient.wrappedFn (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\kafkaClient.js:379:14) at KafkaClient.Client.handleReceivedData (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\client.js:770:60) at Socket.<anonymous> (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\kafkaClient.js:618:10) at Socket.emit (events.js:159:13) at addChunk (_stream_readable.js:265:12) at readableAddChunk (_stream_readable.js:252:11) at Socket.Readable.push (_stream_readable.js:209:10) message: 'Could not find the leader' } ~~~ **解決方法**: 修改num.partitions的值,partitions在是在創建topic的時候默認創建的partitions節點的個數,只對新創建的topic生效,所有盡量在項目規劃時候定一個合理的值。也可以通過命令行動態擴容() ~~~ ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 2 --topic foo ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看