[TOC]
# 1. 死信隊列
> 在發送異常或者網絡波動時,消息不能被處理,導致業務異常,所以要把這種異常消息發送到死信隊列處理這些異常消息,補救
## 1.1 死信和死信隊列
一、死信
> 當一條消息在隊列中出現以下三種情況的時候,該消息就會變成一條死信。
1. 消息被否定確認,使用`channel.basicNack`或`channel.basicReject`,并且此時`requeue`屬性被設置為`false`。
2. 消息在隊列的存活時間超過設置的TTL時間(隊列ttl-所有消息都生效,單條消息ttl)。
3. 消息隊列的消息數量已經超過最大隊列長度。
二、死信隊列
> **“死信”消息會被RabbitMQ進行特殊處理,如果配置了死信隊列信息,那么該消息將會被丟進死信隊列中,如果沒有配置,則該消息將會被丟棄。**
三、配置死信隊列
1. 配置業務隊列,綁定到業務交換機上
2. 為業務隊列配置死信交換機和路由key
3. 為死信交換機配置死信隊列
4. 交換機可以為任何類型【Direct、Fanout、Topic】
## 1.2 實例
### 1.2.1 配置
**1.手動確認消息**
```
spring:
rabbitmq:
host: localhost
password: guest
username: guest
listener:
type: simple
simple:
default-requeue-rejected: false
acknowledge-mode: manual
```
**2.配置交換機和隊列**
一個正常交換機綁定兩個業務隊列A和B(廣播模式);A和B綁定一個死信交換機,由不同的key綁定,
A隊列消費者產生一個死信消息
~~~
package com.tuna.mq.rabbitmq.config.deadletter;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";
public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea";
public static final String BUSINESS_QUEUEB_NAME = "dead.letter.demo.simple.business.queueb";
public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange";
public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey";
public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queueb.routingkey";
public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea";
public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.demo.simple.deadletter.queueb";
// 聲明業務Exchange
@Bean("businessExchange")
public FanoutExchange businessExchange() {
return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
}
// 聲明死信Exchange
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 聲明業務隊列A
@Bean("businessQueueA")
public Queue businessQueueA() {
Map<String, Object> args = new HashMap<>(2);
//x-dead-letter-exchange 這里聲明當前隊列綁定的死信交換機
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//x-dead-letter-routing-key 這里聲明當前隊列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
}
// 聲明業務隊列B
@Bean("businessQueueB")
public Queue businessQueueB() {
Map<String, Object> args = new HashMap<>(2);
//x-dead-letter-exchange 這里聲明當前隊列綁定的死信交換機
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//x-dead-letter-routing-key 這里聲明當前隊列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build();
}
// 聲明死信隊列A
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA() {
return new Queue(DEAD_LETTER_QUEUEA_NAME);
}
// 聲明死信隊列B
@Bean("deadLetterQueueB")
public Queue deadLetterQueueB() {
return new Queue(DEAD_LETTER_QUEUEB_NAME);
}
// 聲明業務隊列A綁定關系
@Bean
public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
// 聲明業務隊列B綁定關系
@Bean
public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
// 聲明死信隊列A綁定關系
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
}
// 聲明死信隊列B綁定關系
@Bean
public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
}
}
~~~
### 1.2.2 消費者
1.正常消費者,Ack=false消息失敗,A隊列中的消息將會發到死信隊列
~~~
@Slf4j
@Component
public class BusinessMessageReceiver {
@RabbitListener(queues = BUSINESS_QUEUEA_NAME)
public void receiveA(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("收到業務消息A:{}", msg);
boolean ack = true;
Exception exception = null;
try {
if (msg.contains("deadletter")) {
throw new RuntimeException("dead letter exception");
}
} catch (Exception e) {
ack = false;
exception = e;
}
if (!ack) {
log.error("消息消費發生異常,error msg:{}", exception.getMessage(), exception);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} else {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
@RabbitListener(queues = BUSINESS_QUEUEB_NAME)
public void receiveB(Message message, Channel channel) throws IOException {
log.info("收到業務消息B:{}", new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
~~~
2.死信消費者
~~~
@Component
public class DeadLetterMessageReceiver {
@RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
public void receiveA(Message message, Channel channel) throws IOException {
System.out.println("收到死信消息A:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
public void receiveB(Message message, Channel channel) throws IOException {
System.out.println("收到死信消息B:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
~~~
### 1.2.3 生產者
~~~
@Component
public class BusinessMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(String msg) {
rabbitTemplate.convertSendAndReceive(BUSINESS_EXCHANGE_NAME, "", msg);
}
}
~~~
controller
~~~
@Autowired
private BusinessMessageSender sender;
@RequestMapping("sendmsg")
public void sendMsg(String msg) {
sender.sendMsg(msg);
}
~~~
1.發送正常消息
http://localhost:8080/mq/sendmsg?msg=msg
2.產生一個死信消息,如下A隊列產生一個死信消息,他對應的死信隊列會接收到這個消息
http://localhost:8080/mq/sendmsg?msg=deadletter
```
2022-06-13 15:09:43.350 INFO 3920 --- [ntContainer#1-1] c.t.m.r.s.d.BusinessMessageReceiver : 收到業務消息B:msg
2022-06-13 15:09:43.350 INFO 3920 --- [ntContainer#0-1] c.t.m.r.s.d.BusinessMessageReceiver : 收到業務消息A:msg
2022-06-13 15:12:23.739 INFO 3920 --- [nio-8080-exec-4] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-KnERgpHzUz4thORpw0BXIQ identity=566c4da9] started
2022-06-13 15:12:23.741 INFO 3920 --- [ntContainer#0-1] c.t.m.r.s.d.BusinessMessageReceiver : 收到業務消息A:deadletter
2022-06-13 15:12:23.741 INFO 3920 --- [ntContainer#1-1] c.t.m.r.s.d.BusinessMessageReceiver : 收到業務消息B:deadletter
2022-06-13 15:12:23.744 ERROR 3920 --- [ntContainer#0-1] c.t.m.r.s.d.BusinessMessageReceiver : 消息消費發生異常,error msg:dead letter exception
java.lang.RuntimeException: dead letter exception
at com.tuna.mq.rabbitmq.service.deadletter.BusinessMessageReceiver.receiveA(BusinessMessageReceiver.java:26)
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_311]
收到死信消息A:deadletter
```
# 2. 延時隊列
1. 先進先出的有序隊列
2. 用來存放需要在指定時間被處理的元素的隊列
## 2.1 場景
可用于延時性較低、數據量大的定時任務:多少時間之后,做...
1. 訂單在十分鐘之內未支付則自動取消。
2. 新創建的店鋪,如果在十天內都沒有上傳過商品,則自動發送消息提醒。
3. 賬單在一周內未支付,則自動結算。
4. 用戶注冊成功后,如果三天內沒有登陸則進行短信提醒。
5. 用戶發起退款,如果三天內沒有得到處理則通知相關運營人員。
6. 預定會議后,需要在預定的時間點前十分鐘通知各個與會人員參加會議。
7. 這些場景都有一個特點,需要在某個事件發生之后或者之前的指定時間點完成某一項任務,看起來似乎使用定時任務,一直輪詢數據,每秒查一次,取出需要被處理的數據,然后處理不就完事了嗎?
如果數據量比較少,確實可以這樣做,比如:對于“如果賬單一周內未支付則進行自動結算”這樣的需求,如果對于時間不是嚴格限制,而是寬松意義上的一周,那么每天晚上跑個定時任務檢查一下所有未支付的賬單,確實也是一個可行的方案。
但對于數據量比較大,并且時效性較強的場景,如:“訂單十分鐘內未支付則關閉“,短期內未支付的訂單數據可能會有很多,活動期間甚至會達到百萬甚至千萬級別,對這么龐大的數據量仍舊使用輪詢的方式顯然是不可取的,很可能在一秒內無法完成所
有訂單的檢查,同時會給數據庫帶來很大壓力,無法滿足業務要求而且性能低下。更重要的一點是,不!優!雅!沒錯,作為一名有追求的程序員,始終應該追求更優雅的架構和更優雅的代碼風格,寫代碼要像寫詩一樣優美。
## 2.2 實例
給消息設置了ttl,死信即為延時
1.生產者發送消息到交換機--->業務隊列
2.在ttl事件內沒有消費者讀取消息,消息進入死信(延時)隊列
3.延時隊列消費者消費延時消息
有三種實現方式:
一、隊列上設置超時,不投靈活
二、消息上設置超時,在超市時間點上不一定能發送超時消息
三、插件的方式
### 2.2.1 插件的方式
1. 聲明一個`x-delayed-message`類型的交換機
2. 發布帶`x-delay`(指定延期時間)消息頭的消息
3. 在超過制定時間后,消息發送到指定隊列,被綁定這個隊列的消費者消費
1.下載
打開官網下載:???http://www.rabbitmq.com/community-plugins.html? ,注意rabbitmq和插件的版本對應,3.x.x -3.x.x
2.將插件放入rabbitmq的plugins目錄
```
docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez 38ada6e58bba:/plugins
```
3.啟動插件
```
docker exec -it 38ada6e58bba /bin/bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
```
代碼示例
1.配置插件支持的交換機,并綁定隊列
~~~
@Configuration
public class DelayedRabbitMQConfig {
public static final String DELAYED_QUEUE_NAME = "delay.queue";
public static final String DELAYED_EXCHANGE_NAME = "delay.exchange";
public static final String DELAYED_ROUTING_KEY = "delay.routingkey";
@Bean
public Queue immediateQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
@Bean
public CustomExchange customExchange() {
Map<String, Object> args = new HashMap<>();
//指定低層交換機類型交換機類型
args.put("x-delayed-type", "direct");
//插件封裝的交換機
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
@Bean
public Binding bindingNotify(@Qualifier("immediateQueue") Queue queue,
@Qualifier("customExchange") CustomExchange customExchange) {
return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
~~~
args可以配置選項:
```
String name, 交換機名稱
String type, 交換機消息類型(x-delayed-message)
boolean durable, 是否持久化
boolean autoDelete,是否刪除
Map arguments
// 隊列中的消息什么時候會自動被刪除?
arguments.put("x-message-ttl",10000);
//設置過期時間
arguments.put("x-expires", 10000);
//x-expires用于當多長時間沒有消費者訪問該隊列的時候,該隊列會自動刪除,
//x-max-length:
arguments.put("x-max-length", 4);
用于指定隊列的長度,如果不指定,可以認為是無限長,例如指定隊列的長度是4,當超過4條消息,前面的消息將被刪除,給后面的消息騰位
```
2.生產者發送延時消息
~~~
@Slf4j
@Component
public class DelyMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendDelayMsg(String msg, Integer delayTime) {
log.info("dely sender send a message: {}",msg);
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a -> {
a.getMessageProperties().setDelay(delayTime);
return a;
});
}
}
~~~
3. 消費者
~~~
@Component
public class DelyMessageConsumer {
private static final Logger logger = LoggerFactory.getLogger(DelyMessageConsumer.class);
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveHeadersQueue(String message) {
logger.info("dely consumer received message: {}", new String(message));
}
}
~~~
4. controller
~~~
@RequestMapping("dely/{message}/{timeOut}")
public String dely(@PathVariable("message") String message,@PathVariable("timeOut") Integer timeOut) {
delyMessageSender.sendDelayMsg(message, timeOut);
return "success";
}
~~~
http://localhost:8080/mq/dely/hello/30000
輸出 時隔30秒
```
2022-07-04 09:40:34.033 INFO 6316 --- [io-8080-exec-10] c.t.m.r.service.dely.DelyMessageSender : dely sender send a message: hello
2022-07-04 09:41:04.036 INFO 6316 --- [ntContainer#2-1] c.t.m.r.s.dely.DelyMessageConsumer : dely consumer received message: hello
```