[TOC]
# Publish/Subscribe
>using php-amqplib
在上一節中我們創建了一個工作隊列。在工作隊列背后的假設是每個任務都交付給一個工作進程處理。在這一節中,我們會做一些完全不同的事情–我們會向多個消費者傳遞相同的信息。這種模式被稱為“發布/訂閱”。
為了說明這種模式,我們將搭建一個簡單的日志記錄系統。它將包括兩段程序–第一個將產生日志消息,第二個將接收并且打印它們。
在我們的日志記錄系統中,接收者程序的每個運行副本都會收到消息。這樣我們就可以運行一個接收器并將日志指向磁盤,同時我們可以運行另一個接收器并將日志打印在屏幕上。
基本上,已發布的日志消息將被廣播所有接收者。
# Exchanges
在上一節中,我們通過消息隊列發送和接收消息。現在是時候在Rabbit中引入完整的消息傳遞模式了。
然我們快速回顧一下前面教程中介紹的內容:
* 生產者是發送消息的用戶應用程序
* 隊列是存儲消息的緩沖區
* 消費者是接收消息的用戶應用程序
RabbitMQ中消息傳遞模型的核心思想是,生產者從不將任何消息直接發送到隊列。實際上,生產者甚至通常不知道是否將消息傳遞到了隊列中。
相反,生產者自能將信息發送到信息交換中間件(exchange)。信息交換中間件只做一件非常簡單的事:一方面,它接收來自生產者的消息; 另一方面將接收到的消息推送到隊列。信息交換中間件準確知道接收到的消息如何處理。應該添加到特定的隊列? 應該添加到許多的隊列中?或者應該丟棄。其規則由交換類型定義

有幾種交換類型可用:direct, topic, headers, fanout. 我們將重點關注最后一個 - fanout. 讓我們創建一個種類型的交換,并且將其稱為 logs:
~~~
$channel->exchange_declare('logs', 'fanout', false, false, false);
~~~
fanout 交換非常簡單。正如你可以從名字猜測的一樣,它只是將所有的消息廣播到所有已知的隊列。這正是我們需要的日志記錄器。
# 列出所有 exchanges
你可以使用非常有用的 rabbitmqctl來獲取服務器上的 exchanges:
~~~
sudo rabbitmqctl list_exchanges
~~~
這個交換列表中會有一些 amq.*的交換和默認(未命名)交換。這些事默認創建的,但是不太可能需要使用它們。
# 默認交換
在本教程的前面部分,我們對交換沒有任何了解,但是任然能夠將消息發送到隊列。這是可能的,因為我們使用的是默認交換,我們通過空字符串”“標識。
回顧一下我們之前發布的消息:
~~~
$channel->basic_publish($msg, '', 'hello');
~~~
在這里,我們使用默認或者無名的交換:消息路由到具有 route_key 指定的名稱的隊列(如果存在)。路由秘鑰是 basic_public 的第三個參數。
現在,我們可以將消息發布到我們命名的交換機:
~~~
$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');
~~~
# 臨時隊列
你還記得我們之前使用的是具有指定名稱的隊列(hello, task_queue)。能夠給隊列命名對我們而言是非常重要的,因為我們需要將消費者指向同一個隊列。當你想要在生產者和消費者之間共享隊列時,給隊列一個名字是很重要的。
但是對我們現在的日志記錄器而言卻不是這樣。我們希望收到所有的日志消息,而不僅僅是它的一部分。我們也只對當前的日志消息感興趣,而對之前的老的日志消息不感興趣。要解決這些問題,我們需要哦解決兩件事情。
首先,每當我們連接到RabbitMQ,我們都需要一個新的,空的隊列。為止,我們可以創建一個具有隨機名稱的隊列,或者更好–讓我們的服務器生成一個隨機名稱隊列。
其次,一旦我們斷開消費者,隊列應該被自動刪除。
在 php-amqplib 客戶端。當我們提供的隊列名稱為空字符串時,我們會創建一個隨機名稱的非持久化的隊列:
~~~
list($queue_name, ,) = $channel->queue_declare("");
~~~
當該方法返回時,$queue_name 變量包含由RabbitMQ生成的隨機隊列名稱。例如,它可能看起來像這樣 amq.gen-JzTY20BRgKO-HjmUJj0wLg
當聲明它的連接關閉時,隊列將自動刪除,因為它被聲明為獨占類型。
# 綁定

