## 隊列
隊列是一種有用的設計模式,可以幫助你處理一般應用規模和性能的挑戰。一些隊列可以幫助你處理的問題示例包括:
- 平滑輸出峰值。例如,如果用戶可以在任何時間創建資源敏感型任務,你可以將其添加到一個消息隊列中而不是同步執行。然后你可以通過工作者進程從隊列中以一個可控的方式取出進程。在應用規模增大時,你可以輕松添加新的隊列消費者來提高后端任務處理能力。
- 將可能阻塞`Node.js`事件循環的整體任務打碎。例如,如果一個用戶請求是 CPU 敏感型工作,例如音頻轉碼,你可以將其委托給其他進程,從而保證用戶接口進程保持響應。
- 在不同的服務間提供一個可信的通訊通道。例如,你可以將任務(工作)加入一個進程或服務,并由另一個進程或服務來消費他們。你可以在由其他任何進程或服務執行的工作完成、錯誤或者其他狀態變化時得到通知(通過監聽狀態事件)。當隊列生產者或者消費者失敗時,他們的狀態會被保留,任務將在 node 重啟后自動重啟。
Nest 提供了`@nestjs/bull`包,這是[Bull](https://github.com/OptimalBits/bull)包的一個包裝器,Bull 是一個流行的、支持良好的、高性能的基于 Nodejs 的消息隊列系統應用。該包將 Bull 隊列以 Nest 友好的方式添加到你的應用中。
Bull 使用[Redis](https://redis.io/)持久化工作數據,因此你需要在你的系統中安裝 Redis。因為他是基于 Redis 的,你的隊列結構可以是完全分布式的并且和平臺無關。例如,你可以有一些隊列[生產者](https://docs.nestjs.com/techniques/queues#producers)、[消費者](https://docs.nestjs.com/techniques/queues#consumers)和[監聽者](https://docs.nestjs.com/techniques/queues#event-listeners),他們運行在 Nest 的一個或多個節點上,同時,其他生產者、消費者和監聽者在其他 Node.js 平臺或者其他網絡節點上。
本章使用`@nestjs/bull`包,我們同時推薦閱讀[BUll 文檔](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md)來獲取更多背景和應用細節。
### 安裝
要開始使用,我們首先安裝需要的依賴:
```typescript
$ npm install --save @nestjs/bull bull
$ npm install --save-dev @types/bull
```
一旦安裝過程完成,我們可以在根`AppModule`中導入`BullModule`。
> app.module.ts
```typescript
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
@Module({
imports: [
BullModule.registerQueue({
name: 'audio',
redis: {
host: 'localhost',
port: 6379,
},
}),
],
})
export class AppModule {}
```
`registerQueue()`方法用于實例化并/或注冊隊列。隊列在不同的模塊和進程之間共享,在底層則通過同樣的憑據連接到同樣的 Redis 數據庫。每個隊列由其`name`屬性區分(如下),當共享隊列(跨模塊/進程)時,第一個`registerQueue()`方法同時實例化該隊列并向模塊注冊它。其他模塊(在相同或者不同進程下)則簡單地注冊隊列。隊列注冊創建一個`injection token`,它可以被用在給定 Nest 模塊中獲取隊列。
針對每個隊列,傳遞一個包含下列屬性的配置對象:
-`name:string`- 一個隊列名稱,它可以被用作`injection token`(用于將隊列注冊到控制器/提供者),也可以作為裝飾器參數來將消費者類和監聽者與隊列聯系起來。是必須的。 -`limiter:RateLimiter`-該選項用于確定消息隊列處理速率,查看[RateLimiter](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue)獲取更多信息。可選的。 -`redis:RedisOpts`-該選項用于配置 Redis 連接,查看[RedisOpts](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue)獲取更多信息。可選的。 -`prefix: string`-隊列所有鍵的前綴。可選的。 -`defaultJobOptions: JobOpts`-選項用以控制新任務的默認屬性。查看[JobOpts](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queueadd)獲取更多信息。可選的。 -`settings: AdvancedSettings`-高級隊列配置設置。這些通常不需要改變。查看[AdvancedSettings](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue)獲取更多信息。可選的。
注意,`name`屬性是必須的。其他選項是可選的,為隊列行為提供更細節的控制。這些會直接傳遞給 Bull 的`Queue`構造器。在[這里](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue)閱讀更多選項。當在第二個或者子模塊中注冊一個隊列時,最佳時間是省略配置對象中除`name`屬性之外的所有選項。這些選項僅應該在實例化隊列的模塊中確定。
> 在`registerQueue()`方法中傳遞多個逗號分隔的選項對象來創建多個隊列。
由于任務在 Redis 中是持久化的,每次當一個特定名稱的隊列被實例化時(例如,當一個 app 啟動/重啟時),它嘗試處理任何可能在前一個舊的任務遺留未完成的`session`。
每個隊里可能有一個或很多生產者、消費者以及監聽者。消費者從一個特定命令隊列中獲取任務:FIFO(默認,先進先出),LIFO(后進先出)或者依據優先級。
控制隊列處理命令在[這里](https://docs.nestjs.com/techniques/queues#consumers)討論。
### 生產者
任務生產者添加任務到隊列中。生產者是典型的應用服務(Nest [提供者](https://docs.nestjs.com/providers))。要添加工作到一個隊列,首先注冊隊列到服務中:
```typescript
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';
@Injectable()
export class AudioService {
constructor(@InjectQueue('audio') private audioQueue: Queue) {}
}
```
> `@InjectQueue()`裝飾器由其名稱指定隊列,像它在`registerQueue()`方法中提供的那樣(例如,`audio`)。
現在,通過調用隊列的`add()`方法添加一個任務,傳遞一個用戶定義的任務對象。任務表現為序列化的`JavaScript`對象(因為它們被存儲在 Redis 數據庫中)。你傳遞的任務形式是可選的;用它來在語義上表示你任務對象:
```typescript
const job = await this.audioQueue.add({
foo: 'bar',
});
```
### 命名的任務
任務需要獨一無二的名字。這允許你創建專用的[消費者](https://docs.nestjs.com/techniques/queues#consumers),這將僅處理給定名稱的處理任務。
```typescript
const job = await this.audioQueue.add('transcode', {
foo: 'bar',
});
```
> 當使用命名任務時,你必須為每個添加到隊列中的特有名稱創建處理者,否則隊列會反饋缺失了給定任務的處理器。查看[這里](https://docs.nestjs.com/techniques/queues#consumers)閱讀更多關于消費命名任務的信息。
### 任務選項
任務可以包括附加選項。在`Quene.add()`方法的`job`參數之后傳遞選項對象。任務選項屬性有:
- `priority: number`-選項優先級值。范圍從 1(最高優先)到 MAX_INT(最低優先)。注意使用屬性對性能有輕微影響,因此要小心使用。
- `delay: number`- 任務執行前等待的時間(毫秒)。注意,為了精確延時,服務端和客戶端時鐘應該同步。
- `attempts: number`-任務結束前總的嘗試次數。
- `repeat: RepeatOpts`-按照定時設置重復任務記錄,查看[RepeatOpts](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queueadd)。
- `backoff: number | BackoffOpts`- 如果任務失敗,自動重試閃避設置,查看[BackoffOpts](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queueadd)。
- `lifo: boolean`-如果為`true`,從隊列右端添加任務以替代從左邊添加(默認為 false)。
- `timeout: number`-任務超時失敗的毫秒數。
- `jobId: number | string`- 覆蓋任務 ID-默認地,任務 ID 是唯一的整數,但你可以使用該參數覆蓋它。如果你使用這個選項,你需要保證`jobId`是唯一的。如果你嘗試添加一個包含已有 id 的任務,它不會被添加。
- `removeOnComplete: boolean | number`-如果為`true`,當任務完成時移除任務。一個數字用來指定要保存的任務數。默認行為是將完成的工作保存在已完成的設置中。
- `removeOnFail: boolean | number`-如果為`true`,當所有嘗試失敗時移除任務。一個數字用來指定要保存的任務數。默認行為是將失敗的任務保存在已失敗的設置中。
- `stackTraceLimit: number`-限制在`stacktrace`中保存的堆棧跟蹤線。
這里是一些帶有任務選項的自定義任務示例。
要延遲任務的開始,使用`delay`配置屬性:
```typescript
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ delay: 3000 } // 3 seconds delayed
);
```
要從右端添加任務到隊列(以 LIFO(后進先出)處理任務),設置配置對象的`lifo`屬性為`true`。
```typescript
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ lifo: true }
);
```
要優先一個任務,使用`priority`屬性。
```typescript
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ priority: 2 }
);
```
### 消費者
消費者是一個類,定義的方法要么處理添加到隊列中的任務,要么監聽隊列的事件,或者兩者皆有。使用`@Processor()`裝飾器來定義消費者類,如下:
```typescript
import { Processor } from '@nestjs/bull';
@Processor('audio')
export class AudioConsumer {}
```
裝飾器的字符串參數(例如,`audio`)是和類方法關聯的隊列名稱。
在消費者類中,使用`@Process()`裝飾器來裝飾任務處理者。
```typescript
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('audio')
export class AudioConsumer {
@Process()
async transcode(job: Job<unknown>) {
let progress = 0;
for (i = 0; i < 100; i++) {
await doSomething(job.data);
progress += 10;
job.progress(progress);
}
return {};
}
}
```
裝飾器方法(例如`transcode()`) 在工作空閑或者隊列中有消息要處理的時候被調用。該處理器方法接受`job`對象作為其僅有的參數。處理器方法的返回值被保存在任務對象中,可以在之后被訪問,例如,在用于完成事件的監聽者中。
`Job`對象有多個方法,允許你和他們的狀態交互。例如,上述代碼使用`progress()`方法來更新工作進程。查看[這里](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#job)以了解完整的`Job`對象 API 參照。
你可以指定一個任務處理方法,僅處理指定類型(包含特定`name`的任務)的任務,這可以通過如下所述的將`name`傳遞給`@Process()`裝飾器完成。你在一個給定消費者類中可以有多個`@Process()`處理器,以反應每個任務類型(`name`),確保每個`name`有相應的處理者。
```typescript
@Process('transcode')
async transcode(job: Job<unknown>) { ... }
```
### 事件監聽者
當隊列和/或任務狀態改變時,`Bull`生成一個有用的事件集合。Nest 提供了一個裝飾器集合,允許訂閱一系列標準核心事件集合。他們從`@nestjs/bull`包中導出。
事件監聽者必須在一個[消費者](https://docs.nestjs.com/techniques/queues#consumers)類中聲明(通過`@Processor()`裝飾器)。要監聽一個事件,使用如下表格之一的裝飾器來聲明一個事件處理器。例如,當一個任務進入`audio`隊列活躍狀態時,要監聽其發射的事件,使用下列結構:
```typescript
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('audio')
export class AudioConsumer {
@OnQueueActive()
onActive(job: Job) {
console.log(
`Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
);
}
```
鑒于 BUll 運行于分布式(多 node)環境,它定義了本地事件概念。該概念可以辨識出一個由完整的單一進程觸發的事件,或者由不同進程共享的隊列。一個本地事件是指在本地進程中觸發的一個隊列行為或者狀態變更。換句話說,當你的事件生產者和消費者是本地單進程時,隊列中所有事件都是本地的。
當一個隊列在多個進程中共享時,我們可能要遇到全局事件。對一個由其他進程觸發的事件通知器進程的監聽者來說,它必須注冊為全局事件。
當相應事件發射時事件處理器被喚醒。該處理器被下表所示的簽名調用,提供訪問事件相關的信息。我們討論下面簽名中本地和全局事件處理器。
| 本地事件監聽者 | 全局事件監聽者 | 處理器方法簽名/當觸發時 |
| ------------------- | ------------------------- | ----------------------------------------------------------------------------------------------------------|
| @OnQueueError() | @OnGlobalQueueError() | handler(error: Error) - 當錯誤發生時,`error`包括觸發錯誤 |
| @OnQueueWaiting() | @OnGlobalQueueWaiting() | handler(jobId: number \| string) - 一旦工作者空閑就等待執行的任務,`jobId`包括進入此狀態的 id |
| @OnQueueActive() | @OnGlobalQueueActive() | handler(job: Job)-`job`任務已啟動 |
| @OnQueueStalled() | @OnGlobalQueueStalled() | handler(job: Job)-`job`任務被標記為延遲。這在時間循環崩潰或暫停時進行調試工作時是很有效的 |
| @OnQueueProgress() | @OnGlobalQueueProgress() | handler(job: Job, progress: number)-`job`任務進程被更新為`progress`值 |
| @OnQueueCompleted() | @OnGlobalQueueCompleted() | handler(job: Job, result: any) `job`任務進程成功以`result`結束 |
| @OnQueueFailed() | @OnGlobalQueueFailed() | handler(job: Job, err: Error)`job`任務以`err`原因失敗 |
| @OnQueuePaused() | @OnGlobalQueuePaused() | handler()隊列被暫停 |
| @OnQueueResumed() | @OnGlobalQueueResumed() | handler(job: Job)隊列被恢復 |
| @OnQueueCleaned() | @OnGlobalQueueCleaned() | handler(jobs: Job[], type: string) 舊任務從隊列中被清理,`job`是一個清理任務數組,`type`是要清理的任務類型 |
| @OnQueueDrained() | @OnGlobalQueueDrained() | handler()在隊列處理完所有等待的任務(除非有些尚未處理的任務被延遲)時發射出 |
| @OnQueueRemoved() | @OnGlobalQueueRemoved() | handler(job: Job)`job`任務被成功移除 |
當監聽全局事件時,簽名方法可能和本地有一點不同。特別地,本地版本的任何方法簽名接受`job`對象的方法簽名而不是全局版本的`jobId(number)`。要在這種情況下獲取實際的`job`對象的引用,使用`Queue#getJob`方法。這種調用可能需要等待,因此處理者應該被聲明為`async`,例如:
```typescript
@OnGlobalQueueCompleted()
async onGlobalCompleted(jobId: number, result: any) {
const job = await this.immediateQueue.getJob(jobId);
console.log('(Global) on completed: job ', job.id, ' -> result: ', result);
}
```
> 要獲取一個`Queue`對象(使用`getJob()`調用),你當然必須注入它。同時,隊列必須注冊到你要注入的模塊中。
在特定事件監聽器裝飾器之外,你可以使用通用的`@OnQueueEvent()`裝飾器與`BullQueueEvents`或者`BullQueueGlobalEvents`枚舉相結合。在[這里](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#events)閱讀更多有關事件的內容。
### 隊列管理
隊列有一個 API 來實現管理功能比如暫停、恢復、檢索不同狀態的任務數量等。你可以在[這里](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue)找到完整的隊列 API。直接在`Queue`對象上調用這些方法,如下所示的暫停/恢復示例。
使用`pause()`方法調用來暫停隊列。一個暫停的隊列在恢復前將不會處理新的任務,但會繼續處理完當前執行的任務。
```typescript
await audioQueue.pause();
```
要恢復一個暫停的隊列,使用`resume()`方法,如下:
```typescript
await audioQueue.resume();
```
## 分離進程[#](#separate-processes)
作業處理程序也可以在單獨的(分叉的)進程([source](https://github.com/OptimalBits/bull#separate-processes))中運行。這有幾個優點:
* 該進程是沙盒化的,因此如果它崩潰,它不會影響工作人員。
* 您可以在不影響隊列的情況下運行阻塞代碼(作業不會停止)。
* 更好地利用多核 CPU。
* 與 redis 的連接更少。
>app.module.ts
~~~ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { join } from 'path';
@Module({
imports: [
BullModule.registerQueue({
name: 'audio',
processors: [join(__dirname, 'processor.js')],
}),
],
})
export class AppModule {}
~~~
請注意,由于您的函數是在分叉進程中執行的,因此依賴注入(和 IoC 容器)將不可用。這意味著您的處理器函數將需要包含(或創建)它所需的所有外部依賴項實例。
>processor.ts
~~~ts
import { Job, DoneCallback } from 'bull';
export default function (job: Job, cb: DoneCallback) {
console.log(`[${process.pid}] ${JSON.stringify(job.data)}`);
cb(null, 'It works');
}
~~~
### 異步配置
你可能需要異步而不是靜態傳遞隊列選項。在這種情況下,使用`registerQueueAsync()`方法,可以提供不同的異步配置方法。
一個方法是使用工廠函數:
```typescript
BullModule.registerQueueAsync({
name: 'audio',
useFactory: () => ({
redis: {
host: 'localhost',
port: 6379,
},
}),
});
```
我們的工廠函數方法和其他[異步提供者](https://docs.nestjs.com/fundamentals/async-providers)(它可以是`async`的并可以使用`inject`來注入)方法相同。
```typescript
BullModule.registerQueueAsync({
name: 'audio',
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
redis: {
host: configService.get('QUEUE_HOST'),
port: +configService.get('QUEUE_PORT'),
},
}),
inject: [ConfigService],
});
```
可選的,你可以使用`useClass`語法。
```typescript
BullModule.registerQueueAsync({
name: 'audio',
useClass: BullConfigService,
});
```
上述結構在`BullModule`中實例化`BullConfigService`,并通過調用`createBullOptions()`來用它提供一個選項對象。注意這意味著`BullConfigService`要實現`BullOptionsFactory`工廠接口,如下:
```typescript
@Injectable()
class BullConfigService implements BullOptionsFactory {
createBullOptions(): BullModuleOptions {
return {
redis: {
host: 'localhost',
port: 6379,
},
};
}
}
```
要阻止在`BullModule`中創建`BullConfigService`并使用一個從其他模塊導入的提供者,可以使用`useExisting`語法。
```typescript
BullModule.registerQueueAsync({
name: 'audio',
imports: [ConfigModule],
useExisting: ConfigService,
});
```
這個結構和`useClass`有一個根本區別——`BullModule`將查找導入的模塊來重用現有的`ConfigServie`而不是實例化一個新的。
### 示例
一個可用的示例見[這里](https://github.com/nestjs/nest/tree/master/sample/26-queues)。
- 介紹
- 概述
- 第一步
- 控制器
- 提供者
- 模塊
- 中間件
- 異常過濾器
- 管道
- 守衛
- 攔截器
- 自定義裝飾器
- 基礎知識
- 自定義提供者
- 異步提供者
- 動態模塊
- 注入作用域
- 循環依賴
- 模塊參考
- 懶加載模塊
- 應用上下文
- 生命周期事件
- 跨平臺
- 測試
- 技術
- 數據庫
- Mongo
- 配置
- 驗證
- 緩存
- 序列化
- 版本控制
- 定時任務
- 隊列
- 日志
- Cookies
- 事件
- 壓縮
- 文件上傳
- 流式處理文件
- HTTP模塊
- Session(會話)
- MVC
- 性能(Fastify)
- 服務器端事件發送
- 安全
- 認證(Authentication)
- 授權(Authorization)
- 加密和散列
- Helmet
- CORS(跨域請求)
- CSRF保護
- 限速
- GraphQL
- 快速開始
- 解析器(resolvers)
- 變更(Mutations)
- 訂閱(Subscriptions)
- 標量(Scalars)
- 指令(directives)
- 接口(Interfaces)
- 聯合類型
- 枚舉(Enums)
- 字段中間件
- 映射類型
- 插件
- 復雜性
- 擴展
- CLI插件
- 生成SDL
- 其他功能
- 聯合服務
- 遷移指南
- Websocket
- 網關
- 異常過濾器
- 管道
- 守衛
- 攔截器
- 適配器
- 微服務
- 概述
- Redis
- MQTT
- NATS
- RabbitMQ
- Kafka
- gRPC
- 自定義傳輸器
- 異常過濾器
- 管道
- 守衛
- 攔截器
- 獨立應用
- Cli
- 概述
- 工作空間
- 庫
- 用法
- 腳本
- Openapi
- 介紹
- 類型和參數
- 操作
- 安全
- 映射類型
- 裝飾器
- CLI插件
- 其他特性
- 遷移指南
- 秘籍
- CRUD 生成器
- 熱重載
- MikroORM
- TypeORM
- Mongoose
- 序列化
- 路由模塊
- Swagger
- 健康檢查
- CQRS
- 文檔
- Prisma
- 靜態服務
- Nest Commander
- 問答
- Serverless
- HTTP 適配器
- 全局路由前綴
- 混合應用
- HTTPS 和多服務器
- 請求生命周期
- 常見錯誤
- 實例
- 遷移指南
- 發現
- 誰在使用Nest?