? ? ?這個官網的[第二個例子](http://www.rabbitmq.com/tutorials/tutorial-two-java.html)中的消息應答和消息持久化部分。我把它摘出來作為單獨的一塊兒來分享。
# Message acknowledgment(消息應答)
? ? ?執行一個任務可能需要花費幾秒鐘,你可能會擔心如果一個消費者在執行任務過程中掛掉了。基于現在的代碼,一旦RabbitMQ將消息分發給了消費者,就會從內存中刪除。在這種情況下,如果殺死正在執行任務的消費者,會丟失正在處理的消息,也會丟失已經分發給這個消費者但尚未處理的消息。
? ? ?但是,我們不想丟失任何任務,如果有一個消費者掛掉了,那么我們應該將分發給它的任務交付給另一個消費者去處理。
? ? ?為了確保消息不會丟失,RabbitMQ支持消息應答。消費者發送一個消息應答,告訴RabbitMQ這個消息已經接收并且處理完畢了。RabbitMQ可以刪除它了。
? ? ?如果一個消費者掛掉卻沒有發送應答,RabbitMQ會理解為這個消息沒有處理完全,然后交給另一個消費者去重新處理。這樣,你就可以確認即使消費者偶爾掛掉也不會不丟失任何消息了。
? ? ?沒有任何消息超時限制;只有當消費者掛掉時,RabbitMQ才會重新投遞。即使處理一條消息會花費很長的時間。
? ? ?消息應答是**默認打開**的。我們明確地把它們關掉了(autoAck=true)。現在將應答打開,一旦我們完成任務,消費者會自動發送消息應答。
~~~
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
~~~
? ? ?修改一下Worker.java
~~~
channel.basicQos(1);//保證一次只分發一個
// 創建隊列消費者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
System.out.println(" [x] Proccessing... at " +new Date().toLocaleString());
try {
for (char ch: message.toCharArray()) {
if (ch == '.') {
Thread.sleep(1000);
}
}
} catch (InterruptedException e) {
} finally {
System.out.println(" [x] Done! at " +new Date().toLocaleString());
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
~~~
? ? ?我們還是運行1個生產者,2個消費者,在消息處理過程中,人為讓一個消費者掛掉,然后會看到剩下的任務都會被另外的消費者執行。
? ? ?運行結果如下:

? ? ?如果你關閉了自動消息應答,手動也未設置應答,這是一個很簡單錯誤,但是后果卻是極其嚴重的。消息在分發出去以后,得不到回應,所以不會在內存中刪除,結果RabbitMQ會越來越占用內存,最終的結果,你懂得。。。
# Message durability(消息持久化)
? ? ?我們已經了解了如何確保即使消費者死亡,任務也不會丟失。但是如果RabbitMQ服務器停止,我們的任務仍將失去!
? ? ?當RabbitMQ退出或者崩潰,將會丟失隊列和消息。除非你不要隊列和消息。兩件事兒必須保證消息不被丟失:我們必須把“隊列”和“消息”設為持久化。? ? ??
~~~
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
~~~
? ? ?盡管這行代碼是正確的,但他不會在我們當前的設置中起作用。因為我們已經定義了一個名叫hello的未持久化的隊列。RabbitMQ不允許使用不同的參數設定重新定義已經存在的隊列,并且會向嘗試如此做的程序返回一個錯誤。一個快速的解決方案——就是聲明一個不同名字的隊列,比如task_queue。
? ? ?(當然,我們也可以登錄到RabbitMQ的服務管理頁面,RabbitMQ默認的端口是5672,管理頁面默認端口是15672,頁面地址為:http://localhost:15672,使用是用戶名和密碼登錄。RabbitMQ的默認密碼和用戶名都是guest。點開“queue”那欄,可以看到隊列列表,點擊“hello”杜列,會展開隊列的詳細信息。把頁面拉到最后,有一項“Delete / purge”,點開,點擊“Delete”按鈕,就可以把隊列刪除掉了。 然后再運行代碼的時候,就會創建一個支持持久化的hello隊列。)
? ? ?上述的代碼需要在生產者和消費者都要作出同樣的修改。
? ? ?在這一點上我們確信task_queue的隊列不會丟失,即使RabbitMQ服務重啟。現在我們需要將消息標記為持久性的——通過設置 MessageProperties(實現BasicProperties)為PERSISTENT_TEXT_PLAIN。
? ? ?現在你可以啟動RabbitMQ服務器,執行一次生產者NewTask的程序,然后關閉服務器,再重新啟動服務器,運行消費者Work做下實驗。可以發現消費者依舊可以讀出消息來。說明在RabbitMQ服務器關閉后,消息和隊列信息都已經做了持久化。再次啟動后,會重新加載到服務器中,消費者運行后,就可以正常的從隊列中獲取消息了。