通過 cloud-stream-rabbitmq-provider8801 模塊作為消息的生產者,而模塊 cloud-stream-rabbitmq-consumer8802 作為消息的消費者。本次演示采用 RabbitMQ消息中間件。
<br/>
步驟如下:
[TOC]
# 1. 搭建RabbitMQ環境
關于 RabbitMQ 環境的搭建參考 http://www.hmoore.net/king_om/x_1_mq/2483251 。
<br/>
# 2. 構建 8801 消息生產者模塊
**1. 在 8801 模塊的`pom.xml`中添加 stream-rabbit 依賴**
```xml
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
...
</dependencies>
```
**2. 在 8801 模塊的`application.yml`添加相關配置**
```yml
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: #自此處配置要綁定的rabbitmq的服務信息
defaultRabbit: #表示定義的名稱,用于binding整合
type: rabbit #消息組件類型
environment: #設置rabbitmq相關的環境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: #服務的整合處理
output: #消息推送通道
destination: studyExchange #表示要使用的exchange名稱定義(相當于topic)
content-type: application/json #設置消息類型,本次為json
binder: defaultRabbit #設置要綁定的消息服務的具體設置
```
**3. 定義消息推送通道**
(1)*`com.atguigu.springcloud.service.IMessageService `*
```java
public interface IMessageService {
public String send();
}
```
(2)*`com.atguigu.springcloud.service.impl.MessageServiceImpl `*
```java
/**
* 添加注解 @EnableBinding(Source.class) 定義消息推送通道
*/
@EnableBinding(Source.class)
public class MessageServiceImpl implements IMessageService {
@Resource
private MessageChannel output; //消息發送通道
@Override
public String send() {
String serial = UUID.randomUUID().toString();
// 將消息 serial 推送
// public static <T> MessageBuilder<T> withPayload(T payload) {
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("生產者:" + serial);
return serial;
}
}
```
**4. controller層調用消息推送通道**
```java
@RestController
public class SendMessageController {
@Resource
private MessageServiceImpl messageService;
@RequestMapping("/sendMessage")
public String sendMessage() {
String send = messageService.send();
return send;
}
}
```
**5. 測試**
(1)先啟動RabbitMQ,再啟動 8801 生產者模塊,訪問 http://localhost:8801/sendMessage ,多刷新幾次頁面生產者就會生產如下消息。
```
生產者:2aedf80b-2933-4dab-abe5-5e37a02b961e
生產者:acdc3451-1b34-431c-ad32-dc7ca47aa6c8
生產者:74ba6c4c-5195-4b72-a5cf-88652ac0f347
生產者:1e42e7f4-6a5f-4873-93d3-0948319361e7
生產者:b1507cb8-3a4f-4987-9fb8-ce1ce9b970c1
```
(2)訪問RabbitMQ http://localhost:15672/ ,在RabbitMQ中會生成一個 `studyExchange` 主題。

(3)多刷新幾次 http://localhost:8801/sendMessage ,你會看到 生產消息 的速率。

