[TOC]
# 1. 消息可靠投遞
1. 生產者準備好需要投遞的消息。
2. 生產者與RabbitMQ服務器建立連接。
3. 生產者發送消息。
4. RabbitMQ服務器接收到消息,并將其路由到指定隊列。
5. RabbitMQ服務器發起回調,告知生產者消息發送成功。

**所謂可靠投遞,就是確保消息能夠百分百從生產者發送到服務器。**

為了避免爭議,補充說明一下,如果沒有設置Mandatory參數,是不需要先路由消息才發起回調的,服務器收到消息后就會進行回調確認。
2、3、5步都是通過TCP連接進行交互,有網絡調用的地方就會有事故,網絡波動隨時都有可能發生,不管是內部機房停電,還是外部光纜被切,網絡事故無法預測,雖然這些都是小概率事件,但對于訂單等敏感數據處理來說,這些情況下導致消息丟失都是不可接受的。
# 2. 實現可靠投遞
默認情況下,發送消息的操作是不會返回任何信息給生產者的,也就是說,默認情況下生產者是不知道消息有沒有正確地到達服務器。
那么如何解決這個問題呢?
對此,RabbitMQ中有一些相關的解決方案:
1. 使用事務機制來讓生產者感知消息被成功投遞到服務器。
2. 通過生產者確認機制實現。
在RabbitMQ中,所有確保消息可靠投遞的機制都會對性能產生一定影響,如使用不當,可能會對吞吐量造成重大影響,只有通過執行性能基準測試,才能在確定性能與可靠投遞之間的平衡。
在使用可靠投遞前,需要先思考以下問題:
1. 消息發布時,保證消息進入隊列的重要性有多高?
2. 如果消息無法進行路由,是否應該將該消息返回給發布者?
3. 如果消息無法被路由,是否應該將其發送到其他地方稍后再重新進行路由?
4. 如果RabbitMQ服務器崩潰了,是否可以接受消息丟失?
5. RabbitMQ在處理新消息時是否應該確認它已經為發布者執行了所有請求的路由和持久化?
6. 消息發布者是否可以批量投遞消息?
7. 在可靠投遞上是否有可以接受的平衡性?是否可以接受一部分的不可靠性來提升性能?
只考慮平衡性不考慮性能是不行的,至于這個平衡的度具體如何把握,就要具體情況具體分析了,比如像訂單數據這樣敏感的信息,對可靠性的要求自然要比一般的業務消息對可靠性的要求高的多,因為訂單數據是跟錢直接相關的,可能會導致直接的經濟損失。
## 2.1 RabbitMQ的事務機制
RabbitMQ是支持AMQP事務機制的,在生產者確認機制之前,事務是確保消息被成功投遞的唯一方法。
在SpringBoot項目中,使用RabbitMQ事務其實很簡單,只需要聲明一個事務管理的Bean,并將RabbitTemplate的事務設置為true即可。
配置文件如下:
RabbitMQ是支持AMQP事務機制的,在生產者確認機制之前,事務是確保消息被成功投遞的唯一方法。
在SpringBoot項目中,使用RabbitMQ事務其實很簡單,只需要聲明一個事務管理的Bean,并將RabbitTemplate的事務設置為true即可。
配置文件如下:
~~~
spring:
rabbitmq:
host: localhost
password: guest
username: guest
listener:
type: simple
simple:
default-requeue-rejected: false
acknowledge-mode: manual
~~~
先來配置一下交換機和隊列,以及事務管理器。
~~~
@Configuration
public class RabbitMQConfig {
public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.tx.demo.simple.business.exchange";
public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.tx.demo.simple.business.queue";
// 聲明業務Exchange
@Bean("businessExchange")
public FanoutExchange businessExchange(){
return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
}
// 聲明業務隊列
@Bean("businessQueue")
public Queue businessQueue(){
return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).build();
}
// 聲明業務隊列綁定關系
@Bean
public Binding businessBinding(@Qualifier("businessQueue") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
/**
* 配置啟用rabbitmq事務
* @param connectionFactory
* @return
*/
@Bean
public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
}
~~~
然后創建一個消費者,來監聽消息,用以判斷消息是否成功發送。
~~~
@Slf4j
@Component
public class BusinessMsgConsumer {
@RabbitListener(queues = BUSINESS_QUEUEA_NAME)
public void receiveMsg(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("收到業務消息:{}", msg);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
~~~
然后是消息生產者:
~~~
@Slf4j
@Component
public class BusinessMsgProducer{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
private void init() {
rabbitTemplate.setChannelTransacted(true);
}
@Transactional
public void sendMsg(String msg) {
rabbitTemplate.convertAndSend(BUSINESS_EXCHANGE_NAME, "key", msg);
log.info("msg:{}", msg);
if (msg != null && msg.contains("exception"))
throw new RuntimeException("surprise!");
log.info("消息已發送 {}" ,msg);
}
}
~~~
這里有兩個注意的地方:
1. 在初始化方法里,通過使用`rabbitTemplate.setChannelTransacted(true);`來開啟事務。
2. 在發送消息的方法上加上`@Transactional`注解,這樣在該方法中發生異常時,消息將不會發送。
在controller中加一個接口來生產消息:
~~~
@RestController
public class BusinessController {
@Autowired
private BusinessMsgProducer producer;
@RequestMapping("send")
public void sendMsg(String msg){
producer.sendMsg(msg);
}
}
~~~
來驗證一下:
~~~
msg:1
消息已發送 1
收到業務消息:1
msg:2
消息已發送 2
收到業務消息:2
msg:3
消息已發送 3
收到業務消息:3
msg:exception
Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException: surprise!] with root cause
java.lang.RuntimeException: surprise!
at com.mfrank.rabbitmqdemo.producer.BusinessMsgProducer.sendMsg(BusinessMsgProducer.java:30)
...
~~~
當`msg`的值為`exception`時, 在調用`rabbitTemplate.convertAndSend`方法之后,程序拋出了異常,消息并沒有發送出去,而是被當前事務回滾了。
當然,你可以將事務管理器注釋掉,或者將初始化方法的開啟事務注釋掉,這樣事務就不會生效,即使在調用了發送消息方法之后,程序發生了異常,消息也會被正常發送和消費。
RabbitMQ中的事務使用起來雖然簡單,但是對性能的影響是不可忽視的,因為每次事務的提交都是阻塞式的等待服務器處理返回結果,而默認模式下,客戶端是不需要等待的,直接發送就完事了,除此之外,事務消息需要比普通消息多4次與服務器的交互,這就意味著會占用更多的處理時間,所以如果對消息處理速度有較高要求時,盡量不要采用事務機制。
## 2.2 RabbitMQ的生產者確認機制
1.發送確認:生產者-Broker
2.路由確認:交換機-隊列
以上兩個環節消息失敗,都可以回調生產者
### 2.2.1 投遞失敗回調(confirm)
通過生產者確認機制,生產者可以在消息被服務器成功接收時得到反饋,并有機會處理未被成功接收的消息。
在Springboot中開啟RabbitMQ的生產者確認模式也很簡單,只多了一行配置:
~~~
spring:
rabbitmq:
host: localhost
password: guest
username: guest
listener:
type: simple
simple:
default-requeue-rejected: false
acknowledge-mode: manual
publisher-confirms: true
~~~
`publisher-confirms: true`即表示開啟生產者確認模式。
然后將消息生產者的代表進行部分修改:
~~~
@Slf4j
@Component
public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
private void init() {
// rabbitTemplate.setChannelTransacted(true);
rabbitTemplate.setConfirmCallback(this);
}
public void sendCustomMsg(String exchange, String msg) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData != null ? correlationData.getId() : "";
if (b) {
log.info("消息確認成功, id:{}", id);
} else {
log.error("消息未成功投遞, id:{}, cause:{}", id, s);
}
}
}
~~~
讓生產者繼承自`RabbitTemplate.ConfirmCallback`類,然后實現其`confirm`方法,即可用其接收服務器回調。
### 2.2.2 路由失敗回調(return機制)
消息發送到Exchange后,沒有找到綁定的隊列,投遞消息失敗才執行return消息確認機制
1. yml配置增加`spring.rabbitmq.publisher-returns=true`
2. 實現接口,當有消息路由失敗時,回調

