鎖是一種同步機制,用于在多任務環境中限制資源的訪問,以滿足互斥需求。
go源碼sync包中經常用于同步操作的方式:
* 原子操作
* 互斥鎖
* 讀寫鎖
* waitgroup
我們著重來分析下互斥鎖和讀寫鎖.
互斥鎖:
下面是互斥鎖的數據結構:
~~~go
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
state int32 // 互斥鎖上鎖狀態枚舉值如下所示
sema uint32 // 信號量,向處于Gwaitting的G發送信號
}
const (
mutexLocked = 1 << iota // 值為1,表示在state中由低向高第1位,意義:鎖是否可用,0可用,1不可用,鎖定中
mutexWoken // 值為2,表示在state中由低向高第2位,意義:mutex是否被喚醒
mutexStarving // 當前的互斥鎖進入饑餓狀態;
mutexWaiterShift = iota //值為2,表示state中統計阻塞在此mutex上goroutine的數目需要位移的偏移量
starvationThresholdNs = 1e6
~~~
state和sema兩個加起來只占 8 字節空間的結構體表示了 Go 語言中的互斥鎖。
互斥鎖的狀態比較復雜,如下圖所示,最低三位分別表示 mutexLocked、mutexWoken 和 mutexStarving,剩下的位置用來表示當前有多少個 Goroutine 等待互斥鎖的釋放.
[](https://github.com/KeKe-Li/data-structures-questions/blob/master/src/images/138.jpg)
在默認情況下,互斥鎖的所有狀態位都是 0,int32 中的不同位分別表示了不同的狀態:
* mutexLocked 表示互斥鎖的鎖定狀態;
* mutexWoken 表示從正常模式被從喚醒;
* mutexStarving 當前的互斥鎖進入饑餓狀態;
* waitersCount 當前互斥鎖上等待的 Goroutine 個數;
sync.Mutex 有兩種模式,正常模式和饑餓模式。
在正常模式下,鎖的等待者會按照先進先出的順序獲取鎖。
但是剛被喚起的`Goroutine`與新創建的`Goroutine`競爭時,大概率會獲取不到鎖,為了減少這種情況的出現,一旦 Goroutine 超過 1ms 沒有獲取到鎖,它就會將當前互斥鎖切換饑餓模式,防止部分 Goroutine 被餓死。
饑餓模式是在 Go 語言 1.9 版本引入的優化的,引入的目的是保證互斥鎖的公平性(Fairness)。
在饑餓模式中,互斥鎖會直接交給等待隊列最前面的 Goroutine。新的 Goroutine 在該狀態下不能獲取鎖、也不會進入自旋狀態,它們只會在隊列的末尾等待。
如果一個 Goroutine 獲得了互斥鎖并且它在隊列的末尾或者它等待的時間少于 1ms,那么當前的互斥鎖就會被切換回正常模式。
相比于饑餓模式,正常模式下的互斥鎖能夠提供更好地性能,饑餓模式的能避免 Goroutine 由于陷入等待無法獲取鎖而造成的高尾延時。
互斥鎖的加鎖是靠 sync.Mutex.Lock 方法完成的, 當鎖的狀態是 0 時,將`mutexLocked`位置成 1:
~~~go
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
~~~
如果互斥鎖的狀態不是 0 時就會調用`sync.Mutex.lockSlow`嘗試通過自旋(Spinnig)等方式等待鎖的釋放,
這個方法是一個非常大 for 循環,它獲取鎖的過程:
1. 判斷當前 Goroutine 能否進入自旋;
2. 通過自旋等待互斥鎖的釋放;
3. 計算互斥鎖的最新狀態;
4. 更新互斥鎖的狀態并獲取鎖;
那么互斥鎖是如何判斷當前 Goroutine 能否進入自旋等互斥鎖的釋放,是通過它的lockSlow方法, 由于自旋是一種多線程同步機制,所以呢當前的進程在進入自旋的過程中會一直保持對 CPU 的占用,持續檢查某個條件是否為真。 通常在多核的 CPU 上,自旋可以避免 Goroutine 的切換,使用得當會對性能帶來很大的增益,但是往往使用的不得當就會拖慢整個程序.
所以 Goroutine 進入自旋的條件非常苛刻:
* 互斥鎖只有在普通模式才能進入自旋;
* `runtime.sync_runtime_canSpin`需要返回 true: a. 需要運行在多 CPU 的機器上; b. 當前的Goroutine 為了獲取該鎖進入自旋的次數小于四次; c. 當前機器上至少存在一個正在運行的處理器 P 并且處理的運行隊列為空;
一旦當前 Goroutine 能夠進入自旋就會調用`runtime.sync_runtime_doSpin`和`runtime.procyield`并執行 30 次的 PAUSE 指令,該指令只會占用 CPU 并消耗 CPU 時間.
處理了自旋相關的特殊邏輯之后,互斥鎖會根據上下文計算當前互斥鎖最新的狀態。
通過幾個不同的條件分別會更新 state 字段中存儲的不同信息,`mutexLocked`、`mutexStarving`、`mutexWoken`和`mutexWaiterShift`:
~~~go
new := old
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
new &^= mutexWoken
}
~~~
計算了新的互斥鎖狀態之后,就會使用 CAS 函數 sync/atomic.CompareAndSwapInt32 更新該狀態:
~~~go
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // 通過 CAS 函數獲取了鎖
}
...
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
}
~~~
如果我們沒有通過 CAS 獲得鎖,會調用`runtime.sync_runtime_SemacquireMutex`使用信號量保證資源不會被兩個 Goroutine 獲取。
`runtime.sync_runtime_SemacquireMutex`會在方法中不斷調用嘗試獲取鎖并休眠當前 Goroutine 等待信號量的釋放,一旦當前 Goroutine 可以獲取信號量,它就會立刻返回,`sync.Mutex.Lock`方法的剩余代碼也會繼續執行。
在正常模式下,這段代碼會設置喚醒和饑餓標記、重置迭代次數并重新執行獲取鎖的循環.
在饑餓模式下,當前 Goroutine 會獲得互斥鎖,如果等待隊列中只存在當前 Goroutine,互斥鎖還會從饑餓模式中退出.
互斥鎖的解鎖過程`sync.Mutex.Unlock`與加鎖過程相比就很簡單,該過程會先使用`sync/atomic.AddInt32`函數快速解鎖,這時會發生下面的兩種情況:
* 如果該函數返回的新狀態等于 0,當前 Goroutine 就成功解鎖了互斥鎖;
* 如果該函數返回的新狀態不等于 0,這段代碼會調用`sync.Mutex.unlockSlow`方法開始慢速解鎖:
~~~go
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
~~~
`sync.Mutex.unlockSlow`方法首先會校驗鎖狀態的合法性, 如果當前互斥鎖已經被解鎖過了就會直接拋出異常`sync: unlock of unlocked mutex`中止當前程序。
在正常情況下會根據當前互斥鎖的狀態,分別處理正常模式和饑餓模式下的互斥鎖.
~~~go
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
runtime_Semrelease(&m.sema, true, 1)
}
}
~~~
在正常模式下,這段代碼會分別處理以下兩種情況處理:
* 如果互斥鎖不存在等待者或者互斥鎖的`mutexLocked`、`mutexStarving`、`mutexWoken`狀態不都為 0,那么當前方法就可以直接返回,不需要喚醒其他等待者;
* 如果互斥鎖存在等待者,會通過`sync.runtime_Semrelease`喚醒等待者并移交鎖的所有權;
在饑餓模式下,上述代碼會直接調用`sync.runtime_Semrelease`方法將當前鎖交給下一個正在嘗試獲取鎖的等待者,等待者被喚醒后會得到鎖,在這時互斥鎖還不會退出饑餓狀態;
互斥鎖的加鎖過程比較復雜,它涉及自旋、信號量以及調度等概念:
* 如果互斥鎖處于初始化狀態,就會直接通過置位 mutexLocked 加鎖;
* 如果互斥鎖處于 mutexLocked 并且在普通模式下工作,就會進入自旋,執行 30 次 PAUSE 指令消耗 CPU 時間等待鎖的釋放;
* 如果當前 Goroutine 等待鎖的時間超過了 1ms,互斥鎖就會切換到饑餓模式;
* 互斥鎖在正常情況下會通過`runtime.sync_runtime_SemacquireMutex`函數將嘗試獲取鎖的 Goroutine 切換至休眠狀態,等待鎖的持有者喚醒當前 Goroutine;
* 如果當前 Goroutine 是互斥鎖上的最后一個等待的協程或者等待的時間小于 1ms,當前 Goroutine 會將互斥鎖切換回正常模式;
互斥鎖的解鎖過程與之相比就比較簡單,其代碼行數不多、邏輯清晰,也比較容易理解:
* 當互斥鎖已經被解鎖時,那么調用`sync.Mutex.Unlock`會直接拋出異常;
* 當互斥鎖處于饑餓模式時,會直接將鎖的所有權交給隊列中的下一個等待者,等待者會負責設置`mutexLocked`標志位;
* 當互斥鎖處于普通模式時,如果沒有 Goroutine 等待鎖的釋放或者已經有被喚醒的 Goroutine 獲得了鎖,就會直接返回;在其他情況下會通過`sync.runtime_Semrelease`喚醒對應的 Goroutine.
讀寫鎖:
讀寫互斥鎖`sync.RWMutex`是細粒度的互斥鎖,它不限制資源的并發讀,但是讀寫、寫寫操作無法并行執行。
sync.RWMutex 中總共包含5 個字段:
~~~go
type RWMutex struct {
w Mutex // 復用互斥鎖提供的能力
writerSem uint32 // 寫等待讀
readerSem uint32 // 讀等待寫
readerCount int32 // 存儲了當前正在執行的讀操作的數量
readerWait int32 // 當寫操作被阻塞時等待的讀操作個數
}
~~~
我們從寫鎖開始分析:
當我們想要獲取寫鎖時,需要調用`sync.RWMutex.Lock`方法:
~~~go
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
~~~
* 這里調用結構體持有的`sync.Mutex`的`sync.Mutex.Lock`方法阻塞后續的寫操作;
因為互斥鎖已經被獲取,其他 Goroutine 在獲取寫鎖時就會進入自旋或者休眠;
* 調用`sync/atomic.AddInt32`方法阻塞后續的讀操作:
如果仍然有其他 Goroutine 持有互斥鎖的讀鎖`(r != 0)`,該 Goroutine 會調用`runtime.sync_runtime_SemacquireMutex`進入休眠狀態等待所有讀鎖所有者執行結束后釋放`writerSem`信號量將當前協程喚醒。
寫鎖的釋放會調用`sync.RWMutex.Unlock`方法:
~~~go
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
~~~
解鎖與加鎖的過程正好相反,寫鎖的釋放分為以下幾個步驟:
1. 調用`sync/atomic.AddInt32`函數將`readerCount`變回正數,釋放讀鎖;
2. 通過 for 循環觸發所有由于獲取讀鎖而陷入等待的 Goroutine:
3. 調用`sync.Mutex.Unlock`方法釋放寫鎖;
獲取寫鎖時會先阻塞寫鎖的獲取,后阻塞讀鎖的獲取,這種策略能夠保證讀操作不會被連續的寫操作餓死。
接著是讀鎖:
讀鎖的加鎖方法`sync.RWMutex.RLock`就比較簡單了,該方法會通過`sync/atomic.AddInt32`將`readerCount`加一:
~~~go
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
~~~
如果`RLock`該方法返回負數,其他 Goroutine 獲得了寫鎖,當前 Goroutine 就會調用`runtime.sync_runtime_SemacquireMutex`陷入休眠等待鎖的釋放; 如果`RLock`該方法的結果為非負數,沒有 Goroutine 獲得寫鎖,當前方法就會成功返回.
當 Goroutine 想要釋放讀鎖時,會調用如下所示的`RUnlock`方法:
~~~go
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}
~~~
該方法會先減少正在讀資源的`readerCount`整數,根據`sync/atomic.AddInt32`的返回值不同會分別進行處理:
* 如果返回值大于等于零,表示讀鎖直接解鎖成功.
* 如果返回值小于零 ,表示有一個正在執行的寫操作,在這時會調用`rUnlockSlow`方法.
~~~go
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
~~~
`rUnlockSlow`該方法會減少獲取鎖的寫操作等待的讀操作數`readerWait`并在所有讀操作都被釋放之后觸發寫操作的信號量,`writerSem`,該信號量被觸發時,調度器就會喚醒嘗試獲取寫鎖的 Goroutine。
其實讀寫互斥鎖(sync.RWMutex),雖然提供的功能非常復雜,不過因為它是在互斥鎖( sync.Mutex)的基礎上,所以整體的實現上會簡單很多。
因此呢:
* 調用`sync.RWMutex.Lock`嘗試獲取寫鎖時;
每次`sync.RWMutex.RUnlock`都會將`readerCount`其減一,當它歸零時該 Goroutine 就會獲得寫鎖, 將`readerCount`減少`rwmutexMaxReaders`個數以阻塞后續的讀操作.
* 調用`sync.RWMutex.Unlock`釋放寫鎖時,會先通知所有的讀操作,然后才會釋放持有的互斥鎖;
讀寫互斥鎖在互斥鎖之上提供了額外的更細粒度的控制,能夠在讀操作遠遠多于寫操作時提升性能。
- Golang基礎
- Go中new與make的區別
- Golang中除了加Mutex鎖以外還有哪些方式安全讀寫共享變量
- 無緩沖Chan的發送和接收是否同步
- Golang并發機制以及它所使用的CSP并發模型.
- Golang中常用的并發模型
- Go中對nil的Slice和空Slice的處理是一致的嗎
- 協程和線程和進程的區別
- Golang的內存模型中為什么小對象多了會造成GC壓力
- Go中數據競爭問題怎么解決
- 什么是channel,為什么它可以做到線程安全
- Golang垃圾回收算法
- GC的觸發條件
- Go的GPM如何調度
- 并發編程概念是什么
- Go語言的棧空間管理是怎么樣的
- Goroutine和Channel的作用分別是什么
- 怎么查看Goroutine的數量
- Go中的鎖有哪些
- 怎么限制Goroutine的數量
- Channel是同步的還是異步的
- Goroutine和線程的區別
- Go的Struct能不能比較
- Go的defer原理是什么
- Go的select可以用于什么
- Context包的用途是什么
- Go主協程如何等其余協程完再操作
- Go的Slice如何擴容
- Go中的map如何實現順序讀取
- Go中CAS是怎么回事
- Go中的逃逸分析是什么
- Go值接收者和指針接收者的區別
- Go的對象在內存中是怎樣分配的
- 棧的內存是怎么分配的
- 堆內存管理怎么分配的
- 在Go函數中為什么會發生內存泄露
- G0的作用
- Go中的鎖如何實現
- Go中的channel的實現
- 棧的內存是怎么分配的2
- 堆內存管理怎么分配的2
- Go中的map的實現
- Go中的http包的實現原理
- Goroutine發生了泄漏如何檢測
- Go函數返回局部變量的指針是否安全
- Go中兩個Nil可能不相等嗎
- Goroutine和KernelThread之間是什么關系
- 為何GPM調度要有P
- 如何在goroutine執行一半就退出協程
- Mysql基礎
- Mysql索引用的是什么算法
- Mysql事務的基本要素
- Mysql的存儲引擎
- Mysql事務隔離級別
- Mysql高可用方案有哪些
- Mysql中utf8和utf8mb4區別
- Mysql中樂觀鎖和悲觀鎖區別
- Mysql索引主要是哪些
- Mysql聯合索引最左匹配原則
- 聚簇索引和非聚簇索引區別
- 如何查詢一個字段是否命中了索引
- Mysql中查詢數據什么情況下不會命中索引
- Mysql中的MVCC是什么
- Mvcc和Redolog和Undolog以及Binlog有什么不同
- Mysql讀寫分離以及主從同步
- InnoDB的關鍵特性
- Mysql如何保證一致性和持久性
- 為什么選擇B+樹作為索引結構
- InnoDB的行鎖模式
- 哈希(hash)比樹(tree)更快,索引結構為什么要設計成樹型
- 為什么索引的key長度不能太長
- Mysql的數據如何恢復到任意時間點
- Mysql為什么加了索引可以加快查詢
- Explain命令有什么用
- Redis基礎
- Redis的數據結構及使用場景
- Redis持久化的幾種方式
- Redis的LRU具體實現
- 單線程的Redis為什么快
- Redis的數據過期策略
- 如何解決Redis緩存雪崩問題
- 如何解決Redis緩存穿透問題
- Redis并發競爭key如何解決
- Redis的主從模式和哨兵模式和集群模式區別
- Redis有序集合zset底層怎么實現的
- 跳表的查詢過程是怎么樣的,查詢和插入的時間復雜度
- 網絡協議基礎
- TCP和UDP有什么區別
- TCP中三次握手和四次揮手
- TCP的LISTEN狀態是什么
- 常見的HTTP狀態碼有哪些
- 301和302有什么區別
- 504和500有什么區別
- HTTPS和HTTP有什么區別
- Quic有什么優點相比Http2
- Grpc的優缺點
- Get和Post區別
- Unicode和ASCII以及Utf8的區別
- Cookie與Session異同
- Client如何實現長連接
- Http1和Http2和Grpc之間的區別是什么
- Tcp中的拆包和粘包是怎么回事
- TFO的原理是什么
- TIME_WAIT的作用
- 網絡的性能指標有哪些