# Go并行計算
如果說Go有什么讓人一見鐘情的特性,那大概就是并行計算了吧。
做個題目
>如果我們列出10以下所有能夠被3或者5整除的自然數,那么我們得到的是3,5,6和9。這四個數的和是23。
那么請計算1000以下(不包括1000)的所有能夠被3或者5整除的自然數的和。
這個題目的一個思路就是:
(1) 先計算1000以下所有能夠被3整除的整數的和A,
(2) 然后計算1000以下所有能夠被5整除的整數和B,
(3) 然后再計算1000以下所有能夠被3和5整除的整數和C,
(4) 使用A+B-C就得到了最后的結果。
按照上面的方法,傳統的方法當然就是一步一步計算,然后再到第(4)步匯總了。
但是一旦有了Go,我們就可以讓前面三個步驟并行計算,然后再在第(4)步匯總。
并行計算涉及到一個新的`數據類型chan`和一個新的`關鍵字go`。
先看例子:
package main
import (
"fmt"
"time"
)
func get_sum_of_divisible(num int, divider int, resultChan chan int) {
sum := 0
for value := 0; value < num; value++ {
if value%divider == 0 {
sum += value
}
}
resultChan <- sum
}
func main() {
LIMIT := 1000
resultChan := make(chan int, 3)
t_start := time.Now()
go get_sum_of_divisible(LIMIT, 3, resultChan)
go get_sum_of_divisible(LIMIT, 5, resultChan)
go get_sum_of_divisible(LIMIT, 15, resultChan)
sum3, sum5, sum15 := <-resultChan, <-resultChan, <-resultChan
sum := sum3 + sum5 - sum15
t_end := time.Now()
fmt.Println(sum)
fmt.Println(t_end.Sub(t_start))
}
(1) 在上面的例子中,我們首先定義了一個普通的函數get_sum_of_divisible,這個函數的`最后一個參數是一個整型chan類型`,這種類型,你可以把它當作一個先進先出的隊列。你可以`向它寫入數據`,也可以`從它讀出數據`。它`所能接受的數據類型`就是`由chan關鍵字后面的類型所決定`的。在上面的例子中,我們使用`<-`運算符將函數計算的結果寫入channel。channel是go提供的用來協程之間通信的方式。本例中main是一個協程,三個get_sum_of_divisible調用是協程。要在這四個協程間通信,必須有一種可靠的手段。
(2) 在main函數中,我們使用go關鍵字來開啟并行計算。并行計算是由goroutine來支持的,`goroutine`又叫做`協程`,你可以把它看作為比線程更輕量級的運算。開啟一個協程很簡單,就是`go關鍵字`后面`跟上所要運行的函數`。
(3) 最后,我們要從channel中取出并行計算的結果。使用`<-`運算符從channel里面取出數據。
在本例中,我們為了演示go并行計算的速度,還引進了time包來計算程序執行時間。在同普通的順序計算相比,并行計算的速度是非同凡響的。
好了,上面的例子看完,我們來詳細講解Go的并行計算。
**Goroutine協程**
所謂協程,就是Go提供的輕量級的獨立運算過程,比線程還輕。創建一個協程很簡單,就是go關鍵字加上所要運行的函數。看個例子:
package main
import (
"fmt"
)
func list_elem(n int) {
for i := 0; i < n; i++ {
fmt.Println(i)
}
}
func main() {
go list_elem(10)
}
上面的例子是創建一個協程遍歷一下元素。但是當你運行的時候,你會`發現什么都沒有輸出`!`為什么呢?`
因為上面的`main函數`在`創建完協程后`就`立刻退出`了,所以`協程`還`沒有來得及運行`呢!修改一下:
package main
import (
"fmt"
)
func list_elem(n int) {
for i := 0; i < n; i++ {
fmt.Println(i)
}
}
func main() {
go list_elem(10)
var input string
fmt.Scanln(&input)
}
這里,我們在main函數創建協程后,要求用戶輸入任何數據后才退出,這樣協程就有了運行的時間,故而輸出結果:
0
1
2
3
4
5
6
7
8
9
其實在開頭的例子里面,我們的main函數事實上也被阻塞了,因為`sum3, sum5, sum15 := <-resultChan, <-resultChan, <-resultChan`這行代碼在channel里面沒有數據或者數據個數不符的時候,都會阻塞在那里,直到協程結束,寫入結果。
不過既然是并行計算,我們還是得看看協程是否真的并行計算了。
package main
import (
"fmt"
"math/rand"
"time"
)
func list_elem(n int, tag string) {
for i := 0; i < n; i++ {
fmt.Println(tag, i)
tick := time.Duration(rand.Intn(100))
time.Sleep(time.Millisecond * tick)
}
}
func main() {
go list_elem(10, "go_a")
go list_elem(20, "go_b")
var input string
fmt.Scanln(&input)
}
輸出結果
go_a 0
go_b 0
go_a 1
go_b 1
go_a 2
go_b 2
go_b 3
go_b 4
go_a 3
go_b 5
go_b 6
go_a 4
go_a 5
go_b 7
go_a 6
go_a 7
go_b 8
go_b 9
go_a 8
go_b 10
go_b 11
go_a 9
go_b 12
go_b 13
go_b 14
go_b 15
go_b 16
go_b 17
go_b 18
go_b 19
在上面的例子中,我們讓兩個協程在每輸出一個數字的時候,隨機Sleep了一會兒。如果是并行計算,那么輸出是無序的。從上面的例子中,我們可以看出兩個協程確實并行運行了。
**Channel通道**
Channel提供了`協程之間`的`通信方式`以及`運行同步機制`。
>假設訓練定點投籃和三分投籃,教練在計數。
package main
import (
"fmt"
"time"
)
func fixed_shooting(msg_chan chan string) {
for {
msg_chan <- "fixed shooting"
fmt.Println("continue fixed shooting...")
}
}
func count(msg_chan chan string) {
for {
msg := <-msg_chan
fmt.Println(msg)
time.Sleep(time.Second * 1)
}
}
func main() {
var c chan string
c = make(chan string)
go fixed_shooting(c)
go count(c)
var input string
fmt.Scanln(&input)
}
輸出結果為:
fixed shooting
continue fixed shooting...
fixed shooting
continue fixed shooting...
fixed shooting
continue fixed shooting...
我們看到在fixed_shooting函數里面我們將消息傳遞到channel,然后輸出提示信息"continue fixed shooting...",而在count函數里面,我們從channel里面取出消息輸出,然后間隔1秒再去取消息輸出。這里面我們可以考慮一下,如果我們不去從channel中取消息會出現什么情況?我們把main函數里面的`go count(c)`注釋掉,然后再運行一下。發現程序再也不會輸出消息和提示信息了。這是因為channel中根本就沒有信息了,因為`如果你要向channel里面寫信息`,`必須有配對的取信息的一端`,否則是不會寫的。
我們再把三分投籃加上。
package main
import (
"fmt"
"time"
)
func fixed_shooting(msg_chan chan string) {
for {
msg_chan <- "fixed shooting"
}
}
func three_point_shooting(msg_chan chan string) {
for {
msg_chan <- "three point shooting"
}
}
func count(msg_chan chan string) {
for {
msg := <-msg_chan
fmt.Println(msg)
time.Sleep(time.Second * 1)
}
}
func main() {
var c chan string
c = make(chan string)
go fixed_shooting(c)
go three_point_shooting(c)
go count(c)
var input string
fmt.Scanln(&input)
}
輸出結果為:
fixed shooting
three point shooting
fixed shooting
three point shooting
fixed shooting
three point shooting
我們看到程序交替輸出定點投籃和三分投籃,這是因為寫入channel的信息必須要讀取出來,否則嘗試再次寫入就失敗了。
在上面的例子中,我們發現`定義一個channel信息變量`的方式就是多加一個`chan`關鍵字。并且你能夠`向channel寫入數據`和`從channel讀取數據`。這里我們還可以設置channel通道的方向。
**Channel通道方向***
所謂的`通道方向`就是`寫`和`讀`。如果我們如下定義
c chan<- string //那么你只能向channel寫入數據
而這種定義
c <-chan string //那么你只能從channel讀取數據
`試圖向只讀chan變量寫入數據或者試圖從只寫chan變量讀取數據都會導致編譯錯誤。`
如果是默認的定義方式
c chan string //那么你既可以向channel寫入數據也可以從channnel讀取數據
**多通道(Select)**
如果上面的投籃訓練現在有兩個教練了,各自負責一個訓練項目。而且還在不同的籃球場,這個時候很顯然,我們一個channel就不夠用了。修改一下:
package main
import (
"fmt"
"time"
)
func fixed_shooting(msg_chan chan string) {
for {
msg_chan <- "fixed shooting"
time.Sleep(time.Second * 1)
}
}
func three_point_shooting(msg_chan chan string) {
for {
msg_chan <- "three point shooting"
time.Sleep(time.Second * 1)
}
}
func main() {
c_fixed := make(chan string)
c_3_point := make(chan string)
go fixed_shooting(c_fixed)
go three_point_shooting(c_3_point)
go func() {
for {
select {
case msg1 := <-c_fixed:
fmt.Println(msg1)
case msg2 := <-c_3_point:
fmt.Println(msg2)
}
}
}()
var input string
fmt.Scanln(&input)
}
其他的和上面的一樣,唯一不同的是我們將定點投籃和三分投籃的消息寫入了不同的channel,那么main函數如何知道從哪個channel讀取消息呢?使用select方法,select方法依次檢查每個channel是否有消息傳遞過來,如果有就取出來輸出。如果同時有多個消息到達,那么select閉上眼睛隨機選一個channel來從中讀取消息,如果沒有一個channel有消息到達,那么select語句就阻塞在這里一直等待。
在某些情況下,比如學生投籃中受傷了,那么就輪到醫護人員上場了,教練在一般看看,如果是重傷,教練就不等了,就回去了休息了,待會兒再過來看看情況。我們可以給select加上一個case用來判斷是否等待各個消息到達超時。
package main
import (
"fmt"
"time"
)
func fixed_shooting(msg_chan chan string) {
var times = 3
var t = 1
for {
if t <= times {
msg_chan <- "fixed shooting"
}
t++
time.Sleep(time.Second * 1)
}
}
func three_point_shooting(msg_chan chan string) {
var times = 5
var t = 1
for {
if t <= times {
msg_chan <- "three point shooting"
}
t++
time.Sleep(time.Second * 1)
}
}
func main() {
c_fixed := make(chan string)
c_3_point := make(chan string)
go fixed_shooting(c_fixed)
go three_point_shooting(c_3_point)
go func() {
for {
select {
case msg1 := <-c_fixed:
fmt.Println(msg1)
case msg2 := <-c_3_point:
fmt.Println(msg2)
case <-time.After(time.Second * 5):
fmt.Println("timeout, check again...")
}
}
}()
var input string
fmt.Scanln(&input)
}
在上面的例子中,我們讓投籃的人在幾次過后掛掉,然后教練就每次等5秒出來看看情況(累死丫的,:-P),因為我們對等待的時間不感興趣就不用變量存儲了,直接`<-time.After(time.Second*5)`,或許你會奇怪,為什么各個channel消息都沒有到達,select為什么不阻塞?就是因為這個time.After,雖然它沒有顯式地告訴你這是一個channel消息,但是記得么?main函數也是一個channel啊!哈哈!至于time.After的功能實際上讓main阻塞了5秒后返回給main的channel一個時間。所以我們在case里面把這個時間消息讀出來,select就不阻塞了。
輸出結果如下:
fixed shooting
three point shooting
fixed shooting
three point shooting
fixed shooting
three point shooting
three point shooting
three point shooting
timeout, check again...
timeout, check again...
timeout, check again...
timeout, check again...
這里select還有一個`default的選項`,如果你指定了default選項,那么當select發現`沒有消息到達`的時候`也不會阻塞`,直接開始轉回去再次判斷。
**Channel Buffer通道緩沖區**
我們定義chan變量的時候,還可以指定它的`緩沖區大小`。一般我們`定義的channel都是同步的`,也就是說接受端和發送端彼此等待對方ok才開始。但是如果你給一個channel`指定了一個緩沖區`,那么`消息的發送和接受式異步的`,`除非channel緩沖區已經滿了`。
c:=make(chan int, 1)
我們看個例子:
package main
import (
"fmt"
"strconv"
"time"
)
func shooting(msg_chan chan string) {
var group = 1
for {
for i := 1; i <= 10; i++ {
msg_chan <- strconv.Itoa(group) + ":" + strconv.Itoa(i)
}
group++
time.Sleep(time.Second * 10)
}
}
func count(msg_chan chan string) {
for {
fmt.Println(<-msg_chan)
}
}
func main() {
var c = make(chan string, 20)
go shooting(c)
go count(c)
var input string
fmt.Scanln(&input)
}
輸出結果為:
1:1
1:2
1:3
1:4
1:5
1:6
1:7
1:8
1:9
1:10
2:1
2:2
2:3
2:4
2:5
2:6
2:7
2:8
2:9
2:10
3:1
3:2
3:3
3:4
3:5
3:6
3:7
3:8
3:9
3:10
4:1
4:2
4:3
4:4
4:5
4:6
4:7
4:8
4:9
4:10
你可以嘗試運行一下,每次都是一下子輸出10個數據。然后等待10秒再輸出一批。
**小結**
并行計算這種特點最適合用來開發網站服務器,因為一般網站服務都是高并發的,邏輯十分復雜。而使用Go的這種特性恰是提供了一種極好的方法。