案例代碼:https://gitee.com/flymini/codes01/tree/master/rabbitmq_/com-learn-rabbitmq01
****
[TOC]
# 1. 開啟發布確認
發布確認默認是沒有開啟的,如果要開啟需要在生產者調用方法 `channel.confirmSelect`,每當你想開啟要發布確認,都需要在 channel 上調用該方法。
<br/>
# 2. 發布確認策略
發布確認策略共有三種:單個確認發布、批量確認發布、異步確認發布。
## 2.1 單個確認發布
這是一種簡單的確認方式,它是一種<mark>同步確認發布</mark>的方式,也就是發布一個消息之后只有它被確認發布,后續的消息才能繼續發布。
```java
public static void publishMessageIndividually() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//開啟發布確認
channel.confirmSelect();
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
//服務端返回 false 或超時時間內未返回,生產者可以消息重發
boolean flag = channel.waitForConfirms();
if (flag) {
System.out.println("消息發送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("發布" + MESSAGE_COUNT + "個單獨確認消息,耗時" + (end - begin) + "ms");
}
}
```
<br/>
這種確認方式有一個最大的缺點就是:**發布速度特別的慢**,因為如果沒有確認發布的消息就會阻塞所有后續消息的發布,這種方式最多提供每秒不超過數百條發布消息的吞吐量。當然對于某些應用程序來說這可能已經足夠了。
<br/>
## 2.2 批量確認發布
上面那種方式非常慢,與單個等待確認消息相比,先發布一批消息然后一起確認可以極大地提高吞吐量,當然這種方式的缺點就是:**當發生故障導致發布出現問題時,不知道是哪個消息出現問題了**,我們必須將整個批處理保存在內存中,以記錄重要的信息而后重新發布消息。當然這種方案仍然是**同步的**,也一樣阻塞消息的發布。
```java
public static void publishMessageBatch() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//開啟發布確認
channel.confirmSelect();
//批量確認消息大小
int batchSize = 100;
//未確認消息個數
int outstandingMessageCount = 0;
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();
outstandingMessageCount = 0;
}
}
//為了確保還有剩余沒有確認消息 再次確認
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}
long end = System.currentTimeMillis();
System.out.println("發布" + MESSAGE_COUNT + "個批量確認消息,耗時" + (end - begin) + "ms");
}
}
```
<br/>
## 2.3 異步確認發布
異步確認雖然編程邏輯比上兩個要復雜,但是性價比最高,無論是可靠性還是效率都沒得說,他是利用回調函數來達到消息可靠性傳遞的,這個中間件也是通過函數回調來保證是否投遞成功。

```java
public static void publishMessageAsync() throws Exception {
try (Channel channel = RabbitMQUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//開啟發布確認
channel.confirmSelect();
/*
* 線程安全有序的一個哈希表,適用于高并發的情況
* 1.輕松的將序號與消息進行關聯
* 2.輕松批量刪除條目,只要給到序列號
* 3.支持并發訪問
*/
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
/*
* 確認收到消息的一個回調
* sequenceNumber: 消息序列號
* multiple: 為true則可以確認小于等于當前序列號的消息、false只確認當前序列號消息
*/
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
//返回的是小于等于當前序列號的未確認消息
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);
//清除該部分未確認消息
confirmed.clear();
} else {
//只清除當前序列號的消息
outstandingConfirms.remove(sequenceNumber);
}
};
/*
* 沒有被確認消息的一個回調
*/
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
String message = outstandingConfirms.get(sequenceNumber);
System.out.println("發布的消息" + message + "未被確認,序列號" + sequenceNumber);
};
/*
* 添加一個異步確認的監聽器
*/
channel.addConfirmListener(ackCallback, nackCallback);
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
/*
* channel.getNextPublishSeqNo()獲取下一個消息的序列號
* 通過序列號與消息體進行一個關聯
* 全部都是未確認的消息體
*/
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("發布" + MESSAGE_COUNT + "個異步確認消息,耗時" + (end - begin) + "ms");
}
}
```
<br/>
## 2.4 如何處理異步未確認消息
最好的解決方案就是把未確認的消息放到一個基于內存的能被發布線程訪問的隊列,比如說用 `ConcurrentLinkedQueue` 這個隊列在 confirm callbacks 與發布線程之間進行消息的傳遞。
<br/>
## 2.5 3種發布確認速度對比
* 單獨發布消息:同步等待確認,簡單,但吞吐量非常有限。
* 批量發布消息: 批量同步等待確認,簡單,合理的吞吐量,一旦出現問題但很難推斷出是那條消息出現了問題。
* 異步處理:最佳性能和資源使用,在出現錯誤的情況下可以很好地控制,但是實現起來稍微難些。
```java
public static void main(String[] args) throws Exception {
publishMessageIndividually();
publishMessageBatch();
publishMessageAsync();
// 運行結果
// 發布1000個單獨確認消息,耗時457ms
// 發布1000個批量確認消息,耗時172ms
// 發布1000個異步確認消息,耗時64ms
}
```
- 消息隊列
- 什么是MQ
- MQ的作用
- MQ的分類
- MQ的選擇
- RabbitMQ
- RabbitMQ是什么
- 四大核心概念
- 工作原理
- 環境搭建
- windows系統下的搭建
- centos7系統下的搭建
- 常用命令
- 服務相關命令
- 管理用戶命令
- 管理隊列命令
- 第一個RabbitMQ程序
- 工作隊列
- 輪詢分發消息
- 消息應答
- 持久化
- 發布確認
- 發布確認原理
- 發布確認策略
- 交換機概念
- 交換機類型
- 無名交換機
- Fanout交換機
- Direct交換機
- Topic交換機
- 死信隊列
- 死信概念
- 死信來源
- 死信實戰
- 延遲隊列
- 什么是延遲隊列
- TTL設置方式
- 隊列TTL延遲隊列
- 消息TTL延遲隊列
- 插件打造延遲隊列
- 延遲隊列總結
- 發布確認高級
- 代碼實現
- 回退消息
- 備份交換機
- 冪等性
- 冪等性概念
- 消息重復消費
- 消費端冪等性保障
- 優先級隊列
- 使用場景
- 設置優先級
- 惰性隊列
- 什么是惰性隊列
- 隊列的兩種模式
- 聲明惰性隊列
- RabbitMQ集群
- 為什么要搭建集群
- 集群搭建步驟
- 集群工作方式
- 脫離集群
- 鏡像隊列
- 高可用負載均衡