# 循環分發
啟動一個發送端往隊列發消息,此時啟動多個接收端。發送的消息會對接收端一個一個挨著發送消息。

這就是默認情況下,多個接收端輪流消費消息。隊列發送給消費端后,就立即刪除。那么問題來了,當某個消費者在處理消息的時候,異常終止了怎么辦?此時,我們更希望這樣:若是那個消費者掛掉了,消息自動轉給另一個消費者處理。
幸好,rabbitmq就有效確認機制。消費者收到消息后,正常處理完成,此時才通知隊列可以自由刪除。那么問題又來了,消費者掛掉了連確認消息都發不出,該怎么辦?rabbitmq維持了消費者的連接信息。消費者掛掉,與server的連接通道會關閉或tcp連接丟失。這時server知道了這個情況,就自動重發消息。
這里還有個問題,就是server掛掉了怎么辦?**注意: durable=True。這個就是,當server掛了隊列還存在。delivery_mode=2:server掛了消息還存在。若是保證消息不丟,這兩個參數都要設置。**
發送端
```
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# durable:server掛了隊列仍然存在
channel.queue_declare(queue='task_queue', durable=True)
# 使用默認的交換機發送消息。exchange為空就使用默認的。delivery_mode=2:使消息持久化。和隊列名稱綁定routing_key
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2,
))
print(" [x] Sent %r" % message)
connection.close()
```
接收端
```
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Done")
# 手動對消息進行確認
ch.basic_ack(delivery_tag=method.delivery_tag)
# basic_consume:這個函數有no_ack參數。該參數默認為false。表示:需要對message進行確認。怎么理解:no設置成false,表示要確認
channel.basic_consume(callback, queue='task_queue')
channel.start_consuming()
```
# 公平派遣
此刻,我們已經知道如何保證消息不丟,那么問題又來了。有的消費干得快,有的干得慢。這樣分發消息,有的累死有的沒事干。這個問題如何解決?

rabbitmq已經考慮到了。那就是:那個干完了,通知給server,server就發送給那個。
在上面的接收端的
```
channel.basic_consume(callback, queue='task_queue')
```
代碼前加:
```
channel.basic_qos(prefetch_count=1)
```
# 發布訂閱模式
我們要將同一個消息發給多個客戶端。
發送端:
```
import pika
import sys
credentials = pika.PlainCredentials("yang", "123456")
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.2.176', credentials=credentials))
channel = connection.channel()
# 原則上,消息,只能有交換機傳到隊列。就像我們家里面的交換機道理一樣。
# 有多個設備連接到交換機,那么,這個交換機把消息發給那個設備呢,就是根據
# 交換機的類型來定。類型有:direct\topic\headers\fanout
# fanout:這個就是,所有的設備都能收到消息,就是廣播。
# 此處定義一個名稱為'logs'的'fanout'類型的exchange
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
# 將消息發送到名為log的exchange中
# 因為是fanout類型的exchange,所以無需指定routing_key
message = "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()
```
接收端:
```
import pika
credentials = pika.PlainCredentials("yang", "123456")
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.2.176', credentials=credentials))
channel = connection.channel()
# 這里需要和發送端保持一致(習慣和要求)
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
# 類似的,比如log,我們其實最想看的,當連接上的時刻到消費者退出,這段時間的日志
# 有些消息,過期了的對我們并沒有什么用
# 并且,一個終端,我們要收到隊列的所有消息,比如:這個隊列收到兩個消息,一個終端收到一個。
# 我們現在要做的是:兩個終端都要收到兩個
# 那么,我們就只需做個臨時隊列。消費端斷開后就自動刪除
result = channel.queue_declare(exclusive=True)
# 取得隊列名稱
queue_name = result.method.queue
# 將隊列和交換機綁定一起
channel.queue_bind(exchange='logs',
queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
# no_ack=True:此刻沒必要回應了
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
```
# 根據類型訂閱消息
一些消息,仍然發送給所有接收端。其中,某個接收端,只對其中某些消息感興趣,它只想接收這一部分消息。如下圖:C1,只對error感興趣,C2對其他三種甚至對所有都感興趣,我們該怎么搞呢?

發送端:
```
import pika
import sys
credentials = pika.PlainCredentials("yang", "123456")
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.2.176', credentials=credentials))
channel = connection.channel()
# 創建一個交換機:direct_logs 類型是:direct
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
# 向exchage按照設置的 routing_key=severity 發送message
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
```
接收端:
```
import pika
import sys
credentials = pika.PlainCredentials("yang", "123456")
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.2.176', credentials=credentials))
channel = connection.channel()
# 跟發送端一致
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
# 還是聲明臨時隊列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
# 使用routing_key綁定交換機和隊列。廣播類型,無需使用這個
# direct類型:會對消息進行精確匹配
# 對個隊列使用相同路由key是可以的
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
```
# 進行RPC調用
RPC:是遠程過程調用。簡單點說:比如,我們在本地的代碼中調用一個函數,那么這個函數不一定有返回值, 但一定有返回。若是在分布式環境中,前面的例子,發送消息出去后,發送端是不清楚客戶端處理完后的結果的。由于rabbitmq的響應機制,頂多能獲取到客戶端的處理狀態,但并不能獲取處理結果。那么,我們想像本地調用那樣,需要客戶端處理后返回結果該怎么辦呢。就是如下圖:

client發送請求,同時告訴server處理完后要發送消息給:回調隊列的ID:correlation_id=abc,并調用replay_to回調隊列對應的回調函數。
## 客戶端
客戶端:發消息也收消息
```
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
# 創建連接
credentials = pika.PlainCredentials("yang", "123456")
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.2.176', credentials=credentials))
self.channel = self.connection.channel()
# 創建回調隊列
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
# 這里:這個是消息發送方,當要執行回調的時候,它又是接收方
# 使用callback_queue 實現消息接收。即是回調。注意:這里的回調
# 不需要對消息進行確認。反復確認,沒玩沒了就成了死循環
#這里設置回調
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
# 定義回調的響應函數。
# 判斷:若是當前的回調ID和響應的回調ID相同,即表示,是本次請求的回調
# 原因:若是發起上百個請求,發送端總得知道回來的對應的是哪一個發送的
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
# 設置響應和回調通道的ID
self.response = None
self.corr_id = str(uuid.uuid4())
# properties中指定replay_to:表示回調要調用那個函數
# 指定correlation_id:表示回調返回的請求ID是那個
# body:是要交給接收端的參數
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(reply_to=self.callback_queue,
correlation_id=self.corr_id,),
body=str(n))
# 監聽回調
while self.response is None:
self.connection.process_data_events()
# 返回的結果是整數,這里進行強制轉換
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
```
服務端:
```
import pika
credentials = pika.PlainCredentials("yang", "123456")
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.2.176', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)
def on_request(ch, method, props, body):
#收到的消息
n = int(body)
print(" [.] fib(%s)" % n)
#要處理的任務
response = fib(n)
#發布消息。通知到客戶端
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id= props.correlation_id),
body=str(response))
#手動響應
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
print(" [x] Awaiting RPC requests")
channel.start_consuming()
```