Work Queues(工作隊列)
大體結構如下所示:

在第一節的教程里,我們創建了一個程序,發送和接收消息,從一個named queue(命名隊列 )。
本節,我們會創建一個 Work Queue(工作隊列),用來分發耗時任務給多個Workers(工人)。
使用Work Queues(別名:Task Queue)是為了避免立即做一個資源密集型任務,而不得不等待它完成。我們可以把這個耗時的任務封裝提取起來作為message,發送給一個queue。一個Worker 后臺進程會獲取task,然后執行他。當有多個Workers 時,他們平分這些task。
##Preparation 準備
上一節的教程,我們發送已一條包含“Hello World”的消息。這一節我們要發送一個耗時的任務。為了簡單起見,我們使用
`Thread.Sleep()`方法,
###發送端
為了簡單起見,我們修改上一節的`Send.cs`的代碼,從命令行獲取message的參數。重命名文件為“NewTask.cs”
```
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);
channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
```
其中 `GetMessage(string[] args)`方法定義如下:
```
private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
```
###接收端
接下來修改上衣節的`Receive.cs`文件,我們重命名為“Worker.cs”
```
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);
```
然后編譯代碼。
##Round-robin dispatching 循環調度
使用Task Queue的一個優點就是可以很容易的平均分配任務。如果queue里有堆積過多的任務,我們可以添加更多的Worker就行了。
接下來,我們運行兩個Worker.cs的實例,分別為C1和C2
C1
```
shell1$ Worker.exe
Worker
[*] Waiting for messages. To exit press CTRL+C
```
C2
```
shell2$ Worker.exe
Worker
[*] Waiting for messages. To exit press CTRL+C
```
在接下來我們運行 NewTask.exe 來發布任務
```
shell3$ NewTask.exe First message.
shell3$ NewTask.exe Second message..
shell3$ NewTask.exe Third message...
shell3$ NewTask.exe Fourth message....
shell3$ NewTask.exe Fifth message.....
```
我們會看到任務的分配情況
C1
```
shell1$ Worker.exe
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
[x] Received 'Third message...'
[x] Received 'Fifth message.....'
```
C2
```
shell2$ Worker.exe
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Second message..'
[x] Received 'Fourth message....'
```
默認,RabbitMQ會順序的,平均的把任務發給每個consumer,到最后每個Consumer會得到相同數量的任務。
##Message acknowledgment
執行一個耗時的任務,你可能會想知道任務的執行情況。是否有Consumer開始執行任務了?是否任務執行到一半死機了?
當前我們上面的代碼,一旦RabbitMQ分發message給Custoerm,它就會立刻從內存刪除。這種情況下,如果你關閉一個Worker,我們就會丟失他正在執行的消息。同樣,我們也會丟失之前分發給他,還沒有來的及執行的消息。
但是我們不想丟失任何 task。如果一個Worker死了,我們想把任務分發給其他的Worker。
為了確保message不丟失,RabbitMQ 提供了 message acknowledgments。Ack是consumer 發送給RabbitMQ的,告訴它,task 已經接受,并處理了,RabbitMQ 可以刪除它了。
如果一個consumer死機了(channel closed,connection closed or Tcp connection lost),沒有返回ack,RabbitMQ就會知道task 沒有處理完,該task就會重新排隊。如果這時候有另外一個Consumer在線,RabbitMQ 就會把它分發給他。
默認Message acknowLedgments 是打開的,之前的例子,我們是顯式的關閉了(設置 noAck=true)。
下面修改代碼,打開Message acknowLedgments。
```
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);
```
運行上面的代碼,如果我們kill 一個Worker,message 不會丟失,他會被分發給其他Worker。
> **Forgotten acknowledgment 遺失acknowledgment**
丟失BasicAck是很常見的錯誤,盡管這個錯很小,但后果很嚴重。當Client quit,Messages 會重新分發,但是RabbitMQ 由于不能釋放掉那些unacked message ,所以會消耗越來越多的內存。
為了 調試這種錯誤, 你可以使用`rabbitmqctl`來打印出 `messages_unacknowledged` 的message信息
> ```
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello 0 0
...done.
>```
##Message durability 持久化
通過上面的ACK配置,當consumer 死亡的時候,task 不會丟失。但是如果RabbitMQ服務停了,task 仍然會丟失。
這里我們就要持久化 task的信息了。
首先,我們確保RabbitMQ不會丟失我們的queue
```
channel.QueueDeclare(queue: "hello",
durable: true, //持久化
exclusive: false,
autoDelete: false,
arguments: null);
```
盡管我們定義名字叫hello 隊列要持久化,但是仍然不會生效。這是因為我們已經定義了一個沒有持久化的名字叫hello 隊列。RabbitMQ 不允許重新定義(用不同的參數)一個已經存在的隊列,會報錯。因此這里我們應該另外定義一個隊列,
例如 task_queue
```
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
```
>NOTE:queue 持久化的修改,producer 和consumer的代碼都要修改。
通過上面的代碼設置我們的queue,即使RabbitMQ重啟也不會丟失。
接下來,我們來持久化message。
```
var properties = channel.CreateBasicProperties();
properties.Persistent=true;
```
>NOTE:盡管我們設置message持久化了,但是這也不能完全保證message不會丟失。
>這是由于RabbitMQ保存message到硬盤是需要時間的,如果再此期間RabbitMQ服務掛了,message就丟失了。不過對于一般的程序已經足夠了。如果要一個更強壯的方案,你可是使用[publisher confirms](https://www.rabbitmq.com/confirms.html)
##Fair dispatch 公平調度
也許你已經主要到,上面代碼實現的message的調度不是你想要的。例如,假設有兩個Worker,所有的奇數的message都是耗時的操作,而偶數的message都是很簡單的。你會發現一個Worker很空閑,而另一個Woker累死累活的。然而RabbitMQ不知道,還是不停的給他發任務。
這個情況的發生,是由于RabbitMQ 不看 the number of unacknowledged message,只要message進入隊列就分發message。他只是盲目的分發message。

為了解決上面的問題,我們可以使用 `basicQos`方法 設置 `prefetchCount=1`。這個設置會告訴RabbitMQ 每次給Workder只分配一個task,只有當task執行完了,才分發下一個任務。
```
channel.BasicQos(0, 1, false);
```
>NOTE:注意queue的size
> 如果所有的Worker都busy,你的queue會填滿,因此你需要監測queue的情況,添加更多的worker 或者采用其他的策略
##演示
全部代碼
NewTask.cs
```
using System;
using RabbitMQ.Client;
using System.Text;
class NewTask
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);
channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
}
```
Worker.cs
```
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
class Worker
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue",
noAck: false,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
```