# **1. 分析數據丟失的原因**

可以看出,一條消息整個過程要經歷兩次的網絡傳輸:**從生產者發送到RabbitMQ服務器,從RabbitMQ服務器發送到消費者**。
**在消費者未消費前存儲在隊列(Queue)中**。
所以可以知道,有三個場景下是會發生消息丟失的:
* 存儲在隊列中,如果隊列沒有對消息持久化,RabbitMQ服務器宕機重啟會丟失數據。
* 生產者發送消息到RabbitMQ服務器過程中,RabbitMQ服務器如果宕機停止服務,消息會丟失。
* 消費者從RabbitMQ服務器獲取隊列中存儲的數據消費,但是消費者程序出錯或者宕機而沒有正確消費,導致數據丟失。
針對以上三種場景,RabbitMQ提供了三種解決的方式,分別是消息持久化,confirm機制,ACK事務機制。

# 2. 防丟失策略
## 2.1 持久化
為了防止rabbitmq故障,導致數據丟失,詳見消息持久化
## 2.2 生產者
**在生產者發送到RabbitMQ Server時有可能因為網絡問題導致投遞失敗,從而丟失數據**。我們可以使用confirm模式防止數據丟失。工作流程是怎么樣的呢,看以下圖解:

從上圖中可以看到是通過兩個回調函數**confirm()、returnedMessage()** 進行通知。
一條消息從生產者發送到RabbitMQ,首先會發送到Exchange,對應回調函數**confirm()**。第二步從Exchange路由分配到Queue中,對應回調函數則是**returnedMessage()**。
### 2.2.1 配置生產者消費回調
~~~text
spring:
rabbitmq:
publisher-confirms: true
# publisher-returns: true
template:
mandatory: true
# publisher-confirms:設置為true時。當消息投遞到Exchange后,會回調confirm()方法進行通知生產者
# publisher-returns:設置為true時。當消息匹配到Queue并且失敗時,會通過回調returnedMessage()方法返回消息
# spring.rabbitmq.template.mandatory: 設置為true時。指定消息在沒有被隊列接收時會通過回調returnedMessage()方法退回。
~~~
### 2.2.2 回調函數
~~~text
@Component
public class RabbitmqConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
private Logger logger = LoggerFactory.getLogger(RabbitmqConfirmCallback.class);
/**
* 監聽消息是否到達Exchange
*
* @param correlationData 包含消息的唯一標識的對象
* @param ack true 標識 ack,false 標識 nack
* @param cause nack 投遞失敗的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
logger.info("消息投遞成功~消息Id:{}", correlationData.getId());
} else {
logger.error("消息投遞失敗,Id:{},錯誤提示:{}", correlationData.getId(), cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.info("消息沒有路由到隊列,獲得返回的消息");
Map map = byteToObject(message.getBody(), Map.class);
logger.info("message body: {}", map == null ? "" : map.toString());
logger.info("replyCode: {}", replyCode);
logger.info("replyText: {}", replyText);
logger.info("exchange: {}", exchange);
logger.info("routingKey: {}", exchange);
logger.info("------------> end <------------");
}
@SuppressWarnings("unchecked")
private <T> T byteToObject(byte[] bytes, Class<T> clazz) {
T t;
try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bis)) {
t = (T) ois.readObject();
} catch (Exception e) {
e.printStackTrace();
return null;
}
return t;
}
}
~~~
### 2.2.3 生產者實現
~~~text
@Service
public class RabbitMQServiceImpl implements RabbitMQService {
@Resource
private RabbitmqConfirmCallback rabbitmqConfirmCallback;
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
//指定 ConfirmCallback
rabbitTemplate.setConfirmCallback(rabbitmqConfirmCallback);
//指定 ReturnCallback
rabbitTemplate.setReturnCallback(rabbitmqConfirmCallback);
}
@Override
public String sendMsg(String msg) throws Exception {
Map<String, Object> message = getMessage(msg);
try {
CorrelationData correlationData = (CorrelationData) message.remove("correlationData");
rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, message, correlationData);
return "ok";
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
private Map<String, Object> getMessage(String msg) {
String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
CorrelationData correlationData = new CorrelationData(msgId);
String sendTime = sdf.format(new Date());
Map<String, Object> map = new HashMap<>();
map.put("msgId", msgId);
map.put("sendTime", sendTime);
map.put("msg", msg);
map.put("correlationData", correlationData);
return map;
}
}
~~~
大功告成!接下來我們進行測試,發送一條消息,我們可以控制臺:

**假設發送一條信息沒有路由匹配到隊列,可以看到如下信息:**

## 2.3 消費端
消費者需要回復ack