## 30 限量供應,不好意思您來晚了—Semaphore詳解
> 耐心和恒心總會得到報酬的。
> ——愛因斯坦
前幾節我們學習了幾種多線程線程同步工具,有一次性使用的倒數計數的 CountDownLatch,有循環使用的 CyclicBarrier,還有可以做數據交換的 Exchanger。今天我們再講解一種同步工具 Semaphore。
## 1、Semaphore 簡介
Semaphore 是信號量的意思,通過信號量可以對同一資源訪問做數量的限制。我們回憶一下無論是 Synchronized 還是 ReentrantLock 都是限制每次只有一個線程并發訪問資源。而信號量可以控制更多數量的線程訪問資源,但是不能超過信號量的準入數。
這就像停車場,如果停車位資源不緊張,車可以隨便進。但是當停車場停滿了車,那么不好意思,您來晚了。你只能在入口等待。出去幾輛,才能放幾輛進來。這個例子中,停車場就是共享資源,停車位的數量就是信號量準入數。而每輛車就是一個線程。停車場控制系統就是今天要學習的 Semaphore。

下面我們看看如何用代碼實現以上的例子。
## 2、如何使用 Semaphore
下面的代碼模擬 10 個車位的停車場,今天不知道附近有什么活動,突然過來了 500 輛車要停入停車場。這樣必然會造成排隊,前面的車出去一輛后面的車才能進來一輛。代碼如下:
~~~java
public class Client {
public static void main(String[] args) {
//用于生成隨機停車時長
Random random = new Random();
//用Semaphore模擬有10個停車位的停車場管理系統
final Semaphore parkingSystem = new Semaphore(10);
//模擬500輛汽車來停車
IntStream.range(0,500).forEach(i->{
new Thread(()->{
//取得到達停車場的時間
Long startWaitTime = System.currentTimeMillis();
System.out.println("第"+(i+1)+"輛汽車來到車庫");
//等待停車場系統控制抬桿。如果還有空位,立即抬桿,否則一直等到有空位才抬桿
try {
//acquire方法用于獲取資源,這里模擬發出抬桿放行的請求
parkingSystem.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
//已經抬趕,計算等待時長
Long waitingTime = (System.currentTimeMillis() - startWaitTime)/1000;
System.out.println("第"+(i+1)+"輛汽車等待"+waitingTime+"毫秒后進入車庫");
//通過sleep模擬停車時長
int parkingTime = random.nextInt(10)+2;
try {
TimeUnit.SECONDS.sleep(parkingTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
//release方法用于釋放資源,模擬駛出停車場
parkingSystem.release();
System.out.println("第"+(i+1)+"輛汽車停車"+parkingTime+"毫秒離開車庫");
}).start();
});
}
}
~~~
輸出比較多,我們先看開始的輸出:
~~~
第1輛汽車來到車庫
第3輛汽車來到車庫
第2輛汽車來到車庫
第3輛汽車等待0毫秒后進入車庫
第5輛汽車來到車庫
第4輛汽車來到車庫
第1輛汽車等待0毫秒后進入車庫
第6輛汽車來到車庫
第4輛汽車等待0毫秒后進入車庫
第5輛汽車等待0毫秒后進入車庫
第2輛汽車等待0毫秒后進入車庫
第7輛汽車來到車庫
第6輛汽車等待0毫秒后進入車庫
第9輛汽車來到車庫
第9輛汽車等待0毫秒后進入車庫
第7輛汽車等待0毫秒后進入車庫
第8輛汽車來到車庫
第8輛汽車等待0毫秒后進入車庫
第10輛汽車來到車庫
第10輛汽車等待0毫秒后進入車庫
第11輛汽車來到車庫
第12輛汽車來到車庫
第13輛汽車來到車庫
第14輛汽車來到車庫
......
~~~
可以看到前 10 輛車進入車庫都是不需要等待的,從第 11 輛車開始已經無法進入車庫了。我們繼續看后買面的輸出:
~~~
......
第495輛汽車來到車庫
第496輛汽車來到車庫
第497輛汽車來到車庫
第498輛汽車來到車庫
第499輛汽車來到車庫
第500輛汽車來到車庫
......
~~~
由于汽車線程啟動沒有間隔,也就意味著 500 輛車瞬間擠壓到停車場門口,等待入場。繼續看下面的輸出:
~~~
第3輛汽車停車4毫秒離開車庫
第11輛汽車等待4毫秒后進入車庫
第6輛汽車停車5毫秒離開車庫
第12輛汽車等待5毫秒后進入車庫
第5輛汽車停車7毫秒離開車庫
第2輛汽車停車7毫秒離開車庫
第13輛汽車等待7毫秒后進入車庫
第14輛汽車等待7毫秒后進入車庫
第10輛汽車停車8毫秒離開車庫
第15輛汽車等待8毫秒后進入車庫
第1輛汽車停車9毫秒離開車庫
第4輛汽車停車9毫秒離開車庫
~~~
可以看到第一批進入車庫的汽車,逐步離開車庫。后面排隊的車陸續進來。另外也可以觀察到,離開汽車的停車時長和進入汽車的等待時長是一致,這也證明了只有走了一輛,才能進入一輛。
不過由于多線程輸出日志,所以順序上并不一定是一輛離開,一輛進入。但實際運行情況確實是走了一輛才放入一輛。
Semaphore 可以選擇競爭策略是否公平。構造 Semaphore 時可以傳入第二個參數,如下面代碼所示:
~~~java
final Semaphore parkingSystem = new Semaphore(10,true);
~~~
如果構造時傳入第二個參數為 true,那么就是公平的,不傳默認也是公平的。這一點通過以上例子的輸出也有所體現。
## 3、Semaphore 源碼分析
我們先看 Semaphore 的構造方法:
~~~java
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
~~~
根據傳入 fair 的不同,選擇 sync 對象是公平還是不公平。FairSync 和 NonfairSync 都是 Semaphore 內部靜態類,繼承自 AQS。Semaphore 也是借助 AQS 來實現的。
我們再看 acquire 方法代碼:
~~~java
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
~~~
調用了 AQS 中的 acquireSharedInterruptibly 方法。繼續看此方法代碼:
~~~java
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
~~~
核心是先調用 tryAcquireShared,嘗試獲取,如果獲取失敗則調用 doAcquireSharedInterruptibly,自旋進入等待隊列,如果排到自己,那么再次嘗試調用 tryAcquireShared。這個方法之前詳細分析過,這里就不再展開來講。
接下來我們看看嘗試獲取資源的方法 tryAcquireShared,它的實現在 Semaphore 內部靜態類 Sync 中,如下:
~~~java
protected int tryAcquireShared(int acquires) {
for (;;) {
//看是否有更早等待的線程,如果有,獲取失敗
if (hasQueuedPredecessors())
return -1;
//查詢剩余的信號量準入數量
int available = getState();
//查詢剩余的信號量準入數量,看是否滿足想要獲取的數量
int remaining = available - acquires;
//剩余的數量>0則會通過CAS的方式刷新剩余信號量。并且返回剩余信號量。
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
~~~
下面我們再來看一下 release 的源代碼:
~~~java
public void release() {
sync.releaseShared(1);
}
~~~
可以看到每次釋放數量為 1。另外還有可以傳入 release 資源數量的重載方法。
releaseShared 代碼如下:
~~~java
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
~~~
調用 tryReleaseShared 方法進行資源釋放,然后調用 doReleaseShared 來發送信號通知下一個節點來獲取資源。tryReleaseShared 的實現也在 Semaphore 內部靜態類 Sync 中,如下:
~~~java
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
~~~
獲取當前剩余信號量計數,然后把釋放的資源數量加回來。最后通過 CAS 方式刷新信號量的計數。
## 4、總結
信號量用來控制共享資源的訪問數量。所以很適合控制有 “池” 概念的資源訪問。因為池的意思就是池內有有限數量的資源可以使用。如果在池這個層面抽象為一個資源來對待,那么使用 Semaphore 來做控制就非常合適。
- 前言
- 第1章 Java并發簡介
- 01 開篇詞:多線程為什么是你必需要掌握的知識
- 02 絕對不僅僅是為了面試—我們為什么需要學習多線程
- 03 多線程開發如此簡單—Java中如何編寫多線程程序
- 04 人多力量未必大—并發可能會遇到的問題
- 第2章 Java中如何編寫多線程
- 05 看若兄弟,實如父子—Thread和Runnable詳解
- 06 線程什么時候開始真正執行?—線程的狀態詳解
- 07 深入Thread類—線程API精講
- 08 集體協作,什么最重要?溝通!—線程的等待和通知
- 09 使用多線程實現分工、解耦、緩沖—生產者、消費者實戰
- 第3章 并發的問題和原因詳解
- 10 有福同享,有難同當—原子性
- 11 眼見不實—可見性
- 12 什么?還有這種操作!—有序性
- 13 問題的根源—Java內存模型簡介
- 14 僵持不下—死鎖詳解
- 第4章 如何解決并發問題
- 15 原子性輕量級實現—深入理解Atomic與CAS
- 16 讓你眼見為實—volatile詳解
- 17 資源有限,請排隊等候—Synchronized使用、原理及缺陷
- 18 線程作用域內共享變量—深入解析ThreadLocal
- 第5章 線程池
- 19 自己動手豐衣足食—簡單線程池實現
- 20 其實不用造輪子—Executor框架詳解
- 第6章 主要并發工具類
- 21 更高級的鎖—深入解析Lock
- 22 到底哪把鎖更適合你?—synchronized與ReentrantLock對比
- 23 按需上鎖—ReadWriteLock詳解
- 24 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap上
- 25 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap下
- 26不讓我進門,我就在門口一直等!—BlockingQueue和ArrayBlockingQueue
- 27 倒數計時開始,三、二、一—CountDownLatch詳解
- 28 人齊了,一起行動—CyclicBarrier詳解
- 29 一手交錢,一手交貨—Exchanger詳解
- 30 限量供應,不好意思您來晚了—Semaphore詳解
- 第7章 高級并發工具類及并發設計模式
- 31 憑票取餐—Future模式詳解
- 32 請按到場順序發言—Completion Service詳解
- 33 分階段執行你的任務-學習使用Phaser運行多階段任務
- 34 誰都不能偷懶-通過 CompletableFuture 組裝你的異步計算單元
- 35 拆分你的任務—學習使用Fork/Join框架
- 36 為多線程們安排一位經理—Master/Slave模式詳解
- 第8章 總結
- 37 結束語