上一節,我們創建了一個工作隊列,work queue 會把message 平均分發給每一個worker,一個message只會分發給一個worker。
這一節,我們做些改變,我們把一個message分發給多個consumer。這種模式叫“publish/subscribe” 發布、訂閱模式。
為了闡明這種模式,我們建立一個日志系統。它包含兩個程序——一個發射日志信息,另一個接收并打印日志信息。
在這個日志系統中,每一個receiver程序都會獲得日志信息。因此我們可以設置一個receiver接收并保存日志信息到硬盤,另一個接收并打印日志信息。
也就是說,published log message 會廣播給所有的receiver (訂閱者)。
##Exchanges(交換機)
之前的教程里,我們的程序結構都是:
- A producer is a user application that sends messages.
- A queue is a buffer(緩存) that stores messages.
- A consumer is a user application that receives messages.
RabbitMQ的消息模型的核心思想是,生產者沒有直接向隊列發送任何消息。實際上,經常生產者甚至不知道一個消息將傳遞給任何隊列。事實上,Producer只能發送message給**exchange**。exchange 很簡單,一方面它從producers 接收messages,另一方面,它把messages 推送給queues。因此exchange要知道怎么處理接收到的message。是把message發給一個特定的隊列?還是發給多個隊列?或者丟棄?這個規則是由 exchange type 定義的。

exchange 有以下幾種:`direct`, `topic`, `headers` 和 `fanout`。這一節,我們主要使用最后一種——**fanout**。
下面我們定義個 名字叫logs,類型為fanout的exchange
```
channel.ExchangeDeclare("logs", "fanout");
```
fanout exchange很簡單。它會廣播所有收到的message傳遞給它知道的queue。
**listing exchanges 列出exchange**
我們可以使用下面的命令 服務器上的exchanges
```
$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
logs fanout
...done.
```
**Nameless(無名) exchange**
在之前章節的教程中,我們不知道exchange,但是我們仍然能夠把message傳遞給queue,這是因為我們使用了默認的exchange。
之前我們發布message,用的代碼如下所示:
```
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "hello",
basicProperties: null,
body: body);
```
第一參數代表exchange的名字,這里使用空字符串表示用默認的或nameless exchange。message會被發送給routingKey指定的隊列。
接下來,我們把它替換成發布到我們命名的exchange。
```
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
routingKey: "",
basicProperties: null,
body: body);
```
##Temporary(臨時) queues
之前我們使用的queue都有一個特定的名字(hello 或者 task_queue?)。當你想在生產者和消費者之間共享隊列,給queue命名是至關重要的。
但是 這種情況對于我們的日志系統是不適合的。我們想看到所有的日志信息,而不是僅僅是他的一個子集。我們也只對當前流動的感興趣而不是舊的消息。
為了解決這個問題我們需要做兩件事:
1、我們需要一個新的,空的隊列,不管什么時候我們連接到Rabbit。這就需要,我們每次連接rabbti都要創建一個名字隨機的隊列,或者讓服務器選擇一個名字隨機的隊列給我們。
2、一旦我們consumer斷開與queue的連接,queue應該自動刪除。
在.NET client 我們提供了一個無參的 `queueDeclare()`方法,使用它,我們可以創建一個 不持久化,名字唯一,自動刪除的隊列。
```
var queueName = channel.QueueDeclare().QueueName;
```
這里queueName是隨機生成的隊列的名字。例如amq.gen-JzTY20BRgKO-HjmUJj0wLg
##Bindings 綁定

我們已經創建了一個 fanout類型的exchange和所需的queue。現在,我們就需要告訴 exchange 發送messages 到我們指定的 queue。這里,exchange和queue的關系我們叫做binding(綁定)。
```
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");
```
>**Listing bindings**
你可以通過`rabbitmqctl list_bindings`命令,查看已經存在的bingding。
##Putting it all together

EmitLog.cs
```
using System;
using RabbitMQ.Client;
using System.Text;
class EmitLog
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
routingKey: "",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
private static string GetMessage(string[] args)
{
return ((args.Length > 0)
? string.Join(" ", args)
: "info: Hello World!");
}
}
```
ReceiveLogs.cs
```
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
class ReceiveLogs
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");
Console.WriteLine(" [*] Waiting for logs.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] {0}", message);
};
channel.BasicConsume(queue: queueName,
noAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
```
編譯上面的代碼
```
$ csc /r:"RabbitMQ.Client.dll" EmitLogs.cs
$ csc /r:"RabbitMQ.Client.dll" ReceiveLogs.cs
```
如果你想保存log到文件,
```
$ ReceiveLogs.exe > logs_from_rabbit.log
```
發送日志:
```
$ EmitLog.exe first log
```
使用 rabbitmqctl list_bindings命令 來查看我們創建的綁定。
```
$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
...done.
```