<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                [TOC] 到目前為止筆者分析了Android中最熱門的網絡底層和封裝框架:[Android主流三方庫源碼分析(一、深入理解OKHttp源碼)](https://juejin.im/post/5e1be39b6fb9a02fcd130d1f)和[Android主流三方庫源碼分析(二、深入理解Retrofit源碼)](https://juejin.im/post/5e1fb9386fb9a0300a4501a6),Android中使用最廣泛的圖片加載框架Glide的加載流程:[Android主流三方庫源碼分析(三、深入理解Glide源碼)](https://juejin.im/post/5e2109e25188254c257c40c6)以及Android中性能最好的數據庫框架[Android主流三方庫源碼分析(四、深入理解GreenDao源碼)](https://juejin.im/post/5e44b3c2e51d4526ec0d2b71)。本篇,我將會對近幾年比較熱門的函數式編程框架RxJava的源碼進行詳細的分析。 ### 一、RxJava到底是什么? RxJava是基于Java虛擬機上的響應式擴展庫,它通過**使用可觀察的序列將異步和基于事件的程序組合起來**。 與此同時,它**擴展了觀察者模式來支持數據/事件序列**,并且添加了操作符,這些**操作符允許你聲明性地組合序列**,同時抽象出要關注的問題:比如低級線程、同步、線程安全和并發數據結構等。 從RxJava的官方定義來看,我們如果要想真正地理解RxJava,就必須對它以下兩個部分進行深入的分析: * 1、**訂閱流程** * 2、**線程切換** 當然,RxJava操作符的源碼也是很不錯的學習資源,特別是FlatMap、Zip等操作符的源碼,有很多可以借鑒的地方,但是它們內部的實現比較復雜,限于篇幅,本文只講解RxJava的訂閱流程和線程切換原理。接下來,筆者一一對以上RxJava的兩個關鍵部分來進行詳細地講解。 ### 二、RxJava的訂閱流程 首先給出RxJava消息訂閱的例子: ~~~ Observable.create(newObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String>emitter) throws Exception { emitter.onNext("1"); emitter.onNext("2"); emitter.onNext("3"); emitter.onComplete(); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(String s) { Log.d(TAG, "onNext : " + s); } @Override public void onError(Throwable e) { Log.d(TAG, "onError : " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); ~~~ 可以看到,這里首先創建了一個被觀察者,然后創建一個觀察者訂閱了這個被觀察者,因此下面分兩個部分對RxJava的訂閱流程進行分析: * 1、**創建被觀察者過程** * 2、**訂閱過程** #### 1、創建被觀察者過程 首先,上面使用了Observable類的create()方法創建了一個被觀察者,看看里面做了什么。 ##### 1.1、Observable#create() ~~~ // 省略一些檢測性的注解 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } ~~~ 在Observable的create()里面實際上是創建了一個新的ObservableCreate對象,同時,把我們定義好的ObservableOnSubscribe對象傳入了ObservableCreate對象中,最后調用了RxJavaPlugins.onAssembly()方法。接下來看看這個ObservableCreate是干什么的。 ##### 1.2、ObservableCreate ~~~ public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } ... } 復制代碼 ~~~ 這里僅僅是把ObservableOnSubscribe這個對象保存在ObservableCreate中了。然后看看RxJavaPlugins.onAssembly()這個方法的處理。 ##### 1.3、RxJavaPlugins#onAssembly() ~~~ public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { // 應用hook函數的一些處理,一般用到不到 ... return source; } 復制代碼 ~~~ 最終僅僅是把我們的ObservableCreate給返回了。 ##### 1.4、創建被觀察者過程小結 從以上分析可知,Observable.create()方法僅僅是**先將我們自定義的ObservableOnSubscribe對象重新包裝成了一個ObservableCreate對象**。 #### 2、訂閱過程 接著,看看Observable.subscribe()的訂閱過程是如何實現的。 ##### 2.1、Observable#subscribe() ~~~ public final void subscribe(Observer<? super T> observer) { ... // 1 observer = RxJavaPlugins.onSubscribe(this,observer); ... // 2 subscribeActual(observer); ... } 復制代碼 ~~~ 在注釋1處,在Observable的subscribe()方法內部首先調用了RxJavaPlugins的onSubscribe()方法。 ##### 2.2、RxJavaPlugins#onSubscribe() ~~~ public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) { // 應用hook函數的一些處理,一般用到不到 ... return observer; } 復制代碼 ~~~ 除去hook應用的邏輯,這里僅僅是將observer返回了。接著來分析下注釋2處的subscribeActual()方法, ##### 2.3、Observable#subscribeActual() ~~~ protected abstract void subscribeActual(Observer<? super T> observer); 復制代碼 ~~~ 這是一個抽象的方法,很明顯,它對應的具體實現類就是我們在第一步創建的ObservableCreate類,接下來看到ObservableCreate的subscribeActual()方法。 ##### 2.4、ObservableCreate#subscribeActual() ~~~ @Override protected void subscribeActual(Observer<? super T> observer) { // 1 CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 2 observer.onSubscribe(parent); try { // 3 source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } 復制代碼 ~~~ 在注釋1處,首先新創建了一個CreateEmitter對象,同時傳入了我們自定義的observer對象進去。 ##### 2.4.1、CreateEmitter ~~~ static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { ... final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } ... } 復制代碼 ~~~ 從上面可以看出,**CreateEmitter通過繼承了Java并發包中的原子引用類AtomicReference保證了事件流切斷狀態Dispose的一致性**(這里不理解的話,看到后面講解Dispose的時候就明白了),并**實現了ObservableEmitter接口和Disposable接口**,接著我們分析下注釋2處的observer.onSubscribe(parent),這個onSubscribe回調的含義其實就是**告訴觀察者已經成功訂閱了被觀察者**。再看到注釋3處的source.subscribe(parent)這行代碼,這里的source其實是ObservableOnSubscribe對象,我們看到ObservableOnSubscribe的subscribe()方法。 ##### 2.4.2、ObservableOnSubscribe#subscribe() ~~~ Observable observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public voidsubscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("1"); emitter.onNext("2"); emitter.onNext("3"); emitter.onComplete(); } }); 復制代碼 ~~~ 這里面使用到了ObservableEmitter的onNext()方法將事件流發送出去,最后調用了onComplete()方法完成了訂閱過程。ObservableEmitter是一個抽象類,實現類就是我們傳入的CreateEmitter對象,接下來我們看看CreateEmitter的onNext()方法和onComplete()方法的處理。 ##### 2.4.3、CreateEmitter#onNext() && CreateEmitter#onComplete() ~~~ static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { ... @Override public void onNext(T t) { ... if (!isDisposed()) { //調用觀察者的onNext() observer.onNext(t); } } @Override public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } } ... } 復制代碼 ~~~ 在CreateEmitter的onNext和onComplete方法中首先都要經過一個**isDisposed**的判斷,作用就是看**當前的事件流是否被切斷(廢棄)掉了**,默認是不切斷的,如果想要切斷,可以調用Disposable的dispose()方法將此狀態設置為切斷(廢棄)狀態。我們繼續看看這個isDisposed內部的處理。 ##### 2.4.4、ObservableEmitter#isDisposed() ~~~ @Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); } 復制代碼 ~~~ 注意到這里通過get()方法首先從ObservableEmitter的AtomicReference中拿到了保存的Disposable狀態。然后交給了DisposableHelper進行判斷處理。接下來看看DisposableHelper的處理。 ##### 2.4.5、DisposableHelper#isDisposed() && DisposableHelper#set() ~~~ public enum DisposableHelper implements Disposable { DISPOSED; public static boolean isDisposed(Disposable d) { // 1 return d == DISPOSED; } public static boolean set(AtomicReference<Disposable> field, Disposable d) { for (;;) { Disposable current = field.get(); if (current == DISPOSED) { if (d != null) { d.dispose(); } return false; } // 2 if (field.compareAndSet(current, d)) { if (current != null) { current.dispose(); } return true; } } } ... public static boolean dispose(AtomicReference<Disposable> field) { Disposable current = field.get(); Disposable d = DISPOSED; if (current != d) { // ... current = field.getAndSet(d); if (current != d) { if (current != null) { current.dispose(); } return true; } } return false; } ... } 復制代碼 ~~~ DisposableHelper是一個枚舉類,內部只有一個值即DISPOSED, 從上面的分析可知它就是用來**標記事件流被切斷(廢棄)狀態的**。先看到注釋2和注釋3處的代碼**field.compareAndSet(current, d)和field.getAndSet(d)**,這里使用了**原子引用AtomicReference內部包裝的[CAS](https://www.jianshu.com/p/ab2c8fce878b)方法處理了標志Disposable的并發讀寫問題**。最后看到注釋3處,將我們傳入的CreateEmitter這個原子引用類保存的Dispable狀態和DisposableHelper內部的DISPOSED進行比較,如果相等,就證明數據流被切斷了。為了更進一步理解Disposed的作用,再來看看CreateEmitter中剩余的關鍵方法。 ##### 2.4.6、CreateEmitter ~~~ @Override public void onNext(T t) { ... // 1 if (!isDisposed()) { observer.onNext(t); } } @Override public void onError(Throwable t) { if (!tryOnError(t)) { // 2 RxJavaPlugins.onError(t); } } @Override public boolean tryOnError(Throwable t) { ... // 3 if (!isDisposed()) { try { observer.onError(t); } finally { // 4 dispose(); } return true; } return false; } @Override public void onComplete() { // 5 if (!isDisposed()) { try { observer.onComplete(); } finally { // 6 dispose(); } } } 復制代碼 ~~~ 在注釋1、3、5處,onNext()和onError()、onComplete()方法首先都會判斷事件流是否被切斷,如果事件流此時被切斷了,那么onNext()和onComplete()則會退出方法體,不做處理,**onError()則會執行到RxJavaPlugins.onError(t)這句代碼,內部會直接拋出異常,導致崩潰**。如果事件流沒有被切斷,那么在onError()和onComplete()內部最終會調用到注釋4、6處的這句dispose()代碼,將事件流進行切斷,由此可知,**onError()和onComplete()只能調用一個,如果先執行的是onComplete(),再調用onError()的話就會導致異常崩潰**。 ### 三、RxJava的線程切換 首先給出RxJava線程切換的例子: ~~~ Observable.create(new ObservableOnSubscribe<String>() { @Override public voidsubscribe(ObservableEmitter<String>emitter) throws Exception { emitter.onNext("1"); emitter.onNext("2"); emitter.onNext("3"); emitter.onComplete(); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(String s) { Log.d(TAG, "onNext : " + s); } @Override public void onError(Throwable e) { Log.d(TAG, "onError : " +e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); 復制代碼 ~~~ 可以看到,RxJava的線程切換主要**分為subscribeOn()和observeOn()方法**,首先,來分析下subscribeOn()方法。 #### 1、subscribeOn(Schedulers.io()) 在Schedulers.io()方法中,我們需要先傳入一個Scheduler調度類,這里是傳入了一個調度到io子線程的調度類,我們看看這個Schedulers.io()方法內部是怎么構造這個調度器的。 #### 2、Schedulers#io() ~~~ static final Scheduler IO; ... public static Scheduler io() { // 1 return RxJavaPlugins.onIoScheduler(IO); } static { ... // 2 IO = RxJavaPlugins.initIoScheduler(new IOTask()); } static final class IOTask implements Callable<Scheduler> { @Override public Scheduler call() throws Exception { // 3 return IoHolder.DEFAULT; } } static final class IoHolder { // 4 static final Scheduler DEFAULT = new IoScheduler(); } 復制代碼 ~~~ Schedulers這個類的代碼很多,這里我只拿出有關Schedulers.io這個方法涉及的邏輯代碼進行講解。首先,在注釋1處,同前面分析的訂閱流程的處理一樣,只是一個處理hook的邏輯,最終返回的還是傳入的這個IO對象。再看到注釋2處,**在Schedulers的靜態代碼塊中將IO對象進行了初始化,其實質就是新建了一個IOTask的靜態內部類**,在IOTask的call方法中,也就是注釋3處,可以了解到使用了靜態內部類的方式把創建的IOScheduler對象給返回出去了。繞了這么大圈子,**Schedulers.io方法其實質就是返回了一個IOScheduler對象**。 #### 3、Observable#subscribeOn() ~~~ public final Observable<T> subscribeOn(Scheduler scheduler) { ... return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); } 復制代碼 ~~~ 在subscribeOn()方法里面,又將ObservableCreate包裝成了一個ObservableSubscribeOn對象。我們關注到ObservableSubscribeOn類。 #### 4、ObservableSubscribeOn ~~~ public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { // 1 super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> observer) { // 2 final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer); // 3 observer.onSubscribe(parent); // 4 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } ... } 復制代碼 ~~~ 首先,在注釋1處,將傳進來的source和scheduler保存起來。接著,等到實際訂閱的時候,就會執行到這個subscribeActual方法,在注釋2處,將我們自定義的Observer包裝成了一個SubscribeOnObserver對象。在注釋3處,通知觀察者訂閱了被觀察者。在注釋4處,內部先創建了一個SubscribeTask對象,來看看它的實現。 #### 5、ObservableSubscribeOn#SubscribeTask ~~~ final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; } @Override public void run() { source.subscribe(parent); } } 復制代碼 ~~~ SubscribeTask是ObservableSubscribeOn的內部類,它實質上就是一個任務類,在它的run方法中會執行到source.subscribe(parent)的訂閱方法,**這個source其實就是我們在ObservableSubscribeOn構造方法中傳進來的ObservableCreate對象**。接下來看看scheduler.scheduleDirect()內部的處理。 #### 6、Scheduler#scheduleDirect() ~~~ public Disposable scheduleDirect(@NonNull Runnable run) { return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS); } public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { // 1 final Worker w = createWorker(); // 2 final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); // 3 DisposeTask task = new DisposeTask(decoratedRun, w); // 4 w.schedule(task, delay, unit); return task; } 復制代碼 ~~~ 這里最后會執行到上面這個scheduleDirect()重載方法。首先,在注釋1處,會調用createWorker()方法創建一個工作者對象Worker,它是一個抽象類,這里的實現類就是IoScheduler,下面,我們看看IoScheduler類的createWorker()方法。 ##### 6.1、IOScheduler#createWorker() ~~~ final AtomicReference<CachedWorkerPool> pool; ... public IoScheduler(ThreadFactory threadFactory) { this.threadFactory = threadFactory; this.pool = new AtomicReference<CachedWorkerPool>(NONE); start(); } ... @Override public Worker createWorker() { // 1 return new EventLoopWorker(pool.get()); } static final class EventLoopWorker extends Scheduler.Worker { ... EventLoopWorker(CachedWorkerPool pool) { this.pool = pool; this.tasks = new CompositeDisposable(); // 2 this.threadWorker = pool.get(); } } 復制代碼 ~~~ 首先,在注釋1處調用了pool.get()這個方法,**pool是一個CachedWorkerPool類型的原子引用對象**,它的作用就是**用于緩存工作者對象Worker的**。然后,將得到的CachedWorkerPool傳入新創建的EventLoopWorker對象中。重點關注一下注釋2處,這里將CachedWorkerPool緩存的threadWorker對象保存起來了。 下面,我們繼續分析3.6處代碼段的注釋2處的代碼,這里又是一個關于hook的封裝處理,最終還是返回的當前的Runnable對象。在注釋3處新建了一個切斷任務DisposeTask將decoratedRun和w對象包裝了起來。最后在注釋4處調用了工作者的schedule()方法。下面我們來分析下它內部的處理。 ##### 6.2、IoScheduler#schedule() ~~~ @Override public Disposable schedule(@NonNull Runnableaction, long delayTime, @NonNull TimeUnit unit){ ... return threadWorker.scheduleActual(action,delayTime, unit, tasks); } 復制代碼 ~~~ 內部調用了threadWorker的scheduleActual()方法,實際上是調用到了父類NewThreadWorker的scheduleActual()方法,我們繼續看看NewThreadWorker的scheduleActual()方法中做的事情。 ##### 6.3、NewThreadWorker#scheduleActual() ~~~ public NewThreadWorker(ThreadFactory threadFactory) { executor = SchedulerPoolFactory.create(threadFactory); } @NonNull public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); // 1 ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); if (parent != null) { if (!parent.add(sr)) { return sr; } } Future<?> f; try { // 2 if (delayTime <= 0) { // 3 f = executor.submit((Callable<Object>)sr); } else { // 4 f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent != null) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; } 復制代碼 ~~~ 在NewThreadWorker的scheduleActual()方法的內部,在注釋1處首先會新建一個ScheduledRunnable對象,將Runnable對象和parent包裝起來了,**這里parent是一個DisposableContainer對象,它實際的實現類是CompositeDisposable類,它是一個保存所有事件流是否被切斷狀態的容器,其內部的實現是使用了RxJava自己定義的一個簡單的OpenHashSet類進行存儲**。最后注釋2處,判斷是否設置了延遲時間,如果設置了,則調用線程池的submit()方法立即進行線程切換,否則,調用schedule()方法進行延時執行線程切換。 #### 7、為什么多次執行subscribeOn(),只有第一次有效? 從上面的分析,我們可以很容易了解到**被觀察者被訂閱時是從最外面的一層(ObservableSubscribeOn)通知到里面的一層(ObservableOnSubscribe)**,當連續執行了到多次subscribeOn()的時候,其實就是先執行倒數第一次的subscribeOn()方法,直到最后一次執行的subscribeOn()方法,這樣肯定會覆蓋前面的線程切換。 #### 8、observeOn(AndroidSchedulers.mainThread()) ~~~ public final Observable<T> observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); } public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { .... return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); } 復制代碼 ~~~ 可以看到,observeOn()方法內部最終也是返回了一個ObservableObserveOn對象,我們直接來看看ObservableObserveOn的subscribeActual()方法。 #### 9、ObservableObserveOn#subscribeActual() ~~~ @Override protected void subscribeActual(Observer<? super T> observer) { // 1 if (scheduler instanceof TrampolineScheduler) { // 2 source.subscribe(observer); } else { // 3 Scheduler.Worker w = scheduler.createWorker(); // 4 source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } } 復制代碼 ~~~ 首先,在注釋1處,判斷指定的調度器是不是TrampolineScheduler,這是一個不進行線程切換,立即執行當前代碼的調度器。如果是,則會直接調用ObservableSubscribeOn的subscribe()方法,如果不是,則會在注釋3處創建一個工作者對象。然后,在注釋4處創建一個新的ObserveOnObserver將SubscribeOnobserver對象包裝起來,并傳入ObservableSubscribeOn的subscribe()方法進行訂閱。接下來看看ObserveOnObserver類的重點方法。 #### 10、ObserveOnObserver ~~~ @Override public void onNext(T t) { ... if (sourceMode != QueueDisposable.ASYNC) { // 1 queue.offer(t); } schedule(); } @Override public void onError(Throwable t) { ... schedule(); } @Override public void onComplete() { ... schedule(); } 復制代碼 ~~~ 去除非主線邏輯的代碼,在ObserveOnObserver的onNext()和onError()、onComplete()方法中最后都會調用到schedule()方法。接著看schedule()方法,其中**onNext()還會把消息存放到隊列中**。 #### 11、ObserveOnObserver#schedule() ~~~ void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } } ~~~ 這里使用了worker進行調度ObserveOnObserver這個實現了Runnable的任務。worker就是在AndroidSchedulers.mainThread()中創建的,內部其實就是**使用Handler進行線程切換的**,此處不再贅述了。接著看ObserveOnObserver的run()方法。 #### 12、ObserveOnObserver#run() ~~~ @Override public void run() { // 1 if (outputFused) { drainFused(); } else { // 2 drainNormal(); } } ~~~ 在注釋1處會**先判斷outputFused這個標志位,它表示事件流是否被融化掉,默認是false,所以,最后會執行到drainNormal()方法**。接著看看drainNormal()方法內部的處理。 #### 13、ObserveOnObserver#drainNormal() ~~~ void drainNormal() { int missed = 1; final SimpleQueue<T> q = queue; // 1 final Observer<? super T> a = downstream; ... // 2 v = q.poll(); ... // 3 a.onNext(v); ... } ~~~ 在注釋1處,這里的downstream實際上是從外面傳進來的SubscribeOnObserver對象。在注釋2處將隊列中的消息取出來,接著在注釋3處調用了SubscribeOnObserver的onNext方法。**最終,會從我們包裝類的最外層一直調用到最里面的我們自定義的Observer中的onNext()方法,所以,在observeOn()方法下面的鏈式代碼都會執行到它所指定的線程中,噢,原來如此**。 ### 五、總結 其實筆者使用了RxJava也已經有一年多的時間了,但是一直沒有去深入去了解過它的內部實現原理,**如今細細品嘗,的確是酣暢淋漓**。從一開始的OkHttp到現如今的RxJava源碼分析,到此為止,Android主流三方庫源碼分析系列文章已經發布了五篇了,我們的征途已經過半,接下來,我將會對Android中的內存泄露框架LeakCanary源碼進行深入地講解,盡請期待~ ##### 參考鏈接: * * * 1、RxJava V2.2.5 源碼 2、Android 進階之光 3、[詳解 RxJava 的消息訂閱和線程切換原理](https://mp.weixin.qq.com/s?__biz=MzIwMTAzMTMxMg==&mid=2649492749&idx=1&sn=a4d2e79afd8257b57c6efa57cbff4404&chksm=8eec86f2b99b0fe46f61f324e032af335fbe02c7db1ef4eca60abb4bc99b4d216da7ba32dc88&scene=38#wechat_redirect) 鏈接:https://juejin.im/post/5e4c9d45518825496e7847b1
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看