[TOC]
# 1.簡單工作模式
> **無交換機**、只有一個隊列一個消費者
> 1.聲明一個隊列
> 2.生產者向隊列發送消息
> 3.消費者消費消息,如果隊列有多個消費者則變成了work模式
1.聲明一個隊列
~~~
@Configuration
public class SimpleQueueConfig {
/**
* 定義簡單隊列名.
*/
private final String simpleQueue = "queue_simple";
@Bean
public Queue simpleQueue() {
return new Queue(simpleQueue);
}
}
~~~
# 自動創建隊列
```
//1. 手動創建,需在RabbitMQ中手動創建myQueue1 隊列,否則報錯
@RabbitListener(queues = “myQueue1”)
public void process1(String message){
log.info(“MqReceiver1: {}”, message);
}
//2. 自動創建隊列
@RabbitListener(queuesToDeclare = @Queue(“myQueue2”))
public void process2(String message){
log.info(“MqReceiver2: {}”, message);
}
//3. 自動創建隊列,Exchange 與 Queue綁定
@RabbitListener(bindings = @QueueBinding(
value = @Queue(“myQueue3”),
exchange = @Exchange(“testExChange”)
))
public void process3(String message){
log.info(“MqReceiver3: {}”, message);
}
```
2.生產者向隊列中發送消息
~~~
@Slf4j
@Component
public class SimpleProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
for (int i = 0; i < 5; i++) {
String message = "簡單消息" + i;
log.info("我是生產信息:{}", message);
rabbitTemplate.convertAndSend( "queue_simple", message);
}
}
}
~~~
3.消費者從隊列中取到消息
~~~
@Slf4j
@Component
public class SimpleConsumers {
@RabbitListener(queues = "queue_simple")
public void readMessage(Message message, Channel channel) throws IOException {
//channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是消費信息:{}", new String(message.getBody()));
}
}
~~~
4.controller
~~~
@RequestMapping("/simple")
public String sendSimpleMsg(){
simpleProducer.sendMessage();
return "success";
}
~~~
訪問接口,輸出
```
2022-06-07 14:58:57.380 INFO 19004 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生產信息:簡單消息0
2022-06-07 14:58:57.387 INFO 19004 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生產信息:簡單消息1
2022-06-07 14:58:57.387 INFO 19004 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生產信息:簡單消息2
2022-06-07 14:58:57.387 INFO 19004 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生產信息:簡單消息3
2022-06-07 14:58:57.388 INFO 19004 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生產信息:簡單消息4
2022-06-07 14:58:57.397 INFO 19004 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消費信息:簡單消息0
2022-06-07 14:58:57.398 INFO 19004 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消費信息:簡單消息1
2022-06-07 14:58:57.398 INFO 19004 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消費信息:簡單消息2
2022-06-07 14:58:57.400 INFO 19004 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消費信息:簡單消息3
2022-06-07 14:58:57.400 INFO 19004 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消費信息:簡單消息4
```
# 2.work隊列
就是簡單模式(也沒有交換機),多了一個消費者,來搶占消息
~~~
@Slf4j
@Component
public class WorkConsumers1 {
@RabbitListener(queues = "queue_simple")
public void readMessage(Message message, Channel channel) throws IOException {
//channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是work消費信息{}", new String(message.getBody()));
}
}
~~~
訪問簡單模式的controller,輸出如下,兩個消費者都在消費消息
```
2022-06-07 15:29:21.002 INFO 16108 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生產信息:簡單消息0
2022-06-07 15:29:21.006 INFO 16108 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生產信息:簡單消息1
2022-06-07 15:29:21.006 INFO 16108 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生產信息:簡單消息2
2022-06-07 15:29:21.007 INFO 16108 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生產信息:簡單消息3
2022-06-07 15:29:21.007 INFO 16108 --- [nio-8080-exec-1] c.t.m.r.service.simple.SimpleProducer : 我是生產信息:簡單消息4
2022-06-07 15:29:21.015 INFO 16108 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消費信息:簡單消息0
2022-06-07 15:29:21.015 INFO 16108 --- [ntContainer#1-1] c.t.m.r.service.work.WorkConsumers1 : 我是work消費信息簡單消息1
2022-06-07 15:29:21.016 INFO 16108 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消費信息:簡單消息2
2022-06-07 15:29:21.016 INFO 16108 --- [ntContainer#1-1] c.t.m.r.service.work.WorkConsumers1 : 我是work消費信息簡單消息3
2022-06-07 15:29:21.016 INFO 16108 --- [ntContainer#0-1] c.t.m.r.service.simple.SimpleConsumers : 我是消費信息:簡單消息4
```
# 3. 發布訂閱模式FanoutExchange
> **多了一個交換機**
> 1.創建隊列
> 2.創建交換機FanoutExchange
> **3.將隊列綁定到交換機**,沒有route-key,只要綁定到交換機,就給發消息
> 4.生產者發送數據到交換機
> 5.交換機把消息廣播到所有綁定的隊列
> 6.隊列的消費者搶占消費消息
## **3.1 配置隊列和交換機**
如下 生命了三個隊列,綁定到了同一個交換機
~~~
@Configuration
public class FanoutMQConfig {
@Bean
public Queue fanoutQueue1() {
return new Queue("queue1", true);
}
@Bean
public Queue fanoutQueue2() {
return new Queue("queue2", false);
}
@Bean
public Queue fanoutQueue3() {
return new Queue("queue3", true);
}
//創建扇形交換機,參數為交換機的名稱
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("FANOUT_EXCHANGE");
}
//將三個隊列都與該交換機綁定起來,無需binding_key
@Bean
public Binding bindingFanoutExchange1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
@Bean
public Binding bindingFanoutExchange2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
@Bean
public Binding bindingFanoutExchange3() {
return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange());
}
}
~~~
## **3.2 生產者**
~~~
@Service
public class FanoutMQSender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(String message) {
//廣播消息,無需指定routing_key
amqpTemplate.convertAndSend("FANOUT_EXCHANGE", "", message);
}
}
~~~
## **3.3 消費者**
controller
~~~
@GetMapping("/fanoutExchange/{message}")
public String fanoutExchange(@PathVariable("message") String message) {
fanoutMQSender.send(message);
return "success";
}
~~~
消費者代碼
~~~
@Service
public class FanoutMQReceiver {
private static final Logger logger = LoggerFactory.getLogger(FanoutMQReceiver.class);
//queue1訂閱者1
@RabbitListener(queues = "queue1")
public void receive1(String message) {
logger.info("queue1 receive : fanout message {}", message);
}
//queue1訂閱者2
@RabbitListener(queues = "queue1")
public void receive1_1(String message) {
logger.info("queue1_1 receive : fanout message {}", message);
}
@RabbitListener(queues = "queue2")
public void receive2(String message) {
logger.info("queue2 receive : fanout message {}", message);
}
@RabbitListener(queues = "queue3")
public void receive3(String message) {
logger.info("queue3 receive : fanout message {}", message);
}
}
~~~
隊列queue1,有兩個消費者搶占消費,如下
```
2022-06-07 15:48:09.048 INFO 10784 --- [ntContainer#3-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue2 receive : fanout message 001
2022-06-07 15:48:09.048 INFO 10784 --- [ntContainer#2-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue3 receive : fanout message 001
2022-06-07 15:48:09.048 INFO 10784 --- [ntContainer#1-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue1 receive : fanout message 001
2022-06-07 15:48:11.904 INFO 10784 --- [ntContainer#3-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue2 receive : fanout message 002
2022-06-07 15:48:11.904 INFO 10784 --- [ntContainer#2-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue3 receive : fanout message 002
2022-06-07 15:48:11.904 INFO 10784 --- [ntContainer#0-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue1_1 receive : fanout message 002
2022-06-07 15:48:14.370 INFO 10784 --- [ntContainer#1-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue1 receive : fanout message 003
2022-06-07 15:48:14.370 INFO 10784 --- [ntContainer#3-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue2 receive : fanout message 003
2022-06-07 15:48:14.370 INFO 10784 --- [ntContainer#2-1] c.t.m.r.s.psfanout.FanoutMQReceiver : queue3 receive : fanout message 003
```
界面統計未被消費的消息

