<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                # 依賴 redis-server redis php擴展 > 參考[此教程](http://www.yiibai.com/redis/) # 安裝 ## composer 最好通過composer安裝?`composer require topthink/think-queue 1.1.4` ## 手動安裝 去?[https://github.com/top-think/think-queue](https://github.com/top-think/think-queue)?里 把包 下載下來 放入 項目的Vendor里 然后在 common.php 公共函數庫里 加入這么一句: `require './vendor/topthink/think-queue/src/common.php';` 當你在命令行里 切換到項目根目錄后, 執行?`php think queue:work -h` 能出現以下 結果 就表示think-queue的 安裝好了 ![](https://box.kancloud.cn/5ea49d4cb20cd0936e84d1cf6aa3ec49_1482x460.png) # 配置 > 配置文件位于?`application/extra/queue.php` ## 公共配置 ~~~ [ 'connector'=>'sync' //驅動類型,可選擇 sync(默認):同步執行,database:數據庫驅動,redis:Redis驅動,topthink:Topthink驅動 //或其他自定義的完整的類名 ] ~~~ ## 驅動配置 > 各個驅動的具體可用配置項在`think\queue\connector`目錄下各個驅動類里的`options`屬性中,寫在上面的`queue`配置里即可覆蓋 這里我們只需記住 redis 和sync類型的就好了,因為database是官方不推薦的,topthink 完全不開放。 下面是我的配置: ~~~ return [ 'connector'=>'redis', // 'connector'=>'sync', 'expire' => 0, 'default' => 'default', 'host' => '127.0.0.1', 'port' => 6379, 'password' => '', 'select' => 0, 'timeout' => 0, 'persistent' => false ]; ~~~ # 使用 隊列的特點是 先進先出,本質是將同步的調用,轉成別的進程里的 異步任務。 think-queue 是基于cli 的php任務管理。 ## 入隊 首先,入隊是queue的 push 和delay 兩個方法。但是 兩個方法的名稱和參數不一樣。因此,我的建議是 再包一層,封裝成一個方法。 ~~~ /** * 添加隊列任務 * * @param string $job_name 隊列執行的類路徑 不帶走類fire方法 帶@方法 走類@的方法 * @param array $data 傳入數據 * @param mixed $queue_name 隊列名 null 或字符串 * @param integer $delay 延遲執行的時間 單位秒 * @return void */ public function push_job($job_name, $data, $queue_name = null, $delay = 0){ trace($queue_name); config('default_return_type', 'json'); $class_name = \strstr($job_name, '@', true); if(class_exists($class_name)){ if($delay > 0){ $ret = \think\Queue::later($delay, $job_name, $data, $queue_name); }else{ trace($job_name); $ret = \think\Queue::push($job_name, $data, $queue_name); } trace(sprintf("加入任務%s, 時間%s", $job_name, datetime())); return $ret; } return $this->error('job類 '.$job_name.'不存在'); } ~~~ think-queue 支持普通隊列和延遲隊列兩種,分別對應push 和delay。 然后 這兩個方法都有共同參數 $job, $data, $queue。job是隊列入隊的任務執行路徑,比如’A\B@c’ 如果隊列任務類只有一個方法需要隊列執行,請定義為fire, $job里只要指向類路徑就可以了。如果有多個 $job里 要@后接方法名。 > 如果是push的話返回的是一個隊列任務的id ,延遲的話返回null ## 執行隊列任務 隊列任務 類 ,最好以job為后綴,放入相關job目錄里。如果就一個任務,將隊列執行方法 直接命名為fire, 多個子任務的話,方法可以定義多個。在入隊時 用@指定消費的任務+方法> 且傳遞過來的 參數基本?`Job $job, $data`。 舉例: ~~~ public function test(Job $job, $data){ // trace("正在執行隊列:{$job->getName()}"); // $time = rand(0, 4); // sleep($time); $isJobDone = $this->send($data); trace($data); if ($isJobDone) { //如果任務執行成功, 記得刪除任務 $job->delete(); trace("隊列執行完成:".__FUNCTION__." ". datetime()); } else { $try_nums = $job->attempts(); trace("報警內容:".__FUNCTION__.",報警失敗{$try_nums }次, 執行時間:". datetime()); // $job->release(); // $job->delete(); } } ~~~ 隊列的數據data 用戶自己定義,通常是一些查詢需要的條件,或者第三方服務需要的參數。 這里執行的時候注意一定不要拋異常,容易把隊列進程搞掛了。 ## 出隊 當隊列任務執行完畢,最好將任務刪除掉。不然堆積在隊列里,不斷重試。 `$job->delete()` ## 再入隊 `$job->release()` 當默認開啟隊列 最大重試次數參數時, 如果一直失敗 一直release 的話他會無限重試。 所以最好設置一個最大重試次數。 ## 整個任務失敗 當任務重試超過最大嘗試次數后, 最后會執行當前任務類的 failed方法 但是參數只有$data, 當一個類里如果有多個子任務的話 不好區分哪個任務。 ### 失敗事件 首先,我們添加?`queue_failed`?事件標簽, 及其對應的回調方法 ~~~ // 文件路徑: \application\tags.php // 應用行為擴展定義文件 return [ // 應用初始化 'app_init' => [], // 應用開始 'app_begin' => [], // 模塊初始化 'module_init' => [], // 操作開始執行 'action_begin' => [], // 視圖內容過濾 'view_filter' => [], // 日志寫入 'log_write' => [], // 應用結束 'app_end' => [], // 任務失敗統一回調,有四種定義方式 'queue.failed'=> [ // 數組形式,[ 'ClassName' , 'methodName'] ['application\\behavior\\MyQueueFailedLogger', 'logAllFailedQueues'] // 字符串(靜態方法),'StaicClassName::methodName' // 'MyQueueFailedLogger::logAllFailedQueues' // 字符串(對象方法),'ClassName',此時需在對應的ClassName類中添加一個名為 queueFailed 的方法 // 'application\\behavior\\MyQueueFailedLogger' // 閉包形式 /* function( &$jobObject , $extra){ // var_dump($jobObject); return true; } */ ] ]; ~~~ 這里,我們選擇數組形式的回調方式,新增?`\application\behavior\MyQueueFailedLogger`?類,添加一個?`logAllFailedQueues()`?方法 ~~~ <?php /** * 文件路徑: \application\behavior\MyQueueFailedLogger.php * 這是一個行為類,用于處理所有的消息隊列中的任務失敗回調 */ namespace application\behavior; class MyQueueFailedLogger { const should_run_hook_callback = true; /** * @param $jobObject \think\queue\Job //任務對象,保存了該任務的執行情況和業務數據 * @return bool true //是否需要刪除任務并觸發其failed() 方法 */ public function logAllFailedQueues(&$jobObject){ $failedJobLog = [ 'jobHandlerClassName' => $jobObject->getName(), // 'application\index\job\Hello' 'queueName' => $jobObject->getQueue(), // 'helloJobQueue' 'jobData' => $jobObject->getRawBody()['data'], // '{'a': 1 }' 'attempts' => $jobObject->attempts(), // 3 ]; var_export(json_encode($failedJobLog,true)); // $jobObject->release(); //重發任務 //$jobObject->delete(); //刪除任務 //$jobObject->failed(); //通知消費者類任務執行失敗 return self::should_run_hook_callback; } } ~~~ 需要注意該回調方法的返回值: * 返回 true 時,系統會自動刪除該任務,并且自動調用消費者類中的?`failed()`?方法 * 返回 false 時,系統不會自動刪除該任務,也不會自動調用消費者類中的?`failed()`?方法,需要開發者另行處理失敗任務的刪除和通知。 最后,在消費者類中,添加?`failed()`?方法 ~~~ /** * 文件路徑: \application\index\job\HelloJob.php */ /** * 該方法用于接收任務執行失敗的通知,你可以發送郵件給相應的負責人員 * @param $jobData string|array|... //發布任務時傳遞的 jobData 數據 */ public function failed($jobData){ send_mail_to_somebody() ; print("Warning: Job failed after max retries. job data is :".var_export($data,true)."\n"; } ~~~ 這樣,就可以做到任務失敗的記錄與告警 ## 啟動隊列監聽服務 * Work 模式 ~~~ php think queue:work \ --daemon //是否循環執行,如果不加該參數,則該命令處理完下一個消息就退出 --queue helloJobQueue //要處理的隊列的名稱 --delay 0 \ //如果本次任務執行拋出異常且任務未被刪除時,設置其下次執行前延遲多少秒,默認為0 --force \ //系統處于維護狀態時是否仍然處理任務,并未找到相關說明 --memory 128 \ //該進程允許使用的內存上限,以 M 為單位 --sleep 3 \ //如果隊列中無任務,則sleep多少秒后重新檢查(work+daemon模式)或者退出(listen或非daemon模式) --tries 2 //如果任務已經超過嘗試次數上限,則觸發‘任務嘗試次數超限’事件,默認為0 ~~~ * Listen 模式 ~~~ php think queue:listen \ --queue helloJobQueue \ //監聽的隊列的名稱 --delay 0 \ //如果本次任務執行拋出異常且任務未被刪除時,設置其下次執行前延遲多少秒,默認為0 --memory 128 \ //該進程允許使用的內存上限,以 M 為單位 --sleep 3 \ //如果隊列中無任務,則多長時間后重新檢查,daemon模式下有效 --tries 0 \ //如果任務已經超過重發次數上限,則進入失敗處理邏輯,默認為0 --timeout 60 //創建的work子進程的允許執行的最長時間,以秒為單位 ~~~ 可以看到 listen 模式下,不包含?`--deamon`?參數,原因下面會說明 # 注意點 ## 安全 redis 默認是沒密碼的,可以去服務器配置 auth password ## 模式區別 兩者都可以用于處理消息隊列中的任務 區別在于: * 執行原理不同 * work 命令是單進程的處理模式。 按照是否設置了?`--daemon`?參數,work命令又可分為單次執行和循環執行兩種模式。 * 單次執行:不添加?`--daemon`參數,該模式下,work進程在處理完下一個消息后直接結束當前進程。當不存在新消息時,會sleep一段時間然后退出。 * 循環執行:添加了?`--daemon`參數,該模式下,work進程會循環地處理隊列中的消息,直到內存超出參數配置才結束進程。當不存在新消息時,會在每次循環中sleep一段時間。 * listen 命令是?父進程 + 子進程?的處理模式。 listen命令所在的父進程會創建一個單次執行模式的work子進程,并通過該work子進程來處理隊列中的下一個消息,當這個work子進程退出之后,listen命令所在的父進程會監聽到該子進程的退出信號,并重新創建一個新的單次執行的work子進程 * 退出時機不同 * work 命令的退出時機在上面的執行原理部分已敘述,此處不再重復 * listen 命令中,listen所在的父進程正常情況會一直運行,除非遇到下面兩種情況: * 創建的某個work子進程的執行時間超過了 listen命令行中的`--timeout`?參數配置,此時work子進程會被強制結束,listen所在的父進程也會拋出一個?`ProcessTimeoutException`?異常并退出。開發者可以選擇捕獲該異常,讓父進程繼續執行,也可以選擇通過 supervisor 等監控軟件重啟一個新的listen命令。 * listen 命令所在的父進程因某種原因存在內存泄露,則當父進程本身占用的內存超過了命令行中的?`--memory`?參數配置時,父子進程均會退出。正常情況下,listen進程本身占用的內存是穩定不變的。 * 性能不同 * work 命令是在腳本內部做循環,框架腳本在命令執行的初期就已加載完畢; * 而listen模式則是處理完一個任務之后新開一個work進程,此時會重新加載框架腳本。 因此:?work 模式的性能會比listen模式高。 注意:當代碼有更新時,work 模式下需要手動去執行?`php think queue:restart`?命令重啟隊列來使改動生效;而listen 模式會自動生效,無需其他操作。 * 超時控制能力 * work 模式本質上既不能控制進程自身的運行時間,也無法限制執行中的任務的執行時間。 舉例來說,假如你在某次上線之后,在上文中的?`\application\index\job\Hello.php`?消費者的`fire`方法中添加了一段死循環 : ~~~ public function fire(){ while(true){ //死循環 $consoleOutPut->writeln("<info>I am looping forever inside a job.</info> \n"); sleep(1); } } ~~~ 那么這個循環將永遠不能停止,直到任務所在的進程超過內存限制或者由管理員手動結束。這個過程不會有任何的告警。更嚴重的是,如果你配置了expire ,那么這個死循環的任務可能會污染到同樣處理?`helloJobQueue`?隊列的其他work進程,最后好幾個work進程將被卡死在這段死循環中。詳情后文會說明。 work 模式下的超時控制能力,實際上應該理解為 多個work 進程配合下的過期任務重發能力。 * 而 listen命令可以限制其創建的work子進程的超時時間。 listen 命令可通過?`--timeout`?參數限制work子進程允許運行的最長時間,超過該時間限制仍未結束的子進程會被強制結束; * 這里有必要補充一下 expire 和 timeout 之間的區別: * expire 在配置文件中設置,timeout 在 listen命令 的命令行參數中設置,而且,expire 和 timeout 是兩個不同層次上的概念: * expire 是指任務的過期時間。這個時間是全局的,影響到所有的work進程。(不管是獨立的work命令還是 listen 模式下創建的的work子進程) 。expire 針對的對象是?任務。 * timeout 是指work子進程的超時時間。這個時間只對當前執行的listen 命令有效。timeout 針對的對象是?work子進程。 * 使用場景不同 根據上面的介紹,可以看到, work 命令的適用場景是: * 任務數量較多 * 性能要求較高 * 任務的執行時間較短 * 消費者類中不存在死循環,sleep() ,exit() ,die() 等容易導致bug的邏輯 listen命令的適用場景是: * 任務數量較少 * 任務的執行時間較長(如生 成大型的excel報表等), * 任務的執行時間需要有嚴格限制 ## 調試方式 首先,隊列的日志在runtime里cli.log 而不是web的log 因此最好的方式 時自定義一個日志統一存儲位置。 然后,隊列由兩部分組成,業務代碼和隊列部分。 ### 輔助終端神器vscode 編輯代碼時直接 ctrl+` 打開終端 可以開啟多個終端 添加多個隊列 ### 業務測試 最方便的方式是本地將隊列改為同步, 將隊列的任務從推入到消費。都給測試一便。 確保隊列業務邏輯部分沒有異常。 ### 隊列的測試 主要測試 業務執行失敗后,會不會無限循環入隊列,隊列失敗后,有沒有進入報警和寫入錯誤日志。 還有隊列進程的內存,是否會掛掉。 ## 思考 隊列傳送數據格式的定義? 什么時候該用隊列重試,什么時候直接報警讓技術查問題? 是不是應該在告警時留有手動補救執行隊列的方式的url? 延遲隊列是不是 sleep設為0 才精確 大隊列數據時 是不是多幾個進程能加速? 參考文檔: [think-queue 筆記](https://github.com/coolseven/notes/tree/master/thinkphp-queue) [官方文檔](https://github.com/top-think/think-queue)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看