>[danger] ## **RabbitMQ 是一個消息代理:它接收并轉發消息。**
RabbitMQ 你可以把它當成一個郵箱,當你把你想發送的郵件投進郵箱時,你可以確定郵遞員最終會把郵件送到你的收件人。
RabbitMQ 它接收、存儲和轉發二進制的數據–即消息(message)。
RabbitMQ中一些常使用術語:
* Publisher:生產者,消息的發送方。
* Connection:網絡連接。
* Channel:信道,多路復用連接中的一條獨立的雙向數據流通道。
* Exchange:交換機(路由器),負責消息的路由到相應隊列。
* Binding:隊列與交換器間的關聯綁定。消費者將關注的隊列綁定到指定交換器上,以便Exchange能準確分發消息到指定隊列。
* Queue:隊列,消息的緩沖存儲區。
* Virtual Host:虛擬主機,虛擬主機提供資源的邏輯分組和分離。包含連接,交換,隊列,綁定,用戶權限,策略等。
* Broker:消息隊列的服務器實體。
* Consumer:消費者,消息的接收方。
RabbitMQ 消息結構:

# **hello world**
現在我們來實現一個生產者負責發送5條消息,一個消費者負責接收消息并打印出來
### **1、生產者**
生產者將會連接 RabbitMQ、發送5條消息、然后關閉連接。

```
<?php
// +----------------------------------------------------------------------
// | najing [ 通用后臺管理系統 ]
// +----------------------------------------------------------------------
// | Copyright (c) 2020 http://www.najingquan.com All rights reserved.
// +----------------------------------------------------------------------
// | Author: 救火隊隊長
// +----------------------------------------------------------------------
namespace app\controller;
use app\BaseController;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class Mq extends BaseController
{
/**
* 功能描述: 生產者,負責發送消息
* @author 救火隊隊長
* @return string
*/
public function send()
{
//隊列名 消息隊列載體,每個消息都會被投入到一個或多個隊列。
$queue = 'hello';
//建立連接
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'zq', '123456', '/');
//獲取信道
$channel = $connection->channel();
//聲明創建隊列
$channel->queue_declare($queue, false, false, false, false);
for ($i=0; $i < 5; ++$i) {
sleep(1);//休眠1秒
//消息內容
$messageBody = "Hello,Zq Now Time:".date("h:i:s");
//將我們需要的消息標記為持久化 - 通過設置AMQPMessage的參數delivery_mode = AMQPMessage::DELIVERY_MODE_PERSISTENT
$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
//發送消息
$channel->basic_publish($message, '', $routing);
echo "Send Message:". $i."\n";
}
//關閉信道
$channel->close();
//關閉連接
$connection->close();
return 'Send Success';
}
}
```
在瀏覽器訪問http://mq.najingquan.com/mq/send,發送5條消息

登陸RabbitMQ Web 管理界面

### **2、消費者**
消費者從 RabbitMQ 中獲取消息,與發布消息的生產者不同,消費者一直運行以監聽消息,有新消息就立即處理

利用TP6自定義命令行實現消費者
```
<?php
// +----------------------------------------------------------------------
// | najing [ 通用后臺管理系統 ]
// +----------------------------------------------------------------------
// | Copyright (c) 2020 http://www.najingquan.com All rights reserved.
// +----------------------------------------------------------------------
// | Author: 救火隊隊長
// +----------------------------------------------------------------------
declare (strict_types = 1);
namespace app\command;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class Mq extends Command
{
protected function configure()
{
// 指令配置
$this->setName('mq')
->setDescription('the mq command');
}
protected function execute(Input $input, Output $output)
{
//隊列名 消息隊列載體,每個消息都會被投入到一個或多個隊列。
$queue = 'hello';
//建立連接
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'zq', '123456', '/');
//獲取信道
$channel = $connection->channel();
//聲明創建隊列
$channel->queue_declare($queue, false, false, false, false);
//消息消費
$channel->basic_consume($queue, '', false, true, false, false, function ($msg) use ($output) {
$output->writeln(" Received " . $msg->body . PHP_EOL);
});
while (count($channel->callbacks)) {
$channel->wait();
}
//關閉信道
$channel->close();
//關閉連接
$connection->close();
}
}
```
在控制臺啟動消費者:

切換到RabbitMQ Web 管理界面,消息已經被消費:

- 消息隊列中間件-前言
- RabbitMQ安裝
- PHP安裝rabbitmq、php-amqplib擴展
- RabbitMQ入門
- 工作隊列(Work Queues)
- 發布/訂閱(Publish/Subscribe)
- 直接交換機 (Direct exchange)
- 通配符交換機(Topic exchange)
- 遠程調用(RPC)
- 延遲隊列、死信隊列
- 重試隊列(可靠性投遞,重試超過3次,入庫告警)
- 消費冪等
- RabbitMQ + think-swoole + Redis秒殺高并發實戰
- redis商品庫存預減
- 秒殺請求入隊,可靠性投遞
- 秒殺請求出隊,生成秒殺訂單,減少商品庫存
- 性能測試 - 單機(2核4G)2000并發,搶購100個商品