## 3.4 新應用接入
如果有新的應用也想訂閱消息怎么辦?如一個新的springboot項目
### 3.4.1 將自己創建的隊列綁定到已有的交換機
1.應用自己生命一個隊列
2.將隊列綁定到上邊的交換機
~~~
@Configuration
public class FanoutMQConfig {
@Bean
public Queue newQuene() {
return new Queue("new_queue", true);
}
//創建扇形交換機,參數為交換機的名稱
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("FANOUT_EXCHANGE");
}
@Bean
public Binding bindingFanoutExchange3() {
return BindingBuilder.bind(newQuene()).to(fanoutExchange());
}
}
~~~
### 3.4.2 訂閱自己的隊列即可
~~~
@Configuration
public class FanoutMQConfig {
@Bean
public Queue newQuene() {
return new Queue("new_queue", true);
}
//創建扇形交換機,參數為交換機的名稱
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("FANOUT_EXCHANGE");
}
@Bean
public Binding bindingFanoutExchange3() {
return BindingBuilder.bind(newQuene()).to(fanoutExchange());
}
}
~~~
原有項目

新項目,接收到交換機發來的消息消息

* [ ] 消費者自動創建隊列,沒有配置的情況下
項目中引入了RabbitMQ,但是在加了@bean配置交換機和queue,啟動項目卻沒自動化創建隊列
原因:RabbitMQ懶加載模式, 需要配置消費者監聽才會創建
```
@RabbitListener(queues = "short_link.add.link.queue")
```
另外一種方式(若Mq中無相應名稱的隊列,會自動創建Queue)
```
@RabbitListener(queuesToDeclare = { @Queue("short_link.add.link.queue") })
```
# 4.路由模式DirectExchange
1.隊列和交換機的綁定多了一個key
2.生產者發送消息到交換機帶著一個key,交換機根據key,選擇消息發送的隊列
應用:可根據消息路由隊列,如地區、不同應用
## 4.1 配置
~~~
@Configuration
public class DirectMQConfig {
@Bean
public Queue directQueue1() {
return new Queue("route-queue1", true);
}
@Bean
public Queue directQueue2() {
return new Queue("route-queue2", false);
}
//創建直連交換機,參數為交換機的名稱
@Bean
public DirectExchange directExchange() {
return new DirectExchange("DIRECT_EXCHANGE");
}
@Bean
public Binding bindingDirectExchange1() {
return BindingBuilder.bind(directQueue1()).to(directExchange()).with("key.1");
}
@Bean
public Binding bindingDirectExchange2() {
return BindingBuilder.bind(directQueue2()).to(directExchange()).with("key.2");
}
}
~~~
## 4.2 producer
~~~
@Service
public class DirectMQSender {
//注入AmqpTemplate接口,該接口定義了發送和接收消息的基本操作
@Autowired
private AmqpTemplate amqpTemplate;
//直接路由
public void send(String message) {
//第一個參數指將消息發送到該名稱的交換機,第二個參數為對應的routing_key,第三個參數為發送的具體消息
amqpTemplate.convertAndSend("DIRECT_EXCHANGE", "key.1", "key1:" + message);
amqpTemplate.convertAndSend("DIRECT_EXCHANGE", "key.2", "key2:" + message);
}
}
~~~
## 4.3 Consumer
~~~
@Service
public class DirectMQReceiver {
private static final Logger logger = LoggerFactory.getLogger(DirectMQReceiver.class);
//此注解表示監聽某個隊列,參數為隊列名
@RabbitListener(queues = "route-queue1")
public void receive1(String message) {
logger.info("queue1 receive : route direct message {}", message);
}
@RabbitListener(queues = "route-queue2")
public void receive2(String message) {
logger.info("queue2 receive : route direct message {}", message);
}
}
~~~
控制臺:
```
2022-06-07 16:58:48.546 INFO 4112 --- [ntContainer#4-1] c.t.m.r.s.routedirect.DirectMQReceiver : queue2 receive : route direct message key2:m1
2022-06-07 16:58:48.546 INFO 4112 --- [ntContainer#5-1] c.t.m.r.s.routedirect.DirectMQReceiver : queue1 receive : route direct message key1:m1
```
# 5. 主體模式TopicExchange
在key的基礎上,提供交換機到隊列的模式匹配
## 5.1 配置
~~~
topic.# 將發送到所有topic.開頭的綁定隊列
~~~
~~~
@Configuration
public class TopicMQConfig {
@Bean
public Queue topicQueue1() {
return new Queue("TOPIC_QUEUE1", true);
}
@Bean
public Queue topicQueue2() {
return new Queue("TOPIC_QUEUE2", true);
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("TOPIC_EXCHANGE");
}
//將topicQueue1與topicExchange交換機綁定
@Bean
public Binding bindQueue1() {
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.key1");
}
//將topicQueue2與topicExchange交換機綁定,隊列可以接收topic. 開頭的routing_key
@Bean
public Binding bindQueue2() {
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#");
}
}
~~~
## 5.2 producer
~~~
@Service
public class TopicMQSender {
//注入AmqpTemplate接口,該接口定義了發送和接收消息的基本操作
@Autowired
private AmqpTemplate amqpTemplate;
//直接路由
public void send(String message) {
//第一個參數指將消息發送到該名稱的交換機,第二個參數為對應的routing_key,第三個參數為發送的具體消息
amqpTemplate.convertAndSend("TOPIC_EXCHANGE", "topic.key1", message);
amqpTemplate.convertAndSend("TOPIC_EXCHANGE", "topic.#", "#號匹配消息:"+message);
}
}
~~~
## 5.3 Consumer
~~~
@Service
public class TopicMQReceiver {
private static final Logger logger = LoggerFactory.getLogger(TopicMQReceiver.class);
@RabbitListener(queues = "TOPIC_QUEUE1")
public void receiveQueue1(String message) {
logger.info("receive : TOPIC_QUEUE1 {}", message);
}
@RabbitListener(queues ="TOPIC_QUEUE2")
public void receiveQueue2(String message) {
logger.info("receive : TOPIC_QUEUE2 {}", message);
}
}
~~~
輸出
```
2022-06-07 17:06:10.944 INFO 11812 --- [ntContainer#7-1] c.t.m.r.service.topic.TopicMQReceiver : receive : TOPIC_QUEUE2 m1
2022-06-07 17:06:10.944 INFO 11812 --- [ntContainer#7-1] c.t.m.r.service.topic.TopicMQReceiver : receive : TOPIC_QUEUE2 #號匹配消息:m1
2022-06-07 17:06:10.945 INFO 11812 --- [ntContainer#8-1] c.t.m.r.service.topic.TopicMQReceiver : receive : TOPIC_QUEUE1 m1
```
# 6. 參數設置
## 1. 消費限額
指每次獲取多少條消息,設置為1時,指每次接受一條消息,然后接受下一條。如果數據多時,可以適量增加改配置
```
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # qos=1, 默認250
```
## 2. 消息積壓
>當隊列數據消息積壓時,可以增加消費者的并發,同時調大上邊的消費限額
concurrency min-max 表示并發數,表示有多少個消費者處理隊列里的消息 最小-最大數
```
@RabbitListener(queues = “testDirectQueue”,concurrency=“5-10”)
public class DirectReceiver {
@RabbitHandler
public void process(Map testMessage){
System.out.println(Thread.currentThread().getName()+testMessage.toString());
}
}
```
# 7. 配置json序列化與反序列化
~~~
@Configuration
public class RabbitMQConfig implements RabbitListenerConfigurer {
//序列化 object -> json
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
}
@Bean
MessageHandlerMethodFactory messageHandlerMethodFactory(){
DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
messageHandlerMethodFactory.setMessageConverter(mappingJackson2MessageConverter());
return messageHandlerMethodFactory;
}
@Bean
public MappingJackson2MessageConverter mappingJackson2MessageConverter(){
return new MappingJackson2MessageConverter();
}
// 反序列化 json -> object
@Bean
public RabbitTemplate jacksonRabbitTemplate(final ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
}
~~~