# 隊列
* * * * *
**簡介**
消息隊列中間件是分布式系統中重要的組件,主要解決應用耦合,異步消息,流量削鋒等問題。
實現高性能,高可用,可伸縮和最終一致性架構。
是大型分布式系統不可缺少的中間件。
場景:異步處理、應用解耦、流量削鋒、日志處理
* * *
以上就是關于為什么要使用隊列的大致說明
下面介紹在OneBase使用TP官方開發的隊列think-queue。
首先需要仔細理解下面這張圖

執行流程
1. 命令行`Command`開始監聽隊列`queue:work`
2. 執行進程`Worker`獲取新消息`Queue::pop()`
3. 消息隊列`Queue`返回一個可用的`Job`實例`$job`
3.1 生產者推送`Queue::push()`新消息到消息隊列`Queue`
3.2 消息隊列`Queue`返回是否推送成功給生產者
4. 執行進程`Worker`調用`$job`的`fire()`方法
5. 消息`Job`解析`$job`的`payload`,實例化一個消費者,并調用消費者實例的`fire($job, $data)`方法。
6. 消費者讀取消息內容`$data`,處理業務邏輯,刪除或重發該消息 `$job->delete()` 或 `$job->release()`。
7. 消息`Job`從Database或Redis中刪除消息或重發消息
8. 消息`Job`返回消息處理結果給執行進程`Worker`
9. 執行進程`Worker`在終端輸出響應或結束運行
**一、隊列配置**
隊列配置在app目錄下config.php配置文件中,可以配置驅動,建議Redis。
此處為演示方便使用的數據庫驅動。
```
// +----------------------------------------------------------------------
// | 隊列配置
// +----------------------------------------------------------------------
'queue' => [
// sync驅動表示取消消息隊列還原為同步執行
// 'connector' => 'Sync',
// Redis驅動
// 'connector' => 'redis',
// "expire"=>60,//任務過期時間默認為秒,禁用為null
// "default"=>"default",//默認隊列名稱
// "host"=>Env::get("redis.host", "127.0.0.1"),//Redis主機IP地址
// "port"=>Env::get("redis.port", 6379),//Redis端口
// "password"=>Env::get("redis.password", "123456"),//Redis密碼
// "select"=>5,//Redis數據庫索引
// "timeout"=>0,//Redis連接超時時間
// "persistent"=>false,//是否長連接
// Database驅動
"type"=>"Database",//數據庫驅動
"expire"=>60,//任務過期時間,單位為秒,禁用為null
"default"=>"default",//默認隊列名稱
"table"=>"jobs",//存儲消息的表明,不帶前綴
"dsn"=>[],
]
```
**二、消息創建與推送**
```
// 將任務加入隊列
public function push()
{
$job_data = [];
$job_data["member_id"] = time();
$job_data["to_member_id"] = time();
$job_data["params"] = ['xx' => 'cc', 'vv' => 'bb'];
$is_pushed = Queue::push("app\queue\controller\Test", $job_data, 'test_job_queue');
if($is_pushed !== false ) {
echo date("Y-m-d H:i:s")." a new job is pushed to the message queue";
} else {
echo date("Y-m-d H:i:s")." a new job pushed fail";
}
}
```
$job_data 是隊列中依賴的業務數據
app\queue\controller\Test 是處理隊列數據的類路徑
test_job_queue 是隊列的名稱
**三、啟動消息隊列**

此時執行多次消息推送代碼。


可以看到 消息已經入庫,但是沒有處理。
**四、消費與刪除**
打開 queue 模塊下的控制器與邏輯目錄,參考Test與TestLogic 來編寫相關數據處理業務邏輯代碼。

**五、啟動監聽處理**
命令行執行:php think queue:listen --queue test_job_queue

就能看到 數據表中的任務迅速消失,此時再調用消息創建推送到表中,也會迅速處理。
到此隊列基本上算是使用起來了,任務種類不同比如批量下載任務和批量消息發送任務 可以寫多個不同的消費者類進行處理。^_^
- 序言
- 基礎
- 安裝環境
- 安裝演示
- 規范
- 目錄
- 介紹
- 后臺介紹
- 后臺首頁
- 會員管理
- 系統管理
- 系統設置與配置管理
- 菜單管理
- 系統回收站
- 服務管理
- 插件管理
- 文章管理
- 接口管理
- 優化維護
- SEO管理
- 數據庫
- 文件清理
- 行為日志
- 執行記錄
- 統計分析
- 接口介紹
- 接口文檔
- 錯誤碼設計
- Token介紹
- 前臺介紹
- 架構
- 架構總覽
- 生命周期
- 入口文件
- 模塊設計
- 依賴注入
- 控制器架構
- 邏輯架構
- 驗證架構
- 服務架構
- 模型架構
- 行為架構
- 插件架構
- 配置
- 配置介紹
- 配置加載
- 配置擴展
- 請求
- 請求信息
- 日志
- 后臺行為日志
- 系統執行日志
- 框架日志
- 數據
- 數據庫設計
- 數據字典
- 數據庫操作
- 事務控制
- 混合操作
- 實戰
- 控制器
- 邏輯與驗證
- 視圖與模型
- 插件研發
- 服務研發
- 接口研發
- 雜項
- 數據導入導出
- 二維碼條形碼
- 郵件發送
- 云存儲服務
- 支付服務
- 短信服務
- 微信分享
- 生成海報
- 聊天室
- PJAX
- Demo
- Widget
- 附錄
- 常量參考
- 配置參考
- 函數參考
- 進階
- Redis
- 自動緩存
- 全自動緩存
- 索引
- 數據簽名
- 全自動事務
- 隊列