<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                # **1. 分析數據丟失的原因** ![](https://img.kancloud.cn/99/34/9934d9a1a9db8098cdbb72b079ee4a9e_681x180.png) 可以看出,一條消息整個過程要經歷兩次的網絡傳輸:**從生產者發送到RabbitMQ服務器,從RabbitMQ服務器發送到消費者**。 **在消費者未消費前存儲在隊列(Queue)中**。 所以可以知道,有三個場景下是會發生消息丟失的: * 存儲在隊列中,如果隊列沒有對消息持久化,RabbitMQ服務器宕機重啟會丟失數據。 * 生產者發送消息到RabbitMQ服務器過程中,RabbitMQ服務器如果宕機停止服務,消息會丟失。 * 消費者從RabbitMQ服務器獲取隊列中存儲的數據消費,但是消費者程序出錯或者宕機而沒有正確消費,導致數據丟失。 針對以上三種場景,RabbitMQ提供了三種解決的方式,分別是消息持久化,confirm機制,ACK事務機制。 ![](https://img.kancloud.cn/00/dd/00ddeecbc84b2b44de389e59ef40efc9_685x204.png) # 2. 防丟失策略 ## 2.1 持久化 為了防止rabbitmq故障,導致數據丟失,詳見消息持久化 ## 2.2 生產者 **在生產者發送到RabbitMQ Server時有可能因為網絡問題導致投遞失敗,從而丟失數據**。我們可以使用confirm模式防止數據丟失。工作流程是怎么樣的呢,看以下圖解: ![](https://img.kancloud.cn/95/f5/95f58764711980a333f7a66795bf7e50_658x303.png) 從上圖中可以看到是通過兩個回調函數**confirm()、returnedMessage()** 進行通知。 一條消息從生產者發送到RabbitMQ,首先會發送到Exchange,對應回調函數**confirm()**。第二步從Exchange路由分配到Queue中,對應回調函數則是**returnedMessage()**。 ### 2.2.1 配置生產者消費回調 ~~~text spring: rabbitmq: publisher-confirms: true # publisher-returns: true template: mandatory: true # publisher-confirms:設置為true時。當消息投遞到Exchange后,會回調confirm()方法進行通知生產者 # publisher-returns:設置為true時。當消息匹配到Queue并且失敗時,會通過回調returnedMessage()方法返回消息 # spring.rabbitmq.template.mandatory: 設置為true時。指定消息在沒有被隊列接收時會通過回調returnedMessage()方法退回。 ~~~ ### 2.2.2 回調函數 ~~~text @Component public class RabbitmqConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { private Logger logger = LoggerFactory.getLogger(RabbitmqConfirmCallback.class); /** * 監聽消息是否到達Exchange * * @param correlationData 包含消息的唯一標識的對象 * @param ack true 標識 ack,false 標識 nack * @param cause nack 投遞失敗的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { logger.info("消息投遞成功~消息Id:{}", correlationData.getId()); } else { logger.error("消息投遞失敗,Id:{},錯誤提示:{}", correlationData.getId(), cause); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.info("消息沒有路由到隊列,獲得返回的消息"); Map map = byteToObject(message.getBody(), Map.class); logger.info("message body: {}", map == null ? "" : map.toString()); logger.info("replyCode: {}", replyCode); logger.info("replyText: {}", replyText); logger.info("exchange: {}", exchange); logger.info("routingKey: {}", exchange); logger.info("------------> end <------------"); } @SuppressWarnings("unchecked") private <T> T byteToObject(byte[] bytes, Class<T> clazz) { T t; try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes); ObjectInputStream ois = new ObjectInputStream(bis)) { t = (T) ois.readObject(); } catch (Exception e) { e.printStackTrace(); return null; } return t; } } ~~~ ### 2.2.3 生產者實現 ~~~text @Service public class RabbitMQServiceImpl implements RabbitMQService { @Resource private RabbitmqConfirmCallback rabbitmqConfirmCallback; @Resource private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { //指定 ConfirmCallback rabbitTemplate.setConfirmCallback(rabbitmqConfirmCallback); //指定 ReturnCallback rabbitTemplate.setReturnCallback(rabbitmqConfirmCallback); } @Override public String sendMsg(String msg) throws Exception { Map<String, Object> message = getMessage(msg); try { CorrelationData correlationData = (CorrelationData) message.remove("correlationData"); rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, message, correlationData); return "ok"; } catch (Exception e) { e.printStackTrace(); return "error"; } } private Map<String, Object> getMessage(String msg) { String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32); CorrelationData correlationData = new CorrelationData(msgId); String sendTime = sdf.format(new Date()); Map<String, Object> map = new HashMap<>(); map.put("msgId", msgId); map.put("sendTime", sendTime); map.put("msg", msg); map.put("correlationData", correlationData); return map; } } ~~~ 大功告成!接下來我們進行測試,發送一條消息,我們可以控制臺: ![](https://img.kancloud.cn/6b/d4/6bd440fe1b0ac605831950c5115988e2_1250x95.png) **假設發送一條信息沒有路由匹配到隊列,可以看到如下信息:** ![](https://img.kancloud.cn/81/6f/816fa29a2db6cf68a1702abe994157ac_720x73.png) ## 2.3 消費端 消費者需要回復ack
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看