案例代碼:https://gitee.com/flymini/codes01/tree/master/rabbitmq_/com-learn-rabbitmq01
****

將通過不同的死信來源演示上圖的消息消費過程。
[TOC]
# 1. 消息 TTL 過期
**1. 生產者**
```java
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMQUtils.getChannel()) {
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//設置消息的 TTL 時間為 10000 ms
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
//生產者生產10條消息
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
System.out.println(Producer.class.getSimpleName() + "[生產者發出消息]: " + message);
}
}
}
}
```
**2. 兩個消費者**
(1)C1消費者
```java
public class Consumer01 {
//普通交換機
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交換機
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
//聲明普通交換機和死信交換機
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//聲明死信隊列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信隊列綁定死信交換機
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
//正常隊列綁定死信隊列信息
Map<String, Object> params = new HashMap<>(16);
//正常隊列設置死信交換機。x-dead-letter-exchange是固定值,不可以隨便寫
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常隊列設置死信routing-key。x-dead-letter-routing-key是固定值,不可以隨便寫
params.put("x-dead-letter-routing-key", "lisi");
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println(Consumer01.class.getSimpleName() + "[等待接收消息...]");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(Consumer01.class.getSimpleName() + "[接收到的消息]: " + message);
};
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
});
}
}
```
(2)C2消費者
```java
public class Consumer02 {
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println(Consumer02.class.getSimpleName() + "[等待接收死信隊列消息...]");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(Consumer02.class.getSimpleName() + "[接收到死信隊列的消息]: " + message);
};
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
});
}
}
```
**3. 測試**
(1)先啟動C1消費者,然后將其關閉。
(2)啟動生產者生產10條消息,就得到如下信息。

(3)啟動C2消費者,C2消費了死信隊列的消息。
```
-----------Consumer02消費者收到了死信隊列的消息-----------
Consumer02[接收到死信隊列的消息]: info1
Consumer02[接收到死信隊列的消息]: info2
Consumer02[接收到死信隊列的消息]: info3
Consumer02[接收到死信隊列的消息]: info4
Consumer02[接收到死信隊列的消息]: info5
Consumer02[接收到死信隊列的消息]: info6
Consumer02[接收到死信隊列的消息]: info7
Consumer02[接收到死信隊列的消息]: info8
Consumer02[接收到死信隊列的消息]: info9
Consumer02[接收到死信隊列的消息]: info10
```
<br/>
# 2. 隊列達到最大長度
**1. 生產者**
```java
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMQUtils.getChannel()) {
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//生產者生產10條消息
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
System.out.println(Producer.class.getSimpleName() + "[生產者發出消息]: " + message);
}
}
}
}
```
**2. 兩個消費者**
(1)C1消費者
```java
public class Consumer01 {
//普通交換機名稱
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交換機名稱
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
//聲明死信和普通交換機
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//聲明死信隊列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信隊列綁定死信交換機
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
//正常隊列綁定死信隊列信息
Map<String, Object> params = new HashMap<>(16);
//正常隊列設置死信交換機。x-dead-letter-exchange是固定寫法,不可以自定義
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常隊列設置死信routing-key。x-dead-letter-routing-key是固定寫法,不可以自定義
params.put("x-dead-letter-routing-key", "lisi");
//設置正常隊列長度為6
params.put("x-max-length", 6);
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println(Consumer01.class.getSimpleName() + "[等待接收消息...]");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(Consumer01.class.getSimpleName() + "[接收到的消息]: " + message);
};
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
});
}
}
```
(2)C2消費者
```java
public class Consumer02 {
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println(Consumer02.class.getSimpleName() + "[等待接收死信隊列消息...]");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(Consumer02.class.getSimpleName() + "[接收到死信隊列的消息]: " + message);
};
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
});
}
}
```
**3. 測試**
(1)刪除隊列。
```shell
[root@localhost ~]# rabbitmqctl delete_queue dead-queue
[root@localhost ~]# rabbitmqctl delete_queue normal-queue
```
(2)先啟動C1消費者,然后將其關閉。
(3)啟動生產者生產10條消息,就得到如下信息。

