<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                [TOC] ## 以通信實現共享 并發程序設計是一個比較大的主題,這里我們只討論一些Go語言特有的亮點。 由于需要考慮很多繁瑣的細節以保證對共享變量訪問的正確型,使得并發編程在很多情況下都會變得異常復雜。Go語言鼓勵開發者采用一種不同的方法,即將共享 變量通過Channel相互傳遞 —— 事實上并沒有真正在不同的執行線程間共享數據 —— 的方式解決上述問題。在任意時刻,僅有一個Goroutine可以訪問某個變量。數據競爭問題在設計上就被規避了。為了鼓勵采用這種思維模式,我們將其總 結為一句口號: > 勿以共享方式通信,以通信實現共享。 這種方法還可以走得更遠。舉例而言,“引用計數”最好的實現途徑可能就是通過在一個共享的整數周圍加一個鎖進行保護。但是在更高的層次,通過使用Channel控制共享整數訪問可以梗容易的寫出整潔、正確的程序。 試著用下面的方法來分析上述模型:想象我們只是在處理傳統的單線程程序,該程序僅運行在一個物理CPU上。基于這個前提進行開發,是無需提供任何同步原語 的。現在,啟動另一個類似的實例;它同樣也不需要任何同步原語。然后讓這兩個實例進行通信;如果將通信本身算作一種同步原語,那么它是系統中僅有的同步原 語。Unix操作系統的管道(Pipeline)就是上述模型的一個很好實例。盡管Go語言的并發模型源自Hoare的CSP模型 (Communicating Sequential Processes, 國內譯為“通信順序進程”,臺灣譯為“交談循序程序”),但它也可以被看成是一種類型安全的、一般化的Unix管道。 ## Goroutines 之所以稱之為*Goroutine*,主要是由于現有的一些概念—“線程”、“協程” 以及 “進程” 等—都不足以準確描述其內涵。每個Goroutine都對應一個非常簡單的模型:它是一個并發的函數執行線索,并且在多個并發的Goroutine間,資 源是共享的。Goroutine非常輕量,創建的開銷不會比棧空間分配的開銷大多少。并且其初始棧空間很小 —— 這也就是它輕量的原因 —— 在后續執行中,會根據需要在堆空間分配(或釋放)額外的棧空間。 Goroutine與操作系統線程間采用“多到多”的映射方式,因此假設一個Goroutine因某種原因阻塞 —— 比如等待一個尚未到達的IO —— 其他Goroutine可以繼續執行。我們在實現中屏蔽了許多底層關于線程創建、管理的復雜細節。 在一個函數或是方法前加上`go`關鍵字就可以創建一個Goroutine并調用該函數或方法。當該函數執行結束,Goroutine也隨之隱式退出。(這種效果很像在Unix Shell里用`&`符號在后臺啟動一個命令。) ~~~ go list.Sort() // run list.Sort concurrently; don't wait for it. ~~~ 還可以將“函數文本”(function literals)嵌入到一個Goroutine創建之際,方法如下: ~~~ func Announce(message string, delay time.Duration) { go func() { time.Sleep(delay) fmt.Println(message) }() // Note the parentheses - must call the function. } ~~~ 在Go中,這種“函數文本”的形式就是閉包(closure):實現保證了在這類函數中被引用的變量在函數結束之前不會被釋放。 以上的例子并不是很實用,因為執行函數無法發布其完成的信號。因此,我們還需要channel這一結構。 ## Channels 與map結構類似,channel也是通過`make`進行分配的,其返回值實際上是一個指向底層相關數據結構的引用。如果在創建channel時提供一個可選的整型參數,會設置該channel的緩沖區大小。該值缺省為0,用來構建默認的“無緩沖channel”,也稱為“同步channel”。 ~~~ ci := make(chan int) // unbuffered channel of integers cj := make(chan int, 0) // unbuffered channel of integers cs := make(chan *os.File, 100) // buffered channel of pointers to Files ~~~ 無緩沖的channel使得通信—值的交換—和同步機制組合—共同保證了兩個執行線索(Goroutines)運行于可控的狀態。 對于channel,有很多巧妙的用法。我們通過以下示例開始介紹。上一節中,我們曾在后臺發起過一個排序操作。通過使用channel,可以讓發起操作的Gorouine等待排序操作的完成。 ~~~ c := make(chan int) // Allocate a channel. // Start the sort in a goroutine; when it completes, signal on the channel. go func() { list.Sort() c <- 1 // Send a signal; value does not matter. }() doSomethingForAWhile() <-c // Wait for sort to finish; discard sent value. ~~~ 接收方會一直阻塞直到有數據到來。如果channel是無緩沖的,發送方會一直阻塞直到接收方將數據取出。如果channel帶有緩沖區,發送方會一直阻塞直到數據被拷貝到緩沖區;如果緩沖區已滿,則發送方只能在接收方取走數據后才能從阻塞狀態恢復。 帶緩沖區的channel可以像信號量一樣使用,用來完成諸如吞吐率限制等功能。在以下示例中,到來的請求以參數形式傳入`handle`函數,該函數從channel中讀出一個值,然后處理請求,最后再向channel寫入以使“信號量”可用,以便響應下一次處理。該channel的緩沖區容量決定了并發調用`process`函數的上限,因此在channel初始化時,需要傳入相應的容量參數。 ~~~ var sem = make(chan int, MaxOutstanding) func handle(r *Request) { <-sem // Wait for active queue to drain. process(r) // May take a long time. sem <- 1 // Done; enable next request to run. } func init() { for i := 0; i < MaxOutstanding; i++ { sem <- 1 } } func Serve(queue chan *Request) { for { req := <-queue go handle(req) // Don't wait for handle to finish. } } ~~~ 由于在Go中,數據同步發生在從channel接收數據階段(也就是說,發送操作發生在接收操作之前,參見[Go內存模型](http://localhost:6060/ref/mem)),因此獲取信號量的操作必須實現在channel的接收階段,而不是發送階段。 這樣的設計會引入一個問題:?`Serve`會為每個請求創建一個新的Goroutine,盡管在任意時刻只有最多`MaxOutstanding`個可以執行。如果請求到來的速度過快,將迅速導致系統資源完全消耗。我們可以通過修改`Serve`的實現來對Goroutine的創建進行限制。以下給出一個簡單的實現,請注意其中包含一個BUG,我們會在后續進行修正: ~~~ func Serve(queue chan *Request) { for req := range queue { <-sem go func() { process(req) // Buggy; see explanation below. sem <- 1 }() } } ~~~ 剛才說的BUG源自Go中`for`循環的實現,循環的迭代變量會在循環中被重用,因此`req`變量會在所有Goroutine間共享。這不是我們所樂見的,我們需要保證`req`變量是每個Goroutine私有的。這里提供一個方法,將`req`的值以參數形式提供給goroutine對應的閉包: ~~~ func Serve(queue chan *Request) { for req := range queue { <-sem go func(req *Request) { process(req) sem <- 1 }(req) } } ~~~ 請與之前有BUG的實現進行對比,看看閉包在聲明和運行上的不同之處。另一個解決方案是,干脆創建一個新的同名變量,示例如下: ~~~ func Serve(queue chan *Request) { for req := range queue { <-sem req := req // Create new instance of req for the goroutine. go func() { process(req) sem <- 1 }() } } ~~~ 這樣寫可能看起來怪怪的 ~~~ req := req ~~~ 但它確實是合法的并且在Go中是一種慣用的方法。你可以如法泡制一個新的同名變量,用來為每個Goroutine創建循環變量的私有拷貝。 回到實現通用服務器的問題上來,另一個有效的資源管理途徑是啟動固定數量的`handle`?Goroutine,每個Goroutine都直接從channel中讀取請求。這個固定的數值就是同時執行`process`的最大并發數。`Serve`函數還需要一個額外的channel參數,用來等待退出通知;當創建完所有的Goroutine之后, Server 自身阻塞在該channel上等待結束信號。 ~~~ func handle(queue chan *Request) { for r := range queue { process(r) } } func Serve(clientRequests chan *Request, quit chan bool) { // Start handlers for i := 0; i < MaxOutstanding; i++ { go handle(clientRequests) } <-quit // Wait to be told to exit. } ~~~ ## Channel類型的Channel Channel在Go語言中是一個 first-class 類型,這意味著channel可以像其他 first-class 類型變量一樣進行分配、傳遞。該屬性的一個常用方法是用來實現安全、并行的解復用(demultiplexing)處理。 在上節的那個例子中,`handle`是一個理想化的請求處理,但我們并沒有定義處理的類型。如果處理的類型中包括一個用來響應的channel,則每個客戶端可以提供其獨特的響應方式。這里提供一個簡單的`Request`類型定義: ~~~ type Request struct { args []int f func([]int) int resultChan chan int } ~~~ 客戶端提供了一個函數及其參數,以及一個內部的channel變量用來接收回答消息。 ~~~ func sum(a []int) (s int) { for _, v := range a { s += v } return } request := &Request{[]int{3, 4, 5}, sum, make(chan int)} // Send request clientRequests <- request // Wait for response. fmt.Printf("answer: %d\n", <-request.resultChan) ~~~ 在服務器端,只有處理函數handle需要改變: ~~~ func handle(queue chan *Request) { for req := range queue { req.resultChan <- req.f(req.args) } } ~~~ 顯然,上述例子還有很大的優化空間以提高其可用性,但是這套代碼已經可以作為一類對速度要求不高、并行、非阻塞式RPC系統的實現框架了,而且實現中沒有使用任何顯式的互斥語法。 ## 并行 上述這些想法的另一個應用場景是將計算在不同的CPU核心之間并行化,如果計算可以被劃分為不同的可獨立執行的部分,那么它就是可并行化的,任務可以通過一個channel發送結束信號。 假設我們需要在數組上進行一個比較耗時的操作,并且操作的值在每個數據上是獨立的,正如下面這個理想化的例子一樣: ~~~ type Vector []float64 // Apply the operation to v[i], v[i+1] ... up to v[n-1]. func (v Vector) DoSome(i, n int, u Vector, c chan int) { for ; i < n; i++ { v[i] += u.Op(v[i]) } c <- 1 // signal that this piece is done } ~~~ 我們在每個CPU上加載一個循環無關的迭代計算。這些計算可能以任意次序完成,但這是無關緊要的;我們僅需要在創建完所有Goroutine后,從channel中讀取結束信號進行計數即可。 ~~~ const NCPU = 4 // number of CPU cores func (v Vector) DoAll(u Vector) { c := make(chan int, NCPU) // Buffering optional but sensible. for i := 0; i < NCPU; i++ { go v.DoSome(i*len(v)/NCPU, (i+1)*len(v)/NCPU, u, c) } // Drain the channel. for i := 0; i < NCPU; i++ { <-c // wait for one task to complete } // All done. } ~~~ 在目前的Go runtime 實現中,這段代碼在默認情況下是不會被并行化的。對于用戶態任務,我們默認僅提供一個物理CPU進行處理。任意數目的Goroutine可以阻塞在系統調 用上,但默認情況下,在任意時刻,只有一個Goroutine可以被調度執行。我們未來可能會將其設計的更加智能,但是目前,你必須通過設置`GOMAXPROCS`環境變量或者導入`runtime`包并調用`runtime.GOMAXPROCS(NCPU)`, 來告訴Go的運行時系統最大并行執行的Goroutine數目。你可以通過`runtime.NumCPU()`獲得當前運行系統的邏輯核數,作為一個有用的參考。需要重申:上述方法可能會隨我們對實現的完善而最終被淘汰。 注意不要把“并發”和“并行”這兩個概念搞混:“并發”是指用一些彼此獨立的執行模塊構建程序;而“并行”則是指通過將計算任務在多個處理器上同時執行以 提高效率。盡管對于一些問題,我們可以利用“并發”特性方便的構建一些并行的程序部件,但是Go終究是一門“并發”語言而非“并行”語言,并非所有的并行 編程模式都適用于Go語言模型。要進一步區分兩者的概念,請參考[這篇博客](http://blog.golang.org/2013/01/concurrency-is-not-parallelism.html)的相關討論。 ## 一個“Leaky Buffer”的示例 并發編程的工具甚至可以更簡單的表達一些非并發的想法。下面提供一個示例,它是從RPC的一個包里抽象而來的。客戶端從某些源 —— 比如網絡 —— 循環接收數據。為了避免頻繁的分配、釋放內存緩沖,程序在內部實現了一個空閑鏈表,并用一個Buffer指針型channel將其封裝。當該 channel為空時,程序為其分配一個新的Buffer對象。一旦消息緩沖就緒,它就會被經由`serverChan`發送到服務器端。 ~~~ var freeList = make(chan *Buffer, 100) var serverChan = make(chan *Buffer) func client() { for { var b *Buffer // Grab a buffer if available; allocate if not. select { case b = <-freeList: // Got one; nothing more to do. default: // None free, so allocate a new one. b = new(Buffer) } load(b) // Read next message from the net. serverChan <- b // Send to server. } } ~~~ 服務器端循環從客戶端接收并處理每個消息,然后將Buffer對象返回到空閑鏈表中。 ~~~ func server() { for { b := <-serverChan // Wait for work. process(b) // Reuse buffer if there's room. select { case freeList <- b: // Buffer on free list; nothing more to do. default: // Free list full, just carry on. } } } ~~~ 客戶端會嘗試從空閑鏈表`freeList`中獲取Buffer對象;如果沒有可用對象,則分配一個新的。服務器端會將用完的Buffer對象 b 加入到空閑鏈表`freeList`中,如果鏈表已滿,則將`b`丟棄,垃圾收集器會在未來某個時刻自動回收對應的內存單元。(`select`語句中的`default`分支會在沒有其他case分支滿足條件時執行,這意味著`select`語句塊不會阻塞。)以上就是一個 Leaky Bucket Free List 的簡單實現,借助Go語言中帶緩沖的channel以及“垃圾收集”機制,我們僅用幾行代碼就將其搞定了。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看