<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                ? ? ?翻譯地址:http://www.rabbitmq.com/tutorials/tutorial-three-java.html ? ? ?在前面的教程中,我們創建了一個工作隊列,都是假設一個任務只交給一個消費者。這次我們做一些完全不同的事兒——將消息發送給多個消費者。這種模式叫做“發布/訂閱”。 ? ? ?為了說明這個模式,我們將構建一個簡單日志系統。它包含2段程序:第一個將發出日志消息,第二個接受并打印消息。 ? ? ?如果在日志系統中每一個接受者(訂閱者)都會的得到消息的拷貝。那樣的話,我們可以運行一個接受者(訂閱者)程序,直接把日志記錄到硬盤。同時運行另一個接受者(訂閱者)程序,打印日志到屏幕上。 ? ? ?說白了,發表日志消息將被廣播給所有的接收者。 # Exchanges(轉發器) ? ? ?前面的博文匯總,我們都是基于一個隊列發送和接受消息。現在介紹一下完整的消息傳遞模式。 ? ? ?RabbitMQ消息模式的核心理念是:生產者沒有直接發送任何消費到隊列。實際上,生產者都不知道這個消費是發送給哪個隊列的。 ? ? ?相反,生產者只能發送消息給轉發器,轉發器是非常簡單的。一方面它接受生產者的消息,另一方面向隊列推送消息。轉發器必須清楚的知道如何處理接收到的消息。附加一個特定的隊列嗎?附加多個隊列?或者是否丟棄?這些規則通過轉發器的類型進行定義。 ? ? ?![](https://box.kancloud.cn/2016-02-18_56c53cbd5232a.jpg) ? ? ?類型有:Direct、Topic、Headers和Fanout。我們關注最后一個。現在讓我們創建一個該類型的轉發器,定義如下: ~~~ channel.exchangeDeclare("logs", "fanout"); ~~~ ? ? ?fanout轉發器非常簡單,從名字就可以看出,它是廣播接受到的消息給所有的隊列。而這正好符合日志系統的需求。 # Nameless exchange(匿名轉發) ? ? ?之前我們對轉換器一無所知,卻可以將消息發送到隊列,那是可能是我們用了默認的轉發器,轉發器名為空字符串""。之前我們發布消息的代碼是: ~~~ channel.basicPublish("", "hello", null, message.getBytes()); ~~~ ? ? ?第一個參數就是轉發器的名字,空字符串表示模式或者匿名的轉發器。消息通過隊列的routingKey路由到指定的隊列中去,如果存在的話。 ? ? ?現在我們可以指定轉發器的名字了: ~~~ channel.basicPublish( "logs", "", null, message.getBytes()); ~~~ # Temporary queues(臨時隊列) ? ? ?你可能還記得之前我們用隊列時,會指定一個名字。隊列有名字對我們來說是非常重要的——我們需要為消費者指定同一個隊列。 ? ? ?但這并不是我們的日志系統所關心的。我們要監聽所有日志消息,而不僅僅是一類日志。我們只對對當前流動的消息感興趣。解決這些問題,我盟需要完成兩件事。 ? ? ?首先,每當我盟連接到RabbitMQ時,需要一個新的空隊列。為此我們需要創建一個隨機名字的空隊列,或者更好的,讓服務器選好年則一個隨機名字的空隊列給我們。 ? ? ?其次,一旦消費者斷開連接,隊列將自動刪除。 我們提供一個無參的queueDeclare()方法,創建一個非持久化、獨立的、自動刪除的隊列,且名字是隨機生成的。 ~~~ String queueName = channel.queueDeclare().getQueue(); ~~~ queueName是一個隨機隊列名。看起來會像amq.gen-JzTY20BRgKO-HjmUJj0wLg。 # Bindings(綁定) ? ? ?![](https://box.kancloud.cn/2016-02-18_56c53cbd60cec.jpg) ? ? ?我們已經創建了一個廣播的轉發器和一個隨機隊列。現在需要告訴轉發器轉發消息到隊列。這個關聯轉發器和隊列的我們叫它Binding。 ~~~ channel.queueBind(queueName, "logs", ""); ~~~ 這樣,日志轉發器將附加到日志隊列上去。 # 完整的例子: ![](https://box.kancloud.cn/2016-02-18_56c53cbd74d47.jpg) 發送端代碼(生產者)EmitLog.java ~~~ public class EmitLog { private final static String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException { /** * 創建連接連接到MabbitMQ */ ConnectionFactory factory = new ConnectionFactory(); // 設置MabbitMQ所在主機ip或者主機名 factory.setHost("127.0.0.1"); // 創建一個連接 Connection connection = factory.newConnection(); // 創建一個頻道 Channel channel = connection.createChannel(); // 指定轉發——廣播 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); for(int i=0;i<3;i++){ // 發送的消息 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } // 關閉頻道和連接 channel.close(); connection.close(); } } ~~~ 消費者1?ReceiveLogs2Console.java ~~~ public class ReceiveLogs2Console { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打開連接和創建頻道,與發送端一樣 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 聲明一個隨機隊列 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 創建隊列消費者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } } ~~~ 消費者2?ReceiveLogs2File.java ~~~ public class ReceiveLogs2File { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打開連接和創建頻道,與發送端一樣 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 聲明一個隨機隊列 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 創建隊列消費者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); print2File(message); // System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } private static void print2File(String msg) { try { String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath(); String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date()); File file = new File(dir, logFileName + ".log"); FileOutputStream fos = new FileOutputStream(file, true); fos.write(((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes()); fos.flush(); fos.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } } ~~~ ? ? ?可以看到我們1個生產者用于發送log消息,2個消費者,一個用于顯示,一個用于記錄文件。 ? ? ?生產者聲明了一個廣播模式的轉換器,訂閱這個轉換器的消費者都可以收到每一條消息。可以看到在生產者中,沒有聲明隊列。這也驗證了之前說的。生產者其實只關心exchange,至于exchange會把消息轉發給哪些隊列,并不是生產者關心的。 ? ? ?2個消費者,一個打印日志,一個寫入文件,除了這2個地方不一樣,其他地方一模一樣。也是聲明一下廣播模式的轉換器,而隊列則是隨機生成的,消費者實例啟動后,會創建一個隨機實例,這個在管理頁面可以看到(如圖)。而實例關閉后,隨機隊列也會自動刪除。最后將隊列與轉發器綁定。 ![](https://box.kancloud.cn/2016-02-18_56c53cbd83e0a.jpg) ? ? ?注:運行的時候要先運行2個消費者實例,然后在運行生產者實例。否則獲取不到實例。 ? ? ?看看最終的結果吧: ![](https://box.kancloud.cn/2016-02-18_56c53cbd9c944.jpg)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看