[TOC]
## waitGroup
### 基本結構
```
type WaitGroup struct {
// 避免復制使用的一個技巧,可以告訴vet工具違反了復制使用的規則,輔助Vet檢查
// 這個在 go sync 包中有很多它出現的身影
noCopy noCopy
// 64 位:高 32 位作為計數器,低 32 位作為 waiter 計數
// 64 位的原子操作要求 64 位對齊,但 32 位編譯器無法保證這個要求
// 因此分配 12 字節,然后將其中對齊的 8 字節作為狀態,其他 4 字節用于存儲原語
state1 [3]uint32
}
```
state1的方法:對64和32位的系統進行兼容
~~~
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
~~~
### 基本方法
#### **Add**
來增加設置計數器的值,對協程進行計數
~~~
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
// 競態檢查
if race.Enabled {
_ = *statep // trigger nil deref early
if delta < 0 {
// Synchronize decrements with Wait.
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
// delta 左移 32 位添加到計數器上面
state := atomic.AddUint64(statep, uint64(delta)<<32)
// v 代表 Add() 完之后當前計數器的值,取高 32 位的值
v := int32(state >> 32)
// w 代表當前調用 Wait 被阻塞的數量
w := uint32(state)
if race.Enabled && delta > 0 && v == int32(delta) {
// The first increment must be synchronized with Wait.
// Need to model this as a read, because there can be
// several concurrent wg.counter transitions from 0.
race.Read(unsafe.Pointer(semap))
}
// 非法
if v < 0 {
panic("sync: negative WaitGroup counter")
}
// w != 0,說明已經執行了 Wait() 操作,此時不允許再執行 Add()
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
// v == 0 && w > 0
// 此時不能再有一些狀態的并發改變的問題:
// - Add() 和 Wait() 操作不能并發執行
// - 如果計數器的值已經是 0 了,此時不能再執行 Wait() 操作
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 將 waiter 計數設置為 0,并且喚醒所有 waiter
// 由于 v 和 w 都是 0,所以這里直接將 *statep 設置為 0 就行
*statep = 0
// 喚醒所有 waiter
for ; w != 0; w-- {
// 釋放信號量
runtime_Semrelease(semap, false, 0)
}
}
~~~
#### **Done**
Done() 方法比較簡單,內部就是簡單的調用了 Add() 方法,參數傳 -1,將計數器的值減 1,代表當前協程工作完畢。
```
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
```
#### **Wait**
Wait() 方法在子 goroutine 執行完畢之前需要阻塞主 goroutine,其實現就是內部開了一個死循環,不停檢查計數器的值,直到其為 0 才結束。
~~~
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
// 競態檢查
if race.Enabled {
_ = *statep // trigger nil deref early
race.Disable()
}
// 啟動循環
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
// 計數器已經變成 0 了,不需要再等待,直接返回
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// 增加 waiter 數量(CAS)
// 直接在 state 的低位加就行,也就是直接 +1
if atomic.CompareAndSwapUint64(statep, state, state+1) {
if race.Enabled && w == 0 {
// Wait must be synchronized with the first Add.
// Need to model this is as a write to race with the read in Add.
// As a consequence, can do the write only for the first waiter,
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(semap))
}
// 等待信號量喚醒
runtime_Semacquire(semap)
// 這種情況說明在上一輪 Wait() 返回之前,wg 被重新使用了(重新進行了 Add() / Wait())
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
}
~~~
### WaitGroup 使用的一些注意事項
1.在任何時候都不要使計數器的值小于 0 ,這會引發程序的 panic。
2.Add() 方法的首次調用,與對它的 Wait() 方法的調用不能同時發生,例如在兩個不同的 goroutine 中分別調用這兩個方法,否則也會引發 panic。因此我們在聲明完 WaitGroup 的時候要盡早調用 Add() 方法。
3.如果想要重復使用 WaitGroup,我們需要等待前一輪調用 Wait() 返回之后再發起下一輪的調用。
4.調用 Done() 方法的次數要與 Add() 的計數器值相等,否則將會 panic。
- Go準備工作
- 依賴管理
- Go基礎
- 1、變量和常量
- 2、基本數據類型
- 3、運算符
- 4、流程控制
- 5、數組
- 數組聲明和初始化
- 遍歷
- 數組是值類型
- 6、切片
- 定義
- slice其他內容
- 7、map
- 8、函數
- 函數基礎
- 函數進階
- 9、指針
- 10、結構體
- 類型別名和自定義類型
- 結構體
- 11、接口
- 12、反射
- 13、并發
- 14、網絡編程
- 15、單元測試
- Go常用庫/包
- Context
- time
- strings/strconv
- file
- http
- Go常用第三方包
- Go優化
- Go問題排查
- Go框架
- 基礎知識點的思考
- 面試題
- 八股文
- 操作系統
- 整理一份資料
- interface
- array
- slice
- map
- MUTEX
- RWMUTEX
- Channel
- waitGroup
- context
- reflect
- gc
- GMP和CSP
- Select
- Docker
- 基本命令
- dockerfile
- docker-compose
- rpc和grpc
- consul和etcd
- ETCD
- consul
- gin
- 一些小點
- 樹
- K8s
- ES
- pprof
- mycat
- nginx
- 整理后的面試題
- 基礎
- Map
- Chan
- GC
- GMP
- 并發
- 內存
- 算法
- docker