<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                [TOC] 英文文檔:http://www.rabbitmq.com/getstarted.html 中文文檔:https://rabbitmq.shujuwajue.com/tutorials_with_php/[2]Work_Queues.md.html 總結:官方文檔看起來有點亂,總結了一下,基本都是互通的,以下為簡單基本步驟,僅供參考,NO BB。 # 生產者: ## 1、創建連接 主要參數說明: ~~~ $host: RabbitMQ服務器主機IP地址 $port: RabbitMQ服務器端口 $user: 連接RabbitMQ服務器的用戶名 $password: 連接RabbitMQ服務器的用戶密碼 $vhost: 連接RabbitMQ服務器的vhost(服務器可以有多個vhost,虛擬主機,類似nginx的vhost) ~~~ ~~~ $connection = new AMQPStreamConnection($host,$port,$user,$password,$vhost); ~~~ ## 2、獲取信道 ~~~ // $channel_id 信道id,不傳則獲取$channel[“”]信道,再無則循環$this->channle數組,下標從1到最大信道數找第一個不是AMQPChannel對象的下標,實例化并返回AMQPChannel對象,無則拋出異常No free channel ids ~~~ ~~~ $channel = $connection->channel($channel_id); ~~~ ## 3、在信道里創建交換器 ~~~ # $exhcange_name 交換器名字 # $type 交換器類型: ’’ 默認交換機 匿名交換器 未顯示聲明類型都是該類型 fanout 扇形交換器 會發送消息到它所知道的所有隊列,每個消費者獲取的消息都是一致的 headers 頭部交換器 direct 直連交換器,該交換機將會對綁定鍵(binding key)和路由鍵(routing key)進行精確匹配 topic 話題交換器 該交換機會對路由鍵正則匹配,必須是*(一個單詞)、#(多個單詞,以.分割) 、 user.key .abc.* 類型的key rpc #$passive false #durable false #auto_detlete false $channel->exchange_declare($exhcange_name,$type,$passive,$durable,$auto_delete); //常用設置 $passive=>false $durable=>false $auto_delete->false ~~~ ## 4、創建要發送的信息 ,可以創建多個消息 ~~~ #$data string類型 要發送的消息 #$properties array類型 設置的屬性,比如設置該消息持久化[‘delivery_mode’=>2] $msg = new AMQPMessage($data,$properties) ~~~ ## 5、發送消息 ~~~ #$msg object AMQPMessage對象 #$exchange string 交換機名字 #$routing_key string 路由鍵 如果交換機類型 fanout: 該值會被忽略,因為該類型的交換機會把所有它知道的隊列發消息,無差別區別 direct 只有精確匹配該路由鍵的隊列,才會發送消息到該隊列 topic 只有正則匹配到的路由鍵的隊列,才會發送到該隊列 $channel->basic_publish($msg,$exchange,$routing_key); ~~~ ## 6、關閉信道和鏈接 ~~~ $channel->close(); $connection->close(); ~~~ # 消費者: ## 1、創建連接 主要參數說明: ~~~ $host: RabbitMQ服務器主機IP地址 $port: RabbitMQ服務器端口 $user: 連接RabbitMQ服務器的用戶名 $password: 連接RabbitMQ服務器的用戶密碼 $vhost: 連接RabbitMQ服務器的vhost(服務器可以有多個vhost,虛擬主機,類似nginx的vhost) ~~~ ~~~ $connection = new AMQPStreamConnection($host,$port,$user,$password,$vhost); ~~~ ## 2、獲取信道 ~~~ // $channel_id 信道id,不傳則獲取$channel[“”]信道,再無則循環$this->channle數組,下標從1到最大信道數找第一個不是AMQPChannel對象的下標,實例化并返回AMQPChannel對象,無則拋出異常No free channel ids $channel = $connection->channel($channel_id); ~~~ ## 3、在信道里創建交換器,未顯式聲明交換機都是使用匿名交換機 ~~~ # $exhcange_name 交換器名字 # $type 交換器類型: ’’ 默認交換機 匿名交換器 未顯示聲明類型都是該類型 fanout 扇形交換器 會發送消息到它所知道的所有隊列,每個消費者獲取的消息都是一致的 headers 頭部交換器 direct 直連交換器,該交換機將會對綁定鍵(binding key)和路由鍵(routing key)進行精確匹配 topic 話題交換器 該交換機會對路由鍵正則匹配,必須是*(一個單詞)、#(多個單詞,以.分割) 、 user.key .abc.* 類型的key rpc #$passive false #durable false #auto_detlete false $channel->exchange_declare($exhcange_name,$type,$passive,$durable,$auto_delete); //常用設置 $passive=>false $durable=>false $auto_delete->false ~~~ ## 4、聲明消費者隊列 (1) 非持久化隊列,RabbitMQ退出或者崩潰時,該隊列就不存在 ~~~ list($queue_name, ,) = $channel->queue_declare("", false, false, false, false) ~~~ (2) 持久化隊列(需要顯示聲明,第三個參數要設置為true),保存到磁盤,但不一定完全保證不丟失信息,因為保存總是要有時間的。 ~~~ list($queue_name, ,) = $channel->queue_declare("ex_queue", false, false, true, false) ~~~ ## 5、綁定交換機,當未顯示綁定交換機時,默認是綁定匿名交換機 ~~~ #綁定:交換機與隊列的關系,如下面這句代碼意思是,$queue_name隊列對logs交換機數據感興趣,該隊列就消費該交換機傳過來的數據:這個隊列(queue)對這個交換機(exchange)的消息感興趣. $binding_key默認為空,表示對該交換機所有消息感興趣,如果值不為空,則該隊列只對該類型的消息感興趣(除了fanout交換機以外) $channel->queue_bind($queue_name, 'logs', $binding_key); ~~~ ## 6、消費消息 ~~~ #該代碼表示使用basic.qos方法,并設置prefetch_count=1。這樣是告訴RabbitMQ,再同一時刻,不要發送超過1條消息給一個工作者(worker),直到它已經處理了上一條消息并且作出了響應。這樣,RabbitMQ就會把消息分發給下一個空閑的工作者(worker),輪詢、負載均衡配置 #$channel->basic_qos(null, 1, null); #第四個參數 no_ack = false 時,表示進行ack應答,確保消息已經處理 #$callback 表示回調函數,傳入消息參數 $channel->basic_consume('ex_queue', '', false, false, false, false, $callback); $callback = function($msg){ echo " [x] Received ", $msg->body, "\n"; sleep(substr_count($msg->body, '.')); echo " [x] Done", "\n"; #當no_ack=false時, 需要寫下行代碼,否則可能出現內存不足情況#$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; #監聽消息,一有消息,立馬就處理 while(count($channel->callbacks)) { $channel->wait(); } ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看