<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                Go語言類庫中,有兩個官方的服務器框架,一個HTTP,一個是RPC。使用這個兩個框架,已經能解決大部分的問題,但是,也有一些需求,這些框架是不夠的,這篇文章,我們先分析一下HTTP 和 RPC服務器的特點, 然后結合這兩個服務器的特點,我實現了一個新的服務器,這個服務器非常適合客戶端和服務器端有大量交互的情況。 HTTP服務器的特點: HTTP的請求 和 響應的周期如下: ![](https://box.kancloud.cn/bb34f2f85a8da500642a4d216b63d58f_453x454.png) 對于一個HTTP 長連接,一個請求必須等到一個響應完成后,才能進行下一個請求。這就是http協議最本質的特點,是串行化的。而這個特點保證了http協議的簡潔性,一個請求中間不會插入其他的請求干擾,這樣不需要去對應請求和響應。但是,同時也有個弱點,那就是不適合做大量的請求。舉個實際中我們遇到的例子,我們要把大量的中國客戶的訂單送入英國的交易所,交易所的接口是http協議的,從中國到英國,一次http的請求到響應至少需要 300ms左右,這樣一秒一個連只能發送3個,就算是開十個線程發送(接口對線程總數是有限制的),1s 也只能是30個。而最高峰的時候,我們可能1s 要發送1萬個訂單,那采用http協議就不能滿足我們的要求了(這個可以通過fix協議解決)。 當然,http可以解決批量提交的需求,只要增加一個批量提交的接口就可以了。但是,這樣的實現方式不夠自然,而且增加了額外的接口。 RPC服務的特點: PRC服務器克服了http服務器串流模型,可以并發的提交請求。請求響應的周期圖如下: ![](https://box.kancloud.cn/4c3a6074d4a518def27cb0bbee687aee_498x484.png) RPC服務,已經可以客服http服務器的串流的劣勢,可以批量提交大量的數據。在局域網的中測試,1s鐘可以實現3萬次左右的請求。而相同的條件下,http在局域網中,只能實現1500次左右的請求,真實環境下面,延時嚴重,http性能會急劇下降。在兩個不同的機房中,有百兆帶寬相連,實際測試rpc請求是兩萬次左右,http是 500次左右,而且http占用很多頭部的帶寬。 RPC的一個核心特點是類似一次函數調用。這樣一個請求 只能 對應于 一個響應。在某些情下,這似乎是不夠的。舉個實際的例子,我要獲取一個報價的行情數據,這個時候,類似一個MessageQueue,服務器會不斷的push數據給客戶端。也就是一次請求,會有多次返回,持續不斷的返回。 當然,RPC的一個非常重要的優勢是,你不需要知道怎么去解析數據,你可以當做網絡是空氣,完全像寫本地調用函數一樣去調用rpc的函數。 異步服務器: 因為暫時我沒有很好的名字來命名這個服務器,所以暫時就叫做異步服務器吧,這個服務器的特點類似一個界面程序的消息體系。我們不斷的吧鼠標鍵盤等各種事件提交給界面程序,界面程序根據消息的類型,參數做出相應的處理。所以,我們就叫做異步服務器吧。經典的金融服務器都是異步服務器,處理機制都類似界面的消息循環機制,比如國內期貨最常用的ctp交易系統,還有就是銀行間,交易所和銀行之間,經常用的一個協議叫做 fix,也是這樣的架構。請求是一種消息,響應也是一種消息。請求響應的時序圖如下: ![](https://box.kancloud.cn/25d04091a2dde55eaeee844f1ed87af0_644x466.png) msg1 請求之后,有兩個響應,Resp1 , resp2, msg2 有一個響應 resp3. 借鑒了rpc的特點,請求和響應都自動編碼,寫服務器不再為編碼而煩惱,同時也不需要為是否要壓縮而頭痛。現在提供三種方式,gob , json, protocolbuffer. 并且可以 設置是否啟用壓縮的,以及壓縮的格式。我 們把客戶端和服務器的交互抽象為一個消息系統,先來看看客戶端客戶端調用: ~~~ 1: client, err := NewClient("http://localhost:8080", jar, "gob", "gzip") 2: if err != nil { 3: log.Println(err) 4: return 5: } 6: defer client.Close() 7: req := NewRequest("hello", "jack", func(call *Call, status int) { 8: log.Println(call, call.Resp, status) 9: }) 10: client.Go(req) 11: req2 := NewRequest("hello", "fuck", func(call *Call, status int) { 12: log.Println(call, call.Resp, status) 13: }) 14: client.Go(req2) 15: //wait for all req is done 16: client.Wait() ~~~ 1-6行,我們建立了一個到服務器的連接,注意,我們這個服務器底層是用http包實現的。jar 是用來管理session的,這里暫時忽略,gob是編碼,gzip是壓縮格式。可以動態設置各種編碼和壓縮格式。 7-13行,NewRequest 的第一個參數是消息的類型(我建議再后面的版本中,改成NewMessage, Client.GO 改成 client.Send),叫做hello, 詳細類型為了方便查看也打印,我采用字符串的格式。后面是消息的參數,可以是任何的go的結構,變量。每個請求對應一個回調函數,處理響應的消息,響應的消息保存在 call.Resp 里面,如果status == StatusDone , 表示請求結束了,服務器不會響應任何消息了,status == StatusUpdate ,說明,還會有下一個消息過來。 16行 Wait函數,其實就是一個消息循環函數,不斷的從服務器端讀取消息,對應到某個請求的回調函數里面。類似event loop 我們在Client里面加入心跳函數,保證能檢查到鏈接損壞的情況,如果連接損壞,會自動結束消息循環,錯誤處理是一個服務器非常重要的一環。 然后我們再來看看服務器端的實現: ~~~ 1: func helloWorld(w *ResponseWriter, r *Request) { 2: resp := w.Resp 3: resp.MsgType = MsgTString 4: //表示我已經沒有其他數據包了,這個請求已經結束了 5: resp.Done = true 6: //向客戶端發送請求 7: w.WriteResponse(resp, "hello: " + r.GetBody().(string)) 8: } ~~~ 第7行中,r.GetBody() 獲取的到是上面NewRequest 中的第二個參數。 這樣就是一個最簡單的hello world 程序。要實現一個實戰有用的服務器,的細節當然還有很多,主要的是流量控制。比如,一個用戶寫錯程序了,錯誤的發起了10萬個請求,服務器端不能開個10萬個go進行處理,這樣的話,會直接拖垮服務器,我們給每個用戶設置了一個并發處理數目,最多這個用戶可以并發處理多少個請求。還有一個比較重要的,對服務器來說,就是服務器服務的量的限制。我們會實時監控 cpu 內存,io的使用情況,當發現使用到某個限額的時候,服務會拒絕接受連接(事先要對性能進行測試)這些都是為了防止服務器過載 ,而實際中的服務器,這個問題其實是很常見的。 實例:可靠消息通知系統。 可靠消息通知系統實際上是一個非常常見的系統。最常用的一個例子就是數據庫的master slave 模式。master里面的事件要非常可靠的通知到slave,中間不能有任何的丟失。還有一種比如交易系統中,我們會調用銀行或者交易所的接口,銀行在交易成功后會給我們一個通知,這個通知的消息必須可靠的被通知到目標,不能有任何的丟失。在我們的系統中,行情數據的復制也是不能有任何數據丟失的情景,為了保證A 服務器 和 B服務器有相同的行情,在從A服務器的消息要被B服務器準確的接收。當然,你也可以做一個聊天系統,這個聊天系統不會丟失任何消息。 那么如何實現這個系統呢,首先,為了保證不在內存中丟失消息,那么消息必須寫盤,并且為了檢測消息是否丟失,必須給消息編號。消息寫盤也可以用我們開發的事務日志系統,如果消息非常的大量,那么還需要批量提交模式(Group Commit)。大部分情況下,消息丟失不是因為服務器崩潰,而且網絡意外中斷,這些中斷往往時間很短,在1分鐘以內,所以,有必要在內存中緩存部分的消息,如果網絡中斷,客戶端再次請求時,發送當時的消息序號,這樣就可以補全網絡中斷丟失的數據。如果時間太長了,內存中的數據不夠補了,那么首先要從消息源數據庫中下載歷史消息,然后再接受實時的消息。整體的思路就是這樣的,在這里,我們就看看我們的消息通知系統的實時廣播部分的設計。 1. 消息廣播基本流程: 訂閱 –> 廣播: 首先客戶端向服務器說明,我要訂閱哪些消息,比如,master slave 中,我只要寫消息就好了,讀消息就不需要了。然后,再向服務器請求數據,服務器廣播數據給我們。注意,我們這里把訂閱 和 廣播分成兩個部分,兩個請求,那么怎么知道這兩個請求是同一個人發出的呢?或者,怎么關聯起來呢?這里,我用了一個session的概念,訂閱的時候,把訂閱的消息類型保存到session,廣播的時候,從session中讀取消息類型,然后發送對應的數據。 這部分的代碼如下: ~~~ 1: var bmu sync.Mutex 2: var defaultBroadcast = make(map[int64]*Broadcast) 3: var ErrNotRingItemer = errors.New("ErrNotRingItemer") 4: //基本上可以保證有1個小時的數據 5: const btickSize = 3600 * 4 6: //可以傳遞任意的數據 7: 8: func GetBroadcast(name int64, n int) (*Broadcast, error) { 9: bmu.Lock() 10: defer bmu.Unlock() 11: b, ok := defaultBroadcast[name] 12: if ok { 13: return b, nil 14: } 15: b , err := NewBroadcast(name, n) 16: if err != nil { 17: return nil, err 18: } 19: defaultBroadcast[name] = b 20: return b, nil 21: } 22: 23: type Broadcast struct { 24: mu sync.RWMutex 25: targets map[int64]*Subscribe 26: ringbuffer *algo.RingBuffer 27: name int64 28: } 29: 30: func NewBroadcast(name int64, n int) (*Broadcast, error) { 31: b := &Broadcast{} 32: b.targets = make(map[int64]*Subscribe) 33: b.ringbuffer = algo.NewRingBuffer(n, nil) 34: b.name = name 35: return b, nil 36: } 37: 38: func (b *Broadcast) GetName() int64 { 39: return b.name 40: } 41: 42: func (b *Broadcast) Sub(id int64, req *Subscribe) { 43: b.mu.Lock() 44: defer b.mu.Unlock() 45: b.targets[id] = req 46: } 47: 48: func (b *Broadcast) Unsub(id int64) { 49: b.mu.Lock() 50: defer b.mu.Unlock() 51: delete(b.targets, id) 52: } 53: 54: //是否在buffer內部 55: func (b *Broadcast) InBuffer(start int64, end int64) (bool, error) { 56: return b.ringbuffer.InBuffer(start, end) 57: } 58: 59: func (b *Broadcast) Query(start int64, end int64, ty int64) (algo.Iterator, error) { 60: find := &algo.RingFind{start, end, ty} 61: return b.ringbuffer.Find(find, true) //模糊查找,不是精確匹配 62: } 63: 64: //如果要提供查詢功能,那么就要緩存數據,一般采用ringbuffer 65: //data要滿足下面的條件: 66: //1. 存在一個遞增著的ID 67: //2. 實現BufferItemer接口 68: func (b *Broadcast) Push(item algo.RingItemer) error { 69: b.mu.RLock() 70: defer b.mu.RUnlock() 71: item2, err := b.ringbuffer.Push(item) 72: if err != nil { 73: return err 74: } 75: for _, v := range b.targets { 76: //過濾不想發送的 77: if (v.Check(b.name, item2.Type)) { 78: v.Send(item) 79: } 80: } 81: return nil 82: } 83: 84: func (b *Broadcast) Find(find *algo.RingFind) (algo.Iterator, error) { 85: return b.ringbuffer.Find(find, true) 86: } 87: 88: type Subscribe struct { 89: mu sync.Mutex 90: ch chan interface{} 91: tys map[int64]int64 92: } 93: 94: func NewSubscribe(n int) (*Subscribe) { 95: s := &Subscribe{} 96: s.ch = make(chan interface{}, n) 97: s.tys = make(map[int64]int64) 98: return s 99: } 100: 101: func (s *Subscribe) Add(bname int64, ty int64) { 102: s.mu.Lock() 103: defer s.mu.Unlock() 104: s.tys[bname] = ty 105: } 106: 107: func (s *Subscribe) Check(bname int64, dataty int64) bool { 108: s.mu.Lock() 109: defer s.mu.Unlock() 110: ty, ok := s.tys[bname] 111: if !ok { //沒有訂閱 112: return false 113: } 114: if ty == algo.AnyType || dataty == ty { 115: return true 116: } 117: return false 118: } 119: 120: func (s *Subscribe) Read(buf []interface{}) (int) { 121: var i = 1 122: buf[0] = <-s.ch 123: for { 124: if i == len(buf) { 125: return i 126: } 127: select { 128: case data := <-s.ch: 129: buf[i] = data 130: i++ 131: default: 132: return i 133: } 134: } 135: panic("nerver reach") 136: } 137: 138: func (s *Subscribe) Send(data interface{}) { 139: select { 140: case s.ch <- data : 141: default: 142: //清除舊的數據 143: s.Clear() 144: //發送結束標志位 145: s.ch <- nil 146: } 147: } 148: 149: func (s *Subscribe) Clear() { 150: for { 151: select { 152: case <-s.ch: 153: default: 154: return 155: } 156: } 157: } 158: ~~~ 這里,有個數據結構叫做RingBuffer, 是一個環狀的buffer,非常適合做緩存固定數目的數據,用于廣播。廣播是用管道來傳輸數據的,管道的性能實際上已經非常的高,不需要什么無鎖隊列之類的。在這里也給管道加上buffer使得,消息意外的擾動,不會使得帶寬不夠用而立馬堵塞。 2. 接受消息: 在用戶登錄后,如果有權限,那么就可以作為消息源客戶端,消息源的代碼如下: ~~~ 1: func pushTick(w *asyn.ResponseWriter, r *asyn.Request) { 2: event := r.GetBody().(*response.OrderBookEvent) 3: b, _ := GetBroadcast(event.InstrumentId, btickSize) 4: b.Push(event) 5: asyn.Log().Println(event) 6: asyn.OKHandle(w, r) 7: } ~~~ 第2行: 從請求中獲取 消息事件。 第3行: event.InstrumentId 是消息的類型,btickSzie 是緩存的數據數目。 第6行: 向客戶端發送OK,確認消息發送成功。 每個消息是否發送成功,都有確認。這樣,客戶端就知道上次消息發送到哪里了。 3. 訂閱: ~~~ 1: func subscribe(w *asyn.ResponseWriter, r *asyn.Request) { 2: instId := r.GetBody().(int64) 3: log.Println("sub", instId) 4: b, err := GetBroadcast(instId, btickSize) 5: if err != nil { 6: r.SetErr(err) 7: asyn.ErrorHandle(w, r) 8: return 9: } 10: //訂閱的size 11: //get and set 要成為一個原子操作 12: session := r.GetSession() 13: session.Get3("subscribe", func (data interface{}) interface{} { 14: if data == nil { 15: data = NewSubscribe(4096) 16: } 17: sub := data.(*Subscribe) 18: //廣播, 類型 19: id := int64(uintptr(unsafe.Pointer(session))) 20: sub.Add(instId, algo.AnyType) 21: b.Sub(id, sub) 22: session.OnDelete(func () { 23: b.Unsub(id) 24: }) 25: return sub 26: }) 27: asyn.OKHandle(w, r) 28: } ~~~ 第2行:獲取消息的類型,通過這個類型,可以找到對應的廣播對象。 第12-30行:這是一個線程安全的session操作,具體看一下session.Get3 的實現就知道了: ~~~ 1: func (s *Session) Get3(name string, callback func (interface{}) interface{}) interface{} { 2: s.mu.Lock() 3: defer s.mu.Unlock() 4: data, err := s.get(name) 5: if err != nil { 6: data = nil 7: } 8: data = callback(data) 9: s.set(name, data) 10: return data 11: } ~~~ s.get 獲取session的數據,如果沒有session數據,那么為nil。簡單的說,這里的意思是:如果session “subscribe” 如果還沒有設置,那么就新建一個對象,如果已經設置了,那么讀取這個對象,并且,這個操作是線程安全的。 這里還添加了一個session撤銷時候的操作。 4. 廣播: ~~~ 1: //讀取廣播數據 2: func read(w *asyn.ResponseWriter, r *asyn.Request) { 3: session := r.GetSession() 4: //從session 中獲取subscribe 對象 5: sub := session.Get3("subscribe", func (data interface{}) interface{} { 6: if data == nil { 7: data = NewSubscribe(4096) 8: } 9: return data 10: }).(*Subscribe) 11: depth := r.GetBody().(int) 12: log.Println("get subscribe") 13: resp := w.Resp 14: if depth == 0 { 15: resp.MsgType = "ticks" 16: } else { 17: resp.MsgType = "ticks1" 18: } 19: buf := make([]interface{}, 1024) 20: dg := make([]*response.OrderBookEvent, 1024) 21: tick1 := make([]*base.TickGo, 1024) 22: for { 23: n := sub.Read(buf) 24: for i := 0; i < n; i++ { 25: if buf[i] == nil { 26: //close by broadcast 27: r.SetErr(errors.New("501")) 28: asyn.ErrorHandle(w, r) 29: return 30: } 31: if depth == 0 { 32: dg[i] = buf[i].(*response.OrderBookEvent) 33: } else { 34: tick1[i] = buf[i].(*response.OrderBookEvent).ToTickGo() 35: } 36: } 37: var err error 38: if depth == 0 { 39: err = w.WriteResponse(resp, dg[:n]) 40: } else { 41: err = w.WriteResponse(resp, tick1[:n]) 42: } 43: if err != nil { 44: r.SetErr(err) 45: asyn.ErrorHandle(w, r) 46: return 47: } 48: } 49: } ~~~ ~~~ read 有個depth參數,這是行情的深度。股票期貨里面都有后這個概念。傳說中的幾檔行情。 第26行:這里有個close。一般來說,是因為網絡擁堵 或者 異常,無法發送數據了。 還有一點要注意,這里的行情是批量發送的。sub.Read 盡可能多的讀取數據,減少網絡io的次數。 當然,服務器框架本身提供了心跳機制,對消息廣播系統,實時性是非常重要的,即時的檢查出網絡異常,才能保證實時性。 以上是對我們的異步消息服務器框架的一個簡單的介紹。設計這框架,非常重要的兩個理念: 1. 模塊化的設計,一個功能,就對應一個函數。 2. 模塊之間的通訊采用session,而對于比較復雜的通訊,可以自己建立一個線程安全的數據結構,比如這里的Broadcast 和 Subscribe ~~~ ~~~ ![](https://box.kancloud.cn/032176b8778dbc41f1ce6b79a83d59c1_900x350.jpg) ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看