# 5.2 互斥鎖
```
type Mutex struct {
state int32 // 表示 mutex 鎖當前的狀態
sema uint32 // 信號量,用于喚醒 goroutine
}
```
## 狀態
```
const (
mutexLocked = 1 << iota // 互斥鎖已鎖住
mutexWoken
mutexStarving
mutexWaiterShift = iota
starvationThresholdNs = 1e6
)
```
Mutex 可能處于兩種不同的模式:正常模式和饑餓模式。
在正常模式中,等待者按照 FIFO 的順序排隊獲取鎖,但是一個被喚醒的等待者有時候并不能獲取 mutex, 它還需要和新到來的 goroutine 們競爭 mutex 的使用權。 新到來的 goroutine 存在一個優勢,它們已經在 CPU 上運行且它們數量很多, 因此一個被喚醒的等待者有很大的概率獲取不到鎖,在這種情況下它處在等待隊列的前面。 如果一個 goroutine 等待 mutex 釋放的時間超過 1ms,它就會將 mutex 切換到饑餓模式
在饑餓模式中,mutex 的所有權直接從解鎖的 goroutine 遞交到等待隊列中排在最前方的 goroutine。 新到達的 goroutine 們不要嘗試去獲取 mutex,即使它看起來是在解鎖狀態,也不要試圖自旋, 而是排到等待隊列的尾部。
如果一個等待者獲得 mutex 的所有權,并且看到以下兩種情況中的任一種:
1. 它是等待隊列中的最后一個,
2. 它等待的時間少于 1ms,它便將 mutex 切換回正常操作模式
正常模式下的性能會更好,因為一個 goroutine 能在即使有很多阻塞的等待者時多次連續的獲得一個 mutex,饑餓模式的重要性則在于避免了病態情況下的尾部延遲。
## 加鎖
Lock 對申請鎖的情況分為三種:
1. 無沖突,通過 CAS 操作把當前狀態設置為加鎖狀態
2. 有沖突,開始自旋,并等待鎖釋放,如果其他 goroutine 在這段時間內釋放該鎖,直接獲得該鎖;如果沒有釋放則為下一種情況
3. 有沖突,且已經過了自旋階段,通過調用 semrelease 讓 goroutine 進入等待狀態
```
func (m *Mutex) Lock() {
// 快速路徑: 抓取并鎖上未鎖住狀態的互斥鎖
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
(...)
return
}
m.lockSlow()
}
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
(...)
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
(...)
}
```
## 解鎖
```
func (m *Mutex) Unlock() {
(...)
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
(...)
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
// 如果沒有等待著,或者已經存在一個 goroutine 被喚醒或者得到鎖,
// 或處于饑餓模式,無需喚醒任何等待狀態的 goroutine
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 喚醒一個阻塞的 goroutine,但不是喚醒第一個等待者
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
// 饑餓模式: 直接將 mutex 所有權交給等待隊列最前端的 goroutine
runtime_Semrelease(&m.sema, true, 1)
}
}
```
## 例:并發安全的單例模式
利用原子操作和互斥鎖我們可以輕松實現一個非常簡單的并發安全單例模式,即 sync.Once。
sync.Once 用來保證絕對一次執行的對象,例如可在單例的初始化中使用。 它內部的結構也相對簡單:
```
// Once 對象可以保證一個動作的絕對一次執行。
type Once struct {
// done 表明某個動作是否被執行
// 由于其使用頻繁(熱路徑),故將其放在結構體的最上方
// 熱路徑在每個調用點進行內嵌
// 將 done 放在第一位,在某些架構下(amd64/x86)能獲得更加緊湊的指令,
// 而在其他架構下能更少的指令(用于計算其偏移量)。
done uint32
m Mutex
}
```
注意,這個結構在 Go 1.13 中得到了重新調整,在其之前`done`字段在`m`之后。
源碼也非常簡單:
```
// Do 當且僅當第一次調用時,f 會被執行。換句話說,給定
// var once Once
// 如果 once.Do(f) 被多次調用則只有第一次會調用 f,即使每次提供的 f 不同。
// 每次執行必須新建一個 Once 實例。
//
// Do 用于變量的一次初始化,由于 f 是無參數的,因此有必要使用函數字面量來捕獲參數:
// config.once.Do(func() { config.init(filename) })
//
// 因為該調用無返回值,因此如果 f 調用了 Do,則會導致死鎖。
//
// 如果 f 發生 panic,則 Do 認為 f 已經返回;之后的調用也不會調用 f。
//
func (o *Once) Do(f func()) {
// 原子讀取 Once 內部的 done 屬性,是否為 0,是則進入慢速路徑,否則直接調用
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
// 注意,我們只使用原子讀讀取了 o.done 的值,這是最快速的路徑執行原子操作,即 fast-path
// 但當我們需要確保在并發狀態下,是不是有多個人讀到 0,因此必須加鎖,這個操作相對昂貴,即 slow-path
o.m.Lock()
defer o.m.Unlock()
// 正好我們有一個并發的 goroutine 讀到了 0,那么立即執行 f 并在結束時候調用原子寫,將 o.done 修改為 1
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
// 當 o.done 為 0 的 goroutine 解鎖后,其他人會繼續加鎖,這時會發現 o.done 已經為了 1 ,于是 f 已經不用在繼續執行了
}
```
- 第一部分 :基礎篇
- 第1章 Go語言的前世今生
- 1.2 Go語言綜述
- 1.3 順序進程通訊
- 1.4 Plan9匯編語言
- 第2章 程序生命周期
- 2.1 從go命令談起
- 2.2 Go程序編譯流程
- 2.3 Go 程序啟動引導
- 2.4 主Goroutine的生與死
- 第3 章 語言核心
- 3.1 數組.切片與字符串
- 3.2 散列表
- 3.3 函數調用
- 3.4 延遲語句
- 3.5 恐慌與恢復內建函數
- 3.6 通信原語
- 3.7 接口
- 3.8 運行時類型系統
- 3.9 類型別名
- 3.10 進一步閱讀的參考文獻
- 第4章 錯誤
- 4.1 問題的演化
- 4.2 錯誤值檢查
- 4.3 錯誤格式與上下文
- 4.4 錯誤語義
- 4.5 錯誤處理的未來
- 4.6 進一步閱讀的參考文獻
- 第5章 同步模式
- 5.1 共享內存式同步模式
- 5.2 互斥鎖
- 5.3 原子操作
- 5.4 條件變量
- 5.5 同步組
- 5.6 緩存池
- 5.7 并發安全散列表
- 5.8 上下文
- 5.9 內存一致模型
- 5.10 進一步閱讀的文獻參考
- 第二部分 運行時篇
- 第6章 并發調度
- 6.1 隨機調度的基本概念
- 6.2 工作竊取式調度
- 6.3 MPG模型與并發調度單
- 6.4 調度循環
- 6.5 線程管理
- 6.6 信號處理機制
- 6.7 執行棧管理
- 6.8 協作與搶占
- 6.9 系統監控
- 6.10 網絡輪詢器
- 6.11 計時器
- 6.12 非均勻訪存下的調度模型
- 6.13 進一步閱讀的參考文獻
- 第7章 內存分配
- 7.1 設計原則
- 7.2 組件
- 7.3 初始化
- 7.4 大對象分配
- 7.5 小對象分配
- 7.6 微對象分配
- 7.7 頁分配器
- 7.8 內存統計
- 第8章 垃圾回收
- 8.1 垃圾回收的基本想法
- 8.2 寫屏幕技術
- 8.3 調步模型與強弱觸發邊界
- 8.4 掃描標記與標記輔助
- 8.5 免清掃式位圖技術
- 8.6 前進保障與終止檢測
- 8.7 安全點分析
- 8.8 分代假設與代際回收
- 8.9 請求假設與實務制導回收
- 8.10 終結器
- 8.11 過去,現在與未來
- 8.12 垃圾回收統一理論
- 8.13 進一步閱讀的參考文獻
- 第三部分 工具鏈篇
- 第9章 代碼分析
- 9.1 死鎖檢測
- 9.2 競爭檢測
- 9.3 性能追蹤
- 9.4 代碼測試
- 9.5 基準測試
- 9.6 運行時統計量
- 9.7 語言服務協議
- 第10章 依賴管理
- 10.1 依賴管理的難點
- 10.2 語義化版本管理
- 10.3 最小版本選擇算法
- 10.4 Vgo 與dep之爭
- 第12章 泛型
- 12.1 泛型設計的演進
- 12.2 基于合約的泛型
- 12.3 類型檢查技術
- 12.4 泛型的未來
- 12.5 進一步閱讀的的參考文獻
- 第13章 編譯技術
- 13.1 詞法與文法
- 13.2 中間表示
- 13.3 優化器
- 13.4 指針檢查器
- 13.5 逃逸分析
- 13.6 自舉
- 13.7 鏈接器
- 13.8 匯編器
- 13.9 調用規約
- 13.10 cgo與系統調用
- 結束語: Go去向何方?