<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                [TOC] # 1.簡單工作模式 > **無交換機**、只有一個隊列一個消費者 > 1.聲明一個隊列 > 2.生產者向隊列發送消息 > 3.消費者消費消息,如果隊列有多個消費者則變成了work模式 1.聲明一個隊列 ~~~ @Configuration public class SimpleQueueConfig { /** * 定義簡單隊列名. */ private final String simpleQueue = "queue_simple"; @Bean public Queue simpleQueue() { return new Queue(simpleQueue); } } ~~~ # 自動創建隊列 ``` //1. 手動創建,需在RabbitMQ中手動創建myQueue1 隊列,否則報錯 @RabbitListener(queues = “myQueue1”) public void process1(String message){ log.info(“MqReceiver1: {}”, message); } //2. 自動創建隊列 @RabbitListener(queuesToDeclare = @Queue(“myQueue2”)) public void process2(String message){ log.info(“MqReceiver2: {}”, message); } //3. 自動創建隊列,Exchange 與 Queue綁定 @RabbitListener(bindings = @QueueBinding( value = @Queue(“myQueue3”), exchange = @Exchange(“testExChange”) )) public void process3(String message){ log.info(“MqReceiver3: {}”, message); } ``` 2.生產者向隊列中發送消息 ~~~ @Slf4j @Component public class SimpleProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage() { for (int i = 0; i < 5; i++) { String message = "簡單消息" + i; log.info("我是生產信息:{}", message); rabbitTemplate.convertAndSend( "queue_simple", message); } } } ~~~ 3.消費者從隊列中取到消息 ~~~ @Slf4j @Component public class SimpleConsumers { @RabbitListener(queues = "queue_simple") public void readMessage(Message message, Channel channel) throws IOException { //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("我是消費信息:{}", new String(message.getBody())); } } ~~~ 4.controller ~~~ @RequestMapping("/simple") public String sendSimpleMsg(){ simpleProducer.sendMessage(); return "success"; } ~~~ 訪問接口,輸出 ``` 2022-06-07 14:58:57.380 INFO 19004 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生產信息:簡單消息0 2022-06-07 14:58:57.387 INFO 19004 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生產信息:簡單消息1 2022-06-07 14:58:57.387 INFO 19004 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生產信息:簡單消息2 2022-06-07 14:58:57.387 INFO 19004 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生產信息:簡單消息3 2022-06-07 14:58:57.388 INFO 19004 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生產信息:簡單消息4 2022-06-07 14:58:57.397 INFO 19004 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消費信息:簡單消息0 2022-06-07 14:58:57.398 INFO 19004 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消費信息:簡單消息1 2022-06-07 14:58:57.398 INFO 19004 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消費信息:簡單消息2 2022-06-07 14:58:57.400 INFO 19004 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消費信息:簡單消息3 2022-06-07 14:58:57.400 INFO 19004 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消費信息:簡單消息4 ``` # 2.work隊列 就是簡單模式(也沒有交換機),多了一個消費者,來搶占消息 ~~~ @Slf4j @Component public class WorkConsumers1 { @RabbitListener(queues = "queue_simple") public void readMessage(Message message, Channel channel) throws IOException { //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("我是work消費信息{}", new String(message.getBody())); } } ~~~ 訪問簡單模式的controller,輸出如下,兩個消費者都在消費消息 ``` 2022-06-07 15:29:21.002 INFO 16108 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生產信息:簡單消息0 2022-06-07 15:29:21.006 INFO 16108 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生產信息:簡單消息1 2022-06-07 15:29:21.006 INFO 16108 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生產信息:簡單消息2 2022-06-07 15:29:21.007 INFO 16108 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生產信息:簡單消息3 2022-06-07 15:29:21.007 INFO 16108 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生產信息:簡單消息4 2022-06-07 15:29:21.015 INFO 16108 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消費信息:簡單消息0 2022-06-07 15:29:21.015 INFO 16108 --- [ntContainer#1-1] c.t.m.r.service.work.WorkConsumers1 : 我是work消費信息簡單消息1 2022-06-07 15:29:21.016 INFO 16108 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消費信息:簡單消息2 2022-06-07 15:29:21.016 INFO 16108 --- [ntContainer#1-1] c.t.m.r.service.work.WorkConsumers1 : 我是work消費信息簡單消息3 2022-06-07 15:29:21.016 INFO 16108 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消費信息:簡單消息4 ``` # 3. 發布訂閱模式FanoutExchange > **多了一個交換機** > 1.創建隊列 > 2.創建交換機FanoutExchange > **3.將隊列綁定到交換機**,沒有route-key,只要綁定到交換機,就給發消息 > 4.生產者發送數據到交換機 > 5.交換機把消息廣播到所有綁定的隊列 > 6.隊列的消費者搶占消費消息 ## **3.1 配置隊列和交換機** 如下 生命了三個隊列,綁定到了同一個交換機 ~~~ @Configuration public class FanoutMQConfig { @Bean public Queue fanoutQueue1() { return new Queue("queue1", true); } @Bean public Queue fanoutQueue2() { return new Queue("queue2", false); } @Bean public Queue fanoutQueue3() { return new Queue("queue3", true); } //創建扇形交換機,參數為交換機的名稱 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("FANOUT_EXCHANGE"); } //將三個隊列都與該交換機綁定起來,無需binding_key @Bean public Binding bindingFanoutExchange1() { return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange()); } @Bean public Binding bindingFanoutExchange2() { return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange()); } @Bean public Binding bindingFanoutExchange3() { return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange()); } } ~~~ ## **3.2 生產者** ~~~ @Service public class FanoutMQSender { @Autowired private AmqpTemplate amqpTemplate; public void send(String message) { //廣播消息,無需指定routing_key amqpTemplate.convertAndSend("FANOUT_EXCHANGE", "", message); } } ~~~ ## **3.3 消費者** controller ~~~ @GetMapping("/fanoutExchange/{message}") public String fanoutExchange(@PathVariable("message") String message) { fanoutMQSender.send(message); return "success"; } ~~~ 消費者代碼 ~~~ @Service public class FanoutMQReceiver { private static final Logger logger = LoggerFactory.getLogger(FanoutMQReceiver.class); //queue1訂閱者1 @RabbitListener(queues = "queue1") public void receive1(String message) { logger.info("queue1 receive : fanout message {}", message); } //queue1訂閱者2 @RabbitListener(queues = "queue1") public void receive1_1(String message) { logger.info("queue1_1 receive : fanout message {}", message); } @RabbitListener(queues = "queue2") public void receive2(String message) { logger.info("queue2 receive : fanout message {}", message); } @RabbitListener(queues = "queue3") public void receive3(String message) { logger.info("queue3 receive : fanout message {}", message); } } ~~~ 隊列queue1,有兩個消費者搶占消費,如下 ``` 2022-06-07 15:48:09.048 INFO 10784 --- [ntContainer#3-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue2 receive : fanout message 001 2022-06-07 15:48:09.048 INFO 10784 --- [ntContainer#2-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue3 receive : fanout message 001 2022-06-07 15:48:09.048 INFO 10784 --- [ntContainer#1-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue1 receive : fanout message 001 2022-06-07 15:48:11.904 INFO 10784 --- [ntContainer#3-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue2 receive : fanout message 002 2022-06-07 15:48:11.904 INFO 10784 --- [ntContainer#2-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue3 receive : fanout message 002 2022-06-07 15:48:11.904 INFO 10784 --- [ntContainer#0-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue1_1 receive : fanout message 002 2022-06-07 15:48:14.370 INFO 10784 --- [ntContainer#1-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue1 receive : fanout message 003 2022-06-07 15:48:14.370 INFO 10784 --- [ntContainer#3-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue2 receive : fanout message 003 2022-06-07 15:48:14.370 INFO 10784 --- [ntContainer#2-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue3 receive : fanout message 003 ``` 界面統計未被消費的消息 ![](https://img.kancloud.cn/2d/3d/2d3da0a53958750a791fe65c9ae0951b_1176x380.png) ## 3.4 新應用接入 如果有新的應用也想訂閱消息怎么辦?如一個新的springboot項目 ### 3.4.1 將自己創建的隊列綁定到已有的交換機 1.應用自己生命一個隊列 2.將隊列綁定到上邊的交換機 ~~~ @Configuration public class FanoutMQConfig { @Bean public Queue newQuene() { return new Queue("new_queue", true); } //創建扇形交換機,參數為交換機的名稱 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("FANOUT_EXCHANGE"); } @Bean public Binding bindingFanoutExchange3() { return BindingBuilder.bind(newQuene()).to(fanoutExchange()); } } ~~~ ### 3.4.2 訂閱自己的隊列即可 ~~~ @Configuration public class FanoutMQConfig { @Bean public Queue newQuene() { return new Queue("new_queue", true); } //創建扇形交換機,參數為交換機的名稱 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("FANOUT_EXCHANGE"); } @Bean public Binding bindingFanoutExchange3() { return BindingBuilder.bind(newQuene()).to(fanoutExchange()); } } ~~~ 原有項目 ![](https://img.kancloud.cn/36/1d/361d2fc1fbedcbdc737156805393ae9c_1234x293.png) 新項目,接收到交換機發來的消息消息 ![](https://img.kancloud.cn/e1/a9/e1a91bec7a7be74c9c75bc24a622c6b7_1366x221.png) * [ ] 消費者自動創建隊列,沒有配置的情況下 項目中引入了RabbitMQ,但是在加了@bean配置交換機和queue,啟動項目卻沒自動化創建隊列 原因:RabbitMQ懶加載模式, 需要配置消費者監聽才會創建 ``` @RabbitListener(queues = "short_link.add.link.queue") ``` 另外一種方式(若Mq中無相應名稱的隊列,會自動創建Queue) ``` @RabbitListener(queuesToDeclare = { @Queue("short_link.add.link.queue") }) ``` # 4.路由模式DirectExchange 1.隊列和交換機的綁定多了一個key 2.生產者發送消息到交換機帶著一個key,交換機根據key,選擇消息發送的隊列 應用:可根據消息路由隊列,如地區、不同應用 ## 4.1 配置 ~~~ @Configuration public class DirectMQConfig { @Bean public Queue directQueue1() { return new Queue("route-queue1", true); } @Bean public Queue directQueue2() { return new Queue("route-queue2", false); } //創建直連交換機,參數為交換機的名稱 @Bean public DirectExchange directExchange() { return new DirectExchange("DIRECT_EXCHANGE"); } @Bean public Binding bindingDirectExchange1() { return BindingBuilder.bind(directQueue1()).to(directExchange()).with("key.1"); } @Bean public Binding bindingDirectExchange2() { return BindingBuilder.bind(directQueue2()).to(directExchange()).with("key.2"); } } ~~~ ## 4.2 producer ~~~ @Service public class DirectMQSender { //注入AmqpTemplate接口,該接口定義了發送和接收消息的基本操作 @Autowired private AmqpTemplate amqpTemplate; //直接路由 public void send(String message) { //第一個參數指將消息發送到該名稱的交換機,第二個參數為對應的routing_key,第三個參數為發送的具體消息 amqpTemplate.convertAndSend("DIRECT_EXCHANGE", "key.1", "key1:" + message); amqpTemplate.convertAndSend("DIRECT_EXCHANGE", "key.2", "key2:" + message); } } ~~~ ## 4.3 Consumer ~~~ @Service public class DirectMQReceiver { private static final Logger logger = LoggerFactory.getLogger(DirectMQReceiver.class); //此注解表示監聽某個隊列,參數為隊列名 @RabbitListener(queues = "route-queue1") public void receive1(String message) { logger.info("queue1 receive : route direct message {}", message); } @RabbitListener(queues = "route-queue2") public void receive2(String message) { logger.info("queue2 receive : route direct message {}", message); } } ~~~ 控制臺: ``` 2022-06-07 16:58:48.546 INFO 4112 --- [ntContainer#4-1] c.t.m.r.s.routedirect.DirectMQReceiver : queue2 receive : route direct message key2:m1 2022-06-07 16:58:48.546 INFO 4112 --- [ntContainer#5-1] c.t.m.r.s.routedirect.DirectMQReceiver : queue1 receive : route direct message key1:m1 ``` # 5. 主體模式TopicExchange 在key的基礎上,提供交換機到隊列的模式匹配 ## 5.1 配置 ~~~ topic.# 將發送到所有topic.開頭的綁定隊列 ~~~ ~~~ @Configuration public class TopicMQConfig { @Bean public Queue topicQueue1() { return new Queue("TOPIC_QUEUE1", true); } @Bean public Queue topicQueue2() { return new Queue("TOPIC_QUEUE2", true); } @Bean public TopicExchange topicExchange() { return new TopicExchange("TOPIC_EXCHANGE"); } //將topicQueue1與topicExchange交換機綁定 @Bean public Binding bindQueue1() { return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.key1"); } //將topicQueue2與topicExchange交換機綁定,隊列可以接收topic. 開頭的routing_key @Bean public Binding bindQueue2() { return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#"); } } ~~~ ## 5.2 producer ~~~ @Service public class TopicMQSender { //注入AmqpTemplate接口,該接口定義了發送和接收消息的基本操作 @Autowired private AmqpTemplate amqpTemplate; //直接路由 public void send(String message) { //第一個參數指將消息發送到該名稱的交換機,第二個參數為對應的routing_key,第三個參數為發送的具體消息 amqpTemplate.convertAndSend("TOPIC_EXCHANGE", "topic.key1", message); amqpTemplate.convertAndSend("TOPIC_EXCHANGE", "topic.#", "#號匹配消息:"+message); } } ~~~ ## 5.3 Consumer ~~~ @Service public class TopicMQReceiver { private static final Logger logger = LoggerFactory.getLogger(TopicMQReceiver.class); @RabbitListener(queues = "TOPIC_QUEUE1") public void receiveQueue1(String message) { logger.info("receive : TOPIC_QUEUE1 {}", message); } @RabbitListener(queues ="TOPIC_QUEUE2") public void receiveQueue2(String message) { logger.info("receive : TOPIC_QUEUE2 {}", message); } } ~~~ 輸出 ``` 2022-06-07 17:06:10.944 INFO 11812 --- [ntContainer#7-1] c.t.m.r.service.topic.TopicMQReceiver : receive : TOPIC_QUEUE2 m1 2022-06-07 17:06:10.944 INFO 11812 --- [ntContainer#7-1] c.t.m.r.service.topic.TopicMQReceiver : receive : TOPIC_QUEUE2 #號匹配消息:m1 2022-06-07 17:06:10.945 INFO 11812 --- [ntContainer#8-1] c.t.m.r.service.topic.TopicMQReceiver : receive : TOPIC_QUEUE1 m1 ``` # 6. 參數設置 ## 1. 消費限額 指每次獲取多少條消息,設置為1時,指每次接受一條消息,然后接受下一條。如果數據多時,可以適量增加改配置 ``` spring: rabbitmq: listener: simple: prefetch: 1 # qos=1, 默認250 ``` ## 2. 消息積壓 >當隊列數據消息積壓時,可以增加消費者的并發,同時調大上邊的消費限額 concurrency min-max 表示并發數,表示有多少個消費者處理隊列里的消息 最小-最大數 ``` @RabbitListener(queues = “testDirectQueue”,concurrency=“5-10”) public class DirectReceiver { @RabbitHandler public void process(Map testMessage){ System.out.println(Thread.currentThread().getName()+testMessage.toString()); } } ``` # 7. 配置json序列化與反序列化 ~~~ @Configuration public class RabbitMQConfig implements RabbitListenerConfigurer { //序列化 object -> json @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) { rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory()); } @Bean MessageHandlerMethodFactory messageHandlerMethodFactory(){ DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory(); messageHandlerMethodFactory.setMessageConverter(mappingJackson2MessageConverter()); return messageHandlerMethodFactory; } @Bean public MappingJackson2MessageConverter mappingJackson2MessageConverter(){ return new MappingJackson2MessageConverter(); } // 反序列化 json -> object @Bean public RabbitTemplate jacksonRabbitTemplate(final ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; } } ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看