# 環境
首先,確定好你的環境已經擁有這些內容:
- php
- redis
- mysql
- phpredis 擴展
而接下來,我們通過運行 Mysql 和 Redis 各自的生產者、消費者,來看消息隊列是否已經在工作。
# 實例化 Queue
打開生產者和消費者,我們可以看到有這么一段代碼:
```
Queue::init('Mysql', [
'dsn' => 'mysql:host=mysql;dbname=test',
'username' => 'root',
'password' => 'root',
'table' => 'queues',
'ttr' => 60,
]); // 隊列初始化
```
或:
```
Queue::init('Redis', [
'ip' => 'redis',
'port' => 6379,
'tubes' => 'tubes'
]); // 隊列初始化
```
如果前面實現環節,我們有達成共識,那么,相信這里你一定已經知道,這段代碼是用來傳入連接參數的。
你需要根據你運行的環境,去單獨設置它。
# 測試步驟
- 打開 4 個 client ( cmd , terminal )窗口
- 在其中兩個命令行中,分別運行如下命令:
```
php Consumer.php // 運行 Mysql 消費者
php RedisConsumer.php // 運行 Redis 消費者
```
運行成功后,這兩個窗口將處于運行狀態,兩個消費者都會始終處于 reserve 循環中,直到接收到數據,才會輸出。
- 再另外兩個窗口,運行如下命令:
```
php Producer.php
php RedisProducer.php
```
# 小結
到此為止,我們的模擬消息隊列就成功了。
我們可以看到,當我們運行生產者的時候,各自對應的消費者窗口中,就會彈出相關的信息。
這就是一個簡單的消息隊列案例。
**但是,這里需要注意,這只是一個模擬案例,它幫助我們理解消息隊列,但并不能應用于實際項目中。**
我們可以推理一下,消息隊列,本該可以使用在高并發的場景,但我們現在實現的消息隊列,它還有許多漏洞,根本無法應用在實戰場景中。
比如:
- mysql 驅動的 reserve 方法,它先接收,接收成功后再修改 job 的狀態,這在高并發的情況下,將導致我們的 job 被重復接收,因為在「接收」和「修改狀態」兩個環節中間,可能會有另一個「請求」進來,此時的 job 還處于 ready 狀態
- redis 驅動的 reserve 方法,使用了 redis 的 rPop 操作,這個方法,會在取出數據的同時,將數據從 list 中刪除,那么,如果我們消費者處理失敗,這個數據就有可能丟失
- redis 驅動的 put 方法,使用了 lPush 操作,此操作執行成功后,將會返回 list 的長度,我們的 redis 驅動會使用這個長度來作為該 job 的下標( id )。然而, list 的長度會不斷變化,并發場景下,你 put 成功后,可能還在處理其他流程,此進程并未結束,此時若有消費者接收走一個消息,則 list 的長度就發生了變化,你在 put 時的 id 就無法與 list 中的數據對應起來