### API 介紹和原理簡析
這個我就做不到一個詞說明了……因為這一節的主要內容就是一步步地說明 RxJava 到底怎樣做到了異步,怎樣做到了簡潔。
### 1. 概念:擴展的觀察者模式
RxJava 的異步實現,是通過一種擴展的觀察者模式來實現的。
#### 觀察者模式
先簡述一下觀察者模式,已經熟悉的可以跳過這一段。
觀察者模式面向的需求是:A 對象(觀察者)對 B 對象(被觀察者)的某種變化高度敏感,需要在 B 變化的一瞬間做出反應。舉個例子,新聞里喜聞樂見的警察抓小偷,警察需要在小偷伸手作案的時候實施抓捕。在這個例子里,警察是觀察者,小偷是被觀察者,警察需要時刻盯著小偷的一舉一動,才能保證不會漏過任何瞬間。程序的觀察者模式和這種真正的『觀察』略有不同,觀察者不需要時刻盯著被觀察者(例如 A 不需要每過 2ms 就檢查一次 B 的狀態),而是采用注冊(Register)或者稱為訂閱(Subscribe)的方式,告訴被觀察者:我需要你的某某狀態,你要在它變化的時候通知我。 Android 開發中一個比較典型的例子是點擊監聽器 *OnClickListener* 。對設置 *OnClickListener* 來說, View 是被觀察者, *OnClickListener *是觀察者,二者通過 setOnClickListener() 方法達成訂閱關系。訂閱之后用戶點擊按鈕的瞬間,Android Framework 就會將點擊事件發送給已經注冊的 OnClickListener 。采取這樣被動的觀察方式,既省去了反復檢索狀態的資源消耗,也能夠得到最高的反饋速度。當然,這也得益于我們可以隨意定制自己程序中的觀察者和被觀察者,而警察叔叔明顯無法要求小偷『你在作案的時候務必通知我』。
OnClickListener 的模式大致如下圖:

OnClickListener 觀察者模式
如圖所示,通過 setOnClickListener() 方法,Button 持有 OnClickListener 的引用(這一過程沒有在圖上畫出);當用戶點擊時,Button 自動調用 OnClickListener 的 onClick() 方法。另外,如果把這張圖中的概念抽象出來(Button -> 被觀察者、OnClickListener -> 觀察者、setOnClickListener() -> 訂閱,onClick() -> 事件),就由專用的觀察者模式(例如只用于監聽控件點擊)轉變成了通用的觀察者模式。如下圖:

通用觀察者模式
而 RxJava 作為一個工具庫,使用的就是通用形式的觀察者模式。
RxJava 的觀察者模式
RxJava 有四個基本概念:Observable (可觀察者,即被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。Observable 和 Observer 通過 subscribe() 方法實現訂閱關系,從而 Observable 可以在需要的時候發出事件來通知 Observer。
與傳統觀察者模式不同, RxJava 的事件回調方法除了普通事件 onNext() (相當于 onClick() / onEvent())之外,還定義了兩個特殊的事件:onCompleted() 和 onError()。
* onCompleted(): 事件隊列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列。RxJava 規定,當不會再有新的 onNext() 發出時,需要觸發 onCompleted() 方法作為標志。
* onError(): 事件隊列異常。在事件處理過程中出異常時,onError() 會被觸發,同時隊列自動終止,不允許再有事件發出。
* 在一個正確運行的事件序列中, onCompleted() 和 onError() 有且只有一個,并且是事件序列中的最后一個。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在隊列中調用了其中一個,就不應該再調用另一個。
RxJava 的觀察者模式大致如下圖:

RxJava 的觀察者模式
### 2. 基本實現
基于以上的概念, RxJava 的基本實現主要有三點:
#### 1) 創建 Observer
Observer 即觀察者,它決定事件觸發的時候將有怎樣的行為。 RxJava 中的 Observer 接口的實現方式:
~~~
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
~~~
除了 Observer 接口之外,RxJava 還內置了一個實現了 Observer 的抽象類:Subscriber。 Subscriber 對 Observer 接口進行了一些擴展,但他們的基本使用方式是完全一樣的:
~~~
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
~~~
不僅基本使用方式一樣,實質上,在 RxJava 的 subscribe 過程中,Observer 也總是會先被轉換成一個 Subscriber 再使用。所以如果你只想使用基本功能,選擇 Observer 和 Subscriber 是完全一樣的。它們的區別對于使用者來說主要有兩點:
1. onStart(): 這是 Subscriber 增加的方法。它會在 subscribe 剛開始,而事件還未發送之前被調用,可以用于做一些準備工作,例如數據的清零或重置。這是一個可選方法,默認情況下它的實現為空。需要注意的是,如果對準備工作的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執行), onStart() 就不適用了,因為它總是在 subscribe 所發生的線程被調用,而不能指定線程。要在指定的線程來做準備工作,可以使用 doOnSubscribe() 方法,具體可以在后面的文中看到。
1. unsubscribe(): 這是 Subscriber 所實現的另一個接口 Subscription 的方法,用于取消訂閱。在這個方法被調用后,Subscriber 將不再接收事件。一般在這個方法調用前,可以使用 isUnsubscribed() 先判斷一下狀態。 unsubscribe() 這個方法很重要,因為在 subscribe() 之后, Observable 會持有 Subscriber 的引用,這個引用如果不能及時被釋放,將有內存泄露的風險。所以最好保持一個原則:要在不再使用的時候盡快在合適的地方(例如 onPause() onStop() 等方法中)調用 unsubscribe() 來解除引用關系,以避免內存泄露的發生。
#### 2) 創建 Observable
Observable 即被觀察者,它決定什么時候觸發事件以及觸發怎樣的事件。 RxJava 使用 create() 方法來創建一個 Observable ,并為它定義事件觸發規則:
~~~
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
~~~
可以看到,這里傳入了一個 OnSubscribe 對象作為參數。OnSubscribe 會被存儲在返回的 Observable 對象中,它的作用相當于一個計劃表,當 Observable 被訂閱的時候,OnSubscribe 的 call() 方法會自動被調用,事件序列就會依照設定依次觸發(對于上面的代碼,就是觀察者Subscriber 將會被調用三次 onNext() 和一次 onCompleted())。這樣,由被觀察者調用了觀察者的回調方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式。
> *這個例子很簡單:事件的內容是字符串,而不是一些復雜的對象;事件的內容是已經定好了的,而不像有的觀察者模式一樣是待確定的(例如網絡請求的結果在請求返回之前是未知的);所有事件在一瞬間被全部發送出去,而不是夾雜一些確定或不確定的時間間隔或者經過某種觸發器來觸發的。總之,這個例子看起來毫無實用價值。但這是為了便于說明,實質上只要你想,各種各樣的事件發送規則你都可以自己來寫。至于具體怎么做,后面都會講到,但現在不行。只有把基礎原理先說明白了,上層的運用才能更容易說清楚。*
create() 方法是 RxJava 最基本的創造事件序列的方法。基于這個方法, RxJava 還提供了一些方法用來快捷創建事件隊列,例如:
* just(T...): 將傳入的參數依次發送出來。
~~~
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 將會依次調用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
~~~
* from(T[]) / from(Iterable<? extends T>) : 將傳入的數組或 Iterable 拆分成具體對象后,依次發送出來。
~~~
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 將會依次調用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
~~~
上面 just(T...) 的例子和 from(T[]) 的例子,都和之前的 create(OnSubscribe) 的例子是等價的。
#### 3) Subscribe (訂閱)
創建了 Observable 和 Observer 之后,再用 subscribe() 方法將它們聯結起來,整條鏈子就可以工作了。代碼形式很簡單:
~~~
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
~~~
> *有人可能會注意到, subscribe() 這個方法有點怪:它看起來是『observalbe 訂閱了 observer / subscriber』而不是『observer / subscriber 訂閱了 observalbe』,這看起來就像『雜志訂閱了讀者』一樣顛倒了對象關系。這讓人讀起來有點別扭,不過如果把 API 設計成 observer.subscribe(observable) / subscriber.subscribe(observable) ,雖然更加符合思維邏輯,但對流式 API 的設計就造成影響了,比較起來明顯是得不償失的。*
Observable.subscribe(Subscriber) 的內部實現是這樣的(僅核心代碼):
~~~
// 注意:這不是 subscribe() 的源碼,而是將源碼中與性能、兼容性、擴展性有關的代碼剔除后的核心代碼。
// 如果需要看源碼,可以去 RxJava 的 GitHub 倉庫下載。
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
~~~
可以看到,subscriber() 做了3件事:
1. 調用 Subscriber.onStart() 。這個方法在前面已經介紹過,是一個可選的準備方法。
2. 調用 Observable 中的 OnSubscribe.call(Subscriber) 。在這里,事件發送的邏輯開始運行。從這也可以看出,在 RxJava 中, Observable 并不是在創建的時候就立即開始發送事件,而是在它被訂閱的時候,即當 subscribe() 方法執行的時候。
3. 將傳入的 Subscriber 作為 Subscription 返回。這是為了方便 unsubscribe().
整個過程中對象間的關系如下圖:
關系靜圖

