<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                ? ? ?翻譯地址:http://www.rabbitmq.com/tutorials/tutorial-four-java.html ? ? ?在[前篇博文](http://blog.csdn.net/xiaoxian8023/article/details/48729479)中,我們建立了一個簡單的日志系統。可以廣播消息給多個消費者。本篇博文,我們將添加新的特性——我們可以只訂閱部分消息。比如:我們可以接收Error級別的消息寫入文件。同時仍然可以在控制臺打印所有日志。 # Bindings(綁定) ? ? ?在上一篇博客中我們已經使用過綁定。類似下面的代碼: ~~~ channel.queueBind(queueName, EXCHANGE_NAME, ""); ~~~ ? ? ?綁定表示轉換器與隊列之間的關系。可以簡單的人為:隊列對該轉發器上的消息感興趣。 ? ? ?綁定可以設定額外的routingKey參數。為了與避免basicPublish方法(發布消息的方法)的參數混淆,我們準備把它稱作綁定鍵(binding key)。下面展示如何使用綁定鍵(binding key)來創建一個綁定: ~~~ channel.queueBind(queueName, EXCHANGE_NAME, "black"); ~~~ ? ? ?綁定鍵關鍵取決于轉換器的類型。對于fanout類型,忽略此參數。 # Direct exchange(直接轉發) ? ? ?前面講到我們的日志系統廣播消息給所有的消費者。我們想對其擴展,根據消息的嚴重性來過濾消息。例如:我們希望將致命錯誤的日志消息記錄到文件,而不是把磁盤空間浪費在warn和info類型的日志上。我們使用的fanout轉發器,不能給我們太多的靈活性。它僅僅只是盲目的廣播而已。我們使用direct轉發器進行代替,其背后的算法很簡單——消息會被推送至綁定鍵(binding key)和消息發布附帶的選擇鍵(routing key)完全匹配的隊列。 ![](https://box.kancloud.cn/2016-02-18_56c53cbdb84fc.jpg) ? ? ?在上圖中,我們可以看到direct類型的轉發器與2個隊列進行了綁定。第一個隊列使用的綁定鍵是orange,第二個隊列綁定鍵為black和green。這樣當消息發布到轉發器是,附帶orange綁定鍵的消息將被路由到隊列Q1中去。附帶black和green綁定鍵的消息被路由到Q2中去。其他消息全部丟棄。 # Multiple bindings(多重綁定) ![](https://box.kancloud.cn/2016-02-18_56c53cbdc6130.jpg) ? ? ?使用一個綁定鍵綁定多個隊列是完全合法的。如上圖,綁定鍵black綁定了2個隊列——Q1和Q2。 # Emitting logs(發送日志) ? ? ?我們將這種模式用于日志系統,發送消息給direct類型的轉發器。我們將 提供日志嚴重性做為綁定鍵。那樣,接收程序可以選擇性的接收嚴重性的消息。首先關注發送日志的代碼: 像往常一樣首先創建一個轉換器: ~~~ channel.exchangeDeclare(EXCHANGE_NAME, "direct"); ~~~ ? ? ?然后為發送消息做準備: ~~~ channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); ~~~ ? ? ?為了簡化代碼,我們假定日志的嚴重性是‘info’,‘warning’,‘error’中之一。 # Subscribing(訂閱) ? ? ?接收消息跟前面博文中的一樣。我們僅需要修改一個地方:為每一個我們感興趣的嚴重性的消息,創建一個新的綁定。 ~~~ String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } ~~~ # 完整的例子 ![](https://box.kancloud.cn/2016-02-18_56c53cbdd4423.jpg) 發送端代碼(EmitLogDirect.java) ~~~ public class EmitLogDirect { private final static String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException { /** * 創建連接連接到MabbitMQ */ ConnectionFactory factory = new ConnectionFactory(); // 設置MabbitMQ所在主機ip或者主機名 factory.setHost("127.0.0.1"); // 創建一個連接 Connection connection = factory.newConnection(); // 創建一個頻道 Channel channel = connection.createChannel(); // 指定轉發——廣播 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); //所有日志嚴重性級別 String[] severities={"error","info","warning"}; for(int i=0;i<3;i++){ String severity = severities[i%3];//每一次發送一條不同嚴重性的日志 // 發送的消息 String message = "Hello World"+Strings.repeat(".", i+1); //參數1:exchange name //參數2:routing key channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent '" + severity +"':'"+ message + "'"); } // 關閉頻道和連接 channel.close(); connection.close(); } } ~~~ 消費者1(ReceiveLogs2Console.java) ~~~ public class ReceiveLogs2Console { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打開連接和創建頻道,與發送端一樣 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 聲明一個隨機隊列 String queueName = channel.queueDeclare().getQueue(); //所有日志嚴重性級別 String[] severities={"error","info","warning"}; for (String severity : severities) { //關注所有級別的日志(多重綁定) channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 創建隊列消費者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } } ~~~ 消費者2(ReceiveLogs2File.java) ~~~ public class ReceiveLogs2File { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打開連接和創建頻道,與發送端一樣 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 聲明一個隨機隊列 String queueName = channel.queueDeclare().getQueue(); String severity="error";//只關注error級別的日志,然后記錄到文件中去。 channel.queueBind(queueName, EXCHANGE_NAME, severity); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 創建隊列消費者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); //記錄日志到文件: print2File( "["+ envelope.getRoutingKey() + "] "+message); } }; channel.basicConsume(queueName, true, consumer); } private static void print2File(String msg) { try { String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath(); String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date()); File file = new File(dir, logFileName + ".log"); FileOutputStream fos = new FileOutputStream(file, true); fos.write((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes()); fos.flush(); fos.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } } ~~~ ? ? ?最終結果: ![](https://box.kancloud.cn/2016-02-18_56c53cbde186c.jpg) ? ? ?羅哩羅嗦的說這么多,其實就是說了這么一件事:我們可以使用Direct exchange+routingKey來過濾自己感興趣的消息。一個隊列可以綁定多個routingKey。這就是我們今天的主題——路由選擇。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看