如果不能實現在消息粒度上的 TTL,并使其在設置的 TTL 時間及時死亡,就無法設計成一個通用的延遲隊列。對此我們可以使用 rabbitmq_delayed_message_exchange 插件來實現一個通用的延遲隊列。
<br/>
步驟如下:
[TOC]
# 1. 安裝延遲隊列插件
**1. 下載:https://www.rabbitmq.com/community-plugins.html**


**2. 上傳插件到 RabbitMQ 的插件目錄**
```shell
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
```
**3. 運行下面的命令讓插件生效**
```shell
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
```

<br/>
# 2. 打造延遲隊列
:-: 
架構圖
delayed.exchange 是一種新的交換機類型,該類型消息支持延遲投遞機制,消息傳遞后并不會立即投遞到目標隊列中,而是存儲在 mnesia (一個分布式數據系統)表中,當達到投遞時間時,才投遞到目標隊列中。
<br/>
步驟如下:
**1. 創建SpringBoot項目**
```xml
<dependencies>
<!--RabbitMQ 依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--RabbitMQ 測試依賴-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
</dependencies>
```
**2. `resources/application.properties`**
```properties
spring.rabbitmq.host=192.168.0.107
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
```
**3. 配置類**
```java
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
/**
* 聲明隊列
*/
@Bean("delayedQueue")
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
/**
* 聲明延遲交換機
*/
@Bean("delayedExchange")
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>(16);
//延遲交換機類型。x-delayed-type是固定寫法,不可自定義
args.put("x-delayed-type", ExchangeTypes.DIRECT);
//x-delayed-message是固定寫法,不可自定義
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
/**
* 隊列與交換機綁定
*/
@Bean
public Binding binding(@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") CustomExchange delayedExchange) {
return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
```
**4. 生產者**
```java
@Slf4j
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@GetMapping("/plugins/send")
public void pluginsSend() {
Map<Integer, String> data = new HashMap<>(16);
data.put(20000, "ttl為20000的消息");
data.put(2000, "ttl為2000的消息");
for (Map.Entry<Integer, String> entry : data.entrySet()) {
//convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor processor)
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, entry.getValue(), processor -> {
processor.getMessageProperties().setDelay(entry.getKey());
return processor;
});
log.info(entry.getValue());
}
}
}
```
**5. 消費者**
```java
@Slf4j
@Component
public class ConsumerService {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message) {
String msg = new String(message.getBody());
log.info(msg);
}
}
```
**6. 測試**
啟動項目后訪問 http://localhost:8080/plugins/send ,生產者生產了兩條消息,控制臺輸出如下。
```
2022-10-27 22:38:21.395 INFO 7604 --- [nr.ProducerController : ttl為20000的消息
2022-10-27 22:38:21.395 INFO 7604 --- [nr.ProducerController : ttl為2000的消息
2022-10-27 22:38:23.420 INFO 7604 --- [nservice.ConsumerService : ttl為2000的消息
2022-10-27 22:38:41.398 INFO 7604 --- [nservice.ConsumerService : ttl為20000的消息
```
第二個消息TTL時間短,所以先被消費掉,符合預期。
- 消息隊列
- 什么是MQ
- MQ的作用
- MQ的分類
- MQ的選擇
- RabbitMQ
- RabbitMQ是什么
- 四大核心概念
- 工作原理
- 環境搭建
- windows系統下的搭建
- centos7系統下的搭建
- 常用命令
- 服務相關命令
- 管理用戶命令
- 管理隊列命令
- 第一個RabbitMQ程序
- 工作隊列
- 輪詢分發消息
- 消息應答
- 持久化
- 發布確認
- 發布確認原理
- 發布確認策略
- 交換機概念
- 交換機類型
- 無名交換機
- Fanout交換機
- Direct交換機
- Topic交換機
- 死信隊列
- 死信概念
- 死信來源
- 死信實戰
- 延遲隊列
- 什么是延遲隊列
- TTL設置方式
- 隊列TTL延遲隊列
- 消息TTL延遲隊列
- 插件打造延遲隊列
- 延遲隊列總結
- 發布確認高級
- 代碼實現
- 回退消息
- 備份交換機
- 冪等性
- 冪等性概念
- 消息重復消費
- 消費端冪等性保障
- 優先級隊列
- 使用場景
- 設置優先級
- 惰性隊列
- 什么是惰性隊列
- 隊列的兩種模式
- 聲明惰性隊列
- RabbitMQ集群
- 為什么要搭建集群
- 集群搭建步驟
- 集群工作方式
- 脫離集群
- 鏡像隊列
- 高可用負載均衡