有了 mandatory 參數和回退消息,我們獲得了對無法投遞消息的感知能力,有機會在生產者的消息無法被投遞時發現并處理。
<br/>
但有時候,我們并不知道該如何處理這些無法路由的消息,最多打個日志,然后觸發報警,再來手動處理。而通過日志來處理這些無法路由的消息是很不優雅的做法,特別是當生產者所在的服務有多臺機器的時候,手動復制日志會更加麻煩而且容易出錯。而且設置 mandatory 參數會增加生產者的復雜性,需要添加處理這些被退回的消息的邏輯。如果既不想丟失消息,又不想增加生產者的復雜性,該怎么做呢?
<br/>
前面在設置死信隊列的文章中,我們提到,可以為隊列設置死信交換機來存儲那些處理失敗的消息,可是這些不可路由消息根本沒有機會進入到隊列,因此無法使用死信隊列來保存消息。
<br/>
在 RabbitMQ 中,有一種備份交換機的機制存在,可以很好的應對這個問題。什么是備份交換機呢?備份交換機可以理解為 RabbitMQ 中交換機的備胎,當我們為某一個交換機聲明一個對應的備份交換機時,就是為它創建一個備胎。
<br/>
當交換機接收到一條不可路由消息時,將會把這條消息轉發到備份交換機中,由備份交換機來進行轉發和處理,通常備份交換機的類型為 Fanout ,這樣就能把所有消息都投遞到與其綁定的隊列中,然后我們在備份交換機下綁定一個隊列,這樣所有那些原交換機無法被路由的消息,就會都進入這個隊列了。當然,我們還可以建立一個報警隊列,用獨立的消費者來進行監測和報警。
<br/>
:-: 
圖1:架構圖
演示圖1的消息過程步驟如下:
**1. 配置類**
```java
@Configuration
public class BackupConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
public static final String BACKUP_QUEUE_NAME = "backup.queue";
public static final String WARNING_QUEUE_NAME = "warning.queue";
public static final String KEY_1 = "key1";
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
/**
* backup.exchange交換機作為confirm.exchange交換機的備份交換機
*/
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
.durable(true)
//設置該交換機的備份交換機
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
return (DirectExchange) exchangeBuilder.build();
}
@Bean
public Binding confirmBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(KEY_1);
}
@Bean("backupExchange")
public FanoutExchange backupExchange() {
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
@Bean("warningQueue")
public Queue warningQueue() {
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}
@Bean
public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange backupExchange) {
return BindingBuilder.bind(queue).to(backupExchange);
}
@Bean("backQueue")
public Queue backQueue() {
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
@Bean
public Binding backupBinding(@Qualifier("backQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange backupExchange) {
return BindingBuilder.bind(queue).to(backupExchange);
}
}
}
```
**2. 報警消費者**
```java
@Slf4j
@Component
public class WarningConsumer {
public static final String WARNING_QUEUE_NAME = "warning.queue";
@RabbitListener(queues = WARNING_QUEUE_NAME)
public void receiveWarningMsg(Message message) {
String msg = new String(message.getBody());
log.info("報警發現不可路由消息:{}", msg);
}
}
```
**3. 測試**
(1)因為改變了 `confirm.exchange`的綁定屬性,所以先將該交換機刪除。

(2)啟動項目后訪問 http://localhost:8080/return/send 生產了兩條消息,控制臺輸出如下。
```
...] c.l.r.controller.ReturnController : 生產消息: message#key1
...] c.l.r.controller.ReturnController : 生產消息: message#key2
...] c.l.rabbitmq03.service.ConfirmConsumer : 收到消息: message#key1
...] c.l.rabbitmq03.service.WarningConsumer : 報警發現不可路由消息:message#key2
...] c.l.rabbitmq03.callback.CustomCallBack : 交換機已收到id=1的消息
...] c.l.rabbitmq03.callback.CustomCallBack : 交換機已收到id=2的消息
```
可見不可路由消息`message#key2`被轉發到備份交換機并被報警消費者消費了。
<br/>
>[info]如果 mandatory 參數與備份交換機一起使用則備份交換機優先級高。
- 消息隊列
- 什么是MQ
- MQ的作用
- MQ的分類
- MQ的選擇
- RabbitMQ
- RabbitMQ是什么
- 四大核心概念
- 工作原理
- 環境搭建
- windows系統下的搭建
- centos7系統下的搭建
- 常用命令
- 服務相關命令
- 管理用戶命令
- 管理隊列命令
- 第一個RabbitMQ程序
- 工作隊列
- 輪詢分發消息
- 消息應答
- 持久化
- 發布確認
- 發布確認原理
- 發布確認策略
- 交換機概念
- 交換機類型
- 無名交換機
- Fanout交換機
- Direct交換機
- Topic交換機
- 死信隊列
- 死信概念
- 死信來源
- 死信實戰
- 延遲隊列
- 什么是延遲隊列
- TTL設置方式
- 隊列TTL延遲隊列
- 消息TTL延遲隊列
- 插件打造延遲隊列
- 延遲隊列總結
- 發布確認高級
- 代碼實現
- 回退消息
- 備份交換機
- 冪等性
- 冪等性概念
- 消息重復消費
- 消費端冪等性保障
- 優先級隊列
- 使用場景
- 設置優先級
- 惰性隊列
- 什么是惰性隊列
- 隊列的兩種模式
- 聲明惰性隊列
- RabbitMQ集群
- 為什么要搭建集群
- 集群搭建步驟
- 集群工作方式
- 脫離集群
- 鏡像隊列
- 高可用負載均衡