### 服務質量
默認情況下,生產者發送的消息當匹配到隊列中,而此時有消費者.會把所有的消息一次性的打入消費者(如果有多個消費者會平均分配)的本地緩存中的.就算引入了AckOK情況也是如此.當消息數量巨大的時候,那么久會產生oom,內存會爆掉.
還有一種情況是,有兩個消費者,一個處理的速度較快,一個處理的速度較慢.如果按上面的方式去分配消息的話,會造成一個消費者很多將消息處理完畢,然后進入空閑狀態,而另一個消費者計算壓力很大. 這樣也會增加總的消費耗時.
### 證明
證明當隊列發現消費者后將所有的消息全部打入消費者客戶端. 可以發現,生產者生產完數據后,web管理界面沒有需要處理的消息,說明此時消息已經全部被發送給了消費者.而消費者并沒有一次性全部處理完這些消息,因為沒處理一個消息會停止1S.
生產者:
~~~
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
defer conn.Close()
if err != nil {
fmt.Println(err)
return
}
channel, err := conn.Channel()
defer channel.Close()
if err != nil {
fmt.Println(err)
return
}
channel.ExchangeDeclare("myExchange", amqp.ExchangeDirect, true, false, false, false, nil)
channel.QueueDeclare("myQueue", true, false, false, false, nil)
channel.QueueBind("myQueue", "test", "myExchange", false, nil)
for i := 0; i < 100; i++ {
channel.Publish("myExchange", "test", false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte(strconv.Itoa(i)),
})
fmt.Printf("發送: %d\n", i)
}
~~~
消費者:
~~~
conn, err := amqp.Dial("amqp://jack:111111@localhost:5672")
if err != nil {
fmt.Println(err)
}
ch, err := conn.Channel()
if err != nil {
fmt.Println(err)
}
ch.QueueDeclare("myQueue", true, false, false, false, nil)
b := make(chan bool)
msg, err := ch.Consume("myQueue", "", true, false, false, false, nil)
for m := range msg {
fmt.Printf("消費: %s\n", string(m.Body))
time.Sleep(time.Second)
}
<-b
~~~

### 使用qos
修改消費者:
~~~
conn, err := amqp.Dial("amqp://jack:111111@localhost:5672")
if err != nil {
fmt.Println(err)
}
ch, err := conn.Channel()
if err != nil {
fmt.Println(err)
}
ch.QueueDeclare("myQueue", true, false, false, false, nil)
ch.Qos(1, 0, false) //限制獲取的消息數量,其中第二個參數和第三個參數rabbit沒有實現,可以無視
b := make(chan bool)
msg, err := ch.Consume("myQueue", "", false, false, false, false, nil) //關閉autoack
for m := range msg {
fmt.Printf("消費: %s\n", string(m.Body))
m.Ack(true) //手動ack
time.Sleep(time.Second)
}
<-b
~~~
此時我們看到隊列中的消息并沒有直接打入消費者,而且保留在了隊列中,當隊列收到消費者ack確認后,才會推送下一條消息.所以當autoack設置為true的時候,數據還是全部發送給了客戶端.只不過是每次發送就是等待了一個ack確認.
需要注意的是,如果autoack設置為true,那么設置qos是無效的.所有數據還是會全部一次性的打入消費者客戶端.

- 定義和特征
- 安裝
- 基本概念
- 插件管理
- 核心概念
- virtual hosts
- connextion
- exchange
- channel
- queue
- binding
- 工作模式
- simple模式
- work模式
- 訂閱模式
- routing模式
- topic模式
- QOS服務質量
- =====分割線=====
- RabbitMQ核心概念
- 初識RabbitMQ
- 什么是AMQP高級消息隊列協議
- AMQP核心概念
- RabbitMQ整體架構模型
- 命令行與管控臺操作
- RabbitMQ消息生產與消費
- RabbitMQ交換機詳解
- 什么是exchange
- direct
- topic
- fanout
- headers
- RabbitMQ綁定,隊列,虛擬主機,消息
- RabbitMQ高級特性
- 消息保障100%投遞成功
- 冪等性概念及業界主流解決方案
- confirm確認消息
- return返回消息
- 自定義消費者
- 消費端限流策略
- 消費端ack與重回隊列機制
- TTL消息
- 死信隊列
- RabbitMQ集群架構