我早些時候承諾會演示一些可能廣泛使用的有趣的生成器。我們來看看一個名為repeat的生成器:
```
repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
for _, v := range values {
select {
case <-done:
return
case valueStream <- v:
}
}
}
}()
return valueStream
}
```
這個函數會重復你傳給它的值,直到你告訴它停止。 讓我們來看看另一個函數take,它在與repeat結合使用時很有用:
```
take := func(done <-chan interface{}, valueStream <-chan interface{}, num int, ) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}
```
這個函數會從其傳入的valueStream中取出第一個元素然后退出。二者組合起來會怎么樣呢?
```
done := make(chan interface{})
defer close(done)
for num := range take(done, repeat(done, 1), 10) {
fmt.Printf("%v ", num)
}
```
這會輸出:
```
1 1 1 1 1 1 1 1 1 1
```
在這個基本的例子中,我們創建了一個repeat生成器來生成無限數量的重復生成器,但是只取前10個。repeat生成器由take接收。雖然我們可以生成無線數量的流,但只會生成n+1個實例,其中n是我們傳入take的數量。
我們可以擴展這一點。讓我們創建另一個生成器,但是這次我們創建一個重復調用函數的生成器repeatFn:
```
repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
select {
case <-done:
return
case valueStream <- fn():
}
}
}()
return valueStream
}
```
我們用它來生成10個隨機數:
```
done := make(chan interface{})
defer close(done)
rand := func() interface{} {
return rand.Int()
}
for num := range take(done, repeatFn(done, rand), 10) {
fmt.Println(num)
}
```
這會輸出:
```
5577006791947779410
8674665223082153551
6129484611666145821
4037200794235010051
3916589616287113937
6334824724549167320
605394647632969758
1443635317331776148
894385949183117216
2775422040480279449
```
您可能想知道為什么所有這些發生器通道類型都是interface{}。
Go中的空接口有點爭議,但我認為處理interface的通道方便使用標準的管道模式。 正如我們前面所討論的,管道的強大來自可重用的階段。當階段以適合自身的特異性水平進行操作時,這是最好的。在repeat和repeatFn生成器中,我們需要關注的是通過在列表或運算符上循環來生成數據流。這些操作都不需要關于處理的類型,而只需要知道參數的類型。
當需要處理特定的類型時,可以放置一個執行類型斷言的階段。有一個額外的管道階段和類型斷言的性能開銷可以忽略不計,正如我們稍后會看到的。 以下是一個介紹toString管道階段的小例子:
```
toString := func(done <-chan interface{}, valueStream <-chan interface{}, ) <-chan string {
stringStream := make(chan string)
go func() {
defer close(stringStream)
for v := range valueStream {
select {
case <-done:
return
case stringStream <- v.(string):
}
}
}()
return stringStream
}
```
可以這樣使用它:
```
done := make(chan interface{})
defer close(done)
var message string
for token := range toString(done, take(done, repeat(done, "I", "am."), 5)) {
message += token
}
fmt.Printf("message: %s...", message)
```
這會輸出:
```
message: Iam.Iam.I...
```
現在讓我們證明剛才提到的性能問題。我們將編寫兩個基準測試函數:一個測試通用階段,一個測試類型特定階段:
```
func BenchmarkGeneric(b *testing.B) {
done := make(chan interface{})
defer close(done)
b.ResetTimer()
for range toString(done, take(done, repeat(done, "a"), b.N)) {
}
}
func BenchmarkTyped(b *testing.B) {
repeat := func(done <-chan interface{}, values ...string) <-chan string {
valueStream := make(chan string)
go func() {
defer close(valueStream)
for {
for _, v := range values {
select {
case <-done:
return
case valueStream <- v:
}
}
}
}()
return valueStream
}
take := func(done <-chan interface{}, valueStream <-chan string, num int, ) <-chan string {
takeStream := make(chan string)
go func() {
defer close(takeStream)
for i := num; i > 0 || i == -1; {
if i != -1 {
i--
}
select {
case <-done:
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}
done := make(chan interface{})
defer close(done)
b.ResetTimer()
for range take(done, repeat(done, "a"), b.N) {
}
}
```
這會輸出:
| BenchmarkGeneric-4 | 1000000 | 2266 ns/op |
| --- | --- | --- |
| BenchmarkTyped-4 | 1000000 | 1181 ns/op |
| PASS ok | command-line-arguments | 3.486s |
可以看到,特定類型的速度是接口類型的2倍。一般來說,管道上的限制因素將是生成器,或者是密集計算的某個階段。如果生成器不像repeat和repeatFn生成器那樣從內存中創建流,則可能會受I/O限制。從磁盤或網絡讀取數據可能會超出此處顯示的性能開銷。
那么,如果真是在計算上存在性能瓶頸,我們該怎么辦?基于這種情況,讓我們來討論扇出扇入技術。
* * * * *
學識淺薄,錯誤在所難免。我是長風,歡迎來Golang中國的群(211938256)就本書提出修改意見。
- 前序
- 誰適合讀這本書
- 章節導讀
- 在線資源
- 第一章 并發編程介紹
- 摩爾定律,可伸縮網絡和我們所處的困境
- 為什么并發編程如此困難
- 數據競爭
- 原子性
- 內存訪問同步
- 死鎖,活鎖和鎖的饑餓問題
- 死鎖
- 活鎖
- 饑餓
- 并發安全性
- 優雅的面對復雜性
- 第二章 代碼建模:序列化交互處理
- 并發與并行
- 什么是CSP
- CSP在Go中的衍生物
- Go的并發哲學
- 第三章 Go的并發構建模塊
- Goroutines
- sync包
- WaitGroup
- Mutex和RWMutex
- Cond
- Once
- Pool
- Channels
- select語句
- GOMAXPROCS
- 結論
- 第四章 Go的并發編程范式
- 訪問范圍約束
- fo-select循環
- 防止Goroutine泄漏
- or-channel
- 錯誤處理
- 管道
- 構建管道的最佳實踐
- 便利的生成器
- 扇入扇出
- or-done-channel
- tee-channel
- bridge-channel
- 隊列
- context包
- 小結
- 第五章 可伸縮并發設計
- 錯誤傳遞
- 超時和取消
- 心跳
- 請求并發復制處理
- 速率限制
- Goroutines異常行為修復
- 本章小結
- 第六章 Goroutines和Go運行時
- 任務調度