# 實現自己的操作符
你可以實現你自己的Observable操作符,本文展示怎么做。
如果你的操作符是被用于*創造*一個Observable,而不是變換或者響應一個Observable,使用 [`create(?)`](http://reactivex.io/documentation/operators/create.html) 方法,不要試圖手動實現 `Observable`。另外,你可以按照下面的用法說明創建一個自定義的操作符。
如果你的操作符是用于Observable發射的單獨的數據項,按照下面的說明做:[_Sequence Operators_](Implementing-Your-Own-Operators#序列操作符) 。如果你的操作符是用于變換Observable發射的整個數據序列,按照這個說明做:[_Transformational Operators_](Implementing-Your-Own-Operators#變換操作符) 。
**提示:** 在一個類似于Groovy的語言Xtend中,你可以以 _extension methods_ 的方式實現你自己的操作符 ,不使用本文的方法,它們也可以鏈式調用。詳情參見 [RxJava and Xtend](http://mnmlst-dvlpr.blogspot.de/2014/07/rxjava-and-xtend.html)
# 序列操作符
下面的例子向你展示了怎樣使用`lift(?)`操作符將你的自定義操作符(在這個例子中是 `myOperator`)與標準的RxJava操作符(如`ofType`和`map`)一起使用:
```groovy
fooObservable = barObservable.ofType(Integer).map({it*2}).lift(new MyOperator<T>()).map({"transformed by myOperator: " + it});
```
下面這部分向你展示了你的操作符的腳手架形式,以便它能正確的與`lift()`搭配使用。
## 實現你的操作符
將你的自定義操作符定義為實現了 [`Operator`](http://reactivex.io/RxJava/javadoc/rx/Observable.Operator.html) 接口的一個公開類, 就像這樣:
```java
public class MyOperator<T> implements Operator<T> {
public MyOperator( /* any necessary params here */ ) {
/* 這里添加必要的初始化代碼 */
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> s) {
return new Subscriber<t>(s) {
@Override
public void onCompleted() {
/* 這里添加你自己的onCompleted行為,或者僅僅傳遞完成通知: */
if(!s.isUnsubscribed()) {
s.onCompleted();
}
}
@Override
public void onError(Throwable t) {
/* 這里添加你自己的onError行為, 或者僅僅傳遞錯誤通知:*/
if(!s.isUnsubscribed()) {
s.onError(t);
}
}
@Override
public void onNext(T item) {
/* 這個例子對結果的每一項執行排序操作,然后返回這個結果 */
if(!s.isUnsubscribed()) {
transformedItem = myOperatorTransformOperation(item);
s.onNext(transformedItem);
}
}
};
}
}
```
# 變換操作符
下面的例子向你展示了怎樣使用 `compose(?)` 操作符將你得自定義操作符(在這個例子中,是一個名叫`myTransformer`的操作符,它將一個發射整數的Observable轉換為發射字符串的)與標準的RxJava操作符(如`ofType`和`map`)一起使用:
```groovy
fooObservable = barObservable.ofType(Integer).map({it*2}).compose(new MyTransformer<Integer,String>()).map({"transformed by myOperator: " + it});
```
下面這部分向你展示了你的操作符的腳手架形式,以便它能正確的與`compose()`搭配使用。
## 實現你的變換器
將你的自定義操作符定義為實現了 [`Transformer`](http://reactivex.io/RxJava/javadoc/rx/Observable.Transformer.html) 接口的一個公開類,就像這樣:
````java
public class MyTransformer<Integer,String> implements Transformer<Integer,String> {
public MyTransformer( /* any necessary params here */ ) {
/* 這里添加必要的初始化代碼 */
}
@Override
public Observable<String> call(Observable<Integer> source) {
/*
* 這個簡單的例子Transformer應用一個map操作,
* 這個map操作將發射整數變換為發射整數的字符串表示。
*/
return source.map( new Func1<Integer,String>() {
@Override
public String call(Integer t1) {
return String.valueOf(t1);
}
} );
}
}
````
## 參見
* [“Don’t break the chain: use RxJava’s compose() operator”](http://blog.danlew.net/2015/03/02/dont-break-the-chain/) by Dan Lew
# 其它需要考慮的
* 在發射任何數據(或者通知)給訂閱者之前,你的序列操作符可能需要檢查它的 [`Subscriber.isUnsubscribed(?)`](Observable#unsubscribing) 狀態,如果沒有訂閱者了,沒必要浪費時間生成數據項。
* 請注意:你的序列操作符必須復合Observable協議的核心原則:
* 它可能調用訂閱者的 [`onNext(?)`](Observable#onnext-oncompleted-and-onerror) 方法任意次,但是這些調用必須是不重疊的。
* 它只能調用訂閱者的 [`onCompleted(?)`](Observable#onnext-oncompleted-and-onerror) 或 [`onError(?)`](Observable#onnext-oncompleted-and-onerror) 正好一次,但不能都調用,而且不能在這之后調用訂閱者的 [`onNext(?)`](Observable#onnext-oncompleted-and-onerror) 方法。
* 如果你不能保證你得操作符遵從這兩個原則,你可以給它添加 [`serialize(?)`](Observable-Utility-Operators#serialize) 操作符,它會強制保持正確的行為。
* 請關注這里 [Issue #1962](https://github.com/ReactiveX/RxJava/issues/1962) &mdash;需要有一個計劃創建一個測試腳手架,你可以用它來寫測試驗證你的新操作符遵從了Observable協議。
* 不要讓你的操作符阻塞別的操作。
* When possible, you should compose new operators by combining existing operators, rather than implementing them with new code. RxJava itself does this with some of its standard operators, for example:
* 如果可能,你應該組合現有的操作符創建你的新操作符,而不是從零開始實現它。RxJava自身的標準操作符也是這樣做的,例如:
* [`first(?)`](http://reactivex.io/documentation/operators/first.html) 被定義為 <tt>[take(1)](http://reactivex.io/documentation/operators/take.html).[single(?)](http://reactivex.io/documentation/operators/first.html)</tt>
* [`ignoreElements(?)`](http://reactivex.io/documentation/operators/ignoreelements.html) 被定義為 <tt>[filter(alwaysFalse(?))](http://reactivex.io/documentation/operators/filter.html)</tt>
* [`reduce(a)`](http://reactivex.io/documentation/operators/reduce.html) 被定義為 <tt>[scan(a)](http://reactivex.io/documentation/operators/scan.html).[last(?)](http://reactivex.io/documentation/operators/last.html)</tt>
* 如果你的操作符使用了函數或者lambda表達式作為參數,請注意它們可能是異常的來源,而且要準備好捕獲這些異常,并且使用 `onError()` 通知訂閱者。
* 某些異常被認為是致命的,對它們來說,調用 `onError()`毫無意義,那樣或者是無用的,或者只是對問題的妥協。你可以使用 `Exceptions.throwIfFatal(throwable)` 方法過濾掉這些致命的異常,并重新拋出它們,而不是試圖發射關于它們的通知。
* 一般說來,一旦發生錯誤應立即通知訂閱者,而不是首先嘗試發射更多的數據。
* 請注意 `null` 可能是Observable發射的一個合法數據。頻繁發生錯誤的一個來源是:測試一些變量并且將持有一個非 `null` 值作為是否發射了數據的替代。一個值為 `null` 的數據仍然是一個發射數據項,它與沒有發射任何東西是不能等同的。
* 想讓你的操作符在反壓(*backpressure*)場景中變得得好可能會非常棘手。可以參考Dávid Karnok的博客 [Advanced RxJava](http://akarnokd.blogspot.hu/),這里有一個涉及到的各種因素和怎樣處理它們的很值得看的討論。
- ReactiveX
- Observables
- Single
- Subject
- Scheduler
- Operators
- 創建操作
- Create
- Defer
- Empty/Never/Throw
- From
- Interval
- Just
- Range
- Repeat
- Start
- Timer
- 變換操作
- Buffer
- FlatMap
- GroupBy
- Map
- Scan
- Window
- 過濾操作
- Debounce
- Distinct
- ElementAt
- Filter
- First
- IgnoreElements
- Last
- Sample
- Skip
- SkipLast
- Take
- TakeLast
- 結合操作
- And/Then/When
- CombineLatest
- Join
- Merge
- StartWith
- Switch
- Zip
- 錯誤處理
- Catch
- Retry
- 輔助操作
- Delay
- Do
- Materialize/Dematerialize
- ObserveOn
- Serialize
- Subscribe
- SubscribeOn
- TimeInterval
- Timeout
- Timestamp
- Using
- To
- 條件和布爾操作
- All/Contains/Amb
- DefaultIfEmpty
- SequenceEqual
- SkipUntil/SkipWhile
- TakeUntil/TakeWhile
- 算術和聚合操作
- Average/Concat/Reduce
- Max/Min/Count/Sum
- 異步操作
- 連接操作
- Connect
- Publish
- RefCount
- Replay
- 轉換操作
- 阻塞操作
- 字符串操作
- 按字母順序的操作符列表
- RxJava文檔和教程
- RxJava入門指南
- RxJava使用示例
- 實現自定義操作符
- 自定義插件
- Backpressure
- 錯誤處理
- Android模塊
- 參與開發
- 補充閱讀材料