通過設置隊列的 TTL 來打造延時隊列,本章通過 SpringBoot 演示下圖的延遲隊列架構。

創建兩個隊列 QA 和 QB,兩個隊列 TTL 分別設置為 10S 和 40S,然后在創建一個交換機 X 和死信交換機 Y,它們的類型都是 direct。再創建一個死信隊列 QD。
<br/>
步驟如下:
**1. 創建SpringBoot項目**
```xml
<dependencies>
<!--RabbitMQ 依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--RabbitMQ 測試依賴-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
</dependencies>
```
**2. `resources/application.properties`**
```properties
spring.rabbitmq.host=192.168.0.107
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
```
**3. 配置類**
```java
@Configuration
public class TtlQueueConfig {
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String DEAD_LETTER_QUEUE = "QD";
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
/**
* 隊列 A 綁定到對應的死信交換機
*/
@Bean("queueA")
public Queue queueA() {
Map<String, Object> args = new HashMap<>(16);
//聲明當前隊列綁定的死信交換機
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//聲明當前隊列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//聲明隊列的 TTL
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}
/**
* 隊列 A 綁定 X 交換機
*/
@Bean
public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange) {
// with(routingKey)
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
/**
* 隊列 B 綁定到對應的死信交換機
*/
@Bean("queueB")
public Queue queueB() {
Map<String, Object> args = new HashMap<>(3);
//聲明當前隊列綁定的死信交換機
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//聲明當前隊列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//聲明隊列的 TTL
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}
/**
* 隊列 B 綁定 X 交換機
*/
@Bean
public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
@Qualifier("xExchange") DirectExchange xExchange) {
// with(routingKey)
return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
}
@Bean("queueD")
public Queue queueD() {
return new Queue(DEAD_LETTER_QUEUE);
}
/**
* 死信隊列 QD 綁定死信交換機Y
*/
@Bean
public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange) {
// with(routingKey)
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
```
**4. 生產者**
```java
@Slf4j
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/ttl/send")
public void ttlSend() {
//convertAndSend(String exchange, String routingKey, Object object)
rabbitTemplate.convertAndSend("X", "XA", "ttl為10S隊列消息");
rabbitTemplate.convertAndSend("X", "XB", "ttl為40S隊列消息");
}
}
```
**5. 消費者**
```java
@Slf4j
@Component
public class ConsumerService {
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody(), "UTF-8");
log.info(msg);
}
}
```
**6. 測試**
啟動項目后訪問 http://localhost:8080/ttl/send 生產者將生產兩條消息。消費者消費的消息如下。
```
2022-10-27 21:58:26 : ttl為10S隊列消息
2022-10-27 21:58:56 : ttl為40S隊列消息
```
第一條消息在 10S 后變成了死信消息,然后被消費者消費掉。
第二條消息在 40S 之后變成了死信消息,然后被消費掉。
這樣一個延時隊列就打造完成了。
- 消息隊列
- 什么是MQ
- MQ的作用
- MQ的分類
- MQ的選擇
- RabbitMQ
- RabbitMQ是什么
- 四大核心概念
- 工作原理
- 環境搭建
- windows系統下的搭建
- centos7系統下的搭建
- 常用命令
- 服務相關命令
- 管理用戶命令
- 管理隊列命令
- 第一個RabbitMQ程序
- 工作隊列
- 輪詢分發消息
- 消息應答
- 持久化
- 發布確認
- 發布確認原理
- 發布確認策略
- 交換機概念
- 交換機類型
- 無名交換機
- Fanout交換機
- Direct交換機
- Topic交換機
- 死信隊列
- 死信概念
- 死信來源
- 死信實戰
- 延遲隊列
- 什么是延遲隊列
- TTL設置方式
- 隊列TTL延遲隊列
- 消息TTL延遲隊列
- 插件打造延遲隊列
- 延遲隊列總結
- 發布確認高級
- 代碼實現
- 回退消息
- 備份交換機
- 冪等性
- 冪等性概念
- 消息重復消費
- 消費端冪等性保障
- 優先級隊列
- 使用場景
- 設置優先級
- 惰性隊列
- 什么是惰性隊列
- 隊列的兩種模式
- 聲明惰性隊列
- RabbitMQ集群
- 為什么要搭建集群
- 集群搭建步驟
- 集群工作方式
- 脫離集群
- 鏡像隊列
- 高可用負載均衡