# 第八章 Goroutines和Channels
并發程序指同時進行多個任務的程序,隨著硬件的發展,并發程序變得越來越重要。Web服務器會一次處理成千上萬的請求。平板電腦和手機app在渲染用戶畫面同時還會后臺執行各種計算任務和網絡請求。即使是傳統的批處理問題--讀取數據,計算,寫輸出--現在也會用并發來隱藏掉I/O的操作延遲以充分利用現代計算機設備的多個核心。計算機的性能每年都在以非線性的速度增長。
Go語言中的并發程序可以用兩種手段來實現。本章講解goroutine和channel,其支持“順序通信進程”(communicating sequential processes)或被簡稱為CSP。CSP是一種現代的并發編程模型,在這種編程模型中值會在不同的運行實例(goroutine)中傳遞,盡管大多數情況下仍然是被限制在單一實例中。第9章覆蓋更為傳統的并發模型:多線程共享內存,如果你在其它的主流語言中寫過并發程序的話可能會更熟悉一些。第9章也會深入介紹一些并發程序帶來的風險和陷阱。
盡管Go對并發的支持是眾多強力特性之一,但跟蹤調試并發程序還是很困難,在線性程序中形成的直覺往往還會使我們誤入歧途。如果這是讀者第一次接觸并發,推薦稍微多花一些時間來思考這兩個章節中的樣例。
### 8.1. Goroutines
在Go語言中,每一個并發的執行單元叫作一個goroutine。設想這里的一個程序有兩個函數,一個函數做計算,另一個輸出結果,假設兩個函數沒有相互之間的調用關系。一個線性的程序會先調用其中的一個函數,然后再調用另一個。如果程序中包含多個goroutine,對兩個函數的調用則可能發生在同一時刻。馬上就會看到這樣的一個程序。
如果你使用過操作系統或者其它語言提供的線程,那么你可以簡單地把goroutine類比作一個線程,這樣你就可以寫出一些正確的程序了。goroutine和線程的本質區別會在9.8節中講。
當一個程序啟動時,其主函數即在一個單獨的goroutine中運行,我們叫它main goroutine。新的goroutine會用go語句來創建。在語法上,go語句是一個普通的函數或方法調用前加上關鍵字go。go語句會使其語句中的函數在一個新創建的goroutine中運行。而go語句本身會迅速地完成。
~~~
f() // call f(); wait for it to return
go f() // create a new goroutine that calls f(); don't wait
~~~
下面的例子,main goroutine將計算菲波那契數列的第45個元素值。由于計算函數使用低效的遞歸,所以會運行相當長時間,在此期間我們想讓用戶看到一個可見的標識來表明程序依然在正常運行,所以來做一個動畫的小圖標:
*gopl.io/ch8/spinner*
~~~
func main() {
go spinner(100 * time.Millisecond)
const n = 45
fibN := fib(n) // slow
fmt.Printf("\rFibonacci(%d) = %d\n", n, fibN)
}
func spinner(delay time.Duration) {
for {
for _, r := range `-\|/` {
fmt.Printf("\r%c", r)
time.Sleep(delay)
}
}
}
func fib(x int) int {
if x < 2 {
return x
}
return fib(x-1) + fib(x-2)
}
~~~
動畫顯示了幾秒之后,fib(45)的調用成功地返回,并且打印結果:
~~~
Fibonacci(45) = 1134903170
~~~
然后主函數返回。主函數返回時,所有的goroutine都會被直接打斷,程序退出。除了從主函數退出或者直接終止程序之外,沒有其它的編程方法能夠讓一個goroutine來打斷另一個的執行,但是之后可以看到一種方式來實現這個目的,通過goroutine之間的通信來讓一個goroutine請求其它的goroutine,并被請求的goroutine自行結束執行。
留意一下這里的兩個獨立的單元是如何進行組合的,spinning和菲波那契的計算。分別在獨立的函數中,但兩個函數會同時執行。
### 8.2. 示例: 并發的Clock服務
網絡編程是并發大顯身手的一個領域,由于服務器是最典型的需要同時處理很多連接的程序,這些連接一般來自遠彼此獨立的客戶端。在本小節中,我們會講解go語言的net包,這個包提供編寫一個網絡客戶端或者服務器程序的基本組件,無論兩者間通信是使用TCP,UDP或者Unix domain sockets。在第一章中我們已經使用過的net/http包里的方法,也算是net包的一部分。
我們的第一個例子是一個順序執行的時鐘服務器,它會每隔一秒鐘將當前時間寫到客戶端:
*gopl.io/ch8/clock1*
~~~
// Clock1 is a TCP server that periodically writes the time.
package main
import (
"io"
"log"
"net"
"time"
)
func main() {
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err) // e.g., connection aborted
continue
}
handleConn(conn) // handle one connection at a time
}
}
func handleConn(c net.Conn) {
defer c.Close()
for {
_, err := io.WriteString(c, time.Now().Format("15:04:05\n"))
if err != nil {
return // e.g., client disconnected
}
time.Sleep(1 * time.Second)
}
}
~~~
Listen函數創建了一個net.Listener的對象,這個對象會監聽一個網絡端口上到來的連接,在這個例子里我們用的是TCP的localhost:8000端口。listener對象的Accept方法會直接阻塞,直到一個新的連接被創建,然后會返回一個net.Conn對象來表示這個連接。
handleConn函數會處理一個完整的客戶端連接。在一個for死循環中,將當前的時候用time.Now()函數得到,然后寫到客戶端。由于net.Conn實現了io.Writer接口,我們可以直接向其寫入內容。這個死循環會一直執行,直到寫入失敗。最可能的原因是客戶端主動斷開連接。這種情況下handleConn函數會用defer調用關閉服務器側的連接,然后返回到主函數,繼續等待下一個連接請求。
time.Time.Format方法提供了一種格式化日期和時間信息的方式。它的參數是一個格式化模板標識如何來格式化時間,而這個格式化模板限定為Mon Jan 2 03:04:05PM 2006 UTC-0700。有8個部分(周幾,月份,一個月的第幾天,等等)。可以以任意的形式來組合前面這個模板;出現在模板中的部分會作為參考來對時間格式進行輸出。在上面的例子中我們只用到了小時、分鐘和秒。time包里定義了很多標準時間格式,比如time.RFC1123。在進行格式化的逆向操作time.Parse時,也會用到同樣的策略。(譯注:這是go語言和其它語言相比比較奇葩的一個地方。。你需要記住格式化字符串是1月2日下午3點4分5秒零六年UTC-0700,而不像其它語言那樣Y-m-d H:i:s一樣,當然了這里可以用1234567的方式來記憶,倒是也不麻煩)
為了連接例子里的服務器,我們需要一個客戶端程序,比如netcat這個工具(nc命令),這個工具可以用來執行網絡連接操作。
~~~
$ go build gopl.io/ch8/clock1
$ ./clock1 &
$ nc localhost 8000
13:58:54
13:58:55
13:58:56
13:58:57
^C
~~~
客戶端將服務器發來的時間顯示了出來,我們用Control+C來中斷客戶端的執行,在Unix系統上,你會看到^C這樣的響應。如果你的系統沒有裝nc這個工具,你可以用telnet來實現同樣的效果,或者也可以用我們下面的這個用go寫的簡單的telnet程序,用net.Dial就可以簡單地創建一個TCP連接:
*gopl.io/ch8/netcat1*
~~~
// Netcat1 is a read-only TCP client.
package main
import (
"io"
"log"
"net"
"os"
)
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
mustCopy(os.Stdout, conn)
}
func mustCopy(dst io.Writer, src io.Reader) {
if _, err := io.Copy(dst, src); err != nil {
log.Fatal(err)
}
}
~~~
這個程序會從連接中讀取數據,并將讀到的內容寫到標準輸出中,直到遇到end of file的條件或者發生錯誤。mustCopy這個函數我們在本節的幾個例子中都會用到。讓我們同時運行兩個客戶端來進行一個測試,這里可以開兩個終端窗口,下面左邊的是其中的一個的輸出,右邊的是另一個的輸出:
~~~
$ go build gopl.io/ch8/netcat1
$ ./netcat1
13:58:54 $ ./netcat1
13:58:55
13:58:56
^C
13:58:57
13:58:58
13:58:59
^C
$ killall clock1
~~~
killall命令是一個Unix命令行工具,可以用給定的進程名來殺掉所有名字匹配的進程。
第二個客戶端必須等待第一個客戶端完成工作,這樣服務端才能繼續向后執行;因為我們這里的服務器程序同一時間只能處理一個客戶端連接。我們這里對服務端程序做一點小改動,使其支持并發:在handleConn函數調用的地方增加go關鍵字,讓每一次handleConn的調用都進入一個獨立的goroutine。
*gopl.io/ch8/clock2*
~~~
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err) // e.g., connection aborted
continue
}
go handleConn(conn) // handle connections concurrently
}
~~~
現在多個客戶端可以同時接收到時間了:
~~~
$ go build gopl.io/ch8/clock2
$ ./clock2 &
$ go build gopl.io/ch8/netcat1
$ ./netcat1
14:02:54 $ ./netcat1
14:02:55 14:02:55
14:02:56 14:02:56
14:02:57 ^C
14:02:58
14:02:59 $ ./netcat1
14:03:00 14:03:00
14:03:01 14:03:01
^C 14:03:02
^C
$ killall clock2
~~~
**練習 8.1:** 修改clock2來支持傳入參數作為端口號,然后寫一個clockwall的程序,這個程序可以同時與多個clock服務器通信,從多服務器中讀取時間,并且在一個表格中一次顯示所有服務傳回的結果,類似于你在某些辦公室里看到的時鐘墻。如果你有地理學上分布式的服務器可以用的話,讓這些服務器跑在不同的機器上面;或者在同一臺機器上跑多個不同的實例,這些實例監聽不同的端口,假裝自己在不同的時區。像下面這樣:
~~~
$ TZ=US/Eastern ./clock2 -port 8010 &
$ TZ=Asia/Tokyo ./clock2 -port 8020 &
$ TZ=Europe/London ./clock2 -port 8030 &
$ clockwall NewYork=localhost:8010 Tokyo=localhost:8020 London=localhost:8030
~~~
**練習 8.2:** 實現一個并發FTP服務器。服務器應該解析客戶端來的一些命令,比如cd命令來切換目錄,ls來列出目錄內文件,get和send來傳輸文件,close來關閉連接。你可以用標準的ftp命令來作為客戶端,或者也可以自己實現一個。
### 8.3. 示例: 并發的Echo服務
clock服務器每一個連接都會起一個goroutine。在本節中我們會創建一個echo服務器,這個服務在每個連接中會有多個goroutine。大多數echo服務僅僅會返回他們讀取到的內容,就像下面這個簡單的handleConn函數所做的一樣:
~~~
func handleConn(c net.Conn) {
io.Copy(c, c) // NOTE: ignoring errors
c.Close()
}
~~~
一個更有意思的echo服務應該模擬一個實際的echo的“回響”,并且一開始要用大寫HELLO來表示“聲音很大”,之后經過一小段延遲返回一個有所緩和的Hello,然后一個全小寫字母的hello表示聲音漸漸變小直至消失,像下面這個版本的handleConn(譯注:笑看作者腦洞大開):
*gopl.io/ch8/reverb1*
~~~
func echo(c net.Conn, shout string, delay time.Duration) {
fmt.Fprintln(c, "\t", strings.ToUpper(shout))
time.Sleep(delay)
fmt.Fprintln(c, "\t", shout)
time.Sleep(delay)
fmt.Fprintln(c, "\t", strings.ToLower(shout))
}
func handleConn(c net.Conn) {
input := bufio.NewScanner(c)
for input.Scan() {
echo(c, input.Text(), 1*time.Second)
}
// NOTE: ignoring potential errors from input.Err()
c.Close()
}
~~~
我們需要升級我們的客戶端程序,這樣它就可以發送終端的輸入到服務器,并把服務端的返回輸出到終端上,這使我們有了使用并發的另一個好機會:
*gopl.io/ch8/netcat2*
~~~
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
go mustCopy(os.Stdout, conn)
mustCopy(conn, os.Stdin)
}
~~~
當main goroutine從標準輸入流中讀取內容并將其發送給服務器時,另一個goroutine會讀取并打印服務端的響應。當main goroutine碰到輸入終止時,例如,用戶在終端中按了Control-D(^D),在windows上是Control-Z,這時程序就會被終止,盡管其它goroutine中還有進行中的任務。(在8.4.1中引入了channels后我們會明白如何讓程序等待兩邊都結束)。
下面這個會話中,客戶端的輸入是左對齊的,服務端的響應會用縮進來區別顯示。 客戶端會向服務器“喊三次話”:
~~~
$ go build gopl.io/ch8/reverb1
$ ./reverb1 &
$ go build gopl.io/ch8/netcat2
$ ./netcat2
Hello?
HELLO?
Hello?
hello?
Is there anybody there?
IS THERE ANYBODY THERE?
Yooo-hooo!
Is there anybody there?
is there anybody there?
YOOO-HOOO!
Yooo-hooo!
yooo-hooo!
^D
$ killall reverb1
~~~
注意客戶端的第三次shout在前一個shout處理完成之前一直沒有被處理,這貌似看起來不是特別“現實”。真實世界里的回響應該是會由三次shout的回聲組合而成的。為了模擬真實世界的回響,我們需要更多的goroutine來做這件事情。這樣我們就再一次地需要go這個關鍵詞了,這次我們用它來調用echo:
*gopl.io/ch8/reverb2*
~~~
func handleConn(c net.Conn) {
input := bufio.NewScanner(c)
for input.Scan() {
go echo(c, input.Text(), 1*time.Second)
}
// NOTE: ignoring potential errors from input.Err()
c.Close()
}
~~~
go后跟的函數的參數會在go語句自身執行時被求值;因此input.Text()會在main goroutine中被求值。 現在回響是并發并且會按時間來覆蓋掉其它響應了:
~~~
$ go build gopl.io/ch8/reverb2
$ ./reverb2 &
$ ./netcat2
Is there anybody there?
IS THERE ANYBODY THERE?
Yooo-hooo!
Is there anybody there?
YOOO-HOOO!
is there anybody there?
Yooo-hooo!
yooo-hooo!
^D
$ killall reverb2
~~~
讓服務使用并發不只是處理多個客戶端的請求,甚至在處理單個連接時也可能會用到,就像我們上面的兩個go關鍵詞的用法。然而在我們使用go關鍵詞的同時,需要慎重地考慮net.Conn中的方法在并發地調用時是否安全,事實上對于大多數類型來說也確實不安全。我們會在下一章中詳細地探討并發安全性。
### 8.4. Channels
如果說goroutine是Go語音程序的并發體的話,那么channels它們之間的通信機制。一個channels是一個通信機制,它可以讓一個goroutine通過它給另一個goroutine發送值信息。每個channel都有一個特殊的類型,也就是channels可發送數據的類型。一個可以發送int類型數據的channel一般寫為chan int。
使用內置的make函數,我們可以創建一個channel:
~~~
ch := make(chan int) // ch has type 'chan int'
~~~
和map類似,channel也一個對應make創建的底層數據結構的引用。當我們復制一個channel或用于函數參數傳遞時,我們只是拷貝了一個channel引用,因此調用者何被調用者將引用同一個channel對象。和其它的引用類型一樣,channel的零值也是nil。
兩個相同類型的channel可以使用==運算符比較。如果兩個channel引用的是相通的對象,那么比較的結果為真。一個channel也可以和nil進行比較。
一個channel有發送和接受兩個主要操作,都是通信行為。一個發送語句將一個值從一個goroutine通過channel發送到另一個執行接收操作的goroutine。發送和接收兩個操作都是用`<-`運算符。在發送語句中,`<-`運算符分割channel和要發送的值。在接收語句中,`<-`運算符寫在channel對象之前。一個不使用接收結果的接收操作也是合法的。
~~~
ch <- x // a send statement
x = <-ch // a receive expression in an assignment statement
<-ch // a receive statement; result is discarded
~~~
Channel還支持close操作,用于關閉channel,隨后對基于該channel的任何發送操作都將導致panic異常。對一個已經被close過的channel之行接收操作依然可以接受到之前已經成功發送的數據;如果channel中已經沒有數據的話講產生一個零值的數據。
使用內置的close函數就可以關閉一個channel:
~~~
close(ch)
~~~
以最簡單方式調用make函數創建的時一個無緩存的channel,但是我們也可以指定第二個整形參數,對應channel的容量。如果channel的容量大于零,那么該channel就是帶緩存的channel。
~~~
ch = make(chan int) // unbuffered channel
ch = make(chan int, 0) // unbuffered channel
ch = make(chan int, 3) // buffered channel with capacity 3
~~~
我們將先討論無緩存的channel,然后在8.4.4節討論帶緩存的channel。
### 8.4.1. 不帶緩存的Channels
一個基于無緩存Channels的發送操作將導致發送者goroutine阻塞,直到另一個goroutine在相同的Channels上執行接收操作,當發送的值通過Channels成功傳輸之后,兩個goroutine可以繼續執行后面的語句。反之,如果接收操作先發生,那么接收者goroutine也將阻塞,直到有另一個goroutine在相同的Channels上執行發送操作。
基于無緩存Channels的發送和接收操作將導致兩個goroutine做一次同步操作。因為這個原因,無緩存Channels有時候也被稱為同步Channels。當通過一個無緩存Channels發送數據時,接收者收到數據發生在喚醒發送者goroutine之前(譯注:*happens before*,這是Go語言并發內存模型的一個關鍵術語!)。
在討論并發編程時,當我們說x事件在y事件之前發生(*happens before*),我們并不是說x事件在時間上比y時間更早;我們要表達的意思是要保證在此之前的事件都已經完成了,例如在此之前的更新某些變量的操作已經完成,你可以放心依賴這些已完成的事件了。
當我們說x事件既不是在y事件之前發生也不是在y事件之后發生,我們就說x事件和y事件是并發的。這并不是意味著x事件和y事件就一定是同時發生的,我們只是不能確定這兩個事件發生的先后順序。在下一章中我們將看到,當兩個goroutine并發訪問了相同的變量時,我們有必要保證某些事件的執行順序,以避免出現某些并發問題。
在8.3節的客戶端程序,它在主goroutine中(譯注:就是執行main函數的goroutine)將標準輸入復制到server,因此當客戶端程序關閉標準輸入時,后臺goroutine可能依然在工作。我們需要讓主goroutine等待后臺goroutine完成工作后再退出,我們使用了一個channel來同步兩個goroutine:
*gopl.io/ch8/netcat3*
~~~
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
done := make(chan struct{})
go func() {
io.Copy(os.Stdout, conn) // NOTE: ignoring errors
log.Println("done")
done <- struct{}{} // signal the main goroutine
}()
mustCopy(conn, os.Stdin)
conn.Close()
<-done // wait for background goroutine to finish
}
~~~
當用戶關閉了標準輸入,主goroutine中的mustCopy函數調用將返回,然后調用conn.Close()關閉讀和寫方向的網絡連接。關閉網絡鏈接中的寫方向的鏈接將導致server程序收到一個文件(end-of-?le)結束的信號。關閉網絡鏈接中讀方向的鏈接將導致后臺goroutine的io.Copy函數調用返回一個“read from closed connection”(“從關閉的鏈接讀”)類似的錯誤,因此我們臨時移除了錯誤日志語句;在練習8.3將會提供一個更好的解決方案。(需要注意的是go語句調用了一個函數字面量,這Go語言中啟動goroutine常用的形式。)
在后臺goroutine返回之前,它先打印一個日志信息,然后向done對應的channel發送一個值。主goroutine在退出前先等待從done對應的channel接收一個值。因此,總是可以在程序退出前正確輸出“done”消息。
基于channels發送消息有兩個重要方面。首先每個消息都有一個值,但是有時候通訊的事實和發生的時刻也同樣重要。當我們更希望強調通訊發生的時刻時,我們將它稱為**消息事件**。有些消息事件并不攜帶額外的信息,它僅僅是用作兩個goroutine之間的同步,這時候我們可以用`struct{}`空結構體作為channels元素的類型,雖然也可以使用bool或int類型實現同樣的功能,`done <- 1`語句也比`done <- struct{}{}`更短。
**練習 8.3:** 在netcat3例子中,conn雖然是一個interface類型的值,但是其底層真實類型是`*net.TCPConn`,代表一個TCP鏈接。一個TCP鏈接有讀和寫兩個部分,可以使用CloseRead和CloseWrite方法分別關閉它們。修改netcat3的主goroutine代碼,只關閉網絡鏈接中寫的部分,這樣的話后臺goroutine可以在標準輸入被關閉后繼續打印從reverb1服務器傳回的數據。(要在reverb2服務器也完成同樣的功能是比較困難的;參考**練習 8.4**。)
### 8.4.2. 串聯的Channels(Pipeline)
Channels也可以用于將多個goroutine鏈接在一起,一個Channels的輸出作為下一個Channels的輸入。這種串聯的Channels就是所謂的管道(pipeline)。下面的程序用兩個channels將三個goroutine串聯起來,如圖8.1所示。

