<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                [TOC] `RabbitTemplate`類是簡化RabbitMQ訪問的工具類(發送和接收消息) > 總結: > 1.使用RabbitTemplate進行消息的發送。 > 2.使用SimpleMessageListenerContainer類監聽隊列,進行消息的消費。 ~~~xml <dependencies> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.7.3.RELEASE</version> </dependency> </dependencies> ~~~ **配置類** 將`ConnectionFactory`和`RabbitTemplate`納入到spring容器中 ~~~java import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MQConfig { @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672"); return factory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); return rabbitAdmin; } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); //設置Exchange默認操作的exchange和routingkey rabbitTemplate.setExchange("zhihao.direct.exchange"); rabbitTemplate.setRoutingKey("zhihao.debug"); return rabbitTemplate; } } ~~~ **測試類** `RabbitTemplate#send`方法發送消息, ~~~java import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.ComponentScan; @ComponentScan public class Application { public static void main(String[] args) { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class); RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); System.out.println(rabbitTemplate); MessageProperties messageProperties = new MessageProperties(); messageProperties.getHeaders().put("desc","消息發送"); messageProperties.getHeaders().put("type",10); Message message = new Message("hello".getBytes(),messageProperties); /** * 調用rabbitTemplate的send方法發送消息,如果沒有指定exchange,Routing,則使用聲明Exchange指定的 * exchange,Routing * 如果RabbitTemplate沒有設置,則默認的exchange 是DEFAULT_EXCHANGE為"", * 默認的routkey是DEFAULT_ROUTING_KEY為"" */ //1. //rabbitTemplate.send(message); //2.指定Routingkey,而exchange是Rabbitmq默認指定的 //rabbitTemplate.send("zhihao.error",message); //3.即指定exchange,又指定了routing_key //rabbitTemplate.send("zhihao.login","ulogin",message); /** * 4.使用默認的defaultExchange進行投遞消息,route key就是隊列名,指定correlation_id屬性,correlation_id屬性是rabbitmq 進行異步rpc進行標識每次請求的唯一 * id,下面會講到 */ rabbitTemplate.send("","zhihao.order.queue",message,new CorrelationData("spring.amqp")); context.close(); } } ~~~ 查看web管控臺發現消息都發送成功了。 使用`RabbitTemplate#convertAndSend`方法發送消息, ~~~ import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.ComponentScan; @ComponentScan public class Application2 { public static void main(String[] args) { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class); RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); System.out.println(rabbitTemplate); /** * 使用convertAndSend方法,接收的參數是Object對象,其實是將接收的對象轉換成Message對象,不指定exchange和routing key,那么就 * 使用RabbitTemplate中設置的exchange和routing key */ //rabbitTemplate.convertAndSend("this is my message"); //指定exchange或者指定exchage和routing key //rabbitTemplate.convertAndSend("zhihao.error","this is my message order111"); //rabbitTemplate.convertAndSend("","zhihao.user.queue","this is my message order222"); //發送消息的后置處理器,MessagePostProcessor類的postProcessMessage方法得到的Message就是將參數Object內容轉換成Message對象 /* rabbitTemplate.convertAndSend("", "zhihao.user.queue", "this is my message processor", new MessagePostProcessor() { //在后置處理器上加上order和count屬性 @Override public Message postProcessMessage(Message message) throws AmqpException { System.out.println("-------處理前message-------------"); System.out.println(message); message.getMessageProperties().getHeaders().put("order",10); message.getMessageProperties().getHeaders().put("count",1); return message; } }); */ rabbitTemplate.convertAndSend("", "zhihao.user.queue", "message before", message1 -> { //使用lamdba的語法 MessageProperties properties = new MessageProperties(); properties.getHeaders().put("desc","消息發送"); properties.getHeaders().put("type",10); Message messageafter = new Message("message after".getBytes(),properties); return messageafter; }); context.close(); } } ~~~ **消息的消費** 使用容器的方式進行消費 認識一個接口`org.springframework.amqp.rabbit.listener.MessageListenerContainer`, 其默認實現類`org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer`。 `MessageListenerContainer#setMessageListener`方法,接收的參數類型 `org.springframework.amqp.core.MessageListener`或者`org.springframework.amqp.rabbit.core.ChannelAwareMessageListener`接口 代碼: 將`ConnectionFactory`,`RabbitTemplate`,`SimpleMessageListenerContainer`實例納入到spring容器中進行管理。 ~~~java import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MQConfig { @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672"); return factory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); return rabbitAdmin; } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; } /* @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //監聽隊列zhihao.user.queue,監聽隊列可以多個,參數類型是String[] container.setQueueNames("zhihao.user.queue"); container.setMessageListener(new MessageListener() { //具體的消費邏輯 @Override public void onMessage(Message message) { System.out.println("====接收到消息====="); System.out.println(message.getMessageProperties()); System.out.println(new String(message.getBody())); } }); return container; } */ @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //隊列可以是多個,參數是String的數組 container.setQueueNames("zhihao.user.queue"); container.setMessageListener(new ChannelAwareMessageListener(){ @Override //得到了Channel參數,具體使用會在下面的博客詳細講解 public void onMessage(Message message, Channel channel) throws Exception { System.out.println("====接收到消息====="); System.out.println(message.getMessageProperties()); System.out.println(new String(message.getBody())); } }); return container; } } ~~~ 關于setMessageListener接收的類型參數 當接收的參數不是`MessageListener`或者`ChannelAwareMessageListener`類型,則會拋出異常,具體的邏輯在`checkMessageListener(messageListener);`方法 應用啟動類,向zhihao.user.queue對象發送消息,并啟動了spring容器,發現監聽到隊列并且消費了。 ~~~java import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.ComponentScan; import java.util.concurrent.TimeUnit; @ComponentScan public class Application { public static void main(String[] args) throws Exception{ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class); RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); System.out.println(rabbitTemplate); rabbitTemplate.convertAndSend("","zhihao.user.queue","hello spring amqp"); TimeUnit.SECONDS.sleep(30); context.close(); } } ~~~ **原理分析** 稍微分析一下原理 `org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer`接口,它繼承`AbstractMessageListenerContainer`類,實現`SmartLifecycle`接口然后繼承`Lifecycle`接口,意味著一旦`SimpleMessageListenerContainer`實例被spring容器管理,其生命周期就托管與spring容器來管理了,意味著當spring容器運行起來的時候,`SimpleMessageListenerContainer`容器啟動,spring容器關閉的時候,`SimpleMessageListenerContainer`容器也關閉了。 設置在spring容器初始化的時候設置SimpleMessageListenerContainer不啟動,(`container.setAutoStartup(false);`) ~~~ @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //隊列可以是多個,參數是String的數組 container.setQueueNames("zhihao.miao.order"); //設置autoStartUp為false表示SimpleMessageListenerContainer沒有啟動 container.setAutoStartup(false); container.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { System.out.println("====接收到消息====="); System.out.println(message.getMessageProperties()); System.out.println(new String(message.getBody())); } }); return container; } } ~~~ 此時不能消費消息,也可以在應用啟動類啟動`SimpleMessageListenerContainer`容器,在應用啟動類中啟動 ~~~java import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.ComponentScan; import java.util.concurrent.TimeUnit; @ComponentScan public class Application { public static void main(String[] args) throws Exception{ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class); //在spring容器中啟動SimpleMessageListenerContainer context.getBean(SimpleMessageListenerContainer.class).start(); TimeUnit.SECONDS.sleep(30); context.close(); } } ~~~ 以上說明只有`org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer`啟動了,才會消費消息。 **總結** `SimpleMessageListenerContainer`可以托管到spring容器中,由spring容器進行`SimpleMessageListenerContainer`的生命周期管理,默認情況下spring容器啟動的時候,啟動`SimpleMessageListenerContainer`,spring容器關閉,會stop掉`SimpleMessageListenerContainer`,也可以設置`SimpleMessageListenerContainer`手動啟動`(context.getBean(SimpleMessageListenerContainer.class).start();`)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看