## 工作隊列
在工作中分配任務(競爭消費者模式)。
創建一個工作隊列,用于在多個工作人員之間分配耗時的任務。

#### 需求
工作隊列:工作隊列(又名:任務隊列)背后的主要思想是避免立即執行資源密集型任務,安排稍后完成任務。把一個任務封裝成一個消息并發送給一個隊列。在后臺運行的工作進程將彈出任務并最終執行作業。當你運行許多工人時,任務將在他們之間共享。
#### 舉例
把字符串中的點數作為它的復雜度。每一個點將占到“工作”的一秒鐘。例如,Hello ...描述的假任務 將需要三秒鐘。
修改Send.java代碼,以允許從命令行發送任意消息。
##### 生產者代碼
```
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
String message = getMessage(argv);
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
private static String getMessage(String[] argv) {
if (argv.length < 1)
return "Hello World!";
return joinStrings(argv, ".");
}
private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length;
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
```
##### 消費者代碼
```
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
try {
doWork(message);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" [x] Done");
}
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
//偽造每個點有一個任務要執行。它將處理交付的消息并執行任務.
private static void doWork(String task) throws InterruptedException {
for (char ch : task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
```
### 循環調度
使用任務隊列的優點之一是能夠輕松地平行工作。如果我們積壓工作,我們可以增加更多的工人,這樣可以輕松擴展。
我們嘗試同時運行兩個工作者實例。他們都會從隊列中得到消息。
**做法**
將消費者的main方法執行二次,得到二個執行中的運行任務。并開始執行生產者的main方法。結果:默認情況下,RabbitMQ將按順序將每條消息發送給下一個使用者。平均而言,每個消費者將獲得相同數量的消息。這種分發消息的方式稱為循環法(round-robin)。
### 消息確認
當消息發送出去,到達其中一個消費者時,如果恰巧這個時候,消費者死亡,那么就會導致該條消息不會被處理。
為了確保消息永不丟失,RabbitMQ支持 消息確認。消費者發回確認(告知)告訴RabbitMQ已經收到,處理了一個特定的消息,并且RabbitMQ可以自由刪除它。
如果消費者死亡(其通道關閉,連接關閉或TCP連接丟失),RabbitMQ將理解消息未被完全處理,并將重新排隊。如果有其他消費者同時上網,則會迅速重新發送給其他消費者。
手動消息確認默認打開。
`channel.basicConsume(QUEUE_NAME, true, consumer);`//默認為false為打開狀態
即:第二個參數為false時-》即使在處理消息的時候使用CTRL + C來殺死一個工作者,也不會丟失任何東西。工人死后不久,所有未確認的消息將被重新發送。
### 消息持久性
如果RabbitMQ服務器停止,我們的任務仍然會丟失。
當RabbitMQ退出或崩潰時,它會忘記隊列和消息,此時需要將隊列和消息標記為持久以確保消息不會丟失
#### 1.持久隊列
注意 生產者消費者的隊列定義都要設置為true,當生產者不同發送時,一旦啟動了消費者,就會自動的進行消息的消費。
即使RabbitMQ重新啟動,task_queue隊列也不會丟失
**send**
```
boolean durable = true ; //代表隊列持久化
channel.queueDeclare("task_queue",durable,假,假,空);
```
#### 2.消息持久性
現在我們需要將消息標記為持久的 - 通過將MessageProperties(實現BasicProperties)設置為值PERSISTENT_TEXT_PLAIN。
```
channel.basicPublish("","task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
```
### 公平派遣
當任務的難度不一致時,一個工人可能會一直做務比較重的任務,而另一個工人幾乎不會做什么事,RabbitMQ不知道任何關于這個,并將仍然均勻地發送消息。
發生這種情況是因為RabbitMQ只在消息進入隊列時調度消息。它沒有考慮消費者未確認消息的數量。它只是盲目地把第n條消息分發給第n個消費者。

#### 解決方案
我們可以使用basicQos方法`prefetchCount = 1`設置。
這告訴RabbitMQ一次不能給一個工作者多個消息。或者換句話說,不要向工作人員發送新消息,直到處理并確認了前一個消息。
```
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
int prefetchCount = 1 ;
channel.basicQos(prefetchCount);
```