<mark>在僅開啟了生產者確認機制的情況下,交換機接收到消息后,會直接給消息生產者發送確認消息,如果發現該消息不可路由,那么消息會被直接丟棄,此時生產者是不知道消息被丟棄這個事件的</mark>。
<br/>
那么如何讓無法被路由的消息幫我想辦法處理一下?最起碼通知我一聲,我好自己處理啊。通過在生產者設置`mandatory` 參數可以在當消息傳遞過程中不可到達目的地時將消息返回給生產者。
```java
public class RabbitTemplate extends ... {
//mandatory參數
private Expression mandatoryExpression = new ValueExpression(false);
```
<br/>
**1. 生產者**
```java
@Slf4j
@RestController
public class ReturnController {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private CustomCallBack customCallBack;
/**
* @PostConstruct 注解標記的方法可以在項目啟動時時執行一次,這里在項目啟動時
* 注入CustomCallBack對象到RabbitTemplate中去
*/
@PostConstruct
private void init() {
rabbitTemplate.setConfirmCallback(customCallBack);
//setMandatory(boolean mandatory)
//mandatory=true: 交換機無法將消息進行路由時,會將該消息返回給生產者
//mandatory=false: 如果發現消息無法進行路由,則直接丟棄
rabbitTemplate.setMandatory(true);
//設置回退消息交給誰處理
rabbitTemplate.setReturnCallback(customCallBack);
}
@GetMapping("/return/send")
public void sendMessage() {
//指定消息id為1
CorrelationData corId = new CorrelationData("1");
//convertAndSend(String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData)
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, "key1", "message#key1", corId);
log.info("生產消息: {}", "message#key1");
CorrelationData corId2 = new CorrelationData("2");
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, "key2", "message#key2", corId2);
log.info("生產消息: {}", "message#key2");
}
}
```
**2. 回調對象**
```java
@Slf4j
@Component
public class CustomCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
/**
* 不論交換機是否收到消息該方法都會被調用
*
* @param corData 消息相關數據
* @param ack 交換機是否收到消息。true收到、false沒收到
* @param cause 收到或沒收到的理由
*/
@Override
public void confirm(CorrelationData corData, boolean ack, String cause) {
if (ack) {
log.info("交換機已收到id={}的消息", corData.getId());
} else {
log.info("交換機未收到id={}的消息,原因是:{}", corData.getId(), cause);
}
}
/**
* 當消息無法路由到交換機時被調用
*
* @param message 消息
* @param replyCode 消息退回代碼
* @param replyText 消息退回原因
* @param exchange 交換機名稱
* @param routingKey 路由key
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText
, String exchange, String routingKey) {
log.info("消息被退回 -> message={}, replyCode={}, replyText={}, exchange={}, routingKey={}"
, new String(message.getBody()), replyCode, replyText, exchange, routingKey);
}
}
```
**3. 測試**
啟動項目后訪問 http://localhost:8080/return/send 生產了兩條消息,控制臺輸出如下。
```
...] c.l.r.controller.ReturnController : 生產消息: message#key1
...] c.l.r.controller.ReturnController : 生產消息: message#key2
...] c.l.rabbitmq03.callback.CustomCallBack : 消息被退回 -> message=message#key2, replyCode=312, replyText=NO_ROUTE
, exchange=confirm.exchange, routingKey=key2
...] c.l.rabbitmq03.callback.CustomCallBack : 交換機已收到id=2的消息
...] c.l.rabbitmq03.service.ConfirmConsumer : 收到消息: message#key1
...] c.l.rabbitmq03.callback.CustomCallBack : 交換機已收到id=1的消息
```
可見消息 `message#key2` 因為沒有隊列與之綁定無法路由,所以被退回給了生產者。
- 消息隊列
- 什么是MQ
- MQ的作用
- MQ的分類
- MQ的選擇
- RabbitMQ
- RabbitMQ是什么
- 四大核心概念
- 工作原理
- 環境搭建
- windows系統下的搭建
- centos7系統下的搭建
- 常用命令
- 服務相關命令
- 管理用戶命令
- 管理隊列命令
- 第一個RabbitMQ程序
- 工作隊列
- 輪詢分發消息
- 消息應答
- 持久化
- 發布確認
- 發布確認原理
- 發布確認策略
- 交換機概念
- 交換機類型
- 無名交換機
- Fanout交換機
- Direct交換機
- Topic交換機
- 死信隊列
- 死信概念
- 死信來源
- 死信實戰
- 延遲隊列
- 什么是延遲隊列
- TTL設置方式
- 隊列TTL延遲隊列
- 消息TTL延遲隊列
- 插件打造延遲隊列
- 延遲隊列總結
- 發布確認高級
- 代碼實現
- 回退消息
- 備份交換機
- 冪等性
- 冪等性概念
- 消息重復消費
- 消費端冪等性保障
- 優先級隊列
- 使用場景
- 設置優先級
- 惰性隊列
- 什么是惰性隊列
- 隊列的兩種模式
- 聲明惰性隊列
- RabbitMQ集群
- 為什么要搭建集群
- 集群搭建步驟
- 集群工作方式
- 脫離集群
- 鏡像隊列
- 高可用負載均衡