:-: 
圖1:架構圖

圖2:確認機制
下面演示圖1的消費過程,圖2是發布確認的確認機制。
<br/>
步驟如下:
**1. 先創建一個SpringBoot項目**
```xml
<dependencies>
<!--RabbitMQ 依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--RabbitMQ 測試依賴-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
```
**2. `resources/application.properties`**
```properties
spring.rabbitmq.host=192.168.0.107
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.publisher-confirm-type=correlated
```
**`spring.rabbitmq.publisher-confirm-type`** 屬性的取值如下:
* `NONE`:禁用發布確認模式,是默認值。
* `CORRELATED`:發布消息成功到交換器后會觸發回調方法。
* `SIMPLE`:經測試有兩種效果,其一和 CORRELATED 值一樣會觸發回調方法;其二在發布消息成功后使用 rabbitTemplate 調用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 節點返回發送結果,根據返回結果來判定下一步的邏輯,要注意的點是 waitForConfirmsOrDie 方法如果返回 false 則會關閉 channel,則接下來無法發送消息到 broker。
**3. 配置類**
```java
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
public static final String KEY_1 = "key1";
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
/**
* 交換機與隊列綁定
*/
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(KEY_1);
}
}
```
**4. 生產者**
```java
@Slf4j
@RestController
public class ProducerController {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private CustomCallBack customCallBack;
/**
* @PostConstruct 注解標記的方法可以在項目啟動時時執行一次,這里在項目啟動時
* 注入CustomCallBack對象到RabbitTemplate中去
*/
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(customCallBack);
}
@GetMapping("/confirm/send")
public void confirmSend() {
//指定消息id為1
CorrelationData corId = new CorrelationData("1");
//convertAndSend(String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData)
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, "key1", "message#key1", corId);
log.info("生產消息: {}", "message#key1");
CorrelationData corId2 = new CorrelationData("2");
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, "key2", "message#key2", corId2);
log.info("生產消息: {}", "message#key2");
}
}
```
**5. 回調對象**
```java
@Slf4j
@Component
public class CustomCallBack implements RabbitTemplate.ConfirmCallback {
/**
* 不論交換機是否收到消息該方法都會被調用
*
* @param corData 消息相關數據
* @param ack 交換機是否收到消息。true收到、false沒收到
* @param cause 收到或沒收到的理由
*/
@Override
public void confirm(CorrelationData corData, boolean ack, String cause) {
if (ack) {
log.info("交換機已收到id={}的消息", corData.getId());
} else {
log.info("交換機未收到id={}的消息,原因是:{}", corData.getId(), cause);
}
}
}
```
**6. 消費者**
```java
@Slf4j
@Component
public class ConfirmConsumer {
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@RabbitListener(queues = CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message) {
String msg = new String(message.getBody());
log.info("收到消息: {}", msg);
}
}
```
**7. 測試**
啟動項目后訪問 http://localhost:8080/confirm/send 生產了兩條消息,控制臺輸出如下。
```
...] c.l.r.controller.ProducerController : 生產消息: message#key1
...] c.l.r.controller.ProducerController : 生產消息: message#key2
...] c.l.rabbitmq03.service.ConfirmConsumer : 收到消息: message#key1
...] c.l.rabbitmq03.callback.CustomCallBack : 交換機已收到id=2的消息
...] c.l.rabbitmq03.callback.CustomCallBack : 交換機已收到id=1的消息
```
可以看到,生產者發送了兩條消息,第一條消息的 RoutingKey 為 `key1`,第二條消息的 RoutingKey 為`key2`,兩條消息都成功被交換機接收,也收到了交換機的確認回調,但消費者只收到了一條消息,因為第二條消息的 RoutingKey 與隊列的 BindingKey 不一致,也沒有其它隊列能接收這個消息,所以第二條消息被直接丟棄了。
- 消息隊列
- 什么是MQ
- MQ的作用
- MQ的分類
- MQ的選擇
- RabbitMQ
- RabbitMQ是什么
- 四大核心概念
- 工作原理
- 環境搭建
- windows系統下的搭建
- centos7系統下的搭建
- 常用命令
- 服務相關命令
- 管理用戶命令
- 管理隊列命令
- 第一個RabbitMQ程序
- 工作隊列
- 輪詢分發消息
- 消息應答
- 持久化
- 發布確認
- 發布確認原理
- 發布確認策略
- 交換機概念
- 交換機類型
- 無名交換機
- Fanout交換機
- Direct交換機
- Topic交換機
- 死信隊列
- 死信概念
- 死信來源
- 死信實戰
- 延遲隊列
- 什么是延遲隊列
- TTL設置方式
- 隊列TTL延遲隊列
- 消息TTL延遲隊列
- 插件打造延遲隊列
- 延遲隊列總結
- 發布確認高級
- 代碼實現
- 回退消息
- 備份交換機
- 冪等性
- 冪等性概念
- 消息重復消費
- 消費端冪等性保障
- 優先級隊列
- 使用場景
- 設置優先級
- 惰性隊列
- 什么是惰性隊列
- 隊列的兩種模式
- 聲明惰性隊列
- RabbitMQ集群
- 為什么要搭建集群
- 集群搭建步驟
- 集群工作方式
- 脫離集群
- 鏡像隊列
- 高可用負載均衡