## RabbitMQ
[RabbitMQ](https://www.rabbitmq.com/) 是一個開源的輕量級消息代理,支持多種消息協議。它可以通過分布式部署、聯合配置來滿足高彈性、高可用性的需求。此外,它是部署最廣泛的開源消息代理,在全球范圍內從初創企業到大企業都在使用。
### 安裝
在開始之前,我們必須安裝所需的包:
```bash
$ npm i --save amqplib amqp-connection-manager
```
### 概述
為了使用 **RabbitMQ** 傳輸器,傳遞以下選項對象到 `createMicroservice()` 方法。
> main.ts
```typescript
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
durable: false
},
},
});
```
> `Transport` 需要從 `@nestjs/microservices` 包導入。
### 選項
`options`對象和選擇的傳輸器有關,`RabbitMQ`傳輸器暴露了一些屬性:
-| -
---|---
urls|連接urls
queue|服務器要監聽的隊列名稱
prefetchCount|頻道預讀取的數量
isGlobalPrefetchCount|使能預讀取的頻道
noAck|設置為`false`以啟用手動確認模式
queueOptions|額外的隊列選項([更多](https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue))
socketOptions|額外的socket選項([更多](https://www.squaremobius.net/amqp.node/channel_api.html#socket-options))
### 客戶端
像其他微服務傳輸器一樣,你可以在創建`ClientProxy`實例時傳輸[一些選項](https://docs.nestjs.com/microservices/basics#client)。
一種來創建實例的方法是使用`ClientsModule`。要使用`ClientsModule`創建一個客戶端實例,引入并使用`register()`方法并傳遞一個 `options` 對象,該對象具有與前面在 `createMicroservice()` 方法具有相同的屬性。`name`屬性被用于注入`token`,更多關于`ClientsModule`內容參見[這里](https://docs.nestjs.com/microservices/basics#client)。
```typescript
@Module({
imports: [
ClientsModule.register([
{
name: 'MATH_SERVICE',
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
durable: false
},
},
},
]),
]
...
})
```
也可以使用其他創建客戶端的實例( `ClientProxyFactory` 或 `@Client()` )。
### 上下文
在更復雜的場景中,您可能希望訪問關于傳入請求的更多信息。在`RabbitMQ` 中,您可以訪問 `RmqContext`對象。
```typescript
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(`Pattern: ${context.getPattern()}`);
}
```
?> `@Payload()`, `@Ctx()` 和 `RedisContext` 需要從 `@nestjs/microservices` 包導入.
要實用原生的`RabbitMQ`消息(包含`properties`, `fields`, 和`content`), 使用 `RmqContext`對象的`getMessage()`方法:
```typescript
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(context.getMessage());
}
```
要獲取`RabbitMQ`頻道的引用,使用`RmqContext`對象的`getChannelRef`方法。
```typescript
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(context.getChannelRef());
}
```
### 消息確認
要確保消息沒有丟失,RabbitMQ支持[消息確認](https://www.rabbitmq.com/confirms.html)。消息確認是指消費者發回給RabbitMQ確認消息已收到,RabbitMQ可以刪除它了。如果消費者不工作(頻道關閉,連接關閉或者TCP連接丟失)也沒有發送確認,RabbitMQ會認為消息沒有被處理,因此會重新將其加入隊列。
要使能手動消息確認模式,將`noAck`設置為`false`:
```typescript
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
noAck: false,
queueOptions: {
durable: false
},
},
```
當手動消費者確認開啟時,我們必須從工作者到到信號發送一個合適的確認信息,以表示我們已經完成了一件工作。
```typescript
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
const channel = context.getChannelRef();
const originalMsg = context.getMessage();
channel.ack(originalMsg);
}
```
### 記錄建設者
要配置消息選項,您可以使用`RmqRecordBuilder`該類(注意:這對于基于事件的流也是可行的)。例如,要設置`headers`和`priority`屬性,使用`setOptions`方法,如下:
~~~typescript
const message = ':cat:';
const record = new RmqRecordBuilder(message)
.setOptions({
headers: {
['x-version']: '1.0.0',
},
priority: 3,
})
.build();
this.client.send('replace-emoji', record).subscribe(...);
~~~
> **提示**`RmqRecordBuilder``@nestjs/microservices`類是從包中導出的。
您也可以通過訪問 來在服務器端讀取這些值`RmqContext`,如下所示:
~~~typescript
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: RmqContext): string {
const { properties: { headers } } = context.getMessage();
return headers['x-version'] === '1.0.0' ? '??' : '??';
}
~~~
- 介紹
- 概述
- 第一步
- 控制器
- 提供者
- 模塊
- 中間件
- 異常過濾器
- 管道
- 守衛
- 攔截器
- 自定義裝飾器
- 基礎知識
- 自定義提供者
- 異步提供者
- 動態模塊
- 注入作用域
- 循環依賴
- 模塊參考
- 懶加載模塊
- 應用上下文
- 生命周期事件
- 跨平臺
- 測試
- 技術
- 數據庫
- Mongo
- 配置
- 驗證
- 緩存
- 序列化
- 版本控制
- 定時任務
- 隊列
- 日志
- Cookies
- 事件
- 壓縮
- 文件上傳
- 流式處理文件
- HTTP模塊
- Session(會話)
- MVC
- 性能(Fastify)
- 服務器端事件發送
- 安全
- 認證(Authentication)
- 授權(Authorization)
- 加密和散列
- Helmet
- CORS(跨域請求)
- CSRF保護
- 限速
- GraphQL
- 快速開始
- 解析器(resolvers)
- 變更(Mutations)
- 訂閱(Subscriptions)
- 標量(Scalars)
- 指令(directives)
- 接口(Interfaces)
- 聯合類型
- 枚舉(Enums)
- 字段中間件
- 映射類型
- 插件
- 復雜性
- 擴展
- CLI插件
- 生成SDL
- 其他功能
- 聯合服務
- 遷移指南
- Websocket
- 網關
- 異常過濾器
- 管道
- 守衛
- 攔截器
- 適配器
- 微服務
- 概述
- Redis
- MQTT
- NATS
- RabbitMQ
- Kafka
- gRPC
- 自定義傳輸器
- 異常過濾器
- 管道
- 守衛
- 攔截器
- 獨立應用
- Cli
- 概述
- 工作空間
- 庫
- 用法
- 腳本
- Openapi
- 介紹
- 類型和參數
- 操作
- 安全
- 映射類型
- 裝飾器
- CLI插件
- 其他特性
- 遷移指南
- 秘籍
- CRUD 生成器
- 熱重載
- MikroORM
- TypeORM
- Mongoose
- 序列化
- 路由模塊
- Swagger
- 健康檢查
- CQRS
- 文檔
- Prisma
- 靜態服務
- Nest Commander
- 問答
- Serverless
- HTTP 適配器
- 全局路由前綴
- 混合應用
- HTTPS 和多服務器
- 請求生命周期
- 常見錯誤
- 實例
- 遷移指南
- 發現
- 誰在使用Nest?