# 依賴
redis-server
redis php擴展
> 參考[此教程](http://www.yiibai.com/redis/)
# 安裝
## composer
最好通過composer安裝?`composer require topthink/think-queue 1.1.4`
## 手動安裝
去?[https://github.com/top-think/think-queue](https://github.com/top-think/think-queue)?里
把包 下載下來 放入 項目的Vendor里
然后在 common.php 公共函數庫里 加入這么一句:
`require './vendor/topthink/think-queue/src/common.php';`
當你在命令行里 切換到項目根目錄后, 執行?`php think queue:work -h`
能出現以下 結果 就表示think-queue的 安裝好了

# 配置
> 配置文件位于?`application/extra/queue.php`
## 公共配置
~~~
[
'connector'=>'sync' //驅動類型,可選擇 sync(默認):同步執行,database:數據庫驅動,redis:Redis驅動,topthink:Topthink驅動
//或其他自定義的完整的類名
]
~~~
## 驅動配置
> 各個驅動的具體可用配置項在`think\queue\connector`目錄下各個驅動類里的`options`屬性中,寫在上面的`queue`配置里即可覆蓋
這里我們只需記住 redis 和sync類型的就好了,因為database是官方不推薦的,topthink 完全不開放。
下面是我的配置:
~~~
return [
'connector'=>'redis',
// 'connector'=>'sync',
'expire' => 0,
'default' => 'default',
'host' => '127.0.0.1',
'port' => 6379,
'password' => '',
'select' => 0,
'timeout' => 0,
'persistent' => false
];
~~~
# 使用
隊列的特點是 先進先出,本質是將同步的調用,轉成別的進程里的 異步任務。
think-queue 是基于cli 的php任務管理。
## 入隊
首先,入隊是queue的 push 和delay 兩個方法。但是 兩個方法的名稱和參數不一樣。因此,我的建議是 再包一層,封裝成一個方法。
~~~
/**
* 添加隊列任務
*
* @param string $job_name 隊列執行的類路徑 不帶走類fire方法 帶@方法 走類@的方法
* @param array $data 傳入數據
* @param mixed $queue_name 隊列名 null 或字符串
* @param integer $delay 延遲執行的時間 單位秒
* @return void
*/
public function push_job($job_name, $data, $queue_name = null, $delay = 0){
trace($queue_name);
config('default_return_type', 'json');
$class_name = \strstr($job_name, '@', true);
if(class_exists($class_name)){
if($delay > 0){
$ret = \think\Queue::later($delay, $job_name, $data, $queue_name);
}else{
trace($job_name);
$ret = \think\Queue::push($job_name, $data, $queue_name);
}
trace(sprintf("加入任務%s, 時間%s", $job_name, datetime()));
return $ret;
}
return $this->error('job類 '.$job_name.'不存在');
}
~~~
think-queue 支持普通隊列和延遲隊列兩種,分別對應push 和delay。
然后 這兩個方法都有共同參數 $job, $data, $queue。job是隊列入隊的任務執行路徑,比如’A\B@c’
如果隊列任務類只有一個方法需要隊列執行,請定義為fire, $job里只要指向類路徑就可以了。如果有多個 $job里 要@后接方法名。
> 如果是push的話返回的是一個隊列任務的id ,延遲的話返回null
## 執行隊列任務
隊列任務 類 ,最好以job為后綴,放入相關job目錄里。如果就一個任務,將隊列執行方法 直接命名為fire, 多個子任務的話,方法可以定義多個。在入隊時 用@指定消費的任務+方法>
且傳遞過來的 參數基本?`Job $job, $data`。
舉例:
~~~
public function test(Job $job, $data){
// trace("正在執行隊列:{$job->getName()}");
// $time = rand(0, 4);
// sleep($time);
$isJobDone = $this->send($data);
trace($data);
if ($isJobDone) {
//如果任務執行成功, 記得刪除任務
$job->delete();
trace("隊列執行完成:".__FUNCTION__." ". datetime());
} else {
$try_nums = $job->attempts();
trace("報警內容:".__FUNCTION__.",報警失敗{$try_nums }次, 執行時間:". datetime());
// $job->release();
// $job->delete();
}
}
~~~
隊列的數據data 用戶自己定義,通常是一些查詢需要的條件,或者第三方服務需要的參數。
這里執行的時候注意一定不要拋異常,容易把隊列進程搞掛了。
## 出隊
當隊列任務執行完畢,最好將任務刪除掉。不然堆積在隊列里,不斷重試。
`$job->delete()`
## 再入隊
`$job->release()`
當默認開啟隊列 最大重試次數參數時, 如果一直失敗 一直release 的話他會無限重試。
所以最好設置一個最大重試次數。
## 整個任務失敗
當任務重試超過最大嘗試次數后, 最后會執行當前任務類的 failed方法 但是參數只有$data, 當一個類里如果有多個子任務的話 不好區分哪個任務。
### 失敗事件
首先,我們添加?`queue_failed`?事件標簽, 及其對應的回調方法
~~~
// 文件路徑: \application\tags.php
// 應用行為擴展定義文件
return [
// 應用初始化
'app_init' => [],
// 應用開始
'app_begin' => [],
// 模塊初始化
'module_init' => [],
// 操作開始執行
'action_begin' => [],
// 視圖內容過濾
'view_filter' => [],
// 日志寫入
'log_write' => [],
// 應用結束
'app_end' => [],
// 任務失敗統一回調,有四種定義方式
'queue.failed'=> [
// 數組形式,[ 'ClassName' , 'methodName']
['application\\behavior\\MyQueueFailedLogger', 'logAllFailedQueues']
// 字符串(靜態方法),'StaicClassName::methodName'
// 'MyQueueFailedLogger::logAllFailedQueues'
// 字符串(對象方法),'ClassName',此時需在對應的ClassName類中添加一個名為 queueFailed 的方法
// 'application\\behavior\\MyQueueFailedLogger'
// 閉包形式
/*
function( &$jobObject , $extra){
// var_dump($jobObject);
return true;
}
*/
]
];
~~~
這里,我們選擇數組形式的回調方式,新增?`\application\behavior\MyQueueFailedLogger`?類,添加一個?`logAllFailedQueues()`?方法
~~~
<?php
/**
* 文件路徑: \application\behavior\MyQueueFailedLogger.php
* 這是一個行為類,用于處理所有的消息隊列中的任務失敗回調
*/
namespace application\behavior;
class MyQueueFailedLogger {
const should_run_hook_callback = true;
/**
* @param $jobObject \think\queue\Job //任務對象,保存了該任務的執行情況和業務數據
* @return bool true //是否需要刪除任務并觸發其failed() 方法
*/
public function logAllFailedQueues(&$jobObject){
$failedJobLog = [
'jobHandlerClassName' => $jobObject->getName(), // 'application\index\job\Hello'
'queueName' => $jobObject->getQueue(), // 'helloJobQueue'
'jobData' => $jobObject->getRawBody()['data'], // '{'a': 1 }'
'attempts' => $jobObject->attempts(), // 3
];
var_export(json_encode($failedJobLog,true));
// $jobObject->release(); //重發任務
//$jobObject->delete(); //刪除任務
//$jobObject->failed(); //通知消費者類任務執行失敗
return self::should_run_hook_callback;
}
}
~~~
需要注意該回調方法的返回值:
* 返回 true 時,系統會自動刪除該任務,并且自動調用消費者類中的?`failed()`?方法
* 返回 false 時,系統不會自動刪除該任務,也不會自動調用消費者類中的?`failed()`?方法,需要開發者另行處理失敗任務的刪除和通知。
最后,在消費者類中,添加?`failed()`?方法
~~~
/**
* 文件路徑: \application\index\job\HelloJob.php
*/
/**
* 該方法用于接收任務執行失敗的通知,你可以發送郵件給相應的負責人員
* @param $jobData string|array|... //發布任務時傳遞的 jobData 數據
*/
public function failed($jobData){
send_mail_to_somebody() ;
print("Warning: Job failed after max retries. job data is :".var_export($data,true)."\n";
}
~~~
這樣,就可以做到任務失敗的記錄與告警
## 啟動隊列監聽服務
* Work 模式
~~~
php think queue:work \
--daemon //是否循環執行,如果不加該參數,則該命令處理完下一個消息就退出
--queue helloJobQueue //要處理的隊列的名稱
--delay 0 \ //如果本次任務執行拋出異常且任務未被刪除時,設置其下次執行前延遲多少秒,默認為0
--force \ //系統處于維護狀態時是否仍然處理任務,并未找到相關說明
--memory 128 \ //該進程允許使用的內存上限,以 M 為單位
--sleep 3 \ //如果隊列中無任務,則sleep多少秒后重新檢查(work+daemon模式)或者退出(listen或非daemon模式)
--tries 2 //如果任務已經超過嘗試次數上限,則觸發‘任務嘗試次數超限’事件,默認為0
~~~
* Listen 模式
~~~
php think queue:listen \
--queue helloJobQueue \ //監聽的隊列的名稱
--delay 0 \ //如果本次任務執行拋出異常且任務未被刪除時,設置其下次執行前延遲多少秒,默認為0
--memory 128 \ //該進程允許使用的內存上限,以 M 為單位
--sleep 3 \ //如果隊列中無任務,則多長時間后重新檢查,daemon模式下有效
--tries 0 \ //如果任務已經超過重發次數上限,則進入失敗處理邏輯,默認為0
--timeout 60 //創建的work子進程的允許執行的最長時間,以秒為單位
~~~
可以看到 listen 模式下,不包含?`--deamon`?參數,原因下面會說明
# 注意點
## 安全
redis 默認是沒密碼的,可以去服務器配置 auth password
## 模式區別
兩者都可以用于處理消息隊列中的任務
區別在于:
* 執行原理不同
* work 命令是單進程的處理模式。
按照是否設置了?`--daemon`?參數,work命令又可分為單次執行和循環執行兩種模式。
* 單次執行:不添加?`--daemon`參數,該模式下,work進程在處理完下一個消息后直接結束當前進程。當不存在新消息時,會sleep一段時間然后退出。
* 循環執行:添加了?`--daemon`參數,該模式下,work進程會循環地處理隊列中的消息,直到內存超出參數配置才結束進程。當不存在新消息時,會在每次循環中sleep一段時間。
* listen 命令是?父進程 + 子進程?的處理模式。
listen命令所在的父進程會創建一個單次執行模式的work子進程,并通過該work子進程來處理隊列中的下一個消息,當這個work子進程退出之后,listen命令所在的父進程會監聽到該子進程的退出信號,并重新創建一個新的單次執行的work子進程
* 退出時機不同
* work 命令的退出時機在上面的執行原理部分已敘述,此處不再重復
* listen 命令中,listen所在的父進程正常情況會一直運行,除非遇到下面兩種情況:
* 創建的某個work子進程的執行時間超過了 listen命令行中的`--timeout`?參數配置,此時work子進程會被強制結束,listen所在的父進程也會拋出一個?`ProcessTimeoutException`?異常并退出。開發者可以選擇捕獲該異常,讓父進程繼續執行,也可以選擇通過 supervisor 等監控軟件重啟一個新的listen命令。
* listen 命令所在的父進程因某種原因存在內存泄露,則當父進程本身占用的內存超過了命令行中的?`--memory`?參數配置時,父子進程均會退出。正常情況下,listen進程本身占用的內存是穩定不變的。
* 性能不同
* work 命令是在腳本內部做循環,框架腳本在命令執行的初期就已加載完畢;
* 而listen模式則是處理完一個任務之后新開一個work進程,此時會重新加載框架腳本。
因此:?work 模式的性能會比listen模式高。
注意:當代碼有更新時,work 模式下需要手動去執行?`php think queue:restart`?命令重啟隊列來使改動生效;而listen 模式會自動生效,無需其他操作。
* 超時控制能力
* work 模式本質上既不能控制進程自身的運行時間,也無法限制執行中的任務的執行時間。
舉例來說,假如你在某次上線之后,在上文中的?`\application\index\job\Hello.php`?消費者的`fire`方法中添加了一段死循環 :
~~~
public function fire(){
while(true){ //死循環
$consoleOutPut->writeln("<info>I am looping forever inside a job.</info> \n");
sleep(1);
}
}
~~~
那么這個循環將永遠不能停止,直到任務所在的進程超過內存限制或者由管理員手動結束。這個過程不會有任何的告警。更嚴重的是,如果你配置了expire ,那么這個死循環的任務可能會污染到同樣處理?`helloJobQueue`?隊列的其他work進程,最后好幾個work進程將被卡死在這段死循環中。詳情后文會說明。
work 模式下的超時控制能力,實際上應該理解為 多個work 進程配合下的過期任務重發能力。
* 而 listen命令可以限制其創建的work子進程的超時時間。
listen 命令可通過?`--timeout`?參數限制work子進程允許運行的最長時間,超過該時間限制仍未結束的子進程會被強制結束;
* 這里有必要補充一下 expire 和 timeout 之間的區別:
* expire 在配置文件中設置,timeout 在 listen命令 的命令行參數中設置,而且,expire 和 timeout 是兩個不同層次上的概念:
* expire 是指任務的過期時間。這個時間是全局的,影響到所有的work進程。(不管是獨立的work命令還是 listen 模式下創建的的work子進程) 。expire 針對的對象是?任務。
* timeout 是指work子進程的超時時間。這個時間只對當前執行的listen 命令有效。timeout 針對的對象是?work子進程。
* 使用場景不同
根據上面的介紹,可以看到,
work 命令的適用場景是:
* 任務數量較多
* 性能要求較高
* 任務的執行時間較短
* 消費者類中不存在死循環,sleep() ,exit() ,die() 等容易導致bug的邏輯
listen命令的適用場景是:
* 任務數量較少
* 任務的執行時間較長(如生 成大型的excel報表等),
* 任務的執行時間需要有嚴格限制
## 調試方式
首先,隊列的日志在runtime里cli.log 而不是web的log
因此最好的方式 時自定義一個日志統一存儲位置。
然后,隊列由兩部分組成,業務代碼和隊列部分。
### 輔助終端神器vscode
編輯代碼時直接 ctrl+` 打開終端 可以開啟多個終端 添加多個隊列
### 業務測試
最方便的方式是本地將隊列改為同步, 將隊列的任務從推入到消費。都給測試一便。
確保隊列業務邏輯部分沒有異常。
### 隊列的測試
主要測試 業務執行失敗后,會不會無限循環入隊列,隊列失敗后,有沒有進入報警和寫入錯誤日志。
還有隊列進程的內存,是否會掛掉。
## 思考
隊列傳送數據格式的定義?
什么時候該用隊列重試,什么時候直接報警讓技術查問題? 是不是應該在告警時留有手動補救執行隊列的方式的url?
延遲隊列是不是 sleep設為0 才精確
大隊列數據時 是不是多幾個進程能加速?
參考文檔:
[think-queue 筆記](https://github.com/coolseven/notes/tree/master/thinkphp-queue)
[官方文檔](https://github.com/top-think/think-queue)