## 為什么要學 RxJava?
提升開發效率,降低維護成本一直是開發團隊永恒不變的宗旨。近兩年來國內的技術圈子中越來越多的開始提及 [RxJava](https://github.com/ReactiveX/RxJava) ,越來越多的應用和面試中都會有 [RxJava](https://github.com/ReactiveX/RxJava) ,而就目前的情況,Android 的網絡庫基本被 [Retrofit](https://github.com/square/retrofit) + [OkHttp](https://github.com/square/okhttp) 一統天下了,而配合上**響應式編程** [RxJava](https://github.com/ReactiveX/RxJava) 可謂如魚得水。想必大家肯定被近期的 [Kotlin](https://github.com/JetBrains/kotlin) 炸開了鍋,筆者也在閑暇之時去了解了一番(作為一個與時俱進的有理想的青年怎么可能不與時俱進?),發現其中有個非常好的優點就是簡潔,支持函數式編程。是的, RxJava 最大的優點也是簡潔,但它不止是簡潔,而且是 **隨著程序邏輯變得越來越復雜,它依然能夠保持簡潔** (這貨潔身自好呀有木有)。

咳咳,要例子,猛戳這里:[給 Android 開發者的 RxJava 詳解](http://gank.io/post/560e15be2dca930e00da1083#toc_2)
## 什么是響應式編程
上面我們提及了**響應式編程**,不少新司機對它可謂一臉懵逼,那什么是**響應式編程**呢?**響應式編程**是一種基于異步數據流概念的編程模式。數據流就像一條河:它可以被觀測,被過濾,被操作,或者為新的消費者與另外一條流合并為一條新的流。
響應式編程的一個關鍵概念是事件。事件可以被等待,可以觸發過程,也可以觸發其它事件。事件是唯一的以合適的方式將我們的現實世界映射到我們的軟件中:如果屋里太熱了我們就打開一扇窗戶。同樣的,當我們的天氣app從服務端獲取到新的天氣數據后,我們需要更新app上展示天氣信息的UI;汽車上的車道偏移系統探測到車輛偏移了正常路線就會提醒駕駛者糾正,就是是響應事件。
今天,響應式編程最通用的一個場景是UI:我們的移動App必須做出對網絡調用、用戶觸摸輸入和系統彈框的響應。在這個世界上,軟件之所以是事件驅動并響應的是因為現實生活也是如此。
## 為什么出了一個系列后還有完結版?
[RxJava](https://github.com/ReactiveX/RxJava) 這些年可謂越來越流行,而在去年的晚些時候發布了2.0正式版。大半年已過,雖然網上已經出現了大部分的 RxJava 教程(其實細心的你還是會發現 1.x 的超級多),前些日子,筆者花了大約兩周的閑暇之時寫了 RxJava 2.x 系列教程,也得到了不少反饋,其中就有不少讀者覺得每一篇的教程太短,抑或是希望更多的側重適用場景的介紹,在這樣的大前提下,這篇完結版教程就此誕生,僅供各位新司機采納。

## 開始
RxJava 2.x 已經按照 Reactive-Streams specification 規范完全的重寫了,maven也被放在了`io.reactivex.rxjava2:rxjava:2.x.y` 下,所以 RxJava 2.x 獨立于 RxJava 1.x 而存在,而隨后官方宣布的將在一段時間后終止對 RxJava 1.x 的維護,所以對于熟悉 RxJava 1.x 的老司機自然可以直接看一下 [2.x 的文檔](http://reactivex.io/RxJava/2.x/javadoc/)和異同就能輕松上手了,而對于不熟悉的年輕司機,不要慌,本醬帶你裝逼帶你飛,馬上就發車,坐穩了:[https://github.com/nanchen2251/RxJava2Examples](https://github.com/nanchen2251/RxJava2Examples)
你只需要在 build.gradle 中加上:`compile 'io.reactivex.rxjava2:rxjava:2.1.1'`(2.1.1為寫此文章時的最新版本)

## 接口變化
RxJava 2.x 擁有了新的特性,其依賴于4個基礎接口,它們分別是
* Publisher
* Subscriber
* Subscription
* Processor
其中最核心的莫過于 `Publisher` 和 `Subscriber`。`Publisher` 可以發出一系列的事件,而 `Subscriber` 負責和處理這些事件。
其中用的比較多的自然是 `Publisher` 的 `Flowable`,它支持背壓。關于背壓給個簡潔的定義就是:
> 背壓是指在異步場景中,被觀察者發送事件速度遠快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發送速度的策略。
簡而言之,**背壓是流速控制的一種策略**。有興趣的可以看一下[官方對于背壓的講解](https://github.com/ReactiveX/RxJava/wiki/Backpressure)。
可以明顯地發現,RxJava 2.x 最大的改動就是對于 `backpressure` 的處理,為此將原來的 `Observable` 拆分成了新的 `Observable` 和 `Flowable`,同時其他相關部分也同時進行了拆分,但令人慶幸的是,是它,是它,還是它,還是我們最熟悉和最喜歡的 RxJava。
## 觀察者模式
大家可能都知道, **RxJava 以觀察者模式為骨架,在 2.0 中依舊如此**。
不過此次更新中,出現了兩種觀察者模式:
* Observable ( 被觀察者 ) / Observer ( 觀察者 )
* Flowable (被觀察者)/ Subscriber (觀察者)

在 RxJava 2.x 中,Observable 用于訂閱 Observer,不再支持背壓(1.x 中可以使用背壓策略),而 Flowable 用于訂閱 Subscriber , 是支持背壓(Backpressure)的。
## Observable
在 RxJava 1.x 中,我們最熟悉的莫過于 `Observable` 這個類了,筆者在剛剛使用 RxJava 2.x 的時候,創建了 一個 `Observable`,瞬間一臉懵逼有木有,居然連我們最最熟悉的 `Subscriber` 都沒了,取而代之的是 `ObservableEmmiter`,俗稱發射器。此外,由于沒有了`Subscriber`的蹤影,我們創建觀察者時需使用 `Observer`。而 `Observer` 也不是我們熟悉的那個 `Observer`,又出現了一個 `Disposable` 參數帶你裝逼帶你飛。
廢話不多說,從會用開始,還記得 RxJava 的三部曲嗎?

**第一步:初始化 Observable**
**第二步:初始化 Observer**
**第三步:建立訂閱關系**

~~~
Observable.create(new ObservableOnSubscribe<Integer>() { // 第一步:初始化Observable
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Log.e(TAG, "Observable emit 1" + "\n");
e.onNext(1);
Log.e(TAG, "Observable emit 2" + "\n");
e.onNext(2);
Log.e(TAG, "Observable emit 3" + "\n");
e.onNext(3);
e.onComplete();
Log.e(TAG, "Observable emit 4" + "\n" );
e.onNext(4);
}
}).subscribe(new Observer<Integer>() { // 第三步:訂閱
// 第二步:初始化Observer
private int i;
private Disposable mDisposable;
@Override
public void onSubscribe(@NonNull Disposable d) {
mDisposable = d;
}
@Override
public void onNext(@NonNull Integer integer) {
i++;
if (i == 2) {
// 在RxJava 2.x 中,新增的Disposable可以做到切斷的操作,讓Observer觀察者不再接收上游事件
mDisposable.dispose();
}
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "onError : value : " + e.getMessage() + "\n" );
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete" + "\n" );
}
});
~~~
不難看出,RxJava 2.x 與 1.x 還是存在著一些區別的。首先,創建 `Observable` 時,回調的是 `ObservableEmitter` ,字面意思即發射器,并且直接 throws Exception。其次,在創建的 Observer 中,也多了一個回調方法:`onSubscribe`,傳遞參數為`Disposable`,`Disposable` 相當于 RxJava 1.x 中的 `Subscription`, 用于解除訂閱。可以看到示例代碼中,在 i 自增到 2 的時候,訂閱關系被切斷。
~~~
07-03 14:24:11.663 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onSubscribe : false
07-03 14:24:11.664 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 1
07-03 14:24:11.665 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 1
07-03 14:24:11.666 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 2
07-03 14:24:11.667 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 2
07-03 14:24:11.668 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : isDisposable : true
07-03 14:24:11.669 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 3
07-03 14:24:11.670 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 4
~~~
當然,我們的 RxJava 2.x 也為我們保留了簡化訂閱方法,我們可以根據需求,進行相應的簡化訂閱,只不過傳入對象改為了 `Consumer`。
`Consumer` 即消費者,用于接收單個值,`BiConsumer` 則是接收兩個值,`Function` 用于變換對象,`Predicate` 用于判斷。這些接口命名大多參照了 Java 8 ,熟悉 Java 8 新特性的應該都知道意思,這里也不再贅述。
## 線程調度
關于線程切換這點,RxJava 1.x 和 RxJava 2.x 的實現思路是一樣的。這里簡單的說一下,以便于我們的新司機入手。
#### subScribeOn
同 RxJava 1.x 一樣,`subscribeOn` 用于指定 `subscribe()` 時所發生的線程,從源碼角度可以看出,內部線程調度是通過 `ObservableSubscribeOn`來實現的。
~~~
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
~~~
`ObservableSubscribeOn` 的核心源碼在 `subscribeActual` 方法中,通過代理的方式使用 `SubscribeOnObserver` 包裝 `Observer` 后,設置 `Disposable` 來將 `subscribe` 切換到 `Scheduler` 線程中。
#### observeOn
`observeOn` 方法用于指定下游 `Observer` 回調發生的線程。
~~~
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
~~~
#### 線程切換需要注意的
RxJava 內置的線程調度器的確可以讓我們的線程切換得心應手,但其中也有些需要注意的地方。
* 簡單地說,`subscribeOn()` 指定的就是發射事件的線程,`observerOn` 指定的就是訂閱者接收事件的線程。
* 多次指定發射事件的線程只有第一次指定的有效,也就是說多次調用 `subscribeOn()` 只有第一次的有效,其余的會被忽略。
* 但多次指定訂閱者接收線程是可以的,也就是說每調用一次 `observerOn()`,下游的線程就會切換一次。

~~~
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName());
e.onNext(1);
e.onComplete();
}
}).subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "After observeOn(mainThread),Current thread is " + Thread.currentThread().getName());
}
})
.observeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName());
}
});
~~~
輸出:
~~~
07-03 14:54:01.177 15121-15438/com.nanchen.rxjava2examples E/RxThreadActivity: Observable thread is : RxNewThreadScheduler-1
07-03 14:54:01.178 15121-15121/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(mainThread),Current thread is main
07-03 14:54:01.179 15121-15439/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(io),Current thread is RxCachedThreadScheduler-2
~~~
實例代碼中,分別用 `Schedulers.newThread()` 和 `Schedulers.io()` 對發射線程進行切換,并采用 `observeOn(AndroidSchedulers.mainThread()` 和 `Schedulers.io()` 進行了接收線程的切換。可以看到輸出中發射線程僅僅響應了第一個 `newThread`,但每調用一次 `observeOn()` ,線程便會切換一次,因此如果我們有類似的需求時,便知道如何處理了。
RxJava 中,已經內置了很多線程選項供我們選擇,例如有:
* `Schedulers.io()` 代表io操作的線程, 通常用于網絡,讀寫文件等io密集型的操作;
* `Schedulers.computation()` 代表CPU計算密集型的操作, 例如需要大量計算的操作;
* `Schedulers.newThread()` 代表一個常規的新線程;
* `AndroidSchedulers.mainThread()` 代表Android的主線程
這些內置的 `Scheduler` 已經足夠滿足我們開發的需求,因此我們應該使用內置的這些選項,而 RxJava 內部使用的是線程池來維護這些線程,所以效率也比較高。
## 操作符
關于操作符,在[官方文檔](http://reactivex.io/documentation/operators.html)中已經做了非常完善的講解,并且筆者[前面的系列教程](http://www.jianshu.com/p/b39afa92807e)中也著重講解了絕大多數的操作符作用,這里受于篇幅限制,就不多做贅述,只挑選幾個進行實際情景的講解。

#### map

`map` 操作符可以將一個 `Observable` 對象通過某種關系轉換為另一個`Observable` 對象。在 2.x 中和 1.x 中作用幾乎一致,不同點在于:2.x 將 1.x 中的 `Func1` 和 `Func2` 改為了 `Function` 和 `BiFunction`。
#### 采用 map 操作符進行網絡數據解析
想必大家都知道,很多時候我們在使用 RxJava 的時候總是和 Retrofit 進行結合使用,而為了方便演示,這里我們就暫且采用 OkHttp3 進行演示,配合 `map`,`doOnNext` ,線程切換進行簡單的網絡請求:
1)通過 Observable.create() 方法,調用 OkHttp 網絡請求;
2)通過 map 操作符集合 gson,將 Response 轉換為 bean 類;
3)通過 doOnNext() 方法,解析 bean 中的數據,并進行數據庫存儲等操作;
4)調度線程,在子線程中進行耗時操作任務,在主線程中更新 UI ;
5)通過 subscribe(),根據請求成功或者失敗來更新 UI 。
~~~
Observable.create(new ObservableOnSubscribe<Response>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
Builder builder = new Builder()
.url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
e.onNext(response);
}
}).map(new Function<Response, MobileAddress>() {
@Override
public MobileAddress apply(@NonNull Response response) throws Exception {
if (response.isSuccessful()) {
ResponseBody body = response.body();
if (body != null) {
Log.e(TAG, "map:轉換前:" + response.body());
return new Gson().fromJson(body.string(), MobileAddress.class);
}
}
return null;
}
}).observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<MobileAddress>() {
@Override
public void accept(@NonNull MobileAddress s) throws Exception {
Log.e(TAG, "doOnNext: 保存成功:" + s.toString() + "\n");
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<MobileAddress>() {
@Override
public void accept(@NonNull MobileAddress data) throws Exception {
Log.e(TAG, "成功:" + data.toString() + "\n");
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "失敗:" + throwable.getMessage() + "\n");
}
});
~~~
#### concat

