# job消息隊列實現
ThinkPHP的Queue內置了 Redis、Database、Topthink、Sync四種驅動,這里使用的是 Redis,也推薦使用 Redis
think-queue 隊列消息可以進行任務的發布、獲取、執行、刪除、重新發布、延遲發布、超時控制等操作
## config配置query.php
~~~
<?php
return [
'default' => 'redis',
'connector' => 'sync',
'connections' => [
'sync' => [
'type' => 'sync',
],
'database' => [
'type' => 'database',
'queue' => 'default',
'table' => 'jobs',
'connection' => null,
],
'redis' => [
'type' => 'redis',
'queue' => 'queue',
'host' => '127.0.0.1',
'port' => 6379,
'password' => 'niushop123!@#',
'select' => 0,
'timeout' => 0,
'persistent' => false,
],
],
'failed' => [
'type' => 'none',
'table' => 'failed_jobs',
],
];
~~~
## 生產者:寫入消息隊列
~~~
Queue::push($job_handler_classname, $params);//$job_handler_classname指需要執行的消息類 $params指傳入參數
Queue::later($later_time, $job_handler_classname, $params);//$later_time指延遲時間
~~~
例如計劃任務
~~~
public function execute()
{
$system_config_model = new SystemConfig();
$config = $system_config_model->getSystemConfig()[ 'data' ] ?? [];
$is_open_queue = $config[ 'is_open_queue' ] ?? 0;
$query_execute_time = $is_open_queue == 1 ? time() + 60 : time();
$list = model('cron')->getList([ [ 'execute_time', '<=', $query_execute_time ] ]);
if (!empty($list)) {
foreach ($list as $k => $v) {
$event_res = checkQueue($v, function($params) {
//加入消息隊列
$job_handler_classname = 'Cronexecute';
try {
if ($params[ 'execute_time' ] <= time()) {
Queue::push($job_handler_classname, $params);
} else {
Queue::later($params[ 'execute_time' ] - time(), $job_handler_classname, $params);
}
} catch (\Exception $e) {
$res = $this->error($e->getMessage());
}
return $res ?? $this->success();
}, function($params) {
try {
$res = event($params[ 'event' ], [ 'relate_id' => $params[ 'relate_id' ] ]);
} catch (\Exception $e) {
$res = $this->error($e->getMessage());
}
$data_log = [
'name' => $params[ 'name' ],
'event' => $params[ 'event' ],
'relate_id' => $params[ 'relate_id' ],
'message' => json_encode($res)
];
$this->addCronLog($data_log);
return $res;
});
$event_code = $event_res[ 'code' ] ?? 0;
if ($event_code < 0) {
Log::write('自動任務888');
Log::write($event_res);
continue;
}
//循環任務
if ($v[ 'type' ] == 2) {
$period = $v[ 'period' ] == 0 ? 1 : $v[ 'period' ];
switch ( $v[ 'period_type' ] ) {
case 0://分
$execute_time = $v[ 'execute_time' ] + $period * 60;
break;
case 1://天
$execute_time = strtotime('+' . $period . 'day', $v[ 'execute_time' ]);
break;
case 2://周
$execute_time = strtotime('+' . $period . 'week', $v[ 'execute_time' ]);
break;
case 3://月
$execute_time = strtotime('+' . $period . 'month', $v[ 'execute_time' ]);
break;
}
model('cron')->update([ 'execute_time' => $execute_time ], [ [ 'id', '=', $v[ 'id' ] ] ]);
} else {
model('cron')->delete([ [ 'id', '=', $v[ 'id' ] ] ]);
}
}
}
~~~
## 消費者:指實現消息隊列執行,統一在系統或者插件的job文件夾下
~~~
/**
* 事件通過隊列異步調用
* Class Eventasync
* @package app\job
*/
class Cronexecute
{
public function fire(Job $job, $data)
{
$job->delete();
try {
$res = event($data[ 'event' ], [ 'relate_id' => $data[ 'relate_id' ] ]);
$data_log = [
'name' => $data[ 'name' ],
'event' => $data[ 'event' ],
'relate_id' => $data[ 'relate_id' ],
'message' => json_encode($res)
];
Log::write("計劃任務:{$data[ 'event' ]} relate_id: {$data[ 'relate_id' ]}執行結果:" . json_encode($res, JSON_UNESCAPED_UNICODE));
$cron_model = new Cron();
$cron_model->addCronLog($data_log);
} catch (\Exception $e) {
Log::write($e->getMessage());
$job->delete();
}
}
}
~~~
## 消息隊列啟動
~~~
php think queue:work
~~~
- 序言
- 安裝教程
- 運行環境
- 安裝手冊
- 基礎
- 前期準備
- 偽靜態配置
- 后臺目錄結構
- uniapp(手機端)目錄結構
- 開發命名規范
- 控制器命名規范
- model層命名規范
- 前端(管理頁面)命名規范
- 提示面板
- 表單
- uniapp(手機端)命名規范
- api接口命名規范
- 架構
- 入口文件
- config設置
- app應用目錄
- component(自定義模板組件)
- model層(數據業務層)
- 數據庫操作
- job(消息隊列)
- event(事件)
- request(請求對象)
- common(公共函數)
- log(日志處理)
- lang(語言包)
- addon插件
- 數據字典
- 系統基礎表
- 配送相關表
- 商品相關表
- 網站設置相關
- 會員相關表
- 訂單相關表
- 營銷(組合套餐)
- 營銷(砍價)
- 營銷(優惠券)
- 營銷(滿減)
- 營銷(拼團)
- 營銷(秒殺)
- 店鋪相關表
- 微信相關表
- 門店相關表
- 結算相關表
- 應用(分銷)
- 功能模塊
- 商品模塊
- 會員模塊
- 訂單模塊
- 數據統計
- 消息隊列
- 支付模塊
- 短信模塊
- 客服
- api接口
- 接口開發
- 插件開發
- 事件開發
- 常用事件
- 插件目錄與開發
- 常用插件
- 支付插件
- 拼團插件
- 新人禮