<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                [TOC] > [參考 github/godis](https://github.com/HDT3213/godis) > [說明](https://www.cnblogs.com/Finley/p/12590718.html#ttl) ## 實例 <details> <summary>tcp/atomic.go</summary> ``` package tcp import ( "sync/atomic" ) type AtomicBool uint32 func (b *AtomicBool)Get()bool { return atomic.LoadUint32((*uint32)(b)) != 0 } func (b *AtomicBool)Set(v bool) { if v { atomic.StoreUint32((*uint32)(b), 1) } else { atomic.StoreUint32((*uint32)(b), 0) } } type AtomicInt32 struct{ v int32 } func (i *AtomicInt32) Load() int32 { return atomic.LoadInt32(&i.v) } func (i *AtomicInt32) Add(n int32) int32 { return atomic.AddInt32(&i.v, n) } func (i *AtomicInt32) Sub(n int32) int32 { return atomic.AddInt32(&i.v, -n) } func (i *AtomicInt32) Inc() int32 { return i.Add(1) } func (i *AtomicInt32) Dec() int32 { return i.Sub(1) } func (i *AtomicInt32) CAS(old, new int32) bool { return atomic.CompareAndSwapInt32(&i.v, old, new) } func (i *AtomicInt32) Store(n int32) { atomic.StoreInt32(&i.v, n) } func (i *AtomicInt32) Swap(n int32) int32 { return atomic.SwapInt32(&i.v, n) } ``` </details> <br/> <details> <summary>tcp/client.go</summary> ``` package tcp import ( "bufio" "context" "fmt" "io" logger "log" "net" "sync" "time" ) // 客戶端連接的抽象 type Client struct { // tcp 連接 Conn net.Conn // 當服務端開始發送數據時進入waiting, 阻止其它goroutine關閉連接 // wait.Wait是作者編寫的帶有最大等待時間的封裝: // https://github.com/HDT3213/godis/blob/master/src/lib/sync/wait/wait.go Waiting Wait } // 關閉客戶端連接 func (c *Client)Close()error { // 等待數據發送完成或超時 c.Waiting.WaitWithTimeout(10 * time.Second) c.Conn.Close() return nil } type ClientMap struct { // 保存所有工作狀態client的集合(把map當set用) // 需使用并發安全的容器 activeConn sync.Map // 和 tcp server 中作用相同的關閉狀態標識位 closing AtomicBool count AtomicInt32 } func NewHandlerMap() *ClientMap { return &ClientMap{ } } func (h *ClientMap)Handle(ctx context.Context, conn net.Conn) { logger.Println("creat new conn",conn.RemoteAddr()) if h.closing.Get() { // closing handler refuse new connection fmt.Println("closing handler refuse new connection") conn.Close() } client := &Client { Conn: conn, } h.activeConn.Store(client, 1) h.count.Inc() fmt.Printf("count client is %+v\n",h.count.Load() ) reader := bufio.NewReader(conn) for { msg, err := reader.ReadBytes('\n') if err != nil { if err == io.EOF { logger.Println("connection close") } else { logger.Println("close connection", err) } h.delete(client) return } // 發送數據前先置為waiting狀態 client.Waiting.Add(1) // 模擬關閉時未完成發送的情況 //logger.Println("sleeping") //time.Sleep(10 * time.Second) conn.Write(msg) // 發送完畢, 結束waiting client.Waiting.Done() } } func (h *ClientMap) delete(client *Client) { h.activeConn.Delete(client) h.count.Dec() client.Close() } // deprecated func (h *ClientMap)Close()error { logger.Println("handler shuting down...") h.closing.Set(true) // TODO: concurrent wait h.activeConn.Range(func(key interface{}, val interface{})bool { client := key.(*Client) client.Close() return true }) return nil } ``` </details> <br/> <details> <summary>tcp/server.go</summary> ``` package tcp import ( "context" "fmt" logger "log" "net" "os" "os/signal" "sync" "syscall" "time" ) type Config struct { Address string `yaml:"address"` MaxConnect uint32 `yaml:"max-connect"` Timeout time.Duration `yaml:"timeout"` } func ListenAndServe(cfg *Config, clientMap *ClientMap) { listener, err := net.Listen("tcp", cfg.Address) if err != nil { logger.Fatal(fmt.Sprintf("listen err: %v", err)) } // 監聽中斷信號 // atomic.AtomicBool 是作者寫的封裝: https://github.com/HDT3213/godis/blob/master/src/lib/sync/atomic/bool.go var closing AtomicBool sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) go func() { sig := <-sigCh switch sig { case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT: // 收到中斷信號后開始關閉流程 logger.Println("shuting down...") // 設置標志位為關閉中, 使用原子操作保證線程可見性 closing.Set(true) // 先關閉 listener 阻止新連接進入 // listener 關閉后 listener.Accept() 會立即返回錯誤 _ = listener.Close() // 逐個關閉已建立鏈接 _ = clientMap.Close() } }() logger.Println(fmt.Sprintf("bind: %s, start listening...", cfg.Address)) defer func() { // 在出現未知錯誤或panic后保證正常關閉 // 這里存在一個問題是: 當應用正常關閉后會再次執行關閉操作 _ = listener.Close() _ = clientMap.Close() }() ctx, _ := context.WithCancel(context.Background()) // waitGroup 的計數是當前仍存在的連接數 // 進入關閉流程時,主協程應該等待所有連接都關閉后再退出 var waitDone sync.WaitGroup for { conn, err := listener.Accept() if err != nil { if closing.Get() { // 收到關閉信號后進入此流程,此時listener已被監聽系統信號的 goroutine 關閉 logger.Println("waiting disconnect...") // 主協程應等待應用層服務器完成工作并關閉鏈接 waitDone.Wait() return } logger.Println(fmt.Sprintf("accept err: %v", err)) continue } // 創建一個新協程處理鏈接 logger.Println("accept link") go func() { defer func() { waitDone.Done() }() waitDone.Add(1) clientMap.Handle(ctx, conn) }() } } ``` </details> <br/> <details> <summary>tcp/wait.go</summary> ``` package tcp import ( "sync" "time" ) type Wait struct { wg sync.WaitGroup } func (w *Wait) Add(delta int) { w.wg.Add(delta) } func (w *Wait) Done() { w.wg.Done() } func (w *Wait) Wait() { w.wg.Wait() } // return isTimeout func (w *Wait) WaitWithTimeout(timeout time.Duration) bool { c := make(chan bool) go func() { defer close(c) w.wg.Wait() c <- true }() select { // 正常 case <-c: return false // 超時 case <-time.After(timeout): return true } } ``` </details> <br/> <details> <summary>server.go</summary> ``` package main import ( "idcpj/tcp" ) func main() { conf :=&tcp.Config{ Address: "127.0.0.1:7000", MaxConnect: 10, Timeout: 0, } handler := tcp.NewHandlerMap() tcp.ListenAndServe(conf,handler) } ``` </details> <br/> <details> <summary>client.php</summary> ``` package main import ( "io" "log" "net" "strconv" "time" ) func main() { for i:=0; i<50; i++ { go client(i) time.Sleep(1*time.Microsecond) } select{} } func client(i int) { conn, err := net.Dial("tcp", "127.0.0.1:7000") defer conn.Close() if err != nil { panic(err) } go func() { for { conn.Write([]byte("hello word "+strconv.Itoa(i)+"\n")) time.Sleep(100000 * time.Millisecond) } }() b := make([]byte, 2048) for { n, err := conn.Read(b) if err != nil { if err == io.EOF { return } else { log.Print("conn read error", err.Error()) } } log.Println(string(b[:n])) } } ``` </details> <br/> 執行 ``` > go run server.go > go run client.go ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看