<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                如題,近兩天遇到此類錯誤,發現goroutine以及channel的基礎仍需鞏固。由該錯誤牽引出go相關并發操作的問題,下面做一些簡單的tips操作和記錄。 ``` func hello() { fmt.Println("Hello Goroutine!") } func main() { go hello() // 啟動另外一個goroutine去執行hello函數 fmt.Println("main goroutine done!") } ``` 1、在程序啟動時,Go程序就會為main()函數創建一個默認的goroutine。當main()函數返回的時候該goroutine就結束了,所有在main()函數中啟動的goroutine會一同結束! 所以引出sync.WaitGroup的使用。通過它,可以實現goroutine的同步。 ``` var wg sync.WaitGroup func hello(i int) { defer wg.Done() // goroutine結束就登記-1 fmt.Println("Hello Goroutine!", i) } func main() { for i := 0; i < 10; i++ { wg.Add(1) // 啟動一個goroutine就登記+1 go hello(i) } wg.Wait() // 等待所有登記的goroutine都結束 } ``` 2、單純地將函數并發執行是沒有意義的。函數與函數間需要交換數據才能體現并發執行函數的意義。如果說goroutine是Go程序并發的執行體,channel就是它們之間的連接。channel是可以讓一個goroutine發送特定值到另一個goroutine的通信機制。Go 語言中的通道(channel)是一種特殊的類型。通道像一個傳送帶或者隊列,總是遵循先入先出(First In First Out)的規則,保證收發數據的順序。每一個通道都是一個具體類型的導管,也就是聲明channel的時候需要為其指定元素類型。 通道有發送(send)、接收(receive)和關閉(close)三種操作。 發送和接收都使用<-符號。我們通過調用內置的close函數來關閉通道。 關閉后的通道有以下特點: 對一個關閉的通道再發送值就會導致panic。 對一個關閉的通道進行接收會一直獲取值直到通道為空。 對一個關閉的并且沒有值的通道執行接收操作會得到對應類型的零值。 關閉一個已經關閉的通道會導致panic。 無緩沖的通道又稱為阻塞的通道: ``` func main() { ch := make(chan int) ch <- 10 fmt.Println("發送成功") } ``` 上面這段代碼能夠通過編譯,但是執行的時候會出現以下錯誤: ``` fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan send]: main.main() ``` 們使用`ch := make(chan int)`創建的是**無緩沖的通道**,無緩沖的通道只有在有人接收值的時候才能發送值。 上面的代碼會阻塞在`ch <- 10`這一行代碼形成[死鎖](https://so.csdn.net/so/search?q=%E6%AD%BB%E9%94%81&spm=1001.2101.3001.7020),那如何解決這個問題呢? 一種方法是啟用一個`goroutine`去接收值,并一種方式是使用帶緩沖的通道,例如: ``` package main // 方式1 func recv(c chan int) { ret := <-c fmt.Println("接收成功", ret) } func main() { ch := make(chan int) go recv(ch) // 啟用goroutine從通道接收值 ch <- 10 fmt.Println("發送成功") } // 方式2 func main() { ch := make(chan int,1) ch<-1 println(<-ch) } ``` 但是注意:channel 通道增加緩存區后,可將數據暫存到緩沖區,而不需要接收端同時接收 (緩沖區如果超出大小同樣會造成死鎖) ![](https://img.kancloud.cn/3d/b8/3db833b95adc1d071fb7483d35a31ea0_598x370.png) ![](https://img.kancloud.cn/98/5c/985cf65230dd907dc26e5fb7d8308d0a_802x316.png) 如圖,總結,可以看出,產生阻塞的方式,主要容易踩坑的有兩種:空的通道一直接收會阻塞;滿的通道一直發送也會阻塞! 3、那么,如何解決阻塞死鎖問題呢? 1)、如果是上面的無緩沖通道,使用再起一個協程的方式,可使得接收端和發送端并行執行。 2)、可以初始化時就給channel增加緩沖區,也就是使用有緩沖的通道 3)、易踩坑點,針對有緩沖的通道,產生阻塞,如何解決? 如下面例子,開啟多個goroutine并發執行任務,并將數據存入管道channel,后續讀取數據: ``` package main import ( "fmt" "sync" "time" ) func request(index int,ch chan<- string) { time.Sleep(time.Duration(index)*time.Second) s := fmt.Sprintf("編號%d完成",index) ch <- s } func main() { ch := make(chan string, 10) fmt.Println(ch,len(ch)) for i := 0; i < 4; i++ { go request(i, ch) } for ret := range ch{ fmt.Println(len(ch)) fmt.Println(ret) } } ``` ![](https://img.kancloud.cn/c7/00/c7004a00391beefa18ff22530f170ce7_556x230.png) **不可靠的解決方式如下:** ``` for { i, ok := <-ch // 通道關閉后再取值ok=false;通道為空去接收,會發生阻塞死鎖 if !ok { break } println(i) } ``` ``` for ret := range ch{ fmt.Println(len(ch)) fmt.Println(ret) //通道為空去接收,會發生阻塞死鎖 } ``` **以上兩種從通道獲取方式,都有小坑! 一旦獲取的通道沒有主動close(ch)關閉,而且通道為空時,無論通過for還是foreach方式去取值獲取,都會產生阻塞死鎖deadlock chan receive錯誤!?** **可靠的解決方式1 如下:** ``` package main import ( "fmt" "sync" "time" ) var wg sync.WaitGroup func request(index int,ch chan<- string) { time.Sleep(time.Duration(index)*time.Second) s := fmt.Sprintf("編號%d完成",index) ch <- s defer wg.Done() } func main() { ch := make(chan string, 10) go func() { wg.Wait() close(ch) }() for i := 0; i < 4; i++ { wg.Add(1) go request(i, ch) } for ret := range ch{ fmt.Println(len(ch)) fmt.Println(ret) } } ``` 解決方式: 即我們在生成完4個goroutine后對data channel進行關閉,這樣通過for range從通道循環取出全部值,通道關閉就會退出for range循環。 具體實現:可以利用sync.WaitGroup解決,在所有的 data channel 的輸入處理之前,wg.Wait()這個goroutine會處于等待狀態(wg.Wait()源碼就是for循環)。當執行方法處理完后(wg.Done),wg.Wait()就會放開執行,執行后面的close(ch)。 可靠的解決方式2 如下: ``` package main import ( "fmt" "time" ) func request(index int,ch chan<- string) { time.Sleep(time.Duration(index)*time.Second) s := fmt.Sprintf("編號%d完成",index) ch <- s } func main() { ch := make(chan string, 10) for i := 0; i < 4; i++ { go request(i, ch) } for { select { case i := <-ch: // select會一直等待,直到某個case的通信操作完成時,就會執行case分支對應的語句 println(i) default: time.Sleep(time.Second) fmt.Println("無數據") } } } ``` 上面這種方式獲取,通過select case + default的方式也可以完美避免阻塞死鎖報錯!但是適用于通道不關閉,需要時刻循環執行數據并且處理的情境下。 4、由此,引入了select多路復用的使用 在某些場景下我們需要同時從多個通道接收數據。通道在接收數據時,如果沒有數據可以接收將會發生阻塞。select的使用類似于switch語句,它有一系列case分支和一個默認的分支。每個case會對應一個通道的通信(接收或發送)過程。select會一直等待,直到某個case的通信操作完成時,就會執行case分支對應的語句。具體格式如下: ``` select{ case <-ch1: ... case data := <-ch2: ... case ch3<-data: ... default: 默認操作 } ``` 一定留意,default的作用很大! 是避免阻塞的核心。 使用select語句能提高代碼的可讀性。 可處理一個或多個channel的發送/接收操作。 如果多個case同時滿足,select會隨機選擇一個。 對于沒有case的select{}會一直等待,可用于阻塞main函數。 5、實際項目中goroutine+channel+select的使用 如下,使用于 項目監聽終端中斷信號操作: ``` srv := http.Server{ Addr: setting.AppConf.Http.Addr, Handler: routers.SetupRouter(setting.AppConf), } go func() { // 開啟一個goroutine啟動服務 if err := srv.ListenAndServe(); err != nil { zap.S().Errorf("listen finish err: %s addr: %s", err, setting.AppConf.Http.Addr) } }() // 等待中斷信號來優雅地關閉服務器,為關閉服務器操作設置一個5秒的超時 sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) for { select { case s := <-sig: zap.S().Infof("recv exit signal: %s", s.String()) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // 5秒內優雅關閉服務(將未處理完的請求處理完再關閉服務),超過5秒就超時退出 if err := srv.Shutdown(ctx); err != nil { zap.S().Fatal("Server Shutdown err: ", err) } zap.S().Info("Server Shutdown Success") return } } ``` 如下,使用于**項目通過通道來進行數據處理、數據發送接收等操作**: ``` package taillog // 專門從日志文件,收集日志 import ( "context" "fmt" "github.com/hpcloud/tail" "logagent/kafka" ) //var ( // tailObj *tail.Tail //) //TailTask 一個日志收集的任務 type TailTask struct { path string topic string instance *tail.Tail //為了能實現退出t.run ctx context.Context cancelFunc context.CancelFunc } func NewTailTask(path,topic string) (tailObj *TailTask) { ctx,cancel := context.WithCancel(context.Background()) tailObj = &TailTask{ path:path, topic:topic, ctx:ctx, cancelFunc:cancel, } tailObj.init() //根據路徑去打開對應的日志 return } func (t *TailTask)init() { config := tail.Config{ ReOpen: true, //重新打開 Follow: true, //是否跟隨 Location: &tail.SeekInfo{Offset:0,Whence:2}, //從文件哪個地方開始讀 MustExist: false, //文件不存在不報錯 Poll: true, } var err error t.instance, err = tail.TailFile(t.path, config) if err != nil { fmt.Println("tail file failed,err:",err) } // 當goroutine執行的函數退出的時候,goroutine結束 go t.run() //直接去采集日志,發送到kafka } func (t *TailTask)run() { for{ select { case <- t.ctx.Done(): fmt.Printf("tail task:%s_%s 結束了\n",t.path,t.topic) return case line := <- t.instance.Lines: //從tailObj一行行讀取數據 //發往kafka //kafka.SendToKafka(t.topic,line.Text) //函數調用函數 // 優化,先把日志數據發送到一個通道中 // kafka包中有單獨的goroutine去取日志發送到kafka kafka.SendToChan(t.topic,line.Text) } } } ``` ``` package kafka //專門從kafka寫日志 import ( "fmt" "github.com/Shopify/sarama" "time" ) type logData struct { topic string data string } var ( client sarama.SyncProducer //聲明一個全局連接kafka的生產者client logDataChan chan *logData ) // 初始化client func Init(address []string, maxSize int)(err error) { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll //發送完數據需要leader和follow都確認 config.Producer.Partitioner = sarama.NewRandomPartitioner //新選出一個partition config.Producer.Return.Successes = true //成功交付的消息將在success channel 返回 //連接kafka client,err = sarama.NewSyncProducer(address,config) if err != nil { fmt.Println("producer closed,err:",err) return } // 初始化logDataChan logDataChan = make(chan *logData,maxSize) // 開啟后臺的goroutine從通道取數據,發送kafka go sendToKafka() return } // 給外部暴漏一個函數,該函數只把日志數據發送到一個內部chan中 func SendToChan(topic,data string) { msg := &logData{ topic: topic, data: data, } logDataChan <- msg } //真正往kafka發送日志的函數 func sendToKafka() { for{ select { case ld := <- logDataChan: // 構造一個消息 msg := &sarama.ProducerMessage{} msg.Topic = ld.topic msg.Value = sarama.StringEncoder(ld.data) // 發送到kafka pid,offset,err := client.SendMessage(msg) if err != nil { fmt.Println("send msg failed,err:",err) return } fmt.Printf("pid:%v,offset:%v\n",pid,offset) default: time.Sleep(time.Microsecond*50) } } } ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看