<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                ## 33 分階段執行你的任務-學習使用Phaser運行多階段任務 > 青年是學習智慧的時期,中年是付諸實踐的時期。 > —— 盧梭 本節我們學習的 Phaser,自 Java 7 出現,在功能上和 CyclicBarrier 及 CountDownLatch 很相似,不過更為靈活。我們回想 CyclicBarrier 的使用,在初始化時需要指定參與者數量,并且無法更改。而 Phaser 可以靈活的添加參與者,以及動態注銷參與者,從而更加靈活地協同線程工作。 ## 1、Phaser API 介紹 Phaser 從名稱可以看出,它對線程協同的重點是任務階段。phaser 中維護了所處階段的數值。其實 CyclicBarrier 也可以實現類似的功能,但無法應對更為復雜的場景。Phaser 會更為的靈活,這體現著它對參與者的動態增減。并且參與者可以選擇到達屏障點后是否阻塞。我們先看 Phaser 中涉及到的兩個重要概念: 1. phase。Phaser 對階段進行管理,而 phase 就是階段,可以是階段 1、階段 2、階段 3…… 當所有的參與者到達某個階段屏障點時,phaser 會進入下一個階段; 2. party。參與者,Phase r 中會記錄參與者的數量,可以通過 register 方法來添加,或者通過 arriveAndDeregister 來注銷。 接下來我們看一下 Phaser 的主要 API: 1. register ():參與者數量加一; 2. arrive ():參與者到達屏障點,到達數量加一。但是不會阻塞調用此方法的線程; 3. arriveAndAwaitAdvance ():參與者到達屏障點,到達數量加一。阻塞線程直到所有的參與者到達該 phase 輪次; 4. arriveAndDeregister ():參與者到達屏障點,到達數量加一。然后從 Phase 注銷掉一個參與者,參與者減一; 5. awaitAdvance (int phase):阻塞所有的參與者到達該 phaser 的指定輪次。如果當前輪次和 phase 值不同或者 phase 已被終止時,會立即返回; 6. awaitAdvanceInterruptibly (int phase):功能同上,但是可以被打斷; 7. awaitAdvanceInterruptibly (int phase, long timeout, TimeUnit unit):功能同上,但是只阻塞指定的時長; 8. bulkRegister (int parties):批量注冊參與者; 9. forceTermination ():終止當前 phaser,改變其狀態為 termination; 10. onAdvance ():階段達成時被調用,子類可以對其重寫。。 ## 2、Phaser 使用示例 網上有很多 Phaser 的使用范例,但其實絕大多數并沒有體現出 Phaser 的優勢來,看完之后反而覺得用 CyclicBarrier 也是能直接實現。其實 Phaser 的優勢體現在對參與者數量動態管理上。下面我們寫一個簡單的例子,來看看 Phaser 如何使用。 我們設想如下場景:期末考試到了,軟件學院三個班共有 60 個學生一起參加考試,全部交卷后,有 3 個老師做判卷的工作,再由 3 位輔導員公布成績。 這個過程中分為三個階段: 1. 學生考試,參與者 50 2. 老師判卷,參與者 3 3. 輔導員公布成績,參與者 3 這個過程使用一個 CyclicBarrier 是無法實現的,因為 CyclicBarrier 的參與者數量無法變化。為了日志的簡潔,下面的代碼只模擬 10 個學生考試: ~~~java public class Client { public static void main(String[] args) { Phaser phaser = new Phaser(); //主線程注冊 phaser.register(); //10個學生線程分別啟動開始考試,然后交卷,交卷后通知phaser已到達并且注銷 IntStream.range(1,10).forEach(number->{ phaser.register(); Thread student= new Thread(()->{ try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("學生"+number+"交卷"); phaser.arriveAndDeregister(); }); student.start(); }); //學生并行考試時,主線程會先執行到此行代碼,但由于本phase還沒有達成,所以阻塞在此 phaser.arriveAndAwaitAdvance(); //所有學生達成后,開始新的phase,下面啟動三個老師線程,開始判卷 IntStream.range(1,3).forEach(number->{ phaser.register(); Thread teacher= new Thread(()->{ try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("老師"+number+"判卷完成"); phaser.arriveAndDeregister(); }); teacher.start(); }); //老師判卷時,主線程會先執行到此行代碼,但由于本phase還沒有達成,所以阻塞在此 phaser.arriveAndAwaitAdvance(); //所有老師都達成后,開始新的phase,下面啟動三個輔導員線程,公布成績 IntStream.range(1,3).forEach(number->{ phaser.register(); Thread counsellor= new Thread(()->{ System.out.println("輔導員"+number+"公布成績完成"); phaser.arriveAndDeregister(); }); counsellor.start(); }); } ~~~ 1、首先主線程進行 register,因為主線程要使用 phaser 來控制流程,它也是參與者之一; 2、然后起 10 個學生線程考試、交卷。注意起線程前需要通過 phaser.register () 來注冊參與者; 3、接下來主線程 phaser.arriveAndAwaitAdvance (); 這個方法會阻塞,直到所有的子線程執行了 phaser.arriveAndDeregister (),此時進入下一個 phase; 4、創建三個 teacher 線程進行判卷,和 student 線程一樣,需要先注冊自己,輸出判卷完成后,調用 phaser.arriveAndAwaitAdvance (),通知 phaser 自己已經到達并且要注銷; 5、主線程還是調用 phaser.arriveAndAwaitAdvance (); 阻塞,等待所有老師線程 arrive。然后繼續執行; 6、創建 3 個 counsellor 線程。先注冊自己,公布成績后,調用 phaser.arriveAndAwaitAdvance (),通知 phaser 自己已經到達并且要注銷。 主流程通過 phaser.arriveAndAwaitAdvance () 來阻塞,控制主流程在上一 phase 完成后才進入下個 phase。在每個 phase 中會有多個線程同時執行。 程序輸出如下: ~~~ 學生8交卷 學生1交卷 學生9交卷 學生2交卷 學生3交卷 學生6交卷 學生7交卷 學生4交卷 學生5交卷 學生10交卷 老師3判卷完成 老師1判卷完成 老師2判卷完成 輔導員1公布成績完成 輔導員2公布成績完成 輔導員3公布成績完成 ~~~ 可以看到和我們預想的一模一樣。階段間串行,階段內并行。 下面總結一下我們使用到的 phaser 的方法: 1、new Phaser ()。創建新的 Phaser,并且參與者為 0; 2、phaser.register (); 增加參與者; 3、phaser.arriveAndDeregister (); 參與者到達,并且注銷掉參與者。這個方法不會阻塞; 4、phaser.arriveAndAwaitAdvance (); 阻塞等待階段達成。 ## 3、Phaser 實現原理解析 下面我們分析一下 Phaser 的實現原理。我們先來理解 Phaser 中有一個很關鍵的屬性 status。 ~~~java private volatile long state; ~~~ 這個 long 類型的 status 在不同 bit 位保存了 Phaser 狀態相關的四種屬性,具體如下: 0-15 位:還未到達屏障的參與者數量 16-31 位:參與者數量 32-62 位:phase 的輪次 63 位:標識是否被終止 可以看到與 Phaser 狀態相關的數據都包含在 state 之中。不分開保存的原因是多個屬性不能通過 CAS 的方式做原子操作。把這些屬性組合起來,可以通過 CAS 方式更新確保線程安全,并且變相做到了多個屬性更新的原子操作。 Phaser 中有兩個鏈表保存等待的線程: ~~~java private final AtomicReference<QNode> evenQ; private final AtomicReference<QNode> oddQ; ~~~ 這是為了消除添加和釋放線程等待的爭搶。所以根據 phaser 輪次的奇偶,保存在不同的鏈表中。 這里就不再展開將 Phaser 的源代碼了,簡單講一下源代碼中的實現原理。首先我們知道 state 保存了 4 種狀態,所以更新狀態的時候要把 status 中相應屬性增減的數值換算為相應位數對應的整數,然后通過 CAS 的方式進行修改。 比如通過調用 arrive 方法,需要減少一個未到達屏障的參與者,也就是要對 state 的 0-1 5 位 - 1。由于為低位 -1,所以直接對 state 減一即可。如下,adjust 的值為 1: ~~~java (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) ~~~ 如果調用 arriveAndDeregister 方法,減少一個未到達屏障的參與者,并且還要減少一個參與者。相當于對 0-15 位減 1,并且對 16-31 位減 1,對應的二進制數值就是 10000000000000001,轉化為 10 進制為 65537。那么需要對 status 減掉 65537。我們看一下 arriveAndDeregister 方法: ~~~java public int arriveAndDeregister() { return doArrive(ONE_DEREGISTER); } ~~~ 我們看到調用 doArrive 時傳入的參數是 ONE\_DEREGISTER,它的值如下: ~~~java private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY; ~~~ ONE\_ARRIVAL= 1, ONE\_PARTY=1 << PARTIES\_SHIFT,PARTIES\_SHIFT=16。也就是 ONE\_PARTY 的值是 1 向左移 16 位。那么 ONE\_ARRIVAL|ONE\_PARTY 得出的二進制就是 10000000000000001。正是我們按照需求推斷出的二進制數值。 此外為了取出 state 中相應 bit 位數區間的狀態值,Phaser 是通過位移或者 &、| 操作來實現,例如取得 phse 輪次值的代碼如下: ~~~java int phase = (int)(s >>> PHASE_SHIFT); ~~~ PHASE\_SHIFT 為 32。將 state 值右移 32 位,這意味著把代表 phase 輪次的 32-63 位 bit 數值移到了 0-32 位。然后強轉 int 類型,消除掉高 32 位。這樣就得到了 phase 的輪次真正數值。 Phaser 對 state 的操作方式都是這種二進制的方式,一開始看起來會比較費勁,但是理解了它的原理,也并不復雜。其他的關于等待線程的管理、線程阻塞和恢復,和之前我們分析的源代碼大同小異。大家感興趣的話,也可以看一看。 ## 4、總結 Phaser 提供了分階段執行任務的功能,并且能夠動態的改變參與者的數量,和 CyclicBarrier 以及 CountDownLatch 比較起來更為靈活。這也是 JDK 的文檔中所提到的。實際開發中按照需求選擇使用。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看