# 并發及協程池
說這個章節之前,先看一下自定義進程章節的示例。
## 自定義進程
自定義進程的使用非常靈活,只需要繼承 GoProcess 類,并在配置文件中注冊即可。
下面用一個 redis 隊列的 demo 進行說明。
~~~
<?php
namespace app\Process;
use ESD\Core\Message\Message;
use ESD\Core\Server\Process\Process;
use ESD\Coroutine\Co;
use ESD\Go\GoProcess;
use ESD\Plugins\Redis\RedisConfig;
use ESD\Plugins\Redis\RedisOneConfig;
/**
* Created by PhpStorm.
* User: anythink
* Date: 2019/6/11
* Time: 2:08 PM
*/
class QueueTask extends GoProcess {
use GetLogger;
/**
* @var RedisConfig
*/
private $_configClass;
/**
* @var RedisOneConfig
*/
private $config;
/**
* @var \Redis
*/
protected $redis;
public function loadConfig($default = 'default')
{
$this->_configClass = DIGet(RedisConfig::class);
$this->config = $this->_configClass->getRedisConfigs()[$default];
}
public function onProcessStart()
{
$this->loadConfig();
$this->redis = new \Redis();
while(true){
$this->redis->connect($this->config->getHost(), $this->config->getPort());
if($this->config->getPassword() != ''){
$this->redis->auth($this->config->getPassword());
}
try{
while($val = $this->redis->brPop(['test'],0)){
goWithContext(function () use($val){
$this->process($val);
});
}
}catch (\RedisException $e){
$this->info('RedisException ' . $e->getMessage() .'#'. $e->getCode());
}
//連接超時每隔一秒進行一次重試
Co::sleep(1);
}
}
public function process($val){
Co::sleep(2);
$this->info('process val' , $val);
}
public function onPipeMessage(Message $message, Process $fromProcess)
{
$res = $message->getData();
$this->debug('QueueTask onPipeMessage' . $res);
}
}
~~~
通過觀察代碼,我們發現消費redis隊列的方法 QueueTask::process 是在 goWithContext 的協程內執行的。
>[danger] ?那么問題來了:消費方法模擬了需要耗時2秒才能處理完畢。那么如果有隊列有10條消息,請問10條消息需要多久才能消費完?
# 并發
帶著上面的問題,接著來說說并發。答案是,20秒嗎?其實僅需2秒。 當我們的執行代碼包裹在 goWithContext 里,那么該方法就會變成并發執行。
再簡單看一個例子
~~~
goWithContext(){
Co::sleep(2)
});
goWithContext(){
Co::sleep(2)
});
~~~
以上代碼,如果不支持協程的話需要4秒執行完畢,如果使用ESD框架,那么2段代碼會并行執行,只需2秒。如果讓10條消息在2秒內處理完,那么在傳統的框架下,需要開10個worker并行處理。試想下,如果有100條,甚至1000條消息呢,你的服務器開得起這么多worker嗎??
# ??上限
上面所提到的并發,實際上是應用了協程的自動切換機制,此處不做過多的擴展,使用這種特性會受到 esd.server.max_coroutine 配置的限制,默認為3000,也就是說最多可以創建3000個協程。
# ?? 協程池
由于框架的worker 也同樣受到 esd.server.max_coroutine 配置的限制,所以如果像上面的例子無腦使用,在高并發下就會出現如下問題
~~~
Warning: go(): exceed max number of coroutine 3000 in /data/vendor/esd/esd-core/src/Core/Common.php on line 109
Warning: go(): exceed max number of coroutine 3000 in /data/vendor/esd/esd-core/src/Core/Common.php on line 109
~~~
你的 request 可能無法繼續處理請求了。當然可以提高max_coroutine的配置,但是終歸不是保險的方案。
那么使用協程池來限制最大并發數,就是一個更優秀的選擇。
看一下例子,還是上面的代碼,這里只摘出修改的部分
~~~
public function onProcessStart()
{
$this->loadConfig();
$this->redis = new \Redis();
$pool = CoPoolFactory::createCoPool('queue_co-' . $this->getProcessId(), 2, 10, 5);
$pool->preStartAllCoreThreads();
while (true) {
$this->redis->connect($this->config->getHost(), $this->config->getPort());
if ($this->config->getPassword() != '') {
$this->redis->auth($this->config->getPassword());
}
try {
while ($val = $this->redis->brPop(['test'], 0)) {
$pool->execute(function() use ($val){
$this->process($val);
});
}
} catch (\RedisException $e) {
$this->info('RedisException ' . $e->getMessage() . '#' . $e->getCode());
}
Co::sleep(1);
}
}
~~~
通過 CoPoolFactory::createCoPool 創建一個協程池,以下代碼創建了最低2個協程,最多10個協程的連接池來控制并發。注意協程池的名稱不要重復。
~~~
$pool = CoPoolFactory::createCoPool('queue_co-' . $this->getProcessId(), 2, 10, 5);
$pool->preStartAllCoreThreads();
~~~
然后使用 $pool->execute 將需要執行的代碼通過匿名函數進行傳遞。
~~~
$pool->execute(function() use ($val){
$this->process($val);
});
~~~
此時超過協程池最大數量的請求就會被阻塞,直到協程空閑。通過此手段,我們就實現了一個可以控制并發數的消費隊列。
## 實際場景
如果隊列消費是請求三方接口,就可以根據其限流規則合理規劃協程池的數量,保證不會因為并發過大被警告。
# 帶有返回的并發執行
在需要執行并發的控制器中使用 new Runnable 創建類,在構造函數中傳遞一個需要并發執行代碼的匿名函數。
使用 CoroutineExecutor::getInstance()->execute() 執行。
使用 $ret_1->getResult() 獲取返回的數據。
~~~
/**
* @GetMapping()
* @return string
*/
public function gorun(){
$ret_1 = new Runnable(function (){
Co::sleep(2);
return Co::getCid();
},true);
$ret_2 = new Runnable(function (){
Co::sleep(2);
return Co::getCid();
},true);
CoroutineExecutor::getInstance()->execute($ret_1);
CoroutineExecutor::getInstance()->execute($ret_2);
$data = [
'ret1' => $ret_1->getResult(),
'ret2' => $ret_2->getResult(),
];
return $this->successResponse($data);
}
~~~
- 前言
- 捐贈ESD項目
- 使用篇-通用
- 環境
- 安裝
- 規范
- 壓力測試
- 配置
- 如何設置YML配置
- server配置
- 端口配置
- 項目結構
- 事件派發
- 日志
- 注解
- DI容器
- 自定義進程
- 并發及協程池
- Console插件
- Scheduled插件
- Redis插件
- AOP插件
- Saber插件
- Mysql插件
- mysql事務
- Actuator插件
- Whoops插件
- Cache插件
- PHPUnit插件
- Security插件
- Session插件
- EasyRoute插件
- http路由
- ProcessRpc插件
- AutoReload插件
- AnnotationsScan插件
- Tracing-plugin插件
- MQTT插件
- Pack插件
- AMQP插件
- Validate插件
- Uid插件
- Topic插件
- Blade插件
- CsvReader插件
- hashed-wheel-timer-plugin插件
- 使用篇-HTTP
- 路由
- 靜態文件
- 路由定義
- 修飾方法
- 路由分組
- 資源路由
- 端口作用域
- 異常處理
- 跨域請求
- 路由緩存
- 控制器
- 控制器初始化
- 前置操作
- 跳轉和重定向
- 異常處理
- 請求
- 請求對象
- 請求信息
- request消息
- response消息
- stream消息
- url接口
- 驗證器
- 內置驗證器
- 內置過濾器
- 使用篇-WS
- 如何使用
- 路由
- 使用篇-TCP
- 插件篇-PluginSystem
- 微服務篇-ESDCloud
- CircuitBreaker插件
- SaberCloud插件
- 分布式鏈路追蹤系統
- Consul插件