用 Java 編寫兩個程序。發送單個消息的生產者和接收消息并打印出來的消費者。
<br/>
在下圖中,P 是我們的生產者,C 是我們的消費者。中間的框是一個隊列,在 RabbitMQ 代
表使用者保留的消息緩沖區。

<br/>
步驟如下:
**1. `pom.xml`添加依賴**
```xml
<dependencies>
<!--rabbitmq 依賴客戶端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一個依賴-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
<!--指定 jdk 編譯版本-->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
```
**2. 固定一個隊列的名字**
```java
public class QueueName {
public final static String QUEUE_HELLO = "queue.hello";
}
```
**3. 生產者**
```java
public class Producer {
public static void main(String[] args) throws Exception {
// 1. 連接rabbitmq服務器
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.0.107");
factory.setUsername("admin");
factory.setPassword("admin");
// 2. 注意:Channel 與 Connection 使用完后是需要關閉的,但這里使用了語句try-with-resources,
// 該語句可以在資源使用完后自動關閉資源。
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
/**
* 聲明一個隊列。
* queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5)
* var1: 隊列名稱
* var2: 隊列里面的消息是否持久化。false(默認)不持久化、true持久化
* var3: 該隊列是否只供一個消費者進行消費,即消息是否進行共享。false共享、true共享
* var4: 是否自動刪除。即當最后一個消費者斷開連接后該隊列是否自動刪除。true自動刪除、false不刪除
* var5:其他參數
*/
channel.queueDeclare(QueueName.QUEUE_HELLO, false, false, false, null);
String message = "hello world";
/**
* 發送一個消息。
* basicPublish(String var1, String var2, BasicProperties var3, byte[] var4)
* var1: 發送到哪個交換機,""也是一個交換機類型。
* var2: 路由的 key 是哪個
* var3: 其他參數
* var4: 發送的消息
*/
channel.basicPublish("", QueueName.QUEUE_HELLO, null, message.getBytes());
System.out.println("消息發送完畢.");
}
}
}
```
**4. 消費者**
```java
public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 連接rabbitmq服務器
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.0.107");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("等待接收消息.");
//推送的消息如何進行消費的接口回調
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("接收到的消息: " + message);
};
//取消消費的一個回調接口,如在消費的時候隊列被刪除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消費被中斷.");
};
/**
* 2. 消費者消費消息。
* basicConsume(String var1, boolean var2, DeliverCallback var3, CancelCallback var4)
* var1: 消費哪個隊列
* var2: 消費成功之后是否要自動應答。true自動應答, false手動應答
* var3: 進行消費的接口回調
* var4: 取消消費的接口回調
*/
channel.basicConsume(QueueName.QUEUE_HELLO, true, deliverCallback, cancelCallback);
}
}
```
**5. 測試**
(1)先啟動生成者,控制臺輸出如下。
```
消息發送完畢.
```
(2)啟動消費者,控制臺輸出如下。
```
等待接收消息.
接收到的消息: hello world
```
****
案例代碼:https://gitee.com/flymini/codes01/tree/master/rabbitmq_/com-learn-rabbitmq01
- 消息隊列
- 什么是MQ
- MQ的作用
- MQ的分類
- MQ的選擇
- RabbitMQ
- RabbitMQ是什么
- 四大核心概念
- 工作原理
- 環境搭建
- windows系統下的搭建
- centos7系統下的搭建
- 常用命令
- 服務相關命令
- 管理用戶命令
- 管理隊列命令
- 第一個RabbitMQ程序
- 工作隊列
- 輪詢分發消息
- 消息應答
- 持久化
- 發布確認
- 發布確認原理
- 發布確認策略
- 交換機概念
- 交換機類型
- 無名交換機
- Fanout交換機
- Direct交換機
- Topic交換機
- 死信隊列
- 死信概念
- 死信來源
- 死信實戰
- 延遲隊列
- 什么是延遲隊列
- TTL設置方式
- 隊列TTL延遲隊列
- 消息TTL延遲隊列
- 插件打造延遲隊列
- 延遲隊列總結
- 發布確認高級
- 代碼實現
- 回退消息
- 備份交換機
- 冪等性
- 冪等性概念
- 消息重復消費
- 消費端冪等性保障
- 優先級隊列
- 使用場景
- 設置優先級
- 惰性隊列
- 什么是惰性隊列
- 隊列的兩種模式
- 聲明惰性隊列
- RabbitMQ集群
- 為什么要搭建集群
- 集群搭建步驟
- 集群工作方式
- 脫離集群
- 鏡像隊列
- 高可用負載均衡