<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                ## 隊列 隊列是一種有用的設計模式,可以幫助你處理一般應用規模和性能的挑戰。一些隊列可以幫助你處理的問題示例包括: - 平滑輸出峰值。例如,如果用戶可以在任何時間創建資源敏感型任務,你可以將其添加到一個消息隊列中而不是同步執行。然后你可以通過工作者進程從隊列中以一個可控的方式取出進程。在應用規模增大時,你可以輕松添加新的隊列消費者來提高后端任務處理能力。 - 將可能阻塞`Node.js`事件循環的整體任務打碎。例如,如果一個用戶請求是 CPU 敏感型工作,例如音頻轉碼,你可以將其委托給其他進程,從而保證用戶接口進程保持響應。 - 在不同的服務間提供一個可信的通訊通道。例如,你可以將任務(工作)加入一個進程或服務,并由另一個進程或服務來消費他們。你可以在由其他任何進程或服務執行的工作完成、錯誤或者其他狀態變化時得到通知(通過監聽狀態事件)。當隊列生產者或者消費者失敗時,他們的狀態會被保留,任務將在 node 重啟后自動重啟。 Nest 提供了`@nestjs/bull`包,這是[Bull](https://github.com/OptimalBits/bull)包的一個包裝器,Bull 是一個流行的、支持良好的、高性能的基于 Nodejs 的消息隊列系統應用。該包將 Bull 隊列以 Nest 友好的方式添加到你的應用中。 Bull 使用[Redis](https://redis.io/)持久化工作數據,因此你需要在你的系統中安裝 Redis。因為他是基于 Redis 的,你的隊列結構可以是完全分布式的并且和平臺無關。例如,你可以有一些隊列[生產者](https://docs.nestjs.com/techniques/queues#producers)、[消費者](https://docs.nestjs.com/techniques/queues#consumers)和[監聽者](https://docs.nestjs.com/techniques/queues#event-listeners),他們運行在 Nest 的一個或多個節點上,同時,其他生產者、消費者和監聽者在其他 Node.js 平臺或者其他網絡節點上。 本章使用`@nestjs/bull`包,我們同時推薦閱讀[BUll 文檔](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md)來獲取更多背景和應用細節。 ### 安裝 要開始使用,我們首先安裝需要的依賴: ```typescript $ npm install --save @nestjs/bull bull $ npm install --save-dev @types/bull ``` 一旦安裝過程完成,我們可以在根`AppModule`中導入`BullModule`。 > app.module.ts ```typescript import { Module } from '@nestjs/common'; import { BullModule } from '@nestjs/bull'; @Module({ imports: [ BullModule.registerQueue({ name: 'audio', redis: { host: 'localhost', port: 6379, }, }), ], }) export class AppModule {} ``` `registerQueue()`方法用于實例化并/或注冊隊列。隊列在不同的模塊和進程之間共享,在底層則通過同樣的憑據連接到同樣的 Redis 數據庫。每個隊列由其`name`屬性區分(如下),當共享隊列(跨模塊/進程)時,第一個`registerQueue()`方法同時實例化該隊列并向模塊注冊它。其他模塊(在相同或者不同進程下)則簡單地注冊隊列。隊列注冊創建一個`injection token`,它可以被用在給定 Nest 模塊中獲取隊列。 針對每個隊列,傳遞一個包含下列屬性的配置對象: -`name:string`- 一個隊列名稱,它可以被用作`injection token`(用于將隊列注冊到控制器/提供者),也可以作為裝飾器參數來將消費者類和監聽者與隊列聯系起來。是必須的。 -`limiter:RateLimiter`-該選項用于確定消息隊列處理速率,查看[RateLimiter](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue)獲取更多信息。可選的。 -`redis:RedisOpts`-該選項用于配置 Redis 連接,查看[RedisOpts](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue)獲取更多信息。可選的。 -`prefix: string`-隊列所有鍵的前綴。可選的。 -`defaultJobOptions: JobOpts`-選項用以控制新任務的默認屬性。查看[JobOpts](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queueadd)獲取更多信息。可選的。 -`settings: AdvancedSettings`-高級隊列配置設置。這些通常不需要改變。查看[AdvancedSettings](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue)獲取更多信息。可選的。 注意,`name`屬性是必須的。其他選項是可選的,為隊列行為提供更細節的控制。這些會直接傳遞給 Bull 的`Queue`構造器。在[這里](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue)閱讀更多選項。當在第二個或者子模塊中注冊一個隊列時,最佳時間是省略配置對象中除`name`屬性之外的所有選項。這些選項僅應該在實例化隊列的模塊中確定。 > 在`registerQueue()`方法中傳遞多個逗號分隔的選項對象來創建多個隊列。 由于任務在 Redis 中是持久化的,每次當一個特定名稱的隊列被實例化時(例如,當一個 app 啟動/重啟時),它嘗試處理任何可能在前一個舊的任務遺留未完成的`session`。 每個隊里可能有一個或很多生產者、消費者以及監聽者。消費者從一個特定命令隊列中獲取任務:FIFO(默認,先進先出),LIFO(后進先出)或者依據優先級。 控制隊列處理命令在[這里](https://docs.nestjs.com/techniques/queues#consumers)討論。 ### 生產者 任務生產者添加任務到隊列中。生產者是典型的應用服務(Nest [提供者](https://docs.nestjs.com/providers))。要添加工作到一個隊列,首先注冊隊列到服務中: ```typescript import { Injectable } from '@nestjs/common'; import { Queue } from 'bull'; import { InjectQueue } from '@nestjs/bull'; @Injectable() export class AudioService { constructor(@InjectQueue('audio') private audioQueue: Queue) {} } ``` > `@InjectQueue()`裝飾器由其名稱指定隊列,像它在`registerQueue()`方法中提供的那樣(例如,`audio`)。 現在,通過調用隊列的`add()`方法添加一個任務,傳遞一個用戶定義的任務對象。任務表現為序列化的`JavaScript`對象(因為它們被存儲在 Redis 數據庫中)。你傳遞的任務形式是可選的;用它來在語義上表示你任務對象: ```typescript const job = await this.audioQueue.add({ foo: 'bar', }); ``` ### 命名的任務 任務需要獨一無二的名字。這允許你創建專用的[消費者](https://docs.nestjs.com/techniques/queues#consumers),這將僅處理給定名稱的處理任務。 ```typescript const job = await this.audioQueue.add('transcode', { foo: 'bar', }); ``` > 當使用命名任務時,你必須為每個添加到隊列中的特有名稱創建處理者,否則隊列會反饋缺失了給定任務的處理器。查看[這里](https://docs.nestjs.com/techniques/queues#consumers)閱讀更多關于消費命名任務的信息。 ### 任務選項 任務可以包括附加選項。在`Quene.add()`方法的`job`參數之后傳遞選項對象。任務選項屬性有: - `priority: number`-選項優先級值。范圍從 1(最高優先)到 MAX_INT(最低優先)。注意使用屬性對性能有輕微影響,因此要小心使用。 - `delay: number`- 任務執行前等待的時間(毫秒)。注意,為了精確延時,服務端和客戶端時鐘應該同步。 - `attempts: number`-任務結束前總的嘗試次數。 - `repeat: RepeatOpts`-按照定時設置重復任務記錄,查看[RepeatOpts](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queueadd)。 - `backoff: number | BackoffOpts`- 如果任務失敗,自動重試閃避設置,查看[BackoffOpts](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queueadd)。 - `lifo: boolean`-如果為`true`,從隊列右端添加任務以替代從左邊添加(默認為 false)。 - `timeout: number`-任務超時失敗的毫秒數。 - `jobId: number | string`- 覆蓋任務 ID-默認地,任務 ID 是唯一的整數,但你可以使用該參數覆蓋它。如果你使用這個選項,你需要保證`jobId`是唯一的。如果你嘗試添加一個包含已有 id 的任務,它不會被添加。 - `removeOnComplete: boolean | number`-如果為`true`,當任務完成時移除任務。一個數字用來指定要保存的任務數。默認行為是將完成的工作保存在已完成的設置中。 - `removeOnFail: boolean | number`-如果為`true`,當所有嘗試失敗時移除任務。一個數字用來指定要保存的任務數。默認行為是將失敗的任務保存在已失敗的設置中。 - `stackTraceLimit: number`-限制在`stacktrace`中保存的堆棧跟蹤線。 這里是一些帶有任務選項的自定義任務示例。 要延遲任務的開始,使用`delay`配置屬性: ```typescript const job = await this.audioQueue.add( { foo: 'bar', }, { delay: 3000 } // 3 seconds delayed ); ``` 要從右端添加任務到隊列(以 LIFO(后進先出)處理任務),設置配置對象的`lifo`屬性為`true`。 ```typescript const job = await this.audioQueue.add( { foo: 'bar', }, { lifo: true } ); ``` 要優先一個任務,使用`priority`屬性。 ```typescript const job = await this.audioQueue.add( { foo: 'bar', }, { priority: 2 } ); ``` ### 消費者 消費者是一個類,定義的方法要么處理添加到隊列中的任務,要么監聽隊列的事件,或者兩者皆有。使用`@Processor()`裝飾器來定義消費者類,如下: ```typescript import { Processor } from '@nestjs/bull'; @Processor('audio') export class AudioConsumer {} ``` 裝飾器的字符串參數(例如,`audio`)是和類方法關聯的隊列名稱。 在消費者類中,使用`@Process()`裝飾器來裝飾任務處理者。 ```typescript import { Processor, Process } from '@nestjs/bull'; import { Job } from 'bull'; @Processor('audio') export class AudioConsumer { @Process() async transcode(job: Job<unknown>) { let progress = 0; for (i = 0; i < 100; i++) { await doSomething(job.data); progress += 10; job.progress(progress); } return {}; } } ``` 裝飾器方法(例如`transcode()`) 在工作空閑或者隊列中有消息要處理的時候被調用。該處理器方法接受`job`對象作為其僅有的參數。處理器方法的返回值被保存在任務對象中,可以在之后被訪問,例如,在用于完成事件的監聽者中。 `Job`對象有多個方法,允許你和他們的狀態交互。例如,上述代碼使用`progress()`方法來更新工作進程。查看[這里](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#job)以了解完整的`Job`對象 API 參照。 你可以指定一個任務處理方法,僅處理指定類型(包含特定`name`的任務)的任務,這可以通過如下所述的將`name`傳遞給`@Process()`裝飾器完成。你在一個給定消費者類中可以有多個`@Process()`處理器,以反應每個任務類型(`name`),確保每個`name`有相應的處理者。 ```typescript @Process('transcode') async transcode(job: Job<unknown>) { ... } ``` ### 事件監聽者 當隊列和/或任務狀態改變時,`Bull`生成一個有用的事件集合。Nest 提供了一個裝飾器集合,允許訂閱一系列標準核心事件集合。他們從`@nestjs/bull`包中導出。 事件監聽者必須在一個[消費者](https://docs.nestjs.com/techniques/queues#consumers)類中聲明(通過`@Processor()`裝飾器)。要監聽一個事件,使用如下表格之一的裝飾器來聲明一個事件處理器。例如,當一個任務進入`audio`隊列活躍狀態時,要監聽其發射的事件,使用下列結構: ```typescript import { Processor, Process } from '@nestjs/bull'; import { Job } from 'bull'; @Processor('audio') export class AudioConsumer { @OnQueueActive() onActive(job: Job) { console.log( `Processing job ${job.id} of type ${job.name} with data ${job.data}...`, ); } ``` 鑒于 BUll 運行于分布式(多 node)環境,它定義了本地事件概念。該概念可以辨識出一個由完整的單一進程觸發的事件,或者由不同進程共享的隊列。一個本地事件是指在本地進程中觸發的一個隊列行為或者狀態變更。換句話說,當你的事件生產者和消費者是本地單進程時,隊列中所有事件都是本地的。 當一個隊列在多個進程中共享時,我們可能要遇到全局事件。對一個由其他進程觸發的事件通知器進程的監聽者來說,它必須注冊為全局事件。 當相應事件發射時事件處理器被喚醒。該處理器被下表所示的簽名調用,提供訪問事件相關的信息。我們討論下面簽名中本地和全局事件處理器。 | 本地事件監聽者 | 全局事件監聽者 | 處理器方法簽名/當觸發時 | | ------------------- | ------------------------- | ----------------------------------------------------------------------------------------------------------| | @OnQueueError() | @OnGlobalQueueError() | handler(error: Error) - 當錯誤發生時,`error`包括觸發錯誤 | | @OnQueueWaiting() | @OnGlobalQueueWaiting() | handler(jobId: number \| string) - 一旦工作者空閑就等待執行的任務,`jobId`包括進入此狀態的 id | | @OnQueueActive() | @OnGlobalQueueActive() | handler(job: Job)-`job`任務已啟動 | | @OnQueueStalled() | @OnGlobalQueueStalled() | handler(job: Job)-`job`任務被標記為延遲。這在時間循環崩潰或暫停時進行調試工作時是很有效的 | | @OnQueueProgress() | @OnGlobalQueueProgress() | handler(job: Job, progress: number)-`job`任務進程被更新為`progress`值 | | @OnQueueCompleted() | @OnGlobalQueueCompleted() | handler(job: Job, result: any) `job`任務進程成功以`result`結束 | | @OnQueueFailed() | @OnGlobalQueueFailed() | handler(job: Job, err: Error)`job`任務以`err`原因失敗 | | @OnQueuePaused() | @OnGlobalQueuePaused() | handler()隊列被暫停 | | @OnQueueResumed() | @OnGlobalQueueResumed() | handler(job: Job)隊列被恢復 | | @OnQueueCleaned() | @OnGlobalQueueCleaned() | handler(jobs: Job[], type: string) 舊任務從隊列中被清理,`job`是一個清理任務數組,`type`是要清理的任務類型 | | @OnQueueDrained() | @OnGlobalQueueDrained() | handler()在隊列處理完所有等待的任務(除非有些尚未處理的任務被延遲)時發射出 | | @OnQueueRemoved() | @OnGlobalQueueRemoved() | handler(job: Job)`job`任務被成功移除 | 當監聽全局事件時,簽名方法可能和本地有一點不同。特別地,本地版本的任何方法簽名接受`job`對象的方法簽名而不是全局版本的`jobId(number)`。要在這種情況下獲取實際的`job`對象的引用,使用`Queue#getJob`方法。這種調用可能需要等待,因此處理者應該被聲明為`async`,例如: ```typescript @OnGlobalQueueCompleted() async onGlobalCompleted(jobId: number, result: any) { const job = await this.immediateQueue.getJob(jobId); console.log('(Global) on completed: job ', job.id, ' -> result: ', result); } ``` > 要獲取一個`Queue`對象(使用`getJob()`調用),你當然必須注入它。同時,隊列必須注冊到你要注入的模塊中。 在特定事件監聽器裝飾器之外,你可以使用通用的`@OnQueueEvent()`裝飾器與`BullQueueEvents`或者`BullQueueGlobalEvents`枚舉相結合。在[這里](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#events)閱讀更多有關事件的內容。 ### 隊列管理 隊列有一個 API 來實現管理功能比如暫停、恢復、檢索不同狀態的任務數量等。你可以在[這里](https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue)找到完整的隊列 API。直接在`Queue`對象上調用這些方法,如下所示的暫停/恢復示例。 使用`pause()`方法調用來暫停隊列。一個暫停的隊列在恢復前將不會處理新的任務,但會繼續處理完當前執行的任務。 ```typescript await audioQueue.pause(); ``` 要恢復一個暫停的隊列,使用`resume()`方法,如下: ```typescript await audioQueue.resume(); ``` ## 分離進程[#](#separate-processes) 作業處理程序也可以在單獨的(分叉的)進程([source](https://github.com/OptimalBits/bull#separate-processes))中運行。這有幾個優點: * 該進程是沙盒化的,因此如果它崩潰,它不會影響工作人員。 * 您可以在不影響隊列的情況下運行阻塞代碼(作業不會停止)。 * 更好地利用多核 CPU。 * 與 redis 的連接更少。 >app.module.ts ~~~ts import { Module } from '@nestjs/common'; import { BullModule } from '@nestjs/bull'; import { join } from 'path'; @Module({ imports: [ BullModule.registerQueue({ name: 'audio', processors: [join(__dirname, 'processor.js')], }), ], }) export class AppModule {} ~~~ 請注意,由于您的函數是在分叉進程中執行的,因此依賴注入(和 IoC 容器)將不可用。這意味著您的處理器函數將需要包含(或創建)它所需的所有外部依賴項實例。 >processor.ts ~~~ts import { Job, DoneCallback } from 'bull'; export default function (job: Job, cb: DoneCallback) { console.log(`[${process.pid}] ${JSON.stringify(job.data)}`); cb(null, 'It works'); } ~~~ ### 異步配置 你可能需要異步而不是靜態傳遞隊列選項。在這種情況下,使用`registerQueueAsync()`方法,可以提供不同的異步配置方法。 一個方法是使用工廠函數: ```typescript BullModule.registerQueueAsync({ name: 'audio', useFactory: () => ({ redis: { host: 'localhost', port: 6379, }, }), }); ``` 我們的工廠函數方法和其他[異步提供者](https://docs.nestjs.com/fundamentals/async-providers)(它可以是`async`的并可以使用`inject`來注入)方法相同。 ```typescript BullModule.registerQueueAsync({ name: 'audio', imports: [ConfigModule], useFactory: async (configService: ConfigService) => ({ redis: { host: configService.get('QUEUE_HOST'), port: +configService.get('QUEUE_PORT'), }, }), inject: [ConfigService], }); ``` 可選的,你可以使用`useClass`語法。 ```typescript BullModule.registerQueueAsync({ name: 'audio', useClass: BullConfigService, }); ``` 上述結構在`BullModule`中實例化`BullConfigService`,并通過調用`createBullOptions()`來用它提供一個選項對象。注意這意味著`BullConfigService`要實現`BullOptionsFactory`工廠接口,如下: ```typescript @Injectable() class BullConfigService implements BullOptionsFactory { createBullOptions(): BullModuleOptions { return { redis: { host: 'localhost', port: 6379, }, }; } } ``` 要阻止在`BullModule`中創建`BullConfigService`并使用一個從其他模塊導入的提供者,可以使用`useExisting`語法。 ```typescript BullModule.registerQueueAsync({ name: 'audio', imports: [ConfigModule], useExisting: ConfigService, }); ``` 這個結構和`useClass`有一個根本區別——`BullModule`將查找導入的模塊來重用現有的`ConfigServie`而不是實例化一個新的。 ### 示例 一個可用的示例見[這里](https://github.com/nestjs/nest/tree/master/sample/26-queues)。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看