## kafka
[Kafka](https://kafka.apache.org/) 是一個由Apache軟件基金會開源的一個高吞吐量的分布式流處理平臺, 它具有三個關鍵特性:
- 可以允許你發布和訂閱消息流。
- 可以以容錯的方式記錄消息流。
- 可以在消息流記錄產生時就進行處理。
Kafka 致力于提供一個處理實時數據的統一 、高吞吐量、 低延時的平臺。 它在處理實時數據分析時可以與Apache Storm 和 Spark完美集成。
**Kafka 傳輸器是實驗性的.**
### 安裝
要開始構建基于Kafka的微服務首先需要安裝所需的依賴:
```bash
$ npm i --save kafkajs
```
### 概述
類似其他微服務傳輸器層的實現,要使用kafka傳輸器機制,你需要像下面的示例一樣給`createMicroservice()`方法傳遞指定傳輸器`transport`屬性和可選的`options`屬性。
> main.ts
```typescript
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
}
}
});
```
> `Transport`枚舉 需要從 `@nestjs/microservices` 包導入。
### 選項
`options`對象和選擇的傳輸器有關,`Kafka`傳輸器暴露了一些屬性:
-| -
---|---
client|客戶端配置選項([參見這里](https://kafka.js.org/docs/configuration))
consumer|消費者配置選項([參見這里](https://kafka.js.org/docs/consuming#a-name-options-a-options))
run|運行配置選項([參見這里](https://kafka.js.org/docs/consuming))
subscribe|訂閱配置選項([參見這里](https://kafka.js.org/docs/consuming#frombeginning))
producer|生產者配置選項([參見這里](https://kafka.js.org/docs/producing#options))
send|發送配置選項([參見這里](https://kafka.js.org/docs/producing#options))
### 客戶端
`Kafka`和其他微服務傳送器有一點不同的是,我們需要用`ClientKafka`類替換`ClientProxy` 類。
像其他微服務一樣,創建`ClientKafka`實例也有幾個可[選項](https://docs.nestjs.com/microservices/basics#client)。
一種方式創建客戶端實例我們需要用到`ClientsModule`方法。 為了通過`ClientsModule`創建客戶端實例,導入`register()` 方法并且傳遞一個和上面`createMicroservice()`方法一樣的對象以及一個`name`屬性,它將被注入為token。了解更多關于[ClientsModule](https://docs.nestjs.com/microservices/basics#client)。
```typescript
@Module({
imports: [
ClientsModule.register([
{
name: 'HERO_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer'
}
}
},
]),
]
...
})
```
另一種方式建立客戶端 ( `ClientProxyFactory`或者`@Client()`) 也可以正常使用。
為了創建客戶端實例,我們需要使用 `@Client()` 裝飾器。
```typescript
@Client({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer'
}
}
})
client: ClientKafka;
```
### 消息模式
`Kafka`消息模式利用兩個主題來請求和答復通道。`ClientKafka#send()`方法通過關聯[相關ID](https://www.enterpriseintegrationpatterns.com/patterns/messaging/CorrelationIdentifier.html)發送帶有[返回地址](https://www.enterpriseintegrationpatterns.com/patterns/messaging/ReturnAddress.html)的消息,答復主題,帶有請求信息的答復分區。
這要求在發送消息之前,`ClientKafka`實例需要訂閱答復主題并至少分配一個分區。
隨后,您需要為每個運行的Nest應用程序至少有一個答復主題分區。例如,如果您正在運行4個Nest應用程序,但是答復主題只有3個分區,則嘗試發送消息時,其中1個Nest應用程序將會報錯。
當啟動新的`ClientKafka`實例時,它們將加入消費者組并訂閱各自的主題。此過程觸發一個主題分區的再平衡并分配給消費者組中的消費者。
通常,主題分區是使用循環分區程序分配的,該程序將主題分區分配給按消費者名稱排序的消費者集合,消費者名稱是在應用程序啟動時隨機設置的。但是,當新消費者加入該消費者組時,該新消費者可以位于消費者集合中的任何位置。這就創造了這樣一種條件,即當現有消費者位于新消費者之后時,可以為現有消費者分配不同的分區。結果,分配了不同分區的消費者將丟失重新平衡之前發送的請求的響應消息。
為了防止`ClientKafka`使用者丟失響應消息,使用了Nest特定的內置自定義分區程序。這個自定義分區程序將分區分配給一組消費者,這些消費者按照在應用程序啟動時設置的高精度的時間戳(`process.hrtime()`)進行排序。
### 消息訂閱響應
`ClientKafka`類提供了一個`subscribeToResponseOf()`方法,該方法會將獲取請求的主題名稱作為參數并將派生的答復主題加入到答復主題的集合中。這個函數在執行消息模式時是必須的。
>heroes.controller.ts
```typescript
onModuleInit() {
this.client.subscribeToResponseOf('hero.kill.dragon');
}
```
如果`ClientKafka` 實例是異步創建的, `subscribeToResponseOf()`函數必須在`connect()`函數之前被調用。
>heros.controller.ts
```typescript
async onModuleInit() {
this.client.subscribeToResponseOf('hero.kill.dragon');
await this.client.connect();
}
```
### 傳入(Incoming)
Nest將會接收傳入的`Kafka`消息作為具有鍵,值和頭屬性(其值為Buffer類型)的對象。然后,Nest通過`Buffer`轉換為字符串來解析這些值。如果字符串是可被序列化的,Nest會把字符串解析為`JSON`并將該值傳遞到其關聯的處理程序。
### 傳出(Outgoing)
在發布事件或發送消息時,Nest將在序列化過程之后發送傳出的`Kafka`消息。這發生在傳遞給`ClientKafka`的`emit()`和`send()`方法的參數上,或從`@MessagePattern`方法的返回值上。該序列化通過使用`JSON.stringify()`或`toString()`原型方法來“字符串化”不是字符串或緩沖區的對象。
>heroes.controller.ts
```typescript
@Controller()
export class HeroesController {
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage): any {
const dragonId = message.dragonId;
const items = [
{ id: 1, name: 'Mythical Sword' },
{ id: 2, name: 'Key to Dungeon' },
];
return items;
}
}
```
> `@Payload()` 需要從 `@nestjs/microservices` 中導入.
傳出的消息也可以通過傳遞帶有`key`和`value`屬性的對象來鍵入。密鑰消息對于滿足[共同分區要求](https://docs.confluent.io/current/ksql/docs/developer-guide/partition-data.html#co-partitioning-requirements)很重要。
>heroes.controller.ts
```typescript
@Controller()
export class HeroesController {
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage): any {
const realm = 'Nest';
const heroId = message.heroId;
const dragonId = message.dragonId;
const items = [
{ id: 1, name: 'Mythical Sword' },
{ id: 2, name: 'Key to Dungeon' },
];
return {
headers: {
realm
},
key: heroId,
value: items
}
}
}
```
此外,以這種格式傳遞的消息還可以包含在自定義頭中設置`headers`哈希屬性值。 `headers`哈希屬性值必須為`string`類型或`buffer`類型。
>heroes.controller.ts
```typescript
@Controller()
export class HeroesController {
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage): any {
const realm = 'Nest';
const heroId = message.heroId;
const dragonId = message.dragonId;
const items = [
{ id: 1, name: 'Mythical Sword' },
{ id: 2, name: 'Key to Dungeon' },
];
return {
headers: {
kafka_nestRealm: realm
},
key: heroId,
value: items
}
}
}
```
### 基于事件
雖然 `request-response` 方法非常適合在服務之間交換消息,但當您的消息樣式是基于事件的(這又是 Kafka 的理想選擇)時,它不太適合 - 當您只想發布事件而不等待響應時。 在這種情況下,您不希望請求-響應所需的開銷來維護兩個主題。
查看這兩個部分以了解更多信息:[概述:基于事件](https://docs.nestjs.com/microservices/basics#event-based)和[概述:發布事件](https://docs.nestjs.com/microservices/basics#publishing-events)。
### 上下文
在更復雜的方案中,您可能需要訪問有關傳入請求的更多信息。 使用Kafka傳輸器時,您可以訪問`KafkaContext`對象。
```typescript
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
console.log(`Topic: ${context.getTopic()}`);
}
```
>`@Payload()`, `@Ctx()` 和 `KafkaContext` 需要從 `@nestjs/microservices` 包導入.
為了訪問`Kafka`原生的 `IncomingMessage`對象,需要像下面的示例一樣使用`KafkaContext`的`getMessage()`方法。
```typescript
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
const originalMessage = context.getMessage();
const { headers, partition, timestamp } = originalMessage;
}
```
`IncomingMessage`實現了以下的接口:
```typescript
interface IncomingMessage {
topic: string;
partition: number;
timestamp: string;
size: number;
attributes: number;
offset: string;
key: any;
value: any;
headers: Record<string, any>;
}
```
### 命名約定
`Kafka`微服務組件將其各自角色的描述附加到`client.clientId`和`consumer.groupId`選項上,以防止Nest微服務客戶端和服務器組件之間發生沖突。默認情況下,`ClientKafka`組件和`ServerKafka`組件將各自分別附加`-client`和`-server`到各自的選項中。請注意下面提供的值如何以這種方式轉換(如注釋中所示)。
>main.ts
```typescript
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero', // hero-server
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer' // hero-consumer-server
},
}
});
```
對于客戶端:
>heroes.controller.ts
```typescript
@Client({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero', // hero-client
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer' // hero-consumer-client
}
}
})
client: ClientKafka;
```
> 可以通過在您自己的自定義的提供者中擴展`ClientKafka`和`KafkaServer`并通過覆蓋構造函數來自定義`Kafka`客戶端口和使用者命名約定。
由于`Kafka`微服務的消息模式將兩個主題用于請求和回復通道,因此應從請求主題中獲得一個回復模式。默認情況下,回復主題的名稱是請求主題名稱和`.reply`的組合。
>heroes.controller.ts
```typescript
onModuleInit() {
this.client.subscribeToResponseOf('hero.get'); // hero.get.reply
}
```
> 可以通過在您自己的自定義的提供者中擴展`ClientKafka`并通過覆蓋`getResponsePatternName`方法來自定義`Kafka`答復主題的命名約定。
- 介紹
- 概述
- 第一步
- 控制器
- 提供者
- 模塊
- 中間件
- 異常過濾器
- 管道
- 守衛
- 攔截器
- 自定義裝飾器
- 基礎知識
- 自定義提供者
- 異步提供者
- 動態模塊
- 注入作用域
- 循環依賴
- 模塊參考
- 懶加載模塊
- 應用上下文
- 生命周期事件
- 跨平臺
- 測試
- 技術
- 數據庫
- Mongo
- 配置
- 驗證
- 緩存
- 序列化
- 版本控制
- 定時任務
- 隊列
- 日志
- Cookies
- 事件
- 壓縮
- 文件上傳
- 流式處理文件
- HTTP模塊
- Session(會話)
- MVC
- 性能(Fastify)
- 服務器端事件發送
- 安全
- 認證(Authentication)
- 授權(Authorization)
- 加密和散列
- Helmet
- CORS(跨域請求)
- CSRF保護
- 限速
- GraphQL
- 快速開始
- 解析器(resolvers)
- 變更(Mutations)
- 訂閱(Subscriptions)
- 標量(Scalars)
- 指令(directives)
- 接口(Interfaces)
- 聯合類型
- 枚舉(Enums)
- 字段中間件
- 映射類型
- 插件
- 復雜性
- 擴展
- CLI插件
- 生成SDL
- 其他功能
- 聯合服務
- 遷移指南
- Websocket
- 網關
- 異常過濾器
- 管道
- 守衛
- 攔截器
- 適配器
- 微服務
- 概述
- Redis
- MQTT
- NATS
- RabbitMQ
- Kafka
- gRPC
- 自定義傳輸器
- 異常過濾器
- 管道
- 守衛
- 攔截器
- 獨立應用
- Cli
- 概述
- 工作空間
- 庫
- 用法
- 腳本
- Openapi
- 介紹
- 類型和參數
- 操作
- 安全
- 映射類型
- 裝飾器
- CLI插件
- 其他特性
- 遷移指南
- 秘籍
- CRUD 生成器
- 熱重載
- MikroORM
- TypeORM
- Mongoose
- 序列化
- 路由模塊
- Swagger
- 健康檢查
- CQRS
- 文檔
- Prisma
- 靜態服務
- Nest Commander
- 問答
- Serverless
- HTTP 適配器
- 全局路由前綴
- 混合應用
- HTTPS 和多服務器
- 請求生命周期
- 常見錯誤
- 實例
- 遷移指南
- 發現
- 誰在使用Nest?