# 自定義進程
自定義進程的使用非常靈活,只需要繼承 GoProcess 類,并在配置文件中注冊即可。
下面用一個 redis 隊列的 demo 進行說明。
~~~
<?php
namespace app\Process;
use ESD\Core\Message\Message;
use ESD\Core\Server\Process\Process;
use ESD\Coroutine\Co;
use ESD\Go\GoProcess;
use ESD\Plugins\Redis\RedisConfig;
use ESD\Plugins\Redis\RedisOneConfig;
/**
* Created by PhpStorm.
* User: anythink
* Date: 2019/6/11
* Time: 2:08 PM
*/
class QueueTask extends GoProcess {
use GetLogger;
/**
* @var RedisConfig
*/
private $_configClass;
/**
* @var RedisOneConfig
*/
private $config;
/**
* @var \Redis
*/
protected $redis;
public function loadConfig($default = 'default')
{
$this->_configClass = DIGet(RedisConfig::class);
$this->config = $this->_configClass->getRedisConfigs()[$default];
}
public function onProcessStart()
{
$this->loadConfig();
$this->redis = new \Redis();
while(true){
$this->redis->connect($this->config->getHost(), $this->config->getPort());
if($this->config->getPassword() != ''){
$this->redis->auth($this->config->getPassword());
}
try{
while($val = $this->redis->brPop(['test'],0)){
goWithContext(function () use($val){
$this->process($val);
});
}
}catch (\RedisException $e){
$this->info('RedisException ' . $e->getMessage() .'#'. $e->getCode());
}
//連接超時每隔一秒進行一次重試
Co::sleep(1);
}
}
public function process($val){
Co::sleep(2);
$this->info('process val' , $val);
}
public function onPipeMessage(Message $message, Process $fromProcess)
{
$res = $message->getData();
$this->debug('QueueTask onPipeMessage' . $res);
}
}
~~~
>[danger] 此類實例化了一個新的 redis 連接訪問阻塞的 redis 隊列,請勿直接使用 redis 連接池,否則會長時間占用連接,導致 worker 進程的 redis 可用連接變少。
在 process 方法中實現自身的業務邏輯。
由于process 方法使用了協程調度,所以在 process 方法內執行外部API請求注意以下2點:
1. api請求的 client 是否支持協程,比如使用 saber-plugin,或guzzehttp-saber。不支持協程的 client 將會轉為同步模式,大幅度降低隊列處理的QPS,此時需要多開幾個自定義 process。
2. 使用協程需要注意 api 服務端的流控規則,請參考對應服務端的限流規則合理調整并發數。
# 注冊自定義進程
在配置文件中添加如下配置:
~~~
esd:
process:
queue-0:
name: queue-0
class_name: app\Process\QueueTask
group_name: TaskGroup
queue-1:
name: queue-1
class_name: app\Process\QueueTask
group_name: TaskGroup
queue-2:
name: queue-2
class_name: app\Process\QueueTask
group_name: TaskGroup
~~~
- 前言
- 捐贈ESD項目
- 使用篇-通用
- 環境
- 安裝
- 規范
- 壓力測試
- 配置
- 如何設置YML配置
- server配置
- 端口配置
- 項目結構
- 事件派發
- 日志
- 注解
- DI容器
- 自定義進程
- 并發及協程池
- Console插件
- Scheduled插件
- Redis插件
- AOP插件
- Saber插件
- Mysql插件
- mysql事務
- Actuator插件
- Whoops插件
- Cache插件
- PHPUnit插件
- Security插件
- Session插件
- EasyRoute插件
- http路由
- ProcessRpc插件
- AutoReload插件
- AnnotationsScan插件
- Tracing-plugin插件
- MQTT插件
- Pack插件
- AMQP插件
- Validate插件
- Uid插件
- Topic插件
- Blade插件
- CsvReader插件
- hashed-wheel-timer-plugin插件
- 使用篇-HTTP
- 路由
- 靜態文件
- 路由定義
- 修飾方法
- 路由分組
- 資源路由
- 端口作用域
- 異常處理
- 跨域請求
- 路由緩存
- 控制器
- 控制器初始化
- 前置操作
- 跳轉和重定向
- 異常處理
- 請求
- 請求對象
- 請求信息
- request消息
- response消息
- stream消息
- url接口
- 驗證器
- 內置驗證器
- 內置過濾器
- 使用篇-WS
- 如何使用
- 路由
- 使用篇-TCP
- 插件篇-PluginSystem
- 微服務篇-ESDCloud
- CircuitBreaker插件
- SaberCloud插件
- 分布式鏈路追蹤系統
- Consul插件