<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                # 并發及協程池 說這個章節之前,先看一下自定義進程章節的示例。 ## 自定義進程 自定義進程的使用非常靈活,只需要繼承 GoProcess 類,并在配置文件中注冊即可。 下面用一個 redis 隊列的 demo 進行說明。 ~~~ <?php namespace app\Process; use ESD\Core\Message\Message; use ESD\Core\Server\Process\Process; use ESD\Coroutine\Co; use ESD\Go\GoProcess; use ESD\Plugins\Redis\RedisConfig; use ESD\Plugins\Redis\RedisOneConfig; /** * Created by PhpStorm. * User: anythink * Date: 2019/6/11 * Time: 2:08 PM */ class QueueTask extends GoProcess { use GetLogger; /** * @var RedisConfig */ private $_configClass; /** * @var RedisOneConfig */ private $config; /** * @var \Redis */ protected $redis; public function loadConfig($default = 'default') { $this->_configClass = DIGet(RedisConfig::class); $this->config = $this->_configClass->getRedisConfigs()[$default]; } public function onProcessStart() { $this->loadConfig(); $this->redis = new \Redis(); while(true){ $this->redis->connect($this->config->getHost(), $this->config->getPort()); if($this->config->getPassword() != ''){ $this->redis->auth($this->config->getPassword()); } try{ while($val = $this->redis->brPop(['test'],0)){ goWithContext(function () use($val){ $this->process($val); }); } }catch (\RedisException $e){ $this->info('RedisException ' . $e->getMessage() .'#'. $e->getCode()); } //連接超時每隔一秒進行一次重試 Co::sleep(1); } } public function process($val){ Co::sleep(2); $this->info('process val' , $val); } public function onPipeMessage(Message $message, Process $fromProcess) { $res = $message->getData(); $this->debug('QueueTask onPipeMessage' . $res); } } ~~~ 通過觀察代碼,我們發現消費redis隊列的方法 QueueTask::process 是在 goWithContext 的協程內執行的。 >[danger] ?那么問題來了:消費方法模擬了需要耗時2秒才能處理完畢。那么如果有隊列有10條消息,請問10條消息需要多久才能消費完? # 并發 帶著上面的問題,接著來說說并發。答案是,20秒嗎?其實僅需2秒。 當我們的執行代碼包裹在 goWithContext 里,那么該方法就會變成并發執行。 再簡單看一個例子 ~~~ goWithContext(){ Co::sleep(2) }); goWithContext(){ Co::sleep(2) }); ~~~ 以上代碼,如果不支持協程的話需要4秒執行完畢,如果使用ESD框架,那么2段代碼會并行執行,只需2秒。如果讓10條消息在2秒內處理完,那么在傳統的框架下,需要開10個worker并行處理。試想下,如果有100條,甚至1000條消息呢,你的服務器開得起這么多worker嗎?? # ??上限 上面所提到的并發,實際上是應用了協程的自動切換機制,此處不做過多的擴展,使用這種特性會受到 esd.server.max_coroutine 配置的限制,默認為3000,也就是說最多可以創建3000個協程。 # ?? 協程池 由于框架的worker 也同樣受到 esd.server.max_coroutine 配置的限制,所以如果像上面的例子無腦使用,在高并發下就會出現如下問題 ~~~ Warning: go(): exceed max number of coroutine 3000 in /data/vendor/esd/esd-core/src/Core/Common.php on line 109 Warning: go(): exceed max number of coroutine 3000 in /data/vendor/esd/esd-core/src/Core/Common.php on line 109 ~~~ 你的 request 可能無法繼續處理請求了。當然可以提高max_coroutine的配置,但是終歸不是保險的方案。 那么使用協程池來限制最大并發數,就是一個更優秀的選擇。 看一下例子,還是上面的代碼,這里只摘出修改的部分 ~~~ public function onProcessStart() { $this->loadConfig(); $this->redis = new \Redis(); $pool = CoPoolFactory::createCoPool('queue_co-' . $this->getProcessId(), 2, 10, 5); $pool->preStartAllCoreThreads(); while (true) { $this->redis->connect($this->config->getHost(), $this->config->getPort()); if ($this->config->getPassword() != '') { $this->redis->auth($this->config->getPassword()); } try { while ($val = $this->redis->brPop(['test'], 0)) { $pool->execute(function() use ($val){ $this->process($val); }); } } catch (\RedisException $e) { $this->info('RedisException ' . $e->getMessage() . '#' . $e->getCode()); } Co::sleep(1); } } ~~~ 通過 CoPoolFactory::createCoPool 創建一個協程池,以下代碼創建了最低2個協程,最多10個協程的連接池來控制并發。注意協程池的名稱不要重復。 ~~~ $pool = CoPoolFactory::createCoPool('queue_co-' . $this->getProcessId(), 2, 10, 5); $pool->preStartAllCoreThreads(); ~~~ 然后使用 $pool->execute 將需要執行的代碼通過匿名函數進行傳遞。 ~~~ $pool->execute(function() use ($val){ $this->process($val); }); ~~~ 此時超過協程池最大數量的請求就會被阻塞,直到協程空閑。通過此手段,我們就實現了一個可以控制并發數的消費隊列。 ## 實際場景 如果隊列消費是請求三方接口,就可以根據其限流規則合理規劃協程池的數量,保證不會因為并發過大被警告。 # 帶有返回的并發執行 在需要執行并發的控制器中使用 new Runnable 創建類,在構造函數中傳遞一個需要并發執行代碼的匿名函數。 使用 CoroutineExecutor::getInstance()->execute() 執行。 使用 $ret_1->getResult() 獲取返回的數據。 ~~~ /** * @GetMapping() * @return string */ public function gorun(){ $ret_1 = new Runnable(function (){ Co::sleep(2); return Co::getCid(); },true); $ret_2 = new Runnable(function (){ Co::sleep(2); return Co::getCid(); },true); CoroutineExecutor::getInstance()->execute($ret_1); CoroutineExecutor::getInstance()->execute($ret_2); $data = [ 'ret1' => $ret_1->getResult(), 'ret2' => $ret_2->getResult(), ]; return $this->successResponse($data); } ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看