[TOC]
# 介紹


* 生產者( producer )
在圖中為 P,表示消息的發送者。
* 交換機( exchanges )
在圖中為 X, 生產者發過來的消息需要經過交換機,交換機將決定將消息放到哪些隊列當中。
* 隊列(queue)
隊列在圖 1 中由紅色矩形陣列表示,負責保存消息和發放消息。
* 消費者(consumer)
在圖中為 C,代表等待接收消息的程序。
## 信息流
消息是怎么從生產者傳遞到消費者的呢?
首先,生產者發送消息到交換機,同時發送一個 key,通過這個 key,交換機就知道該把消息發到哪個隊列。隨后交換機把消息發送到相應的隊列中。由隊列將消息發送給消費者。消費者監聽某些隊列,當有消息過來時,就立即處理消息。
提問
交換機是如何根據 key 來分配消息到隊列?
隊列怎樣將消息發送給消費者?
### 第一個問題
RabbitMQ 的交換機有四種類型:direct, topic, headers, fanout
* fanout
fanout 交換機就跟廣播一樣,對消息不作選擇地發給所有綁定的隊列。兩個隊列都將收到消息
將所有收到的消息廣播到所有已知的隊列。
* direct

在 direct 模式里,交換機和隊列之間綁定了一個 key,只有消息的 key 與綁定了的 key 相同時,交換機才會把消息發給該隊列。消息的 key 為 orange 時,消息將進入隊列 Q1 ; key 為 black 或者 green 時,消息將進入隊列 Q2。若消息的 key 是其他字符串,被交換機直接遺棄

多重綁定
同時,交換機支持多重綁定,多個隊列可以以相同的 key 與交換機綁定。當消息的 key 為 black 時,消息將進入 Q1 和 Q2
* topic
topic 模式可以理解為主題模式,當 key 包涵某個主題時,即可進入該主題的隊列。topic 模式的 key 必須具有固定的格式:以 . 作為間隔的一串單詞;比如:quick.orange.rabbit,key 最多不能超過 255byte。
交換機和隊列的key可以以類似正則表達式的方式存在,有兩種語法:
1. "*" 可以替代一個單詞
2. "#" 可以替代 0 個或多個單詞

圖中,Q1 與交換機綁定的 kye 為:“*.orange.*”,故當消息的 key 為三個單詞,且中間的單詞為 orange 時,消息將進入 Q1。Q2 與exchange 綁定的 key 為 ”rabbit.#”,當消息的 key 以 rabbit 開頭時,消息將進入 Q2 。
* headers
根據發送的消息內容中的headers屬性進行匹配。
### 第二個問題

