依賴
`composer require php-amqplib/php-amqplib`
```
<?php
namespace app\common\tool;
use Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
* RabbitMQ 客戶端
*/
class Rabbit {
private static $_instance = null;
# 連接資源
private $connect = null;
# 通道資源
private $channel = null;
# 連接配置
private $config = [];
/**
* 構造函數
*
* @param string $connectName 連接名稱
*/
private function __construct($connectName){
# 對象是否已經存儲了連接,沒有則連接并存儲
if(!isset($this->connect)){
$config = $this->config;
if(empty($config)){
$config = config('queue.'.$connectName);
if(!$config){
throw new Exception('config\queue.php文件中,沒有配置'.$connectName, -1);
}
$this->config = $config;
}
$this->connect = new AMQPStreamConnection($config['host'], $config['port'], $config['username'], $config['password']);
$this->channel = $this->connect->channel();
}
}
/**
* 每一個連接配置生成一個單例
*
* @param string $connectName
* @return void
*/
public static function instance($connectName = 'rabbit'){
if(is_null(self::$_instance)){
self::$_instance = new self($connectName);
}
return self::$_instance;
}
/**
* 僅用來查看編輯器提示mq相關方法的參數意思
*
* @return void
*/
private static function document(){
$conf = [
'host'=> '192.168.7.236',
'port'=> '15672',
'username'=> 'guest',
'password'=> ':guest~',
];
# 創建連接
$connection = new AMQPStreamConnection($conf['host'], $conf['port'], $conf['username'], $conf['password']);
# 創建通道
$channel = $connection->channel();
/***************** 生產者寫入隊列開始 *******************/
# 聲明交換機,聲明則需要綁定隊列;可以不聲明,則默認default交換機。
# 交換機相當于是郵箱,可以聲明多個郵箱,隊列相當于信件,可以一信多投。
# 因為可能一個隊列信息,多個子系統都需要單獨處理。有點像廣播
$exchangeName = 'default';
$channel->exchange_declare($exchangeName, 'direct', false, true, false);
# 必須聲明隊列,通常一個數據隊列定義一個名詞,里面的數據結構都是一致,隊列模式必須指定持久化,否則服務器重啟,隊列會丟失
$queueName = 'queueName';
$channel->queue_declare($queueName, false, true, false, false);
# 將隊列與某個交換機進行綁定,并使用路由關鍵字.個人理解路由關鍵字更像是隊列別名,不一定準確,默認空字符串則使用隊列名替代。
$routingKey = '';
$channel->queue_bind($queueName, $exchangeName, $routingKey);
# 必須聲明一個消息體,且為字符串類型,具體的格式,可以對數據采用各種需要的encode,投遞模式必須指定持久化,否則服務器重啟,隊列消息會丟失
# 第二個消息狀態配置,還包含有 content_type=> text/plain【默認】 correlation_id=> 自定義唯一id【比如, uniqid('rmq', true)】correlation_id 會在消費確認里用到
$msg = new AMQPMessage($msgBody, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
# 推送入隊列
$channel->basic_publish($msg, $exchangeName, $routingKey);
/***************** 生產者寫入隊列結束 *******************/
/***************** 消費者讀取隊列開始 *******************/
# 必須聲明隊列,通常一個數據隊列定義一個名詞,里面的數據結構都是一致,隊列模式必須指定持久化,否則服務器重啟,隊列會丟失
$channel->queue_declare($queueName, false, true, false, false);
# 聲明消息的回調處理與哪個隊列綁定
$callback = function($msg){
# 處理隊列信息的邏輯
$queueData = $msg->body;
# 業務邏輯處理
$businessDone = true;
# 處理成功要手動確認下消息,告知隊列已處理完,可以清理了。如果未聲明自動確認刪除模式,也沒有其他消費者處理,則永遠獲取同一條。明顯這是不符合的
# 所以在未聲明自動確認刪除模式下,一定要手動確認,最好是有個重試邏輯,重試多少次后,將隊列消息推到另外的隊列中,當作異常保留,然后在本隊列進行確認刪除,避免阻塞
if($businessDone==true){
# 如果沒有在basic_publish的最后參數為消息指定一個唯一ID,則rabbit會默認生成一個唯一標識delivery_tag
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
};
# 設置限流,
# 參數一,限制消息大小,0代表不限制
# 參數二,限制允許unack的最大消息數,
# 參數三,限制對象,true對整個channel限制,false對當前消費者限制
$channel->basic_qos(0, 1, false);
# 聲明消費隊列綁定的回調邏輯
$channel->basic_consume($queueName, '', true, false, false, false, $callback);
# 持久檢測隊列,看是否有數據心跳,如果只是單次獲取,直接$channel->wait();即可
while($channel->is_consuming()) {
# 執行消費回調,PHP應該是沒有回溯執行的,猜測wait方法內部的實現是,通過聲明的隊列參數獲取消息,然后傳遞給前面聲明的回調方法進行執行
$channel->wait();
}
/***************** 消費者讀取隊列開始 *******************/
/***************** 心跳檢測開始 *******************/
$connection->checkHeartBeat();
/***************** 心跳檢測開始 *******************/
}
public function checkHeartBeat(){
try{
$this->connect->checkHeartBeat();
}catch(Exception $e){
$this->connect();
}
}
/**
* 隊列設置:有配置讀配置,無配置讀設定
*
* @param [type] $queueName
* @param [type] $routingKey
* @param [type] $exchangeName
* @return void
*/
protected function queueSet($queueName, $routingKey, $exchangeName){
$config = $this->config;
if(isset($config['queue'][$queueName])){
$queueConf = $config['queue'][$queueName];
if(isset($queueConf['exchangeName'])){
$exchangeName = $queueConf['exchangeName'];
}
if(isset($queueConf['routingKey'])){
$routingKey = $queueConf['routingKey'];
}
}
# 聲明交換機屬性,持久化
$this->channel->exchange_declare($exchangeName, 'direct', false, true, false);
# 聲明隊列屬性,持久化
$this->channel->queue_declare($queueName, false, true, false, false);
# 綁定交換機,隊列,以及路由鍵
$this->channel->queue_bind($queueName, $exchangeName, $routingKey);
}
/**
* 生產隊列復雜版:相同exchangeName和routingKey綁定的任何隊列之一投遞任務,會導致所有的綁定的隊列都被投遞
*
* @param string $msgBody 隊列消息,我們要傳輸的數據
* @param string $queueName 隊列名稱
* @param string $routingKey 路由關鍵字,不指定該參數,會往所有的相同的exchangeName里的所有隊列都投遞隊列消息。適用于一條數據走多個任務
* @param string $exchangeName 交換機 【傳任務所在的平臺,理解為命名空間也可】
* @return void
*/
public function produce($msgBody='', $queueName='default', $routingKey='', $exchangeName='default'){
// $this->queueSet($queueName, $routingKey, $exchangeName);
# 聲明一個消息體屬性,持久化
$msg = new AMQPMessage($msgBody, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
# 推送入隊列
$this->channel->basic_publish($msg);
# 記得在調用層,顯示調用【 $this->close() 】關閉連接,因為可能涉及循環推送隊列,反復連接和關閉浪費資源,所以這里不主動關閉
# $this->close();
return true;
}
/**
* 簡化版,僅使用默認的交換機投遞隊列
*
* @param string $msgBody
* @param string $queueName
* @return void
*/
public function easyProduce($msgBody='', $queueName='default'){
# 聲明隊列屬性,持久化
$this->channel->queue_declare($queueName, false, true, false, false);
# 定義一個消息,格式化
$msg = new AMQPMessage($msgBody, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
# 這里測試publish的返回是null 后面需要考慮加確認機制 目前是穩定可以推送
$pulishResult = $this->channel->basic_publish($msg, '', $queueName);
# 記得在調用層,顯示調用【 $this->close() 】關閉連接,因為可能涉及循環推送隊列,反復連接和關閉浪費資源,所以這里不主動關閉
# $this->close();
return true;
}
/**
* 取出指定隊列的指定條數消息
*
* @param string $queueName 隊列名詞
* @param integer $msgCount 消息條數
* @param bool $close 獲取數據后是否關閉連接
* @return void
*/
public function consumeData($queueName='default', $msgCount=5, $close=false, $routingKey='', $exchangeName='default'){
# 隊列設置
$this->queueSet($queueName, $routingKey, $exchangeName);
$data = [];
$callback = function($msg) use (&$data) {
$rawData = $msg->body;
$data[] = [
'delivery_tag'=> $msg->delivery_info['delivery_tag'],
'rawData'=> $rawData
];
# 手動確認刪除
// $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
# 限流
$this->channel->basic_qos(0, $msgCount, false);
# 定義消費隊列,并指定回調處理
$this->channel->basic_consume($queueName, '', true, false, false, false, $callback);
# 觸發回調
for($i=0; $i<$msgCount; $i++){
$this->channel->wait();
}
if($close){
$this->close();
}
return $data;
}
/**
* 手動確認刪除指定的消息
*
* @param string $delivery_tag 隊列消息的唯一標識
* @return void
*/
public function ack($delivery_tag){
$this->channel->basic_ack($delivery_tag);
}
/**
* 消費隊列
*
* @param [function] $callback 函數,只有一個參數$msg,就是消息體.也可以寫匿名函數,函數體內是消費者實現。如果消費任務完成,需要在函數體內調用:
* $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 用以通知MQ已完成消費,則隊列消息會被刪除
*
* @param string $queueName 隊列名稱
* @param string $connectName 連接名稱
* @param integer $unackLimit 限流:最多允許unack的消息數量,達到上限則隊列不再繼續獲取消息處理,默認0不限制
* @param bool $a_global 限流:是對整個channel影響,還是只影響當前消費者
* @return void
*/
public function consume($callback, $queueName='default', $unackLimit=0, $a_global=false, $routingKey='', $exchangeName='default'){
# 隊列設置
$this->queueSet($queueName, $routingKey, $exchangeName);
# 限流
if($unackLimit > 0){
$this->channel->basic_qos(0, $unackLimit, $a_global);
}
//在接收消息的時候調用$callback函數
$this->channel->basic_consume($queueName, '', true, false, false, false, $callback);
while($this->channel->is_consuming()) {
$this->channel->wait();
}
}
/**
* 簡化版消費隊列,使用默認的交換機
*
* @param [function] $callback 函數,只有一個參數$msg,就是消息體.也可以寫匿名函數,函數體內是消費者實現。如果消費任務完成,需要在函數體內調用:
* $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 用以通知MQ已完成消費,則隊列消息會被刪除
*
* @param string $queueName 隊列名稱
* @param string $connectName 連接名稱
* @param integer $unackLimit 限流:最多允許unack的消息數量,達到上限則隊列不再繼續獲取消息處理,默認0不限制
* @param bool $a_global 限流:是對整個channel影響,還是只影響當前消費者
* @return void
*/
public function easyConsume($callback, $queueName='default', $unackLimit=0, $a_global=false){
# 聲明隊列屬性,持久化
$this->channel->queue_declare($queueName, false, true, false, false);
# 限流
if($unackLimit > 0){
$this->channel->basic_qos(0, $unackLimit, $a_global);
}
//在接收消息的時候調用$callback函數
$this->channel->basic_consume($queueName, '', true, false, false, false, $callback);
while($this->channel->is_consuming()) {
$this->channel->wait();
}
}
/**
* 消費隊列
*
* @param [function] $callback 函數,只有一個參數$msg,就是消息體.也可以寫匿名函數,函數體內是消費者實現。如果消費任務完成,需要在函數體內調用:
* $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 用以通知MQ已完成消費,則隊列消息會被刪除
*
* @param string $queueName 隊列名稱
* @return void
*/
public function consumeOne($callback, $queueName='default', $routingKey='', $exchangeName='default'){
$this->queueSet($queueName, $routingKey, $exchangeName);
//在接收消息的時候調用$callback函數
$this->channel->basic_consume($queueName, '', true, false, false, false, $callback);
$this->channel->wait();
}
/**
* 關閉通道
*
* @return void
*/
private function closeChannel(){
$this->channel->close();
$this->channel = null;
}
/**
* 關閉連接
*
* @return void
*/
private function closeConnect(){
$this->connect->close();
$this->connect = null;
}
/**
* 關閉所有的連接和通道
*
* @return void
*/
public function close(){
$this->closeChannel();
$this->closeConnect();
}
/**
* 連接rabbit
*
* @return void
*/
public function connect(){
if(empty($this->config)){
throw new Exception("配置信息丟失", -1);
return false;
}
$config = $this->config;
if(is_null($this->connect)){
$this->connect = new AMQPStreamConnection($config['host'], $config['port'], $config['username'], $config['password']);
}
if(is_null($this->channel)){
$this->channel = $this->connect->channel();
}
return $this;
}
/*
# produce demo
$queueName = 'test';
$msg = json_encode(['queueName'=> $queueName]);
$rabbit = \App\Http\Controllers\Common\RabbitMQ::instance('rabbit');
$result = $rabbit->easyProduce($msg, $queueName);
$rabbit->close();
*/
/*
# consumer demo
$rabbit = \app\common\tool\Rabbit::instance('rabbit');
$rabbit->consumeDemo($queueName);
$rabbit->consume(function($msg){
$info = " [x] Received ". $msg->body. "\n";
echo $info;
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}, $queueName);
$rabbit->close();
*/
}
```
- 常見功能
- 第三方授權登錄
- 郵件發送
- 簡易聊天室
- 獲取各國匯率
- PHP獲取服務器硬件指標
- 數據上報之
- web開發
- 開發規范
- 前端
- 踩坑
- 將footer固定在底部
- bootstrap
- Metronic
- 用到的jquery插件
- bootstrap-hover-dropdown
- jquery.slimscroll
- jquery.blockui
- bootstrap-switch
- js.cookie
- moment
- bootstrap-daterangepicker
- morris
- raphael
- jquery.waypoints
- jquery.counterup
- select2
- 取值和設置默認值
- vue
- axios
- 瀏覽器
- 谷歌瀏覽器
- 谷歌插件
- layui
- layui-表格
- layui-表單
- layui-彈窗
- layui-分頁
- 后端
- 操作系統
- linux
- 用戶管理
- 文件管理
- 目錄管理
- 壓縮和解壓縮
- 進程查看
- 端口查看
- 開機自啟動服務
- 定時任務
- shell腳本
- 殺掉運行超過指定時長指定服務的進程
- 獲取服務器使用狀態
- bash-shell連接socket
- 自定義快捷命令
- centos-踩坑
- 防火墻
- 軟件
- yum
- vim
- screen
- window
- 語言
- PHP
- 配置優化
- 框架
- thinkphp5.1+
- think命令行
- laravel6.+
- 維護模式
- 根據環境讀取不同配置
- laravel6.+采坑
- laravel坑位
- 數據庫事務
- 任務調度
- 文件權限問題
- 增強框架
- larvel:elastic-search
- 圖形驗證碼
- laravel獲取ip
- 函數
- strtotime
- 正則匹配
- 類
- 接口類與抽象類
- 類相關的關鍵字 - abstract
- 類相關的關鍵字 - interface
- PHP有關類的調用方式"->"與"::"的區別
- 擴展
- 問題歸納
- json_encode和json_decode
- 字符串的運算
- curl
- 優化php效率
- 數組相加合并與array_merge
- 時區轉換
- 不常用特性
- php反射
- 包管理器-composer
- GuzzleHttp
- Python
- Go
- 數據庫
- Redis
- 安裝
- 本地化-數據備份
- php-redis操作
- Mysql
- mysql-命令集合
- 設置終端可訪問
- 數據庫設計
- 用戶基礎信息表
- 踩坑集合
- mysql-2002
- mysql-2054
- 優化策略
- mysql-密碼驗證插件
- 一些牛逼的sql查詢
- topN
- 無限級分類
- Memcache
- MongoDb
- 安裝mongo-server
- 安裝php-mongodb擴展
- 在laravel中使用mongoDB
- 客戶端軟件
- Hbase
- Elasticsearch
- elastic-search
- restfulApi操作es
- web服務器
- 1.nginx
- 配置語法規則
- 配置詳解
- rewrite規則
- request_filename
- 2.apache
- 功能設計
- 加密解密
- Base64
- 對亞馬遜SKU加密
- 兼職項目中的加解密
- 騰訊外包時的加密
- 接口設計
- 接口限流設計
- 分庫分表
- 遍歷展示文件目錄結構
- 時區換算
- 文件切割
- 解析xml字符串
- 項目
- 博客后臺管理
- 亞馬遜廣告API
- 官方指引文檔
- 開發人員中心
- 應用商店
- 第三方庫
- 申請API郵件記錄
- 亞馬遜MWS
- 付款報告
- 亂碼
- 亞馬遜管理庫存報告
- 報告
- 商品
- 入庫
- 履行
- 出庫
- 財務
- 訂單
- 異步任務處理
- 集群如何同步代碼
- 基本開發流程
- 文檔管理
- showdoc
- 運行環境
- 開發環境
- vagrant
- windows上配置安裝
- vagrant安裝插件緩慢
- 更換ssh默認端口映射
- 設置x-shell密碼登錄
- 使用市場的box-homestead
- homestead-7: Box 'lc/homestead'
- 常見問題
- 虛擬環境reboot
- 突然無法使用
- phpStudy
- wamp
- 壓測性能
- VPN
- vultr
- 凌空圖床
- 寶塔
- 自動化部署
- 版本管理軟件鉤子
- 線上環境-LNMP
- centos7
- nginx
- mysql
- mysql開機自啟
- mysql-更換默認端口
- datetime字段類型默認值
- php
- php擴展安裝
- redis
- swoole
- gd
- BCMath
- igbinary
- zstd
- 包管理器:composer
- 優化性能
- nodejs
- 更新gcc版本
- 版本控制
- git
- 常用命令
- gitlab
- 版本管理規范
- 使用阿里云創建遠程倉庫
- git自動化部署
- svn
- 忽略指定文件
- 拉取代碼
- 自動化運維
- jekins
- 容器
- 集群
- 架構設計
- 設計原則
- 閱讀參考
- 代碼規劃
- 架構實戰
- 服務治理
- 權限控制設計
- 具體設計
- 計劃
- 疑問知識點
- 讀書筆記
- 高性能Mysql
- TCP-IP詳解-卷一:協議
- 思考
- php如何實現并發執行
- 對接調用設計
- 如何在瀏覽器上實現插件
- 如何設計一個app結合業務告警
- mysql的where查詢沒有用到索引
- 為啥in查詢比循環嵌套sql的查詢還要慢
- 使用git來創建屬于自己的composer包
- 翻頁獲取數據的時候又新增了數據
- 安全思路
- 月報
- PHP ?? 和 ?: 的區別
- PHP異步執行
- redis集群的目標是什么
- 大文件數據處理
- 性能瓶頸分析
- 命令行里輸出帶顏色的字體
- 面試問題合集
- 基礎
- 安全
- 算法
- 冒泡排序
- 快速排序
- 二分法查詢數組指定成員
- 字符查找匹配
- 令牌桶
- 漏桶
- 計數器
- 代理
- 協議
- http
- 狀態碼
- tcp
- udp
- Oauth2.0
- 設計模式
- 單例模式
- 適配器模式
- 工廠模式
- 觀察者模式
- 流程化
- 地址欄輸入網址到返回網頁的流程
- 題目收集
- 工具
- rabbitMq
- rabbitMQ用戶管理
- 生產者
- 消費者
- 支持TP5.*的think-queue
- 消息丟失
- 消費者報錯
- rabbitMQ配置優化
- 磁盤滿載導致服務掛掉
- PHP類庫
- rabbitMQ踩坑
- navicat
- vscode
- phpstorm
- 激活碼
- markdown
- PHP自定義類庫
- 工具類
- 領導力
- 任務分配
- 代碼組織
- 不要重復
- 避免污染
- 接口定義規范
- 小業務需求
- 獲取充值面額組成
- 監控服務器CPU和內存
- shell腳本版本