[TOC]
## 什么是反應式編程(Reactive programming)
維基百科定義:
> 在計算領域,響應式編程是一種關注數據流和變更傳播的聲明式編程范式。有了這個范例,就可以輕松地表達靜態(例如,數組)或動態(例如,事件發射器)數據流,還可以傳遞關聯執行模型中存在的推斷依賴項,這有助于自動傳播已更改的數據流。
在一個命令式編程中`a:= b + c`意味著`b+c`的結算結果賦值給`a`,后續`b,c`值的更改不會影響`a`的值。在反應式編程中,每當`b`或`c`的值發生變化時,`a`的值就會自動更新,而無需程序顯式地重新執行語句`a:= b + c`來重新賦值。
</br>
## 反應式系統特質
[反應式宣言](https://www.reactivemanifesto.org/zh-CN) 是反應式的理論基礎,定義了反應式系統的4個特質:
**1. 即時響應性(Responsive)**
只要有可能,系統就會及時地做出響應。
**2. 回彈性(Resilient)**
系統在出現失敗時依然保持即時響應性。
**3. 彈性(Elastic)**
系統在不斷變化的工作負載之下依然保持即時響應性。
**4. 消息驅動(Message Driven)**
反應式系統依賴一步的消息傳遞,從而確保了松耦合、隔離、位置透明的組件之間有著明確邊界。
</br>

## 反應式流(Reactive Streams)
作為向反應式編程方向邁出的第一步,微軟在.NET生態系統中創建了響應式擴展(`Rx`)庫。然后`RxJava`在`JVM`上實現了響應式編程。隨著時間的推移,通反應式流(`Reactive Streams`)的努力,`Java`的標準化出現了,該規范為`JVM`上的響應式庫定義了一組接口和交互規則。它的接口已經集成到`Java 9`的`Flow`類下。
</br>
反應式流[Reactive Streams](https://www.reactive-streams.org/)是非阻塞背壓異步流處理的規范,包括針對運行時環境(`JVM`和`JavaScript`)以及網絡協議。
</br>
### 目標、設計與適用范圍
在異步系統中,處理數據流—尤其是容量沒有預先確定的“實時”數據—需要特別注意。最突出的問題是需要控制資源消耗,以便快速數據源不會壓倒流目的地。為了在協作網絡主機或一臺機器中的多個CPU核上并行使用計算資源,需要異步。
Reactive Streams的主要目標是管理跨異步邊界的流數據交換—比如將元素傳遞給另一個線程或線程池—同時確保接收端不會被迫緩沖任意數量的數據。換句話說,背壓是該模型的一個組成部分,以便允許在線程之間進行中介的隊列被綁定。如果背壓的通信是同步的,那么異步處理的好處就會被抵消,因此必須謹慎地強制要求完全非阻塞。
該規范的意圖是允許創建許多符合規范的實現,這些實現通過遵守規則將能夠順利地互操作,在流應用程序的整個處理圖中保持上述優點和特征。
`Reactive Streams `的范圍是找到一組最小的接口、方法和協議,這些接口、方法和協議將描述實現目標所需的操作和實體——具有非阻塞背壓的異步數據流。
</br>
總之,`Reactive Streams`是JVM上面向流的庫的一個標準和規范:
* 處理一個潛在無限元素的數目
* 依次處理
* 異步地在組件之間傳遞元素
* 必須強制有非阻塞后壓
`Reactive Streams`規范由以下部分組成:
</br>
1、API 規定了需要實現的響應式流類型,并且在不同的實現間完成互操作性。
2、技術兼容套件(`TCK`)是用于實現一致性測試的標準測試套件。
各種實現可以自由地實現規范中沒有提到的額外特性,只要它們遵從API要求和在TCK中通過測試。
### API組件
反應式編程范式通常在面向對象語言中作為`Observer`設計模式的擴展而出現。你還可以將`Reactive Streams`模式與熟悉的`Iterator`設計模式進行比較。`Iterator`模式中`Iterable-Iterator`總是成對出現,一個主要的區別是,`Iterator`是基于`pull`的,而`Reactive Streams`是基于`push`的。
</br>
使用`Iterator`是一種命令式編程模式,盡管訪問值的方法完全由`Iterable`負責。實際上,何時訪問序列中的`next()`項取決于開發人員。在反應式流中,上面這一對的等效物是`Publisher-Subscriber`。但是`Publisher`在出現新可用值時通知`Subscriber`,而這個`push`方面是反應式的關鍵。
</br>
除了`push`值之外,錯誤處理和完成方面也以定義良好的方式進行了介紹。`Publisher`可以向其`Subscriber`推送新值(通過調用`onNext`),但也可以發出錯誤信號(通過調用`onError`)或完成信號(通過調用`onComplete`)。錯誤和完成都終止序列。可以總結如下:
</br>
~~~java
onNext x 0..N [onError | onComplete]
~~~
</br>
該API由以下組件組成,這些組件必須由`Reactive Streams`實現提供:
1. `Publisher` 發布者
2. `Subscriber` 訂閱者
3. `Subscription` 訂閱對象
4. `Processor` 處理者
下圖展示了訂閱者與發布者交互的典型場景:

接口定義:
~~~java
//Publisher將數據流發送到 Subscriber
public static interface Publisher<T> {
//指定訂閱者 Subscriber
public void subscribe(Subscriber<? super T> subscriber);
}
~~~
~~~java
//消費處理 Publisher 發送過來的數據流
public static interface Subscriber<T> {
//開啟訂閱Subscription具體的訂閱對象
public void onSubscribe(Subscription subscription);
//接收數據
public void onNext(T item);
//錯誤處理
public void onError(Throwable throwable);
//數據處理結束
public void onComplete();
}
~~~
~~~java
//訂閱對象:發布者和訂閱者之間交互的操作對象
public static interface Subscription {
//訂閱者拿到訂閱對象后,通過調用訂閱對象的request方法,根據自身消費能力請求n條數據
//request方法被調用時,會觸發訂閱者的onNext事件方法,把數據傳輸給訂閱者。
//數據全部傳輸完成,則觸發訂閱者的onComplete事件方法。如果數據傳輸發生錯誤,則觸發訂閱者的onError事件方法。
public void request(long n);
//調用cancel方法來停止接收數據
public void cancel();
}
~~~
~~~java
//處理者既是發布者又是訂閱者,用于發布者和訂閱者之間轉換數據格式,
//把發布者的T類型數據轉換為訂閱者接受的R類型數據。處理者作為數據轉換的中介不是必須的。
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
~~~
## Java9 Flow
jdk9中的`java.util.concurrent.Flow`接口`1:1`語義上等價于對應的`Reactive Streams`,`Flow`接口約定了`Reactive`編程的一套規范,并沒有具體的實現。常用的實現有`RxJava`、`Reactor`、`Akka`等,`Spring WebFlux`中集成的是`Reactor3.0`。
- 1.反應式編程概述
- 2.Reactor框架
- Flux
- Mono
- 訂閱(Subscribe)
- 編程創建序列
- 線程和調度器
- 錯誤處理
- 3.Spring WebFlux概述
- 4.Spring WebFlux核心組件
- HttpHandler
- WebHandler
- ServerWebExchange
- 編碼和解碼器
- JSON
- Form Data
- Multipart Data
- 過濾器
- 異常處理器
- DispatcherHandler
- 5.Spring Boot啟動WebFlux
- 6.Spring WebFlux注解控制器
- 請求映射
- 處理程序方法
- 方法參數
- 返回值
- 類型轉換
- 模型(Model)
- 數據綁定(DataBinder)
- 異常管理
- @ControllerAdvice
- 7.Spring WebFlux函數端點
- HandlerFunction
- RouterFunction
- 運行服務
- 函數過濾器
- 8.Spring Boot中使用函數端點
- 9.Spring Webflux請求處理流程
- 10.Spring WebFlux配置
- 11.Spring WebFlux使用R2DBC訪問MySQL
- 12.Spring WebFlux訪問Redis
- 13.Spring WebFlux訪問MongoDB
- 14.Spring WebFlux集成Thymeleaf
- 15.Spring WebFlux集成FreeMarker
- 16.Spring WebFlux WebClient
- 17.Spring WebFlux WebSocket
- 18.測試
- 19.RSocket