[TOC]
# 1. 什么是持久化
使用手動應答可以處理任務不丟失的情況,但是如何保障當 RabbitMQ 服務停掉以后生產者發送過來的消息不丟失。
<br/>
默認情況下 RabbitMQ 退出或由于某種原因崩潰時,它會忽視隊列和消息,除非告知它不要這樣做。確保消息不會丟失需要做兩件事:我們需要<mark>將隊列和消息都標記為持久化</mark>。
<br/>
# 2. 隊列持久化
之前我們創建的隊列都是非持久化的,rabbitmq 如果重啟該隊列就會被刪除掉,如果要隊列實現持久化,需要在聲明隊列的時候將`var2=true`。
```java
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
/**
* 聲明一個隊列。
* queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5)
* var1: 隊列名稱
* var2: 隊列里面的消息是否持久化。false(默認)不持久化、true持久化
* var3: 該隊列是否只供一個消費者進行消費,即消息是否進行共享。false共享、true共享
* var4: 是否自動刪除。即當最后一個消費者斷開連接后該隊列是否自動刪除。true自動刪除、false不刪除
* var5:其他參數
*/
channel.queueDeclare("queue.hello", true, false, false, null);
...
```
<br/>
>[info]注意:如果之前聲明的隊列不是持久化的,需要把原先隊列先刪除,或者重新創建一個持久化的隊列,不然就會報出下面的錯誤。

```shell
-- 刪除queueName隊列的命令
rabbitmqctl delete_queue queueName
```
<br/>
以下為控制臺中持久化與非持久化隊列的 UI 顯示區、這個時候即使重啟 rabbitmq 隊列也依然存在。

<br/>
# 3. 消息持久化
要想讓消息持久化需要在生產者添加屬性`MessageProperties.PERSISTENT_TEXT_PLAIN` 。
```java
/**
* 發送一個消息
* 1.發送到那個交換機
* 2.路由的 key 是哪個
* 3.其他的參數信息
* 4.發送消息的消息體
*/
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
```
將消息標記為持久化并不能完全保證不會丟失消息。盡管它告訴 RabbitMQ 將消息保存到磁盤,但是這里依然存在當消息剛準備存儲在磁盤的時候,但是還沒有存儲完,消息還在緩存的一個間隔點。此時并沒有真正寫入磁盤。
<br/>
持久性保證并不強,但是對于我們的簡單任務隊列而言,這已經綽綽有余了。如果需要更強有力的持久化策略,參考后邊的[發布確認](http://www.hmoore.net/king_om/x_1_mq/2483259)章節。
<br/>
# 4. 不公平分發
在最開始的時候我們學習到 RabbitMQ 分發消息采用的輪詢分發,但是在某種場景下這種策略并不是很好,比方說有兩個消費者在處理任務,其中有個消費者 1 處理任務的速度非常快,而另外一個消費者 2 處理速度卻很慢,這個時候我們還是采用輪訓分發的話就會讓處理速度快的這個消費者很大一部分時間處于空閑狀態,而處理慢的那個消費者一直在干活,這種分配方式在這種情況下其實就不太好,但是 RabbitMQ 并不知道這種情況它依然很公平的進行分發。
<br/>
為了避免這種情況,我們可以在消費者設置參數 `channel.basicQos(1)`。
```java
Channel channel = RabbitMQUtils.getChannel();
//消息消費的時候如何處理消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
channel.basicQos(1);
...
};
```
<br/>
消費者設置 `channel.basicQos(1)`后,可以看到 channel 的 Prefetch 的數量增加。


意思就是如果這個任務我還沒有處理完或者我還沒有應答你,你先別分配其它任務給我,我目前只能處理一個任務,然后 rabbitmq 就會把該任務分配給沒有那么忙的那個空閑消費者,當然如果所有的消費者都沒有完成手上任務,隊列還在不停的添加新任務,隊列有可能就會遇到隊列被撐滿的情況,這個時候就只能添加新的 worker 或者改變其他存儲任務的策略。
<br/>
# 5. 預取值
RabbitMQ 消息的發送就是異步發送的,所以在任何時候,channel 上肯定不止只有一個消息。另外來自消費者的手動確認本質上也是異步的。因此這里就存在一個未確認的消息緩沖區,因此希望開發人員能<mark>限制此緩沖區的大小,以避免緩沖區里面無限制的未確認消息問題<mark>。
<br/>
這個時候就可以在消費者通過使用 `channel.basicQos` 方法設置<mark>預取計數</mark>值來完成。<mark>該值定義通道上允許的未確認消息的最大數量</mark>。一旦數量達到配置的數量,RabbitMQ 將停止在通道上傳遞更多消息,除非至少有一個未處理的消息被確認,例如,假設在通道上有未確認的消息 5、6、7,8,并且通道的預取計數設置為 4,此時 RabbitMQ 將不會在該通道上再傳遞任何消息,除非至少有一個未應答的消息被 ack。比方說 tag=6 這個消息剛剛被確認 ACK,RabbitMQ 將會感知到這個情況并再發送一條消息。

<br/>
消息應答和 QoS 預取值對用戶吞吐量有重大影響。通常,增加預取將提高向消費者傳遞消息的速度。雖然<mark>自動應答傳輸消息速率是最佳的,但是,在這種情況下已傳遞但尚未處理的消息的數量也會增加,從而增加了消費者的 RAM 消耗</mark>(隨機存取存儲器)應該小心使用具有無限預處理的自動確認模式或手動確認模式,消費者消費了大量的消息如果沒有確認的話,會導致消費者連接節點的內存消耗變大,所以找到合適的預取值是一個反復試驗的過程,不同的負載該值取值也不同, 100 到 300 范圍內的值通常可提供最佳的吞吐量,并且不會給消費者帶來太大的風險。
<br/>
預取值為 1 是最保守的。當然這將使吞吐量變得很低,特別是消費者連接延遲很嚴重的情況下,特別是在消費者連接等待時間較長的環境中。對于大多數應用來說,稍微高一點的值將是最佳的。
- 消息隊列
- 什么是MQ
- MQ的作用
- MQ的分類
- MQ的選擇
- RabbitMQ
- RabbitMQ是什么
- 四大核心概念
- 工作原理
- 環境搭建
- windows系統下的搭建
- centos7系統下的搭建
- 常用命令
- 服務相關命令
- 管理用戶命令
- 管理隊列命令
- 第一個RabbitMQ程序
- 工作隊列
- 輪詢分發消息
- 消息應答
- 持久化
- 發布確認
- 發布確認原理
- 發布確認策略
- 交換機概念
- 交換機類型
- 無名交換機
- Fanout交換機
- Direct交換機
- Topic交換機
- 死信隊列
- 死信概念
- 死信來源
- 死信實戰
- 延遲隊列
- 什么是延遲隊列
- TTL設置方式
- 隊列TTL延遲隊列
- 消息TTL延遲隊列
- 插件打造延遲隊列
- 延遲隊列總結
- 發布確認高級
- 代碼實現
- 回退消息
- 備份交換機
- 冪等性
- 冪等性概念
- 消息重復消費
- 消費端冪等性保障
- 優先級隊列
- 使用場景
- 設置優先級
- 惰性隊列
- 什么是惰性隊列
- 隊列的兩種模式
- 聲明惰性隊列
- RabbitMQ集群
- 為什么要搭建集群
- 集群搭建步驟
- 集群工作方式
- 脫離集群
- 鏡像隊列
- 高可用負載均衡