>[info] 感謝Google翻譯
[TOC]
# "Hello World!"
## 簡介
RabbitMQ是一個消息代理:它接受和轉發消息。您可以將其視為郵局:當您將要發布的郵件放在郵箱中時,您可以確定郵件先生或Mailperson女士最終會將郵件發送給您的收件人。在這個比喻中,RabbitMQ是郵箱,郵局和郵遞員。
RabbitMQ和郵局之間的主要區別在于它不處理紙張,而是接受,存儲和轉發二進制blob數據 - 消息。
RabbitMQ和一般的消息傳遞使用了一些術語。
制作只不過是發送。發送消息的程序是生產者:

隊列是RabbitMQ中的郵箱的名稱。雖然消息流經RabbitMQ和您的應用程序,但它們只能存儲在隊列中。隊列僅受主機的內存和磁盤限制的約束,它本質上是一個大的消息緩沖區。許多生產者可以發送到一個隊列的消息,并且許多消費者可以嘗試從一個隊列接收數據。這就是我們代表隊列的方式:

消費與接受有類似的意義。消費者是一個主要等待接收消息的程序:

請注意,生產者,消費者和代理不必駐留在同一主機上;實際上在大多數應用中他們沒有。
## "Hello World"
### (使用 the php-amqplib 客戶端)
在本教程的這一部分中,我們將用PHP編寫兩個程序;發送單個消息的生產者,以及接收消息并將其打印出來的消費者。我們將掩蓋[php-amqplib](https://github.com/php-amqplib/php-amqplib) API中的一些細節,專注于這個非常簡單的事情才開始。它是消息傳遞的“Hello World”。
在下圖中,“P”是我們的生產者,“C”是我們的消費者。中間的框是一個隊列 - RabbitMQ代表消費者保留的消息緩沖區。

>[danger]php-amqplib客戶端庫
RabbitMQ說多種協議。本教程介紹AMQP 0-9-1,它是一種開放的,通用的消息傳遞協議。RabbitMQ有許多不同語言的客戶端。我們將在本教程中使用php-amqplib,并使用Composer進行依賴關系管理。
將composer.json文件添加到項目中:
```json
{
"require": {
"php-amqplib/php-amqplib": ">=2.6.1"
}
}
```
如果已安裝Composer并且功能正常,則可以運行以下命令:
```shell
composer.phar install
```
現在我們安裝了php-amqplib庫,我們可以編寫一些代碼。
### 發送

我們將調用我們的消息發布者(發送者)`send.php` 和我們的消息接收者 `receive.php` 。發布者將連接到RabbitMQ,發送單個消息,然后退出。
在 `send.php` 中,我們需要包含庫并使用必要的類:
```php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
```
然后我們可以創建到服務器的連接:
```php
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
```
連接抽象套接字連接,并為我們負責協議版本協商和身份驗證等。這里我們連接到本地機器上的代理 - 因此是本地主機。如果我們想要連接到不同機器上的代理,我們只需在此處指定其名稱或IP地址。
接下來,我們創建一個頻道,這是完成任務的大部分API所在的位置。
要發送,我們必須聲明一個隊列供我們發送;然后我們可以向隊列發布消息:
```php
$channel->queue_declare('hello', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";
```
聲明隊列是冪等的 - 只有在它不存在的情況下才會創建它。消息內容是一個字節數組,因此您可以編碼任何您喜歡的內容。
最后,我們關閉了頻道和連接;
```php
$channel->close();
$connection->close();
```
>[danger] 發送不起作用!
如果這是您第一次使用RabbitMQ并且沒有看到“已發送”消息,那么您可能會感到頭疼,想知道可能出現的問題。也許代理是在沒有足夠的可用磁盤空間的情況下啟動的(默認情況下它至少需要200 MB空閑),因此拒絕接受消息。檢查代理日志文件以確認并在必要時減少限制。配置文件文檔將向您展示如何設置`disk_free_limit`。
### 接收
那是我們的發布者。我們的接收器是來自RabbitMQ的推送消息,因此與發布單個消息的發布者不同,我們將保持其運行以偵聽消息并將其打印出來。

代碼(在`receive.php`中)具有幾乎相同的include和用作send:
```php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
```
設置與發布者相同;我們打開一個連接和一個通道,并聲明我們將要消耗的隊列。請注意,這與發送到的隊列匹配。
```php
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
```
請注意,我們也在這里聲明隊列。因為我們可能會在發布者之前啟動消費者,所以我們希望在嘗試使用消息之前確保隊列存在。
我們即將告訴服務器從隊列中傳遞消息。我們將定義一個PHP回調函數,它將接收服務器發送的消息。請記住,消息是從服務器異步發送到客戶端的。
```php
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
```
當`$channel`有回調,我們的代碼將阻止。每當我們收到消息時,我們的`$callback`函數將傳遞收到的消息。
### 歸納
現在我們可以運行這兩個腳本。在終端中,運行消費者(接收者):
```php
php receive.php
```
然后,運行發布者(發件人):
```php
php send.php
```
消費者將通過RabbitMQ打印從發件人處獲得的消息。接收器將繼續運行,等待消息(使用Ctrl-C停止它),因此嘗試從另一個終端運行發送器。
>[danger]列出隊列
您可能希望看到RabbitMQ有哪些隊列以及它們中有多少消息。您可以使用`rabbitmqctl`工具(作為特權用戶)執行此操作:
```shell
sudo rabbitmqctl list_queues
```
在Windows上,省略sudo:
```shell
rabbitmqctl.bat list_queues
```
# 工作隊列
## 工作隊列

在第一篇教程中,我們編寫了程序來發送和接收來自命名隊列的消息。在這個中,我們將創建一個工作隊列,用于在多個工作程序分配耗時的任務。
工作隊列(又稱:任務隊列)背后的主要思想是避免立即執行資源密集型任務,并且必須等待它完成。相反,我們安排任務稍后完成。我們將任務封裝為消息并將其發送到隊列。在后臺運行的工作進程將彈出任務并最終執行作業。當您運行許多工作程序時,它們之間將共享任務。
這個概念在Web應用程序中特別有用,因為在短的HTTP請求窗口中無法處理復雜的任務。
## 準備工作
在本教程的前一部分中,我們發送了一條包含“Hello World!”的消息。現在我們將發送代表復雜任務的字符串。我們沒有現實世界的任務,比如要調整大小的圖像或要渲染的pdf文件,所以讓我們通過假裝我們很忙來偽造它 - 使用`sleep()`函數。我們將字符串中的點數作為其復雜性;每個點都會占據“工作”的一秒鐘。例如,Hello ...描述的假任務將花費三秒鐘。
我們將稍微修改前一個示例中的send.php代碼,以允許從命令行發送任意消息。該程序將任務安排到我們的工作隊列,所以我們將其命名為`new_task.php`:
```php
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data = "Hello World!";
}
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, '', 'hello');
echo ' [x] Sent ', $data, "\n";
```
我們舊的`receive.php`腳本還需要進行一些更改:它需要為消息體中的每個點偽造一秒鐘的工作。它將從隊列中彈出消息并執行任務,所以我們稱之為`worker.php`:
```php
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
```
請注意,我們的假任務模擬執行時間。
像在教程一中那樣運行它們:
```shell
# shell 1
php worker.php
```
```php
# shell 2
php new_task.php "A very hard task which takes two seconds.."
```
## 循環調度
使用任務隊列的一個優點是能夠輕松地并行工作。如果我們正在積壓工作,我們可以添加更多進程,這樣就可以輕松擴展。
首先,讓我們嘗試同時運行兩個`worker.php`腳本。他們都會從隊列中獲取消息,但究竟如何呢?讓我們來看看。
你需要打開三個控制臺。兩個將運行`worker.php`腳本。這些控制臺將成為我們的兩個消費者 - C1和C2。
```shell
# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
```
```shell
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
```
在第三個中,我們將發布新任務。啟動消費者后,您可以發布一些消息:
```shell
# shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....
```
讓我們看看交給我們進程的是什么:
```shell
# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
```
```shell
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
```
默認情況下,RabbitMQ將按順序將每條消息發送給下一個消費者。平均而言,每個消費者將獲得相同數量的消息。這種分發消息的方式稱為循環法。與三個或更多進程一起嘗試。
## 消息確認
執行任務可能需要幾秒鐘。您可能想知道如果其中一個消費者開始執行長任務并且僅在部分完成時死亡會發生什么。使用我們當前的代碼,一旦RabbitMQ向客戶發送消息,它立即將其標記為刪除。在這種情況下,如果你殺死一個進程,我們將丟失它剛剛處理的消息。我們還將丟失分發給這個特定進程但尚未處理的所有消息。
但我們不想失去任何任務。如果進程死亡,我們希望將任務交付給另一個進程。
為了確保消息永不丟失,RabbitMQ支持消息確認。消費者發回ack(nowledgement)告訴RabbitMQ已收到,處理了特定消息,RabbitMQ可以自由刪除它。
如果消費者死亡(其通道關閉,連接關閉或TCP連接丟失)而不發送確認,RabbitMQ將理解消息未完全處理并將重入隊列。如果其他消費者同時在線,則會迅速將其重新發送給其他消費者。這樣你就可以確保沒有消息丟失,即使進程偶爾會死亡。
沒有任何消息超時;當消費者死亡時,RabbitMQ將重新發送消息。即使處理消息需要非常長的時間,也沒關系。
默認情況下,消息確認已關閉。是時候通過basic_consume將第四個參數設置為false來打開它們(true表示沒有確認),并在完成任務后從進程發送適當的確認。
```php
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
```
使用此代碼,我們可以確定即使您在處理消息時使用CTRL + C殺死一個進程,也不會丟失任何內容。進程死后不久,所有未經確認的消息將被重新傳遞。
確認必須在收到的交付的同一信道上發送。嘗試使用不同的通道進行確認將導致通道級協議異常。
>[danger]被遺忘的確認
錯過ack是一個常見的錯誤。這是一個簡單的錯誤,但后果是嚴重的。當您的客戶端退出時,消息將被重新傳遞(這可能看起來像隨機重新傳遞),但RabbitMQ將會占用越來越多的內存,因為它無法釋放任何未經處理的消息。
為了調試這種錯誤,您可以使用`rabbitmqctl`來打印`messages_unacknowledged`字段:
```shell
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
```
在Windows上,刪除sudo:
```powershell
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
```
## 消息持久性
我們已經學會了如何確保即使消費者死亡,任務也不會丟失。但是如果RabbitMQ服務器停止,我們的任務仍然會丟失。
當RabbitMQ退出或崩潰時,它將忘記隊列和消息,除非你告訴它不要。確保消息不會丟失需要做兩件事:我們需要將隊列和消息都標記為持久。
首先,我們需要確保RabbitMQ永遠不會丟失我們的隊列。為此,我們需要聲明它是持久的。為此,我們將第三個參數傳遞給`queue_declare`為`true`:
```php
$channel->queue_declare('hello', false, true, false, false);
```
雖然此命令本身是正確的,但它在我們當前的設置中不起作用。那是因為我們已經定義了一個名為hello的隊列,這個隊列不耐用。RabbitMQ不允許您使用不同的參數重新定義現有隊列,并將向嘗試執行此操作的任何程序返回錯誤。但是有一個快速的解決方法 - 讓我們聲明一個具有不同名稱的隊列,例如`task_queue`:
```php
$channel->queue_declare('task_queue', false, true, false, false);
```
此標志設置為`true`需要應用于生產者和消費者代碼。
此時我們確信即使RabbitMQ重新啟動,`task_queue`隊列也不會丟失。現在我們需要將消息標記為持久性 - 通過設置`delivery_mode = 2`消息屬性,AMQPMessage將其作為屬性數組的一部分。
```php
$msg = new AMQPMessage(
$data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
```
>[danger]有關消息持久性的注釋
將消息標記為持久性并不能完全保證消息不會丟失。雖然它告訴RabbitMQ將消息保存到磁盤,但是當RabbitMQ接受消息并且尚未保存消息時,仍然有一個短時間窗口。此外,RabbitMQ不會為每條消息執行fsync(2) - 它可能只是保存到緩存而不是真正寫入磁盤。持久性保證不強,但對于我們簡單的任務隊列來說已經足夠了。如果您需要更強的保證,那么您可以使用發布者確認。
## 公平派遣
您可能已經注意到調度仍然無法完全按照我們的意愿運行。例如,在有兩個工作進程的情況下,當所有奇數消息都很重或者消息很輕量時,一個進程將經常忙碌而另一個工作進程幾乎不會做任何工作。好吧,RabbitMQ對此一無所知,仍然會均勻地發送消息。
發生這種情況是因為RabbitMQ只是在消息進入隊列時調度消息。它不會查看消費者未確認消息的數量。它只是盲目地向第n個消費者發送每個第n個消息。

為了解決弊端我們可以使用`basic_qos`方法和`prefetch_count = 1`設置。這告訴RabbitMQ不要一次向一個worker發送一條消息。或者,換句話說,在處理并確認前一個消息之前,不要向工作人員發送新消息。相反,它會將它發送給下一個仍然不忙的進程。
```php
$channel->basic_qos(null, 1, null);
```
>[danger]關于隊列大小的說明
如果所有woker都很忙,您的隊列就會填滿。您將需要密切關注這一點,并可能添加更多woker,或者采取其他策略。
## 歸納
`new_task.php`文件的最終代碼:
```php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data = "Hello World!";
}
$msg = new AMQPMessage(
$data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$channel->basic_publish($msg, '', 'task_queue');
echo ' [x] Sent ', $data, "\n";
$channel->close();
$connection->close();
```
`worker.php`:
```php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
```
您設置工作隊列可以使用消息確認和預取。即使RabbitMQ重新啟動,持久性選項也可以使任務生效。
# 發布/訂閱
## 發布/訂閱
在上一個教程中,我們創建了一個工作隊列。工作隊列背后的假設是每個任務都交付給一個進程。在這一部分,我們將做一些完全不同的事情 - 我們將向多個消費者傳遞信息。此模式稱為“發布/訂閱”。
為了說明這種模式,我們將構建一個簡單的日志記錄系統。它將包含兩個程序 - 第一個將發出日志消息,第二個將接收和打印它們。
在我們的日志記錄系統中,接收程序的每個運行副本都將獲取消息。這樣我們就可以運行一個接收器并將日志定向到磁盤;同時我們將能夠運行另一個接收器并在屏幕上看到日志。
基本上,發布的日志消息將被廣播給所有接收者。
## 交換機
在本教程的前幾部分中,我們向隊列發送消息和從隊列接收消息。現在是時候在Rabbit中引入完整的消息傳遞模型了。
讓我們快速瀏覽前面教程中介紹的內容:
- 生產者是發送消息的用戶應用程序。
- 隊列是存儲消息的緩沖區。
- 消費者是接收消息的用戶應用程序。
RabbitMQ中消息傳遞模型的核心思想是生產者永遠不會將任何消息直接發送到隊列。實際上,生產者通常甚至不知道消息是否會被傳遞到任何隊列。
相反,生產者只能向交換機發送消息。交換機是一件非常簡單的事情。一方面,它接收來自生產者的消息,另一方面將它們推送到隊列。交換機必須確切知道如何處理收到的消息。它應該附加到特定隊列嗎?它應該附加到許多隊列嗎?或者它應該被丟棄。其規則由交換機類型定義。

有幾種交換類型可供選擇:直連,主題,headers和扇形。我們將專注于最后一個 - 扇形。讓我們創建一個這種類型的交換機,并將其稱為日志:
```php
$channel->exchange_declare('logs', 'fanout', false, false, false);
```
扇形交換機非常簡單。正如您可能從名稱中猜到的那樣,它只是將收到的所有消息廣播到它知道的所有隊列中。而這正是我們記錄器所需要的。
>[danger]列出交換機
要列出服務器上的交換,您可以運行有用的rabbitmqctl:
```shell
sudo rabbitmqctl list_exchanges
```
在此列表中將有一些amq。*交換和默認(未命名)交換。這些是默認創建的,但目前您不太可能需要使用它們。
>[danger]默認交換機
在本教程的前幾部分中,我們對交換機一無所知,但仍能夠向隊列發送消息。這是可能的,因為我們使用的是默認交換機,我們通過空字符串(“”)來識別。
回想一下我們之前是如何發布消息的:
```php
$channel->basic_publish($msg, '', 'hello');
```
這里我們使用默認或匿名交換機:消息被路由到具有routing_key指定的名稱的隊列(如果存在)。路由鍵是basic_publish的第三個參數。
現在,我們可以發布到我們的命名交換機:
```php
$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');
```
## 臨時隊列
您可能還記得以前我們使用的是具有指定名稱的隊列(請記住`hello`和`task_queue`)。能夠命名隊列對我們來說至關重要 - 我們需要將worker指向同一個隊列。當您想要在生產者和消費者之間共享隊列時,為隊列命名很重要。
但我們的記錄器并非如此。我們希望了解所有日志消息,而不僅僅是它們的一部分。我們也只對目前流動的消息感興趣,而不是舊消息。要解決這個問題,我們需要兩件事。
首先,每當我們連接到Rabbit時,我們都需要一個新的空隊列。為此,我們可以使用隨機名稱創建隊列,或者更好 - 讓服務器為我們選擇隨機隊列名稱。
其次,一旦我們斷開消費者,就應該自動刪除隊列。
在php-amqplib客戶端中,當我們將隊列名稱作為空字符串提供時,我們使用生成的名稱創建一個非持久隊列:
```php
list($queue_name, ,) = $channel->queue_declare("");
```
方法返回時,`$queue_name`變量包含RabbitMQ生成的隨機隊列名稱。例如,它可能看起來像`amq.gen-JzTY20BRgKO-HjmUJj0wLg`。
當聲明它的連接關閉時,隊列將被刪除,因為它被聲明為獨占。您可以在隊列指南中了解有關獨占標志和其他隊列屬性的更多信息。
## 綁定

我們已經創建了一個扇形交換機和一個隊列。現在我們需要告訴交換機將消息發送到我們的隊列。交換機和隊列之間的關系稱為綁定。
```php
$channel->queue_bind($queue_name, 'logs');
```
從現在開始,日志交換機會將消息附加到我們的隊列中。
>[danger]列出綁定
您可以使用,您猜對了,列出現有綁定
```shell
rabbitmqctl list_bindings
```
## 歸納

生成日志消息的生產者程序與前一個教程沒有太大的不同。最重要的變化是我們現在想要將消息發布到我們的日志交換機而不是匿名交換機。這里是`emit_log.php`腳本的代碼:
```php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('logs', 'fanout', false, false, false);
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data = "info: Hello World!";
}
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'logs');
echo ' [x] Sent ', $data, "\n";
$channel->close();
$connection->close();
```
如您所見,在建立連接后我們聲明了交換機。此步驟是必要的,因為禁止發布到不存在的交換機。
如果沒有隊列綁定到交換機,消息將會丟失,但這對我們沒有問題;如果沒有消費者在聽,我們可以安全地丟棄該消息。
`receive_logs.php`的代碼:
```php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('logs', 'fanout', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queue_name, 'logs');
echo " [*] Waiting for logs. To exit press CTRL+C\n";
$callback = function ($msg) {
echo ' [x] ', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
```
如果要將日志保存到文件,只需打開控制臺并鍵入:
```shell
php receive_logs.php > logs_from_rabbit.log
```
如果您希望在屏幕上看到日志,請生成一個新終端并運行:
```shell
php receive_logs.php
```
當然,要發出日志類型:
```shell
php emit_log.php
```
使用`rabbitmqctl list_bindings`,您可以驗證代碼是否實際創建了我們想要的綁定和隊列。運行兩個`receive_logs.php`程序時,您應該看到類似的內容:
```shell
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => ...done.
```
結果的解釋很簡單:來自交換日志的數據轉到兩個具有服務器分配名稱的隊列。而這正是我們的意圖。
# 路由
## 路由
在上一個教程中,我們構建了一個簡單的日志系統我們能夠向許多接收者廣播日志消息。
在本教程中,我們將為其添加一個功能 - 我們將只能訂閱一部分消息。例如,我們只能將關鍵錯誤消息定向到日志文件(以節省磁盤空間),同時仍然能夠在控制臺上打印所有日志消息。
## 綁定
在前面的例子中,我們已經在創建綁定。您可能會記得以下代碼:
```shell
$channel->queue_bind($queue_name, 'logs');
```
綁定是交換機和隊列之間的關系。這可以簡單地理解為:隊列對來自此交換機的消息感興趣。
綁定可以采用額外的`routing_key`參數。為了避免與`$channel :: basic_publish`參數混淆,我們將其稱為綁定密鑰。這就是我們如何使用鍵創建綁定:
```php
$binding_key = 'black';
$channel->queue_bind($queue_name, $exchange_name, $binding_key);
```
綁定密鑰的含義取決于交換類型。我們之前使用的扇形交換只是忽略了它的價值。
## 直連交換機
我們上一個教程中的日志記錄系統向所有消費者廣播所有消息。我們希望擴展它以允許根據消息的嚴重性過濾消息。例如,我們可能希望將日志消息寫入磁盤的腳本僅接收嚴重錯誤,而不是在警告或信息日志消息上浪費磁盤空間。
我們使用的是扇形交換機,它沒有給我們太大的靈活性 - 它只能進行無意識的廣播。
我們將使用直連交換機。直連交換機背后的路由算法很簡單 - 消息進入隊列,其綁定密鑰與消息的路由密鑰完全匹配。
為了說明這一點,請考慮以下設置:

在此設置中,我們可以看到直接交換機X與兩個綁定到它的隊列。第一個隊列綁定`orange`,第二個綁定有兩個綁定,一個綁定密鑰為`black`,另一個綁定為`green`。
在這樣的設置中,使用路由密鑰`orange`發布到交換機的消息將被路由到隊列Q1。路由鍵為`balck`或`green`的消息將轉到Q2。所有其他消息將被丟棄。
## 多個綁定

使用相同的綁定密鑰綁定多個隊列是完全合法的。在我們的例子中,我們可以在X和Q1之間添加綁定鍵黑色的綁定。在這種情況下,直連交換機將表現得像扇形交換機一樣,并將消息廣播到所有匹配的隊列。路由密鑰為黑色的消息將傳送到Q1和Q2。
## 發送日志
我們將此模型用于我們的日志系統。我們會將消息發送給直連交換機,而不是扇形。我們將提供日志嚴重性作為路由密鑰。這樣接收腳本將能夠選擇它想要接收的嚴重性。讓我們首先關注發送日志。
一如既往,我們需要先創建一個交換:
```php
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
```
我們已準備好發送消息:
```php
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$channel->basic_publish($msg, 'direct_logs', $severity);
```
為簡化起見,我們假設“嚴重性”可以是“info”,“warning”,“error”之一。
## 訂閱
接收消息將像上一個教程一樣工作,但有一個例外 - 我們將為我們感興趣的每個嚴重性創建一個新的綁定。
```php
foreach ($severities as $severity) {
$channel->queue_bind($queue_name, 'direct_logs', $severity);
}
```
## 歸納

`emit_log_direct.php`類的代碼:
````php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
$data = "Hello World!";
}
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'direct_logs', $severity);
echo ' [x] Sent ', $severity, ':', $data, "\n";
$channel->close();
$connection->close();
````
`receive_logs_direct.php`的代碼:
```php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$severities = array_slice($argv, 1);
if (empty($severities)) {
file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
exit(1);
}
foreach ($severities as $severity) {
$channel->queue_bind($queue_name, 'direct_logs', $severity);
}
echo " [*] Waiting for logs. To exit press CTRL+C\n";
$callback = function ($msg) {
echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
```
如果您只想將“warning”和“error”(而非“info”)日志消息保存到文件,只需打開控制臺并鍵入:
```shell
php receive_logs_direct.php warning error > logs_from_rabbit.log
```
如果您想在屏幕上看到所有日志消息,請打開一個新終端并執行以下操作:
```shell
php receive_logs_direct.php info warning error
# => [*] Waiting for logs. To exit press CTRL+C
```
并且,例如,要發出錯誤日志消息,只需鍵入:
```php
php emit_log_direct.php error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'
```
# 主題
## 主題
在上一個教程中,我們改進了日志系統。我們使用的是直連的,而不是使用只能進行虛假廣播的扇形交換機,并且有可能選擇性地接收日志。
雖然使用直連交換機改進了我們的系統,但它仍然有局限性 - 它不能基于多個標準進行路由。
在我們的日志系統中,我們可能不僅要根據嚴重性訂閱日志,還要根據發出日志的源來訂閱日志。您可能從syslog unix工具中了解這個概念,該工具根據嚴重性(info / warn / crit ...)和facility(auth / cron / kern ...)來路由日志。
這會給我們帶來很大的靈活性 - 我們可能想聽聽來自'cron'的關鍵錯誤以及來自'kern'的所有日志。
要在我們的日志系統中實現這一點,我們需要了解更復雜的主題交換機。
## 主題交換機
發送到主題交換機的消息不能具有任意routing_key - 它必須是由`點`分隔的單詞列表。單詞可以是任何內容,但通常它們指定與消息相關的一些功能。一些有效的路由密鑰示例:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。路由密鑰中可以包含任意數量的單詞,最多可達255個字節。
綁定密鑰也必須采用相同的形式。主題交換機背后的邏輯類似于直連交換 - 使用特定路由密鑰發送的消息將被傳遞到與匹配綁定密鑰綁定的所有隊列。但是,綁定鍵有兩個重要的特殊情況:
- *(星號)可以替代一個單詞。
- #(hash)可以替換零個或多個單詞。
最簡單的示例如下:

在這個例子中,我們將發送所有描述動物的消息。消息將與包含三個單詞(兩個點)的路由鍵一起發送。路由鍵中的第一個單詞將描述速度,第二個是顏色,第三個是物種:“<speed>.<color>.<species>”。
這些綁定可以概括為:
Q1對所有橙色動物感興趣。
Q2希望聽到關于兔子的一切,以及關于懶惰動物的一切。
路由密鑰設置為“quick.orange.rabbit”的消息將傳遞到兩個隊列。消息“lazy.orange.elephant”也將同時發送給他們。另一方面,“quick.orange.fox”只會進入第一個隊列,而“lazy.brown.fox”只會進入第二個隊列。“lazy.pink.rabbit”將僅傳遞到第二個隊列一次,即使它匹配兩個綁定。“quick.brown.fox”與任何綁定都不匹配,因此它將被丟棄。
如果我們違反約定并發送帶有一個或四個單詞的消息,例如“orange”或“quick.orange.male.rabbit”,會發生什么?好吧,這些消息將不匹配任何綁定,將丟失。
另一方面,“lazy.orange.male.rabbit”,即使它有四個單詞,也會匹配最后一個綁定,并將被傳遞到第二個隊列。
>[danger]主題交換機
主題交換機功能強大,可以像其他交換機一樣。
當隊列與“#”(哈希)綁定密鑰綁定時 - 它將接收所有消息,而不管路由密鑰 - 如扇形交換機。
當特殊字符“*”(星號)和“#”(哈希)未在綁定中使用時,主題交換的行為就像直連交換機一樣。
## 歸納
我們將在日志記錄系統中使用主題交換。我們將首先假設日志的路由鍵有兩個詞:“<facility>.<severity>”。
代碼與上一個教程中的代碼幾乎相同。
`emit_log_topic.php`的代碼:
```php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('topic_logs', 'topic', false, false, false);
$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
$data = "Hello World!";
}
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'topic_logs', $routing_key);
echo ' [x] Sent ', $routing_key, ':', $data, "\n";
$channel->close();
$connection->close();
```
`receive_logs_topic.php`的代碼:
```php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('topic_logs', 'topic', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$binding_keys = array_slice($argv, 1);
if (empty($binding_keys)) {
file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
exit(1);
}
foreach ($binding_keys as $binding_key) {
$channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}
echo " [*] Waiting for logs. To exit press CTRL+C\n";
$callback = function ($msg) {
echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
```
要接收所有日志:
```shell
php receive_logs_topic.php "#"
```
要從設施“kern”接收所有日志:
```php
php receive_logs_topic.php "kern.*"
```
或者,如果您只想聽到“critical”日志:
```shell
php receive_logs_topic.php "*.critical"
```
您可以創建多個綁定:
```shell
php receive_logs_topic.php "kern.*" "*.critical"
```
并使用路由鍵“kern.critical”類型發出日志:
```shell
php emit_log_topic.php "kern.critical" "A critical kernel error"
```
玩這些程序玩得開心。請注意,代碼不會對路由或綁定密鑰做出任何假設,您可能希望使用兩個以上的路由密鑰參數。
# RPC
## 遠程過程調用 (RPC)
在第二篇教程中,我們學習了如何使用工作隊列在多個worker之間分配耗時的任務。
但是如果我們需要在遠程計算機上運行一個函數并等待結果呢?嗯,這是一個不同的故事。此模式通常稱為遠程過程調用或RPC。
在本教程中,我們將使用RabbitMQ構建RPC系統:客戶端和可伸縮的RPC服務器。由于我們沒有任何值得分發的耗時任務,我們將創建一個返回Fibonacci數字的虛擬RPC服務。
### 客戶端界面
為了說明如何使用RPC服務,我們將創建一個簡單的客戶端類。它將公開一個名為call的方法,該方法發送一個RPC請求并阻塞,直到收到答案為止:
```php
$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo ' [.] Got ', $response, "\n";
```
>[danger]關于RPC的說明
盡管RPC在計算中是一種非常常見的模式,但它經常受到批評。當程序員不知道函數調用是本地的還是慢的RPC時,會出現問題。這樣的混淆導致系統不可預測,并增加了調試的不必要的復雜性。錯誤使用RPC可以導致不可維護的意大利面條代碼,而不是簡化軟件。
考慮到這一點,請考慮以下建議:
- 確保顯而易見哪個函數調用是本地的,哪個是遠程的。
- 記錄您的系統。使組件之間的依賴關系變得清晰。
- 處理錯誤案例。當RPC服務器長時間停機時,客戶端應該如何反應?
如有疑問,請避免使用RPC。如果可以,您應該使用異步管道 - 而不是類似RPC的阻塞,將結果異步推送到下一個計算階段。
### 回調隊列
一般來說,通過RabbitMQ進行RPC很容易。客戶端發送請求消息,服務器回復響應消息。為了接收響應,我們需要發送帶有請求的“回調”隊列地址。我們可以使用默認隊列。我們來試試吧:
```php
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$msg = new AMQPMessage(
$payload,
array('reply_to' => $queue_name)
);
$channel->basic_publish($msg, '', 'rpc_queue');
# ... then code to read a response message from the callback_queue ...
```
>[danger]消息屬性
AMQP 0-9-1協議預定義了一組帶有消息的14個屬性。大多數屬性很少使用,但以下情況除外:
`delivery_mode`:將消息標記為持久性(值為2)或瞬態(1)。您可能會記住第二個教程中的這個屬性。
`content_type`:用于描述編碼的mime類型。例如,對于經常使用的JSON編碼,將此屬性設置為:`application / json`是一種很好的做法。
`reply_to`:通常用于命名回調隊列。
`correlation_id`:用于將RPC響應與請求相關聯。
### 相關ID
在上面介紹的方法中,我們建議為每個RPC請求創建一個回調隊列。這是非常低效的,但幸運的是有更好的方法 - 讓我們為每個客戶端創建一個回調隊列。
這引發了一個新問題,在該隊列中收到響應后,不清楚響應屬于哪個請求。那是在使用`correlation_id`屬性的時候。我們將為每個請求將其設置為唯一值。稍后,當我們在回調隊列中收到一條消息時,我們將查看此屬性,并根據該屬性,我們將能夠將響應與請求進行匹配。如果我們看到未知的`correlation_id`值,我們可以安全地丟棄該消息 - 它不屬于我們的請求。
您可能會問,為什么我們應該忽略回調隊列中的未知消息,而不是失敗并出現錯誤?這是由于服務器端可能存在競爭條件。雖然不太可能,但RPC服務器可能會在向我們發送答案之后,但在發送請求的確認消息之前死亡。如果發生這種情況,重新啟動的RPC服務器將再次處理請求。這就是為什么在客戶端上我們必須優雅地處理重復的響應,理想情況下RPC應該是冪等的。
### 總結

