[TOC]
## WebSocket介紹
`WebSocket`協議,[RFC 6455](https://tools.ietf.org/html/rfc6455),提供了一種標準化的方法,通過單個`TCP`連接在客戶端和服務器之間建立全雙工、雙向的通信通道。它是一種不同于`HTTP`的`TCP`協議,但被設計在`HTTP`上工作,使用端口`80`和`443`,并允許重用現有的防火墻規則。
`WebSocket`交互從一個`HTTP`請求開始,該請求使用`HTTP Upgrade`頭進行升級,或者在本例中,切換到`WebSocket`協議。下面的例子展示了這樣的交互:
~~~
GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
//Upgrade請求頭
Upgrade: websocket
//通過Upgrade連接
Connection: Upgrade
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
~~~
而不是通常的`200`狀態碼,一個支持`WebSocket`的服務器返回類似如下的輸出:
~~~
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
~~~
成功握手之后,`HTTP`升級請求底層的`TCP`套接字將保持打開狀態,以便客戶機和服務器繼續發送和接收消息。
注意,如果`WebSocket`服務器運行在`web`服務器(例如`nginx`)后面,你可能需要配置它來將`WebSocket`升級請求傳遞給`WebSocket`服務器。同樣,如果應用程序運行在云環境中,請檢查云提供商有關`WebSocket`支持的說明。
## HTTP與WebSocket
盡管`WebSocket`被設計成與`HTTP`兼容,并且從`HTTP`請求開始,但重要的是要理解這兩種協議導致非常不同的體系結構和應用程序編程模型。
在`HTTP`和`REST`中,應用程序被建模為多個`url`。為了與應用程序交互,客戶端以請求-響應的方式訪問這些`url`。服務器根據`HTTP URL`、方法和頭將請求路由到適當的處理程序。
相反,在`WebSockets`中,通常只有一個`URL`用于初始連接。隨后,所有應用程序消息在同一`TCP`連接上流動。這指向一個完全不同的異步、事件驅動的消息傳遞體系結構。
`WebSocket`也是一種低級傳輸協議,與`HTTP`不同,它沒有規定消息內容的任何語義。這意味著,除非客戶端和服務器在消息語義上達成一致,否則無法路由或處理消息。
`WebSocket`客戶端和服務器可以通過`HTTP`握手請求的`Sec-WebSocket-Protocol`報頭來協商使用更高級別的消息傳遞協議(例如`STOMP`)。
## 何時使用WebSockets
`WebSockets`可以使網頁具有動態和交互性。然而,在許多情況下,`Ajax`和`HTTP`流或長輪詢的組合可以提供一個簡單而有效的解決方案。例如,新聞、郵件和社交源需要動態更新,但每隔幾分鐘更新一次可能完全沒有問題。另一方面,協作、游戲和金融應用需要更接近實時。
延遲本身并不是一個決定性因素。如果消息量相對較低(例如,監視網絡故障),`HTTP`流或輪詢可以提供有效的解決方案。低延遲、高頻率和高容量的結合是`WebSocket`的最佳使用情況。
在`Internet`上,你控制范圍之外的限制性代理可能會阻止`WebSocket`交互,要么因為它們沒有配置為傳遞`Upgrade`頭,要么因為它們關閉了看起來空閑的長時間連接。這意味著將`WebSocket`用于防火墻內的內部應用程序比用于面向公共的應用程序要簡單得多。
## WebSocket使用
`Spring`框架提供了一個`WebSocket API`,你可以使用它來編寫處理`WebSocket`消息的客戶端和服務器端應用程序。
### 服務端
要創建`WebSocket`服務器,你可以先創建一個`WebSocketHandler`。下面的例子展示了如何做到這一點:
~~~
public class MyWebSocketHandler implements WebSocketHandler {
private Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1L), Duration.ofSeconds(1L));
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.send(intervalFlux.map(item -> session.textMessage(item + "")))
.and(session.receive().doOnNext(msg -> {
String msgText = msg.getPayloadAsText();
System.out.println("收到客戶端消息:" + msgText);
}).then());
}
}
~~~
### 客戶端
`Spring WebFlux`提供了一個`WebSocketClient`抽象,實現了`Reactor Netty`、`Tomcat`、`Jetty`、`Undertow`和標準`Java`(即`JSR-356`)。
要啟動`WebSocket`會話,你可以創建一個客戶端的實例,并使用它的`execute`方法:
~~~
WebSocketClient client = new ReactorNettyWebSocketClient();
URI url = new URI("ws://localhost:8080/path");
client.execute(url,
session -> session.send(Mono.just(session.textMessage("hello world")))
.thenMany(session.receive().map(WebSocketMessage::getPayloadAsText).log())
.then())
.block();
~~~
服務端日志:

客戶端日志:

