## 自定義傳輸器
Nest提供了一系列開箱即用的傳輸器,也提供了允許用戶自定義傳輸策略的API接口。傳輸器允許你使用可插拔的通訊層和非常簡單的應用層消息協議通過網絡連接組件。(閱讀[全文](https://dev.to/nestjs/integrate-nestjs-with-external-services-using-microservice-transporters-part-1-p3))
> 不一定非要使用`@nestjs/microservices`包才能創建微服務,例如,如果需要和外部服務通訊 (假設為其他語言編寫的其他微服務),你可能不需要 `@nestjs/microservice `提供的全部功能。實際上,如果你不需要裝飾器(`@EventPattern`或者`@MessagePattern`)來定義訂閱者,運行一個獨立的應用并且手動維護連接/訂閱頻道可能會提供更高的靈活性。
使用自定義傳輸器,你可以集成任何消息系統/協議(包括`Google Cloud Pub/Sub`, `Amazon Kinesis`等等)或者已有的外部系統,在頂部添加額外的特性(例如用于MQTT的QoS)。
> 要更好地理解Nest微服務的工作模式以及如何擴展現有傳輸器,推薦閱讀 [NestJS Microservices in Action](https://dev.to/johnbiundo/series/4724) 和 [Advanced NestJS Microservices](https://dev.to/nestjs/part-1-introduction-and-setup-1a2l) 系列文章。
### 創建策略
首先定義一個代表自定義傳輸器的類。
```typescript
import { CustomTransportStrategy, Server } from '@nestjs/microservices';
class GoogleCloudPubSubServer
extends Server
implements CustomTransportStrategy {
/**
* This method is triggered when you run "app.listen()".
*/
listen(callback: () => void) {
callback();
}
/**
* This method is triggered on application shutdown.
*/
close() {}
}
```
> 在這里不會實現一個完整的谷歌云訂閱服務器,因為這需要更多更深入的傳輸器細節。
在這個例子中,聲明了`GoogleCloudPubSubServer`類,提供`listen()`和`close()` 方法,并由`CustomTransportStrategy`接口進行限制。此外,我們的類擴展了從`@nestjs/microservices`包導入的`Server`類,來提供一些有用的方法。例如,提供Nest運行時注冊消息處理程序的方法。可選的,如果要擴展傳輸器策略,也可以擴展相應的服務器。例如,`ServerRedis`。一般來說,我們在類前面添加`Server`前綴來表示該類用于處理訂閱消息事件(并在必要時進行響應)。
這樣就可以自定義一個傳輸器而不是使用內置的。
```typescript
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
strategy: new GoogleCloudPubSubServer(),
},
);
```
和常規的傳輸器選項對象的`transport`和`options`屬性不同,我們傳遞一個屬性 `strategy`, 其值是我們自定義傳輸器類的一個實例。
回到我們的`GoogleCloudPubSubServer`類,在真實的應用中,我們可以在`listen()`方法(以及之后移除訂閱和監聽的`close()`方法)中指定訂閱/監聽特定頻道來和代理/外部服務建立連接。但由于這需要對Nest微服務通訊原理有深入理解,我們建議閱讀[這一系列文章](https://dev.to/nestjs/part-1-introduction-and-setup-1a2l)。在本章這里,我們重點介紹服務器類的能力以及如何暴露它們來建立自定義策略。
例如,在我們程序的某個部分,定義了以下處理程序:
```typescript
@MessagePattern('echo')
echo(@Payload() data: object) {
return data;
}
```
該消息處理程序可以自動在Nest運行時注冊。通過服務器類,可以看到被注冊的消息類型是哪種,也可以接收和執行分配給它們的實際方法。要測試這個,我們在`listen()`中添加一個簡單的`console.log`方法,并在回調函數前執行:
```typescript
listen(callback: () => void) {
console.log(this.messageHandlers);
callback();
}
```
程序重啟后,可以看到終端中以下記錄:
```typescript
Map { 'echo' => [AsyncFunction] { isEventHandler: false } }
```
> 如果我們使用`@EventPattern`裝飾器,你可以看到同樣的輸出,但是`isEventHandler`屬性被設置為`true`。
如你所見,`messageHandlers`屬性是一個所有消息(和事件)處理程序的`Map`集合。在這里,模式被用作鍵名,你可以使用一個鍵(例如`echo`)來接收一個消息處理程序的引用:
```typescript
async listen(callback: () => void) {
const echoHandler = this.messageHandlers.get('echo');
console.log(await echoHandler('Hello world!'));
callback();
}
```
一旦我們執行了`echoHandler`,并傳遞任意字符串作為參數(在這里是"Hello world!"), 可以看到以下輸出:
```bash
Hello world!
```
這意味著消息處理程序正確執行了。
### 客戶代理
如第一部分介紹的,你不一定需要`@nestjs/microservices`包來創建微服務,但是如果你這樣做,那么需要集成一個自定義策略,你還需要提供一個`client`類。
?> 類似地,要實現完全兼容所有的`@nestjs/microservices`特性(例如`streaming`)需要對框架的通訊機制有深入理解。閱讀[本文](https://dev.to/nestjs/part-4-basic-client-component-16f9)了解更多。
要和外部服務/發射與發布消息(或者事件)通訊,你可以使用一個特定庫的SDK包,或者一個擴展了`ClientProxy`的自定義的客戶端類,如下:
```typescript
import { ClientProxy, ReadPacket, WritePacket } from '@nestjs/microservices';
class GoogleCloudPubSubClient extends ClientProxy {
async connect(): Promise<any> {}
async close() {}
async dispatchEvent(packet: ReadPacket<any>): Promise<any> {}
publish(
packet: ReadPacket<any>,
callback: (packet: WritePacket<any>) => void,
): Function {}
}
```
> 注意,在這里我們不會實現一個完整的google云發布/訂閱客戶端,因為這需要對傳輸者技術深入理解。
如你所見,`ClientProxy`需要我們提供幾個方法來建立和關閉連接,以及發布消息(`publish`)和事件(`dispatchEvent`)。注意,如果你不需要支持請求-響應的通訊風格,可以保持`publish()`方法空白。類似地,如果你不需要支持基于事件的通訊,跳過`dispatchEvent()`方法。
要觀察何時何地執行了哪些方法,如下添加多個`console.log`方法:
```typescript
class GoogleCloudPubSubClient extends ClientProxy {
async connect(): Promise<any> {
console.log('connect');
}
async close() {
console.log('close');
}
async dispatchEvent(packet: ReadPacket<any>): Promise<any> {
return console.log('event to dispatch: ', packet);
}
publish(
packet: ReadPacket<any>,
callback: (packet: WritePacket<any>) => void,
): Function {
console.log('message:', packet);
// In a real-world application, the "callback" function should be executed
// with payload sent back from the responder. Here, we'll simply simulate (5 seconds delay)
// that response came through by passing the same "data" as we've originally passed in.
setTimeout(() => callback({ response: packet.data }), 5000);
return () => console.log('teardown');
}
}
```
創建一個 `GoogleCloudPubSubClient `類并運行`send()`方法(參見前節),注冊和返回一個可觀察流。
```typescript
const googlePubSubClient = new GoogleCloudPubSubClient();
googlePubSubClient
.send('pattern', 'Hello world!')
.subscribe((response) => console.log(response));
```
在終端可以看到如下輸出:
```bash
connect
message: { pattern: 'pattern', data: 'Hello world!' }
Hello world! // <-- after 5 seconds
```
要測試"teardown"方法(由`publish()`方法返回)正確執行,我們在流中添加一個超時操作,設置超時時間為2秒以保證其早于`setTimeout`調用回調函數。
```typescript
const googlePubSubClient = new GoogleCloudPubSubClient();
googlePubSubClient
.send('pattern', 'Hello world!')
.pipe(timeout(2000))
.subscribe(
(response) => console.log(response),
(error) => console.error(error.message),
);
```
> `timeout`操作符從`rxjs/operators`包中導入。
應用`timeout`操作符,終端看上去類似如下:
```bash
connect
message: { pattern: 'pattern', data: 'Hello world!' }
teardown // <-- teardown
Timeout has occurred
```
要分派一個事件(代替消息),使用`emit()`方法:
```typescript
googlePubSubClient.emit('event', 'Hello world!');
```
終端看上去如下:
```bash
connect
event to dispatch: { pattern: 'event', data: 'Hello world!' }
```
### 消息序列化
如果您需要圍繞客戶端的響應序列化添加一些自定義邏輯,您可以使用擴展 `ClientProxy` 類或其子類之一的自定義類。 要修改成功的請求,您可以覆蓋 `serializeResponse` 方法,并且要修改通過此客戶端的任何錯誤,您可以覆蓋 `serializeError` 方法。 要使用此自定義類,您可以使用 `customClass `屬性將類本身傳遞給 `ClientsModule.register()` 方法。 下面是一個將每個錯誤序列化為 `RpcException` 的自定義 `ClientProxy` 示例。
>error-handling.proxy.ts
~~~typescript
import { ClientTcp, RpcException } from '@nestjs/microservices';
class ErrorHandlingProxy extends ClientTCP {
serializeError(err: Error) {
return new RpcException(err);
}
}
~~~
然后像這樣在 `ClientsModule` 中使用它:
>app.module.ts
~~~typescript
@Module({
imports: [
ClientsModule.register({
name: 'CustomProxy',
customClass: ErrorHandlingProxy,
}),
]
})
export class AppModule
~~~
> **暗示** :這是傳遞給 `customClass` 的類本身,而不是類的實例。 Nest 將在后臺為您創建實例,并將給選項屬性的任何選項傳遞給新的 `ClientProxy`。
- 介紹
- 概述
- 第一步
- 控制器
- 提供者
- 模塊
- 中間件
- 異常過濾器
- 管道
- 守衛
- 攔截器
- 自定義裝飾器
- 基礎知識
- 自定義提供者
- 異步提供者
- 動態模塊
- 注入作用域
- 循環依賴
- 模塊參考
- 懶加載模塊
- 應用上下文
- 生命周期事件
- 跨平臺
- 測試
- 技術
- 數據庫
- Mongo
- 配置
- 驗證
- 緩存
- 序列化
- 版本控制
- 定時任務
- 隊列
- 日志
- Cookies
- 事件
- 壓縮
- 文件上傳
- 流式處理文件
- HTTP模塊
- Session(會話)
- MVC
- 性能(Fastify)
- 服務器端事件發送
- 安全
- 認證(Authentication)
- 授權(Authorization)
- 加密和散列
- Helmet
- CORS(跨域請求)
- CSRF保護
- 限速
- GraphQL
- 快速開始
- 解析器(resolvers)
- 變更(Mutations)
- 訂閱(Subscriptions)
- 標量(Scalars)
- 指令(directives)
- 接口(Interfaces)
- 聯合類型
- 枚舉(Enums)
- 字段中間件
- 映射類型
- 插件
- 復雜性
- 擴展
- CLI插件
- 生成SDL
- 其他功能
- 聯合服務
- 遷移指南
- Websocket
- 網關
- 異常過濾器
- 管道
- 守衛
- 攔截器
- 適配器
- 微服務
- 概述
- Redis
- MQTT
- NATS
- RabbitMQ
- Kafka
- gRPC
- 自定義傳輸器
- 異常過濾器
- 管道
- 守衛
- 攔截器
- 獨立應用
- Cli
- 概述
- 工作空間
- 庫
- 用法
- 腳本
- Openapi
- 介紹
- 類型和參數
- 操作
- 安全
- 映射類型
- 裝飾器
- CLI插件
- 其他特性
- 遷移指南
- 秘籍
- CRUD 生成器
- 熱重載
- MikroORM
- TypeORM
- Mongoose
- 序列化
- 路由模塊
- Swagger
- 健康檢查
- CQRS
- 文檔
- Prisma
- 靜態服務
- Nest Commander
- 問答
- Serverless
- HTTP 適配器
- 全局路由前綴
- 混合應用
- HTTPS 和多服務器
- 請求生命周期
- 常見錯誤
- 實例
- 遷移指南
- 發現
- 誰在使用Nest?