我們的RPC將這樣工作:
客戶端啟動時,會創建一個匿名的獨占回調隊列。
對于RPC請求,客戶端發送帶有兩個屬性的消息:`reply_to`(設置為回調隊列)和`correlation_id`(設置為每個請求的唯一值)。
請求被發送到`rpc_queue`隊列。
RPC worker(aka:server)正在等待該隊列上的請求。當出現請求時,它會執行該作業,并使用`reply_to`字段中的隊列將帶有結果的消息發送回客戶端。
客戶端等待回調隊列上的數據。出現消息時,它會檢查`correlation_id`屬性。如果它與請求中的值匹配,則返回對應用程序的響應。
## 歸納
斐波納契任務:
```php
function fib($n)
{
if ($n == 0) {
return 0;
}
if ($n == 1) {
return 1;
}
return fib($n-1) + fib($n-2);
}
```
我們聲明我們的斐波那契函數。它假定只有有效的正整數輸入。(不要指望這個適用于大數字,它可能是最慢的遞歸實現)。
我們的RPC服務器`rpc_server.php`的代碼如下所示:
```php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('rpc_queue', false, false, false, false);
function fib($n)
{
if ($n == 0) {
return 0;
}
if ($n == 1) {
return 1;
}
return fib($n-1) + fib($n-2);
}
echo " [x] Awaiting RPC requests\n";
$callback = function ($req) {
$n = intval($req->body);
echo ' [.] fib(', $n, ")\n";
$msg = new AMQPMessage(
(string) fib($n),
array('correlation_id' => $req->get('correlation_id'))
);
$req->delivery_info['channel']->basic_publish(
$msg,
'',
$req->get('reply_to')
);
$req->delivery_info['channel']->basic_ack(
$req->delivery_info['delivery_tag']
);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
```
服務器代碼非常簡單:
- 像往常一樣,我們首先建立連接,通道和聲明隊列。
- 我們可能希望運行多個服務器進程。為了在多個服務器上平均分配負載,我們需要在`$channel.basic_qos`中設置`prefetch_count`設置。
- 我們使用`basic_consume`來訪問隊列。然后我們進入`while`循環,在其中我們等待請求消息,完成工作并發回響應。
我們的RPC客戶端rpc_client.php的代碼:
```php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class FibonacciRpcClient
{
private $connection;
private $channel;
private $callback_queue;
private $response;
private $corr_id;
public function __construct()
{
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
list($this->callback_queue, ,) = $this->channel->queue_declare(
"",
false,
false,
true,
false
);
$this->channel->basic_consume(
$this->callback_queue,
'',
false,
false,
false,
false,
array(
$this,
'onResponse'
)
);
}
public function onResponse($rep)
{
if ($rep->get('correlation_id') == $this->corr_id) {
$this->response = $rep->body;
}
}
public function call($n)
{
$this->response = null;
$this->corr_id = uniqid();
$msg = new AMQPMessage(
(string) $n,
array(
'correlation_id' => $this->corr_id,
'reply_to' => $this->callback_queue
)
);
$this->channel->basic_publish($msg, '', 'rpc_queue');
while (!$this->response) {
$this->channel->wait();
}
return intval($this->response);
}
}
$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo ' [.] Got ', $response, "\n";
```
現在是查看`rpc_client.php`和`rpc_server.php`的完整示例源代碼的好時機。
我們的RPC服務現已準備就緒。我們可以啟動服務器:
```shell
php rpc_server.php
# => [x] Awaiting RPC requests
```
要請求斐波納契數,請運行客戶端:
```shell
php rpc_client.php
# => [x] Requesting fib(30)
```
此處介紹的設計并不是RPC服務的唯一可能實現,但它具有一些重要優勢:
- 如果RPC服務器太慢,您可以通過運行另一個服務器來擴展。嘗試在新控制臺中運行第二個`rpc_server.php`。
- 在客戶端,RPC只需要發送和接收一條消息。不需要像`queue_declare`這樣的同步調用。因此,對于單個RPC請求,RPC客戶端只需要一次網絡往返。
我們的代碼仍然相當簡單,并不試圖解決更復雜(但重要)的問題,例如:
- 如果沒有運行服務器,客戶應該如何反應?
- 客戶端是否應該為RPC設置某種超時?
- 如果服務器出現故障并引發異常,是否應將其轉發給客戶端?
- 在處理之前防止無效的傳入消息(例如檢查邊界,鍵入)。