<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                # AMQP異步任務系統 需要了解的知識。 * [Process](Process.md) * [RabbitMQ](http://www.rabbitmq.com/documentation.html) 本系統由SD框架和RabbitMQ搭建。 ## 創建異步作業進程 通過繼承AMQPTaskProcess,我們來創建一個異步任務作業的進程類。 ``` class MyAMQPTaskProcess extends AMQPTaskProcess { public function start($process) { parent::start($process); $this->createDirectConsume('msgs'); } /** * 路由消息返回class名稱 * @param $body * @return string */ protected function route($body) { return TestAMQPTask::class; } } ``` 通過createDirectConsume函數可以快速創建一個消費隊列。 * createDirectConsume ``` function createDirectConsume($queue, $prefetch_count = 2, $global = false, $exchange = null, $consumerTag = null) ``` 一般情況我們只需要設置queue和prefetch_count這倆個參數。 queue為消費隊列的名稱,prefetch_count=2代表這個隊列只能被這個進程同時消費2次,直到消費成功或者失敗,簡單的來說并發為2。 global參數代表這個并發是針對隊列還是進程的。false是針對隊列,true代表是進程。 我們可以多次調用createDirectConsume來消費不同的隊列。 * route route路由的作用,$body是消費得到的值,這個函數需要返回一個class名。 ## 創建作業任務 創建類繼承AMQPTask。 ``` class TestAMQPTask extends AMQPTask { /** * @var TestModel */ public $TestModel; public function initialization(AMQPMessage $message) { parent::initialization($message); $this->TestModel = $this->loader->model(TestModel::class, $this); } /** * handle * @param $body */ public function handle($body) { var_dump($body); $this->ack(); } } ``` * initialization 和Model,Controller一樣用于初始化,或者進行loader * handle 處理任務,處理任務一定需要調用ack或者是reject。 * ack 任務處理完畢 * reject ``` function reject($requeue = true) ``` 任務被拒絕,requeue=true代表這個任務回到隊列,false代表任務被拋棄。 ## 創建用戶進程 在AppServer中創建進程 ``` /** * 用戶進程 */ public function startProcess() { parent::startProcess(); for ($i=0;$i<5;$i++) { ProcessManager::getInstance()->addProcess(MyAMQPTaskProcess::class,true,$i); } } ``` 這樣我們創建了5個異步任務進程。 # 注意 1. 消費隊列必須存在,不然會報錯 2. 一定在handle處理結束后調用ack或者reject 3. AMQPTask的initialization和handle均支持協程
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看