<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                # 循環分發 啟動一個發送端往隊列發消息,此時啟動多個接收端。發送的消息會對接收端一個一個挨著發送消息。 ![](https://box.kancloud.cn/0dfbb61885d91b32807858d2ea66e669_1584x709.png) 這就是默認情況下,多個接收端輪流消費消息。隊列發送給消費端后,就立即刪除。那么問題來了,當某個消費者在處理消息的時候,異常終止了怎么辦?此時,我們更希望這樣:若是那個消費者掛掉了,消息自動轉給另一個消費者處理。 幸好,rabbitmq就有效確認機制。消費者收到消息后,正常處理完成,此時才通知隊列可以自由刪除。那么問題又來了,消費者掛掉了連確認消息都發不出,該怎么辦?rabbitmq維持了消費者的連接信息。消費者掛掉,與server的連接通道會關閉或tcp連接丟失。這時server知道了這個情況,就自動重發消息。 這里還有個問題,就是server掛掉了怎么辦?**注意: durable=True。這個就是,當server掛了隊列還存在。delivery_mode=2:server掛了消息還存在。若是保證消息不丟,這兩個參數都要設置。** 發送端 ``` import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() # durable:server掛了隊列仍然存在 channel.queue_declare(queue='task_queue', durable=True) # 使用默認的交換機發送消息。exchange為空就使用默認的。delivery_mode=2:使消息持久化。和隊列名稱綁定routing_key message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, )) print(" [x] Sent %r" % message) connection.close() ``` 接收端 ``` import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") # 手動對消息進行確認 ch.basic_ack(delivery_tag=method.delivery_tag) # basic_consume:這個函數有no_ack參數。該參數默認為false。表示:需要對message進行確認。怎么理解:no設置成false,表示要確認 channel.basic_consume(callback, queue='task_queue') channel.start_consuming() ``` # 公平派遣 此刻,我們已經知道如何保證消息不丟,那么問題又來了。有的消費干得快,有的干得慢。這樣分發消息,有的累死有的沒事干。這個問題如何解決? ![](https://box.kancloud.cn/c967559388971bc1277b7021edfcf1c9_396x111.png) rabbitmq已經考慮到了。那就是:那個干完了,通知給server,server就發送給那個。 在上面的接收端的 ``` channel.basic_consume(callback, queue='task_queue') ``` 代碼前加: ``` channel.basic_qos(prefetch_count=1) ``` # 發布訂閱模式 我們要將同一個消息發給多個客戶端。 發送端: ``` import pika import sys credentials = pika.PlainCredentials("yang", "123456") connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.2.176', credentials=credentials)) channel = connection.channel() # 原則上,消息,只能有交換機傳到隊列。就像我們家里面的交換機道理一樣。 # 有多個設備連接到交換機,那么,這個交換機把消息發給那個設備呢,就是根據 # 交換機的類型來定。類型有:direct\topic\headers\fanout # fanout:這個就是,所有的設備都能收到消息,就是廣播。 # 此處定義一個名稱為'logs'的'fanout'類型的exchange channel.exchange_declare(exchange='logs', exchange_type='fanout') # 將消息發送到名為log的exchange中 # 因為是fanout類型的exchange,所以無需指定routing_key message = "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close() ``` 接收端: ``` import pika credentials = pika.PlainCredentials("yang", "123456") connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.2.176', credentials=credentials)) channel = connection.channel() # 這里需要和發送端保持一致(習慣和要求) channel.exchange_declare(exchange='logs', exchange_type='fanout') # 類似的,比如log,我們其實最想看的,當連接上的時刻到消費者退出,這段時間的日志 # 有些消息,過期了的對我們并沒有什么用 # 并且,一個終端,我們要收到隊列的所有消息,比如:這個隊列收到兩個消息,一個終端收到一個。 # 我們現在要做的是:兩個終端都要收到兩個 # 那么,我們就只需做個臨時隊列。消費端斷開后就自動刪除 result = channel.queue_declare(exclusive=True) # 取得隊列名稱 queue_name = result.method.queue # 將隊列和交換機綁定一起 channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) # no_ack=True:此刻沒必要回應了 channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming() ``` # 根據類型訂閱消息 一些消息,仍然發送給所有接收端。其中,某個接收端,只對其中某些消息感興趣,它只想接收這一部分消息。如下圖:C1,只對error感興趣,C2對其他三種甚至對所有都感興趣,我們該怎么搞呢? ![](https://box.kancloud.cn/65d418b54973525e03432d314234141d_423x171.png) 發送端: ``` import pika import sys credentials = pika.PlainCredentials("yang", "123456") connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.2.176', credentials=credentials)) channel = connection.channel() # 創建一個交換機:direct_logs 類型是:direct channel.exchange_declare(exchange='direct_logs', exchange_type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' # 向exchage按照設置的 routing_key=severity 發送message channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close() ``` 接收端: ``` import pika import sys credentials = pika.PlainCredentials("yang", "123456") connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.2.176', credentials=credentials)) channel = connection.channel() # 跟發送端一致 channel.exchange_declare(exchange='direct_logs', exchange_type='direct') # 還是聲明臨時隊列 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) # 使用routing_key綁定交換機和隊列。廣播類型,無需使用這個 # direct類型:會對消息進行精確匹配 # 對個隊列使用相同路由key是可以的 for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming() ``` # 進行RPC調用 RPC:是遠程過程調用。簡單點說:比如,我們在本地的代碼中調用一個函數,那么這個函數不一定有返回值, 但一定有返回。若是在分布式環境中,前面的例子,發送消息出去后,發送端是不清楚客戶端處理完后的結果的。由于rabbitmq的響應機制,頂多能獲取到客戶端的處理狀態,但并不能獲取處理結果。那么,我們想像本地調用那樣,需要客戶端處理后返回結果該怎么辦呢。就是如下圖: ![](https://box.kancloud.cn/2e78d61bca855f6c743a3b7fe0e26eaf_576x200.png) client發送請求,同時告訴server處理完后要發送消息給:回調隊列的ID:correlation_id=abc,并調用replay_to回調隊列對應的回調函數。 ## 客戶端 客戶端:發消息也收消息 ``` import pika import uuid class FibonacciRpcClient(object): def __init__(self): # 創建連接 credentials = pika.PlainCredentials("yang", "123456") self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.2.176', credentials=credentials)) self.channel = self.connection.channel() # 創建回調隊列 result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue # 這里:這個是消息發送方,當要執行回調的時候,它又是接收方 # 使用callback_queue 實現消息接收。即是回調。注意:這里的回調 # 不需要對消息進行確認。反復確認,沒玩沒了就成了死循環 #這里設置回調 self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) # 定義回調的響應函數。 # 判斷:若是當前的回調ID和響應的回調ID相同,即表示,是本次請求的回調 # 原因:若是發起上百個請求,發送端總得知道回來的對應的是哪一個發送的 def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): # 設置響應和回調通道的ID self.response = None self.corr_id = str(uuid.uuid4()) # properties中指定replay_to:表示回調要調用那個函數 # 指定correlation_id:表示回調返回的請求ID是那個 # body:是要交給接收端的參數 self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties(reply_to=self.callback_queue, correlation_id=self.corr_id,), body=str(n)) # 監聽回調 while self.response is None: self.connection.process_data_events() # 返回的結果是整數,這里進行強制轉換 return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response) ``` 服務端: ``` import pika credentials = pika.PlainCredentials("yang", "123456") connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.2.176', credentials=credentials)) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) def on_request(ch, method, props, body): #收到的消息 n = int(body) print(" [.] fib(%s)" % n) #要處理的任務 response = fib(n) #發布消息。通知到客戶端 ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id= props.correlation_id), body=str(response)) #手動響應 ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") channel.start_consuming() ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看