## Buffer
定期收集Observable的數據放進一個數據包裹,然后發射這些數據包裹,而不是一次發射一個值。

`Buffer`操作符將一個Observable變換為另一個,原來的Observable正常發射數據,變換產生的Observable發射這些數據的緩存集合。`Buffer`操作符在很多語言特定的實現中有很多種變體,它們在如何緩存這個問題上存在區別。
注意:如果原來的Observable發射了一個`onError`通知,`Buffer`會立即傳遞這個通知,而不是首先發射緩存的數據,即使在這之前緩存中包含了原始Observable發射的數據。
`Window`操作符與`Buffer`類似,但是它在發射之前把收集到的數據放進單獨的Observable,而不是放進一個數據結構。
在RxJava中有許多`Buffer`的變體:
### buffer(count)

`buffer(count)`以列表(List)的形式發射非重疊的緩存,每一個緩存至多包含來自原始Observable的count項數據(最后發射的列表數據可能少于count項)
* Javadoc: [buffer(int)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(int))
### buffer(count, skip)

`buffer(count,?skip)`從原始Observable的第一項數據開始創建新的緩存,此后每當收到`skip`項數據,用`count`項數據填充緩存:開頭的一項和后續的`count-1`項,它以列表(List)的形式發射緩存,取決于`count`和`skip`的值,這些緩存可能會有重疊部分(比如skip < count時),也可能會有間隙(比如skip > count時)。
* Javadoc: [buffer(int,int)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(int,%20int))
### buffer(bufferClosingSelector)

當它訂閱原來的Observable時,`buffer(bufferClosingSelector)`開始將數據收集到一個`List`,然后它調用`bufferClosingSelector`生成第二個Observable,當第二個Observable發射一個`TClosing`時,`buffer`發射當前的`List`,然后重復這個過程:開始組裝一個新的`List`,然后調用`bufferClosingSelector`創建一個新的Observable并監視它。它會一直這樣做直到原來的Observable執行完成。
* Javadoc: [buffer(Func0)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(rx.functions.Func0))
### buffer(boundary)

`buffer(boundary)`監視一個名叫`boundary`的Observable,每當這個Observable發射了一個值,它就創建一個新的`List`開始收集來自原始Observable的數據并發射原來的`List`。
* Javadoc: [buffer(Observable)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(rx.Observable))
* Javadoc: [buffer(Observable,int)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(rx.Observable,%20int))
### buffer(bufferOpenings, bufferClosingSelector)

`buffer(bufferOpenings,?bufferClosingSelector)`監視這個叫`bufferOpenings`的Observable(它發射`BufferOpening`對象),每當`bufferOpenings`發射了一個數據時,它就創建一個新的`List`開始收集原始Observable的數據,并將`bufferOpenings`傳遞給`closingSelector`函數。這個函數返回一個Observable。`buffer`監視這個Observable,當它檢測到一個來自這個Observable的數據時,就關閉`List`并且發射它自己的數據(之前的那個List)。
* Javadoc: [buffer(Observable,Func1)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(rx.Observable,%20rx.functions.Func1))
### buffer(timespan, unit[, scheduler])

`buffer(timespan,?unit)`定期以`List`的形式發射新的數據,每個時間段,收集來自原始Observable的數據(從前面一個數據包裹之后,或者如果是第一個數據包裹,從有觀察者訂閱原來的Observale之后開始)。還有另一個版本的`buffer`接受一個`Scheduler`參數,默認情況下會使用`computation`調度器。
* Javadoc: [buffer(long,TimeUnit)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(long,%20java.util.concurrent.TimeUnit))
* Javadoc: [buffer(long,TimeUnit,Scheduler)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(long,%20java.util.concurrent.TimeUnit,%20rx.Scheduler))
### buffer(timespan, unit, count[, scheduler])

