? ? ?上一篇博文中簡單介紹了一下RabbitMQ的基礎知識,并寫了一個經典語言入門程序——HelloWorld。本篇博文中我們將會創建一個工作隊列用來在工作者(consumer)間分發耗時任務。同樣是翻譯的[官網實例](http://www.rabbitmq.com/tutorials/tutorial-two-java.html)。
# 工作隊列

? ? ?在[前一篇博文](http://blog.csdn.net/xiaoxian8023/article/details/48679609)中,我們完成了一個簡單的對聲明的隊列進行發送和接受消息程序。下面我們將創建一個工作隊列,來向多個工作者(consumer)分發耗時任務。
? ? ?工作隊列(又名:任務隊列)的主要任務是為了避免立即做一個資源密集型的卻又必須等待完成的任務。相反的,我們進行任務調度:將任務封裝為消息并發給隊列。在后臺運行的工作者(consumer)將其取出,然后最終執行。當你運行多個工作者(consumer),隊列中的任務被工作進行共享執行。
? ? ?這樣的概念對于在一個HTTP短鏈接的請求窗口中處理復雜任務的web應用程序,是非常有用的。
# 準備
? ? ?使用Thread.Sleep()方法來模擬耗時。采用小數點的數量來表示任務的復雜性。每一個點將住哪用1s的“工作”。例如,Hello... 處理完需要3s的時間。
? ? ?發送端(生產者):NewTask.java
~~~
public class NewTask {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException {
/**
* 創建連接連接到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
// 設置MabbitMQ所在主機ip或者主機名
factory.setHost("127.0.0.1");
// 創建一個連接
Connection connection = factory.newConnection();
// 創建一個頻道
Channel channel = connection.createChannel();
// 指定一個隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 發送的消息
String message = "Hello World...";
// 往隊列中發出一條消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
// 關閉頻道和連接
channel.close();
connection.close();
}
}
~~~
? ? ?工作者(消費者)Worker.java
~~~
public class Worker {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws IOException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
// 打開連接和創建頻道,與發送端一樣
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明隊列,主要為了防止消息接收者先運行此程序,隊列還不存在時創建隊列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 創建隊列消費者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
System.out.println(" [x] Proccessing... at " +new Date().toLocaleString());
try {
for (char ch: message.toCharArray()) {
if (ch == '.') {
Thread.sleep(1000);
}
}
} catch (InterruptedException e) {
} finally {
System.out.println(" [x] Done! at " +new Date().toLocaleString());
}
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
~~~
? ? ?運行結果如下:

# 任務分發機制
? ? ?正主來了。。。下面開始介紹各種任務分發機制。
### Round-robin(輪詢分發)
? ? ?使用任務隊列的優點之一就是可以輕易的并行工作。如果我們積壓了好多工作,我們可以通過增加工作者(消費者)來解決這一問題,使得系統的伸縮性更加容易。
修改一下NewTask,使用for循環模擬多次發送消息的過程:
~~~
for (int i = 0; i < 5; i++) {
// 發送的消息
String message = "Hello World"+Strings.repeat(".", i);
// 往隊列中發出一條消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
~~~
? ? ?我們先啟動1個生產者實例,2個工作者實例,看一下如何執行:

? ? ?從上述的結果中,我們可以得知,在默認情況下,RabbitMQ將逐個發送消息到在序列中的下一個消費者(而不考慮每個任務的時長等等,且是提前一次性分配,并非一個一個分配)。平均每個消費者獲得相同數量的消息。這種方式分發消息機制稱為Round-Robin(輪詢)。
### Fair dispatch(公平分發)
? ? ?您可能已經注意到,任務分發仍然沒有完全按照我們想要的那樣。比如:現在有2個消費者,所有的奇數的消息都是繁忙的,而偶數則是輕松的。按照輪詢的方式,奇數的任務交給了第一個消費者,所以一直在忙個不停。偶數的任務交給另一個消費者,則立即完成任務,然后閑得不行。而RabbitMQ則是不了解這些的。
? ? ?這是因為當消息進入隊列,RabbitMQ就會分派消息。它不看消費者為應答的數目,只是盲目的將第n條消息發給第n個消費者。

? ? ?為了解決這個問題,我們使用basicQos(?prefetchCount = 1)方法,來限制RabbitMQ只發不超過1條的消息給同一個消費者。當消息處理完畢后,有了反饋,才會進行第二次發送。
~~~
int prefetchCount = 1;
channel.basicQos(prefetchCount);
~~~
? ? ?注:如果所有的工作者都處于繁忙狀態,你的隊列有可能被填充滿。你可能會觀察隊列的使用情況,然后增加工作者,或者使用別的什么策略。
? ? ?還有一點需要注意,使用公平分發,必須關閉自動應答,改為手動應答。這些內容會在下篇博文中講述。
? ? ?整體代碼如下:生產者NewTask.java
~~~
public class NewTask {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException {
/**
* 創建連接連接到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
// 設置MabbitMQ所在主機ip或者主機名
factory.setHost("127.0.0.1");
// 創建一個連接
Connection connection = factory.newConnection();
// 創建一個頻道
Channel channel = connection.createChannel();
// 指定一個隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
int prefetchCount = 1;
//限制發給同一個消費者不得超過1條消息
channel.basicQos(prefetchCount);
for (int i = 0; i < 5; i++) {
// 發送的消息
String message = "Hello World"+Strings.repeat(".",5-i)+(5-i);
// 往隊列中發出一條消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
// 關閉頻道和連接
channel.close();
connection.close();
}
}
~~~
? ? ?消費者Worker.java
~~~
public class Worker {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws IOException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
// 打開連接和創建頻道,與發送端一樣
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 聲明隊列,主要為了防止消息接收者先運行此程序,隊列還不存在時創建隊列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);//保證一次只分發一個
// 創建隊列消費者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
for (char ch: message.toCharArray()) {
if (ch == '.') {
Thread.sleep(1000);
}
}
} catch (InterruptedException e) {
} finally {
System.out.println(" [x] Done! at " +new Date().toLocaleString());
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
~~~
? ? ?運行結果如下:
