<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                依賴 `composer require php-amqplib/php-amqplib` ``` <?php namespace app\common\tool; use Exception; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; /** * RabbitMQ 客戶端 */ class Rabbit { private static $_instance = null; # 連接資源 private $connect = null; # 通道資源 private $channel = null; # 連接配置 private $config = []; /** * 構造函數 * * @param string $connectName 連接名稱 */ private function __construct($connectName){ # 對象是否已經存儲了連接,沒有則連接并存儲 if(!isset($this->connect)){ $config = $this->config; if(empty($config)){ $config = config('queue.'.$connectName); if(!$config){ throw new Exception('config\queue.php文件中,沒有配置'.$connectName, -1); } $this->config = $config; } $this->connect = new AMQPStreamConnection($config['host'], $config['port'], $config['username'], $config['password']); $this->channel = $this->connect->channel(); } } /** * 每一個連接配置生成一個單例 * * @param string $connectName * @return void */ public static function instance($connectName = 'rabbit'){ if(is_null(self::$_instance)){ self::$_instance = new self($connectName); } return self::$_instance; } /** * 僅用來查看編輯器提示mq相關方法的參數意思 * * @return void */ private static function document(){ $conf = [ 'host'=> '192.168.7.236', 'port'=> '15672', 'username'=> 'guest', 'password'=> ':guest~', ]; # 創建連接 $connection = new AMQPStreamConnection($conf['host'], $conf['port'], $conf['username'], $conf['password']); # 創建通道 $channel = $connection->channel(); /***************** 生產者寫入隊列開始 *******************/ # 聲明交換機,聲明則需要綁定隊列;可以不聲明,則默認default交換機。 # 交換機相當于是郵箱,可以聲明多個郵箱,隊列相當于信件,可以一信多投。 # 因為可能一個隊列信息,多個子系統都需要單獨處理。有點像廣播 $exchangeName = 'default'; $channel->exchange_declare($exchangeName, 'direct', false, true, false); # 必須聲明隊列,通常一個數據隊列定義一個名詞,里面的數據結構都是一致,隊列模式必須指定持久化,否則服務器重啟,隊列會丟失 $queueName = 'queueName'; $channel->queue_declare($queueName, false, true, false, false); # 將隊列與某個交換機進行綁定,并使用路由關鍵字.個人理解路由關鍵字更像是隊列別名,不一定準確,默認空字符串則使用隊列名替代。 $routingKey = ''; $channel->queue_bind($queueName, $exchangeName, $routingKey); # 必須聲明一個消息體,且為字符串類型,具體的格式,可以對數據采用各種需要的encode,投遞模式必須指定持久化,否則服務器重啟,隊列消息會丟失 # 第二個消息狀態配置,還包含有 content_type=> text/plain【默認】 correlation_id=> 自定義唯一id【比如, uniqid('rmq', true)】correlation_id 會在消費確認里用到 $msg = new AMQPMessage($msgBody, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)); # 推送入隊列 $channel->basic_publish($msg, $exchangeName, $routingKey); /***************** 生產者寫入隊列結束 *******************/ /***************** 消費者讀取隊列開始 *******************/ # 必須聲明隊列,通常一個數據隊列定義一個名詞,里面的數據結構都是一致,隊列模式必須指定持久化,否則服務器重啟,隊列會丟失 $channel->queue_declare($queueName, false, true, false, false); # 聲明消息的回調處理與哪個隊列綁定 $callback = function($msg){ # 處理隊列信息的邏輯 $queueData = $msg->body; # 業務邏輯處理 $businessDone = true; # 處理成功要手動確認下消息,告知隊列已處理完,可以清理了。如果未聲明自動確認刪除模式,也沒有其他消費者處理,則永遠獲取同一條。明顯這是不符合的 # 所以在未聲明自動確認刪除模式下,一定要手動確認,最好是有個重試邏輯,重試多少次后,將隊列消息推到另外的隊列中,當作異常保留,然后在本隊列進行確認刪除,避免阻塞 if($businessDone==true){ # 如果沒有在basic_publish的最后參數為消息指定一個唯一ID,則rabbit會默認生成一個唯一標識delivery_tag $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); } }; # 設置限流, # 參數一,限制消息大小,0代表不限制 # 參數二,限制允許unack的最大消息數, # 參數三,限制對象,true對整個channel限制,false對當前消費者限制 $channel->basic_qos(0, 1, false); # 聲明消費隊列綁定的回調邏輯 $channel->basic_consume($queueName, '', true, false, false, false, $callback); # 持久檢測隊列,看是否有數據心跳,如果只是單次獲取,直接$channel->wait();即可 while($channel->is_consuming()) { # 執行消費回調,PHP應該是沒有回溯執行的,猜測wait方法內部的實現是,通過聲明的隊列參數獲取消息,然后傳遞給前面聲明的回調方法進行執行 $channel->wait(); } /***************** 消費者讀取隊列開始 *******************/ /***************** 心跳檢測開始 *******************/ $connection->checkHeartBeat(); /***************** 心跳檢測開始 *******************/ } public function checkHeartBeat(){ try{ $this->connect->checkHeartBeat(); }catch(Exception $e){ $this->connect(); } } /** * 隊列設置:有配置讀配置,無配置讀設定 * * @param [type] $queueName * @param [type] $routingKey * @param [type] $exchangeName * @return void */ protected function queueSet($queueName, $routingKey, $exchangeName){ $config = $this->config; if(isset($config['queue'][$queueName])){ $queueConf = $config['queue'][$queueName]; if(isset($queueConf['exchangeName'])){ $exchangeName = $queueConf['exchangeName']; } if(isset($queueConf['routingKey'])){ $routingKey = $queueConf['routingKey']; } } # 聲明交換機屬性,持久化 $this->channel->exchange_declare($exchangeName, 'direct', false, true, false); # 聲明隊列屬性,持久化 $this->channel->queue_declare($queueName, false, true, false, false); # 綁定交換機,隊列,以及路由鍵 $this->channel->queue_bind($queueName, $exchangeName, $routingKey); } /** * 生產隊列復雜版:相同exchangeName和routingKey綁定的任何隊列之一投遞任務,會導致所有的綁定的隊列都被投遞 * * @param string $msgBody 隊列消息,我們要傳輸的數據 * @param string $queueName 隊列名稱 * @param string $routingKey 路由關鍵字,不指定該參數,會往所有的相同的exchangeName里的所有隊列都投遞隊列消息。適用于一條數據走多個任務 * @param string $exchangeName 交換機 【傳任務所在的平臺,理解為命名空間也可】 * @return void */ public function produce($msgBody='', $queueName='default', $routingKey='', $exchangeName='default'){ // $this->queueSet($queueName, $routingKey, $exchangeName); # 聲明一個消息體屬性,持久化 $msg = new AMQPMessage($msgBody, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)); # 推送入隊列 $this->channel->basic_publish($msg); # 記得在調用層,顯示調用【 $this->close() 】關閉連接,因為可能涉及循環推送隊列,反復連接和關閉浪費資源,所以這里不主動關閉 # $this->close(); return true; } /** * 簡化版,僅使用默認的交換機投遞隊列 * * @param string $msgBody * @param string $queueName * @return void */ public function easyProduce($msgBody='', $queueName='default'){ # 聲明隊列屬性,持久化 $this->channel->queue_declare($queueName, false, true, false, false); # 定義一個消息,格式化 $msg = new AMQPMessage($msgBody, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)); # 這里測試publish的返回是null 后面需要考慮加確認機制 目前是穩定可以推送 $pulishResult = $this->channel->basic_publish($msg, '', $queueName); # 記得在調用層,顯示調用【 $this->close() 】關閉連接,因為可能涉及循環推送隊列,反復連接和關閉浪費資源,所以這里不主動關閉 # $this->close(); return true; } /** * 取出指定隊列的指定條數消息 * * @param string $queueName 隊列名詞 * @param integer $msgCount 消息條數 * @param bool $close 獲取數據后是否關閉連接 * @return void */ public function consumeData($queueName='default', $msgCount=5, $close=false, $routingKey='', $exchangeName='default'){ # 隊列設置 $this->queueSet($queueName, $routingKey, $exchangeName); $data = []; $callback = function($msg) use (&$data) { $rawData = $msg->body; $data[] = [ 'delivery_tag'=> $msg->delivery_info['delivery_tag'], 'rawData'=> $rawData ]; # 手動確認刪除 // $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; # 限流 $this->channel->basic_qos(0, $msgCount, false); # 定義消費隊列,并指定回調處理 $this->channel->basic_consume($queueName, '', true, false, false, false, $callback); # 觸發回調 for($i=0; $i<$msgCount; $i++){ $this->channel->wait(); } if($close){ $this->close(); } return $data; } /** * 手動確認刪除指定的消息 * * @param string $delivery_tag 隊列消息的唯一標識 * @return void */ public function ack($delivery_tag){ $this->channel->basic_ack($delivery_tag); } /** * 消費隊列 * * @param [function] $callback 函數,只有一個參數$msg,就是消息體.也可以寫匿名函數,函數體內是消費者實現。如果消費任務完成,需要在函數體內調用: * $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 用以通知MQ已完成消費,則隊列消息會被刪除 * * @param string $queueName 隊列名稱 * @param string $connectName 連接名稱 * @param integer $unackLimit 限流:最多允許unack的消息數量,達到上限則隊列不再繼續獲取消息處理,默認0不限制 * @param bool $a_global 限流:是對整個channel影響,還是只影響當前消費者 * @return void */ public function consume($callback, $queueName='default', $unackLimit=0, $a_global=false, $routingKey='', $exchangeName='default'){ # 隊列設置 $this->queueSet($queueName, $routingKey, $exchangeName); # 限流 if($unackLimit > 0){ $this->channel->basic_qos(0, $unackLimit, $a_global); } //在接收消息的時候調用$callback函數 $this->channel->basic_consume($queueName, '', true, false, false, false, $callback); while($this->channel->is_consuming()) { $this->channel->wait(); } } /** * 簡化版消費隊列,使用默認的交換機 * * @param [function] $callback 函數,只有一個參數$msg,就是消息體.也可以寫匿名函數,函數體內是消費者實現。如果消費任務完成,需要在函數體內調用: * $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 用以通知MQ已完成消費,則隊列消息會被刪除 * * @param string $queueName 隊列名稱 * @param string $connectName 連接名稱 * @param integer $unackLimit 限流:最多允許unack的消息數量,達到上限則隊列不再繼續獲取消息處理,默認0不限制 * @param bool $a_global 限流:是對整個channel影響,還是只影響當前消費者 * @return void */ public function easyConsume($callback, $queueName='default', $unackLimit=0, $a_global=false){ # 聲明隊列屬性,持久化 $this->channel->queue_declare($queueName, false, true, false, false); # 限流 if($unackLimit > 0){ $this->channel->basic_qos(0, $unackLimit, $a_global); } //在接收消息的時候調用$callback函數 $this->channel->basic_consume($queueName, '', true, false, false, false, $callback); while($this->channel->is_consuming()) { $this->channel->wait(); } } /** * 消費隊列 * * @param [function] $callback 函數,只有一個參數$msg,就是消息體.也可以寫匿名函數,函數體內是消費者實現。如果消費任務完成,需要在函數體內調用: * $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 用以通知MQ已完成消費,則隊列消息會被刪除 * * @param string $queueName 隊列名稱 * @return void */ public function consumeOne($callback, $queueName='default', $routingKey='', $exchangeName='default'){ $this->queueSet($queueName, $routingKey, $exchangeName); //在接收消息的時候調用$callback函數 $this->channel->basic_consume($queueName, '', true, false, false, false, $callback); $this->channel->wait(); } /** * 關閉通道 * * @return void */ private function closeChannel(){ $this->channel->close(); $this->channel = null; } /** * 關閉連接 * * @return void */ private function closeConnect(){ $this->connect->close(); $this->connect = null; } /** * 關閉所有的連接和通道 * * @return void */ public function close(){ $this->closeChannel(); $this->closeConnect(); } /** * 連接rabbit * * @return void */ public function connect(){ if(empty($this->config)){ throw new Exception("配置信息丟失", -1); return false; } $config = $this->config; if(is_null($this->connect)){ $this->connect = new AMQPStreamConnection($config['host'], $config['port'], $config['username'], $config['password']); } if(is_null($this->channel)){ $this->channel = $this->connect->channel(); } return $this; } /* # produce demo $queueName = 'test'; $msg = json_encode(['queueName'=> $queueName]); $rabbit = \App\Http\Controllers\Common\RabbitMQ::instance('rabbit'); $result = $rabbit->easyProduce($msg, $queueName); $rabbit->close(); */ /* # consumer demo $rabbit = \app\common\tool\Rabbit::instance('rabbit'); $rabbit->consumeDemo($queueName); $rabbit->consume(function($msg){ $info = " [x] Received ". $msg->body. "\n"; echo $info; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }, $queueName); $rabbit->close(); */ } ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看