通過前面的學習,我們一起回顧了線程、鎖等各種并發編程的基本元素,也逐步涉及了 Java 并發包中的部分內容,相信經過前面的熱身,我們能夠更快地理解 Java 并發包。
今天我要問你的問題是,Java 并發包提供了哪些并發工具類?
## 典型回答
我們通常所說的并發包也就是 java.util.concurrent 及其子包,集中了 Java 并發的各種基礎工具類,具體主要包括幾個方面:
* 提供了比 synchronized 更加高級的各種同步結構,包括 CountDownLatch、CyclicBarrier、Semaphore 等,可以實現更加豐富的多線程操作,比如利用 Semaphore 作為資源控制器,限制同時進行工作的線程數量。
* 各種線程安全的容器,比如最常見的 ConcurrentHashMap、有序的 ConcunrrentSkipListMap,或者通過類似快照機制,實現線程安全的動態數組 CopyOnWriteArrayList 等。
* 各種并發隊列實現,如各種 BlockedQueue 實現,比較典型的 ArrayBlockingQueue、 SynchorousQueue 或針對特定場景的 PriorityBlockingQueue 等。
* 強大的 Executor 框架,可以創建各種不同類型的線程池,調度任務運行等,絕大部分情況下,不再需要自己從頭實現線程池和任務調度器。
## 考點分析
這個題目主要考察你對并發包了解程度,以及是否有實際使用經驗。我們進行多線程編程,無非是達到幾個目的:
* 利用多線程提高程序的擴展能力,以達到業務對吞吐量的要求。
* 協調線程間調度、交互,以完成業務邏輯。
* 線程間傳遞數據和狀態,這同樣是實現業務邏輯的需要。
所以,這道題目只能算作簡單的開始,往往面試官還會進一步考察如何利用并發包實現某個特定的用例,分析實現的優缺點等。
如果你在這方面的基礎比較薄弱,我的建議是:
* 從總體上,把握住幾個主要組成部分(前面回答中已經簡要介紹)。
* 理解具體設計、實現和能力。
* 再深入掌握一些比較典型工具類的適用場景、用法甚至是原理,并熟練寫出典型的代碼用例。
掌握這些通常就夠用了,畢竟并發包提供了方方面面的工具,其實很少有機會能在應用中全面使用過,扎實地掌握核心功能就非常不錯了。真正特別深入的經驗,還是得靠在實際場景中踩坑來獲得。
## 知識擴展
首先,我們來看看并發包提供的豐富同步結構。前面幾講已經分析過各種不同的顯式鎖,今天我將專注于
* [CountDownLatch](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/CountDownLatch.html),允許一個或多個線程等待某些操作完成。
* [CyclicBarrier](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/CyclicBarrier.html),一種輔助性的同步結構,允許多個線程等待到達某個屏障。
* [Semaphore](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Semaphore.html),Java 版本的信號量實現。
Java 提供了經典信號量([Semaphore](https://en.wikipedia.org/wiki/Semaphore_(programming)))的實現,它通過控制一定數量的允許(permit)的方式,來達到限制通用資源訪問的目的。你可以想象一下這個場景,在車站、機場等出租車時,當很多空出租車就位時,為防止過度擁擠,調度員指揮排隊等待坐車的隊伍一次進來 5 個人上車,等這 5 個人坐車出發,再放進去下一批,這和 Semaphore 的工作原理有些類似。
你可以試試使用 Semaphore 來模擬實現這個調度過程:
~~~
import java.util.concurrent.Semaphore;
public class UsualSemaphoreSample {
public static void main(String[] args) throws InterruptedException {
System.out.println("Action...GO!");
Semaphore semaphore = new Semaphore(5);
for (int i = 0; i < 10; i++) {
Thread t = new Thread(new SemaphoreWorker(semaphore));
t.start();
}
}
}
class SemaphoreWorker implements Runnable {
private String name;
private Semaphore semaphore;
public SemaphoreWorker(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public void run() {
try {
log("is waiting for a permit!");
semaphore.acquire();
log("acquired a permit!");
log("executed!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log("released a permit!");
semaphore.release();
}
}
private void log(String msg){
if (name == null) {
name = Thread.currentThread().getName();
}
System.out.println(name + " " + msg);
}
}
~~~
這段代碼是比較典型的 Semaphore 示例,其邏輯是,線程試圖獲得工作允許,得到許可則進行任務,然后釋放許可,這時等待許可的其他線程,就可獲得許可進入工作狀態,直到全部處理結束。編譯運行,我們就能看到 Semaphore 的允許機制對工作線程的限制。
但是,從具體節奏來看,其實并不符合我們前面場景的需求,因為本例中 Semaphore 的用法實際是保證,一直有 5 個人可以試圖乘車,如果有 1 個人出發了,立即就有排隊的人獲得許可,而這并不完全符合我們前面的要求。
那么,我再修改一下,演示個非典型的 Semaphore 用法。
~~~
import java.util.concurrent.Semaphore;
public class AbnormalSemaphoreSample {
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(0);
for (int i = 0; i < 10; i++) {
Thread t = new Thread(new MyWorker(semaphore));
t.start();
}
System.out.println("Action...GO!");
semaphore.release(5);
System.out.println("Wait for permits off");
while (semaphore.availablePermits()!=0) {
Thread.sleep(100L);
}
System.out.println("Action...GO again!");
semaphore.release(5);
}
}
class MyWorker implements Runnable {
private Semaphore semaphore;
public MyWorker(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("Executed!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
~~~
注意,上面的代碼,更側重的是演示 Semaphore 的功能以及局限性,其實有很多線程編程中的反實踐,比如使用了 sleep 來協調任務執行,而且使用輪詢調用 availalePermits 來檢測信號量獲取情況,這都是很低效并且脆弱的,通常只是用在測試或者診斷場景。
總的來說,我們可以看出 Semaphore 就是個**計數器**,**其基本邏輯基于 acquire/release**,并沒有太復雜的同步邏輯。
如果 Semaphore 的數值被初始化為 1,那么一個線程就可以通過 acquire 進入互斥狀態,本質上和互斥鎖是非常相似的。但是區別也非常明顯,比如互斥鎖是有持有者的,而對于 Semaphore 這種計數器結構,雖然有類似功能,但其實不存在真正意義的持有者,除非我們進行擴展包裝。
下面,來看看 CountDownLatch 和 CyclicBarrier,它們的行為有一定的相似度,經常會被考察二者有什么區別,我來簡單總結一下。
* CountDownLatch 是不可以重置的,所以無法重用;而 CyclicBarrier 則沒有這種限制,可以重用。
* CountDownLatch 的基本操作組合是 countDown/await。調用 await 的線程阻塞等待 countDown 足夠的次數,不管你是在一個線程還是多個線程里 countDown,只要次數足夠即可。所以就像 Brain Goetz 說過的,CountDownLatch 操作的是事件。
* CyclicBarrier 的基本操作組合,則就是 await,當所有的伙伴(parties)都調用了 await,才會繼續進行任務,并自動進行重置。**注意**,正常情況下,CyclicBarrier 的重置都是自動發生的,如果我們調用 reset 方法,但還有線程在等待,就會導致等待線程被打擾,拋出 BrokenBarrierException 異常。CyclicBarrier 側重點是線程,而不是調用事件,它的典型應用場景是用來等待并發線程結束。
如果用 CountDownLatch 去實現上面的排隊場景,該怎么做呢?假設有 10 個人排隊,我們將其分成 5 個人一批,通過 CountDownLatch 來協調批次,你可以試試下面的示例代碼。
~~~
import java.util.concurrent.CountDownLatch;
public class LatchSample {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(6);
for (int i = 0; i < 5; i++) {
Thread t = new Thread(new FirstBatchWorker(latch));
t.start();
}
for (int i = 0; i < 5; i++) {
Thread t = new Thread(new SecondBatchWorker(latch));
t.start();
}
// 注意這里也是演示目的的邏輯,并不是推薦的協調方式
while ( latch.getCount() != 1 ){
Thread.sleep(100L);
}
System.out.println("Wait for first batch finish");
latch.countDown();
}
}
class FirstBatchWorker implements Runnable {
private CountDownLatch latch;
public FirstBatchWorker(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
System.out.println("First batch executed!");
latch.countDown();
}
}
class SecondBatchWorker implements Runnable {
private CountDownLatch latch;
public SecondBatchWorker(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
latch.await();
System.out.println("Second batch executed!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
~~~
CountDownLatch 的調度方式相對簡單,后一批次的線程進行 await,等待前一批 countDown 足夠多次。這個例子也從側面體現出了它的局限性,雖然它也能夠支持 10 個人排隊的情況,但是因為不能重用,如果要支持更多人排隊,就不能依賴一個 CountDownLatch 進行了。其編譯運行輸出如下:

在實際應用中的條件依賴,往往沒有這么別扭,CountDownLatch 用于線程間等待操作結束是非常簡單普遍的用法。通過 countDown/await 組合進行通信是很高效的,通常不建議使用例子里那個循環等待方式。
如果用 CyclicBarrier 來表達這個場景呢?我們知道 CyclicBarrier 其實反映的是線程并行運行時的協調,在下面的示例里,從邏輯上,5 個工作線程其實更像是代表了 5 個可以就緒的空車,而不再是 5 個乘客,對比前面 CountDownLatch 的例子更有助于我們區別它們的抽象模型,請看下面的示例代碼:
~~~
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierSample {
public static void main(String[] args) throws InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("Action...GO again!");
}
});
for (int i = 0; i < 5; i++) {
Thread t = new Thread(new CyclicWorker(barrier));
t.start();
}
}
static class CyclicWorker implements Runnable {
private CyclicBarrier barrier;
public CyclicWorker(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
for (int i=0; i<3 ; i++){
System.out.println("Executed!");
barrier.await();
}
} catch (BrokenBarrierException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
~~~
為了讓輸出更能表達運行時序,我使用了 CyclicBarrier 特有的 barrierAction,當屏障被觸發時,Java 會自動調度該動作。因為 CyclicBarrier 會**自動**進行重置,所以這個邏輯其實可以非常自然的支持更多排隊人數。其編譯輸出如下:

Java 并發類庫還提供了[Phaser](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Phaser.html),功能與 CountDownLatch 很接近,但是它允許線程動態地注冊到 Phaser 上面,而 CountDownLatch 顯然是不能動態設置的。Phaser 的設計初衷是,實現多個線程類似步驟、階段場景的協調,線程注冊等待屏障條件觸發,進而協調彼此間行動,具體請參考這個[例子](http://www.baeldung.com/java-phaser)。
接下來,我來梳理下并發包里提供的線程安全 Map、List 和 Set。首先,請參考下面的類圖。

你可以看到,總體上種類和結構還是比較簡單的,如果我們的應用側重于 Map 放入或者獲取的速度,而不在乎順序,大多推薦使用 ConcurrentHashMap,反之則使用 ConcurrentSkipListMap;如果我們需要對大量數據進行非常頻繁地修改,ConcurrentSkipListMap 也可能表現出優勢。
我在前面的專欄,談到了普通無順序場景選擇 HashMap,有順序場景則可以選擇類似 TreeMap 等,但是為什么并發容器里面沒有 ConcurrentTreeMap 呢?
這是因為 TreeMap 要實現高效的線程安全是非常困難的,它的實現基于復雜的紅黑樹。為保證訪問效率,當我們插入或刪除節點時,會移動節點進行平衡操作,這導致在并發場景中難以進行合理粒度的同步。而 SkipList 結構則要相對簡單很多,通過層次結構提高訪問速度,雖然不夠緊湊,空間使用有一定提高(O(nlogn)),但是在增刪元素時線程安全的開銷要好很多。為了方便你理解 SkipList 的內部結構,我畫了一個示意圖。

關于兩個 CopyOnWrite 容器,其實 CopyOnWriteArraySet 是通過包裝了 CopyOnWriteArrayList 來實現的,所以在學習時,我們可以專注于理解一種。
首先,CopyOnWrite 到底是什么意思呢?它的原理是,任何修改操作,如 add、set、remove,都會拷貝原數組,修改后替換原來的數組,通過這種防御性的方式,實現另類的線程安全。請看下面的代碼片段,我進行注釋的地方,可以清晰地理解其邏輯。
~~~
public boolean add(E e) {
synchronized (lock) {
Object[] elements = getArray();
int len = elements.length;
// 拷貝
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
// 替換
setArray(newElements);
return true;
}
}
final void setArray(Object[] a) {
array = a;
}
~~~
所以這種數據結構,相對比較適合讀多寫少的操作,不然修改的開銷還是非常明顯的。
今天我對 Java 并發包進行了總結,并且結合實例分析了各種同步結構和部分線程安全容器,希望對你有所幫助。
## 一課一練
關于今天我們討論的題目你做到心中有數了嗎?留給你的思考題是,你使用過類似 CountDownLatch 的同步結構解決實際問題嗎?談談你的使用場景和心得。
- 前言
- 開篇詞
- 開篇詞 -以面試題為切入點,有效提升你的Java內功
- 模塊一 Java基礎
- 第1講 談談你對Java平臺的理解?
- 第2講 Exception和Error有什么區別?
- 第3講 談談final、finally、 finalize有什么不同?
- 第4講 強引用、軟引用、弱引用、幻象引用有什么區別?
- 第5講 String、StringBuffer、StringBuilder有什么區別?
- 第6講 動態代理是基于什么原理?
- 第7講 int和Integer有什么區別?
- 第8講 對比Vector、ArrayList、LinkedList有何區別?
- 第9講 對比Hashtable、HashMap、TreeMap有什么不同?
- 第10講 如何保證集合是線程安全的? ConcurrentHashMap如何實現高效地線程安全?
- 第11講 Java提供了哪些IO方式? NIO如何實現多路復用?
- 第12講 Java有幾種文件拷貝方式?哪一種最高效?
- 第13講 談談接口和抽象類有什么區別?
- 第14講 談談你知道的設計模式?
- 模塊二 Java進階
- 第15講 synchronized和ReentrantLock有什么區別呢?
- 第16講 synchronized底層如何實現?什么是鎖的升級、降級?
- 第17講 一個線程兩次調用start()方法會出現什么情況?
- 第18講 什么情況下Java程序會產生死鎖?如何定位、修復?
- 第19講 Java并發包提供了哪些并發工具類?
- 第20講 并發包中的ConcurrentLinkedQueue和LinkedBlockingQueue有什么區別?
- 第21講 Java并發類庫提供的線程池有哪幾種? 分別有什么特點?
- 第22講 AtomicInteger底層實現原理是什么?如何在自己的產品代碼中應用CAS操作?
- 第23講 請介紹類加載過程,什么是雙親委派模型?
- 第24講 有哪些方法可以在運行時動態生成一個Java類?
- 第25講 談談JVM內存區域的劃分,哪些區域可能發生OutOfMemoryError?
- 第26講 如何監控和診斷JVM堆內和堆外內存使用?
- 第27講 Java常見的垃圾收集器有哪些?
- 第28講 談談你的GC調優思路?
- 第29講 Java內存模型中的happen-before是什么?
- 第30講 Java程序運行在Docker等容器環境有哪些新問題?
- 模塊三 Java安全基礎
- 第31講 你了解Java應用開發中的注入攻擊嗎?
- 第32講 如何寫出安全的Java代碼?
- 模塊四 Java性能基礎
- 第33講 后臺服務出現明顯“變慢”,談談你的診斷思路?
- 第34講 有人說“Lambda能讓Java程序慢30倍”,你怎么看?
- 第35講 JVM優化Java代碼時都做了什么?
- 模塊五 Java應用開發擴展
- 第36講 談談MySQL支持的事務隔離級別,以及悲觀鎖和樂觀鎖的原理和應用場景?
- 第37講 談談Spring Bean的生命周期和作用域?
- 第38講 對比Java標準NIO類庫,你知道Netty是如何實現更高性能的嗎?
- 第39講 談談常用的分布式ID的設計方案?Snowflake是否受冬令時切換影響?
- 周末福利
- 周末福利 談談我對Java學習和面試的看法
- 周末福利 一份Java工程師必讀書單
- 結束語
- 結束語 技術沒有終點
- 結課測試 Java核心技術的這些知識,你真的掌握了嗎?