有些客戶端,比如`Jetty`,實現了`Lifecycle`,需要在使用它們之前停止和啟動。所有客戶端都有與底層`WebSocket`客戶端配置相關的構造函數選項。
### `WebSocketHandler`
`WebSocketHandler`的`handle`方法接受`WebSocketSession`并返回`Mono<Void>`來指示應用程序對會話的處理何時完成。會話通過兩個流處理,一個用于輸入消息,另一個用于輸出消息。下表描述了處理流的兩種方法:
| WebSocketSession方法 | 說明 |
| --- | --- |
| `Flux<WebSocketMessage> receive()` | 提供對輸入消息流的訪問,并在連接關閉時完成。 |
|`Mono<Void> send(Publisher<WebSocketMessage>)` | 獲取輸出消息的源,寫入消息,并返回一個`Mono<Void>`,該`Mono<Void>`在源完成且寫入完成時完成。|
`WebSocketHandler`必須將輸入和輸出流組合成一個統一的流,并返回一個`Mono<Void>`,以反映該流的完成。根據應用需求,統一流程在以下情況下完成:
* 輸入或輸出消息流完成。
* 輸入流完成(即連接關閉),而輸出流是無限的。
* 在選定的點,通過`WebSocketSession`的`close`方法。
當輸入和輸出消息流組合在一起時,不需要檢查連接是否打開,因為`Reactive streams`會發出結束活動的信號。輸出流接收到完成或錯誤信號,而輸出流接收到取消信號。
處理程序的最基本實現是處理輸入流。下面的例子展示了這樣一個實現:
~~~
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.receive()//訪問輸入消息流
.doOnNext(message -> {
// ...處理每條消息。
})
.concatMap(message -> {
// 執行使用消息內容的嵌套異步操作。 當接收完成時,返回一個Mono<Void>。
})
.then();//當接收完成時,返回一個Mono<Void>。
}
}
~~~
> 對于嵌套的異步操作,你可能需要在使用數據池緩沖區的底層服務器上調用`message.retain()(`例如`Netty`)。否則,數據緩沖區可能會在你有機會讀取數據之前被釋放。
下面的實現組合了輸入和輸出流:
~~~
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive()//處理輸入消息流
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
// 創建輸出消息,生成組合流。
.map(value -> session.textMessage("Echo " + value));
//返回一個Mono<Void>,當繼續接收時它不會完成。
return session.send(output);
}
}
~~~
輸入流和輸出流可以是獨立的,并且只能在完成時進行連接,如下面的示例所示:
~~~
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> input = session.receive()//處理輸入消息
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.then();
Flux<String> source = ... ;
//發送消息
Mono<Void> output = session.send(source.map(session::textMessage));
//加入兩個流并返回一個Mono<Void>,該函數在任意一個流結束時結束。
return Mono.zip(input, output).then();
}
}
~~~
### 握手
`WebSocketHandlerAdapter`委托給`WebSocketService`。默認情況下,它是一個`HandshakeWebSocketService`的實例,它對`WebSocket`請求執行基本的檢查,然后對正在使用的服務器使用`RequestUpgradeStrategy`。目前,它內置了對`Reactor Netty`、`Tomcat`、`Jetty`和`Undertow`的支持。
`HandshakeWebSocketService`公開了一個`sessionAttributePredicate`屬性,該屬性允許設置`Predicate`來從`WebSession`中提取屬性,并將它們插入到`WebSocketSession`的屬性中。
~~~
@Bean
public WebSocketService webSocketService() {
ReactorNettyRequestUpgradeStrategy strategy=new ReactorNettyRequestUpgradeStrategy();
HandshakeWebSocketService handshakeWebSocketService = new HandshakeWebSocketService(strategy);
handshakeWebSocketService.setSessionAttributePredicate(Predicates.isTrue());
return handshakeWebSocketService;
}
~~~
### 服務端配置
每個服務器的`RequestUpgradeStrategy`公開了特定于底層`WebSocket`服務器引擎的配置。
~~~
@Configuration
class WebConfig {
@Bean
public WebSocketService webSocketService() {
TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
strategy.setMaxSessionIdleTimeout(0L);
return new HandshakeWebSocketService(strategy);
}
}
~~~
檢查服務器的`upgrade`策略,看看有哪些可用選項。目前,只有`Tomcat`和`Jetty`公開了這些選項。
### 跨域
配置`CORS`和限制對`WebSocket`端點訪問的最簡單的方法是讓你的`WebSocketHandler`實現`CorsConfigurationSource`并返回一個`CorsConfiguration`,包含允許的源、頭和其他細節。如果你不能這樣做,你還可以在`SimpleUrlHandler`上設置`corsConfigurations`屬性,以通過`URL`模式指定`CORS`設置。如果兩者都指定了,它們將通過使用`CorsConfiguration`上的`combine`方法進行組合。
~~~
public class MyWebSocketHandler implements WebSocketHandler, CorsConfigurationSource {
@Override
public CorsConfiguration getCorsConfiguration(ServerWebExchange exchange) {
return new CorsConfiguration();
}
}
~~~
- 1.反應式編程概述
- 2.Reactor框架
- Flux
- Mono
- 訂閱(Subscribe)
- 編程創建序列
- 線程和調度器
- 錯誤處理
- 3.Spring WebFlux概述
- 4.Spring WebFlux核心組件
- HttpHandler
- WebHandler
- ServerWebExchange
- 編碼和解碼器
- JSON
- Form Data
- Multipart Data
- 過濾器
- 異常處理器
- DispatcherHandler
- 5.Spring Boot啟動WebFlux
- 6.Spring WebFlux注解控制器
- 請求映射
- 處理程序方法
- 方法參數
- 返回值
- 類型轉換
- 模型(Model)
- 數據綁定(DataBinder)
- 異常管理
- @ControllerAdvice
- 7.Spring WebFlux函數端點
- HandlerFunction
- RouterFunction
- 運行服務
- 函數過濾器
- 8.Spring Boot中使用函數端點
- 9.Spring Webflux請求處理流程
- 10.Spring WebFlux配置
- 11.Spring WebFlux使用R2DBC訪問MySQL
- 12.Spring WebFlux訪問Redis
- 13.Spring WebFlux訪問MongoDB
- 14.Spring WebFlux集成Thymeleaf
- 15.Spring WebFlux集成FreeMarker
- 16.Spring WebFlux WebClient
- 17.Spring WebFlux WebSocket
- 18.測試
- 19.RSocket