<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                >[info] fanout 訂閱/廣播模式 交換器會把所有發送到該交換器的消息路由到所有與該交換器綁定的消息隊列中。**訂閱模式與Binding Key和Routing Key無關**,交換器將接受到的消息分發給有綁定關系的所有消息隊列隊列(不論Binding Key和Routing Key是什么)。類似于子網**廣播**,子網內的每臺主機都獲得了一份復制的消息。**Fanout交換機轉發消息是最快的**。 ![](https://img.kancloud.cn/b7/77/b77788acbd5dfde4bd4dae602c75828f_1032x531.png) ![](https://img.kancloud.cn/21/2e/212e7e9cda5e69c89513eb64a24fb109_1014x413.png) 1. `rabbitmq.php` 配置文件信息 ~~~ <?php // rabbitmq 配置信息 return [ # 連接信息 'amqp' => [ 'host' => '127.0.0.1', 'port'=>'5672', 'user'=>'guest', 'password'=>'guest', 'vhost'=>'/' ], # 扇出隊列 'fanout_queue' => [ 'exchange_name' => 'fanout_exchange', 'exchange_type'=>'fanout',# 訂閱/廣播模式 'queue_name' => 'fanout_queue', 'route_key' => 'fanout_route_key', 'consumer_tag' => 'fanout' ], # 商品隊列 'goods_queue' => [ 'exchange_name' => 'goods_fanout_exchange', 'exchange_type'=>'fanout',# 訂閱/廣播模式 'queue_name' => 'goods_queue', 'route_key' => 'goods_route_key', 'consumer_tag' => 'goods' ], # 訂單隊列 'order_queue' => [ 'exchange_name' => 'order_fanout_exchange', 'exchange_type'=>'fanout',# 訂閱/廣播模式 'queue_name' => 'order_queue', 'route_key' => 'order_route_key', 'consumer_tag' => 'order' ], ]; ~~~ 2. `Consumer` 消費者 ~~~ <?php declare (strict_types = 1); namespace app\common\service\rabbitmq; //use app\common\model\test\MessageModel; use PhpAmqpLib\Connection\AMQPStreamConnection; use think\facade\Log; /** * 消費者 */ class Consumer { /** * 商品消費者 * @return \think\Response */ public function goods() { $mqConfig = config('rabbitmq.amqp'); $goodsConfig = config('rabbitmq.goods_queue'); // 創建連接 $connection = new AMQPStreamConnection( $mqConfig['host'], $mqConfig['port'], $mqConfig['user'], $mqConfig['password'], $mqConfig['vhost'] ); // 連接信道 $channel = $connection->channel(); // 設置消費者(Consumer)客戶端同時只處理一條隊列 // 這樣是告訴RabbitMQ,再同一時刻,不要發送超過1條消息給一個消費者(Consumer), // 直到它已經處理了上一條消息并且作出了響應。這樣,RabbitMQ就會把消息分發給下一個空閑的消費者(Consumer)。 // 消費者端要把自動確認autoAck設置為false,basic_qos才有效。 // 流量控制 $channel->basic_qos(0, 1, false); // 同樣是創建路由和隊列,以及綁定路由隊列,注意要跟producer(生產者)的一致 // 這里其實可以不用設置,但是為了防止隊列沒有被創建所以做的容錯處理 // 創建交換機(Exchange) $channel->exchange_declare($goodsConfig['exchange_name'], $goodsConfig['exchange_type'], false, true, false); // 創建隊列 $channel->queue_declare($goodsConfig['queue_name'], false, true, false, false); // 綁定隊列和交換機 $channel->queue_bind($goodsConfig['queue_name'], $goodsConfig['exchange_name']); /** * 消費消息 * * queue: queue_name 被消費的隊列名稱 * consumer_tag: consumer_tag 消費者客戶端身份標識,用于區分多個客戶端 * no_local: false 這個功能屬于AMQP的標準,但是RabbitMQ并沒有做實現 * no_ack: true 收到消息后,是否不需要回復確認即被認為被消費 * exclusive: false 是否排他,即這個隊列只能由一個消費者消費。適用于任務不允許進行并發處理的情況下 * nowait: false 不返回執行結果,但是如果排他開啟的話,則必須需要等待結果的,如果兩個一起開就會報錯 * callback: $callback 回調邏輯處理函數 */ $channel->basic_consume($goodsConfig['queue_name'], $goodsConfig['consumer_tag'], false, false, false, false, array($this, 'process_message')); // 退出,執行shutdown來關閉通道與連接 register_shutdown_function(array($this, 'shutdown'), $channel, $connection); // 阻塞隊列監聽事件 while (count($channel->callbacks)) { $channel->wait(); } } /** * 訂單消費者 * @return \think\Response */ public function order() { $mqConfig = config('rabbitmq.amqp'); $orderConfig = config('rabbitmq.order_queue'); // 創建連接 $connection = new AMQPStreamConnection( $mqConfig['host'], $mqConfig['port'], $mqConfig['user'], $mqConfig['password'], $mqConfig['vhost'] ); // 連接信道 $channel = $connection->channel(); // 設置消費者(Consumer)客戶端同時只處理一條隊列 // 這樣是告訴RabbitMQ,再同一時刻,不要發送超過1條消息給一個消費者(Consumer), // 直到它已經處理了上一條消息并且作出了響應。這樣,RabbitMQ就會把消息分發給下一個空閑的消費者(Consumer)。 // 消費者端要把自動確認autoAck設置為false,basic_qos才有效。 // 流量控制 $channel->basic_qos(0, 1, false); // 同樣是創建路由和隊列,以及綁定路由隊列,注意要跟producer(生產者)的一致 // 這里其實可以不用設置,但是為了防止隊列沒有被創建所以做的容錯處理 // 創建交換機(Exchange) $channel->exchange_declare($orderConfig['exchange_name'], $orderConfig['exchange_type'], false, true, false); // 創建隊列 $channel->queue_declare($orderConfig['queue_name'], false, true, false, false); // 綁定隊列和交換機 $channel->queue_bind($orderConfig['queue_name'], $orderConfig['exchange_name']); /** * 消費消息 * * queue: queue_name 被消費的隊列名稱 * consumer_tag: consumer_tag 消費者客戶端身份標識,用于區分多個客戶端 * no_local: false 這個功能屬于AMQP的標準,但是RabbitMQ并沒有做實現 * no_ack: true 收到消息后,是否不需要回復確認即被認為被消費 * exclusive: false 是否排他,即這個隊列只能由一個消費者消費。適用于任務不允許進行并發處理的情況下 * nowait: false 不返回執行結果,但是如果排他開啟的話,則必須需要等待結果的,如果兩個一起開就會報錯 * callback: $callback 回調邏輯處理函數 */ $channel->basic_consume($orderConfig['queue_name'], $orderConfig['consumer_tag'], false, false, false, false, array($this, 'process_message')); // 退出,執行shutdown來關閉通道與連接 register_shutdown_function(array($this, 'shutdown'), $channel, $connection); // 阻塞隊列監聽事件 while (count($channel->callbacks)) { $channel->wait(); } } /** * 消息處理 * @param $message */ public function process_message($message) { if ($message->body !== 'quit') { $messageBody = json_decode($message->body); // 自定義的消息類型 if (!isset($messageBody->message_type)) { Log::write("error data:" . $message->body, 2); } else { // $messageModel = new MessageModel(); try { // 消息 Log::write("message_data:" . json_encode($message, JSON_UNESCAPED_UNICODE)); $body = json_decode($message->body, true); dump("消費消息如下:"); dump($body); // $messageModel->test($body); } catch (\Think\Exception $e) { Log::write($e->getMessage(), 2); Log::write(json_encode($message), 2); } catch (\PDOException $pe) { Log::write($pe->getMessage(), 2); Log::write(json_encode($message), 2); } } } // 手動確認ack,確保消息已經處理 $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); // Send a message with the string "quit" to cancel the consumer. if ($message->body === 'quit') { $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']); } } /** * 關閉進程 * @param $channel * @param $connection */ public function shutdown($channel, $connection) { // 關閉信道 $channel->close(); // 關閉連接 $connection->close(); } } ~~~ 3. `Producer` 生產者 ~~~ <?php declare (strict_types = 1); namespace app\common\service\rabbitmq; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; /** * rabbitmq 生產者 */ class Producer { // 連接 protected $connection; // 管道 protected $channel; // 配置內容(基本) protected $mqConfig; // 配置內容(扇出) protected $fanoutConfig; public function __construct() { $this->mqConfig = config('rabbitmq.amqp'); $this->fanoutConfig = config('rabbitmq.fanout_queue'); // 創建連接 $this->connection = new AMQPStreamConnection( $this->mqConfig['host'], $this->mqConfig['port'], $this->mqConfig['user'], $this->mqConfig['password'], $this->mqConfig['vhost'] // 虛擬主機(起到消息隔離的作用) ); // 創建通道 $this->channel = $this->connection->channel(); } /** * 發送消息 * @param $data 消息內容 */ public function send($data) { /* * 流量控制 Specifies QoS * 消費者在開啟acknowledge的情況下,對接收到的消息需要異步對消息進行確認 * 由于消費者自身處理能力有限,從rabbitmq獲取一定數量的消息后,希望rabbitmq不再將隊列中的消息推送過來, * 當對消息處理完后(即對消息進行了ack,并且有能力處理更多的消息)再接收來自隊列的消息 * @param int $prefetch_size 最大unacked消息的字節數 * @param int $prefetch_count 最大unacked消息的條數 * @param bool $a_global 上述限制的限定對象,false限制單個消費者,true限制整個通道 * @return mixed */ $this->channel->basic_qos(0, 1, false); /** * 創建隊列(Queue) * name: hello 隊列名稱 * passive: false 如果設置true存在則返回OK,否則就報錯。設置false存在返回OK,不存在則自動創建 * durable: true 是否持久化,設置false是存放到內存中的,RabbitMQ重啟后會丟失;設置true,則代表是一個持久化的隊列,服務重啟后也會存在,因為服務會把持久化的queue存放到磁盤上當服務重啟的時候,會重新加載之前被持久化的queue * exclusive: false 是否排他,指定該選項為true則隊列只對當前連接有效,連接斷開后自動刪除 * auto_delete: false 是否自動刪除,當最后一個消費者斷開連接之后隊列是否自動被刪除 */ $this->channel->queue_declare($this->fanoutConfig['queue_name'], false, true, false, false); /** * 創建交換機(Exchange) * name: hello 交換機名稱 * type: direct 交換機類型,分別為direct/fanout/topic,參考另外文章的Exchange Type說明。 * passive: false 如果設置true存在則返回OK,否則就報錯。設置false存在返回OK,不存在則自動創建 * durable: false 是否持久化,設置false是存放到內存中的,RabbitMQ重啟后會丟失 * auto_delete: false 是否自動刪除,當最后一個消費者斷開連接之后隊列是否自動被刪除 */ $this->channel->exchange_declare($this->fanoutConfig['exchange_name'], $this->fanoutConfig['exchange_type'], false, true, false); // 綁定消息交換機和隊列 $this->channel->queue_bind($this->fanoutConfig['queue_name'], $this->fanoutConfig['exchange_name'],$this->fanoutConfig['route_key']); // 將要發送數據變為json字符串 $messageBody = json_encode($data, JSON_UNESCAPED_UNICODE); /** * 創建AMQP消息類型 * delivery_mode 消息是否持久化 * AMQPMessage::DELIVERY_MODE_NON_PERSISTENT 不持久化 * AMQPMessage::DELIVERY_MODE_PERSISTENT 持久化 */ $message = new AMQPMessage($messageBody, [ 'content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT ]); /** * 發送消息 * msg: $message AMQP消息內容 * exchange: vckai_exchange 交換機名稱 * routing_key: hello 路由key */ $this->channel->basic_publish($message, $this->fanoutConfig['exchange_name'], $this->fanoutConfig['route_key']); // 關閉連接 $this->stop(); } // 關閉連接 public function stop() { $this->channel->close(); $this->connection->close(); } } ~~~ 4. 創建 `goods` 和 `order` 自定義命令,作為消費者。 ~~~php php think make:command Goods ~~~ ~~~ <?php declare (strict_types = 1); namespace app\command; use think\console\Command; use think\console\Input; use think\console\input\Argument; use think\console\input\Option; use think\console\Output; use app\common\service\rabbitmq\Consumer; class Goods extends Command { protected function configure() { // 指令配置 $this->setName('goods') ->setDescription('the goods command'); } protected function execute(Input $input, Output $output) { // 啟動消費者 $consumer = new Consumer(); $consumer->goods(); } } ~~~ ``` php think make:command Order ``` ~~~ <?php declare (strict_types = 1); namespace app\command; use think\console\Command; use think\console\Input; use think\console\input\Argument; use think\console\input\Option; use think\console\Output; use app\common\service\rabbitmq\Consumer; class Order extends Command { protected function configure() { // 指令配置 $this->setName('order') ->setDescription('the order command'); } protected function execute(Input $input, Output $output) { // 啟動消費者 $consumer = new Consumer(); $consumer->order(); } } ~~~ 5. 配置 `config/console.php` 命令行 ~~~ <?php // +---------------------------------------------------------------------- // | 控制臺配置 // +---------------------------------------------------------------------- return [ // 指令定義 'commands' => [ // rabbitMq 調用消費者 'consumer' => 'app\command\Consumer', // 商品 'goods' => 'app\command\Goods', // 訂單 'order' => 'app\command\Order', ], ]; ~~~ 6. 路由發送消息 `route/app.php` ~~~ Route::get('rabbitmq', function (){ $producer = new \app\common\service\rabbitmq\Producer(); $data = [ 'message_type' => 2, 'order_id' => 3, 'user_id' => 3, 'time' => time(), 'message' => "發送的消息內容:您的快遞已到的配送站。(" . rand(1,100) . ")" ]; $producer->send($data); }); ~~~ 7. 啟動消費者 ``` php think order ``` ``` php think goods ``` 8. 生產者發送消息 ![](https://img.kancloud.cn/70/7a/707a517d9810c7401022a64dca08a17a_731x315.png) ![](https://img.kancloud.cn/25/3f/253fc7a8a4935950985518ae155712d8_686x250.png) ![](https://img.kancloud.cn/05/0e/050ed682f70fb4be31790d4e057b7596_677x289.png)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看