# 保證消息的可靠性傳輸
> 在消息傳輸的過程中,有三個環節可能會造成消息遺失,分別是**生產者寫入隊列**,**隊列處理消息**,**消費者讀取隊列**
## RabbitMQ
### 生產者弄丟了消息
* **事務**
數據發送失敗返回異常,生產者根據異常回滾事務并重試;數據發送成功時提交事務。
* **confirm機制**
生產者寫消息RabbitMQ都會分配一個唯一id,如果RabbitMQ成功接收消息會返回ack消息,如果RabbitMQ沒能處理這個消息,會回調生產者的nack接口,生產者可以在nack寫重試機制。
#### 小結
事務采用了同步發送消息的機制,而confirm采用了異步機制,擁有更大的吞吐量
### 隊列弄丟了消息
> 由于宕機等原因導致的隊列緩存數據丟失
通過開啟持久化,將緩存數據寫入到磁盤
分兩步開啟持久化
1. 設置Queue持久化,RabbitMQ只會持久化queue的元數據
2. 向queue寫入消息的時候將deliveryMode設置為2,將消息持久化。
消息被持久化到磁盤之前丟失
結合寫入消息的confirm機制,消息寫入queue并持久化磁盤成功,才通知生產者ack。如果RabbitMQ持久化過程中碰到了問題,回調生產者的nack接口。
#### 小結
通過持久化消息,再結合消息寫入的confirm機制,保證了隊列消息的可靠性
### 消費端弄丟了數據
> 消費者從RabbitMQ取到消息之后,還沒來得及進行業務處理,消費者的進程意外掛掉了,RabbitMQ會認為消息被消費了,但實際業務還沒有處理。
通過RabbitMQ提供的ack機制可以解決這個問題。關閉RabbitMQ的自動ack,消費者處理完成邏輯之后主動提交ack。
## Kafka
基本架構
由多個broker組成,每個topic被分隔成多個partition,每個partition被復制多份保存在多個broker中。每個系列的partition被抽象為一個replica,每個replica中包含一個leader,剩下的都是follower
### 生產者寫入消息失敗
生產者提交了消息,但是寫入隊列失敗。
Producer配置acks=all之后,會在寫入隊列成功之后向Producer發送ack消息。保證了[消息提交]&[消息寫入隊列]的操作原子性
### Kafka弄丟了消息
當leader所在的broker節點宕機之后,需要從follower中選舉出新的leader ,如果leader到follower的同步還沒完成,新的leader中的消息可能是不完整的。
通過以下四步保證replica成員的數據一致性
1. 設置topic,replication.factor > 1,表示每個partition設置的副本個數
2. 設置服務端Kafka,min.insync.replicas > 1,表示每個leader需要感知的follower個數
3. Producer設置acks=all,保證消息寫入所有replica之后,才返回ack
4. Producer設置retries=MAX,設置消息寫入失敗之后重試的次數
### 消費者弄丟了消息
初始的消息消費流程,消費者得到消息之后立即提交offset向服務器確認消息已消費,再執行業務代碼。可能發生消息已消費,但是進程掛掉了,來不及執行業務代碼的情況。
修改消息確認消費機制,等到業務代碼執行完成之后再向Kafka確認消息以消費。同時需要考慮業務代碼的冪等性,保證消息重復消費的問題