[TOC]
# 主題 (topics)
> using php-amqplib
在上一節教程中,我們改進了我們的日志記錄系統。我們使用可以選擇性接收信息的 direct 類型交換機,而不是使用只能進行虛擬廣播的 fanout 類型交換機。
雖然使用 direct 類型交換改進了我們的系統,但它仍有限制 - 它不能基于對多個標準進行路由選擇。
在我們的日志系統中,我們可能不僅要根據日志的嚴重性訂閱日志,還可以基于發出日志的源進行訂閱。你也許在 unix syslog 中了解了這個概念(不理解也沒關系,接著往下看,自然就明白了)。
這將給我們的系統帶來很大的靈活性 - 我們可能要監聽來自 cron的嚴重錯誤,也要監聽 kern 的所有日志。
為了在我們的系統中實現這樣的功能,我們需要學習一個更復雜的 topic exchange 。
# Topic exchange
發送到 topic exchange 的消息不能任意命名一個 routing key - 它必須是由一個.劃分單詞列表。這些單詞可以是任意的,但它們通常指定與消息相關聯的一些功能。這里有幾個有效的 routing key: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit" , routing key 中可以包含多個單詞,最多可以達到255個字節。
binding key也必須是相同的形式。topic exchange背后的邏輯類似于 direct exchange - 使用特定routing key1 發送的消息將被傳遞到與binding key綁定的所有隊列。但是,binding key 有兩個重要的特殊情況:
1. *(星號) 可以代替一個單詞
2. #(哈希) 可以匹配0個或多個單詞
在下面的例子中簡單解釋一下(類似于正則):

在本例中,我們將發送所有描述動物的消息。消息將使用由三個單詞(兩個點)組成的routing key發送。其中第一個單詞描述 速度,第二個描述顏色,第三個描述種類:”..”。
我們接著創建三個綁定:Q1 binding key “*.orange.*”, Q2 binding key “*.*.rabbit” “lazy.#”。
這三個綁定可以解釋為:
Q1 對所有橙色的動物感興趣。
Q2 想要獲取有關兔子的一切消息,以及所有惰性動物的一切。
一條 `routing key` 為 “quick.orange.rabbit” 的消息將傳遞上面的到兩個對列。`routing key `為 “lazy.orange.elephant” 的消息也將傳遞上面的到兩個對列。另外 “lazy.pink.rabbit” 消息將只會被傳遞到Q2一次, 即使它匹配了兩個` binding key`。”quick.brown.fox” 不匹配任何 `binding key`, 所以它將被丟棄。
如果我們不遵守以上的規則發送 routing key 為一個或者四個單詞的消息會發生什么? 比如,”orange” 或者 “quick.orange.male.rabbit”。那么我們將丟失這些消息,因為它們不匹配任何 binding key。
另一方面,”lazy.orange.male.rabbit” 即使它有四個單詞,但它能匹配最后一個綁定,并且將被傳遞到Q2中。
> 注意事項
> topic exchange 很強大,并且表現得和其他類型交換一樣
> 當隊列與 “#” 綁定時,它將接收所有消息而不管routing key的值,如同 fanout exchange
> 當不使用 “*” 和 “#” 特殊字符時,topic exchange 如同 direct exchange
# 整合代碼
我們將在日志記錄系統中使用主題交換。我們將假設 routing key中有兩個單詞開始工作,比如:”.”。
代碼幾乎和上一節一樣。
emit_log_topic.php:
~~~
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('topic_logs', 'topic', false, false, false);
$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($argv, 2));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'topic_logs', $routing_key);
echo " [x] Sent ",$routing_key,':',$data," \n";
$channel->close();
$connection->close();
~~~
receive_logs_topic.php:
~~~
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('topic_logs', 'topic', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$binding_keys = array_slice($argv, 1);
if( empty($binding_keys )) {
file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
exit(1);
}
foreach($binding_keys as $binding_key) {
$channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
$callback = function($msg){
echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
~~~
收聽所有日志:
~~~
php receive_logs_topic.php "#"
~~~
收聽所有來自 kern 的日志:
~~~
php receive_logs_topic.php "kern.*"
~~~
只收聽 “critical” 類型的日志:
~~~
php receive_logs_topic.php "*.critical"
~~~
你也可以創建多個綁定:
~~~
php receive_logs_topic.php "kern.*" "*.critical"
~~~
發出 routing key為 “kern.critical”類型的日志
~~~
php emit_log_topic.php "kern.critical" "A critical kernel error"
~~~
至此你可以盡情鼓弄這些程序。需要注意的是,這些代碼沒有對 binding key 和 routing key 賦默認值,除此之外你可能需要路由不止兩個單詞。
接下來,我們需要弄明白怎樣做一個往返信息的遠程程序:遠程調用
- OAuth
- 簡介
- 步驟
- 單點登錄
- .user.ini
- 時間轉換為今天昨天前天幾天前
- 獲取ip接口
- 協程
- 概念
- yield-from && return-values
- 協程與阻塞的思考
- 中間件
- mysqli異步與php的協程
- 代碼片段
- pdo 執行的sql語句
- 二進制安全
- 捕捉異常中斷
- global
- 利用cookie模擬登陸
- 解析非正常json
- 簡單的對稱加密算法
- RSA 加密
- 過濾掉emoji表情
- 判斷遠程圖片是否存在
- 一分鐘限制請求100次
- 文件處理
- 多文件上傳
- 顯示所有文件
- 文件下載和上面顯示所有文件配合
- 文件的刪除,統計,存數組等
- 圖片處理
- 簡介
- 驗證碼
- 圖片等比縮放
- 批量添加水印
- beanstalkd
- 安裝
- 使用
- RabbitMQ
- 簡介
- debain安裝
- centos安裝
- 常用方法
- 入門
- 工作隊列
- 訂閱,發布
- 路由
- 主題
- 遠程調用RPC
- 消息中間件的選型
- .htaccess
- isset、empty、if區別以及0、‘’、null
- php各版本
- php7.2 不向后兼容的改動
- php中的各種坑
- php7改變
- php慢日志
- 郵件
- PHPMailer實現發郵件
- 驗證郵件地址真實性
- 文件下載
- FastCgi 與 PHP-fpm 之間的關系
- openssl 加解密
- 反射
- 鉤子方法
- 查找插件
- opcode
- opcache使用
- opcache優化
- 分布式一致性hash算法
- 概念
- 哈希算法好壞的四個定義
- php實現
- java實現
- 數組
- jwt
- jwt簡介
- 單點登錄
- phpize
- GeoIP擴展
- php無法獲得https網頁內容的解決方案
- homestead運行的腳本
- Unicode和Utf-8轉換
- php優化
- kafka
- fpm配置
- configure配置詳解