## 29 一手交錢,一手交貨—Exchanger詳解
> 世上無難事,只要肯登攀
> ——毛澤東
Exchanger 從字面來看是一個交換器,它用來在線程間交換數據。如果兩個線程并行處理,但在某個時刻需要互相交換自己已經處理完的中間數據,然后才能繼續往下執行。這個時候就可以使用 Exchanger。
兩個線程不用謀面,但是通過 Exchanger 就能實現數據交換,這是如何做到的呢?我們可以認為 Exchanger 有兩個槽位,線程可以向其中放入自己想要交換的數據,然后阻塞在此,等待其它線程填充另外的槽位。等兩個槽位都填充數據后,Exchanger 會把槽位調換位置,讓線程取到另外線程放入的數據。如下圖所示:

前面的描述只是為了幫助理解,其實Exchanger 真實的設計更為復雜。下面我們先通過一個簡單的例子來看看如何使用 Exchanger。
## 1、Exchanger 的使用
我們設想如下場景:如果今天上午你需要給你女朋友快遞一部手機,但是突然你要去一個緊急的會議。于是你把快遞的東西留給你的同事。當快遞取件員上門時,他會把快遞底單給你同事,你同事則會把快遞給取件員。然后你的同事再把快遞底單給你。
這個場景里,你和快遞員就是兩個不同的線程。而你的同事是 Exchanger,負責你和快遞員間進行物品交換。我們看看代碼應該如何編寫:
~~~java
public class ExchangerClient {
public static void main(String[] args) throws InterruptedException {
Exchanger<String> workmate = new Exchanger<>();
Thread michael = new Thread(() -> {
String threadName = Thread.currentThread().getName();
try {
System.out.println(threadName+": I'm Michael and want to delivery a phone to my friend");
System.out.println(threadName+": There is an emergency meeting");
System.out.println(threadName+": I give the phone to my workmate.");
System.out.println(threadName+": He will help me to delivery the phone and return the express document to me");
String expressDocument = workmate.exchange("---phone---");
System.out.println(threadName+": I got the express document"+ expressDocument);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Michael");
Thread deliveryman = new Thread(() -> {
String threadName = Thread.currentThread().getName();
try {
System.out.println(threadName+": I'm a deliveryman");
System.out.println(threadName+": I'm going to get Michael's express from his workmate and give the express document to his workmate");
String expressGoods = workmate.exchange("---phone express document----");
System.out.println(threadName+": I got the goods of "+expressGoods);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Deliveryman");
michael.start();
TimeUnit.MILLISECONDS.sleep(100);
deliveryman.start();
}
}
~~~
Michael 通過 Exchanger 的實例 workmate,把 phone 交給了快遞員。而快遞員也通過 workmate 把快遞單據給了 Michael。我們看一下輸出:
~~~
Michael: I'm Michael and want to delivery a phone to my friend
Michael: There is an emergency meeting
Michael: I give the phone to my workmate.
Michael: He will help me to delivery and return the express document to me
Deliveryman: I'm a deliveryman
Deliveryman: I'm going to get Michael's express from his workmate and give the express document to his workmate
Deliveryman: I got the goods of ---phone---
Michael: I got the express document---phone express document----
~~~
可以看出 Michael 在調用 workmate.exchange("—phone—") 后被阻塞。而當 deliveryman 調用 workmate.exchange("—phone express document——") 后,兩個線程才繼續往下走,并且進行了數據交換。 Michael 發出了貨物 “—phone—”,得到了 “—phone express document----”。而 deliveryman 則得到了 “—phone—”,發出了 “—phone express document——”。
兩個線程通過 Exchanger 實現了數據的交換。
## 2、多于兩個線程使用 Exchanger
Exchanger 支持兩個線程數據互換,那么你有沒有想過多于兩個線程使用 Exchanger 做數據交換會怎么樣呢?我們把上買呢例子改一下,假如另外一個同事 Green 也想通過 workmate 協助發送快遞,那么會怎么樣?
~~~java
public static void main(String[] args) throws InterruptedException {
Exchanger<String> workmate = new Exchanger<>();
Thread michael = new Thread(() -> {
String threadName = Thread.currentThread().getName();
try {
System.out.println(threadName+": I'm Michael and want to delivery a phone to my friend");
System.out.println(threadName+": There is an emergency meeting");
System.out.println(threadName+": I give the phone to my workmate.");
System.out.println(threadName+": He will help me to delivery the phone and return the express document to me");
String expressDocument = workmate.exchange("---phone---");
System.out.println(threadName+": I got the express document"+ expressDocument);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Michael");
Thread green = new Thread(() -> {
String threadName = Thread.currentThread().getName();
try {
System.out.println(threadName+": I'm green and want to delivery a macbook to my friend");
System.out.println(threadName+": There is an emergency meeting");
System.out.println(threadName+": I give the macbook to my workmate.");
System.out.println(threadName+": He will help me to delivery the macbook and return the express document to me");
String expressDocument = workmate.exchange("---macbook---");
System.out.println(threadName+": I got the express document"+ expressDocument);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Green");
Thread deliveryman = new Thread(() -> {
String threadName = Thread.currentThread().getName();
try {
System.out.println(threadName+": I'm a deliveryman");
System.out.println(threadName+": I'm going to get Michael's express from his workmate and give the express document to his workmate");
String expressGoods = workmate.exchange("---phone express document----");
System.out.println(threadName+": I got the goods of "+expressGoods);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Deliveryman");
michael.start();
TimeUnit.MILLISECONDS.sleep(100);
green.start();
TimeUnit.MILLISECONDS.sleep(100);
deliveryman.start();
}
~~~
我們加入 Green 線程,并且在 Michael 線程啟動后,先啟動 Green 線程。輸出如下:
~~~
Michael: I'm Michael and want to delivery a phone to my friend
Michael: There is an emergency meeting
Michael: I give the phone to my workmate.
Michael: He will help me to delivery the phone and return the express document to me
Green: I'm green and want to delivery a macbook to my friend
Green: There is an emergency meeting
Green: I give the macbook to my workmate.
Green: He will help me to delivery the macbook and return the express document to me
Green: I got the express document---phone---
Michael: I got the express document---macbook---
Deliveryman: I'm a deliveryman
Deliveryman: I'm going to get Michael's express from his workmate and give the express document to his workmate
~~~
可見 Michael 和 Green 進行了數據交換,雙方都沒有和 Deliveryman 做數據交換。結果是雙方的快遞都沒送到快遞員手中,而快遞員由于沒有收到快遞,所以一直被阻塞住了。
根據以上實驗結果我們總結一下:
1、Exchanger 支持多個線程做數據交換;
2、多個線程使用同一個 Exchanger 做數據交換時,結果隨機,只要湊滿一對,就會進行交換。
## 3、Exchanger實現分析
Exchanger 雖然使用起來很簡單,但是其源碼還是比較復雜的。下面我們來分析它實現的原理。
Exchanger 類中的注解很詳細地闡述了其實現原理,并且用一段偽代碼描述了它的原理:
~~~java
for (;;) {
if (slot is empty) { // offer
place item in a Node;
if (can CAS slot from empty to node) {
wait for release;
return matching item in node;
}
}
else if (can CAS slot from node to empty) { // release
get the item in node;
set matching item in node;
release waiting thread;
}
// else retry on CAS failure
}
~~~
可以看到 Exhcanger 使用 slot 作為容器,保存數據 item。當 A 線程進入時,如果發現 slot 是空的,則把 item 放入 node,然后放入 slot。A 線程此時開始阻塞。B 線程進入后發現 slot 不為空,則從 slot 中取出node,把 slot 置空。此線程從 node 中取出數據,并且把自己的數據放入 node 中,然后通知阻塞的線程恢復,阻塞線程恢復后從 node 中取出數據。從而實現了數據的交換。
Java5 的時候,采用的就是這種算法。一般來說沒有什么問題。但是當大量并發時,會存在激烈的競爭。于是在Java6 開始,加入 slot 數組,讓不同線程使用不同的 slot,提升并發的性能。不過在沒有并發的時候還是使用一個 slot 來做交換。
我們看下核心的 exchange 方法代碼:
~~~java
public V exchange(V x) throws InterruptedException {
Object v;
Object item = (x == null) ? NULL_ITEM : x; // translate null args
if ((arena != null ||
(v = slotExchange(item, false, 0L)) == null) &&
((Thread.interrupted() || // disambiguates null return
(v = arenaExchange(item, false, 0L)) == null)))
throw new InterruptedException();
return (v == NULL_ITEM) ? null : (V)v;
}
~~~
arena 為 slot 數組。可以看到 arena 為 null 時,還是使用的 slot 交換算法 slotExchange。如果 arena 不為 null進入 if 的 && 的第二個條件判斷。在判斷中執行了 arenaExchange,此時通過 arena 也就是 slot 數組完成交換。
具體的算法不再詳述,和上面偽代碼的思想是一致的。
## 4、總結
Exchanger 在我們實際開發中使用不多,但是我們也應該了解其用法和原理,以便在合適場景出現時,我們能識別出應該采用 Exchanger。在 Exchanger 類的注解中,可以看到它適用于基因算法。因為不同基因的搭配測試就應該是隨機進行交換的。此外還適用于管道的設計(pipeline design),讓數據在管道中做交換。
}
- 前言
- 第1章 Java并發簡介
- 01 開篇詞:多線程為什么是你必需要掌握的知識
- 02 絕對不僅僅是為了面試—我們為什么需要學習多線程
- 03 多線程開發如此簡單—Java中如何編寫多線程程序
- 04 人多力量未必大—并發可能會遇到的問題
- 第2章 Java中如何編寫多線程
- 05 看若兄弟,實如父子—Thread和Runnable詳解
- 06 線程什么時候開始真正執行?—線程的狀態詳解
- 07 深入Thread類—線程API精講
- 08 集體協作,什么最重要?溝通!—線程的等待和通知
- 09 使用多線程實現分工、解耦、緩沖—生產者、消費者實戰
- 第3章 并發的問題和原因詳解
- 10 有福同享,有難同當—原子性
- 11 眼見不實—可見性
- 12 什么?還有這種操作!—有序性
- 13 問題的根源—Java內存模型簡介
- 14 僵持不下—死鎖詳解
- 第4章 如何解決并發問題
- 15 原子性輕量級實現—深入理解Atomic與CAS
- 16 讓你眼見為實—volatile詳解
- 17 資源有限,請排隊等候—Synchronized使用、原理及缺陷
- 18 線程作用域內共享變量—深入解析ThreadLocal
- 第5章 線程池
- 19 自己動手豐衣足食—簡單線程池實現
- 20 其實不用造輪子—Executor框架詳解
- 第6章 主要并發工具類
- 21 更高級的鎖—深入解析Lock
- 22 到底哪把鎖更適合你?—synchronized與ReentrantLock對比
- 23 按需上鎖—ReadWriteLock詳解
- 24 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap上
- 25 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap下
- 26不讓我進門,我就在門口一直等!—BlockingQueue和ArrayBlockingQueue
- 27 倒數計時開始,三、二、一—CountDownLatch詳解
- 28 人齊了,一起行動—CyclicBarrier詳解
- 29 一手交錢,一手交貨—Exchanger詳解
- 30 限量供應,不好意思您來晚了—Semaphore詳解
- 第7章 高級并發工具類及并發設計模式
- 31 憑票取餐—Future模式詳解
- 32 請按到場順序發言—Completion Service詳解
- 33 分階段執行你的任務-學習使用Phaser運行多階段任務
- 34 誰都不能偷懶-通過 CompletableFuture 組裝你的異步計算單元
- 35拆分你的任務—學習使用Fork/Join框架
- 36 為多線程們安排一位經理—Master/Slave模式詳解
- 第8章 總結
- 37 結束語