前面一節,介紹了 pheanstalk 的接口文件,目的是為了更接近 pheanstalkd 的使用。
> 這同樣是一個從 0 到 1 了解第三方包如何使用的過程,這是一個學習方法,在其它的包時,也有借鑒意義。
在過了一遍 contracts 之后,我們可以發現,pheanstalk 最接近用戶層(開發者)的接口,就是 PheanstalkInterface 。
那么,使用手冊,我們也通過解讀 PheanstalkInterface 的具體實現 Pheanstalk 類來撰寫。
# 建立連接
```
include_once "vendor/autoload.php";
$conn = \Pheanstalk\Pheanstalk::create('127.0.0.1',11300,10);
```
上面的 create 方法,第一個參數必填,需要傳入目標 beanstalkd 服務器的 ip ,前提是,對于 beanstalkd server 而言,你的 ip (可能是內網,也可能是外網,也可能是本機),必須在它的監聽列表中。
第二個參數是端口,可不填,默認為 11300 ,當然你可以修改。
第三個參數是連接超時時間,可不填,默認為 10 ,表示 10 秒之后還未連接成功就超時了。
**要注意的是:此 create 方法只是返回一個配置好的 pheanstalk 實例,但并非是一個連接好的實例。**
如何理解呢?
從實現上, pheanstalk 的流程是等到有指令了,也就是有 dispatch 指令后,才建立一個 socket 連接,而它與 beanstalkd 通信的底層,用的也是 socket 。
所以,如果你只寫了 create 方法,并不能看出是否能夠連接成功。
你還需要發送一個指令才能知道是否能夠連接成功。
一旦發送過指令之后,Pheanstalk->connection->socket 就會指向一個 socket 資源。
# 關閉、重連
雖然,我們能看到 Pheanstalkd 類中,有 reconnect 方法,還有 $this->connection 的屬性。
然而它們的訪問權限是 private 。
這告訴我們,關閉和重連,不需要我們手動操作。
我們可以看到,這兩個方法,只有在 pheanstalk 向 beanstalkd 發送指令時,會用到。
**因為 PHP 使用 socket 連接時,socket 是有生命周期的,超出一定時間 socket 就會失效,此時,Pheanstalk 的 dispatch 方法就會自動重連。**
我們知道,pheanstalk 的底層通信基于 php 的 socket ,那么,必然具備 socket 的特性,再深入的內容,可以參考 php 手冊中的 socket 部分。
# 生產者操作
## 選擇 tube
```
$conn->useTube('myTube'); // 會返回 this ,因此支持鏈式操作
```
## put job
```
$job = $conn->put('this is my first job !');
```
這里傳入的字符串,就是放到 beanstalkd 中 job 的 body
當然,關于 priority 、 delay 、 ttr 這些參數你可以在 body 后面傳入,具體可參考方法接口。
put 會返回一個 Job 實例,如下:
```
Pheanstalk\Job Object
(
[id:Pheanstalk\Job:private] => 1
[data:Pheanstalk\Job:private] => this is my first job !
)
```
## 鏈式操作
```
$job = $conn->useTube('myTube')->put('this is my 2nd job !');
```
## 默認 Tube
我們知道,默認的 Tube 是 default ,如果我們不使用 useTube 的話,put 的 job 就會在 default tube 中。
# 消費者操作
## 接收 job
### reserve
```
$job = $conn->reserve(); // 默認情況會接收 default tube 的 job
print_r($job); // 如果 reserve 沒能得到 job ,就會一直阻塞在上面
```
上面返回的一定是 Job 實例。
### reserveWithTimeOut
```
try {
$job2 = $conn->reserveWithTimeout(10); // 阻塞接收,10秒之后超時,就不再接收了
print_r($job2);
if ($job2 === null) {
throw new Exception('超時了');
}
// 處理 job ...
sleep(60);
} catch (\Pheanstalk\Exception\DeadlineSoonException $e) {
print_r('deadline soon' . $e->getMessage());
} catch (Exception $e) {
print_r($e->getMessage());
}
```
注意:這個方法,超時的話會返回 null ,成功的話,會返回一個 Job 實例。
還有,上面代碼片段中有一行 sleep ,這一行的作用,是做一個小實驗。
當,job 被成功返回過來,但沒有 sleep 這一行,你會發現,你連續運行兩次該代碼,返回的 job 是同一個。
理論上,我們應該接收到下一個 job ,因為這個 job 被接收了之后,狀態就變成 reserved 而不再是 ready 。
那如何來測試 job 是否真的變成 reserved 了呢?
這就是 sleep 的用處,步驟如下:
- 先往你的 beanstalkd 加入兩個 job
- 再在兩個 cli 窗口運行兩次含 sleep 的消費者代碼
你會發現這個情況,如下圖:


