<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                ## 28 人齊了,一起行動—CyclicBarrier詳解 > 青年是學習智慧的時期,中年是付諸實踐的時期。 > —— 盧梭 上一節我們講解了 CountDownLatch,它的作用是讓多個線程完成后,再促使主線程繼續向下執行。不過它有一定的局限性,無法被重復使用。本節我們學習的 CyclicBarrier 不會有這個問題。CyclicBarrier 從字面上理解為循環柵欄。柵欄自然起到的就是屏障的作用,阻止線程通過,而循環則是指其可以反復使用。下面我們就先看看如何使用 CyclicBarrier。 ## 1、CyclicBarrier 的使用 幾年前北京的黑車盛行,西二旗地鐵口,大量黑車司機在出口招攬生意:“軟件園、軟件園!5 塊一位!還差最后一位!” 。等你上車,發現其實不是還差一位,而是只有你一位。而司機此時絕對不會發車,而是會等車上坐夠 4 個人后才出發,然后下一輛黑車再次坐滿 4 人后發車。下面我們就使用 CyclicBarrier 來模擬這個場景。 ~~~java public class Client { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(4, () -> System.out.println("人滿了發車") ); IntStream.range(1, 11).forEach(number -> { try { Thread.currentThread().sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { try { System.out.println("第 " + number + " 乘客上車了!"); cyclicBarrier.await(); System.out.println("第 " + number + " 乘客出發了!"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); }); } } ~~~ 代碼中首先聲明 cyclicBarrier 對象,構造方法有兩個參數,第一個參數是計數器初始值,每有一個線程達成則會減 1 。減到 0 時,觸發執行第二個參數傳入的 Runnable 實現的 run 方法。我這里使用 lambda 的方式簡化代碼。如果你不需要這個 Runnable 的任務,那么只需要傳入第一個參數即可。 接下來的代碼中,模擬 10 位乘客上車,每次上車后調用 cyclicBarrier.await() 。這里就是屏障點,此時當前線程會阻塞在此處,并且計數器被減 1 。為了輸出的效果便于觀看,每次新線程啟動前先 sleep 一會。 每當四個乘客完成上車操作,cyclicBarrier 就會觸發 “人滿了發車” 的操作。而最后兩位乘客上車后,由于沒有新的乘客上車,計數器不會被減到 0,導致無法越過屏障,所以永遠不會發車。 cyclicBarrier運行的示意圖如下: ![圖片描述](https://img.mukewang.com/5dd5fb100001250a16000799.jpg) 代碼運行輸出如下: ~~~ 第 1 乘客上車了! 第 2 乘客上車了! 第 3 乘客上車了! 第 4 乘客上車了! 人滿了發車 第 4 乘客出發了! 第 1 乘客出發了! 第 2 乘客出發了! 第 3 乘客出發了! 第 5 乘客上車了! 第 6 乘客上車了! 第 7 乘客上車了! 第 8 乘客上車了! 人滿了發車 第 8 乘客出發了! 第 5 乘客出發了! 第 7 乘客出發了! 第 6 乘客出發了! 第 9 乘客上車了! 第 10 乘客上車了! ~~~ 可以看到每上車 4 人,才會觸發發車,同時每個人的線程才會繼續 cyclicBarrier.await() 后面的代碼,輸出 “第 n 乘客出發了!” 這個例子也驗證了 CyclicBarrier 可以重復使用,每次滿 4 人上車,都會觸發發車。然后重新開始計數。 通過這個例子我們了解了 CyclicBarrier 的使用。在這里我們總結下 CyclicBarrier 涉及的幾個概念: 1、計數器。初始值為構造 CyclicBarrier 傳入的第一個參數,每當一個線程到達屏障點,計數器減1; 2、屏障點,線程中調用 cyclicBarrier.await() 后,該線程到達屏障點,等待 CyclicBarrier 打開,也就是計數器到 0 ; 3、沖出屏障后的任務。首先這個任務可選。不需要的話,在構造 CyclicBarrier 時只需要傳入計數器初始值即可。這個任務在計數器到 0時被觸發。 ## 2、CyclicBarrier 原理解析 ### 2.1、 CyclicBarrier 中的屬性 ~~~java /** CyclicBarrier使用的拍他鎖*/ private final ReentrantLock lock = new ReentrantLock(); /** barrier被沖破前,線程等待的condition*/ private final Condition trip = lock.newCondition(); /** barrier被沖破時,需要滿足的參與線程數。*/ private final int parties; /* barrier被沖破后執行的方法。*/ private final Runnable barrierCommand; /** 當其輪次 */ private Generation generation = new Generation(); /** *目前等待剩余的參與者數量。從 parties倒數到0。每個輪次該值會被重置回parties */ private int count; ~~~ 可以看到 CyclicBarrier 內部通過 ReentrantLock 來實現的,而 ReentrantLock 的底層實現還是 AQS。 parties 在構造函數中被賦值,它的值永遠不會變,因為 CyclicBarrier 會被重置復用。而每個輪次真正用來計數的變量是 count。每個輪次結束,count 會被重置為 parties 的值。 ### 2.2、 await() 方法解析 await 方法的調用,代表調用線程到達了屏障點,這個方法其實調用了 dowait 方法,我們直接分析 dowait 方法,它實現了 CyclicBarrier 的核心功能。 ~~~java /** * Main barrier code, covering the various policies. */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; //對共享資源count,generation操作前,需要先上鎖保證線程安全 lock.lock(); try { //拿到當前輪次對象的引用 final Generation g = generation; //如果已經broken,那么拋出異常 if (g.broken) throw new BrokenBarrierException(); //如果被打斷,通過breakBarrier方法設置當前輪次為broken狀態,通知當前輪次所有等待的線程線程 //并且拋出InterruptedException if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //count減1 int index = --count; //如果index為0,那么沖破屏障點 if (index == 0) { // tripped boolean ranAction = false; //沖破屏障點后,如果CyclicBarrier構造時傳入Runnable,則被調用。 try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //這個方法中會進行重置,并且通知所有在屏障點阻塞的線程繼續執行。 nextGeneration(); return 0; } finally { //正常情況由于運行了command后ranAction被置為true,并不會執行如下邏輯 //在command執行期間出了異常才會進入下面的邏輯,認為當前輪次被破壞了 if (!ranAction) breakBarrier(); } } //開始自旋,直到屏障被沖破,或者interrupted或者超時 for (;;) { try { if (!timed) //阻塞,此時會釋放鎖,以讓其他線程進入await方法中。等待屏障被沖破后,向后執行 trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { //如果當前線程阻塞被interrupt了,并且本輪次還沒有被break,那么修改本輪次狀態為broken if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } //如果本輪次被破壞,那么拋出異常 if (g.broken) throw new BrokenBarrierException(); //如果已經成功進入下一輪次,那么返回index if (g != generation) return index; //如果已經超時,那么本輪次被打破 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { //釋放鎖 lock.unlock(); } } ~~~ 以上代碼分為兩大段邏輯,分別是自旋前,和自旋。 **A、自旋前的邏輯,核心邏輯如下:** 1. 計數器 -1; 2. 判斷是否計數器到 0; 3. 如果到了,則沖破屏障點,執行傳入的 Runnable; 4. 調用 nextGeneration() 來更新 Generation,重置計數器,并且通知本輪次等待的線程。 **B、如果計數器沒有到 0,則進入自旋的邏輯:** 1. 開始等待,此時會釋放鎖,以讓其它線程進入 lock 的代碼塊執行以上邏輯; 2. 當被喚醒時,可能因為當前 generation 被 break 了,或者計數器到 0,屏障被沖破; 3. 對比邊剛進入 dowait 方法時獲取的 generation 對象和最新 generation 是否一致。不一致說明已經換代了,也就是屏障被沖破,可以 return 了; 4. 如果等待超時或者 generation 被 break,分別拋出異常。 不同線程在 A 部分的邏輯會影響已經進入 B 部分邏輯的線程中止自旋。這些自旋的線程或者沖破屏障點,繼續向下執行,也可能拋出異常。 我們再看下用于更新輪次的方法 nextGeneration(): ~~~java private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); } ~~~ 三行代碼做了三件事: 1、通知所有被阻塞在本輪次屏障點的線程。屏障點被沖破,可以繼續向下執行了; 2、重置計數器為初始值; 3、更新輪次對象。這樣自旋中的線程才會跳出自旋。 ## 3、總結 CyclicBarrier 和 CountDownLatch 相比,更為靈活,可以被重復使用。前者可以用來分段任務,假如有個任務需要分三個階段來完成,每個階段可以多線程并發執行,但是進入下一個階段的時候,必須所有線程都完成了第一階段的執行。那么通過 CyclicBarrier,在每個線程的每個階段開始前都設置屏障點,可以很輕松地實現。 CyclicBarrier 的實現是通過 ReentrantLock 控制計數器的原子更新,通過條件變量來實現線程同步。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看