* * * * *
[TOC]
## 簡介
Laravel 隊列為不同的后臺隊列服務提供統一的 API , 例如 Beanstalk,Amazon SQS, Redis,甚至其他基于關系型數據庫的隊列。 隊列的目的是將耗時的任務延時處理,比如發送郵件,從而大幅度縮短Web請求和相應的時間。
隊列配置文件存放在?`config/queue.php`。 每一種隊列驅動的配置都可以在該文件中找到, 包括數據庫,?[Beanstalkd](https://kr.github.io/beanstalkd/),?[Amazon SQS](https://aws.amazon.com/sqs/),?[Redis](http://redis.io/), 以及同步(本地使用)驅動。 其中還包含了一個`null`隊列驅動用于那些放棄隊列的任務。
### 連接 Vs. 隊列
在開始使用 Laravel 隊列前,弄明白 「連接」 和 「隊列」 的區別是很重要的。在你的?`config/queue.php`?配置文件里, 有一個?`connections`?配置選項。 這個選項給 Amazon SQS, Beanstalk ,或者 Redis 這樣的后端服務定義了一個特有的連接。不管是哪一種,一個給定的連接可能會有多個「隊列」,而 「隊列」可以被認為是不同的棧或者大量的隊列任務。
要注意的是,?`queue`?配置文件中每個連接的配置示例中都包含一個?`queue`?屬性。這是默認隊列,任務被發給指定連接的時候會被分發到這個隊列中。換句話說,如果你分發任務的時候沒有顯式定義隊列,那么它就會被放到連接配置中?`queue`?屬性所定義的隊列中:
~~~
// 這個任務將被分發到默認隊列...
dispatch(new Job);
// 這個任務將被發送到「emails」隊列...
dispatch((new Job)->onQueue('emails'));
~~~
有些應用可能不需要把任務發到不同的隊列,而只發到一個簡單的隊列中就行了。但是把任務推到不同的隊列仍然是非常有用的,因為 Laravel 隊列處理器允許你定義隊列的優先級,所以你能給不同的隊列劃分不同的優先級或者區分不同任務的不同處理方式了。比如說,如果你把任務推到?`high`?隊列中,你就能讓隊列處理器優先處理這些任務了:
~~~
php artisan queue:work --queue=high,default
~~~
### 驅動的必要設置
#### 數據庫
要使用?`database`?這個隊列驅動的話, 你需要創建一個數據表來存儲任務,你可以用?`queue:table`?這個 Artisan 命令來創建這個數據表的遷移。 當遷移創建好以后,就可以用?`migrate`?這條命令來創建數據表:
~~~
php artisan queue:table
php artisan migrate
~~~
#### Redis
為了使用?`redis`?隊列驅動, 你需要在你的配置文件?`config/database.php`?中配置Redis的數據庫連接
如果你的 Redis 隊列連接使用的是 Redis 集群, 你的隊列名稱必須包含?[key hash tag](https://redis.io/topics/cluster-spec#keys-hash-tags)?。 這是為了確保所有的 redis 鍵對于一個給定的隊列都置于同一哈希中:
~~~
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => '{default}',
'retry_after' => 90,
],
~~~
#### 其它隊列驅動的依賴擴展包
在使用列表里的隊列服務前,必須安裝以下依賴擴展包:
* Amazon SQS:?`aws/aws-sdk-php ~3.0`
* Beanstalkd:?`pda/pheanstalk ~3.0`
* Redis:?`predis/predis ~1.0`
## 創建任務
### 生成任務類
在你的應用程序中,隊列的任務類都默認放在?`app/Jobs`?目錄下,如果這個目錄不存在,那當你運行?`make:job`artisan 命令時目錄就會被自動創建。 你可以用以下的 Artisan 命令來生成一個新的隊列任務:
~~~
php artisan make:job SendReminderEmail
~~~
生成的類實現了?`Illuminate\Contracts\Queue\ShouldQueue`?接口,這意味著這個任務將會被推送到隊列中,而不是同步執行。
### 任務類結構
任務類的結構很簡單,一般來說只會包含一個讓隊列用來調用此任務的?`handle`?方法。我們來看一個示例的任務類,這個示例里,假設我們管理著一個播客發布服務,在發布之前需要處理上傳播客文件:
~~~
<?php
namespace App\Jobs;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
class ProcessPodcast implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
/**
* 創建一個新的任務實例。
*
* @param Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
/**
* 運行任務。
*
* @param AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// Process uploaded podcast...
}
}
~~~
注意,在這個例子中,我們在任務類的構造器中直接傳遞了一個?[Eloquent 模型](https://laravel-china.org/docs/laravel/5.4/eloquent)。因為我們在任務類里引用了?`SerializesModels`?這個
,使得 Eloquent 模型在處理任務時可以被優雅地序列化和反序列化。如果你的隊列任務類在構造器中接收了一個 Eloquent 模型,那么只有可識別出該模型的屬性會被序列化到隊列里。當任務被實際運行時,隊列系統便會自動從數據庫中重新取回完整的模型。這整個過程對你的應用程序來說是完全透明的,這樣可以避免在序列化完整的 Eloquent 模式實例時所帶來的一些問題。
在隊列處理任務時,會調用?`handle`?方法,而這里我們也可以通過 handle 方法的參數類型提示,讓 Laravel 的?[服務容器](https://laravel-china.org/docs/laravel/5.4/container)?自動注入依賴對象。
> {note} 像圖片內容這種二進制數據, 在放入隊列任務之前必須使用?`base64_encode`?方法轉換一下。 否則,當這項任務放置到隊列中時,可能無法正確序列化為 JSON。
## 分發任務
你寫好任務類后,就能通過?`dispatch`?輔助函數來分發它了。唯一需要傳遞給?`dispatch`?的參數是這個任務類的實例:
~~~
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 保存播客。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 創建播客...
dispatch(new ProcessPodcast($podcast));
}
}
~~~
> {tip}?`dispatch`?提供了一種簡捷、全局可用的函數,它也非常容易測試。查看下 Laravel?[測試文檔](https://laravel-china.org/docs/laravel/5.4/testing)?來了解更多。
### 延遲分發
如果你想延遲執行一個隊列中的任務,你可以用任務實例的?`delay`?方法。 這個方法是?`Illuminate\Bus\Queueable`trait 提供的,而這個 trait 在所有自動生成的任務類中都是默認加載了的。對于延遲任務我們可以舉個例子,比如指定一個被分發10分鐘后才執行的任務:
~~~
<?php
namespace App\Http\Controllers;
use Carbon\Carbon;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 保存一個新的播客。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 創建播客...
$job = (new ProcessPodcast($podcast))
->delay(Carbon::now()->addMinutes(10));
dispatch($job);
}
}
~~~
> {note} Amazon SQS 隊列服務最大延遲 15 分鐘。
### 自定義隊列 & 連接
#### 分發任務到指定隊列
通過推送任務到不同的隊列,你可以給隊列任務分類,甚至可以控制給不同的隊列分配多少任務。記住,這個并不是要推送任務到隊列配置文件中不同的 「connections」 里,而是推送到一個連接中不同的隊列里。要指定隊列的話,就調用任務實例的?`onQueue`?方法:
~~~
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 保存一個新的播客。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 創建播客...
$job = (new ProcessPodcast($podcast))->onQueue('processing');
dispatch($job);
}
}
~~~
#### 分發任務到指定連接
如果你使用了多個隊列連接,你可以把任務推到指定連接。要指定連接的話,你可以調用任務實例的?`onConnection`?方法:
~~~
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 保存一個新的播客。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 創建播客...
$job = (new ProcessPodcast($podcast))->onConnection('sqs');
dispatch($job);
}
}
~~~
當然,你可以鏈式調用?`onConnection`?和?`onQueue`?來同時指定任務的連接和隊列:
~~~
$job = (new ProcessPodcast($podcast))
->onConnection('sqs')
->onQueue('processing');
~~~
### 指定任務最大嘗試次數 / 超時值
#### 最大嘗試次數
在一項任務中指定最大的嘗試次數可以嘗試通過 Artisan 命令行?`--tries`?來設置:
~~~
php artisan queue:work --tries=3
~~~
但是,你可以采取更為精致的方法來完成這項工作比如說在任務類中定義最大嘗試次數。如果在類和命令行中都定義了最大嘗試次數, Laravel 會優先執行任務類中的值:
~~~
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* 任務最大嘗試次數
*
* @var int
*/
public $tries = 5;
}
~~~
#### 超時
同樣的,任務可以運行的最大秒數可以使用 Artisan 命令行上的?`--timeout`?開關指定:
~~~
php artisan queue:work --timeout=30
~~~
然而,你也可以在任務類中定義一個變量來設置可運行的最大描述,如果在類和命令行中都定義了最大嘗試次數, Laravel 會優先執行任務類中的值:
~~~
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* 任務運行的超時時間。
*
* @var int
*/
public $timeout = 120;
}
~~~
### 錯誤處理
如果任務運行的時候拋出異常,這個任務就自動被釋放回隊列,這樣它就能被再重新運行了。如果繼續拋出異常,這個任務會繼續被釋放回隊列,直到重試次數達到你應用允許的最多次數。這個最多次數是在調用?`queue:work`?Artisan 命令的時候通過?`--tries`?參數來定義的。更多隊列處理器的信息可以?[在下面看到](https://laravel-china.org/docs/laravel/5.4/queues/1256#running-the-queue-worker)?。
## 運行隊列處理器
Laravel 包含一個隊列處理器,當新任務被推到隊列中時它能處理這些任務。你可以通過 queue:work Artisan 命令來運行處理器。要注意,一旦?`queue:work`?命令開始,它將一直運行,直到你手動停止或者你關閉控制臺:
~~~
php artisan queue:work
~~~
> {tip} 要讓?`queue:work`?進程永久在后臺運行,你應該使用進程監控工具,比如?`Supervisor`?來保證隊列處理器沒有停止運行。
一定要記得,隊列處理器是長時間運行的進程,并在內存里保存著已經啟動的應用狀態。這樣的結果就是,處理器運行后如果你修改代碼那這些改變是不會應用到處理器中的。所以在你重新部署過程中,一定要?[重啟隊列處理器](https://laravel-china.org/docs/laravel/5.4/queues/1256#queue-workers-and-deployment)?。
#### 指定連接 & 隊列
你可以指定隊列處理器所使用的連接。你在?`config/queue.php`?配置文件里定義了多個連接,而你傳遞給?`work`?命令的連接名字要至少跟它們其中一個是一致的:
~~~
php artisan queue:work redis
~~~
你可以自定義隊列處理器,方式是處理給定連接的特定隊列。舉例來說,如果你所有的郵件都是在?`redis`?連接中的?`emails`?隊列中處理的,你就能通過以下命令啟動一個只處理那個特定隊列的隊列處理器了:
~~~
php artisan queue:work redis --queue=emails
~~~
#### 資源注意事項
守護程序隊列不會在處理每個作業之前 「重新啟動」 框架。 因此,在每個任務完成后,您應該釋放任何占用過大的資源。例如,如果你使用GD庫進行圖像處理,你應該在完成后用?`imagedestroy`?釋放內存。
### 隊列優先級
有時候你希望設置處理隊列的優先級。比如在?`config/queue.php`?里你可能設置了?`redis`?連接中的默認隊列優先級為?`low`,但是你可能偶爾希望把一個任務推到?`high`?優先級的隊列中,像這樣:
~~~
dispatch((new Job)->onQueue('high'));
~~~
要驗證?`high`?隊列中的任務都是在?`low`?隊列中的任務之前處理的,你要啟動一個隊列處理器,傳遞給它隊列名字的列表并以英文逗號,間隔:
~~~
php artisan queue:work --queue=high,low
~~~
### 隊列處理器 & 部署
因為隊列處理器都是 long-lived 進程,如果代碼改變而隊列處理器沒有重啟,他們是不能應用新代碼的。所以最簡單的方式就是重新部署過程中要重啟隊列處理器。你可以很優雅地只輸入?`queue:restart`?來重啟所有隊列處理器。
~~~
php artisan queue:restart
~~~
這個命令將會告訴所有隊列處理器在執行完當前任務后結束進程,這樣才不會有任務丟失。因為隊列處理器在執行?`queue:restart`?命令時對結束進程,你應該運行一個進程管理器,比如?[Supervisor](https://laravel-china.org/docs/laravel/5.4/queues/1256#supervisor-configuration)?來自動重新啟動隊列處理器。
### 任務過期 & 超時
#### 任務過期
`config/queue.php`?配置文件里,每一個隊列連接都定義了一個?`retry_after`?選項。這個選項指定了任務最多處理多少秒后就被當做失敗重試了。比如說,如果這個選項設置為?`90`,那么當這個任務持續執行了?`90`?秒而沒有被刪除,那么它將被釋放回隊列。通常情況下,你應該把?`retry_after`?設置為最長耗時的任務所對應的時間。
> {note} 唯一沒有?`retry_after`?選項的連接是 Amazon SQS。當用 Amazon SQS 時,你必須通過?[Amazon](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html)?命令行來配置這個重試閾值。
#### 隊列處理器超時
`queue:work`?Artisan 命令對外有一個?`--timeout`?選項。這個選項指定了?`Laravel`?隊列處理器最多執行多長時間后就應該被關閉掉。有時候一個隊列的子進程會因為很多原因僵死,比如一個外部的 HTTP 請求沒有響應。這個?`--timeout`?選項會移除超出指定事件限制的僵死進程。
~~~
php artisan queue:work --timeout=60
~~~
`retry_after`?配置選項和?`--timeout`?命令行選項是不一樣的,但是可以同時工作來保證任務不會丟失并且不會重復執行。
> {note}?`--timeout`?應該永遠都要比?`retry_after`?短至少幾秒鐘的時間。這樣就能保證任務進程總能在失敗重試前就被殺死了。如果你的?`--timeout`?選項大于?`retry_after`?配置選項,你的任務可能被執行兩次。
#### 隊列進程睡眠時間
當隊列需要處理任務時,進程將繼續處理任務,它們之間沒有延遲。 但是,如果沒有新的工作可用,`sleep`?參數決定了工作進程將「睡眠」多長時間:
~~~
php artisan queue:work --sleep=3
~~~
## Supervisor 配置
#### 安裝 Supervisor
Supervisor 是一個 Linux 操作系統上的進程監控軟件,它會在?`queue:listen`?或?`queue:work`?命令發生失敗后自動重啟它們。要在 Ubuntu 安裝 Supervisor,可以用以下命令:
~~~
sudo apt-get install supervisor
~~~
> {tip} 如果自己手動配置 Supervisor 聽起來有點難以應付,可以考慮使用?[Laravel Forge](https://forge.laravel.com/)?,它能給你的 Laravel 項目自動安裝與配置 Supervisor。
#### 配置 Supervisor
Supervisor 的配置文件一般是放在?`/etc/supervisor/conf.d`?目錄下,在這個目錄中你可以創建任意數量的配置文件來要求 Supervisor 怎樣監控你的進程。例如我們創建一個?`laravel-worker.conf`?來啟動與監控一個?`queue:work`?進程:
~~~
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
~~~
這個例子里的?`numprocs`?命令會要求 Supervisor 運行并監控 8 個?`queue:work`?進程,并且在它們運行失敗后重新啟動。當然,你必須更改?`command`?命令的?`queue:work sqs`,以顯示你所選擇的隊列驅動。
#### 啟動 Supervisor
當這個配置文件被創建后,你需要更新 Supervisor 的配置,并用以下命令來啟動該進程:
~~~
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start laravel-worker:*
~~~
更多有關 Supervisor 的設置與使用,請參考?[Supervisor](http://supervisord.org/index.html)?官方文檔。
## 處理失敗的任務
有時候你隊列中的任務會失敗。不要擔心,本來事情就不會一帆風順。 Laravel 內置了一個方便的方式來指定任務重試的最大次數。當任務超出這個重試次數后,它就會被插入到?`failed_jobs`?數據表里面。要創建?`failed_jobs`?表的話,你可以用?`queue:failed-table`?命令:
~~~
php artisan queue:failed-table
php artisan migrate
~~~
然后運行隊列處理器,在調用?[queue:work](https://laravel-china.org/docs/laravel/5.4/queues/1256#running-the-queue-worker)?命令時你應該通過?`--tries`?參數指定任務的最大重試次數。如果不指定,任務就會永久重試:
~~~
php artisan queue:work redis --tries=3
~~~
### 清除失敗任務
你可以在任務類里直接定義?`failed`?方法,它能在任務失敗時運行任務的清除邏輯。這個地方用來發一條警告給用戶或者重置任務執行的操作等再好不過了。導致任務失敗的異常信息會被傳遞到?`failed`?方法:
~~~
<?php
namespace App\Jobs;
use Exception;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
class ProcessPodcast implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
/**
* 創建一個新的任務實例。
*
* @param Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
/**
* 執行任務。
*
* @param AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// 處理上傳播客...
}
/**
* 要處理的失敗任務。
*
* @param Exception $exception
* @return void
*/
public function failed(Exception $exception)
{
// 給用戶發送失敗通知,等等...
}
}
~~~
任務失敗事件
如果你想注冊一個當隊列任務失敗時會被調用的事件,則可以用?`Queue::failing`?方法。這樣你就有機會通過這個事件來用 e-mail 或?[HipChat](https://www.hipchat.com/)?通知你的團隊。例如我們可以在 Laravel 內置的?`AppServiceProvider`?中對這個事件附加一個回調函數:
~~~
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\ServiceProvider;
class AppServiceProvider extends ServiceProvider
{
/**
* 啟動任意應用程序的服務。
*
* @return void
*/
public function boot()
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
/**
* 注冊服務提供者。
*
* @return void
*/
public function register()
{
//
}
}
~~~
### 重試失敗任務
要查看你在?`failed_jobs`?數據表中的所有失敗任務,則可以用?`queue:failed`?這個 Artisan 命令:
~~~
php artisan queue:failed
~~~
`queue:failed`?命令會列出所有任務的 ID、連接、隊列以及失敗時間,任務 ID 可以被用在重試失敗的任務上。例如要重試一個 ID 為?`5`?的失敗任務,其命令如下:
~~~
php artisan queue:retry 5
~~~
要重試所有失敗的任務,可以使用?`queue:retry`?并使用?`all`?作為 ID:
~~~
php artisan queue:retry all
~~~
如果你想刪除掉一個失敗任務,可以用?`queue:forget`?命令:
~~~
php artisan queue:forget 5
~~~
`queue:flush`?命令可以讓你刪除所有失敗的任務:
~~~
php artisan queue:flush
~~~
## 任務事件
使用隊列的?`before`?和?`after`?方法,你能指定任務處理前和處理后的回調處理。在這些回調里正是實現額外的日志記錄或者增加統計數據的好時機。通常情況下,你應該在?[服務容器](https://laravel-china.org/docs/laravel/5.4/providers)?中調用這些方法。例如,我們使用 Laravel 中的 AppServiceProvider:
~~~
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
class AppServiceProvider extends ServiceProvider
{
/**
* 啟動任意服務。
*
* @return void
*/
public function boot()
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
/**
* 注冊服務提供者。
*
* @return void
*/
public function register()
{
//
}
}
~~~
在?`隊列`?[facade](https://laravel-china.org/docs/laravel/5.4/facades)?中使用?`looping`?方法,你可以嘗試在隊列獲取任務之前執行指定的回調方法。舉個例子,你可以用閉包來回滾之前已失敗任務的事務。
~~~
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});
~~~
- 前言
- 翻譯說明
- 發行說明
- 升級說明
- 貢獻導引
- 入門指南
- 安裝
- 配置信息
- 文件夾結構
- 請求周期
- 開發環境部署
- Homestead
- Valet
- 核心概念
- 服務容器
- 服務提供者
- Facades
- Contracts
- HTTP層
- 路由
- 中間件
- CSRF 保護
- 控制器
- 請求
- 響應
- 視圖
- Session
- 表單驗證
- 前端
- Blade 模板
- 本地化
- 前端指南
- 編輯資源 Mix
- 安全
- 用戶認證
- Passport OAuth 認證
- 用戶授權
- 加密解密
- 哈希
- 重置密碼
- 綜合話題
- Artisan 命令行
- 廣播系統
- 緩存系統
- 集合
- 錯誤與日志
- 事件系統
- 文件存儲
- 輔助函數
- 郵件發送
- 消息通知
- 擴展包開發
- 隊列
- 任務調度
- 數據庫
- 快速入門
- 查詢構造器
- 分頁
- 數據庫遷移
- 數據填充
- Redis
- Eloquent ORM
- 快速入門
- 模型關聯
- Eloquent 集合
- 修改器
- 序列化
- 測試
- 快速入門
- HTTP 測試
- 瀏覽器測試 Dusk
- 數據庫測試
- 測試模擬器
- 官方擴展包
- Cashier 交易工具包
- Envoy 部署工具
- Scout 全文搜索
- Socialite 社會化登錄