<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                [TOC] # 1. 消息應答概念 消費者完成一個任務可能需要一段時間,如果其中一個消費者在處理一個長的任務時只完成了部分突然掛掉了,會發生什么情況。 <br/> RabbitMQ 一旦向消費者傳遞了一條消息,便立即將該消息標記為刪除。在這種情況下,突然有個消費者掛掉了,我們將丟失正在處理的消息。以及后續發送給該消費者的消息,因為它無法接收到。 <br/> 為了保證消息在發送過程中不丟失,rabbitmq 引入消息應答機制,消息應答就是:<mark>消費者在接收到消息并且處理該消息之后,告訴 rabbitmq 它已經處理了,rabbitmq 可以把該消息刪除了</mark>。 <br/> 消息應答方式有:自動應答和手動應答兩種方式。 <br/> # 2. 自動應答 消息發送后立即被認為已經傳送成功,這種模式需要在<mark>高吞吐量和數據傳輸安全性方面做權衡</mark>,因為這種模式如果消息在接收到之前,消費者那邊出現連接或者 channel 關閉,那么消息就丟失了。 <br/> 另一方面這種模式可能導致消費者過載,<mark>因為沒有對傳遞的消息數量進行限制</mark>,有可能使得消費者這邊由于接收太多還來不及處理的消息,導致這些消息的積壓,最終使得內存耗盡,最終這些消費者線程被操作系統殺死,<mark>所以這種模式僅適用在消費者可以高效并以某種速率能夠處理這些消息的情況下使用</mark>。 <br/> # 3. 手動應答 ## 3.1 消息自動重新入隊 如果消費者由于某些原因失去連接(如通道已關閉、連接已關閉或 TCP 連接丟失),導致消息未發送 ACK 確認,RabbitMQ 了解到消息未完全處理,將對消息重新排隊。 <br/> 如果此時其他消費者可以處理,該條消息將會很快被重新分發給另一個消費者。這樣,即使某個消費者偶爾死亡,也可以確保不會丟失任何消息。 <br/> ## 3.2 手動應答實現 消息默認采用的是自動應答,改用手動應答有如下好處。 (1)消息消費過程中數據不丟失。 (2)可以批量應答并且減少網絡擁堵。 <br/> 實現手動應答的 API 如下。 ```java public interface Channel extends ShutdownNotifier, AutoCloseable { // 用于肯定確認 // RabbitMQ 已知道該消息被處理成功,可以將其丟棄了 void basicAck(long var1, boolean var3) throws IOException; // 消費者拒絕消費 void basicNack(long var1, boolean var3, boolean var4) throws IOException; // 消費者拒絕消費 // 與 basicNack 相比少一個參數var4,當消息不被處理時直接丟棄 void basicReject(long var1, boolean var3) throws IOException; } ``` 這三個方法都有一個參數 var3,是 multiple 域。 當`var3=true`代表批量應答 channel 上未應答的消息。比如 channel 上有正在傳送 tag 為 5、6、7、8 的消息。假設當前 tag=8,那么此時 5 — 8 的這些還未應答的消息都會被確認收到消息應答。 <br/> 當`var3=false`時,只會應答 tag=8 的消息, 5、6、7 這三個消息依然不會被確認收到消息應答。 <br/> 實現手動應答代碼演示如下: **1. 工具類** ```java public class RabbitMQUtils { /** * 連接RabbitMQ服務器 */ public static Channel getChannel() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.107"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } } ``` ```java public class SleepUtils { public static void sleep(int second) { try { Thread.sleep(1000 * second); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } ``` **2. 生產者** ```java public class Task02 { private static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMQUtils.getChannel()) { //聲明隊列 channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null); Scanner sc = new Scanner(System.in); System.out.println("請輸入信息"); while (sc.hasNext()) { String message = sc.nextLine(); //發送消息 channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("生產者發出消息:" + message); } } } } ``` **3. 兩個消費者** 在消費端調用手動應答 API 實現手動應答。 ```java public class Worker03 { private static final String ACK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); System.out.println("C1 等待接收消息處理時間較短"); //消息消費的時候如何處理消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); SleepUtils.sleep(1); System.out.println("接收到消息:" + message); /** * 消息應答。 * basicAck(long var1, boolean var3) * var1: 消息標記tag * var3: 是否批量應答未應答消息 */ channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; /** * basicConsume(String var1, boolean var2, DeliverCallback var3, CancelCallback var4) * var1: 隊列名稱 * var2: 應答方式。true自動應答、false手動應答 */ channel.basicConsume(ACK_QUEUE_NAME, false, deliverCallback, (consumerTag) -> { System.out.println(consumerTag + "消費者取消消費接口回調邏輯"); }); } } ``` ```java public class Worker04 { private static final String ACK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); System.out.println("C2 等待接收消息處理時間較長"); //消息消費的時候如何處理消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); SleepUtils.sleep(30); System.out.println("接收到消息:" + message); /** * 消息應答。 * basicAck(long var1, boolean var3) * var1: 消息標記tag * var3: 是否批量應答未應答消息 */ channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; /** * basicConsume(String var1, boolean var2, DeliverCallback var3, CancelCallback var4) * var1: 隊列名稱 * var2: 應答方式。true自動應答、false手動應答 */ channel.basicConsume(ACK_QUEUE_NAME, false, deliverCallback, (consumerTag) -> { System.out.println(consumerTag + "消費者取消消費接口回調邏輯"); }); } } ``` **4. 測試** (1)啟動生產者 Task02,生產4條消息。 ![](https://img.kancloud.cn/89/d2/89d247ff917157dda9d042719ca7f6b1_1452x266.jpg) (2)啟動兩個消費者。 正常情況下兩個消費者按照輪詢機制分別獲取兩條消息。 ![](https://img.kancloud.cn/26/a8/26a8fbcfb011890c574d5b293f4ed739_1284x108.jpg) ![](https://img.kancloud.cn/91/14/9114ebde2740af04a190958c19be521f_1484x100.jpg) <br/> 演示消息自動重新入隊:當生產者 Task02 發送消息 C4 的時候,立即把消費者 Worker04 停掉,就會發現本來應該由 Worker04 消費的消息 C4,被 Worker03 消費了,實現了消息自動重新入隊。 ![](https://img.kancloud.cn/59/34/59347c41922ca3f6e08d7b81618ad3fa_1350x135.jpg) ![](https://img.kancloud.cn/4d/14/4d14649cd4d283d6c7ba84f023eb29e3_1368x84.jpg)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看