# rxjs中breakpressure(背壓)概念
當涉及到數據流時,流可能發送太快及至于訂閱者跟不上其速度。為此,我們需要一些機制來控制發送源,使訂閱者不至于被大量數據淹沒。這些機制可以根據需要采用有損或者無損操作形式來處理背壓。例如,如果你錯過了幾次鼠標點擊,這沒有什么大礙,但是,如果你錯過了幾次銀行交易,這是一個很明確的問題。
例如,想象一下使用`zip`操作符將2個無限大的Observable壓縮在一起,其中一個Observable的發送速度是另外一個的2倍。`zip`操作符內部處理這種情形的機制是采用一個擴充緩沖區來存儲發送較快的Observable,待較慢的Observable發出消息時再取出緩存的內容并將它們最終合并在一起。這將導致RXJS占用大量系統資源。
## Hot and Cold Observables and Multicast
Cold Observable會發出特定元素序列值(例如:時間序列,數組元素),一旦有觀察者開始訂閱時,就開始發送其數據,在發送過程中RxJS會保證該序列值的完整性。例如,將類似于Array、Map、Set或者生成器(generator)這樣的可迭代數據轉換成Observable,無論是第一次訂閱還是后面反復訂閱都會發出相同的值。Cold Observable發出的值來源可能包括數據庫查詢、文件檢索或者Web請求結果等。
Hot Observable的特點是一旦被創建就會立即發射數據流。訂閱者通常會從序列的中間(處于數據開始發射和結束之間)位置開始觀察發出的Observable,并且在第一次發射數據后開始建立起subscription。這樣一個Observable在在發射數據時擁有自己的獨立空間,并由觀察者維護其狀態。Hot Observable的例子可能包括鼠標&鍵盤事件、系統事件或者股票價格波動。
當一個Cold Observable是多播的(當它被轉換成一個`ConnectableObservable`對象,并且它的`connect`方法被調用),它實際上已經變**hot**了,即出于背壓和流量控制的目的,它應被視為一個Hot Observable。
在處理背壓問題上,Cold Observable是一個**拉取**(pull)模型的理想主體。Hot Observable通常不是用來處理拉取取問題,而是作為本文討論的流程控制策略的最佳選擇,比如使用`pausableBuffered`和`pausable`操作符,通過節流(throttling)、緩存(buffers)或者窗口(windows)方式。
> 譯注:這里的"windows",是指RxJS中的一種數據控制方式,和`buffer`相似,不同之處在于它發送的是嵌套的Observable而不是數組形式。
## 有損背壓(Lossy Backpressure)
有很多方法可以控制可觀察序列,從而讓消費者不至于因有損操作而失去方向,這也意味著數據包可能在暫停(pause)和恢復(resume)操作切換期間丟棄。
### 去抖動(Debounce)
應對有損背壓問題的第一種方法稱為**去抖動**,只有在一個特定的時間段通過并且當前Observable沒有再次發射數據時,才會從源Observable獲取發射的數據流。這在一些場景下很有用,例如用戶輸入太快,我們并不希望在每次用戶觸發按鍵后立即開始執行,而是等待半秒鐘,直到用戶沒有輸入后開始執行。
```javascript
var debounced = Rx.Observable.fromEvent(input, 'keyup')
.map(function (e) { return e.target.value; })
.debounce(500 /* ms */);
debounced.subscribeOnNext(function (value) {
console.log('Input value: %s', value);
});
```
### 節流(Throttling)
應對背壓問題的另一種技術是通過使用周期性時間間隔在Observable發出第一條數據后使用`throttle`方法節流。這種節流方式特別適用于像窗口改變大小和滾動等觸發頻率很快的事件處理。
```javascript
var throttled = Rx.Observable.fromEvent(window, 'resize')
.throttle(250 /* ms */);
throttled.subscribeOnNext(function (e) {
console.log('Window inner height: %d', window.innerHeight);
console.log('Window inner width: %d', window.innerWidth);
});
```
### 采樣(Sampling) Observables
您也可以在一定的間隔內使用`sample`方法從觀察序列中抽取值,而不需要消耗整個可觀察序列。
```javascript
var sampled = getStockData()
.sample(5000 /* ms */);
sampled.subscribeOnNext(function (data) {
console.log('Stock data: %o', data);
});
```
### 可暫停(Pausable) Observables
暫停和恢復的能力也是RxJS在有損和無損版本中提供的強大概念。 在有損背壓的情況下,`pausable`操作符可以在可觀察序列分別調用`pause`和`resume`之后停止或者恢復監聽。例如我們可以獲取一些可觀察序列并調用`pausable`方法,然后調用`pause`暫停序列并在5秒鐘內恢復。這里需要注意的是,在暫停和恢復期間產生的數據都將丟失。這種情況只適用于Hot Observable,并不適合Code Observable,原因在于在恢復后它們將重新啟動。
```javascript
var pausable = getSomeObservableSource()
.pausable();
pausable.subscribeOnNext(function (data) {
console.log('Data: %o', data);
});
pausable.pause();
// Resume in five seconds
setTimeout(function () {
pausable.resume();
}, 5000);
```
> 注:rxjs 5.*版本中并沒有`pausable`, `pause`和`resume`操作符
## 無損背壓(Loss-less Backpressure)
除了支持有損的背壓機制,RxJS還支持以數據獲取方式,使其能夠以消費者自己的速度完全消費。 在工作中有許多策略,包括使用與時間盤,計數或兩者兼容的緩沖區,可暫停緩沖區,反應拉動等。
### 緩沖區(Buffers) and Windows
處理背壓問題第一個策略是使用緩沖區,這允許消費者設置他們希望等待的時間、項目數量,或兩者,以較先獲得者為準。這在很多情況下是很有用的,例如您希望將窗口中的某些數據出于比較的目的,對數據根據需要進行分塊。
`bufferWithCount`方法允許我們在將消息傳遞給緩沖區數組之前指定要捕獲的項目數。 這是不切實際而有趣的用法是計算用戶是否輸入了Konami代碼。
> 注:在rxjs 5.*中為`bufferCount`
```javascript
var codes = [
38, // up
38, // up
40, // down
40, // down
37, // left
39, // right
37, // left
39, // right
66, // b
65 // a
];
function isKonamiCode(buffer) {
return codes.toString() === buffer.toString();
}
var keys = Rx.Observable.fromEvent(document, 'keyup')
.map(function (e) { return e.keyCode; })
.bufferWithCount(10, 1)
.filter(isKonamiCode)
.subscribeOnNext(function () {
console.log('KONAMI!');
});
```
另一方面,您也可以使用`bufferWithTime`在緩沖區內獲取一段給定時間的數據。 這是有用的,例如,如果您正在跟蹤網絡中的數據量,然后可以統一處理。
> 注:rxjs 5.* 中為 `bufferTime`
```javascript
var source = getStockData()
.bufferWithTime(5000, 1000) // time in milliseconds
.subscribeOnNext(function (data) {
data.forEach(function (d) {
console.log('Stock: %o', d);
});
});
```
為了避免緩沖區填充太快,有一種方法可以通過指定計數和時間間隔的天花板來限制緩沖區,以先到者為準。 例如,網絡可能特別快,數據在指定的時間內,其他時間不是,所以為了保持數據級別,您可以通過`bufferWithTimeOrCount`指定這個閾值
```javascript
var source = getStockData()
.bufferWithTimeOrCount(5000 /* ms */, 100 /* items */)
.subscribeOnNext(function (data) {
data.forEach(function (d) {
console.log('Stock: %o', d);
});
});
```
### 可暫停緩沖區(Pausable Buffers)
`pausable`方法在處理熱(hot)可觀察對象時很有用,在這里你可以在丟棄數據時暫停和恢復,但您可能希望在此期間保留數據。為此,我們引入`pausableBuffered`方法,它在`pause`調用之前保持一個運行緩沖區, 當`resume`被調用時,它被回收。然后由開發人員決定何時暫停和恢復,同時也不會丟失任何數據。
```javascript
var source = getStockData()
.pausableBuffered();
source.subscribeOnNext(function (stock) {
console.log('Stock data: %o', stock);
});
source.pause();
// Resume after five seconds
setTimeout(function () {
// Drains the buffer and subscribeOnNext is called with the data
source.resume();
}, 5000);
```
### 受控可觀察對象(Controlled Observables)
在更高級的情況下,您可能希望控制在給定時間收到的項目的絕對數量,其余的通過`controlled`方法進行緩沖。 例如,您可以拉10個項目,其次是20個項目,由開發人員決定。 這更加符合“活動流”努力有效地將推送流轉化為推/拉流的努力。
```javascript
var source = getStockData()
.controlled();
source.subscribeOnNext(function (stock) {
console.log('Stock data: %o', stock);
});
source.request(2);
// Keep getting more after 5 seconds
setInterval(function () {
source.request(2);
}, 5000);
```
## 未來計劃(Future Work)
這當然只是背壓工作的開始,因為還有很多其他可以考慮的策略。 在未來版本的RxJS中,受控觀察者的想法將被烘焙到訂閱本身中,然后允許背壓成為合同的重要部分或者要求n個項目。
---
## 譯注
由于本人英文只有4級水平,所以全文借助Google進行翻譯,有能力者還是建議直接看[原文](https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/backpressure.md),同時需要注意的是這里所有的示例都是基于Rx.js 4以前的版本。雖然內容有些過時,但我們可以感受到RxJS作者們考慮得真全面,集成了大部分業務場景需要解決的數據問題。
- 說明
- angular 1.x
- ngModelController
- ngOptions
- ngModelOptions
- lifecycle
- directive
- angular 2
- @angular/forms
- 類
- AbstractControl
- AbstractControlDirective
- AbstractFormGroupDirective
- FormControl
- FormArray
- FormBuilder
- FormGroup
- NgControl
- 接口
- controlValueAccessor
- 指令
- DefaultValueAccessor
- Angular 2 生命周期
- OnInit
- DoCheck
- @angular/router
- 配置
- Routes
- 指令
- RouterOutlet
- RouterLink
- 接口
- ActivatedRoute
- UrlTree
- NavigationExtras
- ActivatedRouteSnapshot
- RouterStateSnapshot
- 類
- UrlSegment
- UrlSegmentGroup
- UrlSerializer
- DefaultUrlSerializer
- Router
- bug記得
- @angular/http
- 類
- Http
- Body
- Response
- ResponseOptions
- Header
- Request
- RequestOptions
- URLSearchParams
- @angular/core
- decorator
- Component-decorator
- animation
- DI
- linker
- TemplateRef
- ElementRef
- EmbeddedViewRef
- ViewRef
- ViewContainerRef
- Query
- ComponentFactory
- ComponentRef
- Renderer
- change_detection
- KeyValueDiffers
- IterableDiffers
- ChangeDetectorRef
- ChangeDetectionStrategy
- Zone
- ngZone
- @angular/common
- 指令
- NgTemplateOutlet
- QueryList
- bootstrap4
- card
- form
- 重點關注博客
- 學習過的文章
- 筆記
- Angular 2 雙向綁定
- 將字符串解析成DOM
- rx相關
- operators
- combineLatest
- combineAll
- concat(All, Map, *MapTo)
- 背壓(backpressure)
- js事件keycode對應表
- 裝飾器
- 有用的代碼摘錄
- 日期操作
- 數量操作
- 字符操作
- rxjs問題
- 小示例
- h5面試準備
- react
- 開發遇到的問題