<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                [TOC] # 簡介 消息的消費 使用容器的方式進行消費 認識一個接口`org.springframework.amqp.rabbit.listener.MessageListenerContainer`, 其默認實現類`org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer` `MessageListenerContainer#setMessageListener`方法,接收的參數類型 `org.springframework.amqp.core.MessageListener`或者`org.springframework.amqp.rabbit.core.ChannelAwareMessageListener`接口 代碼: 將`ConnectionFactory`,`RabbitTemplate`,`SimpleMessageListenerContainer`實例納入到spring容器中進行管理 # SimpleMessageListenerContainer詳解 同一個queue上有多個消費者的時候,只會有一個消費者收到消息,一般是多個消費者輪流收到消息。 `SimpleMessageListenerContainer`可以監聽多個隊列, `container.setQueueNames`的api接收的是一個字符串數組對象 ~~~ @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("zhihao.debug","zhihao.error","zhihao.info"); container.setMessageListener((MessageListener) message -> { System.out.println("====接收到"+message.getMessageProperties().getConsumerQueue()+"隊列的消息====="); System.out.println(message.getMessageProperties()); System.out.println(new String(message.getBody())); }); return container; } ~~~ ## SimpleMessageListenerContainer運行時動態的添加監聽隊列 ~~~java @ComponentScan public class Application { public static void main(String[] args) throws Exception{ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class); SimpleMessageListenerContainer container = context.getBean(SimpleMessageListenerContainer.class); TimeUnit.SECONDS.sleep(20); container.addQueueNames("zhihao.error"); TimeUnit.SECONDS.sleep(20); container.addQueueNames("zhihao.debug"); TimeUnit.SECONDS.sleep(20); context.close(); } } ~~~ SimpleMessageListenerContainer納入容器 ~~~ @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("zhihao.debug"); container.setMessageListener((MessageListener) message -> { if("zhihao.debug".equals(message.getMessageProperties().getConsumerQueue())){ System.out.println("====接收到"+message.getMessageProperties().getConsumerQueue()+"隊列的消息====="); System.out.println(message.getMessageProperties()); System.out.println(new String(message.getBody())); }else if("zhihao.error".equals(message.getMessageProperties().getConsumerQueue())){ System.out.println("====接收到"+message.getMessageProperties().getConsumerQueue()+"隊列的消息====="); System.out.println(message.getMessageProperties()); System.out.println(new String(message.getBody())); }else if("zhihao.info".equals(message.getMessageProperties().getConsumerQueue())){ System.out.println("====接收到"+message.getMessageProperties().getConsumerQueue()+"隊列的消息====="); System.out.println(message.getMessageProperties()); System.out.println(new String(message.getBody())); } }); return container; } ~~~ ## 運行時動態的移除監聽隊列 SimpleMessageListenerContainer運行時后動態的移除監聽隊列 ~~~ container.removeQueueNames("zhihao.debug"); ~~~ ## 后置處理器 SimpleMessageListenerContainer增加后置處理 ~~~csharp @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("zhihao.miao.order"); //后置處理器,接收到的消息都添加了Header請求頭 container.setAfterReceivePostProcessors(message -> { message.getMessageProperties().getHeaders().put("desc",10); return message; }); container.setMessageListener((MessageListener) message -> { System.out.println("====接收到消息====="); System.out.println(message.getMessageProperties()); System.out.println(new String(message.getBody())); }); return container; } ~~~ 應用啟動類: ~~~java @ComponentScan public class Application { public static void main(String[] args) throws Exception{ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class); SimpleMessageListenerContainer container = context.getBean(SimpleMessageListenerContainer.class); System.out.println(container.getQueueNames()[0]); TimeUnit.SECONDS.sleep(30); context.close(); } } ~~~ 控制臺打印: ~~~ ====接收到消息===== MessageProperties [headers={desc=10}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=zhihao.miao.order, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-2xCE8upxgGgf-u1haCwt6A, consumerQueue=zhihao.miao.order] 消息2 ~~~ `setAfterReceivePostProcessors`方法可以對消息進行后置處理。 ## 設置消費者的Consumer\_tag和Arguments ~~~ int count=0; @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("zhihao.miao.order"); //設置消費者的consumerTag_tag container.setConsumerTagStrategy(queue -> "order_queue_"+(++count)); //設置消費者的Arguments Map<String, Object> args = new HashMap<>(); args.put("module","訂單模塊"); args.put("fun","發送消息"); container.setConsumerArguments(args); container.setMessageListener((MessageListener) message -> { System.out.println("====接收到消息====="); System.out.println(message.getMessageProperties()); System.out.println(new String(message.getBody())); }); return container; } ~~~ ![](https://img.kancloud.cn/49/17/491715fe924a56c2d9d963db2d0e5077_657x466.png) `container.setConsumerTagStrategy`可以設置消費者的 `Consumer_tag`, `container.setConsumerArguments`可以設置消費者的 `Arguments` ## setConcurrentConsumers設置并發消費者 ~~~ @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("zhihao.miao.order"); container.setConcurrentConsumers(5); container.setMaxConcurrentConsumers(10); container.setMessageListener((MessageListener) message -> { System.out.println("====接收到消息====="); System.out.println(message.getMessageProperties()); System.out.println(new String(message.getBody())); }); return container; } ~~~ ![](https://img.kancloud.cn/df/e4/dfe455e66a63a776a6df0b864836e3e7_407x348.png) 應用啟動類, ~~~java @ComponentScan public class Application { public static void main(String[] args) throws Exception{ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class); SimpleMessageListenerContainer container = context.getBean(SimpleMessageListenerContainer.class); container.setConcurrentConsumers(7); TimeUnit.SECONDS.sleep(30); context.close(); } } ~~~ ![](https://img.kancloud.cn/7f/b5/7fb5a056eb03ff8ccac8623b9a2744bd_473x403.png) `setConcurrentConsumers`設置多個并發消費者一起消費,并支持運行時動態修改。`setMaxConcurrentConsumers`設置最多的并發消費者。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看