如題,近兩天遇到此類錯誤,發現goroutine以及channel的基礎仍需鞏固。由該錯誤牽引出go相關并發操作的問題,下面做一些簡單的tips操作和記錄。
```
func hello() {
fmt.Println("Hello Goroutine!")
}
func main() {
go hello() // 啟動另外一個goroutine去執行hello函數
fmt.Println("main goroutine done!")
}
```
1、在程序啟動時,Go程序就會為main()函數創建一個默認的goroutine。當main()函數返回的時候該goroutine就結束了,所有在main()函數中啟動的goroutine會一同結束!
所以引出sync.WaitGroup的使用。通過它,可以實現goroutine的同步。
```
var wg sync.WaitGroup
func hello(i int) {
defer wg.Done() // goroutine結束就登記-1
fmt.Println("Hello Goroutine!", i)
}
func main() {
for i := 0; i < 10; i++ {
wg.Add(1) // 啟動一個goroutine就登記+1
go hello(i)
}
wg.Wait() // 等待所有登記的goroutine都結束
}
```
2、單純地將函數并發執行是沒有意義的。函數與函數間需要交換數據才能體現并發執行函數的意義。如果說goroutine是Go程序并發的執行體,channel就是它們之間的連接。channel是可以讓一個goroutine發送特定值到另一個goroutine的通信機制。Go 語言中的通道(channel)是一種特殊的類型。通道像一個傳送帶或者隊列,總是遵循先入先出(First In First Out)的規則,保證收發數據的順序。每一個通道都是一個具體類型的導管,也就是聲明channel的時候需要為其指定元素類型。
通道有發送(send)、接收(receive)和關閉(close)三種操作。
發送和接收都使用<-符號。我們通過調用內置的close函數來關閉通道。
關閉后的通道有以下特點:
對一個關閉的通道再發送值就會導致panic。
對一個關閉的通道進行接收會一直獲取值直到通道為空。
對一個關閉的并且沒有值的通道執行接收操作會得到對應類型的零值。
關閉一個已經關閉的通道會導致panic。
無緩沖的通道又稱為阻塞的通道:
```
func main() {
ch := make(chan int)
ch <- 10
fmt.Println("發送成功")
}
```
上面這段代碼能夠通過編譯,但是執行的時候會出現以下錯誤:
```
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
```
們使用`ch := make(chan int)`創建的是**無緩沖的通道**,無緩沖的通道只有在有人接收值的時候才能發送值。
上面的代碼會阻塞在`ch <- 10`這一行代碼形成[死鎖](https://so.csdn.net/so/search?q=%E6%AD%BB%E9%94%81&spm=1001.2101.3001.7020),那如何解決這個問題呢?
一種方法是啟用一個`goroutine`去接收值,并一種方式是使用帶緩沖的通道,例如:
```
package main
// 方式1
func recv(c chan int) {
ret := <-c
fmt.Println("接收成功", ret)
}
func main() {
ch := make(chan int)
go recv(ch) // 啟用goroutine從通道接收值
ch <- 10
fmt.Println("發送成功")
}
// 方式2
func main() {
ch := make(chan int,1)
ch<-1
println(<-ch)
}
```
但是注意:channel 通道增加緩存區后,可將數據暫存到緩沖區,而不需要接收端同時接收 (緩沖區如果超出大小同樣會造成死鎖)


如圖,總結,可以看出,產生阻塞的方式,主要容易踩坑的有兩種:空的通道一直接收會阻塞;滿的通道一直發送也會阻塞!
3、那么,如何解決阻塞死鎖問題呢?
1)、如果是上面的無緩沖通道,使用再起一個協程的方式,可使得接收端和發送端并行執行。
2)、可以初始化時就給channel增加緩沖區,也就是使用有緩沖的通道
3)、易踩坑點,針對有緩沖的通道,產生阻塞,如何解決?
如下面例子,開啟多個goroutine并發執行任務,并將數據存入管道channel,后續讀取數據:
```
package main
import (
"fmt"
"sync"
"time"
)
func request(index int,ch chan<- string) {
time.Sleep(time.Duration(index)*time.Second)
s := fmt.Sprintf("編號%d完成",index)
ch <- s
}
func main() {
ch := make(chan string, 10)
fmt.Println(ch,len(ch))
for i := 0; i < 4; i++ {
go request(i, ch)
}
for ret := range ch{
fmt.Println(len(ch))
fmt.Println(ret)
}
}
```

