協議
websocket是個二進制協議,需要先通過Http協議進行握手,從而協商完成從Http協議向websocket協議的轉換。一旦握手結束,當前的TCP連接后續將采用二進制websocket協議進行雙向雙工交互,自此與Http協議無關。
可以通過這篇知乎了解一下websocket協議的基本原理:[《WebSocket 是什么原理?為什么可以實現持久連接?》](https://www.zhihu.com/question/20215561)。
粘包
我們開發過TCP服務的都知道,需要通過協議decode從TCP字節流中解析出一個一個請求,那么websocket又怎么樣呢?
websocket以message為單位進行通訊,本身就是一個在TCP層上的一個分包協議,其實并不需要我們再進行粘包處理。但是因為單個message可能很大很大(比如一個視頻文件),那么websocket顯然不適合把一個視頻作為一個message傳輸(中途斷了前功盡棄),所以websocket協議其實是支持1個message分多個frame幀傳輸的。
我們的瀏覽器提供的編程API都是message粒度的,把frame拆幀的細節對開發者隱蔽了,而服務端websocket框架一般也做了同樣的隱藏,會自動幫我們收集所有的frame后拼成messasge再回調,所以結論就是:
websocket以message為單位通訊,不需要開發者自己處理粘包問題。
golang實現
golang官方標準庫里有一個websocket的包,但是它提供的就是frame粒度的API,壓根不能用。
不過官方其實已經認可了一個準標準庫實現,它實現了message粒度的API,讓開發者不需要關心websocket協議細節,開發起來非常方便,其文檔地址:https://godoc.org/github.com/gorilla/websocket。
開發websocket服務時,首先要基于http庫對外暴露接口,然后由websocket庫接管TCP連接進行協議升級,然后進行websocket協議的數據交換,所以開發時總是要用到http庫和websocket庫。
上述websocket文檔中對開發websocket服務有明確的注意事項要求,主要是指:
讀和寫API不是并發安全的,需要啟動單個goroutine串行處理。
關閉API是線程安全的,一旦調用則阻塞的讀和寫API會出錯返回,從而終止處理。
在我的實現中,我對websocket進行了封裝,簡化應用層開發的復雜度,主要思路是:
請求和應答都放入管道中排隊。
讀協程阻塞讀websocket,將message放入請求隊列。
寫協程阻塞讀應答channel,將message寫給websocket。
如何處理websocket錯誤和主動關閉websocket呢?
讀/寫協程調用websocket若返回錯誤,那么直接調用websocket的Close關閉連接,協程退出。(此時用戶可能仍舊持有連接對象,繼續向下閱讀!)
websocket連接關閉后,用戶通常正阻塞在讀/寫channel上而不知情,所以每個連接配套一個closeChan專門用于喚醒用戶代碼,關閉websocket連接同時關閉closeChan,這會令<-closeChan總是立即返回。
因為上一條設計,所以用戶讀/寫channel時總是select同時監聽channel和closeChan,以便實時感知到websocket連接的關閉。
用戶可以主動關閉連接,websocket連接重復Close沒有影響,而closeChan重復關閉會報錯,所以通過一個上鎖的狀態位判重處理。
描述比較繁瑣,實際并不復雜,看看我的代碼吧:
https://github.com/owenliang/go-websocket。
server.go
~~~
package main
import (
"errors"
"fmt"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
// http升級websocket協議的配置
var wsUpgrader = websocket.Upgrader{
// 允許所有CORS跨域請求
CheckOrigin: func(r *http.Request) bool {
return true
},
}
// 客戶端讀寫消息
type wsMessage struct {
messageType int
data []byte
}
// 客戶端連接
type wsConnection struct {
wsSocket *websocket.Conn // 底層websocket
inChan chan *wsMessage // 讀隊列
outChan chan *wsMessage // 寫隊列
mutex sync.Mutex // 避免重復關閉管道
isClosed bool
closeChan chan byte // 關閉通知
}
func (wsConn *wsConnection) wsReadLoop() {
for {
// 讀一個message
msgType, data, err := wsConn.wsSocket.ReadMessage()
if err != nil {
goto error
}
req := &wsMessage{
msgType,
data,
}
// 放入請求隊列
select {
case wsConn.inChan <- req:
case <-wsConn.closeChan:
goto closed
}
}
error:
wsConn.wsClose()
closed:
}
func (wsConn *wsConnection) wsWriteLoop() {
for {
select {
// 取一個應答
case msg := <-wsConn.outChan:
// 寫給websocket
if err := wsConn.wsSocket.WriteMessage(msg.messageType, msg.data); err != nil {
goto error
}
case <-wsConn.closeChan:
goto closed
}
}
error:
wsConn.wsClose()
closed:
}
func (wsConn *wsConnection) procLoop() {
// 啟動一個gouroutine發送心跳
go func() {
for {
time.Sleep(2 * time.Second)
if err := wsConn.wsWrite(websocket.TextMessage, []byte("heartbeat from server")); err != nil {
fmt.Println("heartbeat fail")
wsConn.wsClose()
break
}
}
}()
// 這是一個同步處理模型(只是一個例子),如果希望并行處理可以每個請求一個gorutine,注意控制并發goroutine的數量!!!
for {
msg, err := wsConn.wsRead()
if err != nil {
fmt.Println("read fail")
break
}
fmt.Println(string(msg.data))
err = wsConn.wsWrite(msg.messageType, msg.data)
if err != nil {
fmt.Println("write fail")
break
}
}
}
func wsHandler(resp http.ResponseWriter, req *http.Request) {
// 應答客戶端告知升級連接為websocket
wsSocket, err := wsUpgrader.Upgrade(resp, req, nil)
if err != nil {
return
}
wsConn := &wsConnection{
wsSocket: wsSocket,
inChan: make(chan *wsMessage, 1000),
outChan: make(chan *wsMessage, 1000),
closeChan: make(chan byte),
isClosed: false,
}
// 處理器
go wsConn.procLoop()
// 讀協程
go wsConn.wsReadLoop()
// 寫協程
go wsConn.wsWriteLoop()
}
func (wsConn *wsConnection) wsWrite(messageType int, data []byte) error {
select {
case wsConn.outChan <- &wsMessage{messageType, data}:
case <-wsConn.closeChan:
return errors.New("websocket closed")
}
return nil
}
func (wsConn *wsConnection) wsRead() (*wsMessage, error) {
select {
case msg := <-wsConn.inChan:
return msg, nil
case <-wsConn.closeChan:
}
return nil, errors.New("websocket closed")
}
func (wsConn *wsConnection) wsClose() {
wsConn.wsSocket.Close()
wsConn.mutex.Lock()
defer wsConn.mutex.Unlock()
if !wsConn.isClosed {
wsConn.isClosed = true
close(wsConn.closeChan)
}
}
func main() {
http.HandleFunc("/ws", wsHandler)
http.ListenAndServe("0.0.0.0:7777", nil)
}
~~~
client.html
~~~
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<script>
window.addEventListener("load", function(evt) {
var output = document.getElementById("output");
var input = document.getElementById("input");
var ws;
var print = function(message) {
var d = document.createElement("div");
d.innerHTML = message;
output.appendChild(d);
};
document.getElementById("open").onclick = function(evt) {
if (ws) {
return false;
}
ws = new WebSocket("ws://localhost:7777/ws");
ws.onopen = function(evt) {
print("OPEN");
}
ws.onclose = function(evt) {
print("CLOSE");
ws = null;
}
ws.onmessage = function(evt) {
print("RESPONSE: " + evt.data);
}
ws.onerror = function(evt) {
print("ERROR: " + evt.data);
}
return false;
};
document.getElementById("send").onclick = function(evt) {
if (!ws) {
return false;
}
print("SEND: " + input.value);
ws.send(input.value);
return false;
};
document.getElementById("close").onclick = function(evt) {
if (!ws) {
return false;
}
ws.close();
return false;
};
});
</script>
</head>
<body>
<table>
<tr><td valign="top" width="50%">
<p>Click "Open" to create a connection to the server,
"Send" to send a message to the server and "Close" to close the connection.
You can change the message and send multiple times.
</p>
<form>
<button id="open">Open</button>
<button id="close">Close</button>
<input id="input" type="text" value="Hello world!">
<button id="send">Send</button>
</form>
</td><td valign="top" width="50%">
<div id="output"></div>
</td></tr></table>
</body>
</html>
~~~
首先運行server.go,然后打開client.html頁面,即可體驗所有流程:
- 序言
- 目錄
- 環境搭建
- Linux搭建golang環境
- Windows搭建golang環境
- Mac搭建golang環境
- 介紹
- 1.Go語言的主要特征
- 2.golang內置類型和函數
- 3.init函數和main函數
- 4.包
- 1.工作空間
- 2.源文件
- 3.包結構
- 4.文檔
- 5.編寫 Hello World
- 6.Go語言 “ _ ”(下劃線)
- 7.運算符
- 8.命令
- 類型
- 1.變量
- 2.常量
- 3.基本類型
- 1.基本類型介紹
- 2.字符串String
- 3.數組Array
- 4.類型轉換
- 4.引用類型
- 1.引用類型介紹
- 2.切片Slice
- 3.容器Map
- 4.管道Channel
- 5.指針
- 6.自定義類型Struct
- 編碼格式轉換
- 流程控制
- 1.條件語句(if)
- 2.條件語句 (switch)
- 3.條件語句 (select)
- 4.循環語句 (for)
- 5.循環語句 (range)
- 6.循環控制Goto、Break、Continue
- 函數
- 1.函數定義
- 2.參數
- 3.返回值
- 4.匿名函數
- 5.閉包、遞歸
- 6.延遲調用 (defer)
- 7.異常處理
- 8.單元測試
- 壓力測試
- 方法
- 1.方法定義
- 2.匿名字段
- 3.方法集
- 4.表達式
- 5.自定義error
- 接口
- 1.接口定義
- 2.執行機制
- 3.接口轉換
- 4.接口技巧
- 面向對象特性
- 并發
- 1.并發介紹
- 2.Goroutine
- 3.Chan
- 4.WaitGroup
- 5.Context
- 應用
- 反射reflection
- 1.獲取基本類型
- 2.獲取結構體
- 3.Elem反射操作基本類型
- 4.反射調用結構體方法
- 5.Elem反射操作結構體
- 6.Elem反射獲取tag
- 7.應用
- json協議
- 1.結構體轉json
- 2.map轉json
- 3.int轉json
- 4.slice轉json
- 5.json反序列化為結構體
- 6.json反序列化為map
- 終端讀取
- 1.鍵盤(控制臺)輸入fmt
- 2.命令行參數os.Args
- 3.命令行參數flag
- 文件操作
- 1.文件創建
- 2.文件寫入
- 3.文件讀取
- 4.文件刪除
- 5.壓縮文件讀寫
- 6.判斷文件或文件夾是否存在
- 7.從一個文件拷貝到另一個文件
- 8.寫入內容到Excel
- 9.日志(log)文件
- server服務
- 1.服務端
- 2.客戶端
- 3.tcp獲取網頁數據
- 4.http初識-瀏覽器訪問服務器
- 5.客戶端訪問服務器
- 6.訪問延遲處理
- 7.form表單提交
- web模板
- 1.渲染終端
- 2.渲染瀏覽器
- 3.渲染存儲文件
- 4.自定義io.Writer渲染
- 5.模板語法
- 時間處理
- 1.格式化
- 2.運行時間
- 3.定時器
- 鎖機制
- 互斥鎖
- 讀寫鎖
- 性能比較
- sync.Map
- 原子操作
- 1.原子增(減)值
- 2.比較并交換
- 3.導入、導出、交換
- 加密解密
- 1.md5
- 2.base64
- 3.sha
- 4.hmac
- 常用算法
- 1.冒泡排序
- 2.選擇排序
- 3.快速排序
- 4.插入排序
- 5.睡眠排序
- 設計模式
- 創建型模式
- 單例模式
- 抽象工廠模式
- 工廠方法模式
- 原型模式
- 結構型模式
- 適配器模式
- 橋接模式
- 合成/組合模式
- 裝飾模式
- 外觀模式
- 享元模式
- 代理模式
- 行為性模式
- 職責鏈模式
- 命令模式
- 解釋器模式
- 迭代器模式
- 中介者模式
- 備忘錄模式
- 觀察者模式
- 狀態模式
- 策略模式
- 模板模式
- 訪問者模式
- 數據庫操作
- golang操作MySQL
- 1.mysql使用
- 2.insert操作
- 3.select 操作
- 4.update 操作
- 5.delete 操作
- 6.MySQL事務
- golang操作Redis
- 1.redis介紹
- 2.golang鏈接redis
- 3.String類型 Set、Get操作
- 4.String 批量操作
- 5.設置過期時間
- 6.list隊列操作
- 7.Hash表
- 8.Redis連接池
- golang操作ETCD
- 1.etcd介紹
- 2.鏈接etcd
- 3.etcd存取
- 4.etcd監聽Watch
- golang操作kafka
- 1.kafka介紹
- 2.寫入kafka
- 3.kafka消費
- golang操作ElasticSearch
- 1.ElasticSearch介紹
- 2.kibana介紹
- 3.寫入ElasticSearch
- NSQ
- 安裝
- 生產者
- 消費者
- beego框架
- 1.beego框架環境搭建
- 2.參數配置
- 1.默認參數
- 2.自定義配置
- 3.config包使用
- 3.路由設置
- 1.自動匹配
- 2.固定路由
- 3.正則路由
- 4.注解路由
- 5.namespace
- 4.多種數據格式輸出
- 1.直接輸出字符串
- 2.模板數據輸出
- 3.json格式數據輸出
- 4.xml格式數據輸出
- 5.jsonp調用
- 5.模板處理
- 1.模板語法
- 2.基本函數
- 3.模板函數
- 6.請求處理
- 1.GET請求
- 2.POST請求
- 3.文件上傳
- 7.表單驗證
- 1.表單驗證
- 2.定制錯誤信息
- 3.struct tag 驗證
- 4.XSRF過濾
- 8.靜態文件處理
- 1.layout設計
- 9.日志處理
- 1.日志處理
- 2.logs 模塊
- 10.會話控制
- 1.會話控制
- 2.session 包使用
- 11.ORM 使用
- 1.鏈接數據庫
- 2. CRUD 操作
- 3.原生 SQL 操作
- 4.構造查詢
- 5.事務處理
- 6.自動建表
- 12.beego 驗證碼
- 1.驗證碼插件
- 2.驗證碼使用
- beego admin
- 1.admin安裝
- 2.admin開發
- beego 熱升級
- gin框架
- 安裝使用
- 項目
- 秒殺項目
- 日志收集
- 面試題
- 面試題一
- 面試題二
- 錯題集
- Go語言陷阱和常見錯誤
- 常見語法錯誤
- 初級
- 中級
- 高級
- Go高級應用
- goim
- goim 啟動流程
- goim 工作流程
- goim 結構體
- gopush
- gopush工作流程
- gopush啟動流程
- gopush業務流程
- gopush應用
- gopush新添功能
- rpc
- HTTP RPC
- TCP RPC
- JSON RPC
- 常見RPC開源框架
- pprof
- pprof介紹
- pprof應用
- 封裝 websocket
- zookeeper
- 基本操作測試
- 簡單的分布式server
- Zookeeper命令行使用
- cgo
- Go語言 demo
- 用Go語言計算一個人的年齡,生肖,星座
- 超簡易Go語言實現的留言板代碼
- 信號處理模塊,可用于在線加載配置,配置動態加載的信號為SIGHUP
- 陽歷和陰歷相互轉化的工具類 golang版本
- 錯誤總結