[TOC]
# :-: **RxJS框架介紹及使?詳解**
# **概念**
**RxJS**是 **Reactive Extensions for JavaScript**的縮寫,起源于 **Reactive Extensions**,是?個基于可觀測數據流 **Stream**結合觀察者模式和迭代器模式的?種異步編程的應?庫。 **RxJS**是 **Reactive****Extensions**在 **JavaScript**上的實現。
注意!它跟 **React**沒啥關系,筆者最初眼花把它看成了 **React.js**的縮寫(恥辱啊!!!)
對于陌?的技術??,我們?般的思路莫過于,打開百度(google),搜索,然后查看官??檔,或者從零散的博客當中,去找尋能夠理解這項技術的信息。但在很多時候,僅從?些只??語中,的確也很難真正了解到?門技術的來龍去脈。
本?將從學習的?度來解析這項技術具備的價值以及能給我們現有項?中帶來的好處。
# **背景**
從開發者?度來看,對于任何?項技術??,我們經常會去談論的,莫過于以下?點:
* 應?場景?
* 如何落地?
* 上?難易程度如何?
* 為什么需要它?它解決了什么問題?
針對以上問題,我們可以由淺?深的來刨析?下 **RxJS**的相關理念。
# **應?場景?**
假設我們有這樣?個需求:
我們上傳?個??件之后,需要實時監聽他的進度,并且待進度進?到 100 的時候停?監聽。
對于?般的做法我們可以采?短輪詢的?式來實現,在對于異步請求的封裝的時候,如果我們采? **Promise**的?式,那么我們?般的做法就可以采?編寫?個?于輪詢的?法,獲取返回值進?處理,如果進度沒有完成則延遲?定時間再次調?該?法,同時在出現錯誤的時候需要捕獲錯誤并處理。顯然,這樣的處理?式?疑在?定程度上給開發者帶來了?定開發和維護成本,因為這個過程更像是我們在觀察?個事件,這個事件會多次觸發并讓我感知到,不僅如此還要具備取消訂閱的能?, **Promise**在處理這種事情時的?式其實并不友好,? **RxJS**對于異步數據流的管理就更加符合這種范式。
引?尤?的話:
我個?傾向于在適合 **Rx**的地?? ?**Rx**,但是不強求 ?**Rx for everything**。?較合適的例?就是?如多個服務端實時消息流,通過 ?**Rx**進??階處理,最后到 **view**層就是很清晰的?個 **Observable**,但是 **view**層本?處理?戶事件依然可以沿?現有的范式。
# **如何落地?**
針對現有項?來說,如何與實際結合并保證原有項?的穩定性也的確是我們應該優先考慮的問題,畢竟任何?項技術如果?法落地實踐,那么必然給我們帶來的收益是?較有限的。
這?如果你是?名使? **Angular**的開發者,或許你應該知道 **Angular**中深度集成了 **Rxjs**,只要你使? **Angular**框架,你就不可避免的會接觸到 RxJs 相關的知識。
在?些需要對事件進?更為精確控制的場景下,?如我們想要監聽點擊事件 (click event),但點擊三次之后不再監聽。
那么這個時候引? ???**RxJS**進?功能開發是?分便利?有效的,讓我們能省去對事件的監聽并且記錄點擊的狀態,以及需要處理取消監聽的?些邏輯上的?理負擔。
你也可以選擇為你的?型項?引? ?**RxJS**進?數據流的統?管理規范,當然也不要給本不適合 ?**RxJS**理念的場景強加使?,這樣實際帶來的效果可能并不明顯。
# **上?難易程度如何?**
如果你是?名具備?定開發經驗的 **JavaScript**開發者,那么?分鐘或許你就能將 **RxJS**應?到?些簡單的實踐中了。
# **為什么需要它?它解決了什么問題?**
如果你是?名使? ???**JavaScript**的開發者,在?對眾多的事件處理,以及復雜的數據解析轉化時,是否常常容易寫出?分低效的代碼或者是臃腫的判斷以及?量臟邏輯語句?
不僅如此,在 **JavaScript**的世界?,就眾多處理異步事件的場景中來看,“?煩” 兩個字似乎經常容易被提起,我們可以先從 **JS**的異步事件的處理?式發展史中來細細品味 **RxJS**帶來的價值。