## mandatory 參數
設置 mandatory 參數可以在當消息傳遞過程中不可達目的地時將消息返回給生產者。
當把 mandotory 參數設置為 true 時,如果交換機無法將消息進行路由時,會將該消息返回給生產者,而如果該參數設置為false,如果發現消息無法進行路由,則直接丟棄。
那么如何設置這個參數呢?在發送消息的時候,只需要在初始化方法添加一行代碼即可:
~~~
rabbitTemplate.setMandatory(true);
~~~
開啟之后我們再重新運行前面的代碼:
~~~
消息id:19729f33-15c4-4c1b-8d48-044c301e2a8e, msg:1
消息id:4aea5c57-3e71-4a7b-8a00-1595d2b568eb, msg:1
消息確認成功, id:19729f33-15c4-4c1b-8d48-044c301e2a8e
Returned message but no callback available
消息確認成功, id:4aea5c57-3e71-4a7b-8a00-1595d2b568eb
收到業務消息:1
~~~
我們看到中間多了一行提示`Returned message but no callback available`這是什么意思呢?
我們上面提到,設置 mandatory 參數后,如果消息無法被路由,則會返回給生產者,是通過回調的方式進行的,所以,生產者需要設置相應的回調函數才能接受該消息。
為了進行回調,我們需要實現一個接口`RabbitTemplate.ReturnCallback`。
~~~
@Slf4j
@Component
public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
private void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(this);
}
public void sendCustomMsg(String exchange, String msg) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);
correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
rabbitTemplate.convertAndSend(exchange, "key2", msg, correlationData);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData != null ? correlationData.getId() : "";
if (b) {
log.info("消息確認成功, id:{}", id);
} else {
log.error("消息未成功投遞, id:{}, cause:{}", id, s);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息被服務器退回。msg:{}, replyCode:{}. replyText:{}, exchange:{}, routingKey :{}",
new String(message.getBody()), replyCode, replyText, exchange, routingKey);
}
}
~~~
然后我們再來重新運行一次:
~~~
消息id:2e5c336a-883a-474e-b40e-b6e3499088ef, msg:1
消息id:85c771cb-c88f-47dd-adea-f0da57138423, msg:1
消息確認成功, id:2e5c336a-883a-474e-b40e-b6e3499088ef
消息無法被路由,被服務器退回。msg:1, replyCode:312. replyText:NO_ROUTE, exchange:rabbitmq.tx.demo.simple.business.exchange, routingKey :key2
消息確認成功, id:85c771cb-c88f-47dd-adea-f0da57138423
收到業務消息:1
~~~
可以看到,我們接收到了被退回的消息,并帶上了消息被退回的原因:`NO_ROUTE`。但是要注意的是, mandatory 參數僅僅是在當消息無法被路由的時候,讓生產者可以感知到這一點,只要開啟了生產者確認機制,無論是否設置了 mandatory 參數,都會在交換機接收到消息時進行消息確認回調,而且通常消息的退回回調會在消息的確認回調之前。
## ***0***|***1*****備份交換機**
有了 mandatory 參數,我們獲得了對無法投遞消息的感知能力,有機會在生產者的消息無法被投遞時發現并處理。但有時候,我們并不知道該如何處理這些無法路由的消息,最多打個日志,然后觸發報警,再來手動處理。而通過日志來處理這些無法路由的消息是很不優雅的做法,特別是當生產者所在的服務有多臺機器的時候,手動復制日志會更加麻煩而且容易出錯。
而且設置 mandatory 參數會增加生產者的復雜性,需要添加處理這些被退回的消息的邏輯。如果既不想丟失消息,又不想增加生產者的復雜性,該怎么做呢?
前面在設置死信隊列的文章中,我們提到,可以為隊列設置死信交換機來存儲那些處理失敗的消息,可是這些不可路由消息根本沒有機會進入到隊列,因此無法使用死信隊列來保存消息。
不要慌,在 RabbitMQ 中,有一種備份交換機的機制存在,可以很好的應對這個問題。
什么是備份交換機呢?備份交換機可以理解為 RabbitMQ 中交換機的“備胎”,當我們為某一個交換機聲明一個對應的備份交換機時,就是為它創建一個備胎,當交換機接收到一條不可路由消息時,將會將這條消息轉發到備份交換機中,由備份交換機來進行轉發和處理,通常備份交換機的類型為 Fanout ,這樣就能把所有消息都投遞到與其綁定的隊列中,然后我們在備份交換機下綁定一個隊列,這樣所有那些原交換機無法被路由的消息,就會都進入這個隊列了。當然,我們還可以建立一個報警隊列,用獨立的消費者來進行監測和報警。
聽的不太明白?沒關系,看個圖就知道是怎么回事了。

