<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                [TOC] ## J.U.C - AQS AQS 是 AbstractQueuedSynchronizer 的簡稱,java.util.concurrent(J.U.C)大大提高了并發性能,AQS (AbstractQueuedSynchronizer) 被認為是 J.U.C 的核心。它提供了一個基于 FIFO 隊列,這個隊列可以用來構建鎖或者其他相關的同步裝置的基礎框架。下圖是 AQS 底層的數據結構: [![](https://github.com/frank-lam/fullstack-tutorial/raw/master/notes/JavaArchitecture/assets/616953-20160403170136176-573839888.png)](https://github.com/frank-lam/fullstack-tutorial/blob/master/notes/JavaArchitecture/assets/616953-20160403170136176-573839888.png) 它底層使用的是雙向列表,是隊列的一種實現 , 因此也可以將它當成一種隊列。 * Sync queue 是同步列表,它是雙向列表 , 包括 head,tail 節點。其中 head 節點主要用來后續的調度 ; * Condition queue 是單向鏈表 , 不是必須的 , 只有當程序中需要 Condition 的時候,才會存在這個單向鏈表 , 并且可能會有多個 Condition queue。 簡單的來說: * AQS其實就是一個可以給我們實現鎖的**框架** * 內部實現的關鍵是:**先進先出的隊列、state 狀態** * 定義了內部類 ConditionObject * 擁有兩種線程模式 * * 獨占模式 * 共享模式 * 在 LOCK 包中的相關鎖(常用的有 ReentrantLock、 ReadWriteLock )都是基于 AQS 來構建 * 一般我們叫 AQS 為同步器。 ### CountdownLatch CountDownLatch 類位于 java.util.concurrent 包下,利用它可以實現類似計數器的功能。比如有一個任務 A,它要等待其他 4 個任務執行完畢之后才能執行,此時就可以利用 CountDownLatch 來實現這種功能了。 維護了一個計數器 cnt,每次調用 countDown() 方法會讓計數器的值減 1,減到 0 的時候,那些因為調用 await() 方法而在等待的線程就會被喚醒。 [![](https://github.com/frank-lam/fullstack-tutorial/raw/master/notes/JavaArchitecture/assets/CountdownLatch.png)](https://github.com/frank-lam/fullstack-tutorial/blob/master/notes/JavaArchitecture/assets/CountdownLatch.png) CountDownLatch 類只提供了一個構造器: ~~~java public CountDownLatch(int count) { }; // 參數count為計數值 ~~~ 然后下面這 3 個方法是 CountDownLatch 類中最重要的方法: ~~~java //調用await()方法的線程會被掛起,它會等待直到count值為0才繼續執行 public void await() throws InterruptedException { }; //和await()類似,只不過等待一定的時間后count值還沒變為0的話就會繼續執行 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //將count值減1 public void countDown() { }; ~~~ 下面看一個例子大家就清楚 CountDownLatch 的用法了: ~~~java public class Test { public static void main(String[] args) { final CountDownLatch latch = new CountDownLatch(2); new Thread() { public void run() { try { System.out.println("子線程" + Thread.currentThread().getName() + "正在執行"); Thread.sleep(3000); System.out.println("子線程" + Thread.currentThread().getName() + "執行完畢"); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } ; }.start(); new Thread() { public void run() { try { System.out.println("子線程" + Thread.currentThread().getName() + "正在執行"); Thread.sleep(3000); System.out.println("子線程" + Thread.currentThread().getName() + "執行完畢"); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } ; }.start(); try { System.out.println("等待2個子線程執行完畢..."); latch.await(); System.out.println("2個子線程已經執行完畢"); System.out.println("繼續執行主線程"); } catch (InterruptedException e) { e.printStackTrace(); } } } ~~~ 執行結果: ~~~ 線程Thread-0正在執行 線程Thread-1正在執行 等待2個子線程執行完畢... 線程Thread-0執行完畢 線程Thread-1執行完畢 2個子線程已經執行完畢 繼續執行主線程 ~~~ ### CyclicBarrier 用來控制多個線程互相等待,只有當多個線程都到達時,這些線程才會繼續執行。 和 CountdownLatch 相似,都是通過維護計數器來實現的。但是它的計數器是遞增的,每次執行 await() 方法之后,計數器會加 1,直到計數器的值和設置的值相等,等待的所有線程才會繼續執行。和 CountdownLatch 的另一個區別是,CyclicBarrier 的計數器可以循環使用,所以它才叫做循環屏障。 下圖應該從下往上看才正確。 [![](https://github.com/frank-lam/fullstack-tutorial/raw/master/notes/JavaArchitecture/assets/CyclicBarrier.png)](https://github.com/frank-lam/fullstack-tutorial/blob/master/notes/JavaArchitecture/assets/CyclicBarrier.png) ~~~java public class CyclicBarrierExample { public static void main(String[] args) throws InterruptedException { final int totalThread = 10; CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < totalThread; i++) { executorService.execute(() -> { System.out.print("before.."); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.print("after.."); }); } executorService.shutdown(); } } ~~~ ~~~ before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after.. ~~~ ### Semaphore Semaphore 就是操作系統中的信號量,可以控制對互斥資源的訪問線程數。Semaphore 可以控同時訪問的線程個數,通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。 [![](https://github.com/frank-lam/fullstack-tutorial/raw/master/notes/JavaArchitecture/assets/Semaphore.png)](https://github.com/frank-lam/fullstack-tutorial/blob/master/notes/JavaArchitecture/assets/Semaphore.png) Semaphore 類位于 java.util.concurrent 包下,它提供了2個構造器: ~~~java public Semaphore(int permits) { //參數permits表示許可數目,即同時可以允許多少線程進行訪問 sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { //這個多了一個參數fair表示是否是公平的,即等待時間越久的越先獲取許可 sync = (fair) ? new FairSync(permits) : new NonfairSync(permits); } ~~~ 下面說一下 Semaphore 類中比較重要的幾個方法,首先是 acquire()、release() 方法: ~~~java //獲取一個許可 public void acquire() throws InterruptedException { } //獲取permits個許可 public void acquire(int permits) throws InterruptedException { } //釋放一個許可 public void release() { } //釋放permits個許可 public void release(int permits) { } ~~~   acquire() 用來獲取一個許可,若無許可能夠獲得,則會一直等待,直到獲得許可。   release() 用來釋放許可。注意,在釋放許可之前,必須先獲獲得許可。 這 4 個方法都會被阻塞,如果想立即得到執行結果,可以使用下面幾個方法: ~~~java //嘗試獲取一個許可,若獲取成功,則立即返回true,若獲取失敗,則立即返回false public boolean tryAcquire() { }; //嘗試獲取一個許可,若在指定的時間內獲取成功,則立即返回true,否則則立即返回false public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { }; //嘗試獲取permits個許可,若獲取成功,則立即返回true,若獲取失敗,則立即返回false public boolean tryAcquire(int permits) { }; //嘗試獲取permits個許可,若在指定的時間內獲取成功,則立即返回true,否則則立即返回false public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; ~~~   另外還可以通過 availablePermits() 方法得到可用的許可數目。   下面通過一個例子來看一下 Semaphore 的具體使用:   假若一個工廠有 5 臺機器,但是有 8 個工人,一臺機器同時只能被一個工人使用,只有使用完了,其他工人才能繼續使用。那么我們就可以通過 Semaphore 來實現: ~~~java public class Test { public static void main(String[] args) { int N = 8; //工人數 Semaphore semaphore = new Semaphore(5); //機器數目 for (int i = 0; i < N; i++) new Worker(i, semaphore).start(); } static class Worker extends Thread { private int num; private Semaphore semaphore; public Worker(int num, Semaphore semaphore) { this.num = num; this.semaphore = semaphore; } @Override public void run() { try { semaphore.acquire(); System.out.println("工人" + this.num + "占用一個機器在生產..."); Thread.sleep(2000); System.out.println("工人" + this.num + "釋放出機器"); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } } ~~~ 執行結果: ~~~ 工人0占用一個機器在生產... 工人1占用一個機器在生產... 工人2占用一個機器在生產... 工人4占用一個機器在生產... 工人5占用一個機器在生產... 工人0釋放出機器 工人2釋放出機器 工人3占用一個機器在生產... 工人7占用一個機器在生產... 工人4釋放出機器 工人5釋放出機器 工人1釋放出機器 工人6占用一個機器在生產... 工人3釋放出機器 工人7釋放出機器 工人6釋放出機器 ~~~ ### 總結 下面對上面說的三個輔助類進行一個總結: * CountDownLatch 和 CyclicBarrier 都能夠實現線程之間的等待,只不過它們側重點不同: * CountDownLatch 一般用于某個線程A等待若干個其他線程執行完任務之后,它才執行; * CyclicBarrier 一般用于一組線程互相等待至某個狀態,然后這一組線程再同時執行; * 另外,CountDownLatch 是不能夠重用的,而 CyclicBarrier 是可以重用的。 * Semaphore 其實和鎖有點類似,它一般用于控制對某組資源的訪問權限。 ## J.U.C - 其它組件 ### FutureTask 在介紹 Callable 時我們知道它可以有返回值,返回值通過 Future 進行封裝。FutureTask 實現了 RunnableFuture 接口,該接口繼承自 Runnable 和 Future 接口,這使得 FutureTask 既可以當做一個任務執行,也可以有返回值。 ~~~java public class FutureTask<V> implements RunnableFuture<V> ~~~ ~~~java public interface RunnableFuture<V> extends Runnable, Future<V> ~~~ FutureTask 可用于異步獲取執行結果或取消執行任務的場景。當一個計算任務需要執行很長時間,那么就可以用 FutureTask 來封裝這個任務,主線程在完成自己的任務之后再去獲取結果。 ~~~java public class FutureTaskExample { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() { @Override public Integer call() throws Exception { int result = 0; for (int i = 0; i < 100; i++) { Thread.sleep(10); result += i; } return result; } }); Thread computeThread = new Thread(futureTask); computeThread.start(); Thread otherThread = new Thread(() -> { System.out.println("other task is running..."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); otherThread.start(); System.out.println(futureTask.get()); } } ~~~ ~~~java other task is running... 4950 ~~~ ### BlockingQueue java.util.concurrent.BlockingQueue 接口有以下阻塞隊列的實現: * **FIFO 隊列**:LinkedBlockingQueue、ArrayBlockingQueue(固定長度) * **優先級隊列**:PriorityBlockingQueue 提供了阻塞的 take() 和 put() 方法:如果隊列為空 take() 將阻塞,直到隊列中有內容;如果隊列為滿 put() 將阻塞,直到隊列有空閑位置。 **使用 BlockingQueue 實現生產者消費者問題** ~~~java public class ProducerConsumer { private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5); private static class Producer extends Thread { @Override public void run() { try { queue.put("product"); } catch (InterruptedException e) { e.printStackTrace(); } System.out.print("produce.."); } } private static class Consumer extends Thread { @Override public void run() { try { String product = queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.print("consume.."); } } } ~~~ ~~~java public static void main(String[] args) { for (int i = 0; i < 2; i++) { Producer producer = new Producer(); producer.start(); } for (int i = 0; i < 5; i++) { Consumer consumer = new Consumer(); consumer.start(); } for (int i = 0; i < 3; i++) { Producer producer = new Producer(); producer.start(); } } ~~~ ~~~ produce..produce..consume..consume..produce..consume..produce..consume..produce..consume.. ~~~ ### ForkJoin 主要用于并行計算中,和 MapReduce 原理類似,都是把大的計算任務拆分成多個小任務并行計算。 ~~~java public class ForkJoinExample extends RecursiveTask<Integer> { private final int threshold = 5; private int first; private int last; public ForkJoinExample(int first, int last) { this.first = first; this.last = last; } @Override protected Integer compute() { int result = 0; if (last - first <= threshold) { // 任務足夠小則直接計算 for (int i = first; i <= last; i++) { result += i; } } else { // 拆分成小任務 int middle = first + (last - first) / 2; ForkJoinExample leftTask = new ForkJoinExample(first, middle); ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last); leftTask.fork(); rightTask.fork(); result = leftTask.join() + rightTask.join(); } return result; } } ~~~ ~~~js public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinExample example = new ForkJoinExample(1, 10000); ForkJoinPool forkJoinPool = new ForkJoinPool(); Future result = forkJoinPool.submit(example); System.out.println(result.get()); } ~~~ ForkJoin 使用 ForkJoinPool 來啟動,它是一個特殊的線程池,線程數量取決于 CPU 核數。 ~~~java public class ForkJoinPool extends AbstractExecutorService ~~~ ForkJoinPool 實現了工作竊取算法來提高 CPU 的利用率。每個線程都維護了一個雙端隊列,用來存儲需要執行的任務。工作竊取算法允許空閑的線程從其它線程的雙端隊列中竊取一個任務來執行。竊取的任務必須是最晚的任務,避免和隊列所屬線程發生競爭。例如下圖中,Thread2 從 Thread1 的隊列中拿出最晚的 Task1 任務,Thread1 會拿出 Task2 來執行,這樣就避免發生競爭。但是如果隊列中只有一個任務時還是會發生競爭。 [![](https://github.com/frank-lam/fullstack-tutorial/raw/master/notes/JavaArchitecture/assets/fork-and-join.jpg)](https://github.com/frank-lam/fullstack-tutorial/blob/master/notes/JavaArchitecture/assets/fork-and-join.jpg)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看