Round-robin Dispatching
* 循環發放(Round-robin dispatching)
隊列分發消息給消費者的方式采用循環發放。舉例來說,若隊列里有四個消息 w, x, y, z,則 C1 將得到消息 z 和 x , C2 將得到消息 y 和 w 。即每個消費者按順序每人發一個消息。
注意,在這種分配方式下,消息其實在剛進入隊列的時候就已經內定好將要被分發的消費者。即 z, x 一定是給 C1 . y, w 一定是給 C2 。
這種方式存在一些隱患,如果 z 和 x 都是耗時的命令、y , z 都是簡單的命令,C1 將不停地工作,而 C2 就比較空閑,造成資源浪費。
* 公平發放(fair dispatching)
公平發放解決了上述問題。這種方式下,隊列只會把消息給空閑的消費者。如果它看到某個消費者正忙,就查找下一個空閑消費者。
* 消息的確認(Message acknowledgment)
若沒有特別設定,消息一旦被隊列分發給消費者,就被 Rabbitmq 從內存中刪除。
在這種情況下,如果將一個正在處理消息的消費者強行關閉,那么,消息將未被完全處理,且 RabbitMQ 完全不知情。
為了解決上述問題,可以配置使得消息處理完后,向 RabbitMQ 返回一個 Acknowledgment。RabbitMQ 直到收到Acknowledgment 后,才將消息刪除。
當消費者死亡時(its channel is closed, connection is closed, or TCP connection is lost),RabbitMQ 會知道這個消費者發生問題了,將重新發送消息給空閑的消費者。
消息沒有 TimeOut,即使消費者處理很長很長時間,乃至無窮無盡,RabbmitMQ 也認為消費者正在處理。
> 其實,消息的確認是默認開啟的,不需要特地設置。
# 概念
**Queue(消息隊列)**
queue是mq內部對象,用于存儲未被customer消費的消息。相同屬性的queue可以重復定義,每個消息都會被投入到一個或多個隊列。
**Binding(綁定)**
binding是將exchange和queue按照路由規則綁定起來。可以理解為binding是exchange和queue之間的關系
**Connection(連接)**
消息tcp連接
**Channel(信道)**
每個connection里,可建立多個channel,每個channel代表一個會話任務。做到盡量共用connection
# 簡單例子
send.php:
~~~
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// 創建連接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 創建channel,多個channel可以共用連接
$channel = $connection->channel();
// 創建交換機以及隊列(如果已經存在,不需要重新再次創建并且綁定)
// 創建直連的交換機
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
// 創建隊列
$channel->queue_declare('hello', false, false, false, false);
// 交換機跟隊列的綁定,
$channel->queue_bind('hello', 'direct_logs', 'routigKey');
// 設置消息bady傳送字符串logs(消息只能為字符串,建議消息均json格式)
$msg = new AMQPMessage('logs');
// 發送數據到對應的交換機direct_logs并設置對應的routigKey
$channel->basic_publish($msg, 'direct_logs', 'routigKey');
~~~
receive.php:
~~~
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// 創建連接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 創建channel,多個channel可以共用連接
$channel = $connection->channel();
// 可能會在數據發布之前啟動消費者,所以我們要確保隊列存在,然后再嘗試從中消費消息。
// 創建直連的交換機
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
// 創建隊列
$channel->queue_declare('hello', false, false, false, false);
// 交換機跟隊列的綁定,
$channel->queue_bind('hello', 'direct_logs', 'routigKey');
// 回調函數
$callback = function ($msg) {
echo $msg->body;
};
// 啟動隊列消費者
$channel->basic_consume('hello3', '', false, true, false, false, $callback);
// 判斷是否存在回調函數
while(count($channel->callbacks)) {
// 此處為執行回調函數
$channel->wait();
}
~~~
# 安裝
先把rabbitmq安裝上
https://github.com/php-amqplib/php-amqplib
這個需要
mbstring bcmath dom curl這些插件
安裝完rabbitermq之后,我們還要下載對應php的composer包
# RabbitMQ備注
* 非持久化會導致,隊列重啟,數據丟失
* exchange持久化,在聲明durable參數時指定為true
* queue持久化,在聲明durable參數時指定true
* 消息持久化,實例化AMQPMessage類時指定delivery_mode為2
* exchange和queue是否持久化需要一致才能綁定
* 消費者設置手動ack,在聲明no_ack參數時指定false
* 隊列消息異常需要將消息刪除并再次發送同樣的消息置于末尾并手動記錄日志
# 基本概念
來了解下RabbitMQ的成員組件:
broker:消息隊列服務器的實體,是一個中間件應用,負責接收生產者的消息,然后將消息發送到消息接受者或者其他broker
exchange:消息交換機,是消息到達的第一個地方,消息通過它指定的路由規則,分發到不同的消息隊列中
queue:消息隊列,消息通過發送和路由之后最后到達的地方,到達queue的消息即進入邏輯上的等待消費狀態。每個消息都會被發送到一個或者多個隊列
binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來,也就是exchange和queue之間的虛擬連接
virtual host:虛擬主機,他是對broker的虛擬劃分,將消費者、生產者和他們依賴的AMQP相關結構進行隔離,一般都是為了安全考慮,比如我們在一個broker中設置多個虛擬主機,對不同用戶進行權限的分離
routing key:路由關鍵字,exchange根據這個關鍵字進行消息的投遞
connection:連接,代表生產者、消費者、broker之間進行通信的物理網絡
channel:消息通道,用于連接生產者和消費者的邏輯結構。在客戶端的每個鏈接里,可可建立多個channel,每個channel代表一個會話任務,通過channel可以隔離連接中的不同交互內容
看下客戶端使用rabbitmq的基本流程:
1.消費者連接到消息隊列服務器,打開一個channel
2.消費者聲明一個exchange,并設置相關屬性
3.消費者聲明一個queue,并設置相關屬性
4.消費者使用routing key,在exchange和queue之間建立綁定
5.生產者投遞消息到exchange
6.exchange接收到消息后,根據消息的key和已經設置好的binding,進行消息路由,將消息投遞到一個或者多個queue里
# 命令行使用
查看rabbitmq的安裝目錄:whereis rabbitmq
cd到rabbitmq根目錄下的sbin文件夾下
后臺開啟rabbitmq:./rabbitmq-server start -detached
關閉:./rabbitmqctl stop
查看交換機、綁定、隊列:
~~~
./rabbitmqctl list_exchanges
./rabbitmqctl list_bindings
./rabbitmqctl list_queues
~~~
用戶管理:(以下操作生效需要重啟rabbitmq)
新建用戶:./rabbitmqctl add_user username password
刪除用戶:./rabbitmqctl delete_user username
改密碼:./rabbimqctl change_password username newpassword
設置用戶角色:./rabbitmqctl set_user_tags username tag
(Tag可以為 administrator,monitoring, management)
開啟web管理界面:./rabbitmq-plugins enable rabbitmq_management
(在瀏覽器地址欄輸入:http://127.0.0.1:15672進入)
# 介紹
RabbitMQ其他小介紹
RabbitMQ端口問題,RabbitMQ是默認霸占了5672,15672,25672這三個端口的
5672端口是用于AMQP協議連接
15672端口是用于http協議連接(不信可以試試web訪問5672看行不行)
RabbitMQ數據持久化
RabbitMQ有三種可設置的持久化,分別為Exchange(交換機)持久化,Queue(隊列)持久化,信息持久化,如果設置了交換機和隊列持久化,路由也會自動的持久化
# 參考
文檔http://previous.rabbitmq.com/v3_5_7/getstarted.html
http://www.cnblogs.com/bluebirds/p/6069623.html
https://www.cnblogs.com/chunguang/p/5634342.html
http://blog.csdn.net/sinat_21125451/article/details/53422648
https://www.phpxy.com/article/115.html
重點
http://blog.csdn.net/demon3182/article/details/77335206 (**超級重點,里面還有laragon**)
http://blog.csdn.net/super_rd/article/details/70241007?utm_source=itdadao&utm_medium=referral
http://rabbitmq.org.cn/ (中文)
http://rabbitmq.mr-ping.com (python版)
laragon
http://blog.csdn.net/demon3182/article/details/76423340
http://blog.csdn.net/demon3182/article/details/76528612
- OAuth
- 簡介
- 步驟
- 單點登錄
- .user.ini
- 時間轉換為今天昨天前天幾天前
- 獲取ip接口
- 協程
- 概念
- yield-from && return-values
- 協程與阻塞的思考
- 中間件
- mysqli異步與php的協程
- 代碼片段
- pdo 執行的sql語句
- 二進制安全
- 捕捉異常中斷
- global
- 利用cookie模擬登陸
- 解析非正常json
- 簡單的對稱加密算法
- RSA 加密
- 過濾掉emoji表情
- 判斷遠程圖片是否存在
- 一分鐘限制請求100次
- 文件處理
- 多文件上傳
- 顯示所有文件
- 文件下載和上面顯示所有文件配合
- 文件的刪除,統計,存數組等
- 圖片處理
- 簡介
- 驗證碼
- 圖片等比縮放
- 批量添加水印
- beanstalkd
- 安裝
- 使用
- RabbitMQ
- 簡介
- debain安裝
- centos安裝
- 常用方法
- 入門
- 工作隊列
- 訂閱,發布
- 路由
- 主題
- 遠程調用RPC
- 消息中間件的選型
- .htaccess
- isset、empty、if區別以及0、‘’、null
- php各版本
- php7.2 不向后兼容的改動
- php中的各種坑
- php7改變
- php慢日志
- 郵件
- PHPMailer實現發郵件
- 驗證郵件地址真實性
- 文件下載
- FastCgi 與 PHP-fpm 之間的關系
- openssl 加解密
- 反射
- 鉤子方法
- 查找插件
- opcode
- opcache使用
- opcache優化
- 分布式一致性hash算法
- 概念
- 哈希算法好壞的四個定義
- php實現
- java實現
- 數組
- jwt
- jwt簡介
- 單點登錄
- phpize
- GeoIP擴展
- php無法獲得https網頁內容的解決方案
- homestead運行的腳本
- Unicode和Utf-8轉換
- php優化
- kafka
- fpm配置
- configure配置詳解