[TOC]
基于 io.reactivex:rxjava:1.3.8
源碼分析分為三個方向簡單訂閱過程、變換過程和線程切換。
## 簡單訂閱
### 使用
~~~
Observable.create(new Observable.<Integer>() {
// 1. 創建被觀察者(Observable) & 定義需發送的事件
@Override
public void call(Subscriber<Integer> subScriber){
subScriber.onNext(1);
}
}).subscribe(new Subscriber<Integer>() {
// 2. 創建觀察者(Observer) & 定義響應事件的行為
// 3. 通過訂閱(subscribe)連接觀察者和被觀察者
@Override
public void onNext(Integer value) {}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {}
});
~~~
### Observable.create
實際上就是創建一個Observable對象,并且把OnSubscribe對象賦值到Observable對象的onSubscribe 字段里。
~~~
public static <T> Observable<T> create(OnSubscribe<T> f){
//hook.onCreate(f) 實際上返回的還是f
//Observable構造函數,實際上只是賦值到OnSubscribe
return new Observable<T>(hook.onCreate(f));
}
//hook實際上就是RxJava-ObservableExecutionHook,它的onCreate如下
public <T> OnSubscribe<T> OnCreate(OnSubscribe<T> f){
return f;f
}
//接著回來查看Observable的構造方法,如下
public Observable(OnSubscribe<T> f){
this.onSubscribe = f;
}
~~~
### Observable().subscribe
找到Observable對象的onSubscribe對象
把自己作為參數調用onSubscribe的對象的call方法。
~~~
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn((Subscription)subscriber);
...
}
~~~
#### RxJavaHooks.onObservableStart
可以理解為直接返回onSubscribe
~~~
public static <T> OnSubscribe<T> onObservableStart(Observable<T> instance, OnSubscribe<T> onSubscribe) {
Func2<Observable, OnSubscribe, OnSubscribe> f = onObservableStart;
return f != null ? (OnSubscribe)f.call(instance, onSubscribe) : onSubscribe;
}
~~~
## 變換過程
### 使用
~~~
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<Integer> subScriber){
subScriber.onNext(1);
}
}).map(new Funcl<Integer,String>(){
@Override
public Srting call(Integer integer){
return "a" + integer;
r
}
}).subscribe(new Subscriber<String>() {
@Override
public void onNext(Integer value) {}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {}
});
~~~
Observable.create還是創建一個Observable對象,并且把OnSubscribe對象賦值到Observable對象的onSubscribe 字段里。我們把這一步的叫做 obj1
### Observable().map
~~~
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return unsafeCreate(new OnSubscribeMap(this, func));
}
public static <T> Observable<T> unsafeCreate(Observable.OnSubscribe<T> f) {
return new Observable(RxJavaHooks.onCreate(f));
}
~~~
#### OnSubscribeMap
~~~
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
final Observable<T> source;
final Func1<? super T, ? extends R> transformer;
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
public void call(Subscriber<? super R> o) {
OnSubscribeMap.MapSubscriber<T, R> parent = new OnSubscribeMap.MapSubscriber(o, this.transformer);
o.add(parent);
this.source.unsafeSubscribe(parent);
}
static final class MapSubscriber<T, R> extends Subscriber<T> {
final Subscriber<? super R> actual;
final Func1<? super T, ? extends R> mapper;
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
public void onNext(T t) {
Object result;
try {
result = this.mapper.call(t);
} catch (Throwable var4) {
Exceptions.throwIfFatal(var4);
this.unsubscribe();
this.onError(OnErrorThrowable.addValueAsLastCause(var4, t));
return;
}
this.actual.onNext(result);
}
public void onError(Throwable e) {
if (this.done) {
RxJavaHooks.onError(e);
} else {
this.done = true;
this.actual.onError(e);
}
}
public void onCompleted() {
if (!this.done) {
this.actual.onCompleted();
}
}
public void setProducer(Producer p) {
this.actual.setProducer(p);
}
~~~
這里做就是
1. 生成第二個Observable obj2,然后他的onSubscribe 就是OnSubscribeMap
2. 執行最后Observable().subscribe時,就會調用obj2的onSubscribe的call
3. obj2的onSubscribe的call里,首先把自己new了一個MapSubscriber,主要功能就是完成參數轉換,并且調用下一個的對應方法
4. 然后他又會調用unsafeSubscribe(obj1),就會調用obj1的onSubscribe的call

## 線程切換
~~~
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<Integer> subScriber){
subScriber.onNext(1);
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onNext(Integer value) {}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {}
});
~~~
根據【變換過程】的思路,我們很容易猜到,實際上subscribeOn 和observeOn肯定也是生成了一個Observable,然后自定義一個Observable.OnSubscribe,OnSubscribe里面實際上做的也是先線程切換,然后在執行下一個的對應方法
### observeOn
~~~
public final class OperatorObserveOn<T> implements Operator<T, T> {
private final Scheduler scheduler;
private final boolean delayError;
private final int bufferSize;
public OperatorObserveOn(Scheduler scheduler, boolean delayError) {
this(scheduler, delayError, RxRingBuffer.SIZE);
}
public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize > 0 ? bufferSize : RxRingBuffer.SIZE;
}
//核心
public Subscriber<? super T> call(Subscriber<? super T> child) {
OperatorObserveOn.ObserveOnSubscriber<T> parent = new OperatorObserveOn.ObserveOnSubscriber(this.scheduler, child, this.delayError, this.bufferSize);
parent.init();
return parent;
}
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
final Subscriber<? super T> child;
final Worker recursiveScheduler
...
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
this.delayError = delayError;
...
}
void init() {
Subscriber<? super T> localChild = this.child;
localChild.setProducer(new Producer() {
public void request(long n) {
if (n > 0L) {
BackpressureUtils.getAndAddRequest(ObserveOnSubscriber.this.requested, n);
ObserveOnSubscriber.this.schedule();
}
}
});
localChild.add(this.recursiveScheduler);
localChild.add(this);
}
public void onNext(T t) {
if (!this.isUnsubscribed() && !this.finished) {
if (!this.queue.offer(NotificationLite.next(t))) {
this.onError(new MissingBackpressureException());
} else {
this.schedule();
}
}
}
public void onCompleted() {
if (!this.isUnsubscribed() && !this.finished) {
this.finished = true;
this.schedule();
}
}
public void onError(Throwable e) {
if (!this.isUnsubscribed() && !this.finished) {
this.error = e;
this.finished = true;
this.schedule();
} else {
RxJavaHooks.onError(e);
}
}
protected void schedule() {
if (this.counter.getAndIncrement() == 0L) {
this.recursiveScheduler.schedule(this);
}
}
//schedule(this) 已經在異步線程上了實際上執行的方法
public void call() {
//代碼太多貼出來不便,實際上就是根據error 和 finished 或者是t值調用child對應的onNext onError onCompleted方法
~~~
### subscribeOn
~~~
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source;
final boolean requestOn;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) {
this.scheduler = scheduler;
this.source = source;
this.requestOn = requestOn;
}
public void call(Subscriber<? super T> subscriber) {
Worker inner = this.scheduler.createWorker();
OperatorSubscribeOn.SubscribeOnSubscriber<T> parent = new OperatorSubscribeOn.SubscribeOnSubscriber(subscriber, this.requestOn, inner, this.source);
subscriber.add(parent);
subscriber.add(inner);
//核心,在異步線程調用父Observable的方法
inner.schedule(parent);
}
static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0 {
...
//這里的代碼實際上就是調用子的 onNext onError onCompleted方法
}
~~~
## 總結
1. 中間一步都會創建一個Observable 觀察者
2. Observable都有一個 OnSubscribe onSubscribe, 這個實際上就是被出入的參數
3. 最后一個調用subScribe時,最后會調用上一個Observable的call,并且把自己當作參數出入
4. 上一個Observable會調用通過subScribe調用上一個Observable的call,一路上去,直到第一個Observable。
5. 第一個處理往后,會調用下一個的Observable的onSubscribe.onNext,下一個的調用下下個的。
6. 直到最后一個,調用結束
## 參考資料
《Android進階之光》
- Android
- 四大組件
- Activity
- Fragment
- Service
- 序列化
- Handler
- Hander介紹
- MessageQueue詳細
- 啟動流程
- 系統啟動流程
- 應用啟動流程
- Activity啟動流程
- View
- view繪制
- view事件傳遞
- choreographer
- LayoutInflater
- UI渲染概念
- Binder
- Binder原理
- Binder最大數據
- Binder小結
- Android組件
- ListView原理
- RecyclerView原理
- SharePreferences
- AsyncTask
- Sqlite
- SQLCipher加密
- 遷移與修復
- Sqlite內核
- Sqlite優化v2
- sqlite索引
- sqlite之wal
- sqlite之鎖機制
- 網絡
- 基礎
- TCP
- HTTP
- HTTP1.1
- HTTP2.0
- HTTPS
- HTTP3.0
- HTTP進化圖
- HTTP小結
- 實踐
- 網絡優化
- Json
- ProtoBuffer
- 斷點續傳
- 性能
- 卡頓
- 卡頓監控
- ANR
- ANR監控
- 內存
- 內存問題與優化
- 圖片內存優化
- 線下內存監控
- 線上內存監控
- 啟動優化
- 死鎖監控
- 崩潰監控
- 包體積優化
- UI渲染優化
- UI常規優化
- I/O監控
- 電量監控
- 第三方框架
- 網絡框架
- Volley
- Okhttp
- 網絡框架n問
- OkHttp原理N問
- 設計模式
- EventBus
- Rxjava
- 圖片
- ImageWoker
- Gilde的優化
- APT
- 依賴注入
- APT
- ARouter
- ButterKnife
- MMKV
- Jetpack
- 協程
- MVI
- Startup
- DataBinder
- 黑科技
- hook
- 運行期Java-hook技術
- 編譯期hook
- ASM
- Transform增量編譯
- 運行期Native-hook技術
- 熱修復
- 插件化
- AAB
- Shadow
- 虛擬機
- 其他
- UI自動化
- JavaParser
- Android Line
- 編譯
- 疑難雜癥
- Android11滑動異常
- 方案
- 工業化
- 模塊化
- 隱私合規
- 動態化
- 項目管理
- 業務啟動優化
- 業務架構設計
- 性能優化case
- 性能優化-排查思路
- 性能優化-現有方案
- 登錄
- 搜索
- C++
- NDK入門
- 跨平臺
- H5
- Flutter
- Flutter 性能優化
- 數據跨平臺