<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                為了說明這種模式,我們將建立一個簡單的日志系統。這個系統將由兩個程序組成,第一個將發出日志消息,第二個將接收并處理日志消息。在我們的日志系統中,每一個運行的接收程序的副本都會收到日志消息。 ## 交換器(Exchanges) 在本教程的前面部分,我們主要介紹了發送者和接受者,發送者發送消息給 RabbitMQ 隊列,接收者從 RabbitMQ 隊列獲取消息。在這一篇教程,我們會引入交換器,展示 RabbitMQ 的完整的消息模型。 讓我們快速了解在前面的教程中介紹的內容: * 生產者是發送消息的用戶應用程序。 * 隊列是存儲消息的緩沖區。 * 消費者是接收消息的用戶應用程序。 RabbitMQ 消息模型的核心思想是,生產者不直接發送任何消息給隊列。實際上,一般的情況下,生產者甚至不知道消息應該發送到哪些隊列。 相反的,生產者只能將信息發送到交換器。交換器是非常簡單的。它一邊收到來自生產者的消息,另一邊將它們推送到隊列。交換器必須準確知道接收到的消息如何處理。是否被添加到一個特定的隊列嗎?是否應該追加到多個隊列?或者是否應該被丟棄?這些規則通過交換器類型進行定義。 [![](https://gitee.com/chenssy/blog-home/raw/master/image/201810/rabbitmq_exchanges.png)](https://gitee.com/chenssy/blog-home/raw/master/image/201810/rabbitmq_exchanges.png) 交換器一共四種類型:direct、topic、headers、fanout。目前,我們先關注 fanout 類型的交換器。 ~~~java channel.exchangeDeclare("logs", "fanout"); ~~~ fanout 類型的交換器非常簡單,它只是將所有收到的消息廣播到所有它所知道的隊列。 在上一個教程中,我們不知道交換器,但仍然能夠發送消息到隊列。這是因為我們使用了一個默認的交換器,我們用空的字符串(“”)。 ~~~java // 參數1 exchange :交換器 // 參數2 routingKey : 路由鍵 // 參數3 props : 消息的其他參數 // 參數4 body : 消息體 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); ~~~ 其中,第一個參數表示交換器的名稱,我們設置為””,第二個參數表示消息由路由鍵決定發送到哪個隊列。 現在,我們可以發布消息到我們命名的交換器。 ~~~java channel.basicPublish("logs", "", null, message.getBytes()); ~~~ ## 臨時隊列(Temporary queues) 之前,我們都是使用的隊列指定了一個特定的名稱。不過,對于我們的日志系統而言,我們并不關心隊列的名稱。我們想要接收到所有的消息,而且我們也只對當前正在傳遞的消息感興趣。要解決我們需求,需要做兩件事。 首先,每當我們連接到 RabbitMQ,我們需要一個新的空的隊列。為此,我們可以創建一個具有隨機名稱的隊列,或者甚至更好 – 讓服務器或者,讓服務器為我們選擇一個隨機隊列名稱。 其次,一旦消費者與 RabbitMQ 斷開,消費者所接收的那個隊列應該被自動刪除。 在 Java 客戶端中,我們可以使用 queueDeclare() 方法來創建一個非持久的、唯一的、自動刪除的隊列,且隊列名稱由服務器隨機產生。 ~~~java String queueName = channel.queueDeclare().getQueue(); ~~~ 此時,queueName 包含一個隨機隊列名稱。 ## 綁定(Bindings) [![](https://gitee.com/chenssy/blog-home/raw/master/image/201810/rabbitmq_bindings.png)](https://gitee.com/chenssy/blog-home/raw/master/image/201810/rabbitmq_bindings.png) 我們已經創建了一個 fanout 類型的交換器和隊列。現在,我們需要告訴交換器發送消息到我們的隊列。交換器和隊列之間的關系稱為綁定。 ~~~java // 綁定交換器和隊列 // 參數1 queue :隊列名 // 參數2 exchange :交換器名 // 參數3 routingKey :路由鍵名 channel.queueBind(queueName, "logs", ""); ~~~ ## 案例實戰 [![](https://gitee.com/chenssy/blog-home/raw/master/image/201810/rabbitmq_python-three-overall.png)](https://gitee.com/chenssy/blog-home/raw/master/image/201810/rabbitmq_python-three-overall.png) ### 發送端 發送端,連接到 RabbitMQ,發送一條數據,然后退出。 ~~~java public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { // 創建連接 ConnectionFactory factory = new ConnectionFactory(); // 設置 RabbitMQ 的主機名 factory.setHost("localhost"); // 創建一個連接 Connection connection = factory.newConnection(); // 創建一個通道 Channel channel = connection.createChannel(); // 指定一個交換器 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 發送消息 String message = "Liang-MSG log."; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); // 關閉頻道和連接 channel.close(); connection.close(); } } ~~~ ### 接受端 接受端,不斷等待服務器推送消息,然后在控制臺輸出。 ~~~java public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { // 創建連接 ConnectionFactory factory = new ConnectionFactory(); // 設置 RabbitMQ 的主機名 factory.setHost("localhost"); // 創建一個連接 Connection connection = factory.newConnection(); // 創建一個通道 Channel channel = connection.createChannel(); // 指定一個交換器 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 創建一個非持久的、唯一的、自動刪除的隊列 String queueName = channel.queueDeclare().getQueue(); // 綁定交換器和隊列 // queueBind(String queue, String exchange, String routingKey) // 參數1 queue :隊列名 // 參數2 exchange :交換器名 // 參數3 routingKey :路由鍵名 channel.queueBind(queueName, EXCHANGE_NAME, ""); // 創建隊列消費者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } } ~~~ 現在,做一個實驗,我們開啟兩個 ReceiveLogs 工作程序:ReceiveLogs1 與 ReceiveLogs2。 ReceiveLogs1 ~~~null [x] Received 'Liang-MSG log.' ~~~ ReceiveLogs 2 ~~~null [x] Received 'Liang-MSG log.' ~~~ 此時,ReceiveLogs1 與 ReceiveLogs2 同時收到了消息。 ## 源代碼 > 相關示例完整代碼:[https://github.com/lianggzone/rabbitmq-action](https://github.com/lianggzone/rabbitmq-action)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看