每當收到來自原始Observable的count項數據,或者每過了一段指定的時間后,`buffer(timespan,?unit,?count)`就以`List`的形式發射這期間的數據,即使數據項少于count項。還有另一個版本的`buffer`接受一個`Scheduler`參數,默認情況下會使用`computation`調度器。
* Javadoc: [buffer(long,TimeUnit,int)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(long,%20java.util.concurrent.TimeUnit,%20int))
* Javadoc: [buffer(long,TimeUnit,int,Scheduler)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(long,%20java.util.concurrent.TimeUnit,%20int,%20rx.Scheduler))
### buffer(timespan, timeshift, unit[, scheduler])

`buffer(timespan,?timeshift,?unit)`在每一個`timeshift`時期內都創建一個新的`List`,然后用原始Observable發射的每一項數據填充這個列表(在把這個`List`當做自己的數據發射前,從創建時開始,直到過了`timespan`這么長的時間)。如果`timespan`長于`timeshift`,它發射的數據包將會重疊,因此可能包含重復的數據項。
還有另一個版本的`buffer`接受一個`Scheduler`參數,默認情況下會使用`computation`調度器。
* Javadoc: [buffer(long,long,TimeUnit)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(long,%20long,%20java.util.concurrent.TimeUnit))
* Javadoc: [buffer(long,long,TimeUnit,Scheduler)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(long,%20long,%20java.util.concurrent.TimeUnit,%20rx.Scheduler))
### buffer-backpressure
你可以使用`Buffer`操作符實現反壓`backpressure`(意思是,處理這樣一個Observable:它產生數據的速度可能比它的觀察者消費數據的速度快)。

Buffer操作符可以將大量的數據序列縮減為較少的數據緩存序列,讓它們更容易處理。例如,你可以按固定的時間間隔,定期關閉和發射來自一個爆發性Observable的數據緩存。這相當于一個緩沖區。
示例代碼
```java
Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
```

或者,如果你想更進一步,可以在爆發期將數據收集到緩存,然后在爆發期終止時發射這些數據,使用 [`Debounce`](Filtering-Operators#Debounce) 操作符給`buffer`操作符發射一個緩存關閉指示器(`buffer closing indicator`)可以做到這一點。
代碼示例:
```java
// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstyMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);
```
### 參見
* [DebouncedBuffer With RxJava by Gopal Kaushik](http://nerds.weddingpartyapp.com/tech/2015/01/05/debouncedbuffer-used-in-rxbus-example/)
- ReactiveX
- Observables
- Single
- Subject
- Scheduler
- Operators
- 創建操作
- Create
- Defer
- Empty/Never/Throw
- From
- Interval
- Just
- Range
- Repeat
- Start
- Timer
- 變換操作
- Buffer
- FlatMap
- GroupBy
- Map
- Scan
- Window
- 過濾操作
- Debounce
- Distinct
- ElementAt
- Filter
- First
- IgnoreElements
- Last
- Sample
- Skip
- SkipLast
- Take
- TakeLast
- 結合操作
- And/Then/When
- CombineLatest
- Join
- Merge
- StartWith
- Switch
- Zip
- 錯誤處理
- Catch
- Retry
- 輔助操作
- Delay
- Do
- Materialize/Dematerialize
- ObserveOn
- Serialize
- Subscribe
- SubscribeOn
- TimeInterval
- Timeout
- Timestamp
- Using
- To
- 條件和布爾操作
- All/Contains/Amb
- DefaultIfEmpty
- SequenceEqual
- SkipUntil/SkipWhile
- TakeUntil/TakeWhile
- 算術和聚合操作
- Average/Concat/Reduce
- Max/Min/Count/Sum
- 異步操作
- 連接操作
- Connect
- Publish
- RefCount
- Replay
- 轉換操作
- 阻塞操作
- 字符串操作
- 按字母順序的操作符列表
- RxJava文檔和教程
- RxJava入門指南
- RxJava使用示例
- 實現自定義操作符
- 自定義插件
- Backpressure
- 錯誤處理
- Android模塊
- 參與開發
- 補充閱讀材料