# **回調函數時代(callback)**
使?場景:
* 事件回調
* **Ajax**請求
* **Node API**
* **setTimeout**、 **setInterval**等異步事件回調
在上述場景中,我們最開始的處理?式就是在函數調?時傳??個回調函數,在同步或者異步事件完成之后,執?該回調函數。可以說在?部分簡單場景下,采?回調函數的寫法?疑是很?便的,?如我們熟知的?個?階函數:
* **forEach**
* **map**
* **filter**
```
[1, 2, 3].forEach(function (item, index) { console.log(item, index);})
```
他們的使??式只需要我們傳??個回調函數即可完成對?組數據的批量處理,很?便也很清晰明了。
但在?些復雜業務的處理中,我們如果仍然秉持不拋棄不放棄的想法頑強的使?回調函數的?式就可能會出現下?的情況:
```
fs.readFile('a.txt', 'utf-8', function(err, data)
{fs.readFile('b.txt', 'utf-8', function(err, data1)
{fs.readFile('c.txt', 'utf-8', function(err, data2) {
})
})
})
```
當然作為編寫者來說,你可能覺得說這個很清晰啊,沒啥不好的。但是如果再復雜點呢,如果調?的函數都不?樣呢,如果每?個回調??的內容都?分復雜呢。短期內??可能清楚為什么這么寫,?的是什么,但是過了?個?、三個?、?年后,你確定在眾多業務代碼中你還能找回當初的本?嗎?
你會不會迫不及待的查找提交記錄,這是哪個憨批寫的,跟**shit** ,臥槽怎么是我寫的。
這時候,?對眾多開發者苦不堪?的 回調地域,終于還是有?出來造福?類了......
# **Promise 時 代**
**Promise**最初是由社區提出(畢竟作為每天與奇奇怪怪的業務代碼打交道的我們來說,?直?回調頂不住了啊),后來官?正式在 ??**ES6**中將其加?語?標準,并進?了統?規范,讓我們能夠原?就能 **new**?個 **Promise**。
就優勢??, ??**Promise**帶來了與回調函數不?樣的編碼?式,它采?鏈式調?,將數據?層?層往后拋,并且能夠進?統?的異常捕獲,不像使?回調函數就直接炸了,還得在眾多的代碼中?個個 **try catch**。
話不多說,看碼!
```
function readData(filePath) {
return new Promise((resolve, reject) => {
fs.readFile(filePath, 'utf-8', (err, data) => {
if (err) reject(err); resolve(data);
})
});
}
readData('a.txt').then(res => {
return readData('b.txt');
}).then(res => {
return readData('c.txt');
}).then(res => {
return readData('d.txt');
}).catch(err => {
console.log(err);
})
```
對??下,這種寫法會不會就更加符合我們正常的思維邏輯了,這種順序下,讓?看上去?分舒暢,也更利于代碼的維護。
優點:
* 狀態改變就不會再變,任何時候都能得到相同的結果
* 將異步事件的處理流程化,寫法更?便
缺點:
* ?法取消
* 錯誤?法被 **try****catch**(但是可以使? **.catch**?式)
* ?**pending**狀態時?法得知現在處在什么階段
雖然 ??**Promise**的出現在?定程度上提?了我們處理異步事件的效率,但是在需要與?些同步事件的進?混合處理時往往我們還需要?臨?些并不太友好的代碼遷移,我們需要把原本放置在外層的代碼移到 **Promise**的內部才能保證某異步事件完成之后再進?繼續執?。
# **Generator 函 數**
**ES6**新引?了 **Generator**函數,可以通過 **yield**關鍵字,把函數的執?流掛起,為改變執?流程提供了可能,從?為異步編程提供解決?案。形式上也是?個普通函數,但有?個顯著的特征:
* **function**關鍵字與函數名之間有?個星號 "\*" (推薦緊挨著 **function**關鍵字)
* **yield·** 表達式,定義不同的內部狀態(可以有多個yield`)
* **Generator**函數并不會執?,也不會返回運?結果,?是返回?個遍歷器對象( **Iterator Object**)
* 依次調?遍歷器對象的 **next**?法,遍歷 **Generator**函數內部的每?個狀態
```
function read(){
let a= yield '666';
console.log(a);
let b = yield 'ass';
console.log(b); return 2
}
let it = read();
console.log(it.next());
console.log(it.next());
console.log(it.next());
console.log(it.next());
```
這種模式的寫法我們可以?由的控制函數的執?機制,在需要的時候再讓函數執?,但是對于?常項?中來說,這種寫法也是不夠友好的,?法給與使?者最直觀的感受。
# **async / await**
相信在經過許多?試題的洗禮后,?家或多或少應該也知道這玩意其實就是?個語法糖,內部就是把 ?**Generator**函數與?動執?器 ?**co**進?了結合, 讓我們能以同步的?式編寫異步代碼,?分暢快。
有?說?,這玩意著實好?,要不是要考慮兼容性,真就想??積使?這種?式。 ???再來看看?它編寫的代碼有多快樂:
```
async readFileData() {
const data = await Promise.all([
'異步事件?',
'異步事件?',
'異步事件三'
]);
console.log(data);
}
```
直接把它當作同步?式來寫,完全不要考慮把?堆代碼復制粘貼的?個其他異步函數內部,屬實簡潔明了。
# **RxJS**
它在使??式上,跟 ?**Promise**有點像,但在能?上? ?**Promise**強?多了,不僅僅能夠以流的形式對數據進?控制,還內置許許多多的內置?具?法讓我們能?分?便的處理各種數據層?的操作,讓我們的代碼如絲?般順滑。
優勢:
* 代碼量的?幅度減少
* 代碼可讀性的提?
* 很好的處理異步
* 事件管理、調度引擎
* ?分豐富的操作符
* 聲明式的編程風格
```
function readData(filePath) {
return new Observable((observer) => {
fs.readFile(filePath, 'utf-8', (err, data) => {
if (err) observer.error(err);
observer.next(data);
})
});
}
Rx.Observable
.forkJoin(readData('a.txt'),
readData('b.txt'),
readData('c.txt'))
.subscribe(data => console.log(data));
```
這?展?的僅僅是 **RxJS**能表達能量的冰???,對于這種場景的處理辦法還有多種?式。 **RxJS**擅長處理異步數據流,?且具有豐富的庫函數。對于 **RxJS**??,他能將任意的 **Dom**事件,或者是 **Promise**轉換成 **observables**。
## **前置知識點**
在正式進? **RxJS**的世界之前,我們?先需要明確和了解?個概念:
* 響應式編程( **Reactive Programming**) 流( **Stream**)
* 觀察者模式(發布訂閱)
* 迭代器模式
## **響應式編程(Reactive Programming)**
響應式編程( **Reactive Programming**),它是?種基于事件的模型。在上?的異步編程模式中,我們描述了兩種獲得上?個任務執?結果的?式,
?個就是主動輪訓,我們把它稱為 **Proactive**?式。另?個就是被動接收反饋,我們稱為 **Reactive**。簡單來說,在 **Reactive**?式中,上?個任務的結果的反饋就是?個事件,這個事件的到來將會觸發下?個任務的執?。
響應式編程的思路?概如下:你可以?包括 **Click**和 **Hover**事件在內的任何東西創建 ?**Data stream**(也稱 “流”,后續章節詳述)。 **Stream**廉價且常見,任何東西都可以是?個 ?**Stream**:變量、?戶輸?、屬性、 **Cache**、數據結構等等。舉個例?,想像?下你的 ?**Twitter feed**就像是**Click events**那樣的 **Data stream**,你可以監聽它并相應的作出響應。

結合實際,如果你使?過 **Vue**,必然能夠第?時間想到, **Vue**的設計理念不也是?種響應式編程范式么,我們在編寫代碼的過程中,只需要關注數據的變化,不必?動去操作視圖改變,這種 **Dom**層的修改將隨著相關數據的改變??動改變并重新渲染。
**流(** **Stream** **)**
流作為概念應該是語??關的。?件 **IO**流, **Unix**系統標準輸?輸出流,標準錯誤流 ( **stdin**, **stdout**, **stderr**),還有?開始提到的 **TCP**流,還有?些 **Web**后臺技術(如 **Nodejs**)對 **HTTP**請求 / 響應流的抽象,都可以見到流的概念。作為響應式編程的核?,流的本質是?個按時間順序排列的進?中事件的序列集合。

對于?流或多個流來說,我們可以對他們進?轉化,合并等操作,?成?個新的流,在這個過程中,流是不可改變的,也就是只會在原來的基礎返回?個新的 **stream**。
## **觀察者模式**
在眾多設計模式中,觀察者模式可以說是在很多場景下都有著?較明顯的作?。
觀察者模式是?種?為設計模式,允許你定義?種訂閱機制, 可在對象事件發?時通知多個 “觀察” 該對象的其他對象。
?實際的例?來理解,就?如你訂了?個銀?卡余額變化短信通知的服務,那么這個時候,每次只要你轉賬或者是購買商品在使?這張銀?卡消費之后,銀?的系統就會給你推送?條短信,通知你消費了多少多少錢,這種其實就是?種觀察者模式,?稱發布 - 訂閱模式。
在這個過程中,銀?卡余額就是被觀察的對象,??戶就是觀察者。

優點:
* 降低了?標與觀察者之間的耦合關系,兩者之間是抽象耦合關系。
* 符合依賴倒置原則。
* ?標與觀察者之間建?了?套觸發機制。
* ?持?播通信
不?:
* ?標與觀察者之間的依賴關系并沒有完全解除,?且有可能出現循環引?。
* 當觀察者對象很多時,通知的發布會花費很多時間,影響程序的效率。
## **迭代器模式**
迭代器( ?**Iterator**)模式?叫游標( ?**Sursor**)模式,在?向對象編程?,迭代器模式是?種設計模式,是?種最簡單也最常見的設計模式。迭代器模式可以把迭代的過程從從業務邏輯中分離出來,它可以讓?戶透過特定的接?巡訪容器中的每?個元素?不?了解底層的實現。

```
const iterable = [1, 2, 3];
const iterator = iterable[Symbol.iterator]();
iterator.next();
iterator.next();
iterator.next();
iterator.next();
```
作為前端開發者來說,我們最常遇到的部署了 **iterator**接?的數據結構不乏有: **Map**、 **Set**、 **Array**、類數組等等,我們在使?他們的過程中,均能使?同?個接?訪問每個元素就是運?了迭代器模式。
**Iterator**作?:
* 為各種數據結構,提供?個統?的、簡便的訪問接?;
* 使得數據結構的成員能夠按某種次序排列;
* 為新的遍歷語法 **for...of**實現循環遍歷
在許多?章中,有?會喜歡把迭代器和遍歷器混在?起進?概念解析,其實他們表達的含義是?致的,或者可以說(迭代器等于遍歷器)。
## **Observable**
表??個概念,這個概念是?個可調?的未來值或事件的集合。它能被多個 **observer**訂閱,每個訂閱關系相互獨?、互不影響。

舉個栗?:
假設你訂閱了?個博客或者是推送?章的服務號(微信公眾號之類的),之后只要公眾號更新了新的內容,那么該公眾號就會把新的?章推送給你,在這段關系中,這個公眾號就是?個 **Observable**,?來產?數據的數據源。
相信看完上?的描述,你應該對 **Observable**是個什么東西有了?定的了解了,那么這就好辦了,下?我們來看看在 **RxJS**中如何創建?個 **Observable**。
```
const Rx = require('rxjs/Rx')
const myObservable = Rx.Observable.create(observer =>
{ observer.next('foo');
setTimeout(() => observer.next('bar'), 1000);
});
```
我們可以調? **Observable.create**?法來創建?個 **Observable**,這個?法接受?個函數作為參數,這個函數叫做 **producer**函數, ?來?成
**Observable**的值。這個函數的?參是 **observer**,在函數內部通過調? **observer.next()**便可?成有?系列值的?個 **Observable**。
我們先不應理會 **observer**是個什么東西,從創建?個 **Observable**的?式來看,其實也就是調??個 **API**的事,?分簡單,這樣?個簡單的 **Observable**對象就創建出來了。
## **Observer**
?個回調函數的集合,它知道如何去監聽由 ?**Observable**提供的值。 ?**Observer**在信號流中是?個觀察者(哨兵)的??,它負責觀察任務執?的狀態并向流中發射信號。

這?我們簡單實現?下內部的構造:
```
const observer = {
next: function(value) {
console.log(value);
},
error: function(error) {
console.log(error)
},
complete: function() {
console.log('complete')
}
}
```
在 **RxJS**中, **Observer**是可選的。在 **next**、 **error**和 **complete**處理邏輯部分缺失的情況下, **Observable**仍然能正常運?,為包含的特定通知類型的處理邏輯會被?動忽略。
?如我們可以這樣定義:
```
const observer = {
next: function(value) {
console.log(value);
},
error: function(error) {
console.log(error)
}
}
```
它依舊是可以正常的運?。
那么它?是怎么來配合我們在實際戰?中使?的呢:
```
const myObservable = Rx.Observable.create((observer) =>
{observer.next('111')
setTimeout(() => {
observer.next('777')
}, 3000)
})
myObservable.subscribe((text) => console.log(text));
```
這?直接使? **subscribe**?法讓?個 **observer**訂閱?個 **Observable**,我們可以看看這個 **subscribe**的函數定義來看看怎么實現訂閱的:
```
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription;
```
源碼是? **ts**寫的,代碼即?檔,?分清晰,這?筆者給?家解讀?下,我們從?參來看,從左?右依次是 **next**、 **error**, **complete**,且是可選的, 我們可以??選擇性的傳?相關回調,從這?也就印證了我們上?所說 **next**、 **error**和 **complete**處理邏輯部分缺失的情況下仍可以正常運?,因為他們都是可選的。
## **Subscription 與 Subject**
### **Subscription**
**Subscription**就是表? ?**Observable**的執?,可以被清理。這個對象最常?的?法就是 ?**unsubscribe**?法,它不需要任何參數,只是?來清理由 **Subscription**占?的資源。同時,它還有 **add**?法可以使我們取消多個訂閱。
```
const myObservable = Rx.Observable.create(observer =>
{ observer.next('foo');
setTimeout(() =>
observer.next('bar'), 1000);
});
const subscription = myObservable.subscribe(x => console.log(x)); subscription.unsubscribe();
```
### **Subject ( 主體)**
它是?個代理對象,既是?個 **Observable**?是?個 ?**Observer**,它可以同時接受 **Observable**發射出的數據,也可以向訂閱了它的 ?**observer**發射數據,同時, **Subject**會對內部的 **observers**清單進?多播 ( **multicast**)

**Subjects**是將任意 **Observable**執?共享給多個觀察者的唯??式這個時候眼尖的讀者會發現,這?產?了?個新概念——多播。
* 那么多播?是什么呢?
* 有了多播是不是還有單播?
* 他們的區別?是什么呢?](images/screenshot_1661654475959.png)
接下來就讓筆者給?家好好分析這兩個概念吧。

### **單播**
普通的 **Observable**是單播的,那么什么是單播呢?
單播的意思是,每個普通的 **Observables**實例都只能被?個觀察者訂閱,當它被其他觀察者訂閱的時候會產??個新的實例。也就是普通
**Observables**被不同的觀察者訂閱的時候,會有多個實例,不管觀察者是從何時開始訂閱,每個實例都是從頭開始把值發給對應的觀察者。
```
const Rx = require('rxjs/Rx')
const source = Rx.Observable.interval(1000).take(3);
source.subscribe((value) => console.log('A ' + value))
setTimeout(() => {
source.subscribe((value) => console.log('B ' + value))
}, 1000)
```
看到陌?的調?不要慌,后?會進?詳細解析,這?的 **source**你可以理解為就是?個每隔?秒發送?個從 0 開始遞增整數的 **Observable**就?了, 且只會發送三次( **take**操作符其實也就是限定拿多少個數就不在發送數據了。)。
從這?我們可以看出兩個不同觀察者訂閱了同?個源( **source**),?個是直接訂閱,另?個延時?秒之后再訂閱。
從打印的結果來看, **A**從 0 開始每隔?秒打印?個遞增的數,? **B**延時了?秒,然后再從 0 開始打印,由此可見, **A**與 **B**的執?是完全分開的,也就是每次訂閱都創建了?個新的實例。
在許多場景下,我們可能會希望 **B**能夠不從最初始開始接受數據,?是接受在訂閱的那?刻開始接受當前正在發送的數據,這就需要?到多播能
?了。
### **多播**
那么如果實現多播能?呢,也就是實現我們不論什么時候訂閱只會接收到實時的數據的功能。
可能這個時候會有?伙伴跳出來了,直接給個中間?來訂閱這個源,然后將數據轉發給 **A**和 **B**不就?了?
```
const source = Rx.Observable.interval(1000).take(3);
const subject = {
observers: [],
subscribe(target) {
this.observers.push(target);
},
next: function(value) {
this.observers.forEach((next) => next(value))
}
}
source.subscribe(subject);
subject.subscribe((value) => console.log('A ' + value))
setTimeout(() => {
subject.subscribe((value) => console.log('B ' + value))
}, 1000)
```
先分析?下代碼, **A**和 **B**的訂閱和單播?代碼并?差別,唯?變化的是他們訂閱的對象由 **source**變成了 **subject**,然后再看看這個 **subject**包含了什么,這?做了?些簡化,移除了 **error**、 **complete**這樣的處理函數,只保留了 **next**,然后內部含有?個 **observers**數組,這?包含了所有的訂閱者,暴露?個 **subscribe**?于觀察者對其進?訂閱。
在使?過程中,讓這個中間商 **subject**來訂閱 **source**,這樣便做到了統?管理,以及保證數據的實時性,因為本質上對于 **source**來說只有?個訂閱者。
這?主要是?便理解,簡易實現了 **RxJS**中的 **Subject**的實例,這?的中間?可以直接換成 **RxJS**的 **Subject**類實例,效果是?樣的
```
const source = Rx.Observable.interval(1000).take(3);
const subject = new Rx.Subject(); source.subscribe(subject);
subject.subscribe((value) => console.log('A ' + value))
setTimeout(() => {
subject.subscribe((value) => console.log('B ' + value))
}, 1000)
```
同樣先來看看打印的結果是否符合預期,?先 **A**的打印結果并?變化, **B**?次打印的數字現在是從 1 開始了,也就當前正在傳輸的數據,這下滿
?了我們需要獲取實時數據的需求了。
不同于單播訂閱者總是需要從頭開始獲取數據,多播模式能夠保證數據的實時性。除了以上這些, **RxJS**還提供了 **Subject**的三個變體:
* **BehaviorSubject**
* **ReplaySubject**
* **AsyncSubject**
https://blog.csdn.net/web220507/article/details/128913157
### **Behavior** **Subject**
**BehaviorSubject**是?種在有新的訂閱時會額外發出最近?次發出的值的 **Subject**。

同樣我們結合現實場景來進?理解,假設有我們需要使?它來維護?個狀態,在它變化之后給所有重新訂閱的?都能發送?個當前狀態的數據,這 ????????就好?我們要實現?個計算屬性,我們只關?該計算屬性最終的狀態,?不關?過程中變化的數,那么?該怎么處理呢?
我們知道普通的 **Subject**只會在當前有新數據的時候發送當前的數據,?發送完畢之后就不會再發送已發送過的數據,那么這個時候我們就可以引?**BehaviorSubject**來進?終態維護了,因為訂閱了該對象的觀察者在訂閱的同時能夠收到該對象發送的最近?次的值,這樣就能滿?我們上述的需求了。
然后再結合代碼來分析這種 **Subject**應?的場景:
```
const subject = new Rx.Subject();
subject.subscribe((value) => console.log('A:' + value))
subject.next(1);
subject.next(2);
setTimeout(() => {
subject.subscribe((value) => console.log('B:' + value));
}, 1000)
```
?先演?的是采?普通 **Subject**來作為訂閱的對象,然后觀察者 **A**在實例對象 **subject**調? **next**發送新的值之前訂閱的,然后觀察者是延時?秒之后訂閱的,所以 **A**接受數據正常,那么這個時候由于 **B**在數據發送的時候還沒訂閱,所以它并沒有收到數據。
那么我們再來看看采? **BehaviorSubject**實現的效果:
```
const subject = new Rx.BehaviorSubject(0);
subject.subscribe((value: number) => console.log('A:' + value))
subject.next(1);
subject.next(2);
setTimeout(() => {
subject.subscribe((value: number) => console.log('B:' + value))
}, 1000)
```
同樣從打印的結果來看,與普通 ???**Subject**的區別在于,在訂閱的同時源對象就發送了最近?次改變的值(如果沒改變則發送初始值),這個時候我們的 **B**也如愿獲取到了最新的狀態。
這?在實例化 **BehaviorSubject**的時候需要傳??個初始值。
### **Replay** **Subject**
在理解了 **BehaviorSubject**之后再來理解 **ReplaySubject**就?較輕松了, **ReplaySubject**會保存所有值,然后回放給新的訂閱者,同時它提供了?參?于控制重放值的數量(默認重放所有)。

什么?還不理解?看碼:
```
const subject = new Rx.ReplaySubject(2);
subject.next(0); subject.next(1); subject.next(2);
subject.subscribe((value:number)=>console.log('A:+ value))
subject.next(3);
subject.next(4);
setTimeout(() => {
subject.subscribe((value: number) => console.log('B:' + value))
}, 1000)
```
我們先從構造函數傳參來看, **BehaviorSubject**與 **ReplaySubject**都需要傳??個參數,對 **BehaviorSubject**來說是初始值,?對于 **ReplaySubject**來說就是重放先前多少次的值,如果不傳?重放次數,那么它將重放所有發射過的值。
從結果上看,如果你不傳?確定的重放次數,那么實現的效果與之前介紹的單播效果?乎沒有差別。所以我們再分析代碼可以知道在訂閱的那?刻,觀察者們就能收到源對象前多少次發送的值。
### **Async** **Subject**
**AsyncSubject**只有當 **Observable**執?完成時 (執? **complete()**),它才會將執?的最后?個值發送給觀察者,如果因異常?終?, **AsyncSubject**將不會釋放任何數據,但是會向 **Observer**傳遞?個異常通知。

**AsyncSubject**?般?的?較少,更多的還是使?前?三種。
```
const subject = new Rx.AsyncSubject();
subject.next(1);
subject.subscribe(res => { console.log('A:' + res);
});
subject.next(2);
subject.subscribe(res => {
console.log('B:' + res);
});
subject.next(3);
subject.subscribe(res => {
console.log('C:' + res);
});
subject.complete(); subject.next(4);
```
從打印結果來看其實已經很好理解了,也就是說對于所有的觀察者們來說,源對象只會在所有數據發送完畢也就是調? ??**complete**?法之后才會把最后?個數據返回給觀察者們。
這就好??說?經常有的,當你要放技能的時候,先要打?套起?式,打完之后才會放出你的?招。
## **Cold- Observables 與 Hot- Observables**

### **Cold** **Observables**
**Cold Observables**只有被 **observers**訂閱的時候,才會開始產?值。是單播的,有多少個訂閱就會?成多少個訂閱實例,每個訂閱都是從第?個產?的值開始接收值,所以每個訂閱接收到的值都是?樣的。
如果?家想要參考 **Cold Observables**相關代碼,直接看前?的單播?例就?了。
正如單播描述的能?,不管觀察者們什么時候開始訂閱,源對象都會從初始值開始把所有的數都發給該觀察者。
### **Hot****Observables**
**Hot****Observables**不管有沒有被訂閱都會產?值。是多播的,多個訂閱共享同?個實例,是從訂閱開始接受到值,每個訂閱接收到的值是不同的, 取決于它們是從什么時候開始訂閱。
這?有?種場景,我們可以逐?分析?下便于理解:
#### **“ 加熱”**
?先可以忽略代碼中出現的陌?的函數,后?會細說。
```
const source = Rx.Observable.of(1, 2).publish();
source.connect();
source.subscribe((value) => console.log('A:' + value));
setTimeout(() => {
source.subscribe((value) => console.log('B:' + value));
}, 1000);
```
這??先? **Rx**的操作符 **of**創建了?個 **Observable**,并且后?跟上了?個 **publish**函數,在創建完之后調? **connect**函數進?開始數據發送。
最終代碼的執?結果就是沒有任何數據打印出來,分析?下原因其實也?較好理解,由于開啟數據發送的時候還沒有訂閱,并且這是?個 **Hot****Observables**,它是不會理會你是否有沒有訂閱它,開啟之后就會直接發送數據,所以 **A**和 **B**都沒有接收到數據。
當然你這?如果把 **connect**?法放到最后,那么最終的結果就是 **A**接收到了, **B**還是接不到,因為 **A**在開啟發數據之前就訂閱了,? **B**還要等?秒。
#### **更直觀的場景**
正如上述多播所描述的,其實我們更多想看到的現象是能夠 ?**A**和 ?**B**兩個觀察者能夠都有接收到數據,然后觀察數據的差別,這樣會?便理解。這?直接換?個發射源:
```
const source = Rx.Observable.interval(1000).take(3).publish(); source.subscribe((value: number) => console.log('A:' + value));
setTimeout(() => {
source.subscribe((value: number) => console.log('B:' + value));
}, 3000);
source.connect();
```
這?我們利? **interval**配合 **take**操作符每秒發射?個遞增的數,最多三個,然后這個時候的打印結果就更清晰了, **A**正常接收到了三個數, **B**三秒之后才訂閱,所以只接收到了最后?個數 2,這種?式就是上述多播所描述的并???。
### **兩者對?**
* **Cold** **Observables**:舉個栗?會?較好理解?點:?如我們上 B?站看番,更新了新番,我們不論什么時候去看,都能從頭開始看到完整的劇集,與其他?看不看毫?關聯,互不?擾。
* **Hot Observables**:這就好?我們上 B 站看直播,直播開始之后就直接開始播放了,不管是否有沒有訂閱者,也就是說如果你沒有?開始就訂閱它,那么你過?段時候后再去看,是不知道前?直播的內容的。
### **上述代碼中出現的操作符解析**
在創建 **Hot Observables**時我們?到了 **publish**與 **connect**函數的結合,其實調?了 **publish**操作符之后返回的結果是?個 **ConnectableObservable**,然后該對象上提供了 **connect**?法讓我們控制發送數據的時間。
* **publish**:這個操作符把正常的 **Observable**( **Cold Observables**)轉換成 **ConnectableObservable**。
* **ConnectableObservable**: **ConnectableObservable**是多播的共享 **Observable**,可以同時被多個 **observers**共享訂閱,它是 **Hot**
**Observables**。 **ConnectableObservable**是訂閱者和真正的源頭 **Observables**(上?例?中的 **interval**,每隔?秒發送?個值,就是源頭
**Observables**)的中間?, **ConnectableObservable**從源頭 **Observables**接收到值然后再把值轉發給訂閱者。
* **connect()**: **ConnectableObservable**并不會主動發送值,它有個 **connect**?法,通過調? **connect**?法,可以啟動共享 **ConnectableObservable** 發送值。當我們調? ????**ConnectableObservable.prototype.connect**?法,不管有沒有被訂閱,都會發送值。訂閱者共享同?個實例,訂閱者接收到的值取決于它們何時開始訂閱。
其實這種?動控制的?式還挺?煩的,有沒有什么更加?便的操作?式呢,?如監聽到有訂閱者訂閱了才開始發送數據,?旦所有訂閱者都取消了,就停?發送數據?其實也是有的,讓我們看看引?計數( **refCount**):
### **引?計數**
這?主要?到了 **publish**結合 **refCount**實現?個 “?動擋” 的效果。
```
const source = Rx.Observable.interval(1000).take(3).publish().refCount(); setTimeout(() => {
source.subscribe(data => { console.log("A:" + data) });
setTimeout(() => {
source.subscribe(data => { console.log("B:" + data) });
}, 1000);
}, 2000);
```
我們透過結果看本質,能夠很輕松的發現,只有當 **A**訂閱的時候才開始發送數據( **A**拿到的數據是從 0 開始的),并且當 **B**訂閱時,也是只能獲取到當前發送的數據,?不能獲取到之前的數據。
不僅如此,這種 “?動擋” 當所有訂閱者都取消訂閱的時候它就會停?再發送數據了。
## **Schedulers( 調度器)**
?來控制并發并且是中央集權的調度員,允許我們在發?計算時進?協調,例如 **setTimeout**或 **requestAnimationFrame**或其他。
* 調度器是?種數據結構。 它知道如何根據優先級或其他標準來存儲任務和將任務進?排序。
* 調度器是執?上下?。 它表?在何時何地執?任務 (舉例來說,?即的,或另?種回調函數機制 (?如 **setTimeout**或 **process.nextTick**),或動畫幀)。
* 調度器有?個 (虛擬的) 時鐘。 調度器功能通過它的 **getter**?法 **now()**提供了 “時間” 的概念。在具體調度器上安排的任務將嚴格遵循該時鐘所表?的時間。
學到這相信?家也已經或多或少對 ???**RxJS**有?定了解了,不知道?家有沒有發現?個疑問,前?所展?的代碼?例中有同步也有異步,?筆者卻沒有顯?的控制他們的執?,他們的這套執?機制到底是什么呢?
其實他們的內部的調度就是靠的 ?**Schedulers**來控制數據發送的時機,許多操作符會預設不同的 ?**Scheduler**,所以我們不需要進?特殊處理他們就能良好的進?同步或異步運?。
```
const source = Rx.Observable.create(function (observer: any)
{ observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
});
console.log(' 訂 閱 前 ');
source.observeOn(Rx.Scheduler.async)
.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
console.log('訂閱后');
```
從打印結果上來看,數據的發送時機的確已經由同步變成了異步,如果不進?調度?式修改,那么 ???“訂閱后” ???的打印應該是在數據發送完畢之后才會執?的。
看完?例之后我們再來研究這個調度器能做哪?種調度:
* **queue**
* **asap**
* **async**
* **animationFrame**
### **queue**
將每個下?個任務放在隊列中,?不是?即執?
**queue**延遲使?調度程序時,其?為與 **async**調度程序相同。
當沒有延遲使?時,它將同步安排給定的任務 ???- ???在安排好任務后?即執?。但是,當遞歸調?時(即在已調度的任務內部),將使?隊列調度程序調度另?個任務,?不是?即執?,該任務將被放?隊列并等待當前任務完成。
這意味著,當您使? **queue**調度程序執?任務時,您確定它會在該調度程序調度的其他任何任務開始之前結束。這個同步與我們平常理解的同步可能不太?樣,筆者當時也都困惑了?會。
還是??個官?的例?來講解這種調度?式是怎么理解吧:
```
import { queueScheduler } from 'rxjs';
queueScheduler.schedule(() => {
queueScheduler.schedule(() =>
console.log('second'));
console.log('first');
});
```
我們?需關注陌?的函數調?,我們這?著重于看這種調度?式與平常的同步調度的區別。
?先我們調? ?**queueScheduler**的 ?**schedule**?法開始執?,然后函數內部?同樣再以同樣的?式調?(這?也可以改成遞歸,不過這??這個?例去理解可能會好?點),并且傳??個函數,打印 **second**。
然后繼續看下?的語句,?個普通的 **console.log('first')**,然后我們再來看看打印結果:
是不是有點神奇,如果沒看明?為啥的,可以再回頭看看前? **queue**對于遞歸執?的處理?式。也就是說如果遞歸調?,它內部會維護?個隊
列,然后等待先加?隊列的任務先執?完成(也就是上?的 **console.log('first')**執?完才會執? **console.log('second')**,因為 **console.log('second')**這個任務是后加?該隊列的)。
### **asap**
內部基于 **Promise**實現( **Node**端采? **process.nextTick**),他會使?可?的最快的異步傳輸機制,如果不?持 **Promise**或 **process.nextTick**或者 **Web**
**Worker**的 **MessageChannel**也可能會調? **setTimeout**?式進?調度。
### **async**
與 **asap**?式很像,只不過內部采? **setInterval**進?調度,?多?于基于時間的操作符。
### **animationFrame**
從名字看其實相信?家已經就能略知??了,內部基于 ?**requestAnimationFrame**來實現調度,所以執?的時機將與 ?**window.requestAnimationFrame**保持?致,適?于需要頻繁渲染或操作動畫的場景。
## **Operators**
### **Operator 概 念**
采?函數式編程風格的純函數 ( **pure function**),使?像 **map**、 **filter**、 **concat**、 **flatMap**等這樣的操作符來處理集合。也正因為他的純函數定義, 所以我們可以知道調?任意的操作符時都不會改變已存在的 **Observable**實例,?是會在原有的基礎上返回?個新的 **Observable**。
盡管 **RxJS**的根基是 **Observable**,但最有?的還是它的操作符。操作符是允許復雜的異步代碼以聲明式的?式進?輕松組合的基礎代碼單元。

### **學習 RxJS 操作符**
https://rxjs-cn.github.io/learn-rxjs-operators/operators/creation/create.html