<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                前面一節,介紹了 pheanstalk 的接口文件,目的是為了更接近 pheanstalkd 的使用。 > 這同樣是一個從 0 到 1 了解第三方包如何使用的過程,這是一個學習方法,在其它的包時,也有借鑒意義。 在過了一遍 contracts 之后,我們可以發現,pheanstalk 最接近用戶層(開發者)的接口,就是 PheanstalkInterface 。 那么,使用手冊,我們也通過解讀 PheanstalkInterface 的具體實現 Pheanstalk 類來撰寫。 # 建立連接 ``` include_once "vendor/autoload.php"; $conn = \Pheanstalk\Pheanstalk::create('127.0.0.1',11300,10); ``` 上面的 create 方法,第一個參數必填,需要傳入目標 beanstalkd 服務器的 ip ,前提是,對于 beanstalkd server 而言,你的 ip (可能是內網,也可能是外網,也可能是本機),必須在它的監聽列表中。 第二個參數是端口,可不填,默認為 11300 ,當然你可以修改。 第三個參數是連接超時時間,可不填,默認為 10 ,表示 10 秒之后還未連接成功就超時了。 **要注意的是:此 create 方法只是返回一個配置好的 pheanstalk 實例,但并非是一個連接好的實例。** 如何理解呢? 從實現上, pheanstalk 的流程是等到有指令了,也就是有 dispatch 指令后,才建立一個 socket 連接,而它與 beanstalkd 通信的底層,用的也是 socket 。 所以,如果你只寫了 create 方法,并不能看出是否能夠連接成功。 你還需要發送一個指令才能知道是否能夠連接成功。 一旦發送過指令之后,Pheanstalk->connection->socket 就會指向一個 socket 資源。 # 關閉、重連 雖然,我們能看到 Pheanstalkd 類中,有 reconnect 方法,還有 $this->connection 的屬性。 然而它們的訪問權限是 private 。 這告訴我們,關閉和重連,不需要我們手動操作。 我們可以看到,這兩個方法,只有在 pheanstalk 向 beanstalkd 發送指令時,會用到。 **因為 PHP 使用 socket 連接時,socket 是有生命周期的,超出一定時間 socket 就會失效,此時,Pheanstalk 的 dispatch 方法就會自動重連。** 我們知道,pheanstalk 的底層通信基于 php 的 socket ,那么,必然具備 socket 的特性,再深入的內容,可以參考 php 手冊中的 socket 部分。 # 生產者操作 ## 選擇 tube ``` $conn->useTube('myTube'); // 會返回 this ,因此支持鏈式操作 ``` ## put job ``` $job = $conn->put('this is my first job !'); ``` 這里傳入的字符串,就是放到 beanstalkd 中 job 的 body 當然,關于 priority 、 delay 、 ttr 這些參數你可以在 body 后面傳入,具體可參考方法接口。 put 會返回一個 Job 實例,如下: ``` Pheanstalk\Job Object ( [id:Pheanstalk\Job:private] => 1 [data:Pheanstalk\Job:private] => this is my first job ! ) ``` ## 鏈式操作 ``` $job = $conn->useTube('myTube')->put('this is my 2nd job !'); ``` ## 默認 Tube 我們知道,默認的 Tube 是 default ,如果我們不使用 useTube 的話,put 的 job 就會在 default tube 中。 # 消費者操作 ## 接收 job ### reserve ``` $job = $conn->reserve(); // 默認情況會接收 default tube 的 job print_r($job); // 如果 reserve 沒能得到 job ,就會一直阻塞在上面 ``` 上面返回的一定是 Job 實例。 ### reserveWithTimeOut ``` try { $job2 = $conn->reserveWithTimeout(10); // 阻塞接收,10秒之后超時,就不再接收了 print_r($job2); if ($job2 === null) { throw new Exception('超時了'); } // 處理 job ... sleep(60); } catch (\Pheanstalk\Exception\DeadlineSoonException $e) { print_r('deadline soon' . $e->getMessage()); } catch (Exception $e) { print_r($e->getMessage()); } ``` 注意:這個方法,超時的話會返回 null ,成功的話,會返回一個 Job 實例。 還有,上面代碼片段中有一行 sleep ,這一行的作用,是做一個小實驗。 當,job 被成功返回過來,但沒有 sleep 這一行,你會發現,你連續運行兩次該代碼,返回的 job 是同一個。 理論上,我們應該接收到下一個 job ,因為這個 job 被接收了之后,狀態就變成 reserved 而不再是 ready 。 那如何來測試 job 是否真的變成 reserved 了呢? 這就是 sleep 的用處,步驟如下: - 先往你的 beanstalkd 加入兩個 job - 再在兩個 cli 窗口運行兩次含 sleep 的消費者代碼 你會發現這個情況,如下圖: ![](https://box.kancloud.cn/bbcd9b836e8289007612aa75e859e21f_628x178.png) ![](https://box.kancloud.cn/fa0b5ee917e0be7fffdeb6222dac554d_1294x342.png) 它們分別接收了兩個 job 。 > 這里不要用 web 測試,用 cli 才能更直觀地看到效果。 這個現象,也很好理解: - reserve 成功之后,你的 php 進程還在于 beanstalkd 保持著 socket 連接,此連接不銷毀,beanstalkd 都會為你凍結此 job 。 - 一旦 php 進程銷毀(執行結束),beanstalkd 沒有接收到其他對 job 的操作,自然就回到 ready 隊列中了 ## 刪除 job ``` $conn->delete($job2); ``` 這個方法沒有返回值,但如果出錯的話,會拋出異常。 一般在 job 被消費者處理完畢之后才調用 delete 方法。 ## 釋放 reserved job ``` $conn->release($job,$priority,$delay); ``` 此方法沒有返回值,如果出錯會拋出異常。 此方法可以將 reserved job 重新放回 ready 隊列中,一般在消費者邏輯處理失敗后,才使用這個方法。 ## 預留 ``` $conn->bury($job,$priority); ``` 此方法沒有返回值,如果出錯會拋出異常。 此方法可以將一個 job 操作為 buried 狀態,比如當你的消費者接收到這個 job 時,對 job 進行了一系列檢查,經檢查,發現這個 job 還不能被消費,此時可以將 job 操作為 buried ,直到有客戶端對這個 job 發送了 kick 指令,才會被再次 reserve 。 ## 延遲 ttr ``` $conn->touch($job); ``` 此方法沒有返回值,如果出錯會拋出異常。 ## 添加監聽的管道 ``` $conn->watch('test')->watch('sms'); ``` 此方法返回 $conn 本身,可鏈式操作,每次運行,都會為 watch list 添加一個 tube 。 ## 刪除監聽的管道 ``` $conn->ignore('email')->ignore('sms'); ``` 此方法返回 $conn 本身,可鏈式操作,每次運行,都會從 watch list 中移除一個 tube 。 > 默認,watch list 中會有一個 default ,所以,當你執行 watch 時,default 仍然在 watch list 中。 ## 僅監聽一個管道 ``` $conn->watchOnly('sms'); ``` 此方法返回 $conn 本身,可鏈式操作。 通過此快捷方法,可以一次性移除 watch list 中所有的 tube ,除了此方法傳入的 tube 以外。 # 其他命令 ## 單純獲取 job ``` $conn->peek(new \Pheanstalk\Job(1,'')); // 根據 id 獲取 job ,id 在 beanstalkd 是唯一的,不論處于什么 tube $conn->useTube('order')->peekReady(); // 從 order tube 中獲取排在最前面的 job(這個順序,同 reserve 的順序) $conn->peekDelayed(); // 從 default tube 中獲取(最)即將變成 ready 的 delayed job $conn->peekBuried(); // 返回下一個 buried job ``` > 這里的 peekReady 、 peekDelayed 、 peekBuried 雖然在譯文解釋中,提到「下一個」,但它并非是一個指針,當你調用一起 peekReady ,下一次再調用 peekReady 就自動將指針移動到「下一個」。**如果你不手動將當前得到的 Job 操作為其他狀態,或者延長延遲時間等,你多次調用 peek 命令,他將返回同一個 Job。** ## 批量操作 job 為 ready ``` $conn->useTube('order'); $kicked = $conn->kick(10); print_r($kicked); ``` 上面的代碼片段,是針對 order tube 操作 10 條 buried 或 delayed 為 ready 。 具體可以看 beanstalkd 譯文中的 kick 命令。 ## 操作指定 job 為 ready ``` $conn->useTube('sms')->kickJob($job); ``` 此方法沒有返回值。 ## 獲取 job 的統計信息 ``` $job = $conn->reserve(); $res = $conn->statsJob($job); print_r($res); ``` 此方法返回一個 ArrayResponse 實例,具體字段意義,需參考 beanstalkd 譯文。 $res 如下: ``` Pheanstalk\Response\ArrayResponse Object ( [name:Pheanstalk\Response\ArrayResponse:private] => OK [storage:ArrayObject:private] => Array ( [id] => 2 [tube] => default [state] => reserved [pri] => 1 [age] => 92865 [delay] => 0 [ttr] => 60 [time-left] => 59 [file] => 0 [reserves] => 8 [timeouts] => 0 [releases] => 0 [buries] => 0 [kicks] => 0 ) ) ``` ## 獲取 tube 統計信息 ``` $res = $conn->statsTube('default'); print_r($res); ``` 此方法返回一個 ArrayResponse 實例,具體字段意義,需參考 beanstalkd 譯文。 $res 如下: ``` Pheanstalk\Response\ArrayResponse Object ( [name:Pheanstalk\Response\ArrayResponse:private] => OK [storage:ArrayObject:private] => Array ( [name] => default [current-jobs-urgent] => 1 [current-jobs-ready] => 2 [current-jobs-reserved] => 0 [current-jobs-delayed] => 0 [current-jobs-buried] => 0 [total-jobs] => 2 [current-using] => 1 [current-watching] => 1 [current-waiting] => 0 [cmd-delete] => 0 [cmd-pause-tube] => 0 [pause] => 0 [pause-time-left] => 0 ) ) ``` ## 獲取 beanstalkd 服務器統計信息 ``` $res = $conn->stats(); print_r($res); ``` 此方法返回一個 ArrayResponse 實例,具體字段意義,需參考 beanstalkd 譯文。 $res 如下: ``` Pheanstalk\Response\ArrayResponse Object ( [name:Pheanstalk\Response\ArrayResponse:private] => OK [storage:ArrayObject:private] => Array ( [current-jobs-urgent] => 1 [current-jobs-ready] => 2 [current-jobs-reserved] => 0 [current-jobs-delayed] => 0 [current-jobs-buried] => 0 [cmd-put] => 2 [cmd-peek] => 10 [cmd-peek-ready] => 11 [cmd-peek-delayed] => 10 [cmd-peek-buried] => 10 [cmd-reserve] => 2 [cmd-reserve-with-timeout] => 16 [cmd-delete] => 0 [cmd-release] => 0 [cmd-use] => 7 [cmd-watch] => 4 [cmd-ignore] => 4 [cmd-bury] => 0 [cmd-kick] => 4 [cmd-touch] => 0 [cmd-stats] => 1 [cmd-stats-job] => 1 [cmd-stats-tube] => 1 [cmd-list-tubes] => 0 [cmd-list-tube-used] => 0 [cmd-list-tubes-watched] => 0 [cmd-pause-tube] => 0 [job-timeouts] => 0 [total-jobs] => 2 [max-job-size] => 65535 [current-tubes] => 1 [current-connections] => 1 [current-producers] => 0 [current-workers] => 0 [current-waiting] => 0 [total-connections] => 37 [pid] => 1 [version] => 1.10 [rusage-utime] => 0.020000 [rusage-stime] => 0.030000 [uptime] => 96714 [binlog-oldest-index] => 0 [binlog-current-index] => 0 [binlog-records-migrated] => 0 [binlog-records-written] => 0 [binlog-max-size] => 10485760 [id] => a1e58a6bbd4c3b8b [hostname] => 91db88742cda ) ) ``` ## 查看當前 tube 列表 ``` $res = $conn->listTubes(); print_r($res); ``` 此方法將返回一個數組,如下: ``` Array ( [0] => default [1] => myTube ) ``` ## 查看當前 use 的 tube 列表 ``` $res = $conn->listTubeUsed(); print_r($res); ``` 此方法返回一個字符串,為當前 used 的 tube 名。 注意,同一時間,只有一個 tube 會被 used 。 ## 查看當前 watch list ``` $res = $conn->listTubesWatched(); print_r($res); ``` 此方法將返回一個數組,如下: ``` Array ( [0] => default ) ``` ## 凍結 tube ``` $conn->pauseTube('default',90); ``` 此方法沒有返回值,會將 tube 凍結 90 秒,凍結期間,消費者無法 reserve job ,如果 tube 凍結后,有客戶端發送了 reserve 指令,則會阻塞,直到凍結結束,或 reserve time out 。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看