<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                工作隊列,又稱任務隊列,主要思想是避免立即執行資源密集型任務,并且必須等待完成。相反地,我們進行任務調度,我們將一個任務封裝成一個消息,并將其發送到隊列。工作進行在后臺運行不斷的從隊列中取出任務然后執行。當你運行了多個工作進程時,這些任務隊列中的任務將會被工作進程共享執行。 這個概念在 Web 應用程序中特別有用,在短時間 HTTP 請求內需要執行復雜的任務。 [![](https://gitee.com/chenssy/blog-home/raw/master/image/201810/rabbitmq_python-two.png)](https://gitee.com/chenssy/blog-home/raw/master/image/201810/rabbitmq_python-two.png) ## 準備工作 現在,假裝我們很忙,我們使用 Thread.sleep 來模擬耗時的任務。 ### 發送端 ~~~java public class NewTask { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { // 創建連接 ConnectionFactory factory = new ConnectionFactory(); // 設置 RabbitMQ 的主機名 factory.setHost("localhost"); // 創建一個連接 Connection connection = factory.newConnection(); // 創建一個通道 Channel channel = connection.createChannel(); // 指定一個隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 發送消息 for (int i = 0; i < 10; i++) { String message = "Liang:" + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } // 關閉頻道和連接 channel.close(); connection.close(); } } ~~~ ### 接收端 ~~~java public class Worker { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { // 創建連接 ConnectionFactory factory = new ConnectionFactory(); // 設置 RabbitMQ 的主機名 factory.setHost("localhost"); // 創建一個連接 Connection connection = factory.newConnection(); // 創建一個通道 Channel channel = connection.createChannel(); // 指定一個隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 創建隊列消費者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } catch (InterruptedException e) { e.printStackTrace(); } } }; // acknowledgment is covered below boolean autoAck = true; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } private static void doWork(String task) throws InterruptedException { String[] taskArr = task.split(":"); TimeUnit.SECONDS.sleep(Long.valueOf(taskArr[1])); } } ~~~ ## 輪詢調度(Round-robin dispatching) 使用任務隊列的優點之一是能夠輕松地并行工作。如果我們積壓了很多任務,我們可以增加更多的工作進程,這樣可以輕松擴展。 首先,我們嘗試在同一時間運行兩個工作進程實例。他們都會從隊列中獲取消息,但是究竟如何?讓我們來看看。 你需要三個控制臺打開。兩個將運行工作程序。這些控制臺將是我們兩個消費者 – C1 和 C2。 Worker1 ~~~null [x] Received 'Liang:0' [x] Received 'Liang:2' [x] Received 'Liang:4' [x] Received 'Liang:6' [x] Received 'Liang:8' ~~~ Worker2 ~~~null [x] Received 'Liang-1' [x] Received 'Liang-3' [x] Received 'Liang-5' [x] Received 'Liang-7' [x] Received 'Liang-9' ~~~ 再做一個實驗,我們開啟三個工作程序。 Worker1 ~~~null [x] Received 'Liang-0' [x] Received 'Liang-3' [x] Received 'Liang-6' [x] Received 'Liang-9' ~~~ Worker2 ~~~null [x] Received 'Liang-1' [x] Received 'Liang-4' [x] Received 'Liang-7' ~~~ Worker3 ~~~null [x] Received 'Liang-2' [x] Received 'Liang-5' [x] Received 'Liang-8' ~~~ 我們發現,通過增加更多的工作程序就可以進行并行工作,并且接受的消息平均分配。注意的是,這種分配過程是一次性分配,并非一個一個分配。 默認情況下,RabbitMQ 將會發送的每一條消息按照順序給下一個消費者。平均每一個消費者將獲得相同數量的消息。這種分發消息的方式叫做輪詢調度。 ## 消息應答(Message acknowledgment) 執行一個任務可能需要幾秒鐘。你可能會想,如果一個消費者開始一個長期的任務,并且只有部分地完成它,會發生什么事情?使用我們當前的代碼,一旦 RabbitMQ 向客戶發送消息,它立即將其從內存中刪除。在這種情況下,如果你殺死正在執行任務的某個工作進程,我們會丟失它正在處理的信息。我們還會丟失所有發送給該特定工作進程但尚未處理的消息。 但是,我們不想失去任何消息。如果某個工作進程被殺死時,我們希望把這個任務交給另一個工作進程。 為了確保消息永遠不會丟失,RabbitMQ 支持消息應答。從消費者發送一個確認信息告訴 RabbitMQ 已經收到消息并已經被接收和處理,然后RabbitMQ 可以自由刪除它。 如果消費者被殺死而沒有發送應答,RabbitMQ 會認為該信息沒有被完全的處理,然后將會重新轉發給別的消費者。如果同時有其他消費者在線,則會迅速將其重新提供給另一個消費者。這樣就可以確保沒有消息丟失,即使工作進程偶爾也會死亡。 默認情況下,消息應答是開啟的。在前面的例子中,我們通過 autoAck = true 標志明確地將它們關閉。現在是一旦完成任務,將此標志設置為false ,并向工作進程發送正確的確認。 ### 發送端 ~~~java public class AckNewTask { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { // 創建連接 ConnectionFactory factory = new ConnectionFactory(); // 設置 RabbitMQ 的主機名 factory.setHost("localhost"); // 創建一個連接 Connection connection = factory.newConnection(); // 創建一個通道 Channel channel = connection.createChannel(); // 指定一個隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 發送消息 for (int i = 0; i < 10; i++) { String message = "Liang:" + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } // 關閉頻道和連接 channel.close(); connection.close(); } } ~~~ ### 接收端 ~~~java public class AckWorker { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { // 創建連接 ConnectionFactory factory = new ConnectionFactory(); // 設置 RabbitMQ 的主機名 factory.setHost("localhost"); // 創建一個連接 Connection connection = factory.newConnection(); // 創建一個通道 final Channel channel = connection.createChannel(); // 指定一個隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 創建隊列消費者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } catch (InterruptedException e) { e.printStackTrace(); } finally{ // 每次處理完成一個消息后,手動發送一次應答。 channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } private static void doWork(String task) throws InterruptedException { String[] taskArr = task.split(":"); TimeUnit.SECONDS.sleep(Long.valueOf(taskArr[1])); } } ~~~ 其中,首先關閉自動應答機制。 ~~~java boolean ack = false ; channel.basicConsume(QUEUE_NAME, ack, consumer); ~~~ 然后,每次處理完成一個消息后,手動發送一次應答。 ~~~java channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); ~~~ 此時,我們開啟三個工作進程,然后,隨機關閉一個工作進程,觀察輸出結果。 ## 消息持久化(Message durability) 我們已經學會了如何確保即使消費者死了,任務也不會丟失。但是如果 RabbitMQ 服務器停止,我們的任務仍然會丟失。 當 RabbitMQ 退出或崩潰時,將會丟失所有的隊列和信息,除非你告訴它不要丟失。需要兩件事來確保消息不丟失:我們需要分別將隊列和消息標記為持久化。 首先,我們需要確保 RabbitMQ 永遠不會失去我們的隊列。為了這樣做,我們需要將其聲明為持久化的。 ~~~java boolean durable = true; channel.queueDeclare("hello_dirable", durable, false, false, null); ~~~ 其次,我們需要標識我們的信息為持久化的。通過設置 MessageProperties 值為 PERSISTENT\_TEXT\_PLAIN。 ~~~java channel.basicPublish("", "hello_dirable", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); ~~~ 注意的是,在RabbitMQ 中,已經存在的隊列,我們無法修改其屬性。 此時,我們開啟一個發送者發送消息,然后,關閉 RabbitMQ 服務,再重新開啟,觀察輸出結果。 ## 公平轉發(Fair dispatch) 您可能已經注意到,調度仍然無法正常工作。例如在兩個工作線程的情況下,一個工作線程將不斷忙碌,另一個工作人員幾乎不會做任何工作。那么,RabbitMQ 不知道什么情況,還會平均分配消息。 這是因為當消息進入隊列時,RabbitMQ 只會分派消息。它不看消費者的未確認消息的數量。它只是盲目地向第 n 個消費者發送每個第 n 個消息。 為了解決這樣的問題,我們可以使用 basicQos 方法,并將傳遞參數為 prefetchCount = 1。 這樣告訴 RabbitMQ 不要一次給一個工作線程多個消息。換句話說,在處理并確認前一個消息之前,不要向工作線程發送新消息。相反,它將發送到下一個還不忙的工作線程。 ~~~java public class FairNewTask { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { // 創建連接 ConnectionFactory factory = new ConnectionFactory(); // 設置 RabbitMQ 的主機名 factory.setHost("localhost"); // 創建一個連接 Connection connection = factory.newConnection(); // 創建一個通道 Channel channel = connection.createChannel(); // 指定一個隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 公平轉發 int prefetchCount = 1; channel.basicQos(prefetchCount); // 發送消息 for (int i = 10; i >0; i--) { String message = "Liang:" + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } // 關閉頻道和連接 channel.close(); connection.close(); } } ~~~ 其中,使用 basicQos 方法,并將傳遞參數為 prefetchCount = 1。 ~~~java int prefetchCount = 1; channel.basicQos(prefetchCount) ; ~~~ ## 源代碼 > 相關示例完整代碼:[https://github.com/lianggzone/rabbitmq-action](https://github.com/lianggzone/rabbitmq-action)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看