# AMQP異步任務系統
需要了解的知識。
* [Process](Process.md)
* [RabbitMQ](http://www.rabbitmq.com/documentation.html)
本系統由SD框架和RabbitMQ搭建。
## 創建異步作業進程
通過繼承AMQPTaskProcess,我們來創建一個異步任務作業的進程類。
```
class MyAMQPTaskProcess extends AMQPTaskProcess
{
public function start($process)
{
parent::start($process);
$this->createDirectConsume('msgs');
}
/**
* 路由消息返回class名稱
* @param $body
* @return string
*/
protected function route($body)
{
return TestAMQPTask::class;
}
}
```
通過createDirectConsume函數可以快速創建一個消費隊列。
* createDirectConsume
```
function createDirectConsume($queue, $prefetch_count = 2, $global = false, $exchange = null, $consumerTag = null)
```
一般情況我們只需要設置queue和prefetch_count這倆個參數。
queue為消費隊列的名稱,prefetch_count=2代表這個隊列只能被這個進程同時消費2次,直到消費成功或者失敗,簡單的來說并發為2。
global參數代表這個并發是針對隊列還是進程的。false是針對隊列,true代表是進程。
我們可以多次調用createDirectConsume來消費不同的隊列。
* route
route路由的作用,$body是消費得到的值,這個函數需要返回一個class名。
## 創建作業任務
創建類繼承AMQPTask。
```
class TestAMQPTask extends AMQPTask
{
/**
* @var TestModel
*/
public $TestModel;
public function initialization(AMQPMessage $message)
{
parent::initialization($message);
$this->TestModel = $this->loader->model(TestModel::class, $this);
}
/**
* handle
* @param $body
*/
public function handle($body)
{
var_dump($body);
$this->ack();
}
}
```
* initialization
和Model,Controller一樣用于初始化,或者進行loader
* handle
處理任務,處理任務一定需要調用ack或者是reject。
* ack
任務處理完畢
* reject
```
function reject($requeue = true)
```
任務被拒絕,requeue=true代表這個任務回到隊列,false代表任務被拋棄。
## 創建用戶進程
在AppServer中創建進程
```
/**
* 用戶進程
*/
public function startProcess()
{
parent::startProcess();
for ($i=0;$i<5;$i++)
{
ProcessManager::getInstance()->addProcess(MyAMQPTaskProcess::class,true,$i);
}
}
```
這樣我們創建了5個異步任務進程。
# 注意
1. 消費隊列必須存在,不然會報錯
2. 一定在handle處理結束后調用ack或者reject
3. AMQPTask的initialization和handle均支持協程
- Introduction
- SD 3.X文檔連接
- 導言
- 用戶案例
- 基于Swoole擴展分布式全棧開發框架
- 選擇SD框架助力企業開發
- 捐贈SwooleDistributed項目
- 框架性能報告
- 更新日志
- VIP服務福利
- 安裝與配置
- 【推薦】全自動安裝部署
- 環境要求
- 使用Composer安裝/更新SD框架
- 通過Docker安裝
- 代碼結構
- 啟動命令
- 服務器配置
- 服務器基礎配置server.php
- 客戶端協議配置client.php
- business.php
- log.php
- 微服務及集群配置consul.php
- fileHeader.php
- mysql.php
- redis.php
- 定時任務配置timerTask.php
- 服務器端口配置ports.php
- catCache.php
- 驗證服務啟動成功
- 微服務-Consul
- 日志工具-GrayLog
- 集群-Cluster
- 內核優化
- 入門教學
- 開發流程
- 開發前必讀
- 開發規范
- 基本流程
- 框架入口
- Model數據模型
- Controller控制器
- 協程
- 協程基礎
- 迭代器
- 調度器
- 使用協程的優勢
- 通過協程的方法屏蔽異步同步的區別
- Select多路選擇器
- 協程Sleep
- 通用協程方法
- 設置超時
- 設置無異常
- 設置降級函數
- initAsynPools
- dump
- 封裝器與路由器
- 封裝器
- sendToUid
- 路由器
- sendToUids
- 對象池
- 擴展組件
- 中間件
- Redis使用介紹
- RedisAsynPool
- Redis具體使用
- sendToAll
- RedisRoute
- Redis+Lua
- Mysql使用介紹
- MysqlAsynPool
- Mysql返回值
- 如何獲取構建的mysql語句
- 如何執行一個SQL
- 如何執行事務
- stopTask
- Mysql具體使用
- 異步客戶端
- Loader
- MqttClient
- model
- SdTcpRpcPool
- task
- HttpClientPool
- view
- TcpClientPool
- AMQP
- initialization
- Memory
- destory
- Cache
- Lock
- Pool
- EventDispatcher
- Process
- Cluster
- TimerTask
- Reload
- Consul
- Context
- 自定義進程
- 進程間RPC
- $http_input
- CatCache
- $http_output
- TimerCallBack
- 專題
- HTTP專欄
- TCP專欄
- 基礎知識
- WebSocket專欄
- 微服務
- Consul配置
- RPC
- REST
- AMQP異步任務系統
- MQTT簡易服務器
- Docker化以及資源編排
- 快速搭建公司內部統一的開發環境
- 使用HTTPS/WSS
- 訂閱/發布
- 游戲專題
- 類介紹
- AppServer
- clearState
- onOpenServiceInitialization
- SwooleDistributedServer
- get_instance
- kickUid
- bindUid
- unBindUid
- coroutineUidIsOnline
- coroutineCountOnline
- setTemplateEngine
- isWebSocket
- isTaskWorker
- getSocketName
- initAsynPools
- addAsynPool
- getAsynPool
- getServerAllTaskMessage
- Controller
- onExceptionHandle
- send
- sendToUid
- sendToUids
- sendToAll
- sendToGroup
- close
- getContext
- defaultMethod
- $redis_pool
- $mysql_pool
- $request_type
- $fd
- $uid
- $client_data
- $request
- $response
- $loader
- $logger
- $server
- $config
- Model
- initialization
- destory
- View
- Task
- stopTask
- HttpInput
- postGet
- post
- get
- getPost
- getAllPostGet
- getAllHeader
- getRawContent
- cookie
- getRequestHeader
- server信息
- getRequestMethod
- getRequestUri
- getPathInfo
- HttpOutput
- setStatusHeader
- setContentType
- setHeader
- end
- setCookie
- endFile
- 單元測試