**不可靠的解決方式如下:**
```
for {
i, ok := <-ch // 通道關閉后再取值ok=false;通道為空去接收,會發生阻塞死鎖
if !ok {
break
}
println(i)
}
```
```
for ret := range ch{
fmt.Println(len(ch))
fmt.Println(ret) //通道為空去接收,會發生阻塞死鎖
}
```
**以上兩種從通道獲取方式,都有小坑! 一旦獲取的通道沒有主動close(ch)關閉,而且通道為空時,無論通過for還是foreach方式去取值獲取,都會產生阻塞死鎖deadlock chan receive錯誤!?**
**可靠的解決方式1 如下:**
```
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func request(index int,ch chan<- string) {
time.Sleep(time.Duration(index)*time.Second)
s := fmt.Sprintf("編號%d完成",index)
ch <- s
defer wg.Done()
}
func main() {
ch := make(chan string, 10)
go func() {
wg.Wait()
close(ch)
}()
for i := 0; i < 4; i++ {
wg.Add(1)
go request(i, ch)
}
for ret := range ch{
fmt.Println(len(ch))
fmt.Println(ret)
}
}
```
解決方式: 即我們在生成完4個goroutine后對data channel進行關閉,這樣通過for range從通道循環取出全部值,通道關閉就會退出for range循環。
具體實現:可以利用sync.WaitGroup解決,在所有的 data channel 的輸入處理之前,wg.Wait()這個goroutine會處于等待狀態(wg.Wait()源碼就是for循環)。當執行方法處理完后(wg.Done),wg.Wait()就會放開執行,執行后面的close(ch)。
可靠的解決方式2 如下:
```
package main
import (
"fmt"
"time"
)
func request(index int,ch chan<- string) {
time.Sleep(time.Duration(index)*time.Second)
s := fmt.Sprintf("編號%d完成",index)
ch <- s
}
func main() {
ch := make(chan string, 10)
for i := 0; i < 4; i++ {
go request(i, ch)
}
for {
select {
case i := <-ch: // select會一直等待,直到某個case的通信操作完成時,就會執行case分支對應的語句
println(i)
default:
time.Sleep(time.Second)
fmt.Println("無數據")
}
}
}
```
上面這種方式獲取,通過select case + default的方式也可以完美避免阻塞死鎖報錯!但是適用于通道不關閉,需要時刻循環執行數據并且處理的情境下。
4、由此,引入了select多路復用的使用
在某些場景下我們需要同時從多個通道接收數據。通道在接收數據時,如果沒有數據可以接收將會發生阻塞。select的使用類似于switch語句,它有一系列case分支和一個默認的分支。每個case會對應一個通道的通信(接收或發送)過程。select會一直等待,直到某個case的通信操作完成時,就會執行case分支對應的語句。具體格式如下:
```
select{
case <-ch1:
...
case data := <-ch2:
...
case ch3<-data:
...
default:
默認操作
}
```
一定留意,default的作用很大! 是避免阻塞的核心。
使用select語句能提高代碼的可讀性。
可處理一個或多個channel的發送/接收操作。
如果多個case同時滿足,select會隨機選擇一個。
對于沒有case的select{}會一直等待,可用于阻塞main函數。
5、實際項目中goroutine+channel+select的使用
如下,使用于 項目監聽終端中斷信號操作:
```
srv := http.Server{
Addr: setting.AppConf.Http.Addr,
Handler: routers.SetupRouter(setting.AppConf),
}
go func() {
// 開啟一個goroutine啟動服務
if err := srv.ListenAndServe(); err != nil {
zap.S().Errorf("listen finish err: %s addr: %s", err, setting.AppConf.Http.Addr)
}
}()
// 等待中斷信號來優雅地關閉服務器,為關閉服務器操作設置一個5秒的超時
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
for {
select {
case s := <-sig:
zap.S().Infof("recv exit signal: %s", s.String())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 5秒內優雅關閉服務(將未處理完的請求處理完再關閉服務),超過5秒就超時退出
if err := srv.Shutdown(ctx); err != nil {
zap.S().Fatal("Server Shutdown err: ", err)
}
zap.S().Info("Server Shutdown Success")
return
}
}
```
如下,使用于**項目通過通道來進行數據處理、數據發送接收等操作**:
```
package taillog
// 專門從日志文件,收集日志
import (
"context"
"fmt"
"github.com/hpcloud/tail"
"logagent/kafka"
)
//var (
// tailObj *tail.Tail
//)
//TailTask 一個日志收集的任務
type TailTask struct {
path string
topic string
instance *tail.Tail
//為了能實現退出t.run
ctx context.Context
cancelFunc context.CancelFunc
}
func NewTailTask(path,topic string) (tailObj *TailTask) {
ctx,cancel := context.WithCancel(context.Background())
tailObj = &TailTask{
path:path,
topic:topic,
ctx:ctx,
cancelFunc:cancel,
}
tailObj.init() //根據路徑去打開對應的日志
return
}
func (t *TailTask)init() {
config := tail.Config{
ReOpen: true, //重新打開
Follow: true, //是否跟隨
Location: &tail.SeekInfo{Offset:0,Whence:2}, //從文件哪個地方開始讀
MustExist: false, //文件不存在不報錯
Poll: true,
}
var err error
t.instance, err = tail.TailFile(t.path, config)
if err != nil {
fmt.Println("tail file failed,err:",err)
}
// 當goroutine執行的函數退出的時候,goroutine結束
go t.run() //直接去采集日志,發送到kafka
}
func (t *TailTask)run() {
for{
select {
case <- t.ctx.Done():
fmt.Printf("tail task:%s_%s 結束了\n",t.path,t.topic)
return
case line := <- t.instance.Lines: //從tailObj一行行讀取數據
//發往kafka
//kafka.SendToKafka(t.topic,line.Text) //函數調用函數
// 優化,先把日志數據發送到一個通道中
// kafka包中有單獨的goroutine去取日志發送到kafka
kafka.SendToChan(t.topic,line.Text)
}
}
}
```
```
package kafka
//專門從kafka寫日志
import (
"fmt"
"github.com/Shopify/sarama"
"time"
)
type logData struct {
topic string
data string
}
var (
client sarama.SyncProducer //聲明一個全局連接kafka的生產者client
logDataChan chan *logData
)
// 初始化client
func Init(address []string, maxSize int)(err error) {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll //發送完數據需要leader和follow都確認
config.Producer.Partitioner = sarama.NewRandomPartitioner //新選出一個partition
config.Producer.Return.Successes = true //成功交付的消息將在success channel 返回
//連接kafka
client,err = sarama.NewSyncProducer(address,config)
if err != nil {
fmt.Println("producer closed,err:",err)
return
}
// 初始化logDataChan
logDataChan = make(chan *logData,maxSize)
// 開啟后臺的goroutine從通道取數據,發送kafka
go sendToKafka()
return
}
// 給外部暴漏一個函數,該函數只把日志數據發送到一個內部chan中
func SendToChan(topic,data string) {
msg := &logData{
topic: topic,
data: data,
}
logDataChan <- msg
}
//真正往kafka發送日志的函數
func sendToKafka() {
for{
select {
case ld := <- logDataChan:
// 構造一個消息
msg := &sarama.ProducerMessage{}
msg.Topic = ld.topic
msg.Value = sarama.StringEncoder(ld.data)
// 發送到kafka
pid,offset,err := client.SendMessage(msg)
if err != nil {
fmt.Println("send msg failed,err:",err)
return
}
fmt.Printf("pid:%v,offset:%v\n",pid,offset)
default:
time.Sleep(time.Microsecond*50)
}
}
}
```
- Golang
- Beego框架
- Gin框架
- gin框架介紹
- 使用Gin web框架的知名開源線上項目
- go-admin-gin
- air 熱啟動
- 完整的form表單參數驗證語法
- Go 語言入門練手項目推薦
- Golang是基于多線程模型
- golang 一些概念
- Golang程序開發注意事項
- fatal error: all goroutines are asleep - deadlock
- defer
- Golang 的內建調試器
- go部署
- golang指針重要性
- 包(golang)
- Golang框架選型比較: goframe, beego, iris和gin
- GoFrame
- golang-admin-項目
- go module的使用方法及原理
- go-admin支持多框架的后臺系統(go-admin.cn)
- docker gocv
- go-fac
- MSYS2
- 企業開發框架系統推薦
- gorm
- go-zero
- 優秀系統
- GinSkeleton(gin web 及gin 知識)
- 一次 request -> response 的生命周期概述
- 路由與路由組以及gin源碼學習
- 中間件以及gin源碼學習
- golang項目部署
- 獨立部署golang
- 代理部署golang
- 容器部署golang
- golang交叉編譯
- goravel
- kardianos+gin 項目作為windows服務運行
- go env
- 適用在Windows、Linux和macOS環境下打包Go應用程序的詳細步驟和命令
- Redis
- Dochub
- Docker部署開發go環境
- Docker部署運行go環境
- dochub說明
- Vue
- i18n
- vue3
- vue3基本知識
- element-plus 表格單選
- vue3后臺模板
- Thinkphp
- Casbin權限控制中間件
- 容器、依賴注入、門面、事件、中間件
- tp6問答
- 偽靜態
- thinkphp-queue
- think-throttle
- thinkphp隊列queue的一些使用說明,queue:work和queue:listen的區別
- ThinkPHP6之模型事件的觸發條件
- thinkphp-swoole
- save、update、insert 的區別
- Socket
- workerman
- 介紹
- 從ThinkPHP6移植到Webman的一些技術和經驗(干貨)
- swoole
- swoole介紹
- hyperf
- hf官網
- Swoft
- swoft官網
- easyswoole
- easyswoole官網地址
- EASYSWOOLE 聊天室DEMO
- socket問答
- MySQL
- 聚簇索引與非聚簇索引
- Mysql使用max獲取最大值細節
- 主從復制
- 隨機生成20萬User表的數據
- MySQL進階-----前綴索引、單例與聯合索引
- PHP
- 面向切面編程AOP
- php是單線程的一定程度上也可以看成是“多線程”
- PHP 線程,進程、并發、并行 的理解
- excel數據畫表格圖片
- php第三方包
- monolog/monolog
- league/glide
- 博客-知識網站
- php 常用bc函數
- PHP知識點的應用場景
- AOP(面向切面編程)
- 注解
- 依賴注入
- 事件機制
- phpspreadsheet導出數據和圖片到excel
- Hyperf
- mineAdmin
- 微服務
- nacos注冊服務
- simps-mqtt連接客戶端simps
- Linux
- 切換php版本
- Vim
- Laravel
- RabbitMQ
- thinkphp+rabbitmq
- 博客
- Webman框架
- 框架注意問題
- 關于內存泄漏
- 移動端自動化
- 懶人精靈
- 工具應用
- render
- gitlab Sourcetree
- ssh-agent失敗 錯誤代碼-1
- 資源網站
- Git
- wkhtmltopdf
- MSYS2 介紹
- powershell curl 使用教程
- NSSM(windows服務工具)
- MinGW64
- 知識擴展
- 對象存儲系統
- minio
- 雪花ID
- 請求body參數類型
- GraphQL
- js 深拷貝
- window 共享 centos文件夾
- 前端get/post 請求 特殊符號 “+”傳參數問題
- 什么是SCM系統?SCM系統與ERP系統有什么區別?
- nginx 日志格式統一為 json
- 特殊符號怎么打
- 收藏網址
- 收藏-golang
- 收藏-vue3
- 收藏-php
- 收藏-node
- 收藏-前端
- 規劃ITEM
- 旅游類
- 人臉識別
- dlib
- Docker&&部署
- Docker-compose
- Docker的網絡模式
- rancher
- DHorse
- Elasticsearch
- es與kibana都docke連接
- 4種數據同步到Elasticsearch方案
- GPT
- 推薦系統
- fastposter海報生成
- elasticsearch+logstash+kibana
- beego文檔系統-MinDoc
- jeecg開源平臺
- Java
- 打包部署
- spring boot
- 依賴
- Maven 相關 命令
- Gradle 相關命令
- mybatis
- mybatis.plus
- spring boot 模板引擎
- SpringBoot+Maven多模塊項目(創建、依賴、打包可執行jar包部署測試)完整流程
- Spring Cloud
- Sentinel
- nacos
- Apollo
- java推薦項目
- gradle
- Maven
- Nexus倉庫管理器
- Python
- Masonite框架
- scrapy
- Python2的pip2
- Python3 安裝 pip3
- 安全攻防
- 運維技術
- 騰訊云安全加固建議
- 免費freessl證書申請
- ruby
- homeland
- Protobuf
- GIT
- FFMPEG
- 命令說明
- 音頻
- ffmpeg合并多個MP4視頻
- NODEJS
- 開發npm包
- MongoDB
- php-docker-mongodb環境搭建
- mongo基本命令
- Docker安裝MongoDB最新版并連接
- 少兒編程官網
- UI推薦
- MQTT
- PHP連接mqtt
- EMQX服務端
- php搭建mqtt服務端