第一個goroutine是一個計數器,用于生成0、1、2、……形式的整數序列,然后通過channel將該整數序列發送給第二個goroutine;第二個goroutine是一個求平方的程序,對收到的每個整數求平方,然后將平方后的結果通過第二個channel發送給第三個goroutine;第三個goroutine是一個打印程序,打印收到的每個整數。為了保持例子清晰,我們有意選擇了非常簡單的函數,當然三個goroutine的計算很簡單,在現實中確實沒有必要為如此簡單的運算構建三個goroutine。
*gopl.io/ch8/pipeline1*
~~~
func main() {
naturals := make(chan int)
squares := make(chan int)
// Counter
go func() {
for x := 0; ; x++ {
naturals <- x
}
}()
// Squarer
go func() {
for {
x := <-naturals
squares <- x * x
}
}()
// Printer (in main goroutine)
for {
fmt.Println(<-squares)
}
}
~~~
如您所料,上面的程序將生成0、1、4、9、……形式的無窮數列。像這樣的串聯Channels的管道(Pipelines)可以用在需要長時間運行的服務中,每個長時間運行的goroutine可能會包含一個死循環,在不同goroutine的死循環內部使用串聯的Channels來通信。但是,如果我們希望通過Channels只發送有限的數列該如何處理呢?
如果發送者知道,沒有更多的值需要發送到channel的話,那么讓接收者也能及時知道沒有多余的值可接收將是有用的,因為接收者可以停止不必要的接收等待。這可以通過內置的close函數來關閉channel實現:
~~~
close(naturals)
~~~
當一個channel被關閉后,再向該channel發送數據將導致panic異常。當一個被關閉的channel中已經發送的數據都被成功接收后,后續的接收操作將不再阻塞,它們會立即返回一個零值。關閉上面例子中的naturals變量對應的channel并不能終止循環,它依然會收到一個永無休止的零值序列,然后將它們發送給打印者goroutine。
沒有辦法直接測試一個channel是否被關閉,但是接收操作有一個變體形式:它多接收一個結果,多接收的第二個結果是一個布爾值ok,ture表示成功從channels接收到值,false表示channels已經被關閉并且里面沒有值可接收。使用這個特性,我們可以修改squarer函數中的循環代碼,當naturals對應的channel被關閉并沒有值可接收時跳出循環,并且也關閉squares對應的channel.
~~~
// Squarer
go func() {
for {
x, ok := <-naturals
if !ok {
break // channel was closed and drained
}
squares <- x * x
}
close(squares)
}()
~~~
因為上面的語法是笨拙的,而且這種處理模式很場景,因此Go語言的range循環可直接在channels上面迭代。使用range循環是上面處理模式的簡潔語法,它依次從channel接收數據,當channel被關閉并且沒有值可接收時跳出循環。
在下面的改進中,我們的計數器goroutine只生成100個含數字的序列,然后關閉naturals對應的channel,這將導致計算平方數的squarer對應的goroutine可以正常終止循環并關閉squares對應的channel。(在一個更復雜的程序中,可以通過defer語句關閉對應的channel。)最后,主goroutine也可以正常終止循環并退出程序。
*gopl.io/ch8/pipeline2*
~~~
func main() {
naturals := make(chan int)
squares := make(chan int)
// Counter
go func() {
for x := 0; x < 100; x++ {
naturals <- x
}
close(naturals)
}()
// Squarer
go func() {
for x := range naturals {
squares <- x * x
}
close(squares)
}()
// Printer (in main goroutine)
for x := range squares {
fmt.Println(x)
}
}
~~~
其實你并不需要關閉每一個channel。只要當需要告訴接收者goroutine,所有的數據已經全部發送時才需要關閉channel。不管一個channel是否被關閉,當它沒有被引用時將會被Go語言的垃圾自動回收器回收。(不要將關閉一個打開文件的操作和關閉一個channel操作混淆。對于每個打開的文件,都需要在不使用的使用調用對應的Close方法來關閉文件。)
視圖重復關閉一個channel將導致panic異常,視圖關閉一個nil值的channel也將導致panic異常。關閉一個channels還會觸發一個廣播機制,我們將在8.9節討論。
### 8.4.3. 單方向的Channel
隨著程序的增長,人們習慣于將大的函數拆分為小的函數。我們前面的例子中使用了三個goroutine,然后用兩個channels連鏈接它們,它們都是main函數的局部變量。將三個goroutine拆分為以下三個函數是自然的想法:
~~~
func counter(out chan int)
func squarer(out, in chan int)
func printer(in chan int)
~~~
其中squarer計算平方的函數在兩個串聯Channels的中間,因此擁有兩個channels類型的參數,一個用于輸入一個用于輸出。每個channels都用有相同的類型,但是它們的使用方式想反:一個只用于接收,另一個只用于發送。參數的名字in和out已經明確表示了這個意圖,但是并無法保證squarer函數向一個in參數對應的channels發送數據或者從一個out參數對應的channels接收數據。
這種場景是典型的。當一個channel作為一個函數參數是,它一般總是被專門用于只發送或者只接收。
為了表明這種意圖并防止被濫用,Go語言的類型系統提供了單方向的channel類型,分別用于只發送或只接收的channel。類型`chan<- int`表示一個只發送int的channel,只能發送不能接收。相反,類型`<-chan int`表示一個只接收int的channel,只能接收不能發送。(箭頭`<-`和關鍵字chan的相對位置表明了channel的方向。)這種限制將在編譯期檢測。
因為關閉操作只用于斷言不再向channel發送新的數據,所以只有在發送者所在的goroutine才會調用close函數,因此對一個只接收的channel調用close將是一個編譯錯誤。
這是改進的版本,這一次參數使用了單方向channel類型:
*gopl.io/ch8/pipeline3*
~~~
func counter(out chan<- int) {
for x := 0; x < 100; x++ {
out <- x
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for v := range in {
out <- v * v
}
close(out)
}
func printer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
func main() {
naturals := make(chan int)
squares := make(chan int)
go counter(naturals)
go squarer(squares, naturals)
printer(squares)
}
~~~
調用counter(naturals)將導致將`chan int`類型的naturals隱式地轉換為`chan<- int`類型只發送型的channel。調用printer(squares)也會導致相似的隱式轉換,這一次是轉換為`<-chan int`類型只接收型的channel。任何雙向channel向單向channel變量的賦值操作都將導致該隱式轉換。這里并沒有反向轉換的語法:也就是不能一個將類似`chan<- int`類型的單向型的channel轉換為`chan int`類型的雙向型的channel。
### 8.4.4. 帶緩存的Channels
帶緩存的Channel內部持有一個元素隊列。隊列的最大容量是在調用make函數創建channel時通過第二個參數指定的。下面的語句創建了一個可以持有三個字符串元素的帶緩存Channel。圖8.2是ch變量對應的channel的圖形表示形式。
~~~
ch = make(chan string, 3)
~~~

向緩存Channel的發送操作就是向內部緩存隊列的尾部插入元素,接收操作則是從隊列的頭部刪除元素。如果內部緩存隊列是滿的,那么發送操作將阻塞直到因另一個goroutine執行接收操作而釋放了新的隊列空間。相反,如果channel是空的,接收操作將阻塞直到有另一個goroutine執行發送操作而向隊列插入元素。
我們可以在無阻塞的情況下連續向新創建的channel發送三個值:
~~~
ch <- "A"
ch <- "B"
ch <- "C"
~~~
此刻,channel的內部緩存隊列將是滿的(圖8.3),如果有第四個發送操作將發生阻塞。

如果我們接收一個值,
~~~
fmt.Println(<-ch) // "A"
~~~
那么channel的緩存隊列將不是滿的也不是空的(圖8.4),因此對該channel執行的發送或接收操作都不會發送阻塞。通過這種方式,channel的緩存隊列解耦了接收和發送的goroutine。

在某些特殊情況下,程序可能需要知道channel內部緩存的容量,可以用內置的cap函數獲取:
~~~
fmt.Println(cap(ch)) // "3"
~~~
同樣,對于內置的len函數,如果傳入的是channel,那么將返回channel內部緩存隊列中有效元素的個數。因為在并發程序中該信息會隨著接收操作而失效,但是它對某些故障診斷和性能優化會有幫助。
~~~
fmt.Println(len(ch)) // "2"
~~~
在繼續執行兩次接收操作后channel內部的緩存隊列將又成為空的,如果有第四個接收操作將發生阻塞:
~~~
fmt.Println(<-ch) // "B"
fmt.Println(<-ch) // "C"
~~~
在這個例子中,發送和接收操作都發生在同一個goroutine中,但是在真是的程序中它們一般由不同的goroutine執行。Go語言新手有時候會將一個帶緩存的channel當作同一個goroutine中的隊列使用,雖然語法看似簡單,但實際上這是一個錯誤。Channel和goroutine的調度器機制是緊密相連的,一個發送操作——或許是整個程序——可能會永遠阻塞。如果你只是需要一個簡單的隊列,使用slice就可以了。
下面的例子展示了一個使用了帶緩存channel的應用。它并發地向三個鏡像站點發出請求,三個鏡像站點分散在不同的地理位置。它們分別將收到的響應發送到帶緩存channel,最后接收者只接收第一個收到的響應,也就是最快的那個響應。因此mirroredQuery函數可能在另外兩個響應慢的鏡像站點響應之前就返回了結果。(順便說一下,多個goroutines并發地向同一個channel發送數據,或從同一個channel接收數據都是常見的用法。)
~~~
func mirroredQuery() string {
responses := make(chan string, 3)
go func() { responses <- request("asia.gopl.io") }()
go func() { responses <- request("europe.gopl.io") }()
go func() { responses <- request("americas.gopl.io") }()
return <-responses // return the quickest response
}
func request(hostname string) (response string) { /* ... */ }
~~~
如果我們使用了無緩存的channel,那么兩個慢的goroutines將會因為沒有人接收而被永遠卡住。這種情況,稱為goroutines泄漏,這將是一個BUG。和垃圾變量不同,泄漏的goroutines并不會被自動回收,因此確保每個不再需要的goroutine能正常退出是重要的。
關于無緩存或帶緩存channels之間的選擇,或者是帶緩存channels的容量大小的選擇,都可能影響程序的正確性。無緩存channel更強地保證了每個發送操作與相應的同步接收操作;但是對于帶緩存channel,這些操作是解耦的。同樣,即使我們知道將要發送到一個channel的信息的數量上限,創建一個對應容量大小帶緩存channel也是不現實的,因為這要求在執行任何接收操作之前緩存所有已經發送的值。如果未能分配足夠的緩沖將導致程序死鎖。
Channel的緩存也可能影響程序的性能。想象一家蛋糕店有三個廚師,一個烘焙,一個上糖衣,還有一個將每個蛋糕傳遞到它下一個廚師在生產線。在狹小的廚房空間環境,每個廚師在完成蛋糕后必須等待下一個廚師已經準備好接受它;這類似于在一個無緩存的channel上進行溝通。
如果在每個廚師之間有一個放置一個蛋糕的額外空間,那么每個廚師就可以將一個完成的蛋糕臨時放在那里而馬上進入下一個蛋糕在制作中;這類似于將channel的緩存隊列的容量設置為1。只要每個廚師的平均工作效率相近,那么其中大部分的傳輸工作將是迅速的,個體之間細小的效率差異將在交接過程中彌補。如果廚師之間有更大的額外空間——也是就更大容量的緩存隊列——將可以在不停止生產線的前提下消除更大的效率波動,例如一個廚師可以短暫地休息,然后在加快趕上進度而不影響其其他人。
另一方面,如果生產線的前期階段一直快于后續階段,那么它們之間的緩存在大部分時間都將是滿的。相反,如果后續階段比前期階段更快,那么它們之間的緩存在大部分時間都將是空的。對于這類場景,額外的緩存并沒有帶來任何好處。
生產線的隱喻對于理解channels和goroutines的工作機制是很有幫助的。例如,如果第二階段是需要精心制作的復雜操作,一個廚師可能無法跟上第一個廚師的進度,或者是無法滿足第階段廚師的需求。要解決這個問題,我們可以雇傭另一個廚師來幫助完成第二階段的工作,他執行相同的任務但是獨立工作。這類似于基于相同的channels創建另一個獨立的goroutine。
我們沒有太多的空間展示全部細節,但是gopl.io/ch8/cake包模擬了這個蛋糕店,可以通過不同的參數調整。它還對上面提到的幾種場景提供對應的基準測試(§11.4) 。
### 8.5. 并發的循環
本節中,我們會探索一些用來在并行時循環迭代的常見并發模型。我們會探究從全尺寸圖片生成一些縮略圖的問題。gopl.io/ch8/thumbnail包提供了ImageFile函數來幫我們拉伸圖片。我們不會說明這個函數的實現,只需要從gopl.io下載它。
*gopl.io/ch8/thumbnail*
~~~
package thumbnail
// ImageFile reads an image from infile and writes
// a thumbnail-size version of it in the same directory.
// It returns the generated file name, e.g., "foo.thumb.jpg".
func ImageFile(infile string) (string, error)
~~~
下面的程序會循環迭代一些圖片文件名,并為每一張圖片生成一個縮略圖:
*gopl.io/ch8/thumbnail*
~~~
// makeThumbnails makes thumbnails of the specified files.
func makeThumbnails(filenames []string) {
for _, f := range filenames {
if _, err := thumbnail.ImageFile(f); err != nil {
log.Println(err)
}
}
}
~~~
顯然我們處理文件的順序無關緊要,因為每一個圖片的拉伸操作和其它圖片的處理操作都是彼此獨立的。像這種子問題都是完全彼此獨立的問題被叫做易并行問題(譯注:embarrassingly parallel,直譯的話更像是尷尬并行)。易并行問題是最容易被實現成并行的一類問題(廢話),并且是最能夠享受并發帶來的好處,能夠隨著并行的規模線性地擴展。
下面讓我們并行地執行這些操作,從而將文件IO的延遲隱藏掉,并用上多核cpu的計算能力來拉伸圖像。我們的第一個并發程序只是使用了一個go關鍵字。這里我們先忽略掉錯誤,之后再進行處理。
~~~
// NOTE: incorrect!
func makeThumbnails2(filenames []string) {
for _, f := range filenames {
go thumbnail.ImageFile(f) // NOTE: ignoring errors
}
}
~~~
這個版本運行的實在有點太快,實際上,由于它比最早的版本使用的時間要短得多,即使當文件名的slice中只包含有一個元素。這就有點奇怪了,如果程序沒有并發執行的話,那為什么一個并發的版本還是要快呢?答案其實是makeThumbnails在它還沒有完成工作之前就已經返回了。它啟動了所有的goroutine,沒一個文件名對應一個,但沒有等待它們一直到執行完畢。
沒有什么直接的辦法能夠等待goroutine完成,但是我們可以改變goroutine里的代碼讓其能夠將完成情況報告給外部的goroutine知曉,使用的方式是向一個共享的channel中發送事件。因為我們已經知道內部的goroutine只有len(filenames),所以外部的goroutine只需要在返回之前對這些事件計數。
~~~
// makeThumbnails3 makes thumbnails of the specified files in parallel.
func makeThumbnails3(filenames []string) {
ch := make(chan struct{})
for _, f := range filenames {
go func(f string) {
thumbnail.ImageFile(f) // NOTE: ignoring errors
ch <- struct{}{}
}(f)
}
// Wait for goroutines to complete.
for range filenames {
<-ch
}
}
~~~
注意我們將f的值作為一個顯式的變量傳給了函數,而不是在循環的閉包中聲明:
~~~
for _, f := range filenames {
go func() {
thumbnail.ImageFile(f) // NOTE: incorrect!
// ...
}()
}
~~~
回憶一下之前在5.6.1節中,匿名函數中的循環變量快照問題。上面這個單獨的變量f是被所有的匿名函數值所共享,且會被連續的循環迭代所更新的。當新的goroutine開始執行字面函數時,for循環可能已經更新了f并且開始了另一輪的迭代或者(更有可能的)已經結束了整個循環,所以當這些goroutine開始讀取f的值時,它們所看到的值已經是slice的最后一個元素了。顯式地添加這個參數,我們能夠確保使用的f是當go語句執行時的“當前”那個f。
如果我們想要從每一個worker goroutine往主goroutine中返回值時該怎么辦呢?當我們調用thumbnail.ImageFile創建文件失敗的時候,它會返回一個錯誤。下一個版本的makeThumbnails會返回其在做拉伸操作時接收到的第一個錯誤:
~~~
// makeThumbnails4 makes thumbnails for the specified files in parallel.
// It returns an error if any step failed.
func makeThumbnails4(filenames []string) error {
errors := make(chan error)
for _, f := range filenames {
go func(f string) {
_, err := thumbnail.ImageFile(f)
errors <- err
}(f)
}
for range filenames {
if err := <-errors; err != nil {
return err // NOTE: incorrect: goroutine leak!
}
}
return nil
}
~~~
這個程序有一個微秒的bug。當它遇到第一個非nil的error時會直接將error返回到調用方,使得沒有一個goroutine去排空errors channel。這樣剩下的worker goroutine在向這個channel中發送值時,都會永遠地阻塞下去,并且永遠都不會退出。這種情況叫做goroutine泄露(§8.4.4),可能會導致整個程序卡住或者跑出out of memory的錯誤。
最簡單的解決辦法就是用一個具有合適大小的buffered channel,這樣這些worker goroutine向channel中發送測向時就不會被阻塞。(一個可選的解決辦法是創建一個另外的goroutine,當main goroutine返回第一個錯誤的同時去排空channel)
下一個版本的makeThumbnails使用了一個buffered channel來返回生成的圖片文件的名字,附帶生成時的錯誤。
~~~
// makeThumbnails5 makes thumbnails for the specified files in parallel.
// It returns the generated file names in an arbitrary order,
// or an error if any step failed.
func makeThumbnails5(filenames []string) (thumbfiles []string, err error) {
type item struct {
thumbfile string
err error
}
ch := make(chan item, len(filenames))
for _, f := range filenames {
go func(f string) {
var it item
it.thumbfile, it.err = thumbnail.ImageFile(f)
ch <- it
}(f)
}
for range filenames {
it := <-ch
if it.err != nil {
return nil, it.err
}
thumbfiles = append(thumbfiles, it.thumbfile)
}
return thumbfiles, nil
}
~~~
我們最后一個版本的makeThumbnails返回了新文件們的大小總計數(bytes)。和前面的版本都不一樣的一點是我們在這個版本里沒有把文件名放在slice里,而是通過一個string的channel傳過來,所以我們無法對循環的次數進行預測。
為了知道最后一個goroutine什么時候結束(最后一個結束并不一定是最后一個開始),我們需要一個遞增的計數器,在每一個goroutine啟動時加一,在goroutine退出時減一。這需要一種特殊的計數器,這個計數器需要在多個goroutine操作時做到安全并且提供提供在其減為零之前一直等待的一種方法。這種計數類型被稱為sync.WaitGroup,下面的代碼就用到了這種方法:
~~~
// makeThumbnails6 makes thumbnails for each file received from the channel.
// It returns the number of bytes occupied by the files it creates.
func makeThumbnails6(filenames <-chan string) int64 {
sizes := make(chan int64)
var wg sync.WaitGroup // number of working goroutines
for f := range filenames {
wg.Add(1)
// worker
go func(f string) {
defer wg.Done()
thumb, err := thumbnail.ImageFile(f)
if err != nil {
log.Println(err)
return
}
info, _ := os.Stat(thumb) // OK to ignore error
sizes <- info.Size()
}(f)
}
// closer
go func() {
wg.Wait()
close(sizes)
}()
var total int64
for size := range sizes {
total += size
}
return total
}
~~~
注意Add和Done方法的不對策。Add是為計數器加一,必須在worker goroutine開始之前調用,而不是在goroutine中;否則的話我們沒辦法確定Add是在"closer" goroutine調用Wait之前被調用。并且Add還有一個參數,但Done卻沒有任何參數;其實它和Add(-1)是等價的。我們使用defer來確保計數器即使是在出錯的情況下依然能夠正確地被減掉。上面的程序代碼結構是當我們使用并發循環,但又不知道迭代次數時很通常而且很地道的寫法。
sizes channel攜帶了每一個文件的大小到main goroutine,在main goroutine中使用了range loop來計算總和。觀察一下我們是怎樣創建一個closer goroutine,并讓其等待worker們在關閉掉sizes channel之前退出的。兩步操作:wait和close,必須是基于sizes的循環的并發。考慮一下另一種方案:如果等待操作被放在了main goroutine中,在循環之前,這樣的話就永遠都不會結束了,如果在循環之后,那么又變成了不可達的部分,因為沒有任何東西去關閉這個channel,這個循環就永遠都不會終止。
圖8.5 表明了makethumbnails6函數中事件的序列。縱列表示goroutine。窄線段代表sleep,粗線段代表活動。斜線箭頭代表用來同步兩個goroutine的事件。時間向下流動。注意main goroutine是如何大部分的時間被喚醒執行其range循環,等待worker發送值或者closer來關閉channel的。

**練習 8.4:** 修改reverb2服務器,在每一個連接中使用sync.WaitGroup來計數活躍的echo goroutine。當計數減為零時,關閉TCP連接的寫入,像練習8.3中一樣。驗證一下你的修改版netcat3客戶端會一直等待所有的并發“喊叫”完成,即使是在標準輸入流已經關閉的情況下。
**練習 8.5:** 使用一個已有的CPU綁定的順序程序,比如在3.3節中我們寫的Mandelbrot程序或者3.2節中的3-D surface計算程序,并將他們的主循環改為并發形式,使用channel來進行通信。在多核計算機上這個程序得到了多少速度上的改進?使用多少個goroutine是最合適的呢?
### 8.6. 示例: 并發的Web爬蟲
在5.6節中,我們做了一個簡單的web爬蟲,用bfs(廣度優先)算法來抓取整個網站。在本節中,我們會讓這個這個爬蟲并行化,這樣每一個彼此獨立的抓取命令可以并行進行IO,最大化利用網絡資源。crawl函數和gopl.io/ch5/findlinks3中的是一樣的。
*gopl.io/ch8/crawl1*
~~~
func crawl(url string) []string {
fmt.Println(url)
list, err := links.Extract(url)
if err != nil {
log.Print(err)
}
return list
}
~~~
主函數和5.6節中的breadthFirst(深度優先)類似。像之前一樣,一個worklist是一個記錄了需要處理的元素的隊列,每一個元素都是一個需要抓取的URL列表,不過這一次我們用channel代替slice來做這個隊列。每一個對crawl的調用都會在他們自己的goroutine中進行并且會把他們抓到的鏈接發送回worklist。
~~~
func main() {
worklist := make(chan []string)
// Start with the command-line arguments.
go func() { worklist <- os.Args[1:] }()
// Crawl the web concurrently.
seen := make(map[string]bool)
for list := range worklist {
for _, link := range list {
if !seen[link] {
seen[link] = true
go func(link string) {
worklist <- crawl(link)
}(link)
}
}
}
}
~~~
注意這里的crawl所在的goroutine會將link作為一個顯式的參數傳入,來避免“循環變量快照”的問題(在5.6.1中有講解)。另外注意這里將命令行參數傳入worklist也是在一個另外的goroutine中進行的,這是為了避免在main goroutine和crawler goroutine中同時向另一個goroutine通過channel發送內容時發生死鎖(因為另一邊的接收操作還沒有準備好)。當然,這里我們也可以用buffered channel來解決問題,這里不再贅述。
現在爬蟲可以高并發地運行起來,并且可以產生一大坨的URL了,不過還是會有倆問題。一個問題是在運行一段時間后可能會出現在log的錯誤信息里的:
~~~
$ go build gopl.io/ch8/crawl1
$ ./crawl1 http://gopl.io/
http://gopl.io/
https://golang.org/help/
https://golang.org/doc/
https://golang.org/blog/
...
2015/07/15 18:22:12 Get ...: dial tcp: lookup blog.golang.org: no such host
2015/07/15 18:22:12 Get ...: dial tcp 23.21.222.120:443: socket: too many open files
...
~~~
最初的錯誤信息是一個讓人莫名的DNS查找失敗,即使這個域名是完全可靠的。而隨后的錯誤信息揭示了原因:這個程序一次性創建了太多網絡連接,超過了每一個進程的打開文件數限制,既而導致了在調用net.Dial像DNS查找失敗這樣的問題。
這個程序實在是太他媽并行了。無窮無盡地并行化并不是什么好事情,因為不管怎么說,你的系統總是會有一個些限制因素,比如CPU核心數會限制你的計算負載,比如你的硬盤轉軸和磁頭數限制了你的本地磁盤IO操作頻率,比如你的網絡帶寬限制了你的下載速度上限,或者是你的一個web服務的服務容量上限等等。為了解決這個問題,我們可以限制并發程序所使用的資源來使之適應自己的運行環境。對于我們的例子來說,最簡單的方法就是限制對links.Extract在同一時間最多不會有超過n次調用,這里的n是fd的limit-20,一般情況下。這個一個夜店里限制客人數目是一個道理,只有當有客人離開時,才會允許新的客人進入店內(譯注:作者你個老流氓)。
我們可以用一個有容量限制的buffered channel來控制并發,這類似于操作系統里的計數信號量概念。從概念上講,channel里的n個空槽代表n個可以處理內容的token(通行證),從channel里接收一個值會釋放其中的一個token,并且生成一個新的空槽位。這樣保證了在沒有接收介入時最多有n個發送操作。(這里可能我們拿channel里填充的槽來做token更直觀一些,不過還是這樣吧~)。由于channel里的元素類型并不重要,我們用一個零值的struct{}來作為其元素。
讓我們重寫crawl函數,將對links.Extract的調用操作用獲取、釋放token的操作包裹起來,來確保同一時間對其只有20個調用。信號量數量和其能操作的IO資源數量應保持接近。
*gopl.io/ch8/crawl2*
~~~
// tokens is a counting semaphore used to
// enforce a limit of 20 concurrent requests.
var tokens = make(chan struct{}, 20)
func crawl(url string) []string {
fmt.Println(url)
tokens <- struct{}{} // acquire a token
list, err := links.Extract(url)
<-tokens // release the token
if err != nil {
log.Print(err)
}
return list
}
~~~
第二個問題是這個程序永遠都不會終止,即使它已經爬到了所有初始鏈接衍生出的鏈接。(當然,除非你慎重地選擇了合適的初始化URL或者已經實現了練習8.6中的深度限制,你應該還沒有意識到這個問題)。為了使這個程序能夠終止,我們需要在worklist為空或者沒有crawl的goroutine在運行時退出主循環。
~~~
func main() {
worklist := make(chan []string)
var n int // number of pending sends to worklist
// Start with the command-line arguments.
n++
go func() { worklist <- os.Args[1:] }()
// Crawl the web concurrently.
seen := make(map[string]bool)
for ; n > 0; n-- {
list := <-worklist
for _, link := range list {
if !seen[link] {
seen[link] = true
n++
go func(link string) {
worklist <- crawl(link)
}(link)
}
}
}
}
~~~
這個版本中,計算器n對worklist的發送操作數量進行了限制。每一次我們發現有元素需要被發送到worklist時,我們都會對n進行++操作,在向worklist中發送初始的命令行參數之前,我們也進行過一次++操作。這里的操作++是在每啟動一個crawler的goroutine之前。主循環會在n減為0時終止,這時候說明沒活可干了。
現在這個并發爬蟲會比5.6節中的深度優先搜索版快上20倍,而且不會出什么錯,并且在其完成任務時也會正確地終止。
下面的程序是避免過度并發的另一種思路。這個版本使用了原來的crawl函數,但沒有使用計數信號量,取而代之用了20個長活的crawler goroutine,這樣來保證最多20個HTTP請求在并發。
~~~
func main() {
worklist := make(chan []string) // lists of URLs, may have duplicates
unseenLinks := make(chan string) // de-duplicated URLs
// Add command-line arguments to worklist.
go func() { worklist <- os.Args[1:] }()
// Create 20 crawler goroutines to fetch each unseen link.
for i := 0; i < 20; i++ {
go func() {
for link := range unseenLinks {
foundLinks := crawl(link)
go func() { worklist <- foundLinks }()
}
}()
}
// The main goroutine de-duplicates worklist items
// and sends the unseen ones to the crawlers.
seen := make(map[string]bool)
for list := range worklist {
for _, link := range list {
if !seen[link] {
seen[link] = true
unseenLinks <- link
}
}
}
}
~~~
所有的爬蟲goroutine現在都是被同一個channel-unseenLinks喂飽的了。主goroutine負責拆分它從worklist里拿到的元素,然后把沒有抓過的經由unseenLinks channel發送給一個爬蟲的goroutine。
seen這個map被限定在main goroutine中;也就是說這個map只能在main goroutine中進行訪問。類似于其它的信息隱藏方式,這樣的約束可以讓我們從一定程度上保證程序的正確性。例如,內部變量不能夠在函數外部被訪問到;變量(§2.3.4)在沒有被轉義的情況下是無法在函數外部訪問的;一個對象的封裝字段無法被該對象的方法以外的方法訪問到。在所有的情況下,信息隱藏都可以幫助我們約束我們的程序,使其不發生意料之外的情況。
crawl函數爬到的鏈接在一個專有的goroutine中被發送到worklist中來避免死鎖。為了節省空間,這個例子的終止問題我們先不進行詳細闡述了。
**練習 8.6:** 為并發爬蟲增加深度限制。也就是說,如果用戶設置了depth=3,那么只有從首頁跳轉三次以內能夠跳到的頁面才能被抓取到。
**練習 8.7:** 完成一個并發程序來創建一個線上網站的本地鏡像,把該站點的所有可達的頁面都抓取到本地硬盤。為了省事,我們這里可以只取出現在該域下的所有頁面(比如golang.org結尾,譯注:外鏈的應該就不算了。)當然了,出現在頁面里的鏈接你也需要進行一些處理,使其能夠在你的鏡像站點上進行跳轉,而不是指向原始的鏈接。
**譯注:** 拓展閱讀 [Handling 1 Million Requests per Minute with Go](http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/)。
### 8.7. 基于select的多路復用
下面的程序會進行火箭發射的倒計時。time.Tick函數返回一個channel,程序會周期性地像一個節拍器一樣向這個channel發送事件。每一個事件的值是一個時間戳,不過更有意思的是其傳送方式。
*gopl.io/ch8/countdown1*
~~~
func main() {
fmt.Println("Commencing countdown.")
tick := time.Tick(1 * time.Second)
for countdown := 10; countdown > 0; countdown-- {
fmt.Println(countdown)
j<-tick
}
launch()
}
~~~
現在我們讓這個程序支持在倒計時中,用戶按下return鍵時直接中斷發射流程。首先,我們啟動一個goroutine,這個goroutine會嘗試從標準輸入中調入一個單獨的byte并且,如果成功了,會向名為abort的channel發送一個值。
*gopl.io/ch8/countdown2*
~~~
abort := make(chan struct{})
go func() {
os.Stdin.Read(make([]byte, 1)) // read a single byte
abort <- struct{}{}
}()
~~~
現在每一次計數循環的迭代都需要等待兩個channel中的其中一個返回事件了:ticker channel當一切正常時(就像NASA jorgon的"nominal",譯注:這梗估計我們是不懂了)或者異常時返回的abort事件。我們無法做到從每一個channel中接收信息,如果我們這么做的話,如果第一個channel中沒有事件發過來那么程序就會立刻被阻塞,這樣我們就無法收到第二個channel中發過來的事件。這時候我們需要多路復用(multiplex)這些操作了,為了能夠多路復用,我們使用了select語句。
~~~
select {
case <-ch1:
// ...
case x := <-ch2:
// ...use x...
case ch3 <- y:
// ...
default:
// ...
}
~~~
上面是select語句的一般形式。和switch語句稍微有點相似,也會有幾個case和最后的default選擇支。每一個case代表一個通信操作(在某個channel上進行發送或者接收)并且會包含一些語句組成的一個語句塊。一個接收表達式可能只包含接收表達式自身(譯注:不把接收到的值賦值給變量什么的),就像上面的第一個case,或者包含在一個簡短的變量聲明中,像第二個case里一樣;第二種形式讓你能夠引用接收到的值。
select會等待case中有能夠執行的case時去執行。當條件滿足時,select才會去通信并執行case之后的語句;這時候其它通信是不會執行的。一個沒有任何case的select語句寫作select{},會永遠地等待下去。
讓我們回到我們的火箭發射程序。time.After函數會立即返回一個channel,并起一個新的goroutine在經過特定的時間后向該channel發送一個獨立的值。下面的select語句會會一直等待到兩個事件中的一個到達,無論是abort事件或者一個10秒經過的事件。如果10秒經過了還沒有abort事件進入,那么火箭就會發射。
~~~
func main() {
// ...create abort channel...
fmt.Println("Commencing countdown. Press return to abort.")
select {
case <-time.After(10 * time.Second):
// Do nothing.
case <-abort:
fmt.Println("Launch aborted!")
return
}
launch()
}
~~~
下面這個例子更微秒。ch這個channel的buffer大小是1,所以會交替的為空或為滿,所以只有一個case可以進行下去,無論i是奇數或者偶數,它都會打印0 2 4 6 8。
~~~
ch := make(chan int, 1)
for i := 0; i < 10; i++ {
select {
case x := <-ch:
fmt.Println(x) // "0" "2" "4" "6" "8"
case ch <- i:
}
}
~~~
如果多個case同時就緒時,select會隨機地選擇一個執行,這樣來保證每一個channel都有平等的被select的機會。增加前一個例子的buffer大小會使其輸出變得不確定,因為當buffer既不為滿也不為空時,select語句的執行情況就像是拋硬幣的行為一樣是隨機的。
下面讓我們的發射程序打印倒計時。這里的select語句會使每次循環迭代等待一秒來執行退出操作。
*gopl.io/ch8/countdown3*
~~~
func main() {
// ...create abort channel...
fmt.Println("Commencing countdown. Press return to abort.")
tick := time.Tick(1 * time.Second)
for countdown := 10; countdown > 0; countdown-- {
fmt.Println(countdown)
select {
case <-tick:
// Do nothing.
case <-abort:
fmt.Println("Launch aborted!")
return
}
}
launch()
}
~~~
time.Tick函數表現得好像它創建了一個在循環中調用time.Sleep的goroutine,每次被喚醒時發送一個事件。當countdown函數返回時,它會停止從tick中接收事件,但是ticker這個goroutine還依然存活,繼續徒勞地嘗試從channel中發送值,然而這時候已經沒有其它的goroutine會從該channel中接收值了--這被稱為goroutine泄露(§8.4.4)。
Tick函數挺方便,但是只有當程序整個生命周期都需要這個時間時我們使用它才比較合適。否則的話,我們應該使用下面的這種模式:
~~~
ticker := time.NewTicker(1 * time.Second)
<-ticker.C // receive from the ticker's channel
ticker.Stop() // cause the ticker's goroutine to terminate
~~~
有時候我們希望能夠從channel中發送或者接收值,并避免因為發送或者接收導致的阻塞,尤其是當channel沒有準備好寫或者讀時。select語句就可以實現這樣的功能。select會有一個default來設置當其它的操作都不能夠馬上被處理時程序需要執行哪些邏輯。
下面的select語句會在abort channel中有值時,從其中接收值;無值時什么都不做。這是一個非阻塞的接收操作;反復地做這樣的操作叫做“輪詢channel”。
~~~
select {
case <-abort:
fmt.Printf("Launch aborted!\n")
return
default:
// do nothing
}
~~~
channel的零值是nil。也許會讓你覺得比較奇怪,nil的channel有時候也是有一些用處的。因為對一個nil的channel發送和接收操作會永遠阻塞,在select語句中操作nil的channel永遠都不會被select到。
這使得我們可以用nil來激活或者禁用case,來達成處理其它輸入或輸出事件時超時和取消的邏輯。我們會在下一節中看到一個例子。
**練習 8.8:** 使用select來改造8.3節中的echo服務器,為其增加超時,這樣服務器可以在客戶端10秒中沒有任何喊話時自動斷開連接。
### 8.8. 示例: 并發的字典遍歷
在本小節中,我們會創建一個程序來生成指定目錄的硬盤使用情況報告,這個程序和Unix里的du工具比較相似。大多數工作用下面這個walkDir函數來完成,這個函數使用dirents函數來枚舉一個目錄下的所有入口。
*gopl.io/ch8/du1*
~~~
// walkDir recursively walks the file tree rooted at dir
// and sends the size of each found file on fileSizes.
func walkDir(dir string, fileSizes chan<- int64) {
for _, entry := range dirents(dir) {
if entry.IsDir() {
subdir := filepath.Join(dir, entry.Name())
walkDir(subdir, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}
// dirents returns the entries of directory dir.
func dirents(dir string) []os.FileInfo {
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du1: %v\n", err)
return nil
}
return entries
}
~~~
ioutil.ReadDir函數會返回一個os.FileInfo類型的slice,os.FileInfo類型也是os.Stat這個函數的返回值。對每一個子目錄而言,walkDir會遞歸地調用其自身,并且會對每一個文件也遞歸調用。walkDir函數會向fileSizes這個channel發送一條消息。這條消息包含了文件的字節大小。
下面的主函數,用了兩個goroutine。后臺的goroutine調用walkDir來遍歷命令行給出的每一個路徑并最終關閉fileSizes這個channel。主goroutine會對其從channel中接收到的文件大小進行累加,并輸出其和。
~~~
package main
import (
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
)
func main() {
// Determine the initial directories.
flag.Parse()
roots := flag.Args()
if len(roots) == 0 {
roots = []string{"."}
}
// Traverse the file tree.
fileSizes := make(chan int64)
go func() {
for _, root := range roots {
walkDir(root, fileSizes)
}
close(fileSizes)
}()
// Print the results.
var nfiles, nbytes int64
for size := range fileSizes {
nfiles++
nbytes += size
}
printDiskUsage(nfiles, nbytes)
}
func printDiskUsage(nfiles, nbytes int64) {
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
}
~~~
這個程序會在打印其結果之前卡住很長時間。
~~~
$ go build gopl.io/ch8/du1
$ ./du1 $HOME /usr /bin /etc
213201 files 62.7 GB
~~~
如果在運行的時候能夠讓我們知道處理進度的話想必更好。但是,如果簡單地把printDiskUsage函數調用移動到循環里會導致其打印出成百上千的輸出。
下面這個du的變種會間歇打印內容,不過只有在調用時提供了-v的flag才會顯示程序進度信息。在roots目錄上循環的后臺goroutine在這里保持不變。主goroutine現在使用了計時器來每500ms生成事件,然后用select語句來等待文件大小的消息來更新總大小數據,或者一個計時器的事件來打印當前的總大小數據。如果-v的flag在運行時沒有傳入的話,tick這個channel會保持為nil,這樣在select里的case也就相當于被禁用了。
*gopl.io/ch8/du2*
~~~
var verbose = flag.Bool("v", false, "show verbose progress messages")
func main() {
// ...start background goroutine...
// Print the results periodically.
var tick <-chan time.Time
if *verbose {
tick = time.Tick(500 * time.Millisecond)
}
var nfiles, nbytes int64
loop:
for {
select {
case size, ok := <-fileSizes:
if !ok {
break loop // fileSizes was closed
}
nfiles++
nbytes += size
case <-tick:
printDiskUsage(nfiles, nbytes)
}
}
printDiskUsage(nfiles, nbytes) // final totals
}
~~~
由于我們的程序不再使用range循環,第一個select的case必須顯式地判斷fileSizes的channel是不是已經被關閉了,這里可以用到channel接收的二值形式。如果channel已經被關閉了的話,程序會直接退出循環。這里的break語句用到了標簽break,這樣可以同時終結select和for兩個循環;如果沒有用標簽就break的話只會退出內層的select循環,而外層的for循環會使之進入下一輪select循環。
現在程序會悠閑地為我們打印更新流:
~~~
$ go build gopl.io/ch8/du2
$ ./du2 -v $HOME /usr /bin /etc
28608 files 8.3 GB
54147 files 10.3 GB
93591 files 15.1 GB
127169 files 52.9 GB
175931 files 62.2 GB
213201 files 62.7 GB
~~~
然而這個程序還是會花上很長時間才會結束。無法對walkDir做并行化處理沒什么別的原因,無非是因為磁盤系統并行限制。下面這個第三個版本的du,會對每一個walkDir的調用創建一個新的goroutine。它使用sync.WaitGroup (§8.5)來對仍舊活躍的walkDir調用進行計數,另一個goroutine會在計數器減為零的時候將fileSizes這個channel關閉。
*gopl.io/ch8/du3*
~~~
func main() {
// ...determine roots...
// Traverse each root of the file tree in parallel.
fileSizes := make(chan int64)
var n sync.WaitGroup
for _, root := range roots {
n.Add(1)
go walkDir(root, &n, fileSizes)
}
go func() {
n.Wait()
close(fileSizes)
}()
// ...select loop...
}
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
defer n.Done()
for _, entry := range dirents(dir) {
if entry.IsDir() {
n.Add(1)
subdir := filepath.Join(dir, entry.Name())
go walkDir(subdir, n, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}
~~~
由于這個程序在高峰期會創建成百上千的goroutine,我們需要修改dirents函數,用計數信號量來阻止他同時打開太多的文件,就像我們在8.7節中的并發爬蟲一樣:
~~~
// sema is a counting semaphore for limiting concurrency in dirents.
var sema = make(chan struct{}, 20)
// dirents returns the entries of directory dir.
func dirents(dir string) []os.FileInfo {
sema <- struct{}{} // acquire token
defer func() { <-sema }() // release token
// ...
~~~
這個版本比之前那個快了好幾倍,盡管其具體效率還是和你的運行環境,機器配置相關。
**練習 8.9:** 編寫一個du工具,每隔一段時間將root目錄下的目錄大小計算并顯示出來。
### 8.9. 并發的退出
有時候我們需要通知goroutine停止它正在干的事情,比如一個正在執行計算的web服務,然而它的客戶端已經斷開了和服務端的連接。
Go語言并沒有提供在一個goroutine中終止另一個goroutine的方法,由于這樣會導致goroutine之間的共享變量落在未定義的狀態上。在8.7節中的rocket launch程序中,我們往名字叫abort的channel里發送了一個簡單的值,在countdown的goroutine中會把這個值理解為自己的退出信號。但是如果我們想要退出兩個或者任意多個goroutine怎么辦呢?
一種可能的手段是向abort的channel里發送和goroutine數目一樣多的事件來退出它們。如果這些goroutine中已經有一些自己退出了,那么會導致我們的channel里的事件數比goroutine還多,這樣導致我們的發送直接被阻塞。另一方面,如果這些goroutine又生成了其它的goroutine,我們的channel里的數目又太少了,所以有些goroutine可能會無法接收到退出消息。一般情況下我們是很難知道在某一個時刻具體有多少個goroutine在運行著的。另外,當一個goroutine從abort channel中接收到一個值的時候,他會消費掉這個值,這樣其它的goroutine就沒法看到這條信息。為了能夠達到我們退出goroutine的目的,我們需要更靠譜的策略,來通過一個channel把消息廣播出去,這樣goroutine們能夠看到這條事件消息,并且在事件完成之后,可以知道這件事已經發生過了。
回憶一下我們關閉了一個channel并且被消費掉了所有已發送的值,操作channel之后的代碼可以立即被執行,并且會產生零值。我們可以將這個機制擴展一下,來作為我們的廣播機制:不要向channel發送值,而是用關閉一個channel來進行廣播。
只要一些小修改,我們就可以把退出邏輯加入到前一節的du程序。首先,我們創建一個退出的channel,這個channel不會向其中發送任何值,但其所在的閉包內要寫明程序需要退出。我們同時還定義了一個工具函數,cancelled,這個函數在被調用的時候會輪詢退出狀態。
*gopl.io/ch8/du4*
~~~
var done = make(chan struct{})
func cancelled() bool {
select {
case <-done:
return true
default:
return false
}
}
~~~
下面我們創建一個從標準輸入流中讀取內容的goroutine,這是一個比較典型的連接到終端的程序。每當有輸入被讀到(比如用戶按了回車鍵),這個goroutine就會把取消消息通過關閉done的channel廣播出去。
~~~
// Cancel traversal when input is detected.
go func() {
os.Stdin.Read(make([]byte, 1)) // read a single byte
close(done)
}()
~~~
現在我們需要使我們的goroutine來對取消進行響應。在main goroutine中,我們添加了select的第三個case語句,嘗試從done channel中接收內容。如果這個case被滿足的話,在select到的時候即會返回,但在結束之前我們需要把fileSizes channel中的內容“排”空,在channel被關閉之前,舍棄掉所有值。這樣可以保證對walkDir的調用不要被向fileSizes發送信息阻塞住,可以正確地完成。
~~~
for {
select {
case <-done:
// Drain fileSizes to allow existing goroutines to finish.
for range fileSizes {
// Do nothing.
}
return
case size, ok := <-fileSizes:
// ...
}
}
~~~
walkDir這個goroutine一啟動就會輪詢取消狀態,如果取消狀態被設置的話會直接返回,并且不做額外的事情。這樣我們將所有在取消事件之后創建的goroutine改變為無操作。
~~~
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
defer n.Done()
if cancelled() {
return
}
for _, entry := range dirents(dir) {
// ...
}
}
~~~
在walkDir函數的循環中我們對取消狀態進行輪詢可以帶來明顯的益處,可以避免在取消事件發生時還去創建goroutine。取消本身是有一些代價的;想要快速的響應需要對程序邏輯進行侵入式的修改。確保在取消發生之后不要有代價太大的操作可能會需要修改你代碼里的很多地方,但是在一些重要的地方去檢查取消事件也確實能帶來很大的好處。
對這個程序的一個簡單的性能分析可以揭示瓶頸在dirents函數中獲取一個信號量。下面的select可以讓這種操作可以被取消,并且可以將取消時的延遲從幾百毫秒降低到幾十毫秒。
~~~
func dirents(dir string) []os.FileInfo {
select {
case sema <- struct{}{}: // acquire token
case <-done:
return nil // cancelled
}
defer func() { <-sema }() // release token
// ...read directory...
}
~~~
現在當取消發生時,所有后臺的goroutine都會迅速停止并且主函數會返回。當然,當主函數返回時,一個程序會退出,而我們又無法在主函數退出的時候確認其已經釋放了所有的資源(譯注:因為程序都退出了,你的代碼都沒法執行了)。這里有一個方便的竅門我們可以一用:取代掉直接從主函數返回,我們調用一個panic,然后runtime會把每一個goroutine的棧dump下來。如果main goroutine是唯一一個剩下的goroutine的話,他會清理掉自己的一切資源。但是如果還有其它的goroutine沒有退出,他們可能沒辦法被正確地取消掉,也有可能被取消但是取消操作會很花時間;所以這里的一個調研還是很有必要的。我們用panic來獲取到足夠的信息來驗證我們上面的判斷,看看最終到底是什么樣的情況。
**練習 8.10:** HTTP請求可能會因http.Request結構體中Cancel channel的關閉而取消。修改8.6節中的web crawler來支持取消http請求。(提示:http.Get并沒有提供方便地定制一個請求的方法。你可以用http.NewRequest來取而代之,設置它的Cancel字段,然后用http.DefaultClient.Do(req)來進行這個http請求。)
**練習 8.11:** 緊接著8.4.4中的mirroredQuery流程,實現一個并發請求url的fetch的變種。當第一個請求返回時,直接取消其它的請求。
### 8.10. 示例: 聊天服務
我們用一個聊天服務器來終結本章節的內容,這個程序可以讓一些用戶通過服務器向其它所有用戶廣播文本消息。這個程序中有四種goroutine。main和broadcaster各自是一個goroutine實例,每一個客戶端的連接都會有一個handleConn和clientWriter的goroutine。broadcaster是select用法的不錯的樣例,因為它需要處理三種不同類型的消息。
下面演示的main goroutine的工作,是listen和accept(譯注:網絡編程里的概念)從客戶端過來的連接。對每一個連接,程序都會建立一個新的handleConn的goroutine,就像我們在本章開頭的并發的echo服務器里所做的那樣。
*gopl.io/ch8/chat*
~~~
func main() {
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
go broadcaster()
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err)
continue
}
go handleConn(conn)
}
}
~~~
然后是broadcaster的goroutine。他的內部變量clients會記錄當前建立連接的客戶端集合。其記錄的內容是每一個客戶端的消息發出channel的"資格"信息。
~~~
type client chan<- string // an outgoing message channel
var (
entering = make(chan client)
leaving = make(chan client)
messages = make(chan string) // all incoming client messages
)
func broadcaster() {
clients := make(map[client]bool) // all connected clients
for {
select {
case msg := <-messages:
// Broadcast incoming message to all
// clients' outgoing message channels.
for cli := range clients {
cli <- msg
}
case cli := <-entering:
clients[cli] = true
case cli := <-leaving:
delete(clients, cli)
close(cli)
}
}
}
~~~
broadcaster監聽來自全局的entering和leaving的channel來獲知客戶端的到來和離開事件。當其接收到其中的一個事件時,會更新clients集合,當該事件是離開行為時,它會關閉客戶端的消息發出channel。broadcaster也會監聽全局的消息channel,所有的客戶端都會向這個channel中發送消息。當broadcaster接收到什么消息時,就會將其廣播至所有連接到服務端的客戶端。
現在讓我們看看每一個客戶端的goroutine。handleConn函數會為它的客戶端創建一個消息發出channel并通過entering channel來通知客戶端的到來。然后它會讀取客戶端發來的每一行文本,并通過全局的消息channel來將這些文本發送出去,并為每條消息帶上發送者的前綴來標明消息身份。當客戶端發送完畢后,handleConn會通過leaving這個channel來通知客戶端的離開并關閉連接。
~~~
func handleConn(conn net.Conn) {
ch := make(chan string) // outgoing client messages
go clientWriter(conn, ch)
who := conn.RemoteAddr().String()
ch <- "You are " + who
messages <- who + " has arrived"
entering <- ch
input := bufio.NewScanner(conn)
for input.Scan() {
messages <- who + ": " + input.Text()
}
// NOTE: ignoring potential errors from input.Err()
leaving <- ch
messages <- who + " has left"
conn.Close()
}
func clientWriter(conn net.Conn, ch <-chan string) {
for msg := range ch {
fmt.Fprintln(conn, msg) // NOTE: ignoring network errors
}
}
~~~
另外,handleConn為每一個客戶端創建了一個clientWriter的goroutine來接收向客戶端發出消息channel中發送的廣播消息,并將它們寫入到客戶端的網絡連接。客戶端的讀取方循環會在broadcaster接收到leaving通知并關閉了channel后終止。
下面演示的是當服務器有兩個活動的客戶端連接,并且在兩個窗口中運行的情況,使用netcat來聊天:
~~~
$ go build gopl.io/ch8/chat
$ go build gopl.io/ch8/netcat3
$ ./chat &
$ ./netcat3
You are 127.0.0.1:64208 $ ./netcat3
127.0.0.1:64211 has arrived You are 127.0.0.1:64211
Hi!
127.0.0.1:64208: Hi!
127.0.0.1:64208: Hi!
Hi yourself.
127.0.0.1:64211: Hi yourself. 127.0.0.1:64211: Hi yourself.
^C
127.0.0.1:64208 has left
$ ./netcat3
You are 127.0.0.1:64216 127.0.0.1:64216 has arrived
Welcome.
127.0.0.1:64211: Welcome. 127.0.0.1:64211: Welcome.
^C
127.0.0.1:64211 has left”
~~~
當與n個客戶端保持聊天session時,這個程序會有2n+2個并發的goroutine,然而這個程序卻并不需要顯式的鎖(§9.2)。clients這個map被限制在了一個獨立的goroutine中,broadcaster,所以它不能被并發地訪問。多個goroutine共享的變量只有這些channel和net.Conn的實例,兩個東西都是并發安全的。我們會在下一章中更多地解決約束,并發安全以及goroutine中共享變量的含義。
**練習 8.12:** 使broadcaster能夠將arrival事件通知當前所有的客戶端。為了達成這個目的,你需要有一個客戶端的集合,并且在entering和leaving的channel中記錄客戶端的名字。
**練習 8.13:** 使聊天服務器能夠斷開空閑的客戶端連接,比如最近五分鐘之后沒有發送任何消息的那些客戶端。提示:可以在其它goroutine中調用conn.Close()來解除Read調用,就像input.Scanner()所做的那樣。
**練習 8.14:** 修改聊天服務器的網絡協議這樣每一個客戶端就可以在entering時可以提供它們的名字。將消息前綴由之前的網絡地址改為這個名字。
**練習 8.15:** 如果一個客戶端沒有及時地讀取數據可能會導致所有的客戶端被阻塞。修改broadcaster來跳過一條消息,而不是等待這個客戶端一直到其準備好寫。或者為每一個客戶端的消息發出channel建立緩沖區,這樣大部分的消息便不會被丟掉;broadcaster應該用一個非阻塞的send向這個channel中發消息。
- Go語言程序設計
- Go語言圣經(中文版)
- 譯者序
- 前言
- 第一章 入門
- 第二章 程序結構
- 第三章 基礎數據類型
- 第四章 復合數據類型
- 第五章 函數
- 第六章 方法
- 第七章 接口
- 7.4. flag.Value接口
- 7.6. sort.Interface接口
- 7.7. http.Handler接口
- 第八章 Goroutines和Channels
- 第九章 基于共享變量的并發
- 9.2. sync.Mutex互斥鎖
- 第十章 包和工具
- 第十一章 測試
- 第十二章 反射
- 12.2. reflect.Type和reflect.Value
- 12.5. 通過reflect.Value修改值
- 第13章 底層編程
- 13.1. unsafe.Sizeof, Alignof 和 Offsetof
- 附錄