<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                [TOC] # kafka角色 ~~~ * producer:生產者。 * consumer:消費者。 * topic: 消息以topic為類別記錄,Kafka將消息種子(Feed)分類, 每一類的消息稱之為一個主題(Topic)。 * broker:以集群的方式運行,可以由一個或多個服務組成,每個服務叫做一個broker;消費者可以訂閱一個或多個主題(topic), 并從Broker拉數據,從而消費這些已發布的消息。 ~~~ # 經典模型 1. 一個主題下的分區不能小于消費者數量,即一個主題下消費者數量不能大于分區屬,大了就浪費了空閑了 2. 一個主題下的一個分區可以同時被不同消費組其中某一個消費者消費 3. 一個主題下的一個分區只能被同一個消費組的一個消費者消費 ![](https://box.kancloud.cn/4a83b67503a6b98d54b311856f59ec3f_465x242.png) # 常用參數說明 ## request.required.acks ~~~ Kafka producer的ack有3中機制,初始化producer時的producerconfig可以通過配置request.required.acks不同的值來實現。 0:這意味著生產者producer不等待來自broker同步完成的確認繼續發送下一條(批)消息。此選項提供最低的延遲但最弱的耐久性保證(當服務器發生故障時某些數據會丟失,如leader已死,但producer并不知情,發出去的信息broker就收不到)。 1:這意味著producer在leader已成功收到的數據并得到確認后發送下一條message。此選項提供了更好的耐久性為客戶等待服務器確認請求成功(被寫入死亡leader但尚未復制將失去了唯一的消息)。 -1:這意味著producer在follower副本確認接收到數據后才算一次發送完成。 此選項提供最好的耐久性,我們保證沒有信息將丟失,只要至少一個同步副本保持存活。 三種機制,性能依次遞減 (producer吞吐量降低),數據健壯性則依次遞增。 ~~~ ## auto.offset.reset 1. earliest:自動將偏移重置為最早的偏移量 2. latest:自動將偏移量重置為最新的偏移量(默認) 3. none:如果consumer group沒有發現先前的偏移量,則向consumer拋出異常。 4. 其他的參數:向consumer拋出異常(無效參數) # kafka安裝 安裝kafka ~~~ # 官方下載地址:http://kafka.apache.org/downloads # wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.12-1.1.1.tgz tar -xzf kafka_2.12-1.1.1.tgz cd kafka_2.12-1.1.0 ~~~ 啟動kafka server ~~~ # 需先啟動zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties ~~~ 啟動kafka客戶端測試 ~~~ # 創建一個話題,test話題2個分區 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test Created topic "test". # 顯示所有話題 bin/kafka-topics.sh --list --zookeeper localhost:2181 test # 顯示話題信息 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:2 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0 # 啟動一個生產者(輸入消息) bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test [等待輸入自己的內容 出現>輸入即可] >i am a new msg ! >i am a good msg ? # 啟動一個生產者(等待消息) # 注意這里的--from-beginning,每次都會從頭開始讀取,你可以嘗試去掉和不去掉看下效果 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning [等待消息] i am a new msg ! i am a good msg ? ~~~ 安裝kafka的php擴展 ~~~ git clone https://github.com/arnaud-lb/php-rdkafka.git cd php-rdkafka phpize ./configure make all -j 5 sudo make install vim [php]/php.ini extension=rdkafka.so ~~~ mac ~~~ brew install librdkafka pecl install rdkafka ~~~ # php代碼 ## 生產者 ~~~ <?php $conf = new RdKafka\Conf(); $conf->setDrMsgCb(function ($kafka, $message) { file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND); }); $conf->setErrorCb(function ($kafka, $err, $reason) { file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND); }); $rk = new RdKafka\Producer($conf); $rk->setLogLevel(LOG_DEBUG); $rk->addBrokers("127.0.0.1"); $cf = new RdKafka\TopicConf(); $cf->set('request.required.acks', 0); $topic = $rk->newTopic("test", $cf); $option = 'qkl'; for ($i = 0; $i < 20; $i++) { //RD_KAFKA_PARTITION_UA自動選擇分區 //$option可選 $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option); } $len = $rk->getOutQLen(); while ($len > 0) { $len = $rk->getOutQLen(); var_dump($len); $rk->poll(50); } ~~~ ## 消費者 ~~~ <?php $conf = new RdKafka\Conf(); $conf->setDrMsgCb(function ($kafka, $message) { file_put_contents("./c_dr_cb.log", var_export($message, true), FILE_APPEND); }); $conf->setErrorCb(function ($kafka, $err, $reason) { file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND); }); //設置消費組 $conf->set('group.id', 'myConsumerGroup'); $rk = new RdKafka\Consumer($conf); $rk->addBrokers("127.0.0.1"); $topicConf = new RdKafka\TopicConf(); $topicConf->set('request.required.acks', 1); //在interval.ms的時間內自動提交確認、建議不要啟動 //$topicConf->set('auto.commit.enable', 1); $topicConf->set('auto.commit.enable', 0); $topicConf->set('auto.commit.interval.ms', 100); // 設置offset的存儲為file //$topicConf->set('offset.store.method', 'file'); // 設置offset的存儲為broker $topicConf->set('offset.store.method', 'broker'); //$topicConf->set('offset.store.path', __DIR__); //smallest:簡單理解為從頭開始消費,其實等價于上面的 earliest //largest:簡單理解為從最新的開始消費,其實等價于上面的 latest //$topicConf->set('auto.offset.reset', 'smallest'); $topic = $rk->newTopic("test", $topicConf); // 參數1消費分區0 // RD_KAFKA_OFFSET_BEGINNING 重頭開始消費 // RD_KAFKA_OFFSET_STORED 最后一條消費的offset記錄開始消費 // RD_KAFKA_OFFSET_END 最后一條消費 $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING); //$topic->consumeStart(0, RD_KAFKA_OFFSET_END); // //$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { //參數1表示消費分區,這里是分區0 //參數2表示同步阻塞多久 $message = $topic->consume(0, 12 * 1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } } ~~~ ## 查看服務器元數據(topic/partition/broker) ~~~ <?php $conf = new RdKafka\Conf(); $conf->setDrMsgCb(function ($kafka, $message) { file_put_contents("./xx.log", var_export($message, true), FILE_APPEND); }); $conf->setErrorCb(function ($kafka, $err, $reason) { printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason); }); $conf->set('group.id', 'myConsumerGroup'); $rk = new RdKafka\Consumer($conf); $rk->addBrokers("127.0.0.1"); $allInfo = $rk->metadata(true, NULL, 60e3); $topics = $allInfo->getTopics(); echo rd_kafka_offset_tail(100); echo "--"; echo count($topics); echo "--"; foreach ($topics as $topic) { $topicName = $topic->getTopic(); if ($topicName == "__consumer_offsets") { continue ; } $partitions = $topic->getPartitions(); foreach ($partitions as $partition) { // $rf = new ReflectionClass(get_class($partition)); // foreach ($rf->getMethods() as $f) { // var_dump($f); // } // die(); $topPartition = new RdKafka\TopicPartition($topicName, $partition->getId()); echo "當前的話題:" . ($topPartition->getTopic()) . " - " . $partition->getId() . " - "; echo "offset:" . ($topPartition->getOffset()) . PHP_EOL; } } ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看