<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                [TOC] 本系列文章為 Season_zlc 大神的[《給初學者的RxJava2.0教程》](http://www.jianshu.com/p/464fa025229e)系列學習筆記。 # 配置 添加 Gradle 配置: ```groovy implementation 'io.reactivex.rxjava2:rxjava:x.y.z' implementation 'io.reactivex.rxjava2:rxandroid:x.y.z' ``` # 基礎 [原文鏈接](http://www.jianshu.com/p/464fa025229e) ![](https://img.kancloud.cn/f7/b3/f7b38edb806c5854a3dbde9b974c5e41_580x325.png) ## 上下游 * 上游和下游分別對應著 Observable 和 Observer * 水管中流動的是事件(對象),這個重要! ## 發送事件 * 只有調用了 subscribe 方法之后,上游才會發事件 * 上游發送了 onComplete 或 onError 事件后可以繼續發送其他事件,而下游接收了 onComplete 或 onError 事件之后不再接收其他事件 * 上游可以不發送 onComplete 或 onError 事件,發送的話只能發送一次并且互斥,即不能同時發送 ## 水管機關 * 調用 subscribe 方法就把上游、下游兩個水管連接起來了,同時會發送一個 onSubscribe 事件,并可以拿到一個 disposable 開關 * disposable 可以理解為兩根管子之間的一個機關,調用其 dispose 方法時,可以將兩根管道切斷,導致下游收不到事件 * RxJava 內置了一個 disposable 容器 CompositeDisposable,可以存儲 disposable,用來在 Activity 退出時調用所有機關的 dispose 方法,使下游不再接收事件 ```java Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); e.onComplete(); } }); Observer<Integer> observer = new Observer<Integer>() { Disposable mDisposable; @Override public void onSubscribe(Disposable d) { mDisposable = d; } @Override public void onNext(Integer integer) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }; observable.subscribe(observer); ``` # 事件源類型 事件源類型|描述 ---|--- Flowable|可以發送 N 個事件,支持背壓 Observable|可以發送 N 個事件,不支持背壓 Single|可以發送 1 個事件,或 1 個 onError 事件 Completable|不可以發送普通事件,可以發送 1 個 onComplete 事件或 1 個 onError 事件 Maybe|可以發射 1 個事件、1 個 onError 事件,或者什么都不發射 # 線程調度 [原文鏈接](http://www.jianshu.com/p/8818b98c44e2) ## 應用場景 Rxjava 最常見的應用場景是: 在子線程進行數據計算、網絡請求操作等,然后回到主線程展示結果(成功或錯誤) ## 內置線程 * Schedulers.io():代表 IO 操作線程,用于網絡、讀寫文件等 IO 密集型操作 * Schedulers.newThread():代表一個常規的新線程 * Schedulers.computation():代表 CPU 計算密集型操作,如需要大量計算操作時使用 * AndroidSchedulers.mainThread():Android 主線程 ## 線程調度 * subscribeOn 指定上游發送事件的線程 * observeOn 指定下游接收事件的線程 * 多次指定上游線程只有第一次有效,其他會被忽略 * 多次指定下游線程沒問題,每調用一次 observeOn,下游線程就會切換一次 ```java observable.subscribeOn(Schedulers.newThread()) .observeOn(Schedulers.io()) .doOnNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { // 此處為 IO 線程 } }) .observeOn(AndroidSchedulers.mainThread()) .doOnNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { // 此處為主線程 } }) .observeOn(Schedulers.newThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { // 此處為新的子線程 } }); ``` # 變換操作符 [原文鏈接](http://www.jianshu.com/p/128e662906af) ## Map 操作符 * 對上游發送的每一個事件應用一個函數,使得每一個事件都按照指定的函數變化 ```java Observable.fromArray(1, 2, 3) .map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return "Result is " + integer; } }) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.i(TAG, s); } }); ``` ## FlatMap 操作符 ![](https://img.kancloud.cn/7f/19/7f19a8efb4005e241437ed6cd54bdc89_647x763.png) ### 定義 * FlatMap 的作用是將一個事件,轉換為,發送新的事件的,Observable * 被轉換為 Observable 后開始發送新的事件 * 發送出來的,這些新的事件,會被合并到一個水管中,發給下游 * FlatMap 相當于直接轉換了上游發射源所發射的事件類型 * Map 相當于轉換了水管中流動的事件的類型 ### 其他 * FlatMap 不保證新事件的順序,ConcatMap 可以保證 ### 代碼示例 ```java public interface Api { @GET Observable<LoginResponse> login(@Body LoginRequest request); @GET Observable<RegisterResponse> register(@Body RegisterRequest request); } ``` ```java api.register(new RegisterRequest()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doOnNext(new Consumer<RegisterResponse>() { @Override public void accept(RegisterResponse registerResponse) throws Exception { // 先根據注冊結果做一些事情 } }) .observeOn(Schedulers.io()) .flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() { @Override public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception { return api.login(new LoginRequest()); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<LoginResponse>() { @Override public void accept(LoginResponse loginResponse) throws Exception { // 登錄成功 } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { // 登錄失敗 } }); ``` ## Zip 操作符 [原文鏈接](http://www.jianshu.com/p/bb58571cdb64) ### 定義 * Zip 可將多個 Observable 發送的事件結合到一起,然后發送這些組合到一起的事件 * 嚴格按照順序執行 * 只發射與發射數據項數最少的那個 Observable 一樣多的數據 ![](https://img.kancloud.cn/ea/49/ea49166f780a9b84b9f1c52753423abb_648x879.png) ### 代碼示例 ```java public interface Api { @GET Observable<UserBaseInfoResponse> getUserBaseInfo(@Body UserBaseInfoRequest request); @GET Observable<UserExtraInfoResponse> getUserExtraInfo(@Body UserExtraInfoRequest request); } ``` ```java Observable<UserBaseInfoResponse> userBaseInfoObservable = api.getUserBaseInfo( new UserBaseInfoRequest()) .subscribeOn(Schedulers.io()); Observable<UserExtraInfoResponse> userExtraInfoObservable = api.getUserExtraInfo( new UserExtraInfoRequest()) .subscribeOn(Schedulers.io()); Observable.zip(userBaseInfoObservable, userExtraInfoObservable, new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() { @Override public UserInfo apply(UserBaseInfoResponse userBaseInfoResponse, UserExtraInfoResponse userExtraInfoResponse) throws Exception { return new UserInfo(userBaseInfoResponse, userExtraInfoResponse); } }).observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<UserInfo>() { @Override public void accept(UserInfo userInfo) throws Exception { // do something } }); ``` # 背壓 ## 上下游流速不均衡問題 [原文鏈接](http://www.jianshu.com/p/0f2d6c2387c9) ```java Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { for (int i = 0; ; i++) { e.onNext(i); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Thread.sleep(2000); Log.e(TAG, "accept: " + integer); } }); ``` * 上游水管發送事件過快時,事件會被先保存到該水管的水缸中 * 當上下游處在不同線程,即為異步操作時,上下游流速不均衡時會造成內存溢出(水缸溢出) * 上下游為同一線程,即為同步操作時,不會出現內存溢出情況。因為上游需等待下游處理完畢才會發送下一個事件 ## 流速問題解決方案 [原文鏈接](http://www.jianshu.com/p/e4c6d7989356) * 從數量上治理,減少發送進入水缸事件的數量(會造成部分事件丟失) * 從時間上治理,減緩事件發送進入水缸的速度(事件不會丟失) 方案一 demo: ```java Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { for (int i = 0; ; i++) { e.onNext(i); } } }).subscribeOn(Schedulers.io()) .sample(2, TimeUnit.SECONDS) // sample 每隔 2 秒取樣 .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Thread.sleep(2000); Log.e(TAG, "accept: " + integer); } }); } ``` 方案二 demo: ```java Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { for (int i = 0; ; i++) { e.onNext(i); Thread.sleep(2000); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Thread.sleep(2000); Log.e(TAG, "accept: " + integer); } }); } ``` # Flowable * Flowable 采取響應式拉取 * Subscription 的 request 方法相當于下游告訴上游,下游它所能處理事件的能力 * Flowable 默認有一個大小為 128 的水缸,上下游工作在不同線程時,上游會先把事件貯存在水缸中 ```java Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> e) throws Exception { e.onNext(0); e.onNext(1); e.onNext(2); e.onComplete(); } // 此處第二個參數 BackpressureStrategy.ERROR 代表處理上下游流速不均問題的處理策略,此處為直接拋出錯誤 }, BackpressureStrategy.ERROR) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } @Override public void onNext(Integer integer) { } @Override public void onError(Throwable t) { } @Override public void onComplete() { } }); ``` ## Flowable 處理上下游流速不均問題的策略 類型|策略描述 ---|--- BackpressureStrategy.ERROR|直接拋出 MissingBackpressureException 異常 BackpressureStrategy.BUFFER|上游的水缸更換為無限大的 BackpressureStrategy.DROP|水缸存不下的事件直接丟棄 BackpressureStrategy.LATEST|水缸內只保留最新的事件
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看