它們分別接收了兩個 job 。
> 這里不要用 web 測試,用 cli 才能更直觀地看到效果。
這個現象,也很好理解:
- reserve 成功之后,你的 php 進程還在于 beanstalkd 保持著 socket 連接,此連接不銷毀,beanstalkd 都會為你凍結此 job 。
- 一旦 php 進程銷毀(執行結束),beanstalkd 沒有接收到其他對 job 的操作,自然就回到 ready 隊列中了
## 刪除 job
```
$conn->delete($job2);
```
這個方法沒有返回值,但如果出錯的話,會拋出異常。
一般在 job 被消費者處理完畢之后才調用 delete 方法。
## 釋放 reserved job
```
$conn->release($job,$priority,$delay);
```
此方法沒有返回值,如果出錯會拋出異常。
此方法可以將 reserved job 重新放回 ready 隊列中,一般在消費者邏輯處理失敗后,才使用這個方法。
## 預留
```
$conn->bury($job,$priority);
```
此方法沒有返回值,如果出錯會拋出異常。
此方法可以將一個 job 操作為 buried 狀態,比如當你的消費者接收到這個 job 時,對 job 進行了一系列檢查,經檢查,發現這個 job 還不能被消費,此時可以將 job 操作為 buried ,直到有客戶端對這個 job 發送了 kick 指令,才會被再次 reserve 。
## 延遲 ttr
```
$conn->touch($job);
```
此方法沒有返回值,如果出錯會拋出異常。
## 添加監聽的管道
```
$conn->watch('test')->watch('sms');
```
此方法返回 $conn 本身,可鏈式操作,每次運行,都會為 watch list 添加一個 tube 。
## 刪除監聽的管道
```
$conn->ignore('email')->ignore('sms');
```
此方法返回 $conn 本身,可鏈式操作,每次運行,都會從 watch list 中移除一個 tube 。
> 默認,watch list 中會有一個 default ,所以,當你執行 watch 時,default 仍然在 watch list 中。
## 僅監聽一個管道
```
$conn->watchOnly('sms');
```
此方法返回 $conn 本身,可鏈式操作。
通過此快捷方法,可以一次性移除 watch list 中所有的 tube ,除了此方法傳入的 tube 以外。
# 其他命令
## 單純獲取 job
```
$conn->peek(new \Pheanstalk\Job(1,'')); // 根據 id 獲取 job ,id 在 beanstalkd 是唯一的,不論處于什么 tube
$conn->useTube('order')->peekReady(); // 從 order tube 中獲取排在最前面的 job(這個順序,同 reserve 的順序)
$conn->peekDelayed(); // 從 default tube 中獲取(最)即將變成 ready 的 delayed job
$conn->peekBuried(); // 返回下一個 buried job
```
> 這里的 peekReady 、 peekDelayed 、 peekBuried 雖然在譯文解釋中,提到「下一個」,但它并非是一個指針,當你調用一起 peekReady ,下一次再調用 peekReady 就自動將指針移動到「下一個」。**如果你不手動將當前得到的 Job 操作為其他狀態,或者延長延遲時間等,你多次調用 peek 命令,他將返回同一個 Job。**
## 批量操作 job 為 ready
```
$conn->useTube('order');
$kicked = $conn->kick(10);
print_r($kicked);
```
上面的代碼片段,是針對 order tube 操作 10 條 buried 或 delayed 為 ready 。
具體可以看 beanstalkd 譯文中的 kick 命令。
## 操作指定 job 為 ready
```
$conn->useTube('sms')->kickJob($job);
```
此方法沒有返回值。
## 獲取 job 的統計信息
```
$job = $conn->reserve();
$res = $conn->statsJob($job);
print_r($res);
```
此方法返回一個 ArrayResponse 實例,具體字段意義,需參考 beanstalkd 譯文。
$res 如下:
```
Pheanstalk\Response\ArrayResponse Object
(
[name:Pheanstalk\Response\ArrayResponse:private] => OK
[storage:ArrayObject:private] => Array
(
[id] => 2
[tube] => default
[state] => reserved
[pri] => 1
[age] => 92865
[delay] => 0
[ttr] => 60
[time-left] => 59
[file] => 0
[reserves] => 8
[timeouts] => 0
[releases] => 0
[buries] => 0
[kicks] => 0
)
)
```
## 獲取 tube 統計信息
```
$res = $conn->statsTube('default');
print_r($res);
```
此方法返回一個 ArrayResponse 實例,具體字段意義,需參考 beanstalkd 譯文。
$res 如下:
```
Pheanstalk\Response\ArrayResponse Object
(
[name:Pheanstalk\Response\ArrayResponse:private] => OK
[storage:ArrayObject:private] => Array
(
[name] => default
[current-jobs-urgent] => 1
[current-jobs-ready] => 2
[current-jobs-reserved] => 0
[current-jobs-delayed] => 0
[current-jobs-buried] => 0
[total-jobs] => 2
[current-using] => 1
[current-watching] => 1
[current-waiting] => 0
[cmd-delete] => 0
[cmd-pause-tube] => 0
[pause] => 0
[pause-time-left] => 0
)
)
```
## 獲取 beanstalkd 服務器統計信息
```
$res = $conn->stats();
print_r($res);
```
此方法返回一個 ArrayResponse 實例,具體字段意義,需參考 beanstalkd 譯文。
$res 如下:
```
Pheanstalk\Response\ArrayResponse Object
(
[name:Pheanstalk\Response\ArrayResponse:private] => OK
[storage:ArrayObject:private] => Array
(
[current-jobs-urgent] => 1
[current-jobs-ready] => 2
[current-jobs-reserved] => 0
[current-jobs-delayed] => 0
[current-jobs-buried] => 0
[cmd-put] => 2
[cmd-peek] => 10
[cmd-peek-ready] => 11
[cmd-peek-delayed] => 10
[cmd-peek-buried] => 10
[cmd-reserve] => 2
[cmd-reserve-with-timeout] => 16
[cmd-delete] => 0
[cmd-release] => 0
[cmd-use] => 7
[cmd-watch] => 4
[cmd-ignore] => 4
[cmd-bury] => 0
[cmd-kick] => 4
[cmd-touch] => 0
[cmd-stats] => 1
[cmd-stats-job] => 1
[cmd-stats-tube] => 1
[cmd-list-tubes] => 0
[cmd-list-tube-used] => 0
[cmd-list-tubes-watched] => 0
[cmd-pause-tube] => 0
[job-timeouts] => 0
[total-jobs] => 2
[max-job-size] => 65535
[current-tubes] => 1
[current-connections] => 1
[current-producers] => 0
[current-workers] => 0
[current-waiting] => 0
[total-connections] => 37
[pid] => 1
[version] => 1.10
[rusage-utime] => 0.020000
[rusage-stime] => 0.030000
[uptime] => 96714
[binlog-oldest-index] => 0
[binlog-current-index] => 0
[binlog-records-migrated] => 0
[binlog-records-written] => 0
[binlog-max-size] => 10485760
[id] => a1e58a6bbd4c3b8b
[hostname] => 91db88742cda
)
)
```
## 查看當前 tube 列表
```
$res = $conn->listTubes();
print_r($res);
```
此方法將返回一個數組,如下:
```
Array
(
[0] => default
[1] => myTube
)
```
## 查看當前 use 的 tube 列表
```
$res = $conn->listTubeUsed();
print_r($res);
```
此方法返回一個字符串,為當前 used 的 tube 名。
注意,同一時間,只有一個 tube 會被 used 。
## 查看當前 watch list
```
$res = $conn->listTubesWatched();
print_r($res);
```
此方法將返回一個數組,如下:
```
Array
(
[0] => default
)
```
## 凍結 tube
```
$conn->pauseTube('default',90);
```
此方法沒有返回值,會將 tube 凍結 90 秒,凍結期間,消費者無法 reserve job ,如果 tube 凍結后,有客戶端發送了 reserve 指令,則會阻塞,直到凍結結束,或 reserve time out 。