<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                [TOC] # MessageListenerAdapter詳解 消息監聽適配器(adapter),通過反射將消息處理委托給目標監聽器的處理方法,并進行靈活的消息類型轉換。允許監聽器方法對消息內容類型進行操作,完全獨立于Rabbit API 默認情況下,傳入Rabbit消息的內容在被傳遞到目標監聽器方法之前被提取,以使目標方法對消息內容類型進行操作以String或者byte類型進行操作,而不是原始Message類型。 (消息轉換器) 消息類型轉換委托給`MessageConverter`接口的實現類。 默認情況下,將使用`SimpleMessageConverter`。 (如果您不希望進行這樣的自動消息轉換, 那么請自己通過`#setMessageConverter MessageConverter`設置為null) 如果目標監聽器方法返回一個非空對象(通常是消息內容類型,例如String或byte數組),它將被包裝在一個Rabbit Message 中,并發送使用來自`Rabbit ReplyTo`屬性或通過`#setResponseRoutingKey(String)`指定的`routingKey`的`routingKey`來傳送消息。(使用rabbitmq 來實現異步rpc功能時候會使用到這個屬性)。 注意:發送響應消息僅在使用ChannelAwareMessageListener入口點(通常通過Spring消息監聽器容器)時可用。 用作MessageListener不支持生成響應消息。 # Demo 配置類MQConfig ~~~java import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class MQConfig { @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672"); return factory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); return rabbitAdmin; } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setExchange("zhihao.direct.exchange"); rabbitTemplate.setRoutingKey("zhihao.debug"); return rabbitTemplate; } @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("order","pay","zhihao.miao.order"); MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageHandler()); //設置處理器的消費消息的默認方法,如果沒有設置,那么默認的處理器中的默認方式是handleMessage方法 adapter.setDefaultListenerMethod("onMessage"); Map<String, String> queueOrTagToMethodName = new HashMap<>(); queueOrTagToMethodName.put("order","onorder"); queueOrTagToMethodName.put("pay","onpay"); queueOrTagToMethodName.put("zhihao.miao.order","oninfo"); adapter.setQueueOrTagToMethodName(queueOrTagToMethodName); container.setMessageListener(adapter); return container; } } ~~~ Handler類`MessageHandler`,`MessageHandler`類中定義的方法也就是上面翻譯的目標監聽器的處理方法: ~~~ public class MessageHandler { //沒有設置默認的處理方法的時候,方法名是handleMessage public void handleMessage(byte[] message){ System.out.println("---------handleMessage-------------"); System.out.println(new String(message)); } //通過設置setDefaultListenerMethod時候指定的方法名 public void onMessage(byte[] message){ System.out.println("---------onMessage-------------"); System.out.println(new String(message)); } //以下指定不同的隊列不同的處理方法名 public void onorder(byte[] message){ System.out.println("---------onorder-------------"); System.out.println(new String(message)); } public void onpay(byte[] message){ System.out.println("---------onpay-------------"); System.out.println(new String(message)); } public void oninfo(byte[] message){ System.out.println("---------oninfo-------------"); System.out.println(new String(message)); } } ~~~ 啟動應用類: ~~~java @ComponentScan public class Application { public static void main(String[] args) throws Exception{ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class); System.out.println("===start up======"); TimeUnit.SECONDS.sleep(60); context.close(); } } ~~~ # 總結 使用`MessageListenerAdapter`處理器進行消息隊列監聽處理,如果容器沒有設置`setDefaultListenerMethod`,則處理器中默認的處理方法名是handleMessage,如果設置了`setDefaultListenerMethod`,則處理器中處理消息的方法名就是`setDefaultListenerMethod`方法參數設置的值。也可以通過`setQueueOrTagToMethodName`方法為不同的隊列設置不同的消息處理方法。 # 源碼分析: 我們知道`MessageListenerAdapter`繼承`AbstractAdaptableMessageListener`類,實現`MessageListener`和`ChannelAwareMessageListener`接口,而我們知道`MessageListener`和`ChannelAwareMessageListener`接口的`onMessage`方法就是具體容器監聽隊列處理隊列消息的方法。 `MessageListenerAdapter`的`onMessage`方法 ~~~ @Override public void onMessage(Message message, Channel channel) throws Exception { // Check whether the delegate is a MessageListener impl itself. // In that case, the adapter will simply act as a pass-through. Object delegate = getDelegate(); if (delegate != this) { if (delegate instanceof ChannelAwareMessageListener) { if (channel != null) { ((ChannelAwareMessageListener) delegate).onMessage(message, channel); return; } else if (!(delegate instanceof MessageListener)) { throw new AmqpIllegalStateException("MessageListenerAdapter cannot handle a " + "ChannelAwareMessageListener delegate if it hasn't been invoked with a Channel itself"); } } if (delegate instanceof MessageListener) { ((MessageListener) delegate).onMessage(message); return; } } // Regular case: find a handler method reflectively. Object convertedMessage = extractMessage(message); //獲取處理消息的方法名 String methodName = getListenerMethodName(message, convertedMessage); if (methodName == null) { throw new AmqpIllegalStateException("No default listener method specified: " + "Either specify a non-null value for the 'defaultListenerMethod' property or " + "override the 'getListenerMethodName' method."); } // Invoke the handler method with appropriate arguments. Object[] listenerArguments = buildListenerArguments(convertedMessage); Object result = invokeListenerMethod(methodName, listenerArguments, message); if (result != null) { handleResult(result, message, channel); } else { logger.trace("No result object given - no result to handle"); } } ~~~ 獲取處理消息的方法名 ~~~ protected String getListenerMethodName(Message originalMessage, Object extractedMessage) throws Exception { if (this.queueOrTagToMethodName.size() > 0) { MessageProperties props = originalMessage.getMessageProperties(); String methodName = this.queueOrTagToMethodName.get(props.getConsumerQueue()); if (methodName == null) { methodName = this.queueOrTagToMethodName.get(props.getConsumerTag()); } if (methodName != null) { return methodName; } } return getDefaultListenerMethod(); } ~~~ # 結論 `MessageListenerAdapter` 1.可以把一個沒有實現`MessageListener`和`ChannelAwareMessageListener`接口的類適配成一個可以處理消息的處理器 2.默認的方法名稱為:`handleMessage`,可以通過`setDefaultListenerMethod`設置新的消息處理方法 3.`MessageListenerAdapter`支持不同的隊列交給不同的方法去執行。使用`setQueueOrTagToMethodName`方法設置,當根據queue名稱沒有找到匹配的方法的時候,就會交給默認的方法去處理
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看