## 發布/訂閱
一次向許多消費者發送消息
### 任務
我們將建立一個簡單的日志系統。它將包含兩個程序 - 第一個將發送日志消息,第二個是接收并打印它們。
即:發布的日志消息將被廣播給所有的接收者。
### Exchanges(交換機)
RabbitMQ中消息傳遞模型的核心思想是**生產者永遠不會將任何消息直接發送到隊列中**。
代替的操作是:**producer**發送消息給**Exchanges**,
交換是一件非常簡單的事情。一方面它接收來自生產者的消息,另一方則推動他們排隊。交易所必須知道如何處理收到的消息。是否應該附加到特定的隊列?它應該附加到多少隊列中?還是應該丟棄。這些規則是由**交換類型**定義的 。

#### 交換類型
direct, topic, headers and fanout。
這里使用fanout舉例子:
創建一個這種類型的交換,并將其稱為日志:
```
channel.exchangeDeclare("logs","fanout");
```
交換非常簡單。只是將所有收到的消息廣播到它所知道的所有隊列中。這正是我們記錄器所需要的。
```
//第一個參數是交易所的名稱。空字符串表示默認或無名交換
channel.basicPublish("logs","",null,message.getBytes());
```
#### 臨時隊列
對于上面的日志,我們希望了解所有日志消息,而不僅僅是其中的一部分。也只對目前流動的消息感興趣,而不是舊消息。要解決這個問題,我們需要兩件事。
1. 首先,每當我們連接到mq,我們需要一個新的,空的隊列。要做到這一點,我們可以創建一個隨機名稱的隊列,或者,甚至更好 - 讓服務器為我們選擇一個隨機隊列名稱。
2. 其次,一旦我們斷開消費者,隊列應該被自動刪除。
在Java客戶端中,當我們不給queueDeclare()提供參數時,我們 用一個生成的名稱創建一個非持久的,獨占的自動刪除隊列:
用一個生成的名稱創建一個非持久的,獨占的自動刪除隊列(此時queueName包含一個隨機隊列名稱。例如,它可能看起來像amq.gen-JzTY20BRgKO-HjmUJj0wLg。)
`String queueName = channel.queueDeclare().getQueue();`
### 綁定

我們已經創建了一個**fanout**交換和一個隊列。現在我們需要告訴交換機將消息發送到我們的隊列。交換和隊列之間的關系被稱為綁定。
```
channel.queueBind(queueName, "logs", "");
//從現在起,日志交換將把消息附加到我們的隊列中
```
#### 列出綁定
`./rabbitmqctl list_bindings`

發出日志消息的生產者程序與前面的教程沒有什么不同。最重要的變化是,我們現在要發布消息到我們的日志交換,而不是無名的。發送時我們需要提供一個路由密鑰,但是對于fanout交換,它的**值將被忽略**。
### 代碼
**EmitLog.java**
```
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明一個交換
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "hello exchange";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
```
**ReceiveLogs**
```
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 隨機生成一個隊列
String queueName = channel.queueDeclare().getQueue();
// 把隊列和交換機綁定
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
```