<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                [TOC] # 1. 死信隊列 > 在發送異常或者網絡波動時,消息不能被處理,導致業務異常,所以要把這種異常消息發送到死信隊列處理這些異常消息,補救 ## 1.1 死信和死信隊列 一、死信 > 當一條消息在隊列中出現以下三種情況的時候,該消息就會變成一條死信。 1. 消息被否定確認,使用`channel.basicNack`或`channel.basicReject`,并且此時`requeue`屬性被設置為`false`。 2. 消息在隊列的存活時間超過設置的TTL時間(隊列ttl-所有消息都生效,單條消息ttl)。 3. 消息隊列的消息數量已經超過最大隊列長度。 二、死信隊列 > **“死信”消息會被RabbitMQ進行特殊處理,如果配置了死信隊列信息,那么該消息將會被丟進死信隊列中,如果沒有配置,則該消息將會被丟棄。** 三、配置死信隊列 1. 配置業務隊列,綁定到業務交換機上 2. 為業務隊列配置死信交換機和路由key 3. 為死信交換機配置死信隊列 4. 交換機可以為任何類型【Direct、Fanout、Topic】 ## 1.2 實例 ### 1.2.1 配置 **1.手動確認消息** ``` spring: rabbitmq: host: localhost password: guest username: guest listener: type: simple simple: default-requeue-rejected: false acknowledge-mode: manual ``` **2.配置交換機和隊列** 一個正常交換機綁定兩個業務隊列A和B(廣播模式);A和B綁定一個死信交換機,由不同的key綁定, A隊列消費者產生一個死信消息 ~~~ package com.tuna.mq.rabbitmq.config.deadletter; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitMQConfig { public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange"; public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea"; public static final String BUSINESS_QUEUEB_NAME = "dead.letter.demo.simple.business.queueb"; public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange"; public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey"; public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queueb.routingkey"; public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea"; public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.demo.simple.deadletter.queueb"; // 聲明業務Exchange @Bean("businessExchange") public FanoutExchange businessExchange() { return new FanoutExchange(BUSINESS_EXCHANGE_NAME); } // 聲明死信Exchange @Bean("deadLetterExchange") public DirectExchange deadLetterExchange() { return new DirectExchange(DEAD_LETTER_EXCHANGE); } // 聲明業務隊列A @Bean("businessQueueA") public Queue businessQueueA() { Map<String, Object> args = new HashMap<>(2); //x-dead-letter-exchange 這里聲明當前隊列綁定的死信交換機 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); //x-dead-letter-routing-key 這里聲明當前隊列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY); return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build(); } // 聲明業務隊列B @Bean("businessQueueB") public Queue businessQueueB() { Map<String, Object> args = new HashMap<>(2); //x-dead-letter-exchange 這里聲明當前隊列綁定的死信交換機 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); //x-dead-letter-routing-key 這里聲明當前隊列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY); return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build(); } // 聲明死信隊列A @Bean("deadLetterQueueA") public Queue deadLetterQueueA() { return new Queue(DEAD_LETTER_QUEUEA_NAME); } // 聲明死信隊列B @Bean("deadLetterQueueB") public Queue deadLetterQueueB() { return new Queue(DEAD_LETTER_QUEUEB_NAME); } // 聲明業務隊列A綁定關系 @Bean public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } // 聲明業務隊列B綁定關系 @Bean public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } // 聲明死信隊列A綁定關系 @Bean public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY); } // 聲明死信隊列B綁定關系 @Bean public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY); } } ~~~ ### 1.2.2 消費者 1.正常消費者,Ack=false消息失敗,A隊列中的消息將會發到死信隊列 ~~~ @Slf4j @Component public class BusinessMessageReceiver { @RabbitListener(queues = BUSINESS_QUEUEA_NAME) public void receiveA(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("收到業務消息A:{}", msg); boolean ack = true; Exception exception = null; try { if (msg.contains("deadletter")) { throw new RuntimeException("dead letter exception"); } } catch (Exception e) { ack = false; exception = e; } if (!ack) { log.error("消息消費發生異常,error msg:{}", exception.getMessage(), exception); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } else { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } @RabbitListener(queues = BUSINESS_QUEUEB_NAME) public void receiveB(Message message, Channel channel) throws IOException { log.info("收到業務消息B:{}", new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } ~~~ 2.死信消費者 ~~~ @Component public class DeadLetterMessageReceiver { @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME) public void receiveA(Message message, Channel channel) throws IOException { System.out.println("收到死信消息A:" + new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME) public void receiveB(Message message, Channel channel) throws IOException { System.out.println("收到死信消息B:" + new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } ~~~ ### 1.2.3 生產者 ~~~ @Component public class BusinessMessageSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg(String msg) { rabbitTemplate.convertSendAndReceive(BUSINESS_EXCHANGE_NAME, "", msg); } } ~~~ controller ~~~ @Autowired private BusinessMessageSender sender; @RequestMapping("sendmsg") public void sendMsg(String msg) { sender.sendMsg(msg); } ~~~ 1.發送正常消息 http://localhost:8080/mq/sendmsg?msg=msg 2.產生一個死信消息,如下A隊列產生一個死信消息,他對應的死信隊列會接收到這個消息 http://localhost:8080/mq/sendmsg?msg=deadletter ``` 2022-06-13 15:09:43.350 INFO 3920 --- [ntContainer#1-1] c.t.m.r.s.d.BusinessMessageReceiver : 收到業務消息B:msg 2022-06-13 15:09:43.350 INFO 3920 --- [ntContainer#0-1] c.t.m.r.s.d.BusinessMessageReceiver : 收到業務消息A:msg 2022-06-13 15:12:23.739 INFO 3920 --- [nio-8080-exec-4] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-KnERgpHzUz4thORpw0BXIQ identity=566c4da9] started 2022-06-13 15:12:23.741 INFO 3920 --- [ntContainer#0-1] c.t.m.r.s.d.BusinessMessageReceiver : 收到業務消息A:deadletter 2022-06-13 15:12:23.741 INFO 3920 --- [ntContainer#1-1] c.t.m.r.s.d.BusinessMessageReceiver : 收到業務消息B:deadletter 2022-06-13 15:12:23.744 ERROR 3920 --- [ntContainer#0-1] c.t.m.r.s.d.BusinessMessageReceiver : 消息消費發生異常,error msg:dead letter exception java.lang.RuntimeException: dead letter exception at com.tuna.mq.rabbitmq.service.deadletter.BusinessMessageReceiver.receiveA(BusinessMessageReceiver.java:26) at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_311] 收到死信消息A:deadletter ``` # 2. 延時隊列 1. 先進先出的有序隊列 2. 用來存放需要在指定時間被處理的元素的隊列 ## 2.1 場景 可用于延時性較低、數據量大的定時任務:多少時間之后,做... 1. 訂單在十分鐘之內未支付則自動取消。 2. 新創建的店鋪,如果在十天內都沒有上傳過商品,則自動發送消息提醒。 3. 賬單在一周內未支付,則自動結算。 4. 用戶注冊成功后,如果三天內沒有登陸則進行短信提醒。 5. 用戶發起退款,如果三天內沒有得到處理則通知相關運營人員。 6. 預定會議后,需要在預定的時間點前十分鐘通知各個與會人員參加會議。 7. 這些場景都有一個特點,需要在某個事件發生之后或者之前的指定時間點完成某一項任務,看起來似乎使用定時任務,一直輪詢數據,每秒查一次,取出需要被處理的數據,然后處理不就完事了嗎? 如果數據量比較少,確實可以這樣做,比如:對于“如果賬單一周內未支付則進行自動結算”這樣的需求,如果對于時間不是嚴格限制,而是寬松意義上的一周,那么每天晚上跑個定時任務檢查一下所有未支付的賬單,確實也是一個可行的方案。 但對于數據量比較大,并且時效性較強的場景,如:“訂單十分鐘內未支付則關閉“,短期內未支付的訂單數據可能會有很多,活動期間甚至會達到百萬甚至千萬級別,對這么龐大的數據量仍舊使用輪詢的方式顯然是不可取的,很可能在一秒內無法完成所 有訂單的檢查,同時會給數據庫帶來很大壓力,無法滿足業務要求而且性能低下。更重要的一點是,不!優!雅!沒錯,作為一名有追求的程序員,始終應該追求更優雅的架構和更優雅的代碼風格,寫代碼要像寫詩一樣優美。 ## 2.2 實例 給消息設置了ttl,死信即為延時 1.生產者發送消息到交換機--->業務隊列 2.在ttl事件內沒有消費者讀取消息,消息進入死信(延時)隊列 3.延時隊列消費者消費延時消息 有三種實現方式: 一、隊列上設置超時,不投靈活 二、消息上設置超時,在超市時間點上不一定能發送超時消息 三、插件的方式 ### 2.2.1 插件的方式 1. 聲明一個`x-delayed-message`類型的交換機 2. 發布帶`x-delay`(指定延期時間)消息頭的消息 3. 在超過制定時間后,消息發送到指定隊列,被綁定這個隊列的消費者消費 1.下載 打開官網下載:???http://www.rabbitmq.com/community-plugins.html? ,注意rabbitmq和插件的版本對應,3.x.x -3.x.x 2.將插件放入rabbitmq的plugins目錄 ``` docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez 38ada6e58bba:/plugins ``` 3.啟動插件 ``` docker exec -it 38ada6e58bba /bin/bash rabbitmq-plugins enable rabbitmq_delayed_message_exchange ``` 代碼示例 1.配置插件支持的交換機,并綁定隊列 ~~~ @Configuration public class DelayedRabbitMQConfig { public static final String DELAYED_QUEUE_NAME = "delay.queue"; public static final String DELAYED_EXCHANGE_NAME = "delay.exchange"; public static final String DELAYED_ROUTING_KEY = "delay.routingkey"; @Bean public Queue immediateQueue() { return new Queue(DELAYED_QUEUE_NAME); } @Bean public CustomExchange customExchange() { Map<String, Object> args = new HashMap<>(); //指定低層交換機類型交換機類型 args.put("x-delayed-type", "direct"); //插件封裝的交換機 return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } @Bean public Binding bindingNotify(@Qualifier("immediateQueue") Queue queue, @Qualifier("customExchange") CustomExchange customExchange) { return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs(); } } ~~~ args可以配置選項: ``` String name, 交換機名稱 String type, 交換機消息類型(x-delayed-message) boolean durable, 是否持久化 boolean autoDelete,是否刪除 Map arguments // 隊列中的消息什么時候會自動被刪除? arguments.put("x-message-ttl",10000); //設置過期時間 arguments.put("x-expires", 10000); //x-expires用于當多長時間沒有消費者訪問該隊列的時候,該隊列會自動刪除, //x-max-length: arguments.put("x-max-length", 4); 用于指定隊列的長度,如果不指定,可以認為是無限長,例如指定隊列的長度是4,當超過4條消息,前面的消息將被刪除,給后面的消息騰位 ``` 2.生產者發送延時消息 ~~~ @Slf4j @Component public class DelyMessageSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendDelayMsg(String msg, Integer delayTime) { log.info("dely sender send a message: {}",msg); rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a -> { a.getMessageProperties().setDelay(delayTime); return a; }); } } ~~~ 3. 消費者 ~~~ @Component public class DelyMessageConsumer { private static final Logger logger = LoggerFactory.getLogger(DelyMessageConsumer.class); @RabbitListener(queues = DELAYED_QUEUE_NAME) public void receiveHeadersQueue(String message) { logger.info("dely consumer received message: {}", new String(message)); } } ~~~ 4. controller ~~~ @RequestMapping("dely/{message}/{timeOut}") public String dely(@PathVariable("message") String message,@PathVariable("timeOut") Integer timeOut) { delyMessageSender.sendDelayMsg(message, timeOut); return "success"; } ~~~ http://localhost:8080/mq/dely/hello/30000 輸出 時隔30秒 ``` 2022-07-04 09:40:34.033 INFO 6316 --- [io-8080-exec-10] c.t.m.r.service.dely.DelyMessageSender : dely sender send a message: hello 2022-07-04 09:41:04.036 INFO 6316 --- [ntContainer#2-1] c.t.m.r.s.dely.DelyMessageConsumer : dely consumer received message: hello ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看