[TOC]
# kafka角色
~~~
* producer:生產者。
* consumer:消費者。
* topic: 消息以topic為類別記錄,Kafka將消息種子(Feed)分類, 每一類的消息稱之為一個主題(Topic)。
* broker:以集群的方式運行,可以由一個或多個服務組成,每個服務叫做一個broker;消費者可以訂閱一個或多個主題(topic), 并從Broker拉數據,從而消費這些已發布的消息。
~~~
# 經典模型
1. 一個主題下的分區不能小于消費者數量,即一個主題下消費者數量不能大于分區屬,大了就浪費了空閑了
2. 一個主題下的一個分區可以同時被不同消費組其中某一個消費者消費
3. 一個主題下的一個分區只能被同一個消費組的一個消費者消費

# 常用參數說明
## request.required.acks
~~~
Kafka producer的ack有3中機制,初始化producer時的producerconfig可以通過配置request.required.acks不同的值來實現。
0:這意味著生產者producer不等待來自broker同步完成的確認繼續發送下一條(批)消息。此選項提供最低的延遲但最弱的耐久性保證(當服務器發生故障時某些數據會丟失,如leader已死,但producer并不知情,發出去的信息broker就收不到)。
1:這意味著producer在leader已成功收到的數據并得到確認后發送下一條message。此選項提供了更好的耐久性為客戶等待服務器確認請求成功(被寫入死亡leader但尚未復制將失去了唯一的消息)。
-1:這意味著producer在follower副本確認接收到數據后才算一次發送完成。
此選項提供最好的耐久性,我們保證沒有信息將丟失,只要至少一個同步副本保持存活。
三種機制,性能依次遞減 (producer吞吐量降低),數據健壯性則依次遞增。
~~~
## auto.offset.reset
1. earliest:自動將偏移重置為最早的偏移量
2. latest:自動將偏移量重置為最新的偏移量(默認)
3. none:如果consumer group沒有發現先前的偏移量,則向consumer拋出異常。
4. 其他的參數:向consumer拋出異常(無效參數)
# kafka安裝
安裝kafka
~~~
# 官方下載地址:http://kafka.apache.org/downloads
# wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.12-1.1.1.tgz
tar -xzf kafka_2.12-1.1.1.tgz
cd kafka_2.12-1.1.0
~~~
啟動kafka server
~~~
# 需先啟動zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
~~~
啟動kafka客戶端測試
~~~
# 創建一個話題,test話題2個分區
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test
Created topic "test".
# 顯示所有話題
bin/kafka-topics.sh --list --zookeeper localhost:2181
test
# 顯示話題信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:2 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0
# 啟動一個生產者(輸入消息)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[等待輸入自己的內容 出現>輸入即可]
>i am a new msg !
>i am a good msg ?
# 啟動一個生產者(等待消息)
# 注意這里的--from-beginning,每次都會從頭開始讀取,你可以嘗試去掉和不去掉看下效果
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
[等待消息]
i am a new msg !
i am a good msg ?
~~~
安裝kafka的php擴展
~~~
git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure
make all -j 5
sudo make install
vim [php]/php.ini
extension=rdkafka.so
~~~
mac
~~~
brew install librdkafka
pecl install rdkafka
~~~
# php代碼
## 生產者
~~~
<?php
$conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});
$rk = new RdKafka\Producer($conf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");
$cf = new RdKafka\TopicConf();
$cf->set('request.required.acks', 0);
$topic = $rk->newTopic("test", $cf);
$option = 'qkl';
for ($i = 0; $i < 20; $i++) {
//RD_KAFKA_PARTITION_UA自動選擇分區
//$option可選
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
}
$len = $rk->getOutQLen();
while ($len > 0) {
$len = $rk->getOutQLen();
var_dump($len);
$rk->poll(50);
}
~~~
## 消費者
~~~
<?php
$conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
file_put_contents("./c_dr_cb.log", var_export($message, true), FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});
//設置消費組
$conf->set('group.id', 'myConsumerGroup');
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");
$topicConf = new RdKafka\TopicConf();
$topicConf->set('request.required.acks', 1);
//在interval.ms的時間內自動提交確認、建議不要啟動
//$topicConf->set('auto.commit.enable', 1);
$topicConf->set('auto.commit.enable', 0);
$topicConf->set('auto.commit.interval.ms', 100);
// 設置offset的存儲為file
//$topicConf->set('offset.store.method', 'file');
// 設置offset的存儲為broker
$topicConf->set('offset.store.method', 'broker');
//$topicConf->set('offset.store.path', __DIR__);
//smallest:簡單理解為從頭開始消費,其實等價于上面的 earliest
//largest:簡單理解為從最新的開始消費,其實等價于上面的 latest
//$topicConf->set('auto.offset.reset', 'smallest');
$topic = $rk->newTopic("test", $topicConf);
// 參數1消費分區0
// RD_KAFKA_OFFSET_BEGINNING 重頭開始消費
// RD_KAFKA_OFFSET_STORED 最后一條消費的offset記錄開始消費
// RD_KAFKA_OFFSET_END 最后一條消費
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
//$topic->consumeStart(0, RD_KAFKA_OFFSET_END); //
//$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
//參數1表示消費分區,這里是分區0
//參數2表示同步阻塞多久
$message = $topic->consume(0, 12 * 1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
~~~
## 查看服務器元數據(topic/partition/broker)
~~~
<?php
$conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
file_put_contents("./xx.log", var_export($message, true), FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
});
$conf->set('group.id', 'myConsumerGroup');
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");
$allInfo = $rk->metadata(true, NULL, 60e3);
$topics = $allInfo->getTopics();
echo rd_kafka_offset_tail(100);
echo "--";
echo count($topics);
echo "--";
foreach ($topics as $topic) {
$topicName = $topic->getTopic();
if ($topicName == "__consumer_offsets") {
continue ;
}
$partitions = $topic->getPartitions();
foreach ($partitions as $partition) {
// $rf = new ReflectionClass(get_class($partition));
// foreach ($rf->getMethods() as $f) {
// var_dump($f);
// }
// die();
$topPartition = new RdKafka\TopicPartition($topicName, $partition->getId());
echo "當前的話題:" . ($topPartition->getTopic()) . " - " . $partition->getId() . " - ";
echo "offset:" . ($topPartition->getOffset()) . PHP_EOL;
}
}
~~~
- OAuth
- 簡介
- 步驟
- 單點登錄
- .user.ini
- 時間轉換為今天昨天前天幾天前
- 獲取ip接口
- 協程
- 概念
- yield-from && return-values
- 協程與阻塞的思考
- 中間件
- mysqli異步與php的協程
- 代碼片段
- pdo 執行的sql語句
- 二進制安全
- 捕捉異常中斷
- global
- 利用cookie模擬登陸
- 解析非正常json
- 簡單的對稱加密算法
- RSA 加密
- 過濾掉emoji表情
- 判斷遠程圖片是否存在
- 一分鐘限制請求100次
- 文件處理
- 多文件上傳
- 顯示所有文件
- 文件下載和上面顯示所有文件配合
- 文件的刪除,統計,存數組等
- 圖片處理
- 簡介
- 驗證碼
- 圖片等比縮放
- 批量添加水印
- beanstalkd
- 安裝
- 使用
- RabbitMQ
- 簡介
- debain安裝
- centos安裝
- 常用方法
- 入門
- 工作隊列
- 訂閱,發布
- 路由
- 主題
- 遠程調用RPC
- 消息中間件的選型
- .htaccess
- isset、empty、if區別以及0、‘’、null
- php各版本
- php7.2 不向后兼容的改動
- php中的各種坑
- php7改變
- php慢日志
- 郵件
- PHPMailer實現發郵件
- 驗證郵件地址真實性
- 文件下載
- FastCgi 與 PHP-fpm 之間的關系
- openssl 加解密
- 反射
- 鉤子方法
- 查找插件
- opcode
- opcache使用
- opcache優化
- 分布式一致性hash算法
- 概念
- 哈希算法好壞的四個定義
- php實現
- java實現
- 數組
- jwt
- jwt簡介
- 單點登錄
- phpize
- GeoIP擴展
- php無法獲得https網頁內容的解決方案
- homestead運行的腳本
- Unicode和Utf-8轉換
- php優化
- kafka
- fpm配置
- configure配置詳解