消息隊列產品有很多,比如 ActiveMQ、RabbitMQ ,ZeroMQ ,RocketMQ ,連 redis 這樣的 NoSQL 數據庫也支持 MQ 功能。總之這塊知名的產品就有十幾種,本文先講 RabbitMQ ,在此之前先看下消息隊列的相關概念。
## 什么是消息隊列
消息(Message)是指在應用間傳送的數據。消息可以非常簡單,比如只包含文本字符串,也可以更復雜,可能包含嵌入對象。
消息隊列(Message Queue)是一種應用間的通信方式,消息發送后可以立即返回,由消息系統來確保消息的可靠傳遞。消息發布者只管把消息發布到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不管是誰發布的。這樣發布者和使用者都不用知道對方的存在。
## RabbitMQ特點
RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。RabbitMQ 最初起源于金融系統,用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。具體特點包括:
1. 可靠性(Reliability)
RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發布確認。
2. 靈活的路由(Flexible Routing)
在消息進入隊列之前,通過 Exchange 來路由消息的。對于典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,可以將多個 Exchange 綁定在一起,也通過插件機制實現自己的 Exchange 。
3. 消息集群(Clustering)
多個 RabbitMQ 服務器可以組成一個集群,形成一個邏輯 Broker 。
4. 高可用(Highly Available Queues)
隊列可以在集群中的機器上進行鏡像,使得在部分節點出問題的情況下隊列仍然可用。
5. 多種協議(Multi-protocol)
RabbitMQ 支持多種消息隊列協議,比如 STOMP、MQTT 等等。
6. 多語言客戶端(Many Clients)
RabbitMQ 幾乎支持所有常用語言,比如 Java、.NET、Ruby 等等。
7. 管理界面(Management UI)
RabbitMQ 提供了一個易用的用戶界面,使得用戶可以監控和管理消息 Broker 的許多方面。
8. 跟蹤機制(Tracing)
如果消息異常,RabbitMQ 提供了消息跟蹤機制,使用者可以找出發生了什么。
9. 插件機制(Plugin System)
RabbitMQ 提供了許多插件,來從多方面進行擴展,也可以編寫自己的插件。
RabbitMQ的安裝可以參考博客:
https://www.cnblogs.com/ericli-ericli/p/5902270.html
## RabbitMQ的概念模型
### 基本模型
所有 MQ 產品從模型抽象上來說都是一樣的過程:
消費者(consumer)訂閱某個隊列。生產者(producer)創建消息,然后發布到隊列(queue)中,最后將消息發送到監聽的消費者。

### 基本概念

* Message
消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對于其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。
* Publisher
消息的生產者,也是一個向交換器發布消息的客戶端應用程序。
* Exchange
交換器,用來接收生產者發送的消息并將這些消息路由給服務器中的隊列。
* Binding
綁定,用于消息隊列和交換器之間的關聯。一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。
* Queue
消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
* Connection
網絡連接,比如一個TCP連接。
* Channel
信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內地虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于操作系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。
* Consumer
消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
* Virtual Host
虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 / 。
* Broker
表示消息隊列服務器實體。
## RabbitMQ實現消息傳遞實例
**最簡單的隊列通信**
接收端
~~~
import pika,time
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello1')
def callback(ch, method, properties, body):
print('>>',ch,method,properties)
time.sleep(30)
print(" [x] Received %r" % body)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='hello1',
no_ack=True
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
~~~
發送端
~~~
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello1')
channel.basic_publish(exchange='',
routing_key='hello1',
body='Hello World!')
connection.close()
~~~
PS:**遠程連接rabbitmq server的話,需要配置權限**
可參考博客:https://www.cnblogs.com/alex3714/articles/5248247.html

在這種工作模式下,如果啟動了多個接收端,RabbitMQ會默認把p發的消息依次分發給各個消費者(c),相當于輪詢。
但是上面這樣寫的話,隊列里面的消息如果沒有被全部取出來,而且同時RabbitMQ又down機了,那這樣的話隊列里面的消息就都沒有了,所以我們需要做一個**消息的持久化**。
```
import pika,time
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello',durable=True)
def callback(ch, method, properties, body):
print('>>',ch,method,properties)
time.sleep(30)
print(" [x] Received %r" % body)
channel.basic_consume(callback,
queue='hello',
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
receive端
```
```
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
# 聲明queue
channel.queue_declare(queue='hello',durable=True)
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!',
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
print(" [x] Sent 'Hello World!'")
connection.close()
send端
```
**消息公平分發**
如果Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,很可能出現,一個機器配置不高的消費者那里堆積了很多消息處理不完,同時配置高的消費者卻一直很輕松。為解決此問題,**可以在各個消費者端,配置perfetch=1**,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。
~~~
channel.basic_qos(prefetch_count=1)
~~~
```
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(60)
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
接收端
```
```
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = "Hello World!"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2,
))
connection.close()
發送端
```
**Publish\\Subscribe(消息發布\\訂閱) **
之前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue里,但有些時候你想讓你的消息被所有的Queue收到,類似廣播的效果,這時候就要用到exchange了。
fanout:?所有bind到此exchange的queue都可以接收消息

```
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
message = "Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()
publisher
```
```
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',exchange_type='fanout')
result = channel.queue_declare(exclusive=True) # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
subscriber
```
direct:?通過routingKey和exchange決定的那個唯一的queue可以接收消息

```
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
publisher
```
```
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s info warning error\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
subscriber
```
topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息

```
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
exchange_type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
publisher
```
```
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
exchange_type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
subscriber
```
PS:表達式符號說明:#代表收所有,\*代表任何字符。(在終端運行,運行命令后面跟上符合條件的topic)
- Python學習
- Python基礎
- Python初識
- 列表生成式,生成器,可迭代對象,迭代器詳解
- Python面向對象
- Python中的單例模式
- Python變量作用域、LEGB、閉包
- Python異常處理
- Python操作正則
- Python中的賦值與深淺拷貝
- Python自定義CLI三方庫
- Python并發編程
- Python之進程
- Python之線程
- Python之協程
- Python并發編程與IO模型
- Python網絡編程
- Python之socket網絡編程
- Django學習
- 反向解析
- Cookie和Session操作
- 文件上傳
- 緩存的配置和使用
- 信號
- FBV&&CBV&&中間件
- Django補充
- 用戶認證
- 分頁
- 自定義搜索組件
- Celery
- 搭建sentry平臺監控
- DRF學習
- drf概述
- Flask學習
- 項目拆分
- 三方模塊使用
- 爬蟲學習
- Http和Https區別
- 請求相關庫
- 解析相關庫
- 常見面試題
- 面試題
- 面試題解析
- 網絡原理
- 計算機網絡知識簡單介紹
- 詳解TCP三次握手、四次揮手及11種狀態
- 消息隊列和數據庫
- 消息隊列之RabbitMQ
- 數據庫之Redis
- 數據庫之初識MySQL
- 數據庫之MySQL進階
- 數據庫之MySQL補充
- 數據庫之Python操作MySQL
- Kafka常用命令
- Linux學習
- Linux基礎命令
- Git
- Git介紹
- Git基本配置及理論
- Git常用命令
- Docker
- Docker基本使用
- Docker常用命令
- Docker容器數據卷
- Dockerfile
- Docker網絡原理
- docker-compose
- Docker Swarm
- HTML
- CSS
- JS
- VUE