接下來,我們就來設置一下備份交換機:
~~~
@Configuration
public class RabbitMQConfig {
public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.backup.test.exchange";
public static final String BUSINESS_QUEUE_NAME = "rabbitmq.backup.test.queue";
public static final String BUSINESS_BACKUP_EXCHANGE_NAME = "rabbitmq.backup.test.backup-exchange";
public static final String BUSINESS_BACKUP_QUEUE_NAME = "rabbitmq.backup.test.backup-queue";
public static final String BUSINESS_BACKUP_WARNING_QUEUE_NAME = "rabbitmq.backup.test.backup-warning-queue";
// 聲明業務 Exchange
@Bean("businessExchange")
public DirectExchange businessExchange(){
ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(BUSINESS_EXCHANGE_NAME)
.durable(true)
.withArgument("alternate-exchange", BUSINESS_BACKUP_EXCHANGE_NAME);
return (DirectExchange)exchangeBuilder.build();
}
// 聲明備份 Exchange
@Bean("backupExchange")
public FanoutExchange backupExchange(){
ExchangeBuilder exchangeBuilder = ExchangeBuilder.fanoutExchange(BUSINESS_BACKUP_EXCHANGE_NAME)
.durable(true);
return (FanoutExchange)exchangeBuilder.build();
}
// 聲明業務隊列
@Bean("businessQueue")
public Queue businessQueue(){
return QueueBuilder.durable(BUSINESS_QUEUE_NAME).build();
}
// 聲明業務隊列綁定關系
@Bean
public Binding businessBinding(@Qualifier("businessQueue") Queue queue,
@Qualifier("businessExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("key");
}
// 聲明備份隊列
@Bean("backupQueue")
public Queue backupQueue(){
return QueueBuilder.durable(BUSINESS_BACKUP_QUEUE_NAME).build();
}
// 聲明報警隊列
@Bean("warningQueue")
public Queue warningQueue(){
return QueueBuilder.durable(BUSINESS_BACKUP_WARNING_QUEUE_NAME).build();
}
// 聲明備份隊列綁定關系
@Bean
public Binding backupBinding(@Qualifier("backupQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
// 聲明備份報警隊列綁定關系
@Bean
public Binding backupWarningBinding(@Qualifier("warningQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
}
~~~
這里我們使用`ExchangeBuilder`來創建交換機,并為其設置備份交換機:
~~~
.withArgument("alternate-exchange", BUSINESS_BACKUP_EXCHANGE_NAME);
~~~
為業務交換機綁定了一個隊列,為備份交換機綁定了兩個隊列,一個用來存儲不可投遞消息,待之后人工處理,一個專門用來做報警用途。
接下來,分別為業務交換機和備份交換機創建消費者:
~~~
@Slf4j
@Component
public class BusinessMsgConsumer {
@RabbitListener(queues = BUSINESS_QUEUE_NAME)
public void receiveMsg(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("收到業務消息:{}", msg);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
~~~
~~~
@Slf4j
@Component
public class BusinessWaringConsumer {
@RabbitListener(queues = BUSINESS_BACKUP_WARNING_QUEUE_NAME)
public void receiveMsg(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.error("發現不可路由消息:{}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
~~~
接下來我們分別發送一條可路由消息和不可路由消息:
~~~
@Slf4j
@Component
public class BusinessMsgProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendCustomMsg(String exchange, String msg) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);
correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
rabbitTemplate.convertAndSend(exchange, "key2", msg, correlationData);
}
}
~~~
消息如下:
~~~
消息id:5c3a33c9-0764-4d1f-bf6a-a00d771dccb4, msg:1
消息id:42ac8c35-1d0a-4413-a1df-c26a85435354, msg:1
收到業務消息:1
發現不可路由消息:1
~~~
這里僅僅使用 error 日志配合日志系統進行報警,如果是敏感數據,可以使用郵件、釘釘、短信、電話等報警方式來提高時效性。
那么問題來了,mandatory 參數與備份交換機可以一起使用嗎?設置 mandatory 參數會讓交換機將不可路由消息退回給生產者,而備份交換機會讓交換機將不可路由消息轉發給它,那么如果兩者同時開啟,消息究竟何去何從??
emmm,想這么多干嘛,試試不就知道了。
修改一下生產者即可:
~~~
@Slf4j
@Component
public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
private void init() {
// rabbitTemplate.setChannelTransacted(true);
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(this);
}
public void sendCustomMsg(String exchange, String msg) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);
correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
rabbitTemplate.convertAndSend(exchange, "key2", msg, correlationData);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData != null ? correlationData.getId() : "";
if (b) {
log.info("消息確認成功, id:{}", id);
} else {
log.error("消息未成功投遞, id:{}, cause:{}", id, s);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息被服務器退回。msg:{}, replyCode:{}. replyText:{}, exchange:{}, routingKey :{}",
new String(message.getBody()), replyCode, replyText, exchange, routingKey);
}
}
~~~
再來測試一下:
~~~
消息id:0a3eca1e-d937-418c-a7ce-bfb8ce25fdd4, msg:1
消息id:d8c9e010-e120-46da-a42e-1ba21026ff06, msg:1
消息確認成功, id:0a3eca1e-d937-418c-a7ce-bfb8ce25fdd4
消息確認成功, id:d8c9e010-e120-46da-a42e-1ba21026ff06
發現不可路由消息:1
收到業務消息:1
~~~
可以看到,兩條消息都可以收到確認成功回調,但是不可路由消息不會被回退給生產者,而是直接轉發給備份交換機。可見備份交換機的處理優先級更高。
## ***0***|***1*****總結**