[TOC]
# 1. 消息應答概念
消費者完成一個任務可能需要一段時間,如果其中一個消費者在處理一個長的任務時只完成了部分突然掛掉了,會發生什么情況。
<br/>
RabbitMQ 一旦向消費者傳遞了一條消息,便立即將該消息標記為刪除。在這種情況下,突然有個消費者掛掉了,我們將丟失正在處理的消息。以及后續發送給該消費者的消息,因為它無法接收到。
<br/>
為了保證消息在發送過程中不丟失,rabbitmq 引入消息應答機制,消息應答就是:<mark>消費者在接收到消息并且處理該消息之后,告訴 rabbitmq 它已經處理了,rabbitmq 可以把該消息刪除了</mark>。
<br/>
消息應答方式有:自動應答和手動應答兩種方式。
<br/>
# 2. 自動應答
消息發送后立即被認為已經傳送成功,這種模式需要在<mark>高吞吐量和數據傳輸安全性方面做權衡</mark>,因為這種模式如果消息在接收到之前,消費者那邊出現連接或者 channel 關閉,那么消息就丟失了。
<br/>
另一方面這種模式可能導致消費者過載,<mark>因為沒有對傳遞的消息數量進行限制</mark>,有可能使得消費者這邊由于接收太多還來不及處理的消息,導致這些消息的積壓,最終使得內存耗盡,最終這些消費者線程被操作系統殺死,<mark>所以這種模式僅適用在消費者可以高效并以某種速率能夠處理這些消息的情況下使用</mark>。
<br/>
# 3. 手動應答
## 3.1 消息自動重新入隊
如果消費者由于某些原因失去連接(如通道已關閉、連接已關閉或 TCP 連接丟失),導致消息未發送 ACK 確認,RabbitMQ 了解到消息未完全處理,將對消息重新排隊。
<br/>
如果此時其他消費者可以處理,該條消息將會很快被重新分發給另一個消費者。這樣,即使某個消費者偶爾死亡,也可以確保不會丟失任何消息。
<br/>
## 3.2 手動應答實現
消息默認采用的是自動應答,改用手動應答有如下好處。
(1)消息消費過程中數據不丟失。
(2)可以批量應答并且減少網絡擁堵。
<br/>
實現手動應答的 API 如下。
```java
public interface Channel extends ShutdownNotifier, AutoCloseable {
// 用于肯定確認
// RabbitMQ 已知道該消息被處理成功,可以將其丟棄了
void basicAck(long var1, boolean var3) throws IOException;
// 消費者拒絕消費
void basicNack(long var1, boolean var3, boolean var4) throws IOException;
// 消費者拒絕消費
// 與 basicNack 相比少一個參數var4,當消息不被處理時直接丟棄
void basicReject(long var1, boolean var3) throws IOException;
}
```
這三個方法都有一個參數 var3,是 multiple 域。
當`var3=true`代表批量應答 channel 上未應答的消息。比如 channel 上有正在傳送 tag 為 5、6、7、8 的消息。假設當前 tag=8,那么此時 5 — 8 的這些還未應答的消息都會被確認收到消息應答。
<br/>
當`var3=false`時,只會應答 tag=8 的消息, 5、6、7 這三個消息依然不會被確認收到消息應答。
<br/>
實現手動應答代碼演示如下:
**1. 工具類**
```java
public class RabbitMQUtils {
/**
* 連接RabbitMQ服務器
*/
public static Channel getChannel() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.0.107");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
```
```java
public class SleepUtils {
public static void sleep(int second) {
try {
Thread.sleep(1000 * second);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
```
**2. 生產者**
```java
public class Task02 {
private static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMQUtils.getChannel()) {
//聲明隊列
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
Scanner sc = new Scanner(System.in);
System.out.println("請輸入信息");
while (sc.hasNext()) {
String message = sc.nextLine();
//發送消息
channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("生產者發出消息:" + message);
}
}
}
}
```
**3. 兩個消費者**
在消費端調用手動應答 API 實現手動應答。
```java
public class Worker03 {
private static final String ACK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
System.out.println("C1 等待接收消息處理時間較短");
//消息消費的時候如何處理消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
SleepUtils.sleep(1);
System.out.println("接收到消息:" + message);
/**
* 消息應答。
* basicAck(long var1, boolean var3)
* var1: 消息標記tag
* var3: 是否批量應答未應答消息
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
/**
* basicConsume(String var1, boolean var2, DeliverCallback var3, CancelCallback var4)
* var1: 隊列名稱
* var2: 應答方式。true自動應答、false手動應答
*/
channel.basicConsume(ACK_QUEUE_NAME, false, deliverCallback, (consumerTag) -> {
System.out.println(consumerTag + "消費者取消消費接口回調邏輯");
});
}
}
```
```java
public class Worker04 {
private static final String ACK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
System.out.println("C2 等待接收消息處理時間較長");
//消息消費的時候如何處理消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
SleepUtils.sleep(30);
System.out.println("接收到消息:" + message);
/**
* 消息應答。
* basicAck(long var1, boolean var3)
* var1: 消息標記tag
* var3: 是否批量應答未應答消息
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
/**
* basicConsume(String var1, boolean var2, DeliverCallback var3, CancelCallback var4)
* var1: 隊列名稱
* var2: 應答方式。true自動應答、false手動應答
*/
channel.basicConsume(ACK_QUEUE_NAME, false, deliverCallback, (consumerTag) -> {
System.out.println(consumerTag + "消費者取消消費接口回調邏輯");
});
}
}
```
**4. 測試**
(1)啟動生產者 Task02,生產4條消息。

(2)啟動兩個消費者。
正常情況下兩個消費者按照輪詢機制分別獲取兩條消息。


<br/>
演示消息自動重新入隊:當生產者 Task02 發送消息 C4 的時候,立即把消費者 Worker04 停掉,就會發現本來應該由 Worker04 消費的消息 C4,被 Worker03 消費了,實現了消息自動重新入隊。


- 消息隊列
- 什么是MQ
- MQ的作用
- MQ的分類
- MQ的選擇
- RabbitMQ
- RabbitMQ是什么
- 四大核心概念
- 工作原理
- 環境搭建
- windows系統下的搭建
- centos7系統下的搭建
- 常用命令
- 服務相關命令
- 管理用戶命令
- 管理隊列命令
- 第一個RabbitMQ程序
- 工作隊列
- 輪詢分發消息
- 消息應答
- 持久化
- 發布確認
- 發布確認原理
- 發布確認策略
- 交換機概念
- 交換機類型
- 無名交換機
- Fanout交換機
- Direct交換機
- Topic交換機
- 死信隊列
- 死信概念
- 死信來源
- 死信實戰
- 延遲隊列
- 什么是延遲隊列
- TTL設置方式
- 隊列TTL延遲隊列
- 消息TTL延遲隊列
- 插件打造延遲隊列
- 延遲隊列總結
- 發布確認高級
- 代碼實現
- 回退消息
- 備份交換機
- 冪等性
- 冪等性概念
- 消息重復消費
- 消費端冪等性保障
- 優先級隊列
- 使用場景
- 設置優先級
- 惰性隊列
- 什么是惰性隊列
- 隊列的兩種模式
- 聲明惰性隊列
- RabbitMQ集群
- 為什么要搭建集群
- 集群搭建步驟
- 集群工作方式
- 脫離集群
- 鏡像隊列
- 高可用負載均衡