`concat` 可以做到不交錯的發射兩個甚至多個 Observable 的發射事件,并且只有前一個 `Observable` 終止(`onComplete`) 后才會訂閱下一個 `Observable`。
#### 采用 concat 操作符先讀取緩存再通過網絡請求獲取數據
想必在實際應用中,很多時候(對數據操作不敏感時)都需要我們先讀取緩存的數據,如果緩存沒有數據,再通過網絡請求獲取,隨后在主線程更新我們的UI。
`concat` 操作符簡直就是為我們這種需求量身定做。
利用 `concat` 的必須調用 `onComplete` 后才能訂閱下一個 `Observable` 的特性,我們就可以先讀取緩存數據,倘若獲取到的緩存數據不是我們想要的,再調用 `onComplete()` 以執行獲取網絡數據的 `Observable`,如果緩存數據能應我們所需,則直接調用 `onNext()`,防止過度的網絡請求,浪費用戶的流量。
~~~
Observable<FoodList> cache = Observable.create(new ObservableOnSubscribe<FoodList>() {
@Override
public void subscribe(@NonNull ObservableEmitter<FoodList> e) throws Exception {
Log.e(TAG, "create當前線程:"+Thread.currentThread().getName() );
FoodList data = CacheManager.getInstance().getFoodListData();
// 在操作符 concat 中,只有調用 onComplete 之后才會執行下一個 Observable
if (data != null){ // 如果緩存數據不為空,則直接讀取緩存數據,而不讀取網絡數據
isFromNet = false;
Log.e(TAG, "\nsubscribe: 讀取緩存數據:" );
runOnUiThread(new Runnable() {
@Override
public void run() {
mRxOperatorsText.append("\nsubscribe: 讀取緩存數據:\n");
}
});
e.onNext(data);
}else {
isFromNet = true;
runOnUiThread(new Runnable() {
@Override
public void run() {
mRxOperatorsText.append("\nsubscribe: 讀取網絡數據:\n");
}
});
Log.e(TAG, "\nsubscribe: 讀取網絡數據:" );
e.onComplete();
}
}
});
Observable<FoodList> network = Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
.addQueryParameter("rows",10+"")
.build()
.getObjectObservable(FoodList.class);
// 兩個 Observable 的泛型應當保持一致
Observable.concat(cache,network)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<FoodList>() {
@Override
public void accept(@NonNull FoodList tngouBeen) throws Exception {
Log.e(TAG, "subscribe 成功:"+Thread.currentThread().getName() );
if (isFromNet){
mRxOperatorsText.append("accept : 網絡獲取數據設置緩存: \n");
Log.e(TAG, "accept : 網絡獲取數據設置緩存: \n"+tngouBeen.toString() );
CacheManager.getInstance().setFoodListData(tngouBeen);
}
mRxOperatorsText.append("accept: 讀取數據成功:" + tngouBeen.toString()+"\n");
Log.e(TAG, "accept: 讀取數據成功:" + tngouBeen.toString());
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "subscribe 失敗:"+Thread.currentThread().getName() );
Log.e(TAG, "accept: 讀取數據失敗:"+throwable.getMessage() );
mRxOperatorsText.append("accept: 讀取數據失敗:"+throwable.getMessage()+"\n");
}
});
~~~
有時候我們的緩存可能還會分為 memory 和 disk ,實際上都差不多,無非是多寫點 `Observable` ,然后通過 `concat` 合并即可。
#### flatMap 實現多個網絡請求依次依賴
想必這種情況也在實際情況中比比皆是,例如用戶注冊成功后需要自動登錄,我們只需要先通過注冊接口注冊用戶信息,注冊成功后馬上調用登錄接口進行自動登錄即可。
我們的 `flatMap` 恰好解決了這種應用場景,`flatMap` 操作符可以將一個發射數據的 `Observable` 變換為多個 `Observables` ,然后將它們發射的數據合并后放到一個單獨的 `Observable`,利用這個特性,我們很輕松地達到了我們的需求。
~~~
Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
.addQueryParameter("rows", 1 + "")
.build()
.getObjectObservable(FoodList.class) // 發起獲取食品列表的請求,并解析到FootList
.subscribeOn(Schedulers.io()) // 在io線程進行網絡請求
.observeOn(AndroidSchedulers.mainThread()) // 在主線程處理獲取食品列表的請求結果
.doOnNext(new Consumer<FoodList>() {
@Override
public void accept(@NonNull FoodList foodList) throws Exception {
// 先根據獲取食品列表的響應結果做一些操作
Log.e(TAG, "accept: doOnNext :" + foodList.toString());
mRxOperatorsText.append("accept: doOnNext :" + foodList.toString()+"\n");
}
})
.observeOn(Schedulers.io()) // 回到 io 線程去處理獲取食品詳情的請求
.flatMap(new Function<FoodList, ObservableSource<FoodDetail>>() {
@Override
public ObservableSource<FoodDetail> apply(@NonNull FoodList foodList) throws Exception {
if (foodList != null && foodList.getTngou() != null && foodList.getTngou().size() > 0) {
return Rx2AndroidNetworking.post("http://www.tngou.net/api/food/show")
.addBodyParameter("id", foodList.getTngou().get(0).getId() + "")
.build()
.getObjectObservable(FoodDetail.class);
}
return null;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<FoodDetail>() {
@Override
public void accept(@NonNull FoodDetail foodDetail) throws Exception {
Log.e(TAG, "accept: success :" + foodDetail.toString());
mRxOperatorsText.append("accept: success :" + foodDetail.toString()+"\n");
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "accept: error :" + throwable.getMessage());
mRxOperatorsText.append("accept: error :" + throwable.getMessage()+"\n");
}
});
~~~
#### 善用 zip 操作符,實現多個接口數據共同更新 UI
在實際應用中,我們極有可能會在一個頁面顯示的數據來源于多個接口,這時候我們的 `zip` 操作符為我們排憂解難。
`zip` 操作符可以將多個 `Observable` 的數據結合為一個數據源再發射出去。
~~~
Observable<MobileAddress> observable1 = Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
.build()
.getObjectObservable(MobileAddress.class);
Observable<CategoryResult> observable2 = Network.getGankApi()
.getCategoryData("Android",1,1);
Observable.zip(observable1, observable2, new BiFunction<MobileAddress, CategoryResult, String>() {
@Override
public String apply(@NonNull MobileAddress mobileAddress, @NonNull CategoryResult categoryResult) throws Exception {
return "合并后的數據為:手機歸屬地:"+mobileAddress.getResult().getMobilearea()+"人名:"+categoryResult.results.get(0).who;
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "accept: 成功:" + s+"\n");
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "accept: 失敗:" + throwable+"\n");
}
});
~~~
#### 采用 interval 操作符實現心跳間隔任務
想必即時通訊等需要輪訓的任務在如今的 APP 中已是很常見,而 RxJava 2.x 的 `interval` 操作符可謂完美地解決了我們的疑惑。
這里就簡單的意思一下輪訓。
~~~
private Disposable mDisposable;
@Override
protected void doSomething() {
mDisposable = Flowable.interval(1, TimeUnit.SECONDS)
.doOnNext(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: doOnNext : "+aLong );
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: 設置文本 :"+aLong );
mRxOperatorsText.append("accept: 設置文本 :"+aLong +"\n");
}
});
}
/**
* 銷毀時停止心跳
*/
@Override
protected void onDestroy() {
super.onDestroy();
if (mDisposable != null){
mDisposable.dispose();
}
}
~~~
## RxJava 1.x 如何平滑升級到 RxJava 2.x?
由于 RxJava 2.x 變化較大無法直接升級,幸運的是,官方為我們提供了 [RxJava2Interrop](%5Bhttps://github.com/akarnokd/RxJava2Interop%5D(https://github.com/akarnokd/RxJava2Interop)) 這個庫,可以方便地把 RxJava 1.x 升級到 RxJava 2.x,或者將 RxJava 2.x 轉回到 RxJava 1.x。
## 寫在最后
本醬看你都看到這兒了,實為未來的棟梁之才,所以且送你一本經書:
[https://github.com/nanchen2251/RxJava2Examples](https://github.com/nanchen2251/RxJava2Examples)


GIF.gif
作者:南塵2251
鏈接:http://www.jianshu.com/p/0cd258eecf60
來源:簡書
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。
- 0-發現
- AndroidInterview-Q-A
- Android能讓你少走彎路的干貨整理
- LearningNotes
- temp
- temp11
- 部分地址
- 0-待辦任務
- 待補充列表
- 0-未分類
- AndroidView事件分發與滑動沖突處理
- Spannable
- 事件分發機制詳解
- 1-Java
- 1-Java-01基礎
- 未歸檔
- 你應該知道的JDK知識
- 集合框架
- 1-Java-04合集
- Java之旅0
- Java之旅
- JAVA之旅01
- JAVA之旅02
- JAVA之旅03
- JAVA之旅04
- JAVA之旅05
- JAVA之旅06
- JAVA之旅07
- JAVA之旅08
- JAVA之旅09
- java之旅1
- JAVA之旅10
- JAVA之旅11
- JAVA之旅12
- JAVA之旅13
- JAVA之旅14
- JAVA之旅15
- JAVA之旅16
- JAVA之旅17
- JAVA之旅18
- JAVA之旅19
- java之旅2
- JAVA之旅20
- JAVA之旅21
- JAVA之旅22
- JAVA之旅23
- JAVA之旅24
- JAVA之旅25
- JAVA之旅26
- JAVA之旅27
- JAVA之旅28
- JAVA之旅29
- java之旅3
- JAVA之旅30
- JAVA之旅31
- JAVA之旅32
- JAVA之旅33
- JAVA之旅34
- JAVA之旅35
- 1-Java-05辨析
- HashMapArrayMap
- Java8新特性
- Java8接口默認方法
- 圖解HashMap(1)
- 圖解HashMap(2)
- 2-Android
- 2-Android-1-基礎
- View繪制流程
- 事件分發
- AndroidView的事件分發機制和滑動沖突解決
- 自定義View基礎
- 1-安卓自定義View基礎-坐標系
- 2-安卓自定義View基礎-角度弧度
- 3-安卓自定義View基礎-顏色
- 自定義View進階
- 1-安卓自定義View進階-分類和流程
- 10-安卓自定義View進階-Matrix詳解
- 11-安卓自定義View進階-MatrixCamera
- 12-安卓自定義View進階-事件分發機制原理
- 13-安卓自定義View進階-事件分發機制詳解
- 14-安卓自定義View進階-MotionEvent詳解
- 15-安卓自定義View進階-特殊形狀控件事件處理方案
- 16-安卓自定義View進階-多點觸控詳解
- 17-安卓自定義View進階-手勢檢測GestureDetector
- 2-安卓自定義View進階-繪制基本圖形
- 3-安卓自定義View進階-畫布操作
- 4-安卓自定義View進階-圖片文字
- 5-安卓自定義View進階-Path基本操作
- 6-安卓自定義View進階-貝塞爾曲線
- 7-安卓自定義View進階-Path完結篇偽
- 8-安卓自定義View進階-Path玩出花樣PathMeasure
- 9-安卓自定義View進階-Matrix原理
- 通用類介紹
- Application
- 2-Android-2-使用
- 2-Android-02控件
- ViewGroup
- ConstraintLayout
- CoordinatorLayout
- 2-Android-03三方使用
- Dagger2
- Dagger2圖文完全教程
- Dagger2最清晰的使用教程
- Dagger2讓你愛不釋手-終結篇
- Dagger2讓你愛不釋手-重點概念講解、融合篇
- dagger2讓你愛不釋手:基礎依賴注入框架篇
- 閱讀筆記
- Glide
- Google推薦的圖片加載庫Glide:最新版使用指南(含新特性)
- rxjava
- 這可能是最好的RxJava2.x入門教程完結版
- 這可能是最好的RxJava2.x入門教程(一)
- 這可能是最好的RxJava2.x入門教程(三)
- 這可能是最好的RxJava2.x入門教程(二)
- 這可能是最好的RxJava2.x入門教程(五)
- 這可能是最好的RxJava2.x入門教程(四)
- 2-Android-3-優化
- 優化概況
- 各種優化
- Android端秒開優化
- apk大小優化
- 內存分析
- 混淆
- 2-Android-4-工具
- adb命令
- 一鍵分析Android的BugReport
- 版本控制
- git
- git章節簡述
- 2-Android-5-源碼
- HandlerThread 源碼分析
- IntentService的使用和源碼分析
- 2-Android-9-辨析
- LRU算法
- 什么是Bitmap
- 常見圖片壓縮方式
- 3-Kotlin
- Kotlin使用筆記1-草稿
- Kotlin使用筆記2
- kotlin特性草稿
- Kotlin草稿-Delegation
- Kotlin草稿-Field
- Kotlin草稿-object
- 4-JavaScript
- 5-Python
- 6-Other
- Git
- Gradle
- Android中ProGuard配置和總結
- gradle使用筆記
- Nexus私服搭建
- 編譯提速最佳實踐
- 7-設計模式與架構
- 組件化
- 組件化探索(OKR)
- 1-參考列表
- 2-1-組件化概述
- 2-2-gradle配置
- 2-3-代碼編寫
- 2-4-常見問題
- 2-9-值得一讀
- 8-數據結構與算法
- 0臨時文件
- 漢諾塔
- 8-數據-1數據結構
- HashMap
- HashMap、Hashtable、HashSet 和 ConcurrentHashMap 的比較
- 遲到一年HashMap解讀
- 8-數據-2算法
- 1個就夠了
- Java常用排序算法(必須掌握的8大排序算法)
- 常用排序算法總結(性能+代碼)
- 必須知道的八大種排序算法(java實現)
- 9-職業
- 閱讀
- 書單
- 面試
- 面試-01-java
- Java面試題全集駱昊(上)
- Java面試題全集駱昊(下)
- Java面試題全集駱昊(中)
- 面試-02-android
- 40道Android面試題
- 面試-03-開源源碼
- Android圖片加載框架最全解析(二),從源碼的角度理解Glide的執行流程
- 面試-07-設計模式
- 面試-08-算法
- 面試-09-其他
- SUMMARY
- 版權說明
- temp111