[TOC]
本系列文章為 Season_zlc 大神的[《給初學者的RxJava2.0教程》](http://www.jianshu.com/p/464fa025229e)系列學習筆記。
# 配置
添加 Gradle 配置:
```groovy
implementation 'io.reactivex.rxjava2:rxjava:x.y.z'
implementation 'io.reactivex.rxjava2:rxandroid:x.y.z'
```
# 基礎
[原文鏈接](http://www.jianshu.com/p/464fa025229e)

## 上下游
* 上游和下游分別對應著 Observable 和 Observer
* 水管中流動的是事件(對象),這個重要!
## 發送事件
* 只有調用了 subscribe 方法之后,上游才會發事件
* 上游發送了 onComplete 或 onError 事件后可以繼續發送其他事件,而下游接收了 onComplete 或 onError 事件之后不再接收其他事件
* 上游可以不發送 onComplete 或 onError 事件,發送的話只能發送一次并且互斥,即不能同時發送
## 水管機關
* 調用 subscribe 方法就把上游、下游兩個水管連接起來了,同時會發送一個 onSubscribe 事件,并可以拿到一個 disposable 開關
* disposable 可以理解為兩根管子之間的一個機關,調用其 dispose 方法時,可以將兩根管道切斷,導致下游收不到事件
* RxJava 內置了一個 disposable 容器 CompositeDisposable,可以存儲 disposable,用來在 Activity 退出時調用所有機關的 dispose 方法,使下游不再接收事件
```java
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
});
Observer<Integer> observer = new Observer<Integer>() {
Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
observable.subscribe(observer);
```
# 事件源類型
事件源類型|描述
---|---
Flowable|可以發送 N 個事件,支持背壓
Observable|可以發送 N 個事件,不支持背壓
Single|可以發送 1 個事件,或 1 個 onError 事件
Completable|不可以發送普通事件,可以發送 1 個 onComplete 事件或 1 個 onError 事件
Maybe|可以發射 1 個事件、1 個 onError 事件,或者什么都不發射
# 線程調度
[原文鏈接](http://www.jianshu.com/p/8818b98c44e2)
## 應用場景
Rxjava 最常見的應用場景是:
在子線程進行數據計算、網絡請求操作等,然后回到主線程展示結果(成功或錯誤)
## 內置線程
* Schedulers.io():代表 IO 操作線程,用于網絡、讀寫文件等 IO 密集型操作
* Schedulers.newThread():代表一個常規的新線程
* Schedulers.computation():代表 CPU 計算密集型操作,如需要大量計算操作時使用
* AndroidSchedulers.mainThread():Android 主線程
## 線程調度
* subscribeOn 指定上游發送事件的線程
* observeOn 指定下游接收事件的線程
* 多次指定上游線程只有第一次有效,其他會被忽略
* 多次指定下游線程沒問題,每調用一次 observeOn,下游線程就會切換一次
```java
observable.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.io())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
// 此處為 IO 線程
}
})
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
// 此處為主線程
}
})
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
// 此處為新的子線程
}
});
```
# 變換操作符
[原文鏈接](http://www.jianshu.com/p/128e662906af)
## Map 操作符
* 對上游發送的每一個事件應用一個函數,使得每一個事件都按照指定的函數變化
```java
Observable.fromArray(1, 2, 3)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "Result is " + integer;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, s);
}
});
```
## FlatMap 操作符

### 定義
* FlatMap 的作用是將一個事件,轉換為,發送新的事件的,Observable
* 被轉換為 Observable 后開始發送新的事件
* 發送出來的,這些新的事件,會被合并到一個水管中,發給下游
* FlatMap 相當于直接轉換了上游發射源所發射的事件類型
* Map 相當于轉換了水管中流動的事件的類型
### 其他
* FlatMap 不保證新事件的順序,ConcatMap 可以保證
### 代碼示例
```java
public interface Api {
@GET
Observable<LoginResponse> login(@Body LoginRequest request);
@GET
Observable<RegisterResponse> register(@Body RegisterRequest request);
}
```
```java
api.register(new RegisterRequest())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<RegisterResponse>() {
@Override
public void accept(RegisterResponse registerResponse) throws Exception {
// 先根據注冊結果做一些事情
}
})
.observeOn(Schedulers.io())
.flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() {
@Override
public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {
return api.login(new LoginRequest());
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<LoginResponse>() {
@Override
public void accept(LoginResponse loginResponse) throws Exception {
// 登錄成功
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
// 登錄失敗
}
});
```
## Zip 操作符
[原文鏈接](http://www.jianshu.com/p/bb58571cdb64)
### 定義
* Zip 可將多個 Observable 發送的事件結合到一起,然后發送這些組合到一起的事件
* 嚴格按照順序執行
* 只發射與發射數據項數最少的那個 Observable 一樣多的數據

### 代碼示例
```java
public interface Api {
@GET
Observable<UserBaseInfoResponse> getUserBaseInfo(@Body UserBaseInfoRequest request);
@GET
Observable<UserExtraInfoResponse> getUserExtraInfo(@Body UserExtraInfoRequest request);
}
```
```java
Observable<UserBaseInfoResponse> userBaseInfoObservable = api.getUserBaseInfo(
new UserBaseInfoRequest())
.subscribeOn(Schedulers.io());
Observable<UserExtraInfoResponse> userExtraInfoObservable = api.getUserExtraInfo(
new UserExtraInfoRequest())
.subscribeOn(Schedulers.io());
Observable.zip(userBaseInfoObservable, userExtraInfoObservable, new BiFunction<UserBaseInfoResponse,
UserExtraInfoResponse, UserInfo>() {
@Override
public UserInfo apply(UserBaseInfoResponse userBaseInfoResponse, UserExtraInfoResponse
userExtraInfoResponse) throws Exception {
return new UserInfo(userBaseInfoResponse, userExtraInfoResponse);
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<UserInfo>() {
@Override
public void accept(UserInfo userInfo) throws Exception {
// do something
}
});
```
# 背壓
## 上下游流速不均衡問題
[原文鏈接](http://www.jianshu.com/p/0f2d6c2387c9)
```java
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; ; i++) {
e.onNext(i);
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
Log.e(TAG, "accept: " + integer);
}
});
```
* 上游水管發送事件過快時,事件會被先保存到該水管的水缸中
* 當上下游處在不同線程,即為異步操作時,上下游流速不均衡時會造成內存溢出(水缸溢出)
* 上下游為同一線程,即為同步操作時,不會出現內存溢出情況。因為上游需等待下游處理完畢才會發送下一個事件
## 流速問題解決方案
[原文鏈接](http://www.jianshu.com/p/e4c6d7989356)
* 從數量上治理,減少發送進入水缸事件的數量(會造成部分事件丟失)
* 從時間上治理,減緩事件發送進入水缸的速度(事件不會丟失)
方案一 demo:
```java
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; ; i++) {
e.onNext(i);
}
}
}).subscribeOn(Schedulers.io())
.sample(2, TimeUnit.SECONDS) // sample 每隔 2 秒取樣
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
Log.e(TAG, "accept: " + integer);
}
});
}
```
方案二 demo:
```java
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; ; i++) {
e.onNext(i);
Thread.sleep(2000);
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
Log.e(TAG, "accept: " + integer);
}
});
}
```
# Flowable
* Flowable 采取響應式拉取
* Subscription 的 request 方法相當于下游告訴上游,下游它所能處理事件的能力
* Flowable 默認有一個大小為 128 的水缸,上下游工作在不同線程時,上游會先把事件貯存在水缸中
```java
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
e.onNext(0);
e.onNext(1);
e.onNext(2);
e.onComplete();
}
// 此處第二個參數 BackpressureStrategy.ERROR 代表處理上下游流速不均問題的處理策略,此處為直接拋出錯誤
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
```
## Flowable 處理上下游流速不均問題的策略
類型|策略描述
---|---
BackpressureStrategy.ERROR|直接拋出 MissingBackpressureException 異常
BackpressureStrategy.BUFFER|上游的水缸更換為無限大的
BackpressureStrategy.DROP|水缸存不下的事件直接丟棄
BackpressureStrategy.LATEST|水缸內只保留最新的事件
- 導讀
- Java知識
- Java基本程序設計結構
- 【基礎知識】Java基礎
- 【源碼分析】Okio
- 【源碼分析】深入理解i++和++i
- 【專題分析】JVM與GC
- 【面試清單】Java基本程序設計結構
- 對象與類
- 【基礎知識】對象與類
- 【專題分析】Java類加載過程
- 【面試清單】對象與類
- 泛型
- 【基礎知識】泛型
- 【面試清單】泛型
- 集合
- 【基礎知識】集合
- 【源碼分析】SparseArray
- 【面試清單】集合
- 多線程
- 【基礎知識】多線程
- 【源碼分析】ThreadPoolExecutor源碼分析
- 【專題分析】volatile關鍵字
- 【面試清單】多線程
- Java新特性
- 【專題分析】Lambda表達式
- 【專題分析】注解
- 【面試清單】Java新特性
- Effective Java筆記
- Android知識
- Activity
- 【基礎知識】Activity
- 【專題分析】運行時權限
- 【專題分析】使用Intent打開三方應用
- 【源碼分析】Activity的工作過程
- 【面試清單】Activity
- 架構組件
- 【專題分析】MVC、MVP與MVVM
- 【專題分析】數據綁定
- 【面試清單】架構組件
- 界面
- 【專題分析】自定義View
- 【專題分析】ImageView的ScaleType屬性
- 【專題分析】ConstraintLayout 使用
- 【專題分析】搞懂點九圖
- 【專題分析】Adapter
- 【源碼分析】LayoutInflater
- 【源碼分析】ViewStub
- 【源碼分析】View三大流程
- 【源碼分析】觸摸事件分發機制
- 【源碼分析】按鍵事件分發機制
- 【源碼分析】Android窗口機制
- 【面試清單】界面
- 動畫和過渡
- 【基礎知識】動畫和過渡
- 【面試清單】動畫和過渡
- 圖片和圖形
- 【專題分析】圖片加載
- 【面試清單】圖片和圖形
- 后臺任務
- 應用數據和文件
- 基于網絡的內容
- 多線程與多進程
- 【基礎知識】多線程與多進程
- 【源碼分析】Handler
- 【源碼分析】AsyncTask
- 【專題分析】Service
- 【源碼分析】Parcelable
- 【專題分析】Binder
- 【源碼分析】Messenger
- 【面試清單】多線程與多進程
- 應用優化
- 【專題分析】布局優化
- 【專題分析】繪制優化
- 【專題分析】內存優化
- 【專題分析】啟動優化
- 【專題分析】電池優化
- 【專題分析】包大小優化
- 【面試清單】應用優化
- Android新特性
- 【專題分析】狀態欄、ActionBar和導航欄
- 【專題分析】應用圖標、通知欄適配
- 【專題分析】Android新版本重要變更
- 【專題分析】唯一標識符的最佳做法
- 開源庫源碼分析
- 【源碼分析】BaseRecyclerViewAdapterHelper
- 【源碼分析】ButterKnife
- 【源碼分析】Dagger2
- 【源碼分析】EventBus3(一)
- 【源碼分析】EventBus3(二)
- 【源碼分析】Glide
- 【源碼分析】OkHttp
- 【源碼分析】Retrofit
- 其他知識
- Flutter
- 原生開發與跨平臺開發
- 整體歸納
- 狀態及狀態管理
- 零碎知識點
- 添加Flutter到現有應用
- Git知識
- Git命令
- .gitignore文件
- 設計模式
- 創建型模式
- 結構型模式
- 行為型模式
- RxJava
- 基礎
- Linux知識
- 環境變量
- Linux命令
- ADB命令
- 算法
- 常見數據結構及實現
- 數組
- 排序算法
- 鏈表
- 二叉樹
- 棧和隊列
- 算法時間復雜度
- 常見算法思想
- 其他技術
- 正則表達式
- 編碼格式
- HTTP與HTTPS
- 【面試清單】其他知識
- 開發歸納
- Android零碎問題
- 其他零碎問題
- 開發思路