<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                案例代碼:https://gitee.com/flymini/codes01/tree/master/rabbitmq_/com-learn-rabbitmq01 **** [TOC] # 1. 開啟發布確認 發布確認默認是沒有開啟的,如果要開啟需要在生產者調用方法 `channel.confirmSelect`,每當你想開啟要發布確認,都需要在 channel 上調用該方法。 <br/> # 2. 發布確認策略 發布確認策略共有三種:單個確認發布、批量確認發布、異步確認發布。 ## 2.1 單個確認發布 這是一種簡單的確認方式,它是一種<mark>同步確認發布</mark>的方式,也就是發布一個消息之后只有它被確認發布,后續的消息才能繼續發布。 ```java public static void publishMessageIndividually() throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, false, false, false, null); //開啟發布確認 channel.confirmSelect(); long begin = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("", queueName, null, message.getBytes()); //服務端返回 false 或超時時間內未返回,生產者可以消息重發 boolean flag = channel.waitForConfirms(); if (flag) { System.out.println("消息發送成功"); } } long end = System.currentTimeMillis(); System.out.println("發布" + MESSAGE_COUNT + "個單獨確認消息,耗時" + (end - begin) + "ms"); } } ``` <br/> 這種確認方式有一個最大的缺點就是:**發布速度特別的慢**,因為如果沒有確認發布的消息就會阻塞所有后續消息的發布,這種方式最多提供每秒不超過數百條發布消息的吞吐量。當然對于某些應用程序來說這可能已經足夠了。 <br/> ## 2.2 批量確認發布 上面那種方式非常慢,與單個等待確認消息相比,先發布一批消息然后一起確認可以極大地提高吞吐量,當然這種方式的缺點就是:**當發生故障導致發布出現問題時,不知道是哪個消息出現問題了**,我們必須將整個批處理保存在內存中,以記錄重要的信息而后重新發布消息。當然這種方案仍然是**同步的**,也一樣阻塞消息的發布。 ```java public static void publishMessageBatch() throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, false, false, false, null); //開啟發布確認 channel.confirmSelect(); //批量確認消息大小 int batchSize = 100; //未確認消息個數 int outstandingMessageCount = 0; long begin = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("", queueName, null, message.getBytes()); outstandingMessageCount++; if (outstandingMessageCount == batchSize) { channel.waitForConfirms(); outstandingMessageCount = 0; } } //為了確保還有剩余沒有確認消息 再次確認 if (outstandingMessageCount > 0) { channel.waitForConfirms(); } long end = System.currentTimeMillis(); System.out.println("發布" + MESSAGE_COUNT + "個批量確認消息,耗時" + (end - begin) + "ms"); } } ``` <br/> ## 2.3 異步確認發布 異步確認雖然編程邏輯比上兩個要復雜,但是性價比最高,無論是可靠性還是效率都沒得說,他是利用回調函數來達到消息可靠性傳遞的,這個中間件也是通過函數回調來保證是否投遞成功。 ![](https://img.kancloud.cn/19/40/19409e173c27234d97c50ec3dabb9ca2_1397x597.jpg) ```java public static void publishMessageAsync() throws Exception { try (Channel channel = RabbitMQUtils.getChannel()) { String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, false, false, false, null); //開啟發布確認 channel.confirmSelect(); /* * 線程安全有序的一個哈希表,適用于高并發的情況 * 1.輕松的將序號與消息進行關聯 * 2.輕松批量刪除條目,只要給到序列號 * 3.支持并發訪問 */ ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>(); /* * 確認收到消息的一個回調 * sequenceNumber: 消息序列號 * multiple: 為true則可以確認小于等于當前序列號的消息、false只確認當前序列號消息 */ ConfirmCallback ackCallback = (sequenceNumber, multiple) -> { if (multiple) { //返回的是小于等于當前序列號的未確認消息 ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true); //清除該部分未確認消息 confirmed.clear(); } else { //只清除當前序列號的消息 outstandingConfirms.remove(sequenceNumber); } }; /* * 沒有被確認消息的一個回調 */ ConfirmCallback nackCallback = (sequenceNumber, multiple) -> { String message = outstandingConfirms.get(sequenceNumber); System.out.println("發布的消息" + message + "未被確認,序列號" + sequenceNumber); }; /* * 添加一個異步確認的監聽器 */ channel.addConfirmListener(ackCallback, nackCallback); long begin = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = "消息" + i; /* * channel.getNextPublishSeqNo()獲取下一個消息的序列號 * 通過序列號與消息體進行一個關聯 * 全部都是未確認的消息體 */ outstandingConfirms.put(channel.getNextPublishSeqNo(), message); channel.basicPublish("", queueName, null, message.getBytes()); } long end = System.currentTimeMillis(); System.out.println("發布" + MESSAGE_COUNT + "個異步確認消息,耗時" + (end - begin) + "ms"); } } ``` <br/> ## 2.4 如何處理異步未確認消息 最好的解決方案就是把未確認的消息放到一個基于內存的能被發布線程訪問的隊列,比如說用 `ConcurrentLinkedQueue` 這個隊列在 confirm callbacks 與發布線程之間進行消息的傳遞。 <br/> ## 2.5 3種發布確認速度對比 * 單獨發布消息:同步等待確認,簡單,但吞吐量非常有限。 * 批量發布消息: 批量同步等待確認,簡單,合理的吞吐量,一旦出現問題但很難推斷出是那條消息出現了問題。 * 異步處理:最佳性能和資源使用,在出現錯誤的情況下可以很好地控制,但是實現起來稍微難些。 ```java public static void main(String[] args) throws Exception { publishMessageIndividually(); publishMessageBatch(); publishMessageAsync(); // 運行結果 // 發布1000個單獨確認消息,耗時457ms // 發布1000個批量確認消息,耗時172ms // 發布1000個異步確認消息,耗時64ms } ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看