通過設置每條消息的 TTL 來打造延遲隊列,本章通過 SpringBoot 演示下圖的延遲隊列架構。

<br/>
步驟如下:
**1. 創建SpringBoot項目**
```xml
<dependencies>
<!--RabbitMQ 依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--RabbitMQ 測試依賴-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
</dependencies>
```
**3. `resources/application.properties`**
```properties
spring.rabbitmq.host=192.168.0.107
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
```
**4. 配置類**
```java
@Configuration
public class MsgTtlQueueConfig {
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String QUEUE_C = "QC";
/**
* 隊列C綁定死信交換機Y
*/
@Bean("queueC")
public Queue queueB() {
Map<String, Object> args = new HashMap<>(16);
//聲明當前隊列綁定的死信交換機
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//聲明當前隊列的死信路由key
args.put("x-dead-letter-routing-key", "YD");
return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
}
/**
* 隊列C綁定X交換機
*/
@Bean
public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange) {
//with(routingKey)
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
}
```
**5. 生產者**
```java
@Slf4j
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/msg/ttl/send")
public void msgTtlSend(@RequestParam Map<String, String> params) {
String message = "time: " + params.get("time") + ", message: " + params.get("message");
//convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor processor)
rabbitTemplate.convertAndSend("X", "XC", message, processor -> {
//設置消息的TTL
processor.getMessageProperties().setExpiration(params.get("time"));
return processor;
});
}
}
```
**6. 消費者**
```java
@Slf4j
@Component
public class ConsumerService {
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody(), "UTF-8");
log.info(msg);
}
}
```
**7. 測試**
啟動項目后訪問 http://localhost:8080/msg/ttl/send?time=20000&message=C1 ,生產者將生產一條消息,該消息將在 20000ms 后被消費者接收到。
```
2022-10-27 22:18:09.908 INFO 3184 --- [ntConsumerService : time: 20000, message: C1
```
<br/>
如果在消息屬性上設置 TTL 的方式,消息可能并不會按時死亡,因為 RabbitMQ <mark>只會檢查第一個消息是否過期</mark>,如果過期則丟到死信隊列。<mark>如果第一個消息的延時時長很長,而第二個消息的延時時長很短,第二個消息并不會優先得到執行</mark>。如果想打造不存在這個限制的延遲隊列就需要 RabbitMQ 的[延遲插件](http://www.hmoore.net/king_om/x_1_mq/2483280)來支持了。
<br/>
如果將隊列 TTL 延遲隊列和消息 TTL 延遲隊列用在一個項目中,便可以優化為如下的架構。

- 消息隊列
- 什么是MQ
- MQ的作用
- MQ的分類
- MQ的選擇
- RabbitMQ
- RabbitMQ是什么
- 四大核心概念
- 工作原理
- 環境搭建
- windows系統下的搭建
- centos7系統下的搭建
- 常用命令
- 服務相關命令
- 管理用戶命令
- 管理隊列命令
- 第一個RabbitMQ程序
- 工作隊列
- 輪詢分發消息
- 消息應答
- 持久化
- 發布確認
- 發布確認原理
- 發布確認策略
- 交換機概念
- 交換機類型
- 無名交換機
- Fanout交換機
- Direct交換機
- Topic交換機
- 死信隊列
- 死信概念
- 死信來源
- 死信實戰
- 延遲隊列
- 什么是延遲隊列
- TTL設置方式
- 隊列TTL延遲隊列
- 消息TTL延遲隊列
- 插件打造延遲隊列
- 延遲隊列總結
- 發布確認高級
- 代碼實現
- 回退消息
- 備份交換機
- 冪等性
- 冪等性概念
- 消息重復消費
- 消費端冪等性保障
- 優先級隊列
- 使用場景
- 設置優先級
- 惰性隊列
- 什么是惰性隊列
- 隊列的兩種模式
- 聲明惰性隊列
- RabbitMQ集群
- 為什么要搭建集群
- 集群搭建步驟
- 集群工作方式
- 脫離集群
- 鏡像隊列
- 高可用負載均衡