或者可以看動圖:

除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 還支持不完整定義的回調,RxJava 會自動根據定義創建出 Subscriber 。形式如下:
~~~
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
// 自動創建 Subscriber ,并使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動創建 Subscriber ,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動創建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
~~~
簡單解釋一下這段代碼中出現的 Action1 和 Action0。 Action0 是 RxJava 的一個接口,它只有一個方法 call(),這個方法是無參無返回值的;由于 onCompleted() 方法也是無參無返回值的,因此 Action0 可以被當成一個包裝對象,將 onCompleted() 的內容打包起來將自己作為一個參數傳入 subscribe() 以實現不完整定義的回調。這樣其實也可以看做將 onCompleted() 方法作為參數傳進了 subscribe(),相當于其他某些語言中的『閉包』。 Action1 也是一個接口,它同樣只有一個方法 call(T param),這個方法也無返回值,但有一個參數;與 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是單參數無返回值的,因此 Action1 可以將 onNext(obj) 和 onError(error) 打包起來傳入 subscribe() 以實現不完整定義的回調。事實上,雖然 Action0 和 Action1 在 API 中使用最廣泛,但 RxJava 是提供了多個 ActionX 形式的接口 (例如 Action2, Action3) 的,它們可以被用以包裝不同的無返回值的方法。
> 注:正如前面所提到的,Observer 和 Subscriber 具有相同的角色,而且 Observer 在 subscribe() 過程中最終會被轉換成 Subscriber 對象,因此,從這里開始,后面的描述我將用 Subscriber 來代替 Observer ,這樣更加嚴謹。
#### 4) 場景示例
下面舉兩個例子:
> 為了把原理用更清晰的方式表述出來,本文中挑選的都是功能盡可能簡單的例子,以至于有些示例代碼看起來會有『畫蛇添足』『明明不用 RxJava 可以更簡便地解決問題』的感覺。當你看到這種情況,不要覺得是因為 RxJava 太啰嗦,而是因為在過早的時候舉出真實場景的例子并不利于原理的解析,因此我刻意挑選了簡單的情景。
a. 打印字符串數組
將字符串數組 names 中的所有字符串依次打印出來:
~~~
String[] names = ...;
Observable.from(names)
.subscribe(new Action1<String>() {
@Override
public void call(String name) {
Log.d(tag, name);
}
});
~~~
b. 由 id 取得圖片并顯示
由指定的一個 drawable 文件 id drawableRes 取得圖片,并顯示在 ImageView 中,并在出現異常的時候打印 Toast 報錯:
~~~
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
}).subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});
~~~
正如上面兩個例子這樣,創建出 Observable 和 Subscriber ,再用 subscribe() 將它們串起來,一次 RxJava 的基本使用就完成了。非常簡單。
然而,
這并沒有什么diao用
在 RxJava 的默認規則中,事件的發出和消費都是在同一個線程的。也就是說,如果只用上面的方法,實現出來的只是一個同步的觀察者模式。觀察者模式本身的目的就是『后臺處理,前臺回調』的異步機制,因此異步對于 RxJava 是至關重要的。而要實現異步,則需要用到 RxJava 的另一個概念: Scheduler 。
### 3. 線程控制 —— Scheduler (一)
在不指定線程的情況下, RxJava 遵循的是線程不變的原則,即:在哪個線程調用 subscribe(),就在哪個線程生產事件;在哪個線程生產事件,就在哪個線程消費事件。如果需要切換線程,就需要用到 Scheduler (調度器)。
#### 1) Scheduler 的 API (一)
在RxJava 中,Scheduler ——調度器,相當于線程控制器,RxJava 通過它來指定每一段代碼應該運行在什么樣的線程。RxJava 已經內置了幾個 Scheduler ,它們已經適合大多數的使用場景:
* Schedulers.immediate(): 直接在當前線程運行,相當于不指定線程。這是默認的 Scheduler。
* Schedulers.newThread(): 總是啟用新線程,并在新線程執行操作。
* Schedulers.io(): I/O 操作(讀寫文件、讀寫數據庫、網絡信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多,區別在于 io() 的內部實現是是用一個無數量上限的線程池,可以重用空閑的線程,因此多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免創建不必要的線程。
* Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
* 另外, Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操作將在 Android 主線程運行。
有了這幾個 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 兩個方法來對線程進行控制了。 * subscribeOn(): 指定 subscribe() 所發生的線程,即 Observable.OnSubscribe 被激活時所處的線程。或者叫做事件產生的線程。 * observeOn(): 指定 Subscriber 所運行在的線程。或者叫做事件消費的線程。
文字敘述總歸難理解,上代碼:
~~~
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});
~~~
上面這段代碼中,由于 subscribeOn(Schedulers.io()) 的指定,被創建的事件的內容 1、2、3、4 將會在 IO 線程發出;而由于 observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber 數字的打印將發生在主線程 。事實上,這種在 subscribe() 之前寫上兩句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常見,它適用于多數的 『后臺線程取數據,主線程顯示』的程序策略。
而前面提到的由圖片 id 取得圖片并顯示的例子,如果也加上這兩句:
~~~
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程
.subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});
~~~
那么,加載圖片將會發生在 IO 線程,而設置圖片則被設定在了主線程。這就意味著,即使加載圖片耗費了幾十甚至幾百毫秒的時間,也不會造成絲毫界面的卡頓。
#### 2) Scheduler 的原理 (一)
RxJava 的 Scheduler API 很方便,也很神奇(加了一句話就把線程切換了,怎么做到的?而且 subscribe() 不是最外層直接調用的方法嗎,它竟然也能被指定線程?)。然而 Scheduler 的原理需要放在后面講,因為它的原理是以下一節《變換》的原理作為基礎的。
好吧這一節其實我屁也沒說,只是為了讓你安心,讓你知道我不是忘了講原理,而是把它放在了更合適的地方。
### 4. 變換
終于要到牛逼的地方了,不管你激動不激動,反正我是激動了。
RxJava 提供了對事件序列進行變換的支持,這是它的核心功能之一,也是大多數人說『RxJava 真是太好用了』的最大原因。**所謂變換,就是將事件序列中的對象或整個序列進行加工處理,轉換成不同的事件或事件序列。**概念說著總是模糊難懂的,來看 API。
#### 1) API
首先看一個 map() 的例子:
~~~
Observable.just("images/logo.png") // 輸入類型 String
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 參數類型 String
return getBitmapFromPath(filePath); // 返回類型 Bitmap
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 參數類型 Bitmap
showBitmap(bitmap);
}
});
~~~
這里出現了一個叫做 Func1 的類。它和 Action1 非常相似,也是 RxJava 的一個接口,用于包裝含有一個參數的方法。 Func1 和 Action 的區別在于, Func1 包裝的是有返回值的方法。另外,和 ActionX 一樣, FuncX 也有多個,用于不同參數個數的方法。FuncX 和 ActionX 的區別在 FuncX 包裝的是有返回值的方法。
可以看到,map() 方法將參數中的 String 對象轉換成一個 Bitmap 對象后返回,而在經過 map() 方法后,事件的參數類型也由 String 轉為了 Bitmap。這種直接變換對象并返回的,是最常見的也最容易理解的變換。不過 RxJava 的變換遠不止這樣,它不僅可以針對事件對象,還可以針對整個事件隊列,這使得 RxJava 變得非常靈活。我列舉幾個常用的變換:
* map(): 事件對象的直接變換,具體功能上面已經介紹過。它是 RxJava 最常用的變換。 map() 的示意圖:

map() 示意圖
* flatMap(): 這是一個很有用但非常難理解的變換,因此我決定花多些篇幅來介紹它。 首先假設這么一種需求:假設有一個數據結構『學生』,現在需要打印出一組學生的名字。實現方式很簡單:
~~~
Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String name) {
Log.d(tag, name);
}
...
};
Observable.from(students)
.map(new Func1<Student, String>() {
@Override
public String call(Student student) {
return student.getName();
}
})
.subscribe(subscriber);
~~~
很簡單。那么再假設:如果要打印出每個學生所需要修的所有課程的名稱呢?(需求的區別在于,每個學生只有一個名字,但卻有多個課程。)首先可以這樣實現:
~~~
Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
@Override
public void onNext(Student student) {
List<Course> courses = student.getCourses();
for (int i = 0; i < courses.size(); i++) {
Course course = courses.get(i);
Log.d(tag, course.getName());
}
}
...
};
Observable.from(students)
.subscribe(subscriber);
~~~
依然很簡單。那么如果我不想在 Subscriber 中使用 for 循環,而是希望 Subscriber 中直接傳入單個的 Course 對象呢(這對于代碼復用很重要)?用 map() 顯然是不行的,因為 map() 是一對一的轉化,而我現在的要求是一對多的轉化。那怎么才能把一個 Student 轉化成多個 Course 呢?
這個時候,就需要用 flatMap() 了:
~~~
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
~~~
從上面的代碼可以看出, flatMap() 和 map() 有一個相同點:它也是把傳入的參數轉化之后返回另一個對象。但需要注意,和 map() 不同的是, flatMap() 中返回的是個 Observable 對象,并且這個 Observable 對象并不是被直接發送到了 Subscriber 的回調方法中。 flatMap() 的原理是這樣的:1. 使用傳入的事件對象創建一個 Observable 對象;2. 并不發送這個 Observable, 而是將它激活,于是它開始發送事件;3. 每一個創建出來的 Observable 發送的事件,都被匯入同一個 Observable ,而這個 Observable 負責將這些事件統一交給 Subscriber 的回調方法。這三個步驟,把事件拆成了兩級,通過一組新創建的 Observable 將初始的對象『鋪平』之后通過統一路徑分發了下去。而這個『鋪平』就是 flatMap() 所謂的 flat。
flatMap() 示意圖:

flatMap() 示意圖
擴展:由于可以在嵌套的 Observable 中添加異步代碼, flatMap() 也常用于嵌套的異步操作,例如嵌套的網絡請求。示例代碼(Retrofit + RxJava):
~~~
networkClient.token() // 返回 Observable<String>,在訂閱時請求 token,并在響應后發送 token
.flatMap(new Func1<String, Observable<Messages>>() {
@Override
public Observable<Messages> call(String token) {
// 返回 Observable<Messages>,在訂閱時請求消息列表,并在響應后發送請求到的消息列表
return networkClient.messages();
}
})
.subscribe(new Action1<Messages>() {
@Override
public void call(Messages messages) {
// 處理顯示消息列表
showMessages(messages);
}
});
~~~
傳統的嵌套請求需要使用嵌套的 Callback 來實現。而通過 flatMap() ,可以把嵌套的請求寫在一條鏈中,從而保持程序邏輯的清晰。
* throttleFirst(): 在每次事件觸發后的一定時間間隔內丟棄新的事件。常用作去抖動過濾,例如按鈕的點擊監聽器: RxView.clickEvents(button) // RxBinding 代碼,后面的文章有解釋 .throttleFirst(500, TimeUnit.MILLISECONDS) // 設置防抖間隔為 500ms .subscribe(subscriber); 媽媽再也不怕我的用戶手抖點開兩個重復的界面啦。
此外, RxJava 還提供很多便捷的方法來實現事件序列的變換,這里就不一一舉例了。
#### 2) 變換的原理:lift()
這些變換雖然功能各有不同,但實質上都是針對事件序列的處理和再發送。而在 RxJava 的內部,它們是基于同一個基礎的變換方法: lift(Operator)。首先看一下 lift() 的內部實現(僅核心代碼):
~~~
// 注意:這不是 lift() 的源碼,而是將源碼中與性能、兼容性、擴展性有關的代碼剔除后的核心代碼。
// 如果需要看源碼,可以去 RxJava 的 GitHub 倉庫下載。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber);
}
});
}
~~~
這段代碼很有意思:它生成了一個新的 Observable 并返回,而且創建新 Observable 所用的參數 OnSubscribe 的回調方法 call() 中的實現竟然看起來和前面講過的 Observable.subscribe() 一樣!然而它們并不一樣喲~不一樣的地方關鍵就在于第二行 onSubscribe.call(subscriber) 中的 onSubscribe **所指代的對象不同**(高能預警:接下來的幾句話可能會導致身體的嚴重不適)——
* subscribe() 中這句話的 onSubscribe 指的是 Observable 中的 onSubscribe 對象,這個沒有問題,但是 lift() 之后的情況就復雜了點。
* 當含有 lift() 時:
1.lift() 創建了一個 Observable 后,加上之前的原始 Observable,已經有兩個 Observable 了;
2.而同樣地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了兩個 OnSubscribe;
3.當用戶調用經過 lift() 后的 Observable 的 subscribe() 的時候,使用的是 lift() 所返回的新的 Observable ,于是它所觸發的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那個 OnSubscribe;
4.而這個新 OnSubscribe 的 call() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在這個 call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一個新的 Subscriber(Operator 就是在這里,通過自己的 call() 方法將新 Subscriber 和原始 Subscriber 進行關聯,并插入自己的『變換』代碼以實現變換),然后利用這個新 Subscriber 向原始 Observable 進行訂閱。
這樣就實現了 lift() 過程,有點像**一種代理機制,通過事件攔截和處理實現事件序列的變換。**
精簡掉細節的話,也可以這么說:在 Observable 執行了 lift(Operator) 方法之后,會返回一個新的 Observable,這個新的 Observable 會像一個代理一樣,負責接收原始的 Observable 發出的事件,并在處理后發送給 Subscriber。
如果你更喜歡具象思維,可以看圖:

lift() 原理圖
或者可以看動圖:

lift 原理動圖
兩次和多次的 lift() 同理,如下圖:

兩次 lift
舉一個具體的 Operator 的實現。下面這是一個將事件中的 Integer 對象轉換成 String 的例子,僅供參考:
~~~
observable.lift(new Observable.Operator<String, Integer>() {
@Override
public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
// 將事件序列中的 Integer 對象轉換為 String 對象
return new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
subscriber.onNext("" + integer);
}
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {

subscribeOn() 原理
observeOn() 原理圖:

observeOn() 原理
從圖中可以看出,subscribeOn() 和 observeOn() 都做了線程切換的工作(圖中的 "schedule..." 部位)。不同的是, subscribeOn() 的線程切換發生在 OnSubscribe 中,即在它通知上一級 OnSubscribe 時,這時事件還沒有開始發送,因此 subscribeOn() 的線程控制可以從事件發出的開端就造成影響;而 observeOn() 的線程切換則發生在它內建的 Subscriber 中,即發生在它即將給下一級 Subscriber 發送事件時,因此 observeOn() 控制的是它后面的線程。
最后,我用一張圖來解釋當多個 subscribeOn() 和 observeOn() 混合使用時,線程調度是怎么發生的(由于圖中對象較多,相對于上面的圖對結構做了一些簡化調整):

線程控制綜合調用
圖中共有 5 處含有對事件的操作。由圖中可以看出,①和②兩處受第一個 subscribeOn() 影響,運行在紅色線程;③和④處受第一個 observeOn() 的影響,運行在綠色線程;⑤處受第二個 onserveOn() 影響,運行在紫色線程;而第二個 subscribeOn() ,由于在通知過程中線程就被第一個 subscribeOn() 截斷,因此對整個流程并沒有任何影響。這里也就回答了前面的問題:當使用了多個 subscribeOn() 的時候,只有第一個 subscribeOn() 起作用。
#### 3) 延伸:doOnSubscribe()
然而,雖然超過一個的 subscribeOn() 對事件處理的流程沒有影響,但在流程之前卻是可以利用的。
在前面講 Subscriber 的時候,提到過 Subscriber 的 onStart() 可以用作流程開始前的初始化。然而 onStart() 由于在 subscribe() 發生時就被調用了,因此不能指定線程,而是只能執行在 subscribe() 被調用時的線程。這就導致如果 onStart() 中含有對線程有要求的代碼(例如在界面上顯示一個 ProgressBar,這必須在主線程執行),將會有線程非法的風險,因為有時你無法預測 subscribe() 將會在什么線程執行。
而與 Subscriber.onStart() 相對應的,有一個方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同樣是在 subscribe() 調用后而且在事件發送前執行,但區別在于它可以指定線程。默認情況下, doOnSubscribe() 執行在 subscribe() 發生的線程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的話,它將執行在離它最近的 subscribeOn() 所指定的線程。
示例代碼:
~~~
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 需要在主線程執行
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定主線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
~~~
如上,在 doOnSubscribe()的后面跟一個 subscribeOn() ,就能指定準備工作的線程了。