RabbitMQ入門教程 For Java【2】 - Work Queues
提示:
我的開發環境:
操作系統: Windows7 64bit
開發環境: JDK 1.7 - 1.7.0_55
開發工具: Eclipse Kepler SR2
RabbitMQ版本: 3.6.0
Elang版本: erl7.2.1
關于Windows7下安裝RabbitMQ的教程請先在網上找一下,有空我再補安裝教程。
源碼地址
[https://github.com/chwshuang/rabbitmq.git](https://github.com/chwshuang/rabbitmq.git)
# 工作隊列

> 在使用此教程的時候,請記住,一定要將所有內容看一遍,特別是代碼片段說明部分,這是非常重要的基礎,如果你跳過這里直接將最后的源碼拿去運行查看,效果會大打折扣。基礎不牢固的情況下,后面學習就更難了。學習一定要靜下心、琢磨透。
在第一個教程中,我們編寫了一個程序來發送和接收來自一個指定隊列的消息。在這一篇,我們將創建一個工作隊列,將信息發送到多個消費者。這中分配方式主要場景是消費者需要根據消息中的內容進行業務邏輯處理,這種消息可以看成是一個任務指令,處理起來比較耗時,通過多個消費者來處理這些消息,來提高數據的吞吐能力。
工作隊列(即任務隊列)的主要思想是不用一直等待資源密集型的任務處理完成,這就像一個生產線,將半成品放到生產線中,然后在生產線后面安排多個工人同時對半成品進行處理,這樣比一個生產線對應一個工人的吞吐量大幾個數量級。
準備
在第一篇教程中,我們通過Hello World的例子,從生產者發送一條消息到RabbitMQ,然后消費者接收到這條消息并打印出來。這次我們模擬一個工廠流水線的場景,由工廠任務安排者(生產者P)向流水線(RabbitMQ的隊列hello)放入半成品,然后由多個工人(消費者C1和C2)從流水線獲取半成品進行處理。
我們先來看看工廠任務安排者的代碼,我們一共發送5條消息,然后給每個消息編號,看看消費者分別收到了那些消息:
~~~
for(int i = 0 ; i < 5; i++){
String message = "Hello World! " + i;
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
~~~
再來看看流水線上的工人處理半成品的函數,我們使用線程休眠模擬工作處理一條消息花費1秒鐘:
~~~
private static void doWork(String task) {
try {
Thread.sleep(1000); // 暫停1秒鐘
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
~~~
### 源碼
目錄結構:

工廠任務安排者(生產者P)NewTask.java:
~~~
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
/**
* @author hushuang
*
*/
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws java.io.IOException, Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// 分發消息
for(int i = 0 ; i < 5; i++){
String message = "Hello World! " + i;
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
channel.close();
connection.close();
}
}
~~~
工人(消費者C1和C2)Worker1.java
~~~
import java.io.IOException;
public class Worker1 {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println("Worker1 [*] Waiting for messages. To exit press CTRL+C");
// 每次從隊列中獲取數量
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Worker1 [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println("Worker1 [x] Done");
// 消息處理完成確認
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 消息消費完成確認
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
}
private static void doWork(String task) {
try {
Thread.sleep(1000); // 暫停1秒鐘
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
~~~
工人(消費者C1和C2)Worker2.java
~~~
import com.rabbitmq.client.*;
import java.io.IOException;
public class Worker2 {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println("Worker2 [*] Waiting for messages. To exit press CTRL+C");
// 每次從隊列中獲取數量
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Worker2 [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println("Worker2 [x] Done");
// 消息處理完成確認
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 消息消費完成確認
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
}
/**
* 任務處理
*
* @param task
* void
*/
private static void doWork(String task) {
try {
Thread.sleep(1000); // 暫停1秒鐘
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
~~~
### 消息輪詢分發
#### **啟動RabbitMQ 服務器**
在RabbitMQ Server\rabbitmq_server-3.6.0\sbin目錄中,我們雙擊rabbitmq-server.bat,啟動RabbitMQ ,Window下會彈出一個窗口,看到下面Starting broker…的信息就說明啟動成功了:(關于RabbitMQ 在Windows7下的安裝參考這里)

#### **啟動工人(消費者)**
然后在eclipse中,啟動Worker1.java 和Worker2.java,可以看到Worker的啟動日志:

~~~
Worker1 [*] Waiting for messages. To exit press CTRL+C
Worker2 [*] Waiting for messages. To exit press CTRL+C
~~~
#### **啟動工廠任務安排者(生產者)**
生產者啟動后打印的日志:
~~~
[x] Sent 'Hello World! 0'
[x] Sent 'Hello World! 1'
[x] Sent 'Hello World! 2'
[x] Sent 'Hello World! 3'
[x] Sent 'Hello World! 4'
~~~
Worker1日志輸入結果:
~~~
Worker1 [x] Received 'Hello World! 1'
Worker1 [x] Done
Worker1 [x] Received 'Hello World! 3'
Worker1 [x] Done
Worker1 [x] Received 'Hello World! 4'
Worker1 [x] Done
~~~
Worker2日志輸入結果:
~~~
Worker2 [x] Received 'Hello World! 0'
Worker2 [x] Done
Worker2 [x] Received 'Hello World! 2'
Worker2 [x] Done
~~~
### 消息確認
如果處理一條消息需要幾秒鐘的時間,你可能會想,如果在處理消息的過程中,消費者服務器、網絡、網卡出現故障掛了,那可能這條正在處理的消息或者任務就沒有完成,就會失去這個消息和任務。
為了確保消息或者任務不會丟失,RabbitMQ支持消息確認–ACK。ACK機制是消費者端從RabbitMQ收到消息并處理完成后,反饋給RabbitMQ,RabbitMQ收到反饋后才將此消息從隊列中刪除。如果一個消費者在處理消息時掛掉(網絡不穩定、服務器異常、網站故障等原因導致頻道、連接關閉或者TCP連接丟失等),那么他就不會有ACK反饋,RabbitMQ會認為這個消息沒有正常消費,會將此消息重新放入隊列中。如果有其他消費者同時在線,RabbitMQ會立即將這個消息推送給這個在線的消費者。這種機制保證了在消費者服務器故障的時候,能不丟失任何消息和任務。
如果RabbitMQ向消費者發送消息時,消費者服務器掛了,消息也不會有超時;即使一個消息需要非常長的時間處理,也不會導致消息超時。這樣消息永遠不會從RabbitMQ服務器中刪除。只有當消費者正確的發送ACK確認反饋,RabbitMQ確認收到后,消息才會從RabbitMQ服務器的數據中刪除。
消息的ACK確認機制默認是打開的。在上面的代碼中,我們顯示返回autoAck=true 這個標簽。
看看下面的代碼,即使你在發送消息過程中,停掉一個消費者,消費者沒有通過ACK反饋確認的消息,很快會被退回。
~~~
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
~~~
> 忘記確認
忘記通過basicAck返回確認信息是常見的錯誤。這個錯誤非常嚴重,將導致消費者客戶端退出或者關閉后,消息會被退回RabbitMQ服務器,這會使RabbitMQ服務器內存爆滿,而且RabbitMQ也不會主動刪除這些被退回的消息。
如果要監控這種錯誤,可以使用rabbitmqctl messages_unacknowledged命令打印出出相關的信息。
~~~
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello 0 0
...done.
~~~
### 消息持久化
通過上一節我們已經知道如何確保消費者掛掉的情況下,任務不會消失。但是如果RabbitMQ服務器掛了呢?
如果你不告訴RabbitMQ,當RabbitMQ服務器掛了,她可能就丟失所有隊列中的消息和任務。如果你想讓RabbitMQ記住她當前的狀態和內容,就需要通過2件事來確保消息和任務不會丟失。
第一件事,在隊列聲明時,告訴RabbitMQ,這個隊列需要持久化:
~~~
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
~~~
上面的這個方法是正確的,當在我們的例子中也無法持久化!因為已經定義的隊列,再次定義是無效的,這就是冪次原理。RabbitMQ不允許重新定義一個已有的隊列信息,也就是說不允許修改已經存在的隊列的參數。如果你非要這樣做,只會返回異常。
咋整?
一個快速有效的方法就是重新聲明另一個名稱的隊列,不過這需要修改生產者和消費者的代碼,所以,在開發時,最好是將隊列名稱放到配置文件中。
這時,即使RabbitMQ服務器重啟,新隊列中的消息也不會丟失。
下面我們來看看新消息發送的代碼:
~~~
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
~~~
> 關于消息持久化的說明
標記為持久化后的消息也不能完全保證不會丟失。雖然已經告訴RabbitMQ消息要保存到磁盤上,但是理論上,RabbitMQ已經接收到生產者的消息,但是還沒有來得及保存到磁盤上,服務器就掛了(比如機房斷電),那么重啟后,RabbitMQ中的這條未及時保存的消息就會丟失。因為RabbitMQ不做實時立即的磁盤同步(fsync)。這種情況下,對于持久化要求不是特別高的簡單任務隊列來說,還是可以滿足的。如果需要更強大的保證,那么你可以考慮使用生產者確認反饋機制。
### 負載均衡
默認情況下,RabbitMQ將隊列消息隨機分配給每個消費者,這時可能出現消息調度不均衡的問題。例如有兩臺消費者服務器,一個服務器可能非常繁忙,消息不斷,另外一個卻很悠閑,沒有什么負載。RabbitMQ不會主動介入這些情況,還是會隨機調度消息到每臺服務器。
這是因為RabbitMQ此時只負責調度消息,不會根據ACK的反饋機制來分析那臺服務器返回反饋慢,是不是處理不過來啊?
就像下面這個圖:

為了解決這個問題,我們可以使用【prefetchcount = 1】這個設置。這個設置告訴RabbitMQ,不要一次將多個消息發送給一個消費者。這樣做的好處是只有當消費者處理完成當前消息并反饋后,才會收到另外一條消息或任務。這樣就避免了負載不均衡的事情了。
~~~
int prefetchCount = 1;
channel.basicQos(prefetchCount);
~~~
> 關于隊列大小的說明
你必選注意:如果所有的消費者負載都很高,你的隊列很可能會被塞滿。這時你需要增加更多的消費者或者其他方案。
想了解更多關于 Channel 方法和 MessageProperties 的信息,請瀏覽以下相關的文檔:
[javadocs 在線文檔.](http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/)
現在,咱們可以進入第三章的教程了。
本教程所有文章:
[RabbitMQ入門教程 For Java【1】 - Hello World - 你好世界!](http://blog.csdn.net/chwshuang/article/details/50521708)
[RabbitMQ入門教程 For Java【2】 - Work Queues - 工作隊列](http://blog.csdn.net/chwshuang/article/details/50506284)
[RabbitMQ入門教程 For Java【3】 - Publish/Subscribe - 發布/訂閱](http://blog.csdn.net/chwshuang/article/details/50512057)
[RabbitMQ入門教程 For Java【4】 - Routing - 消息路由](http://blog.csdn.net/chwshuang/article/details/50505060)
[RabbitMQ入門教程 For Java【5】 - Topic - 模糊匹配](http://blog.csdn.net/chwshuang/article/details/50516904)
[RabbitMQ入門教程 For Java【6】 - Remote procedure call (RPC) - 遠程調用](http://blog.csdn.net/chwshuang/article/details/50518570)
### 提示
由于本教程中rabbitmq是在本機安裝,使用的是默認端口(5672)。
如果你的例子運行中的主機、端口不同,請進行必要設置,否則可能無法運行。
### 獲得幫助
如果你閱讀這個教程有障礙,可以通過GitHub項目成員找到開發者的郵件地址聯系他們。
~~~
https://github.com/orgs/rabbitmq/people
~~~