## 同步
在[上一章](http://www.hmoore.net/imnotdown1019/java_core_full/1012270)中,我們學到了如何通過執行器服務同時執行代碼。當我們編寫這種多線程代碼時,我們需要特別注意共享可變變量的并發訪問。假設我們打算增加某個可被多個線程同時訪問的整數。
我們定義了`count`字段,帶有`increment()`方法來使`count`加一:
~~~java
int count = 0;
void increment() {
count = count + 1;
}
~~~
當多個線程并發調用這個方法時,我們就會遇到大麻煩:
~~~java
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 10000)
.forEach(i -> executor.submit(this::increment));
stop(executor);
System.out.println(count); // 9965
~~~
我們沒有看到`count`為10000的結果,上面代碼的實際結果在每次執行時都不同。原因是我們在不同的線程上共享可變變量,并且變量訪問沒有同步機制,這會產生[競爭條件](http://en.wikipedia.org/wiki/Race_condition)。
增加一個數值需要三個步驟:(1)讀取當前值,(2)使這個值加一,(3)將新的值寫到變量。如果兩個線程同時執行,就有可能出現兩個線程同時執行步驟1,于是會讀到相同的當前值。這會導致無效的寫入,所以實際的結果會偏小。上面的例子中,對`count`的非同步并發訪問丟失了35次增加操作,但是你在自己執行代碼時會看到不同的結果。
幸運的是,Java自從很久之前就通過`synchronized`關鍵字支持線程同步。我們可以使用`synchronized`來修復上面在增加`count`時的競爭條件。
~~~java
synchronized void incrementSync() {
count = count + 1;
}
~~~
在我們并發調用`incrementSync()`時,我們得到了`count`為10000的預期結果。沒有再出現任何競爭條件,并且結果在每次代碼執行中都很穩定:
~~~java
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 10000)
.forEach(i -> executor.submit(this::incrementSync));
stop(executor);
System.out.println(count); // 10000
~~~
`synchronized`關鍵字也可用于語句塊:
~~~java
void incrementSync() {
synchronized (this) {
count = count + 1;
}
}
~~~
Java在內部使用所謂的“監視器”(monitor),也稱為監視器鎖(monitor lock)或內在鎖( intrinsic lock)來管理同步。監視器綁定在對象上,例如,當使用同步方法時,每個方法都共享相應對象的相同監視器。
所有隱式的監視器都實現了重入(reentrant)特性。重入的意思是鎖綁定在當前線程上。線程可以安全地多次獲取相同的鎖,而不會產生死鎖(例如,同步方法調用相同對象的另一個同步方法)。
## [](https://github.com/wizardforcel/modern-java-zh/blob/master/ch5.md#鎖)鎖
并發API支持多種顯式的鎖,它們由`Lock`接口規定,用于代替`synchronized`的隱式鎖。鎖對細粒度的控制支持多種方法,因此它們比隱式的監視器具有更大的開銷。
鎖的多個實現在標準JDK中提供,它們會在下面的章節中展示。
### [](https://github.com/wizardforcel/modern-java-zh/blob/master/ch5.md#reentrantlock)`ReentrantLock`
`ReentrantLock`類是互斥鎖,與通過`synchronized`訪問的隱式監視器具有相同行為,但是具有擴展功能。就像它的名稱一樣,這個鎖實現了重入特性,就像隱式監視器一樣。
讓我們看看使用`ReentrantLock`之后的上面的例子。
~~~java
ReentrantLock lock = new ReentrantLock();
int count = 0;
void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
~~~
鎖可以通過`lock()`來獲取,通過`unlock()`來釋放。把你的代碼包裝在`try-finally`代碼塊中來確保異常情況下的解鎖非常重要。這個方法是線程安全的,就像同步副本那樣。如果另一個線程已經拿到鎖了,再次調用`lock()`會阻塞當前線程,直到鎖被釋放。在任意給定的時間內,只有一個線程可以拿到鎖。
鎖對細粒度的控制支持多種方法,就像下面的例子那樣:
~~~java
executor.submit(() -> {
lock.lock();
try {
sleep(1);
} finally {
lock.unlock();
}
});
executor.submit(() -> {
System.out.println("Locked: " + lock.isLocked());
System.out.println("Held by me: " + lock.isHeldByCurrentThread());
boolean locked = lock.tryLock();
System.out.println("Lock acquired: " + locked);
});
stop(executor);
~~~
在第一個任務拿到鎖的一秒之后,第二個任務獲得了鎖的當前狀態的不同信息。
~~~
Locked: true
Held by me: false
Lock acquired: false
~~~
`tryLock()`方法是`lock()`方法的替代,它嘗試拿鎖而不阻塞當前線程。在訪問任何共享可變變量之前,必須使用布爾值結果來檢查鎖是否已經被獲取。
### [](https://github.com/wizardforcel/modern-java-zh/blob/master/ch5.md#readwritelock)`ReadWriteLock`
`ReadWriteLock`接口規定了鎖的另一種類型,包含用于讀寫訪問的一對鎖。讀寫鎖的理念是,只要沒有任何線程寫入變量,并發讀取可變變量通常是安全的。所以讀鎖可以同時被多個線程持有,只要沒有線程持有寫鎖。這樣可以提升性能和吞吐量,因為讀取比寫入更加頻繁。
~~~java
ExecutorService executor = Executors.newFixedThreadPool(2);
Map<String, String> map = new HashMap<>();
ReadWriteLock lock = new ReentrantReadWriteLock();
executor.submit(() -> {
lock.writeLock().lock();
try {
sleep(1);
map.put("foo", "bar");
} finally {
lock.writeLock().unlock();
}
});
~~~
上面的例子在暫停一秒之后,首先獲取寫鎖來向映射添加新的值。在這個任務完成之前,兩個其它的任務被啟動,嘗試讀取映射中的元素,并暫停一秒:
~~~java
Runnable readTask = () -> {
lock.readLock().lock();
try {
System.out.println(map.get("foo"));
sleep(1);
} finally {
lock.readLock().unlock();
}
};
executor.submit(readTask);
executor.submit(readTask);
stop(executor);
~~~
當你執行這一代碼示例時,你會注意到兩個讀任務需要等待寫任務完成。在釋放了寫鎖之后,兩個讀任務會同時執行,并同時打印結果。它們不需要相互等待完成,因為讀鎖可以安全同步獲取,只要沒有其它線程獲取了寫鎖。
### [](https://github.com/wizardforcel/modern-java-zh/blob/master/ch5.md#stampedlock)`StampedLock`
Java 8 自帶了一種新的鎖,叫做`StampedLock`,它同樣支持讀寫鎖,就像上面的例子那樣。與`ReadWriteLock`不同的是,`StampedLock`的鎖方法會返回表示為`long`的標記。你可以使用這些標記來釋放鎖,或者檢查鎖是否有效。此外,`StampedLock`支持另一種叫做樂觀鎖(optimistic locking)的模式。
讓我們使用`StampedLock`代替`ReadWriteLock`重寫上面的例子:
~~~java
ExecutorService executor = Executors.newFixedThreadPool(2);
Map<String, String> map = new HashMap<>();
StampedLock lock = new StampedLock();
executor.submit(() -> {
long stamp = lock.writeLock();
try {
sleep(1);
map.put("foo", "bar");
} finally {
lock.unlockWrite(stamp);
}
});
Runnable readTask = () -> {
long stamp = lock.readLock();
try {
System.out.println(map.get("foo"));
sleep(1);
} finally {
lock.unlockRead(stamp);
}
};
executor.submit(readTask);
executor.submit(readTask);
stop(executor);
~~~
通過`readLock()`或`writeLock()`來獲取讀鎖或寫鎖會返回一個標記,它可以在稍后用于在`finally`塊中解鎖。要記住`StampedLock`并沒有實現重入特性。每次調用加鎖都會返回一個新的標記,并且在沒有可用的鎖時阻塞,即使相同線程已經拿鎖了。所以你需要額外注意不要出現死鎖。
就像前面的`ReadWriteLock`例子那樣,兩個讀任務都需要等待寫鎖釋放。之后兩個讀任務同時向控制臺打印信息,因為多個讀操作不會相互阻塞,只要沒有線程拿到寫鎖。
下面的例子展示了樂觀鎖:
~~~java
ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();
executor.submit(() -> {
long stamp = lock.tryOptimisticRead();
try {
System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
sleep(1);
System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
sleep(2);
System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
} finally {
lock.unlock(stamp);
}
});
executor.submit(() -> {
long stamp = lock.writeLock();
try {
System.out.println("Write Lock acquired");
sleep(2);
} finally {
lock.unlock(stamp);
System.out.println("Write done");
}
});
stop(executor);
~~~
樂觀的讀鎖通過調用`tryOptimisticRead()`獲取,它總是返回一個標記而不阻塞當前線程,無論鎖是否真正可用。如果已經有寫鎖被拿到,返回的標記等于0。你需要總是通過`lock.validate(stamp)`檢查標記是否有效。
執行上面的代碼會產生以下輸出:
~~~
Optimistic Lock Valid: true
Write Lock acquired
Optimistic Lock Valid: false
Write done
Optimistic Lock Valid: false
~~~
樂觀鎖在剛剛拿到鎖之后是有效的。和普通的讀鎖不同的是,樂觀鎖不阻止其他線程同時獲取寫鎖。在第一個線程暫停一秒之后,第二個線程拿到寫鎖而無需等待樂觀的讀鎖被釋放。此時,樂觀的讀鎖就不再有效了。甚至當寫鎖釋放時,樂觀的讀鎖還處于無效狀態。
所以在使用樂觀鎖時,你需要每次在訪問任何共享可變變量之后都要檢查鎖,來確保讀鎖仍然有效。
有時,將讀鎖轉換為寫鎖而不用再次解鎖和加鎖十分實用。`StampedLock`為這種目的提供了`tryConvertToWriteLock()`方法,就像下面那樣:
~~~java
ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();
executor.submit(() -> {
long stamp = lock.readLock();
try {
if (count == 0) {
stamp = lock.tryConvertToWriteLock(stamp);
if (stamp == 0L) {
System.out.println("Could not convert to write lock");
stamp = lock.writeLock();
}
count = 23;
}
System.out.println(count);
} finally {
lock.unlock(stamp);
}
});
stop(executor);
~~~
第一個任務獲取讀鎖,并向控制臺打印`count`字段的當前值。但是如果當前值是零,我們希望將其賦值為`23`。我們首先需要將讀鎖轉換為寫鎖,來避免打破其它線程潛在的并發訪問。`tryConvertToWriteLock()`的調用不會阻塞,但是可能會返回為零的標記,表示當前沒有可用的寫鎖。這種情況下,我們調用`writeLock()`來阻塞當前線程,直到有可用的寫鎖。
## [](https://github.com/wizardforcel/modern-java-zh/blob/master/ch5.md#信號量)信號量
除了鎖之外,并發API也支持計數的信號量。不過鎖通常用于變量或資源的互斥訪問,信號量可以維護整體的準入許可。這在一些不同場景下,例如你需要限制你程序某個部分的并發訪問總數時非常實用。
下面是一個例子,演示了如何限制對通過`sleep(5)`模擬的長時間運行任務的訪問:
~~~java
ExecutorService executor = Executors.newFixedThreadPool(10);
Semaphore semaphore = new Semaphore(5);
Runnable longRunningTask = () -> {
boolean permit = false;
try {
permit = semaphore.tryAcquire(1, TimeUnit.SECONDS);
if (permit) {
System.out.println("Semaphore acquired");
sleep(5);
} else {
System.out.println("Could not acquire semaphore");
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
} finally {
if (permit) {
semaphore.release();
}
}
}
IntStream.range(0, 10)
.forEach(i -> executor.submit(longRunningTask));
stop(executor);
~~~
執行器可能同時運行10個任務,但是我們使用了大小為5的信號量,所以將并發訪問限制為5。使用`try-finally`代碼塊在異常情況中合理釋放信號量十分重要。
執行上述代碼產生如下結果:
~~~
Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
~~~
信號量限制對通過`sleep(5)`模擬的長時間運行任務的訪問,最大5個線程。每個隨后的`tryAcquire()`調用在經過最大為一秒的等待超時之后,會向控制臺打印不能獲取信號量的結果。
- 一.JVM
- 1.1 java代碼是怎么運行的
- 1.2 JVM的內存區域
- 1.3 JVM運行時內存
- 1.4 JVM內存分配策略
- 1.5 JVM類加載機制與對象的生命周期
- 1.6 常用的垃圾回收算法
- 1.7 JVM垃圾收集器
- 1.8 CMS垃圾收集器
- 1.9 G1垃圾收集器
- 2.面試相關文章
- 2.1 可能是把Java內存區域講得最清楚的一篇文章
- 2.0 GC調優參數
- 2.1GC排查系列
- 2.2 內存泄漏和內存溢出
- 2.2.3 深入理解JVM-hotspot虛擬機對象探秘
- 1.10 并發的可達性分析相關問題
- 二.Java集合架構
- 1.ArrayList深入源碼分析
- 2.Vector深入源碼分析
- 3.LinkedList深入源碼分析
- 4.HashMap深入源碼分析
- 5.ConcurrentHashMap深入源碼分析
- 6.HashSet,LinkedHashSet 和 LinkedHashMap
- 7.容器中的設計模式
- 8.集合架構之面試指南
- 9.TreeSet和TreeMap
- 三.Java基礎
- 1.基礎概念
- 1.1 Java程序初始化的順序是怎么樣的
- 1.2 Java和C++的區別
- 1.3 反射
- 1.4 注解
- 1.5 泛型
- 1.6 字節與字符的區別以及訪問修飾符
- 1.7 深拷貝與淺拷貝
- 1.8 字符串常量池
- 2.面向對象
- 3.關鍵字
- 4.基本數據類型與運算
- 5.字符串與數組
- 6.異常處理
- 7.Object 通用方法
- 8.Java8
- 8.1 Java 8 Tutorial
- 8.2 Java 8 數據流(Stream)
- 8.3 Java 8 并發教程:線程和執行器
- 8.4 Java 8 并發教程:同步和鎖
- 8.5 Java 8 并發教程:原子變量和 ConcurrentMap
- 8.6 Java 8 API 示例:字符串、數值、算術和文件
- 8.7 在 Java 8 中避免 Null 檢查
- 8.8 使用 Intellij IDEA 解決 Java 8 的數據流問題
- 四.Java 并發編程
- 1.線程的實現/創建
- 2.線程生命周期/狀態轉換
- 3.線程池
- 4.線程中的協作、中斷
- 5.Java鎖
- 5.1 樂觀鎖、悲觀鎖和自旋鎖
- 5.2 Synchronized
- 5.3 ReentrantLock
- 5.4 公平鎖和非公平鎖
- 5.3.1 說說ReentrantLock的實現原理,以及ReentrantLock的核心源碼是如何實現的?
- 5.5 鎖優化和升級
- 6.多線程的上下文切換
- 7.死鎖的產生和解決
- 8.J.U.C(java.util.concurrent)
- 0.簡化版(快速復習用)
- 9.鎖優化
- 10.Java 內存模型(JMM)
- 11.ThreadLocal詳解
- 12 CAS
- 13.AQS
- 0.ArrayBlockingQueue和LinkedBlockingQueue的實現原理
- 1.DelayQueue的實現原理
- 14.Thread.join()實現原理
- 15.PriorityQueue 的特性和原理
- 16.CyclicBarrier的實際使用場景
- 五.Java I/O NIO
- 1.I/O模型簡述
- 2.Java NIO之緩沖區
- 3.JAVA NIO之文件通道
- 4.Java NIO之套接字通道
- 5.Java NIO之選擇器
- 6.基于 Java NIO 實現簡單的 HTTP 服務器
- 7.BIO-NIO-AIO
- 8.netty(一)
- 9.NIO面試題
- 六.Java設計模式
- 1.單例模式
- 2.策略模式
- 3.模板方法
- 4.適配器模式
- 5.簡單工廠
- 6.門面模式
- 7.代理模式
- 七.數據結構和算法
- 1.什么是紅黑樹
- 2.二叉樹
- 2.1 二叉樹的前序、中序、后序遍歷
- 3.排序算法匯總
- 4.java實現鏈表及鏈表的重用操作
- 4.1算法題-鏈表反轉
- 5.圖的概述
- 6.常見的幾道字符串算法題
- 7.幾道常見的鏈表算法題
- 8.leetcode常見算法題1
- 9.LRU緩存策略
- 10.二進制及位運算
- 10.1.二進制和十進制轉換
- 10.2.位運算
- 11.常見鏈表算法題
- 12.算法好文推薦
- 13.跳表
- 八.Spring 全家桶
- 1.Spring IOC
- 2.Spring AOP
- 3.Spring 事務管理
- 4.SpringMVC 運行流程和手動實現
- 0.Spring 核心技術
- 5.spring如何解決循環依賴問題
- 6.springboot自動裝配原理
- 7.Spring中的循環依賴解決機制中,為什么要三級緩存,用二級緩存不夠嗎
- 8.beanFactory和factoryBean有什么區別
- 九.數據庫
- 1.mybatis
- 1.1 MyBatis-# 與 $ 區別以及 sql 預編譯
- Mybatis系列1-Configuration
- Mybatis系列2-SQL執行過程
- Mybatis系列3-之SqlSession
- Mybatis系列4-之Executor
- Mybatis系列5-StatementHandler
- Mybatis系列6-MappedStatement
- Mybatis系列7-參數設置揭秘(ParameterHandler)
- Mybatis系列8-緩存機制
- 2.淺談聚簇索引和非聚簇索引的區別
- 3.mysql 證明為什么用limit時,offset很大會影響性能
- 4.MySQL中的索引
- 5.數據庫索引2
- 6.面試題收集
- 7.MySQL行鎖、表鎖、間隙鎖詳解
- 8.數據庫MVCC詳解
- 9.一條SQL查詢語句是如何執行的
- 10.MySQL 的 crash-safe 原理解析
- 11.MySQL 性能優化神器 Explain 使用分析
- 12.mysql中,一條update語句執行的過程是怎么樣的?期間用到了mysql的哪些log,分別有什么作用
- 十.Redis
- 0.快速復習回顧Redis
- 1.通俗易懂的Redis數據結構基礎教程
- 2.分布式鎖(一)
- 3.分布式鎖(二)
- 4.延時隊列
- 5.位圖Bitmaps
- 6.Bitmaps(位圖)的使用
- 7.Scan
- 8.redis緩存雪崩、緩存擊穿、緩存穿透
- 9.Redis為什么是單線程、及高并發快的3大原因詳解
- 10.布隆過濾器你值得擁有的開發利器
- 11.Redis哨兵、復制、集群的設計原理與區別
- 12.redis的IO多路復用
- 13.相關redis面試題
- 14.redis集群
- 十一.中間件
- 1.RabbitMQ
- 1.1 RabbitMQ實戰,hello world
- 1.2 RabbitMQ 實戰,工作隊列
- 1.3 RabbitMQ 實戰, 發布訂閱
- 1.4 RabbitMQ 實戰,路由
- 1.5 RabbitMQ 實戰,主題
- 1.6 Spring AMQP 的 AMQP 抽象
- 1.7 Spring AMQP 實戰 – 整合 RabbitMQ 發送郵件
- 1.8 RabbitMQ 的消息持久化與 Spring AMQP 的實現剖析
- 1.9 RabbitMQ必備核心知識
- 2.RocketMQ 的幾個簡單問題與答案
- 2.Kafka
- 2.1 kafka 基礎概念和術語
- 2.2 Kafka的重平衡(Rebalance)
- 2.3.kafka日志機制
- 2.4 kafka是pull還是push的方式傳遞消息的?
- 2.5 Kafka的數據處理流程
- 2.6 Kafka的腦裂預防和處理機制
- 2.7 Kafka中partition副本的Leader選舉機制
- 2.8 如果Leader掛了的時候,follower沒來得及同步,是否會出現數據不一致
- 2.9 kafka的partition副本是否會出現腦裂情況
- 十二.Zookeeper
- 0.什么是Zookeeper(漫畫)
- 1.使用docker安裝Zookeeper偽集群
- 3.ZooKeeper-Plus
- 4.zk實現分布式鎖
- 5.ZooKeeper之Watcher機制
- 6.Zookeeper之選舉及數據一致性
- 十三.計算機網絡
- 1.進制轉換:二進制、八進制、十六進制、十進制之間的轉換
- 2.位運算
- 3.計算機網絡面試題匯總1
- 十四.Docker
- 100.面試題收集合集
- 1.美團面試常見問題總結
- 2.b站部分面試題
- 3.比心面試題
- 4.騰訊面試題
- 5.哈羅部分面試
- 6.筆記
- 十五.Storm
- 1.Storm和流處理簡介
- 2.Storm 核心概念詳解
- 3.Storm 單機版本環境搭建
- 4.Storm 集群環境搭建
- 5.Storm 編程模型詳解
- 6.Storm 項目三種打包方式對比分析
- 7.Storm 集成 Redis 詳解
- 8.Storm 集成 HDFS 和 HBase
- 9.Storm 集成 Kafka
- 十六.Elasticsearch
- 1.初識ElasticSearch
- 2.文檔基本CRUD、集群健康檢查
- 3.shard&replica
- 4.document核心元數據解析及ES的并發控制
- 5.document的批量操作及數據路由原理
- 6.倒排索引
- 十七.分布式相關
- 1.分布式事務解決方案一網打盡
- 2.關于xxx怎么保證高可用的問題
- 3.一致性hash原理與實現
- 4.微服務注冊中心 Nacos 比 Eureka的優勢
- 5.Raft 協議算法
- 6.為什么微服務架構中需要網關
- 0.CAP與BASE理論
- 十八.Dubbo
- 1.快速掌握Dubbo常規應用
- 2.Dubbo應用進階
- 3.Dubbo調用模塊詳解
- 4.Dubbo調用模塊源碼分析
- 6.Dubbo協議模塊