<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                ## 工作隊列 在工作中分配任務(競爭消費者模式)。 創建一個工作隊列,用于在多個工作人員之間分配耗時的任務。 ![](http://www.rabbitmq.com/img/tutorials/python-two.png =400x120) #### 需求 工作隊列:工作隊列(又名:任務隊列)背后的主要思想是避免立即執行資源密集型任務,安排稍后完成任務。把一個任務封裝成一個消息并發送給一個隊列。在后臺運行的工作進程將彈出任務并最終執行作業。當你運行許多工人時,任務將在他們之間共享。 #### 舉例 把字符串中的點數作為它的復雜度。每一個點將占到“工作”的一秒鐘。例如,Hello ...描述的假任務 將需要三秒鐘。 修改Send.java代碼,以允許從命令行發送任意消息。 ##### 生產者代碼 ``` public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { String message = getMessage(argv); ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } private static String getMessage(String[] argv) { if (argv.length < 1) return "Hello World!"; return joinStrings(argv, "."); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } } ``` ##### 消費者代碼 ``` Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); try { doWork(message); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println(" [x] Done"); } } }; channel.basicConsume(QUEUE_NAME, true, consumer); //偽造每個點有一個任務要執行。它將處理交付的消息并執行任務. private static void doWork(String task) throws InterruptedException { for (char ch : task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } } ``` ### 循環調度 使用任務隊列的優點之一是能夠輕松地平行工作。如果我們積壓工作,我們可以增加更多的工人,這樣可以輕松擴展。 我們嘗試同時運行兩個工作者實例。他們都會從隊列中得到消息。 **做法** 將消費者的main方法執行二次,得到二個執行中的運行任務。并開始執行生產者的main方法。結果:默認情況下,RabbitMQ將按順序將每條消息發送給下一個使用者。平均而言,每個消費者將獲得相同數量的消息。這種分發消息的方式稱為循環法(round-robin)。 ### 消息確認 當消息發送出去,到達其中一個消費者時,如果恰巧這個時候,消費者死亡,那么就會導致該條消息不會被處理。 為了確保消息永不丟失,RabbitMQ支持 消息確認。消費者發回確認(告知)告訴RabbitMQ已經收到,處理了一個特定的消息,并且RabbitMQ可以自由刪除它。 如果消費者死亡(其通道關閉,連接關閉或TCP連接丟失),RabbitMQ將理解消息未被完全處理,并將重新排隊。如果有其他消費者同時上網,則會迅速重新發送給其他消費者。 手動消息確認默認打開。 `channel.basicConsume(QUEUE_NAME, true, consumer);`//默認為false為打開狀態 即:第二個參數為false時-》即使在處理消息的時候使用CTRL + C來殺死一個工作者,也不會丟失任何東西。工人死后不久,所有未確認的消息將被重新發送。 ### 消息持久性 如果RabbitMQ服務器停止,我們的任務仍然會丟失。 當RabbitMQ退出或崩潰時,它會忘記隊列和消息,此時需要將隊列和消息標記為持久以確保消息不會丟失 #### 1.持久隊列 注意 生產者消費者的隊列定義都要設置為true,當生產者不同發送時,一旦啟動了消費者,就會自動的進行消息的消費。 即使RabbitMQ重新啟動,task_queue隊列也不會丟失 **send** ``` boolean durable = true ; //代表隊列持久化 channel.queueDeclare("task_queue",durable,假,假,空); ``` #### 2.消息持久性 現在我們需要將消息標記為持久的 - 通過將MessageProperties(實現BasicProperties)設置為值PERSISTENT_TEXT_PLAIN。 ``` channel.basicPublish("","task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); ``` ### 公平派遣 當任務的難度不一致時,一個工人可能會一直做務比較重的任務,而另一個工人幾乎不會做什么事,RabbitMQ不知道任何關于這個,并將仍然均勻地發送消息。 發生這種情況是因為RabbitMQ只在消息進入隊列時調度消息。它沒有考慮消費者未確認消息的數量。它只是盲目地把第n條消息分發給第n個消費者。 ![](http://www.rabbitmq.com/img/tutorials/prefetch-count.png =400x120) #### 解決方案 我們可以使用basicQos方法`prefetchCount = 1`設置。 這告訴RabbitMQ一次不能給一個工作者多個消息。或者換句話說,不要向工作人員發送新消息,直到處理并確認了前一個消息。 ``` System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); int prefetchCount = 1 ; channel.basicQos(prefetchCount); ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看