我們已經創建了一個fanout交換和一個隊列。現在我們需要告訴交換機發送消息到我們的隊列。交換機和隊列之間的關系稱為綁定。
~~~
$channel->queue_bind($queue_name, 'logs');
~~~
從現在開始,日志交換器將發送消息到我們的隊列。
# 列出綁定項
你可以列出你想要的現有的綁定。
~~~
rabbitmqctl list_bindings
~~~
# 總結

發出日志消息的生產者程序與上一節教程中的生產者并沒有太大的區別。最重要的變化是,我們現在想把消息發布到我們的logs交換機, 而不是不指定交換機的名稱。這里是emir_log.php的代碼:
~~~
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('logs', 'fanout', false, false, false);
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "info: Hello World!";
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'logs');
echo " [x] Sent ", $data, "\n";
$channel->close();
$connection->close();
~~~
如你所見,建立連接后,我們聲明了交換機。此步驟是必須的,因為RabbitMQ禁止發布消息到不存在的交換機。
如果沒有任何隊列綁定到交換機,消息將丟失,但是這對我們來說是沒關系的;如果沒有消費者正在監聽,我們可以放心的放棄消息。
receive.logs.php:
~~~
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('logs', 'fanout', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queue_name, 'logs');
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
$callback = function($msg){
echo ' [x] ', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
~~~
如果想要將日志保存到文件,只需要打開控制臺并且鍵入:
~~~
php receive_logs.php > logs_from_rabbit.log
~~~
如果你希望在屏幕上看到日志,則再打開一個控制臺并運行:
~~~
php receive_logs.php
~~~
當然,需要生產日志:
~~~
php emit_log.php
~~~
使用 rabbitmqctl list_bindings 可以驗證實際上根據需要創建的綁定和隊列。現在運行了兩個receive_log.php 程序,你應該會看到類似下面的的內容:
~~~
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => ...done.
~~~
對結果的解釋很簡單:來自交換機日志數據轉到具有服務器分配名稱的兩個隊列。這正是我們想要的。
要了解怎樣訂閱一個子集,請移步下一節:路由
- OAuth
- 簡介
- 步驟
- 單點登錄
- .user.ini
- 時間轉換為今天昨天前天幾天前
- 獲取ip接口
- 協程
- 概念
- yield-from && return-values
- 協程與阻塞的思考
- 中間件
- mysqli異步與php的協程
- 代碼片段
- pdo 執行的sql語句
- 二進制安全
- 捕捉異常中斷
- global
- 利用cookie模擬登陸
- 解析非正常json
- 簡單的對稱加密算法
- RSA 加密
- 過濾掉emoji表情
- 判斷遠程圖片是否存在
- 一分鐘限制請求100次
- 文件處理
- 多文件上傳
- 顯示所有文件
- 文件下載和上面顯示所有文件配合
- 文件的刪除,統計,存數組等
- 圖片處理
- 簡介
- 驗證碼
- 圖片等比縮放
- 批量添加水印
- beanstalkd
- 安裝
- 使用
- RabbitMQ
- 簡介
- debain安裝
- centos安裝
- 常用方法
- 入門
- 工作隊列
- 訂閱,發布
- 路由
- 主題
- 遠程調用RPC
- 消息中間件的選型
- .htaccess
- isset、empty、if區別以及0、‘’、null
- php各版本
- php7.2 不向后兼容的改動
- php中的各種坑
- php7改變
- php慢日志
- 郵件
- PHPMailer實現發郵件
- 驗證郵件地址真實性
- 文件下載
- FastCgi 與 PHP-fpm 之間的關系
- openssl 加解密
- 反射
- 鉤子方法
- 查找插件
- opcode
- opcache使用
- opcache優化
- 分布式一致性hash算法
- 概念
- 哈希算法好壞的四個定義
- php實現
- java實現
- 數組
- jwt
- jwt簡介
- 單點登錄
- phpize
- GeoIP擴展
- php無法獲得https網頁內容的解決方案
- homestead運行的腳本
- Unicode和Utf-8轉換
- php優化
- kafka
- fpm配置
- configure配置詳解