<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                ## kafka [Kafka](https://kafka.apache.org/) 是一個由Apache軟件基金會開源的一個高吞吐量的分布式流處理平臺, 它具有三個關鍵特性: - 可以允許你發布和訂閱消息流。 - 可以以容錯的方式記錄消息流。 - 可以在消息流記錄產生時就進行處理。 Kafka 致力于提供一個處理實時數據的統一 、高吞吐量、 低延時的平臺。 它在處理實時數據分析時可以與Apache Storm 和 Spark完美集成。 **Kafka 傳輸器是實驗性的.** ### 安裝 要開始構建基于Kafka的微服務首先需要安裝所需的依賴: ```bash $ npm i --save kafkajs ``` ### 概述 類似其他微服務傳輸器層的實現,要使用kafka傳輸器機制,你需要像下面的示例一樣給`createMicroservice()`方法傳遞指定傳輸器`transport`屬性和可選的`options`屬性。 > main.ts ```typescript const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, { transport: Transport.KAFKA, options: { client: { brokers: ['localhost:9092'], } } }); ``` > `Transport`枚舉 需要從 `@nestjs/microservices` 包導入。 ### 選項 `options`對象和選擇的傳輸器有關,`Kafka`傳輸器暴露了一些屬性: -| - ---|--- client|客戶端配置選項([參見這里](https://kafka.js.org/docs/configuration)) consumer|消費者配置選項([參見這里](https://kafka.js.org/docs/consuming#a-name-options-a-options)) run|運行配置選項([參見這里](https://kafka.js.org/docs/consuming)) subscribe|訂閱配置選項([參見這里](https://kafka.js.org/docs/consuming#frombeginning)) producer|生產者配置選項([參見這里](https://kafka.js.org/docs/producing#options)) send|發送配置選項([參見這里](https://kafka.js.org/docs/producing#options)) ### 客戶端 `Kafka`和其他微服務傳送器有一點不同的是,我們需要用`ClientKafka`類替換`ClientProxy` 類。 像其他微服務一樣,創建`ClientKafka`實例也有幾個可[選項](https://docs.nestjs.com/microservices/basics#client)。 一種方式創建客戶端實例我們需要用到`ClientsModule`方法。 為了通過`ClientsModule`創建客戶端實例,導入`register()` 方法并且傳遞一個和上面`createMicroservice()`方法一樣的對象以及一個`name`屬性,它將被注入為token。了解更多關于[ClientsModule](https://docs.nestjs.com/microservices/basics#client)。 ```typescript @Module({ imports: [ ClientsModule.register([ { name: 'HERO_SERVICE', transport: Transport.KAFKA, options: { client: { clientId: 'hero', brokers: ['localhost:9092'], }, consumer: { groupId: 'hero-consumer' } } }, ]), ] ... }) ``` 另一種方式建立客戶端 ( `ClientProxyFactory`或者`@Client()`) 也可以正常使用。 為了創建客戶端實例,我們需要使用 `@Client()` 裝飾器。 ```typescript @Client({ transport: Transport.KAFKA, options: { client: { clientId: 'hero', brokers: ['localhost:9092'], }, consumer: { groupId: 'hero-consumer' } } }) client: ClientKafka; ``` ### 消息模式 `Kafka`消息模式利用兩個主題來請求和答復通道。`ClientKafka#send()`方法通過關聯[相關ID](https://www.enterpriseintegrationpatterns.com/patterns/messaging/CorrelationIdentifier.html)發送帶有[返回地址](https://www.enterpriseintegrationpatterns.com/patterns/messaging/ReturnAddress.html)的消息,答復主題,帶有請求信息的答復分區。 這要求在發送消息之前,`ClientKafka`實例需要訂閱答復主題并至少分配一個分區。 隨后,您需要為每個運行的Nest應用程序至少有一個答復主題分區。例如,如果您正在運行4個Nest應用程序,但是答復主題只有3個分區,則嘗試發送消息時,其中1個Nest應用程序將會報錯。 當啟動新的`ClientKafka`實例時,它們將加入消費者組并訂閱各自的主題。此過程觸發一個主題分區的再平衡并分配給消費者組中的消費者。 通常,主題分區是使用循環分區程序分配的,該程序將主題分區分配給按消費者名稱排序的消費者集合,消費者名稱是在應用程序啟動時隨機設置的。但是,當新消費者加入該消費者組時,該新消費者可以位于消費者集合中的任何位置。這就創造了這樣一種條件,即當現有消費者位于新消費者之后時,可以為現有消費者分配不同的分區。結果,分配了不同分區的消費者將丟失重新平衡之前發送的請求的響應消息。 為了防止`ClientKafka`使用者丟失響應消息,使用了Nest特定的內置自定義分區程序。這個自定義分區程序將分區分配給一組消費者,這些消費者按照在應用程序啟動時設置的高精度的時間戳(`process.hrtime()`)進行排序。 ### 消息訂閱響應 `ClientKafka`類提供了一個`subscribeToResponseOf()`方法,該方法會將獲取請求的主題名稱作為參數并將派生的答復主題加入到答復主題的集合中。這個函數在執行消息模式時是必須的。 >heroes.controller.ts ```typescript onModuleInit() { this.client.subscribeToResponseOf('hero.kill.dragon'); } ``` 如果`ClientKafka` 實例是異步創建的, `subscribeToResponseOf()`函數必須在`connect()`函數之前被調用。 >heros.controller.ts ```typescript async onModuleInit() { this.client.subscribeToResponseOf('hero.kill.dragon'); await this.client.connect(); } ``` ### 傳入(Incoming) Nest將會接收傳入的`Kafka`消息作為具有鍵,值和頭屬性(其值為Buffer類型)的對象。然后,Nest通過`Buffer`轉換為字符串來解析這些值。如果字符串是可被序列化的,Nest會把字符串解析為`JSON`并將該值傳遞到其關聯的處理程序。 ### 傳出(Outgoing) 在發布事件或發送消息時,Nest將在序列化過程之后發送傳出的`Kafka`消息。這發生在傳遞給`ClientKafka`的`emit()`和`send()`方法的參數上,或從`@MessagePattern`方法的返回值上。該序列化通過使用`JSON.stringify()`或`toString()`原型方法來“字符串化”不是字符串或緩沖區的對象。 >heroes.controller.ts ```typescript @Controller() export class HeroesController { @MessagePattern('hero.kill.dragon') killDragon(@Payload() message: KillDragonMessage): any { const dragonId = message.dragonId; const items = [ { id: 1, name: 'Mythical Sword' }, { id: 2, name: 'Key to Dungeon' }, ]; return items; } } ``` > `@Payload()` 需要從 `@nestjs/microservices` 中導入. 傳出的消息也可以通過傳遞帶有`key`和`value`屬性的對象來鍵入。密鑰消息對于滿足[共同分區要求](https://docs.confluent.io/current/ksql/docs/developer-guide/partition-data.html#co-partitioning-requirements)很重要。 >heroes.controller.ts ```typescript @Controller() export class HeroesController { @MessagePattern('hero.kill.dragon') killDragon(@Payload() message: KillDragonMessage): any { const realm = 'Nest'; const heroId = message.heroId; const dragonId = message.dragonId; const items = [ { id: 1, name: 'Mythical Sword' }, { id: 2, name: 'Key to Dungeon' }, ]; return { headers: { realm }, key: heroId, value: items } } } ``` 此外,以這種格式傳遞的消息還可以包含在自定義頭中設置`headers`哈希屬性值。 `headers`哈希屬性值必須為`string`類型或`buffer`類型。 >heroes.controller.ts ```typescript @Controller() export class HeroesController { @MessagePattern('hero.kill.dragon') killDragon(@Payload() message: KillDragonMessage): any { const realm = 'Nest'; const heroId = message.heroId; const dragonId = message.dragonId; const items = [ { id: 1, name: 'Mythical Sword' }, { id: 2, name: 'Key to Dungeon' }, ]; return { headers: { kafka_nestRealm: realm }, key: heroId, value: items } } } ``` ### 基于事件 雖然 `request-response` 方法非常適合在服務之間交換消息,但當您的消息樣式是基于事件的(這又是 Kafka 的理想選擇)時,它不太適合 - 當您只想發布事件而不等待響應時。 在這種情況下,您不希望請求-響應所需的開銷來維護兩個主題。 查看這兩個部分以了解更多信息:[概述:基于事件](https://docs.nestjs.com/microservices/basics#event-based)和[概述:發布事件](https://docs.nestjs.com/microservices/basics#publishing-events)。 ### 上下文 在更復雜的方案中,您可能需要訪問有關傳入請求的更多信息。 使用Kafka傳輸器時,您可以訪問`KafkaContext`對象。 ```typescript @MessagePattern('hero.kill.dragon') killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) { console.log(`Topic: ${context.getTopic()}`); } ``` >`@Payload()`, `@Ctx()` 和 `KafkaContext` 需要從 `@nestjs/microservices` 包導入. 為了訪問`Kafka`原生的 `IncomingMessage`對象,需要像下面的示例一樣使用`KafkaContext`的`getMessage()`方法。 ```typescript @MessagePattern('hero.kill.dragon') killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) { const originalMessage = context.getMessage(); const { headers, partition, timestamp } = originalMessage; } ``` `IncomingMessage`實現了以下的接口: ```typescript interface IncomingMessage { topic: string; partition: number; timestamp: string; size: number; attributes: number; offset: string; key: any; value: any; headers: Record<string, any>; } ``` ### 命名約定 `Kafka`微服務組件將其各自角色的描述附加到`client.clientId`和`consumer.groupId`選項上,以防止Nest微服務客戶端和服務器組件之間發生沖突。默認情況下,`ClientKafka`組件和`ServerKafka`組件將各自分別附加`-client`和`-server`到各自的選項中。請注意下面提供的值如何以這種方式轉換(如注釋中所示)。 >main.ts ```typescript const app = await NestFactory.createMicroservice(AppModule, { transport: Transport.KAFKA, options: { client: { clientId: 'hero', // hero-server brokers: ['localhost:9092'], }, consumer: { groupId: 'hero-consumer' // hero-consumer-server }, } }); ``` 對于客戶端: >heroes.controller.ts ```typescript @Client({ transport: Transport.KAFKA, options: { client: { clientId: 'hero', // hero-client brokers: ['localhost:9092'], }, consumer: { groupId: 'hero-consumer' // hero-consumer-client } } }) client: ClientKafka; ``` > 可以通過在您自己的自定義的提供者中擴展`ClientKafka`和`KafkaServer`并通過覆蓋構造函數來自定義`Kafka`客戶端口和使用者命名約定。 由于`Kafka`微服務的消息模式將兩個主題用于請求和回復通道,因此應從請求主題中獲得一個回復模式。默認情況下,回復主題的名稱是請求主題名稱和`.reply`的組合。 >heroes.controller.ts ```typescript onModuleInit() { this.client.subscribeToResponseOf('hero.get'); // hero.get.reply } ``` > 可以通過在您自己的自定義的提供者中擴展`ClientKafka`并通過覆蓋`getResponsePatternName`方法來自定義`Kafka`答復主題的命名約定。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看