可以看到因為在C1中設置隊列最大長度為6(`params.put("x-max-length", 6)`),所以正常隊列只能存放6條消息,超過6條消息的被存儲到了死信隊列中。
(4)啟動C2消費者,C2消費了死信隊列的消息。
```
-----------Consumer02消費者收到了死信隊列的消息-----------
Consumer02[接收到死信隊列的消息]: info1
Consumer02[接收到死信隊列的消息]: info2
Consumer02[接收到死信隊列的消息]: info3
Consumer02[接收到死信隊列的消息]: info4
```
(5)啟動C1消費者,消費正常隊列中的6條消息。
```
-----------Consumer01消費者收到了死信隊列的消息-----------
Consumer01[接收到的消息]: info5
Consumer01[接收到的消息]: info6
Consumer01[接收到的消息]: info7
Consumer01[接收到的消息]: info8
Consumer01[接收到的消息]: info9
Consumer01[接收到的消息]: info10
```
<br/>
# 3. 消息被拒
消息被拒只需要在消費端調用下面的方法即可。
```java
channel.basicReject(long var1, boolean var3)
```
**1. 生產者**
```java
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMQUtils.getChannel()) {
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//生產者生產10條消息
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
System.out.println(Producer.class.getSimpleName() + "[生產者發出消息]: " + message);
}
}
}
}
```
**2. 兩個消費者**
(1)C1消費者
```java
public class Consumer01 {
//普通交換機名稱
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交換機名稱
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
//聲明死信和普通交換機
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//聲明死信隊列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信隊列綁定死信交換機與 routingkey
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
//正常隊列綁定死信隊列信息
Map<String, Object> params = new HashMap<>();
//正常隊列設置死信交換機。x-dead-letter-exchange是固定寫法,不可以隨便定義
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常隊列設置死信。x-dead-letter-routing-key是固定寫法,不可以隨便定義。
params.put("x-dead-letter-routing-key", "lisi");
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println(Consumer01.class.getSimpleName() + "[等待接收消息...]");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
if (message.equals("info5")) {
System.out.println(Consumer01.class.getSimpleName() + "[接收到的消息]: " + message + ", 但拒絕簽收該消息.");
//拒絕消費info5消息
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
} else {
System.out.println(Consumer01.class.getSimpleName() + "[接收到的消息]: " + message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {
});
}
}
```
(2)C2消費者
```java
public class Consumer02 {
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println(Consumer02.class.getSimpleName() + "[等待接收死信隊列消息...]");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(Consumer02.class.getSimpleName() + "[接收到死信隊列的消息]: " + message);
};
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
});
}
}
```
**3. 測試**
(1)刪除隊列。
```shell
[root@localhost ~]# rabbitmqctl delete_queue dead-queue
[root@localhost ~]# rabbitmqctl delete_queue normal-queue
```
(2)啟動C1消費者,C2消費者,最后啟動生產者。
```
-----------Producer生產者生產了10條消息-----------
Producer[生產者發出消息]: info1
Producer[生產者發出消息]: info2
Producer[生產者發出消息]: info3
Producer[生產者發出消息]: info4
Producer[生產者發出消息]: info5
Producer[生產者發出消息]: info6
Producer[生產者發出消息]: info7
Producer[生產者發出消息]: info8
Producer[生產者發出消息]: info9
Producer[生產者發出消息]: info10
-----------Consumer01消費者拒絕簽收info5消息-----------
Consumer01[接收到的消息]: info1
Consumer01[接收到的消息]: info2
Consumer01[接收到的消息]: info3
Consumer01[接收到的消息]: info4
Consumer01[接收到的消息]: info5, 但拒絕簽收該消息.
Consumer01[接收到的消息]: info6
Consumer01[接收到的消息]: info7
Consumer01[接收到的消息]: info8
Consumer01[接收到的消息]: info9
Consumer01[接收到的消息]: info10
-----------Consumer02消費者收到了死信隊列的消息-----------
Consumer02[接收到死信隊列的消息]: info5
```
- 消息隊列
- 什么是MQ
- MQ的作用
- MQ的分類
- MQ的選擇
- RabbitMQ
- RabbitMQ是什么
- 四大核心概念
- 工作原理
- 環境搭建
- windows系統下的搭建
- centos7系統下的搭建
- 常用命令
- 服務相關命令
- 管理用戶命令
- 管理隊列命令
- 第一個RabbitMQ程序
- 工作隊列
- 輪詢分發消息
- 消息應答
- 持久化
- 發布確認
- 發布確認原理
- 發布確認策略
- 交換機概念
- 交換機類型
- 無名交換機
- Fanout交換機
- Direct交換機
- Topic交換機
- 死信隊列
- 死信概念
- 死信來源
- 死信實戰
- 延遲隊列
- 什么是延遲隊列
- TTL設置方式
- 隊列TTL延遲隊列
- 消息TTL延遲隊列
- 插件打造延遲隊列
- 延遲隊列總結
- 發布確認高級
- 代碼實現
- 回退消息
- 備份交換機
- 冪等性
- 冪等性概念
- 消息重復消費
- 消費端冪等性保障
- 優先級隊列
- 使用場景
- 設置優先級
- 惰性隊列
- 什么是惰性隊列
- 隊列的兩種模式
- 聲明惰性隊列
- RabbitMQ集群
- 為什么要搭建集群
- 集群搭建步驟
- 集群工作方式
- 脫離集群
- 鏡像隊列
- 高可用負載均衡