<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                # RabbitMQ入門教程 For Java【3】 - Publish/Subscribe 我的開發環境: 操作系統: Windows7 64bit 開發環境: JDK 1.7 - 1.7.0_55 開發工具: Eclipse Kepler SR2 RabbitMQ版本: 3.6.0 Elang版本: erl7.2.1 關于Windows7下安裝RabbitMQ的教程請先在網上找一下,有空我再補安裝教程。 源碼地址 [https://github.com/chwshuang/rabbitmq.git](https://github.com/chwshuang/rabbitmq.git) ~~~ 在上一章中,我們學習創建了一個消息隊列,她的每個任務消息只發送給一個工人。這一章,我們會將同一個任務消息發送給多個工人。這種模式就是“發布/訂閱”。 ~~~ 為了說明這種模式,我們將以一個日志系統進行講解:一個日志發送者,兩個日志接收者,接收者1可以把這條日志寫入到磁盤上,另外一個接收者2可以將這條日志打印到控制臺中。 “發布/訂閱”模式的基礎是將消息廣播到所有的接收器上。 ### 交換器 在之前的教程中,我們都是直接在消息隊列中進行發送和接收消息,現在開始要介紹RabbitMQ完整的消息模型了。 首先,我們先來回顧一下之前學到關于RabbitMQ的內容: - 生產者是發送消息的應用程序 - 隊列是存儲消息的緩沖區 - 消費者是接收消息的應用程序 實際上,RabbitMQ中消息傳遞模型的核心思想是:生產者不直接發送消息到隊列。實際的運行環境中,生產者是不知道消息會發送到那個隊列上,她只會將消息發送到一個交換器,交換器也像一個生產線,她一邊接收生產者發來的消息,另外一邊則根據交換規則,將消息放到隊列中。交換器必須知道她所接收的消息是什么?它應該被放到那個隊列中?它應該被添加到多個隊列嗎?還是應該丟棄?這些規則都是按照交換器的規則來確定的。 ![這里寫圖片描述](https://box.kancloud.cn/2016-02-18_56c53cbd5232a.jpg "") #### **交換器的規則有:** - direct (直連) - topic (主題) - headers (標題) - fanout (分發)也有翻譯為扇出的。 我們將使用【fanout】類型創建一個名稱為 logs的交換器, ~~~ channel.exchangeDeclare("logs", "fanout"); ~~~ 分發交換器很簡單,你通過名稱也能想到,她是廣播所有的消息, > 交換器列表 通過rabbitmqctl list_exchanges指令列出服務器上所有可用的交換器 ~~~ $ sudo rabbitmqctl list_exchanges Listing exchanges ... direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic logs fanout ...done. ~~~ > 這個列表里面所有以【amq.*】開頭的交換器都是RabbitMQ默認創建的。在生產環境中,可以自己定義。 > **匿名交換器** 在之前的教程中,我們知道,發送消息到隊列時根本沒有使用交換器,但是消息也能發送到隊列。這是因為RabbitMQ選擇了一個空“”字符串的默認交換器。 來看看我們之前的代碼: ~~~ channel.basicPublish("", "hello", null, message.getBytes()); ~~~ > 第一個參數就是交換器的名稱。如果輸入“”空字符串,表示使用默認的匿名交換器。 第二個參數是【routingKey】路由線索 **匿名交換器規則:** 發送到routingKey名稱對應的隊列。 現在,我們可以發送消息到交換器中: ~~~ channel.basicPublish( "logs", "", null, message.getBytes()); ~~~ ### 臨時隊列 記得前兩章中使用的隊列指定的名稱嗎?(Hello World和task_queue). 如果要在生產者和消費者之間創建一個新的隊列,又不想使用原來的隊列,臨時隊列就是為這個場景而生的: 1. 首先,每當我們連接到RabbitMQ,我們需要一個新的空隊列,我們可以用一個隨機名稱來創建,或者說讓服務器選擇一個隨機隊列名稱給我們。 1. 一旦我們斷開消費者,隊列應該立即被刪除。 在Java客戶端,提供queuedeclare()為我們創建一個非持久化、獨立、自動刪除的隊列名稱。 ~~~ String queueName = channel.queueDeclare().getQueue(); ~~~ 通過上面的代碼就能獲取到一個隨機隊列名稱。 例如:它可能是:amq.gen-jzty20brgko-hjmujj0wlg。 ### 綁定 ![這里寫圖片描述](https://box.kancloud.cn/2016-02-18_56c53cbd60cec.jpg "") 如果我們已經創建了一個分發交換器和隊列,現在我們就可以就將我們的隊列跟交換器進行綁定。 ~~~ channel.queueBind(queueName, "logs", ""); ~~~ 執行完這段代碼后,日志交換器會將消息添加到我們的隊列中。 > **綁定列表** 如果要查看綁定列表,可以執行【rabbitmqctl list_bindings】命令 ### 全部代碼 ![這里寫圖片描述](https://box.kancloud.cn/2016-02-18_56c53cbd74d47.jpg "") ### 目錄 ![這里寫圖片描述](https://box.kancloud.cn/2016-03-01_56d507d7b6df7.jpg "") 生產者程序,他負責發送日志消息,與之前不同的是它不是將消息發送到匿名交換器中,而是發送到一個名為【logs】的交換器中。我們提供一個空字符串的routingkey,它的功能被交換器的分發類型代替了。下面是EmitLog.java的代碼: ~~~ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 分發消息 for(int i = 0 ; i < 5; i++){ String message = "Hello World! " + i; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } channel.close(); connection.close(); } } ~~~ 上面的代碼中,在建立連接后,我們聲明了一個交互。如果當前沒有隊列被綁定到交換器,消息將被丟棄,因為沒有消費者監聽,這條消息將被丟棄。 下面的代碼是接收日志ReceiveLogs1.java 和ReceiveLogs2.java: ~~~ import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogs1 { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } } ~~~ ~~~ import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogs1 { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } } ~~~ ### 運行 先運行ReceiveLogs1和ReceiveLogs2可以看到日志: ~~~ [*] Waiting for messages. To exit press CTRL+C ~~~ 然后運行EmitLog: ~~~ EmitLog日志: [x] Sent 'Hello World! 0' [x] Sent 'Hello World! 1' [x] Sent 'Hello World! 2' [x] Sent 'Hello World! 3' [x] Sent 'Hello World! 4' ReceiveLogs1和ReceiveLogs2日志 [*] Waiting for messages. To exit press CTRL+C [x] Received 'Hello World! 0' [x] Received 'Hello World! 1' [x] Received 'Hello World! 2' [x] Received 'Hello World! 3' [x] Received 'Hello World! 4' ~~~ 看到這里,說明我們的程序運行正常,消費者通過聲明【logs】交換器和【fanout】類型,接收到了來自【logs】交換器的所有消息。 使用【rabbitmqctl list_bindings】命令可以看到兩個臨時隊列的名稱 ~~~ $ sudo rabbitmqctl list_bindings Listing bindings ... logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] ...done. ~~~ 以上就是這一章講的發布/訂閱模式,下一章將介紹消息路由(Routing) 本教程所有文章: [RabbitMQ入門教程 For Java【1】 - Hello World - 你好世界!](http://blog.csdn.net/chwshuang/article/details/50521708) [RabbitMQ入門教程 For Java【2】 - Work Queues - 工作隊列](http://blog.csdn.net/chwshuang/article/details/50506284) [RabbitMQ入門教程 For Java【3】 - Publish/Subscribe - 發布/訂閱](http://blog.csdn.net/chwshuang/article/details/50512057) [RabbitMQ入門教程 For Java【4】 - Routing - 消息路由](http://blog.csdn.net/chwshuang/article/details/50505060) [RabbitMQ入門教程 For Java【5】 - Topic - 模糊匹配](http://blog.csdn.net/chwshuang/article/details/50516904) [RabbitMQ入門教程 For Java【6】 - Remote procedure call (RPC) - 遠程調用](http://blog.csdn.net/chwshuang/article/details/50518570)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看