<br/>
# 3. 構建 8802 消息消費者模塊
**1. 8802 模塊的`pom.xml`中添加 stream-rabbit 依賴**
```xml
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
...
</dependencies>
```
**2. 在 8802 模塊的`application.yml`添加相關配置**
```yml
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: #自此處配置要綁定的rabbitmq的服務信息
defaultRabbit: #表示定義的名稱,用于binding整合
type: rabbit #消息組件類型
environment: # 設置rabbitmq的相關的環境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: #服務的整合處理
input: #消息接收通道
destination: studyExchange #表示要使用的exchange名稱定義(相當于一個topic)
content-type: application/json #設置消息類型,本次為json
binder: defaultRabbit #設置要綁定的消息服務的具體設置
```
**3. 在 8802 模塊定義消息接收通道**
```java
/**
* 添加注解 @EnableBinding(Sink.class) 定義消息接收通道
*/
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
/**
* 添加 @StreamListener(Sink.INPUT) 監聽消息隊列
*/
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
// 通過 getPayload 方法接收消息
System.out.println("消費者:" + message.getPayload());
}
}
```
**4. 測試**
(1)先啟動 RabbitMQ,后啟動 8801 生成者模塊,最后啟動 8802 消費者模塊。
(2)訪問 8801 生產者 http://localhost:8801/sendMessage ,并多刷新幾次頁面,生產者將生產消息,并被消費者接收到消息。
```
生產者:dc6992fe-c6f4-4606-a3b0-63d2b9f9ac3c
生產者:d55519c0-19ba-4699-b2f2-c36dd7d14815
生產者:8b1a6fc4-1d3a-40a0-aea2-c201cc9f9912
生產者:e421c64d-b3de-4e74-b326-f5abe171fb4a
生產者:57e756e5-5630-41e7-8386-4d7dc891c080
消費者:dc6992fe-c6f4-4606-a3b0-63d2b9f9ac3c
消費者:d55519c0-19ba-4699-b2f2-c36dd7d14815
消費者:8b1a6fc4-1d3a-40a0-aea2-c201cc9f9912
消費者:e421c64d-b3de-4e74-b326-f5abe171fb4a
消費者:57e756e5-5630-41e7-8386-4d7dc891c080
```
- 微服務
- 微服務是什么?
- 微服務架構
- 微服務優缺點
- 微服務技術棧
- 微服務框架對比
- SpringCloud
- SpringCloud是什么
- SpringCloud與SpringBoot對比
- SpringCloud與Dubbo對比
- Rest微服務案例
- 總體介紹
- 父工程構建步驟
- 公共模塊構建步驟
- 服務端模塊構建步驟
- 消費端模塊構建步驟
- Eureka服務注冊與發現
- Eureka是什么
- Eureka原理
- Eureka注冊服務中心構建
- 向Eureka注冊已有微服務
- Eureka的自我保護機制
- Eureka服務發現
- Eureka集群配置
- Eureka與Zookeeper對比
- Ribbon負載均衡
- Ribbon是什么
- Ribbon負載均衡演示
- 構建服務端模塊
- 構建消費端模塊
- Ribbon核心組件IRule
- 自定義負載均衡策略
- Ribbon均衡策略優先級
- 輪詢策略算法
- OpenFeign負載均衡
- OpenFeign是什么
- 負載均衡演示
- 日志打印功能
- 導出功能
- Hystrix斷路器
- Hystrix是什么
- 服務熔斷
- Hystrix服務端構建
- 服務熔斷演示
- 服務熔斷類型
- HystrixProperty配置匯總
- 服務降級
- Hystrix客戶端構建
- 服務降級演示
- fallbackFactory
- 熔斷與降級
- 服務監控
- 網關服務Zuul
- Zuul是什么
- Zuul路由服務構建
- 設置訪問映射規則
- Config分布式配置中心
- Config分布式配置中心是什么
- Config服務端與Git通信
- Config客戶端獲取配置
- Config客戶端動態刷新
- Bus消息總線
- Bus消息總線是什么
- Bus消息總線原理
- 廣播通知設計思想
- 廣播通知演示
- 定點通知演示
- Stream消息驅動
- 為什么要引入Stream
- Stream消息驅動是什么
- Stream設計思想
- Stream流程和注解
- Stream案例演示
- 重復消費問題
- 消息持久化
- Sleuth分布式鏈路跟蹤
- Sleuth是什么
- 搭建鏈路監控
- SpringCloud Alibaba
- Nacos注冊與配置中心
- Nacos是什么
- 安裝并運行Nacos
- Nacos注冊中心
- 服務端入住Nacos
- 消費端入住Nacos
- Nacos負載均衡演示
- 服務注冊中心對比
- Nacos的AP和CP轉化
- Nacos配置中心
- 基礎配置演示
- Nacos分類配置
- Nacos集群搭建
- Sentinel實現熔斷與限流
- Sentinel是什么
- Sentinel環境搭建
- Sentinel監控微服務演示
- Sentinel流控規則
- 流量監控的作用
- 設置流控規則
- Sentinel降級規則
- 熔斷降級作用
- 設置降級規則
- Sentinel熱點限流
- 什么是熱點
- 設置熱點限流
- Sentinel系統限流
- @SentinelResource
- @SentinelResource屬性
- @SentinelResource限流演示
- @SentinelResource熔斷演示
- 規則持久化
- 熔斷框架比較
- Seata分布式事務
- 分布式事務問題
- Seata是什么
- Seata分布式事務過程
- Seata環境搭建
- 演示示例
- 業務說明
- 數據庫環境準備
- 微服務環境準備
- 測試
- Consul服務注冊與發現
- Consul是什么
- Consul能做什么
- 環境搭建
- Windows平臺
- 服務端入住Consul
- 消費端入住Consul
- 注冊中心對比
- Zookeeper服務注冊與發現
- Zookeeper是什么
- 環境搭建
- 服務端入住Zookeeper
- 消費端入住Zookeeper
- 網關服務Gateway
- Gateway是什么
- Gateway能做什么
- Gateway對比Zuul
- 三大核心概念
- Gateway工作流
- 環境搭建
- 網關路由配置方式
- 配置文件配置
- 代碼中配置
- 動態路由
- Predicate斷言
- 斷言是什么
- 常用斷言
- Filter過濾器
- 過濾器是什么
- 過濾器種類
- 自定義過濾器