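Unless TTL can be applied per message, and each message actually expires as soon as its own TTL elapses, the result is not a general-purpose delay queue. The rabbitmq_delayed_message_exchange plugin fills this gap and lets us build one.

<br/>

The steps are as follows:

[TOC]

# 1. Install the delayed message plugin

**1. Download: https://www.rabbitmq.com/community-plugins.html**

![](https://img.kancloud.cn/8a/f2/8af27d6cb8515a34bd50b1639f4770c0_1260x213.jpg)
![](https://img.kancloud.cn/a7/1e/a71ee33270e6873b220eebb5b55923bd_1259x206.jpg)

**2. Upload the plugin to the RabbitMQ plugins directory**

The path below is for rabbitmq_server-3.8.8; adjust it to the installed version.

```shell
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
```

**3. Run the following command to enable the plugin**

```shell
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
```

![](https://img.kancloud.cn/28/3c/283ce57cf9e20c73fcbf3049927c96dd_1346x319.jpg)

<br/>

# 2. Build the delay queue

:-: ![](https://img.kancloud.cn/ab/84/ab84268652333d0387545d66364d6189_1275x212.jpg)
Architecture diagram

delayed.exchange is declared with a new exchange type provided by the plugin, which supports delayed delivery. A message published to it is not routed to the target queue immediately. Instead it is stored in a Mnesia table (Mnesia is a distributed database system) and is routed to the target queue only once its delivery time has arrived.

<br/>

The steps are as follows:

**1. Create a Spring Boot project**

```xml
<dependencies>
    <!-- RabbitMQ dependency -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- RabbitMQ test dependency -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
    </dependency>
</dependencies>
```

**2. `resources/application.properties`**

```properties
spring.rabbitmq.host=192.168.0.107
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
```

**3. Configuration class**

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DelayedQueueConfig {
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    /**
     * Declare the queue.
     */
    @Bean("delayedQueue")
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    /**
     * Declare the delayed exchange.
     */
    @Bean("delayedExchange")
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>(16);
        // Routing behavior of the delayed exchange. The key "x-delayed-type" is fixed and cannot be customized.
        args.put("x-delayed-type", ExchangeTypes.DIRECT);
        // The exchange type "x-delayed-message" is likewise fixed and cannot be customized.
        // Arguments: name, type, durable = true, autoDelete = false, arguments.
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    /**
     * Bind the queue to the exchange.
     */
    @Bean
    public Binding binding(@Qualifier("delayedQueue") Queue queue,
                           @Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}
```

**4. Producer**

```java
import java.util.LinkedHashMap;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    @GetMapping("/plugins/send")
    public void pluginsSend() {
        // Key: delay in milliseconds. LinkedHashMap keeps insertion order,
        // so the 20000 ms message is always published first.
        Map<Integer, String> data = new LinkedHashMap<>(16);
        data.put(20000, "message with TTL 20000");
        data.put(2000, "message with TTL 2000");
        for (Map.Entry<Integer, String> entry : data.entrySet()) {
            // convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor processor)
            rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, entry.getValue(), message -> {
                // Set the per-message delay (milliseconds) that the delayed exchange honors.
                message.getMessageProperties().setDelay(entry.getKey());
                return message;
            });
            log.info(entry.getValue());
        }
    }
}
```

**5. Consumer**

```java
import java.nio.charset.StandardCharsets;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ConsumerService {
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";

    @RabbitListener(queues = DELAYED_QUEUE_NAME)
    public void receiveDelayedQueue(Message message) {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info(msg);
    }
}
```

**6. Test**

After starting the project, visit http://localhost:8080/plugins/send . The producer publishes two messages and the console prints the following.

```
2022-10-27 22:38:21.395 INFO 7604 --- [nr.ProducerController : message with TTL 20000
2022-10-27 22:38:21.395 INFO 7604 --- [nr.ProducerController : message with TTL 2000
2022-10-27 22:38:23.420 INFO 7604 --- [nservice.ConsumerService : message with TTL 2000
2022-10-27 22:38:41.398 INFO 7604 --- [nservice.ConsumerService : message with TTL 20000
```

The second message has the shorter TTL, so it is consumed first (about 2 seconds after publishing, versus about 20 seconds for the first message), as expected.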
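
To see why per-message granularity matters, the ordering in the log above can be compared with what a plain TTL queue would produce. The following is only a simplified timing model, not RabbitMQ code: it assumes all messages are published at time 0 and that a queue using per-message TTL plus dead-lettering releases an expired message only once it reaches the head of the queue. The class `DelaySketch` and its method names are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DelaySketch {

    /**
     * Assumed model of a TTL queue with dead-lettering: messages leave in FIFO
     * order, so a message cannot be released before the one published ahead of it.
     * Returns one entry per message, in delivery order.
     */
    public static List<String> headOfQueueExpiry(int[] delaysMs) {
        List<String> deliveries = new ArrayList<>();
        int clock = 0; // time at which the previous message was released
        for (int delay : delaysMs) {
            clock = Math.max(clock, delay);
            deliveries.add("delay=" + delay + " at=" + clock);
        }
        return deliveries;
    }

    /**
     * Assumed model of the delayed exchange: every message is held on its own
     * and released exactly when its delay elapses, regardless of publish order.
     */
    public static List<String> perMessageDelay(int[] delaysMs) {
        int[] sorted = delaysMs.clone();
        Arrays.sort(sorted); // delivery order equals ascending delay
        List<String> deliveries = new ArrayList<>();
        for (int delay : sorted) {
            deliveries.add("delay=" + delay + " at=" + delay);
        }
        return deliveries;
    }

    public static void main(String[] args) {
        int[] published = {20000, 2000}; // same publish order as the producer above
        System.out.println(headOfQueueExpiry(published));
        // prints [delay=20000 at=20000, delay=2000 at=20000]
        System.out.println(perMessageDelay(published));
        // prints [delay=2000 at=2000, delay=20000 at=20000]
    }
}
```

Under these assumptions the 2000 ms message is stuck behind the 20000 ms one in the first model, while the second model matches the consumer log shown in the test step.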