[TOC]
# Work Queues
> using php-amqplib

在第一篇教程中,我們編寫了兩段程序通過一個指定的隊列來收發消息。在本節教程中,我們將創建一個工作隊列,用于多個工作人員之間分配耗時的任務。
工作隊列(又名:任務隊列)其主要的思想是避免立即執行資源密集型任務,并且阻塞進程等待任務完成。相反,我們讓這類型任務稍后執行,我們將它封裝為消息,并將其添加到任務隊列中。在后臺啟動一個工作進程,讀取工作隊列中的任務,并且解釋執行。當你運行多個工作進程時,他們共享工作隊列中的任務。
這個思想在Web應用程序中特別有用,用于處理在短時間的HTTP請求中無法處理的復雜任務
# 準備
在上一節的教程中,我們發送了一個包含 “hello world” 的消息。下面我們將發送代表復雜任務的字符串。我們當前沒有一個真實的應用場景,比如:調整圖像的大小或者渲染pdf文件,所以我們通過使用 sleep() 函數來模擬復雜的任務需要較多的處理時間的場景。我們假設字符串中的 . 的數量為任務的復雜度;每個.將占用一秒鐘的工作時間。例如 Hello ... 代表任務需要執行 3 秒鐘。
我們稍微修改上一節例子中的 send.php,以允許從命令行發送任意消息。這段程序將會把任務放到工作隊列中,所以我們將其命名為 new_task.php
~~~
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent ", $data, "\n";
~~~
我們之前的 receive.php也需要一些修改:它需要為消息體中的每個 . 偽造一秒的工作時間。它將從隊列中取出消息并解析執行任務,所以我們稱之為 worker.php
~~~
$callback = function($msg){
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
~~~
請注意,我們模擬的任務會占用執行時間。
運行他們的方式和上一節一樣:
~~~
# shell 1
php worker.php
# shell 2
php new_task.php "A very hard task takes two seconds.."
~~~
# 循環調度
使用隊列的有點之一就是能夠輕松地并行工作。如果我們正面臨大量積壓的工作,我們可以啟動更多的工作進程,這樣就可以輕松地擴展系統的處理能力。
首先,我們嘗試同時運行兩個worker.php 腳本。它們都會從隊列中獲取消息,但是究竟如何,讓我們試試看。
你需要打開三個命令行控制臺,其中兩個運行worker.php腳本,它們就是我們啟動的兩個消費者-C1和C2.
~~~
# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
~~~
~~~
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
~~~
在第三個控制臺我們將發布新的任務。一旦你開始使用消費者,你可以發布一些消息:
~~~
# shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....
~~~
接著我們看看兩個消費者控制臺輸出的內容:
~~~
# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
~~~
~~~
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
~~~
在默認情況下,RabbitMQ 將按順序將隊列中的消息分發給消費者。平均每個消費者獲得相同數量的消息。這種奮發消息的方式叫做循環(round-robin)。你可以嘗試一下三個或者更多的消費者同時運行。
# 消息確認機制 (ack)
執行任務可能需要幾秒鐘。你可能會想,如果當一個消費者在執行一個長時間的任務的過程中異常退出會怎樣。在我們目前的代碼下,一旦RabbitMQ向消費者發送了消息后,此條消息將立即從內存中刪除。在這種情況下,如果你殺掉一個正在處理消息工作進程,那么我們將丟失這條消息。并且我們還會丟失發送給這個工作進程所有還未處理的消息。
但是我們不想丟失任何任務。如果一個工作進程發生異常退出,我們希望將他在處理的任務交給另一個工作進程繼續處理。
為了確保消息永遠不會丟失,RabbitMQ支持消息確認機制。消費者發送一個確認信息告訴RabbitMQ已經收到消息并且處理完成,此后RabbitMQ就可以自由刪除這條消息了。
如果消費者進程異常退出(其通道關閉,連接斷開或者TCP連接丟失),而不發送確認信息,RabbitMQ將會認為此進程當前處理的消息未處理完成,就會將此消息重新放回消息隊列。如果此時有其他消費者在線,則會迅速將這條消息重新提供給另一個消費者。這樣就可以確保沒有消息丟失。即使有消費者進程異常推出的情況。
不會有消息處理超時的情況,只有當消費者進程異常退出時,RabbitMQ才會將其消息重發。即使處理消息需要一段非常長的時間。
在默認情況下,消息確認機制是關閉的。現在是時候開啟消息確認機制,將basic_consumer的第四個參數設置為false(true表示不開啟消息確認),并且工作進程處理完消息后發送確認消息。
~~~
# 處理消息回調函數
$callback = function($msg){
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
# 開啟消息確認
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
~~~
使用這段代碼,我們可以確認即使在處理消息時,使用 CTRL + C 殺掉一個工作進程,也不會丟失任何消息。工作進程掛掉之后不久,所有未確認的消息將會被重新發送。
# 忘記發送確認消息
忘記發送確認消息是一個常見的錯誤,并且這是一個很容易犯的錯誤,但是它的后果是很嚴重的。當你的客戶端退出時,消息將被重新發送,但RabbitMQ將會消耗越來越多的內存,因為它將無法釋放任何沒有確認信息的消息。
為了調試這種錯誤,你可以使用 rabbitmqctl 打印 messages_unacknowledged 字段:
~~~
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
~~~
在windows 上去掉前面的 sudo
~~~
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
~~~
# 消息持久化
我們已經學習了如何確保即使消費者異常退出后,任務也不會丟失。但是如果RabbitMQ服務器掛了,我們的任務任然會丟失。
當RabbitMQ退出或崩潰時,它會丟失隊列和其中的消息,除非你告訴他要持久化這些信息。需要兩件事來確保消息不會丟失:我們需要將隊列和消息標記為 durable(持久化)。
首先,我們需要確保RabbitMQ不會丟失消息隊列。為了做到這一點,我們需要將隊列聲明為持久化,只要將 queue_declare 的第三個參數設置為 true 就行了:
~~~
$channel->queue_declare('hello', false, true, false, false);
~~~
雖然這個命令本身是正確的,但是在我們目前的設置中是不行的。那是因為我們已經聲明了一個非持久化名為hello的隊列了。RabbitMQ不允許使用不同的參數重新定義一個已經存在的隊列,并且會返回一個錯誤給任何嘗試做此事的程序。但有一個快速的解決方法-我們用不同的名稱聲明一個隊列,發送消息那邊,例如 task_queue:
~~~
$channel->queue_declare('task_queue', false, true, false, false);
~~~
設置為 true 的標志需要同時應用于生產者和消費者中。
到這一步,我們確信即使RabbitMQ重啟,task_queue隊列也不會丟失。現在我們需要將我們需要的消息標記為持久化 - 通過設置AMQPMessage 的參數 delivery = 2:
~~~
$msg = new AMQPMessage($data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
~~~
# 注意消息的持久性
將消息標記為持久化并不能完全確保消息一點也不會丟失。雖然它告訴RabbitMQ將消息保存到磁盤,但是當RabbitMQ接收到消息并且還沒有保存到磁盤之間,仍然有一個很短的時間段。此外,RabbitMQ也不會每來一條信息就執行 fsync - 它可能只是保存在緩存中,而不是真的寫入磁盤。持久化為保證不強,但是對于我們簡單的任務隊列來說已經足夠了。如果你需要更加強大的保證,那么你可以使用 publisher confirms(起初不要過分糾結于此)。
# 公平調度
你可能已經注意到,任務調度并不是完全按照我們所設想一樣執行。比如在兩個消費者的情況下,當所有奇數序號的任務量都很大,并且偶數序號的任務量小的情況下,一個消費者將不斷忙碌,另一個消費者幾乎沒有任何工作量。在這種情境下,RabbitMQ依然不知道什么信息,還是會將消息平均分配。
這種情況的發生是因為RabbitMQ只管分配進入隊列的消息。它不會去看消費者的未確認消息的數量。它只是盲目地向第n個消費者發送第n條消息。

為了解決以上問題,我們可以通過設置 basic_qos 第二個參數 prefetch_count = 1。這一項告訴RabbitMQ不要一次給一個消費者發送多個消息。或者換一種說法,在確認前一個消息之前,不要向消費者發送新的消息。相反,新的消息將發送到一個處于空閑的消費者。
~~~
$channel->basic_qos(null, 1, null);
~~~
# 關于隊列的大小
如果所有的消費者進程都處于忙碌狀態,你的隊列可能會溢出。你應該留意一下這種情況,也許你可以增加更多的消費者進程,或者采取一些其他的策略。
# 程序匯總
我們最終版本的 new_task.php
~~~
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$channel->basic_publish($msg, '', 'task_queue');
echo " [x] Sent ", $data, "\n";
$channel->close();
$connection->close();
~~~
worker.php
~~~
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg){
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
~~~
使用消息確認和預讀取你可以設置一個持久化的工作隊列,即使RabbitMQ重新啟動,持久性選項也能讓任務不丟失。
現在我們可以進入第三節學習向多個消費者發送給相同的消息。
- OAuth
- 簡介
- 步驟
- 單點登錄
- .user.ini
- 時間轉換為今天昨天前天幾天前
- 獲取ip接口
- 協程
- 概念
- yield-from && return-values
- 協程與阻塞的思考
- 中間件
- mysqli異步與php的協程
- 代碼片段
- pdo 執行的sql語句
- 二進制安全
- 捕捉異常中斷
- global
- 利用cookie模擬登陸
- 解析非正常json
- 簡單的對稱加密算法
- RSA 加密
- 過濾掉emoji表情
- 判斷遠程圖片是否存在
- 一分鐘限制請求100次
- 文件處理
- 多文件上傳
- 顯示所有文件
- 文件下載和上面顯示所有文件配合
- 文件的刪除,統計,存數組等
- 圖片處理
- 簡介
- 驗證碼
- 圖片等比縮放
- 批量添加水印
- beanstalkd
- 安裝
- 使用
- RabbitMQ
- 簡介
- debain安裝
- centos安裝
- 常用方法
- 入門
- 工作隊列
- 訂閱,發布
- 路由
- 主題
- 遠程調用RPC
- 消息中間件的選型
- .htaccess
- isset、empty、if區別以及0、‘’、null
- php各版本
- php7.2 不向后兼容的改動
- php中的各種坑
- php7改變
- php慢日志
- 郵件
- PHPMailer實現發郵件
- 驗證郵件地址真實性
- 文件下載
- FastCgi 與 PHP-fpm 之間的關系
- openssl 加解密
- 反射
- 鉤子方法
- 查找插件
- opcode
- opcache使用
- opcache優化
- 分布式一致性hash算法
- 概念
- 哈希算法好壞的四個定義
- php實現
- java實現
- 數組
- jwt
- jwt簡介
- 單點登錄
- phpize
- GeoIP擴展
- php無法獲得https網頁內容的解決方案
- homestead運行的腳本
- Unicode和Utf-8轉換
- php優化
- kafka
- fpm配置
- configure配置詳解