[TOC]
# 介紹
RabbitMQ 是一個消息代理:它接收并轉發消息。你可以把它當成一個郵局,當你把你想發送的郵件投進郵箱時,你可以確定郵遞員最終會把郵件送到你的收件人。在上面的比喻中,我們把 RabbitMQ 比作 郵箱、郵局、郵遞員。
RabbitMQ 和 郵局主要的不同是RabbitMQ不處理真實的信件,它接收、存儲和轉發二進制的數據–消息(message)。
RabbitMQ和消息傳遞(messaging)中一些常使用術語:
* Producing 僅僅指發送消息(message)。發送消息的程序就稱為生產者 producer

* queue 就像是RabbitMQ內部的郵箱。雖然消息在RabbitMQ和你的應用程序之間流動,但它們只能存儲在隊列中。隊列的大小只受主機的內存和磁盤的限制,他本質上是一個打的消息緩沖區。多個生產者可以向一個隊列發送消息,多個消費者也可以嘗試從一個隊列接收數據,這就是隊列。

* 消費和接收郵件相似,consumer 是一個主要等待接收消息的程序。

> 請注意, producer, consumer, broker 并不需要在同一個主機上,盡管它們在大多數應用程序中是存在一個主機上的。
# hello world
在這一章節我們將使用PHP來寫兩段程序: 一個生產者負責發送一條消息,還有一個消費者負責接收消息并打印出來。在這里簡單說一下 php-amqlib API, 主要集中精力來完成上面的例子。
在下面的圖中, P 代表生產者,C 代表消費者。中間的盒子代表一個隊列 - RabbitMQ提供給消費者的一個消息緩沖區。

在你的項目目錄下添加 composer.json (通過composer安裝)
~~~
{
"require": {
"php-amqplib/php-amqplib": ">=2.6.1"
}
}
~~~
安裝
~~~
composer install
~~~
現在我們已經安裝了 php-amqplib 類庫, 接下來我們僅需要寫少量的代碼。
# 生產者

我們將調用我們的消息發布者 sender.php 和我們的消息接受者 receive.php。消息發布者將會連接 RabbitMQ、發送一條消息、然后退出。
在 send.php 中。我們需要引入 php-amqplib 類庫并且 使用必要的類:
~~~
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
~~~
然后我們可以創建一個 RabbitMQ 的連接:
~~~
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
~~~
上面創建的連接是socket連接的抽象,并且完成了協議版本的協商和認證等等。在這里我們連接到一個本地機器的代理 - 因此使用 localhost 。如果我們想要連接不同機器上的代理,我們可以使用它的域名或者IP地址來替換localhost。
接下來我們新建一個 channel, 這是大部分用于完成任務的API所在的地方。
要發送消息,我們必須先聲明一個隊列給我們發送消息使用;然后我們就可以將消息發送到隊列中:
~~~
$channel->queue_declare('hello', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";
~~~
聲明一個隊列是冪等的 - 只有當它不存在時才會被創建。消息的內容是一個字節數組,所以你可以編碼你喜歡的任何東西。
最后,我們關閉channel 和 connection:
~~~
$channel->close();
$connection->close();
~~~
完整的 send.php:
~~~
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";
$channel->close();
$connection->close();
~~~
>不能正常發送消息
>可能是RabbitMQ 代理沒有足夠的的硬盤空間(默認至少需要 200MB)
# 消費者
我們的消費者是從 RabbitMQ 中獲取消息,因此與發布單個消息的生產者不同,消費者一直運行以監聽消息并將它們打印出來。
receive.php 代碼幾乎和 sender.php 有相同的依賴:
~~~
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
~~~
設置和生產者相同;我們首先打開一個連接和創建channel,并聲明我們要消費的消息隊列。注意: 和生產者發送的隊列名一致。
~~~
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
~~~
>請注意,我們在這里也聲明了隊列。這是因為我們可能在生產者之前啟動消費者,所以我們要確保隊列一定存在,然后再嘗試從中消費信息。
在我們將要告訴服務器將隊列中的消息傳遞給我們之前,我們需要定義一個可以接收服務器發送的消息的回調函數。請注意,消息是從服務器異步發送到客戶端。、
~~~
$callback = function($msg) {
echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
~~~
receive.php 完整代碼:
~~~
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg) {
echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
~~~
整合代碼
現在我們可以運行一下兩個腳本。在命令行下,運行消費者(receive.php)
~~~
php receive.php
~~~
然后,運行生產者(send.php)
~~~
php send.php
~~~
消費者將會打印 生產者通過RabbitMQ 發送的消息。receive.php 將會一直運行,等待接收消息(使用 Ctrl+C 停止),因此嘗試從另一個終端運行 send.php。
>
>列出RabbitMQ中隊列
>你可能希望看到RabbitMQ中有什么隊列和其中有多少消息。你可以使用您可以使用rabbitmqctl 工具
>sudo rabbitmqctl list_queues
>在windows上省略 sudo
>rabbitmqctl list_queues
- OAuth
- 簡介
- 步驟
- 單點登錄
- .user.ini
- 時間轉換為今天昨天前天幾天前
- 獲取ip接口
- 協程
- 概念
- yield-from && return-values
- 協程與阻塞的思考
- 中間件
- mysqli異步與php的協程
- 代碼片段
- pdo 執行的sql語句
- 二進制安全
- 捕捉異常中斷
- global
- 利用cookie模擬登陸
- 解析非正常json
- 簡單的對稱加密算法
- RSA 加密
- 過濾掉emoji表情
- 判斷遠程圖片是否存在
- 一分鐘限制請求100次
- 文件處理
- 多文件上傳
- 顯示所有文件
- 文件下載和上面顯示所有文件配合
- 文件的刪除,統計,存數組等
- 圖片處理
- 簡介
- 驗證碼
- 圖片等比縮放
- 批量添加水印
- beanstalkd
- 安裝
- 使用
- RabbitMQ
- 簡介
- debain安裝
- centos安裝
- 常用方法
- 入門
- 工作隊列
- 訂閱,發布
- 路由
- 主題
- 遠程調用RPC
- 消息中間件的選型
- .htaccess
- isset、empty、if區別以及0、‘’、null
- php各版本
- php7.2 不向后兼容的改動
- php中的各種坑
- php7改變
- php慢日志
- 郵件
- PHPMailer實現發郵件
- 驗證郵件地址真實性
- 文件下載
- FastCgi 與 PHP-fpm 之間的關系
- openssl 加解密
- 反射
- 鉤子方法
- 查找插件
- opcode
- opcache使用
- opcache優化
- 分布式一致性hash算法
- 概念
- 哈希算法好壞的四個定義
- php實現
- java實現
- 數組
- jwt
- jwt簡介
- 單點登錄
- phpize
- GeoIP擴展
- php無法獲得https網頁內容的解決方案
- homestead運行的腳本
- Unicode和Utf-8轉換
- php優化
- kafka
- fpm配置
- configure配置詳解