[TOC]
## Channel
### 基本特性
#### 兩種模式
單向:
~~~
//只允許發送的通道:chan <- T
onlySend := make(chan <- int)
//只允許接收的通道:<- chan T
onlyRecv := make(<-chan int)
~~~
雙向:chan T
~~~
ch := make(chan int)
~~~
#### 緩沖
有緩沖
有緩存的 channel(buffered channel),其緩存區大小是根據所設置的值來調整。在功能上,若緩沖區未滿則不會阻塞,會源源不斷的進行傳輸。當緩沖區滿了后,發送者就會阻塞并等待。而當緩沖區為空時,接受者就會阻塞并等待,直至有新的數據
~~~
緩沖為10
ch := make(chan int,10)
~~~
無緩沖
無緩沖的 channel(unbuffered channel),其緩沖區大小則默認為 0。在功能上其接受者會阻塞等待并阻塞應用程序,直至收到通信和接收到數據。
~~~
ch := make(chan int)
~~~
### channel 本質
>本質就是一個環形隊列的配合,其包含發送方隊列、接收方隊列,加上互斥鎖`mutex`等結構。
#### 基本原理

#### 數據結構
hchan 結構體是 channel 在運行時的具體表現形式
~~~
type hchan struct {
qcount uint
dataqsiz uint
buf unsafe.Pointer
elemsize uint16
closed uint32
elemtype *_type
sendx uint
recvx uint
recvq waitq
sendq waitq
lock mutex //互斥鎖,chan不允許并發讀寫
}
~~~
qcount:channel里面的元素計數。內建函數 len 可以返回這個字段的值。已接收還沒被取走
dataqsiz:環形隊列大小,即可存放元素的個數。make(chan int,10),10就是這個值
buf:當 channel 設置了緩沖數量時,該 buf 指向一個存儲緩沖數據的區域,該區域是一個循環隊列的數據結構
elemsize :要發送或接收的數據類型大小
closed :標識關閉狀態
elemtype :元素類型
sendx :當 channel 設置了緩沖數量時,數據區域即循環隊列此時已發送數據的索引位置
recvx:當 channel 設置了緩沖數量時,數據區域即循環隊列此時已接收數據的索引位置
recvq :想讀取數據但又被阻塞住的 goroutine 隊列,即:等待讀消息的goroutine隊列
sendq :想發送數據但又被阻塞住的 goroutine 隊列,即:等待寫消息的goroutine隊列
在數據結構中,我們可以看到`recvq`和`sendq`,其表現為等待隊列,
其類型為`runtime.waitq`的雙向鏈表結構:
~~~
type waitq struct {
first *sudog
last *sudog
}
~~~
且無論是`first`屬性又或是`last`,其類型都為`runtime.sudog`結構體:
~~~
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer
...
}
~~~
* g:指向當前的 goroutine。
* next:指向下一個 g。
* prev:指向上一個 g。
* elem:數據元素,可能會指向堆棧。
### channel 實現原理
channel 的四大塊操作,分別是:“創建、發送、接收、關閉”。
#### 創建 chan
```
ch?:=?make(chan?string)
```
編譯器翻譯后對應`runtime.makechan`或`runtime.makechan64`方法:
```
//?通用創建方法
func?makechan(t?*chantype,?size?int)?*hchan
//?類型為?int64?的進行特殊處理
func?makechan64(t?*chantype,?size?int64)?*hchan
```
`makechan`方法
~~~
func makechan(t *chantype, size int) *hchan {
elem := t.elem
mem, _ := math.MulUintptr(elem.size, uintptr(size))
var c *hchan
switch {
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
return c
}
~~~
創建 channel 的邏輯主要分為三大塊:
* 當前 channel 不存在緩沖區,也就是元素大小為 0 的情況下,就會調用`mallocgc`方法分配一段連續的內存空間。
* 當前 channel 存儲的類型存在指針引用,就會連同`hchan`和底層數組同時分配一段連續的內存空間。
* 通用情況,默認分配相匹配的連續內存空間。
> 那就是 channel 的創建都是調用的`mallocgc`方法,也就是 channel 都是創建在堆上的。因此 channel 是會被 GC 回收的,自然也不總是需要`close`方法來進行顯示關閉了。
從整體上來講,`makechan`方法的邏輯比較簡單,就是創建`hchan`并分配合適的`buf`大小的堆上內存空間。
#### 發送
發送的時候會把send轉換成chansend1,chansend1再調用chansend
~~~
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
~~~
~~~
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
//第一部分
if c == nil { // 先判斷通道是不是nil
if !block { //block是寫死的true
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) //是nil就阻塞休眠
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
//2、沒有阻塞,沒有關閉 但是滿了,就直接返回
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
//3、chan已經被close的情景
lock(&c.lock) //開始加鎖
if c.closed != 0 { //已被 close 了,再發送數據的話會 panic。
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
//5、查看接收隊列是不是有接收者
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// buf還沒滿
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
//就放到緩沖區
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz { //
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// 當bug滿了、沒有緩沖那種
gp := getg()
mysg := acquireSudog() //獲取sudog
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg) //放入發送等待隊列
......
return true
}
~~~
解讀:
第一部分:如果 chan 是 nil 的話,就把調用者 goroutine park(阻塞休眠),調用者就永遠被阻塞住了,
第二部分:往一個已經滿了的 chan 實例發送數據時,并且想不阻塞當前調用,那么這里的邏輯是直接返回。chansend1 方法在調用 chansend 的時候設置了阻塞參數,所以不會執行到第二部分的分支里。
第三部分:如果 chan 已經被 close 了,再往里面發送數據的話會 panic。
第四部分:如果等待隊列中有等待的 receiver,那么這段代碼就把它從隊列中彈出,然后直接把數據交給它(通過 memmove(dst, src, t.size)),而不需要放入到 buf 中,速度可以更快一些。
第五部分:當前沒有 receiver,需要把數據放入到 buf 中,放入之后,就成功返回了。
第六部分:處理 buf 滿的情況。如果 buf 滿了,發送者的 goroutine 就會加入到發送者的等待隊列中,直到被喚醒。這個時候,數據或者被取走了,或者 chan 被 close 了。
#### 接收
在處理從 chan 中接收數據時,Go 會把代碼轉換成 chanrecv1 函數,如果要返回兩個返回值,會轉換成 chanrecv2,chanrecv1 函數和 chanrecv2 會調用 chanrecv。
~~~
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
~~~
~~~
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if debugChan { //常量false,到處都是寫死的常量
print("chanrecv: chan=", c, "\n")
}
//1、判斷chan是不是nil
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//2、沒有阻塞,而且chan還是空的
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
//3、加鎖,返回時釋放鎖
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 { //被關閉了,且沒有緩沖元素了
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock) //釋放鎖
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
//4、沒有數據,在先讀后寫的情況下,即讀的g先到了
//查看寫隊列是不是有g,有就拿走
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
//5、沒有等待的sender,buf中有數據
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
//6、沒有元素就阻塞,掛起等待
gp := getg()
mysg := acquireSudog() /獲取sudog
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg) //放入寫goroutine列列
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)//掛起等待
......
}
~~~
解讀:
第一部分:chan 為 nil 的情況。和 send 一樣,從 nil chan 中接收(讀取、獲取)數據時,調用者會被永遠阻塞。
第二部分:沒有阻塞,而且chan還是空的
第三部分: chan 已經被 close 的情況。如果 chan 已經被 close 了,并且隊列中沒有緩存的元素,那么返回 true、false。
第四部分:處理 sendq 隊列中有等待者的情況。這個時候,如果 buf 中有數據,優先從buf 中讀取數據,否則直接從等待隊列中彈出一個 sender,把它的數據復制給這個receiver。
第五部分:處理沒有等待的 sender 的情況。這個是和 chansend 共用一把大鎖,所以不會有并發的問題。如果 buf 有元素,就取出一個元素給 receiver。
第六部分:處理 buf 中沒有元素的情況。如果沒有元素,那么當前的 receiver 就會被阻塞,直到它從 sender 中接收了數據,或者是 chan 被 close,才返回。
#### 關閉close
* 如果 chan 為 nil,close 會 panic;
* 如果 chan 已經 closed,再次 close 也會 panic。
* 如果 chan 不為 nil,chan 也沒有closed,就把等待隊列中的 sender(writer)和 receiver(reader)從隊列中全部移除并喚醒。
~~~
func closechan(c *hchan) {
if c == nil { //關閉nil chan,panic
panic(plainError("close of nil channel"))
}
lock(&c.lock) //加鎖
if c.closed != 0 { //關閉已經關閉的chan,panic
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
c.closed = 1
var glist gList
// release all readers
//釋放所有的reader
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
//釋放所有的writer(它們會panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
~~~
### 總結
圖形化:https://www.jianshu.com/p/78df8ab49495
**向 channel 寫數據:**
>recvq隊列不為空
直接從 recvq 取出 G ,并把數據寫入,最后把該 G 喚醒,結束發送過程。
>recvq隊列為空
1、buf沒有滿,直接把數據發到buf隊尾,結束發送過程。
2、buf滿了或者就沒有,阻塞休眠,加入sendq隊列,等待喚醒
**從 channel 讀數據**
>sendq隊列不為空
1、沒有緩沖區,直接從 sendq 中取出 G ,把 G 中數據讀出,最后把 G 喚醒,結束讀取過程。
2、說明緩沖區已滿,從緩沖區中首部讀出數據,把 G 中數據寫入緩沖區尾部,把 G 喚醒,結束讀取過程。
>sendq隊列為空
1、緩沖區中有數據,則從緩沖區取出數據,結束讀取過程。
2、緩沖區中沒有數據,將當前 goroutine 加入 recvq ,進入睡眠,等待被寫 goroutine 喚醒。
**關閉 channel**
1.關閉 channel 時會將 recvq 中的 G 全部喚醒,本該寫入 G 的數據位置為 nil。將 sendq 中的 G 全部喚醒,但是這些 G 會 panic。
panic 出現的場景還有:
* 關閉值為 nil 的 channel
* 關閉已經關閉的 channel
* 向已經關閉的 channel 中寫數據
- Go準備工作
- 依賴管理
- Go基礎
- 1、變量和常量
- 2、基本數據類型
- 3、運算符
- 4、流程控制
- 5、數組
- 數組聲明和初始化
- 遍歷
- 數組是值類型
- 6、切片
- 定義
- slice其他內容
- 7、map
- 8、函數
- 函數基礎
- 函數進階
- 9、指針
- 10、結構體
- 類型別名和自定義類型
- 結構體
- 11、接口
- 12、反射
- 13、并發
- 14、網絡編程
- 15、單元測試
- Go常用庫/包
- Context
- time
- strings/strconv
- file
- http
- Go常用第三方包
- Go優化
- Go問題排查
- Go框架
- 基礎知識點的思考
- 面試題
- 八股文
- 操作系統
- 整理一份資料
- interface
- array
- slice
- map
- MUTEX
- RWMUTEX
- Channel
- waitGroup
- context
- reflect
- gc
- GMP和CSP
- Select
- Docker
- 基本命令
- dockerfile
- docker-compose
- rpc和grpc
- consul和etcd
- ETCD
- consul
- gin
- 一些小點
- 樹
- K8s
- ES
- pprof
- mycat
- nginx
- 整理后的面試題
- 基礎
- Map
- Chan
- GC
- GMP
- 并發
- 內存
- 算法
- docker