<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                [TOC] 基于 io.reactivex:rxjava:1.3.8 源碼分析分為三個方向：簡單訂閱過程、變換過程和線程切換。 ## 簡單訂閱 ### 使用 ~~~ Observable.create(new Observable.OnSubscribe<Integer>() { // 1. 創建被觀察者(Observable) & 定義需發送的事件 @Override public void call(Subscriber<? super Integer> subScriber){ subScriber.onNext(1); } }).subscribe(new Subscriber<Integer>() { // 2. 創建觀察者(Observer) & 定義響應事件的行為 // 3. 通過訂閱(subscribe)連接觀察者和被觀察者 @Override public void onNext(Integer value) {} @Override public void onError(Throwable e) {} @Override public void onCompleted() {} }); ~~~ ### Observable.create 實際上就是創建一個Observable對象,并且把OnSubscribe對象賦值到Observable對象的onSubscribe 字段里。 ~~~ public static <T> Observable<T> create(OnSubscribe<T> f){ //hook.onCreate(f) 實際上返回的還是f //Observable構造函數,實際上只是賦值到OnSubscribe return new Observable<T>(hook.onCreate(f)); } //hook實際上就是RxJavaObservableExecutionHook,它的onCreate如下 public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f){ return f; } //接著回來查看Observable的構造方法,如下 public Observable(OnSubscribe<T> f){ this.onSubscribe = f; } ~~~ ### Observable().subscribe 找到Observable對象的onSubscribe對象,把訂閱者(subscriber)作為參數調用onSubscribe對象的call方法。 ~~~ public final Subscription subscribe(Subscriber<? super T> subscriber) { return subscribe(subscriber, this); } static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { ... RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn((Subscription)subscriber); ... } ~~~ #### RxJavaHooks.onObservableStart 可以理解為直接返回onSubscribe ~~~ public static <T> OnSubscribe<T> onObservableStart(Observable<T> instance, OnSubscribe<T> onSubscribe) { Func2<Observable, OnSubscribe, OnSubscribe> f = onObservableStart; return f != null ? 
(OnSubscribe)f.call(instance, onSubscribe) : onSubscribe; } ~~~ ## 變換過程 ### 使用 ~~~ Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subScriber){ subScriber.onNext(1); } }).map(new Func1<Integer,String>(){ @Override public String call(Integer integer){ return "a" + integer; } }).subscribe(new Subscriber<String>() { @Override public void onNext(String value) {} @Override public void onError(Throwable e) {} @Override public void onCompleted() {} }); ~~~ Observable.create還是創建一個Observable對象,并且把OnSubscribe對象賦值到Observable對象的onSubscribe 字段里。我們把這一步創建的對象叫做 obj1 ### Observable().map ~~~ public final <R> Observable<R> map(Func1<? super T, ? extends R> func) { return unsafeCreate(new OnSubscribeMap(this, func)); } public static <T> Observable<T> unsafeCreate(Observable.OnSubscribe<T> f) { return new Observable(RxJavaHooks.onCreate(f)); } ~~~ #### OnSubscribeMap ~~~ public final class OnSubscribeMap<T, R> implements OnSubscribe<R> { final Observable<T> source; final Func1<? super T, ? extends R> transformer; public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) { this.source = source; this.transformer = transformer; } public void call(Subscriber<? super R> o) { OnSubscribeMap.MapSubscriber<T, R> parent = new OnSubscribeMap.MapSubscriber(o, this.transformer); o.add(parent); this.source.unsafeSubscribe(parent); } static final class MapSubscriber<T, R> extends Subscriber<T> { final Subscriber<? super R> actual; final Func1<? super T, ? extends R> mapper; boolean done; public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? 
extends R> mapper) { this.actual = actual; this.mapper = mapper; } public void onNext(T t) { Object result; try { result = this.mapper.call(t); } catch (Throwable var4) { Exceptions.throwIfFatal(var4); this.unsubscribe(); this.onError(OnErrorThrowable.addValueAsLastCause(var4, t)); return; } this.actual.onNext(result); } public void onError(Throwable e) { if (this.done) { RxJavaHooks.onError(e); } else { this.done = true; this.actual.onError(e); } } public void onCompleted() { if (!this.done) { this.actual.onCompleted(); } } public void setProducer(Producer p) { this.actual.setProducer(p); } } } ~~~ 這里做的就是 1. 生成第二個Observable obj2,然后他的onSubscribe 就是OnSubscribeMap 2. 執行最后Observable().subscribe時,就會調用obj2的onSubscribe的call 3. obj2的onSubscribe的call里,首先new了一個MapSubscriber把下游的Subscriber包裝起來,主要功能就是完成參數轉換,并且調用下一個的對應方法 4. 然后他又會調用obj1的unsafeSubscribe(parent),就會調用obj1的onSubscribe的call ![](https://img.kancloud.cn/fd/98/fd98f1107f67ab638dc97e3ed768f914_582x323.png) ## 線程切換 ~~~ Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subScriber){ subScriber.onNext(1); } }) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer value) {} @Override public void onError(Throwable e) {} @Override public void onCompleted() {} }); ~~~ 根據【變換過程】的思路,我們很容易猜到,實際上subscribeOn 和observeOn肯定也是生成了一個Observable,然后自定義一個Observable.OnSubscribe,OnSubscribe里面實際上做的也是先線程切換,然后再執行下一個的對應方法 ### observeOn ~~~ public final class OperatorObserveOn<T> implements Operator<T, T> { private final Scheduler scheduler; private final boolean delayError; private final int bufferSize; public OperatorObserveOn(Scheduler scheduler, boolean delayError) { this(scheduler, delayError, RxRingBuffer.SIZE); } public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) { this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize > 0 ? 
bufferSize : RxRingBuffer.SIZE; } //核心 public Subscriber<? super T> call(Subscriber<? super T> child) { OperatorObserveOn.ObserveOnSubscriber<T> parent = new OperatorObserveOn.ObserveOnSubscriber(this.scheduler, child, this.delayError, this.bufferSize); parent.init(); return parent; } static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 { final Subscriber<? super T> child; final Worker recursiveScheduler ... public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) { this.child = child; this.recursiveScheduler = scheduler.createWorker(); this.delayError = delayError; ... } void init() { Subscriber<? super T> localChild = this.child; localChild.setProducer(new Producer() { public void request(long n) { if (n > 0L) { BackpressureUtils.getAndAddRequest(ObserveOnSubscriber.this.requested, n); ObserveOnSubscriber.this.schedule(); } } }); localChild.add(this.recursiveScheduler); localChild.add(this); } public void onNext(T t) { if (!this.isUnsubscribed() && !this.finished) { if (!this.queue.offer(NotificationLite.next(t))) { this.onError(new MissingBackpressureException()); } else { this.schedule(); } } } public void onCompleted() { if (!this.isUnsubscribed() && !this.finished) { this.finished = true; this.schedule(); } } public void onError(Throwable e) { if (!this.isUnsubscribed() && !this.finished) { this.error = e; this.finished = true; this.schedule(); } else { RxJavaHooks.onError(e); } } protected void schedule() { if (this.counter.getAndIncrement() == 0L) { this.recursiveScheduler.schedule(this); } } //schedule(this) 已經在異步線程上了實際上執行的方法 public void call() { //代碼太多貼出來不便,實際上就是根據error 和 finished 或者是t值調用child對應的onNext onError onCompleted方法 ~~~ ### subscribeOn ~~~ public final class OperatorSubscribeOn<T> implements OnSubscribe<T> { final Scheduler scheduler; final Observable<T> source; final boolean requestOn; public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean 
requestOn) { this.scheduler = scheduler; this.source = source; this.requestOn = requestOn; } public void call(Subscriber<? super T> subscriber) { Worker inner = this.scheduler.createWorker(); OperatorSubscribeOn.SubscribeOnSubscriber<T> parent = new OperatorSubscribeOn.SubscribeOnSubscriber(subscriber, this.requestOn, inner, this.source); subscriber.add(parent); subscriber.add(inner); //核心,在異步線程調用父Observable的方法 inner.schedule(parent); } static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0 { ... //這里的代碼實際上就是調用子的 onNext onError onCompleted方法 } ~~~ ## 總結 1. 中間每一步都會創建一個Observable(被觀察者) 2. Observable都有一個 OnSubscribe onSubscribe, 這個實際上就是被傳入的參數 3. 最后一個調用subscribe時,最后會調用上一個Observable的onSubscribe的call,并且把自己的Subscriber當作參數傳入 4. 上一個Observable會通過subscribe再調用它的上一個Observable的onSubscribe的call,一路上去,直到第一個Observable。 5. 第一個處理完后,會調用下一個的Subscriber的onNext,下一個的再調用下下個的。 6. 直到最后一個,調用結束 ## 參考資料 《Android進階之光》
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看