[TOC]
# go操作etcd
~~~
go get -u go.etcd.io/etcd
~~~
在import的時候 應該import “go.etcd.io/etcd/clientv3” 而不是 "github.com/coreos/etcd/clientv3"
## 連接
~~~
import (
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)
func main() {
var (
config clientv3.Config //服務器配置
client *clientv3.Client
err error
)
//客戶端配置
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
//建立連接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
defer client.Close()
fmt.Println("連接成功")
}
~~~
## kv設置
~~~
package main
import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)
func main() {
var (
config clientv3.Config //服務器配置
client *clientv3.Client //客戶端連接對象
err error
kv clientv3.KV //操作kv的對象
putResp *clientv3.PutResponse //設置kv的對象
)
//客戶端配置
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
//建立連接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
//關閉連接
defer client.Close()
//用于讀寫etcd的鍵值對
kv = clientv3.NewKV(client)
//這里面有上下文,可以用來取消他
//第三個參數可選,clientv3.WithPrevKV()表示可以查到以前的kv
if putResp, err = kv.Put(context.TODO(), "/cron/jobs/job1", "hello", clientv3.WithPrevKV()); err != nil {
fmt.Println(err)
} else {
//每次操作有個唯一的Revision,單調遞增
fmt.Println("Revision: ", putResp.Header.Revision)
//打印之前的值
if putResp.PrevKv != nil {
fmt.Println("之前的值: ", string(putResp.PrevKv.Key), "---", string(putResp.PrevKv.Value))
}
}
}
~~~
## kv讀取
~~~
package main
import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)
func main() {
var (
config clientv3.Config //服務器配置
client *clientv3.Client //客戶端連接對象
err error
kv clientv3.KV //操作kv的對象
getResp *clientv3.GetResponse //獲取kv的對象
)
//客戶端配置
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
//建立連接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
//關閉連接
defer client.Close()
//用于讀寫etcd的鍵值對
kv = clientv3.NewKV(client)
//第三個參數也是可選的 clientv3.
if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job1"); err != nil {
fmt.Println(err)
} else {
//打印出來的create_revision是創建版本,mod_revision是修改版本, version是修改的次數
fmt.Println(getResp.Kvs)
}
}
~~~
## 以什么為前綴查找
~~~
func main() {
var (
config clientv3.Config //服務器配置
client *clientv3.Client //客戶端連接對象
err error
kv clientv3.KV //操作kv的對象
getResp *clientv3.GetResponse //獲取kv的對象
)
//客戶端配置
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
//建立連接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
//關閉連接
defer client.Close()
//用于讀寫etcd的鍵值對
kv = clientv3.NewKV(client)
//第三個參數也是可選的 clientv3.
//以什么為前綴的
if getResp, err = kv.Get(context.TODO(), "/cron/jobs/", clientv3.WithPrefix()); err != nil {
fmt.Println(err)
} else {
//數組
//打印出來的create_revision是創建版本,mod_revision是修改版本, version是修改的次數
fmt.Println(getResp.Kvs)
for k, v := range getResp.Kvs {
fmt.Println(k)
fmt.Println(v)
}
}
}
~~~
## 刪除key
~~~
func main() {
var (
config clientv3.Config //服務器配置
client *clientv3.Client //客戶端連接對象
err error
kv clientv3.KV //操作kv的對象
deleteResp *clientv3.DeleteResponse //刪除的對象
)
//客戶端配置
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
//建立連接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
//關閉連接
defer client.Close()
//用于讀寫etcd的鍵值對
kv = clientv3.NewKV(client)
//刪除
//第三個參數可選,clientv3.WithPrevKV()表示會賦值deleteResp.PrevKvs
if deleteResp, err = kv.Delete(context.TODO(), "/cron/jobs/job1", clientv3.WithPrevKV()); err != nil {
fmt.Println(err)
return
}
//被刪除之前的k和v,上面第三個參數要設置,否則是沒有的
if len(deleteResp.PrevKvs) != 0 {
for _, v := range deleteResp.PrevKvs {
fmt.Println(string(v.Key))
fmt.Println(string(v.Value))
}
}
}
~~~
## 刪除多個key,以什么為前綴
~~~
if deleteResp, err = kv.Delete(context.TODO(), "/cron/jobs/", clientv3.WithPrefix()); err != nil {
fmt.Println(err)
return
}
~~~
## 刪除連續的2個key
~~~
if deleteResp, err = kv.Delete(context.TODO(), "/cron/jobs/job1", clientv3.WithFromKey(), clientv3.WithLimit(2)); err != nil {
fmt.Println(err)
return
}
~~~
## 租約
~~~
import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)
func main() {
var (
config clientv3.Config //服務器配置
client *clientv3.Client //客戶端連接對象
err error
lease clientv3.Lease //租約對象
leaseGrantResp *clientv3.LeaseGrantResponse //申請到的租約
leaseId clientv3.LeaseID //租約id
putResp *clientv3.PutResponse //PUT對象
getResp *clientv3.GetResponse //get對象
kv clientv3.KV //kv操作對象
)
//客戶端配置
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
//建立連接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
//關閉連接
defer client.Close()
//申請一個租約(lease)
lease = clientv3.NewLease(client)
//申請一個3秒的租約
if leaseGrantResp, err = lease.Grant(context.TODO(), 3); err != nil {
fmt.Println(err)
return
}
//拿到租約的id
leaseId = leaseGrantResp.ID
fmt.Println("租約id: ", leaseId)
//獲得kv對象
kv = clientv3.NewKV(client)
//put一個kv,讓他與租約關聯起來,從而實現10秒后自動過期
//第三個參數是具體的值
if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "hello", clientv3.WithLease(leaseId)); err != nil {
fmt.Println(err)
return
}
fmt.Println("寫入成功: ", putResp.Header.Revision)
//time.Sleep(5 * time.Second)
//獲取下kv
if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1"); err != nil {
fmt.Println(err)
return
}
if getResp.Count == 0 {
fmt.Println("kv過期了")
return
}
fmt.Println("讀取成功: ", getResp.Kvs)
}
~~~
## 續租
~~~
package main
import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)
func main() {
var (
config clientv3.Config //服務器配置
client *clientv3.Client //客戶端連接對象
err error
lease clientv3.Lease //租約對象
leaseGrantResp *clientv3.LeaseGrantResponse //申請到的租約
leaseId clientv3.LeaseID //租約id
putResp *clientv3.PutResponse //PUT對象
getResp *clientv3.GetResponse //get對象
kv clientv3.KV //kv操作對象
keepResp *clientv3.LeaseKeepAliveResponse //續租channel中的對象
keepRespChan <-chan *clientv3.LeaseKeepAliveResponse //續租的channel
)
//客戶端配置
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
//建立連接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
//關閉連接
defer client.Close()
//申請一個租約(lease)
lease = clientv3.NewLease(client)
//申請一個10秒的租約
if leaseGrantResp, err = lease.Grant(context.TODO(), 3); err != nil {
fmt.Println(err)
return
}
//拿到租約的id
leaseId = leaseGrantResp.ID
fmt.Println("租約id: ", leaseId)
//自動續租 KeepAliveOnce只續租一次 KeepAlive是一直續租里面有個協程維護著
if keepRespChan, err = lease.KeepAlive(context.TODO(), leaseId); err != nil {
fmt.Println(err)
return
}
//啟動協程,消費這個租約
go func() {
for {
select {
//每秒續租一次
case keepResp = <-keepRespChan:
//如果維護租約中發生異常,網絡重新連接后發現租約過期的話,或者我主動把context取消
if keepResp == nil {
fmt.Println("租約已經失效了")
//退出循環
goto END
} else {
//續租一切正常,打印租約id
fmt.Println("收到自動續租應答: ", keepResp.ID)
}
}
}
END:
}()
//獲得kv對象
kv = clientv3.NewKV(client)
//put一個kv,讓他與租約關聯起來,從而實現10秒后自動過期
//第三個參數是具體的值
if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "hello", clientv3.WithLease(leaseId)); err != nil {
fmt.Println(err)
return
}
fmt.Println("寫入成功: ", putResp.Header.Revision)
//time.Sleep(5 * time.Second)
//獲取下kv
if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1"); err != nil {
fmt.Println(err)
return
}
if getResp.Count == 0 {
fmt.Println("kv過期了")
return
}
fmt.Println("讀取成功: ", getResp.Kvs)
for {
;
}
}
~~~
## op封裝get和put
~~~
import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)
func main() {
var (
config clientv3.Config //服務器配置
client *clientv3.Client //客戶端連接對象
err error
kv clientv3.KV //kv操作對象
putOp clientv3.Op //op對象,賦值的
getOp clientv3.Op //op對象, 獲取值的
opResp clientv3.OpResponse //op執行的返回結果
)
//客戶端配置
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
//建立連接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
//關閉連接
defer client.Close()
//獲得kv對象
kv = clientv3.NewKV(client)
//Op:opeartion 代表一個操作,具體操作封裝在里面
putOp = clientv3.OpPut("/cron/jobs/job8", "111")
//執行op
if opResp, err = kv.Do(context.TODO(), putOp); err != nil {
fmt.Println(err)
return
}
//把opeartion變為put的 opResp.Put()
fmt.Println("寫入Revision: ", opResp.Put().Header.Revision)
//get的
getOp = clientv3.OpGet("/cron/jobs/job8")
//執行op
if opResp, err = kv.Do(context.TODO(), getOp); err != nil {
fmt.Println(err)
return
}
fmt.Println("讀取數據: ", opResp.Get().Kvs)
}
~~~
# 分布式鎖
~~~
package main
import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)
func main() {
var (
config clientv3.Config //服務器配置
client *clientv3.Client //客戶端連接對象
err error
kv clientv3.KV //kv操作對象
lease clientv3.Lease //租約對象
leaseGrantResp *clientv3.LeaseGrantResponse //申請到的租約
leaseId clientv3.LeaseID //租約id
keepResp *clientv3.LeaseKeepAliveResponse //續租channel中的對象
keepRespChan <-chan *clientv3.LeaseKeepAliveResponse //續租的channel
ctx context.Context //創建一個用于取消租約的context
cancelFunc context.CancelFunc //取消上下文
txn clientv3.Txn //事務
txnResp *clientv3.TxnResponse //事務提交的返回值
)
//客戶端配置
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
//建立連接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
//關閉連接
defer client.Close()
//獲得kv對象
kv = clientv3.NewKV(client)
//lease實現鎖自動過期
//op操作
//txn事務: if else then
//1.上鎖(創建租約,自動續租,拿著租約去搶占一個key)
//申請一個租約(lease)
lease = clientv3.NewLease(client)
//申請一個5秒的租約
if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
fmt.Println(err)
return
}
//拿到租約的id
leaseId = leaseGrantResp.ID
//準備一個用于取消自動續租的context
ctx, cancelFunc = context.WithCancel(context.TODO())
//確保函數退出后,自動續租會停止
defer cancelFunc()
//確保租約釋放
defer lease.Revoke(context.TODO(), leaseId)
//自動續租 KeepAliveOnce只續租一次 KeepAlive是一直續租里面有個協程維護著
if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
fmt.Println(err)
return
}
//啟動協程,消費這個租約
go func() {
for {
select {
//每秒續租一次
case keepResp = <-keepRespChan:
//如果維護租約中發生異常,網絡重新連接后發現租約過期的話,或者我主動把context取消
if keepResp == nil {
fmt.Println("租約已經失效了")
//退出循環
goto END
} else {
//續租一切正常,打印租約id
fmt.Println("收到自動續租應答: ", keepResp.ID)
}
}
}
END:
}()
//if 不存在key,then設置他,else搶鎖失敗
//創建事務
txn = kv.Txn(context.TODO())
//定義事務
//job9的創建版本=0,滿足了說明key不存在.滿足就走then,不滿足就走else
txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)).
Then(clientv3.OpPut("/cron/lock/job9", "", clientv3.WithLease(leaseId))).
Else(clientv3.OpGet("/cron/lock/job9"))
//提交事務
if txnResp, err = txn.Commit(); err != nil {
fmt.Println(err)
return
}
//判斷是否搶到了鎖
if !txnResp.Succeeded {
//沒搶到鎖的話,取else部分的返回值
fmt.Println("鎖被占用: ", txnResp.Responses[0].GetResponseRange().Kvs)
}
//2. 處理業務
//在鎖內很安全
fmt.Println("----------處理任務")
time.Sleep(5 * time.Second)
//3. 釋放鎖(取消自動續租,釋放租約,一釋放與租約關聯的key就被刪除了)
//defer會把租約釋放掉,關聯的kv就被刪除了
}
~~~
- 基礎
- 簡介
- 主要特征
- 變量和常量
- 編碼轉換
- 數組
- byte與rune
- big
- sort接口
- 和mysql類型對應
- 函數
- 閉包
- 工作區
- 復合類型
- 指針
- 切片
- map
- 結構體
- sync.Map
- 隨機數
- 面向對象
- 匿名組合
- 方法
- 接口
- 權限
- 類型查詢
- 異常處理
- error
- panic
- recover
- 自定義錯誤
- 字符串處理
- 正則表達式
- json
- 文件操作
- os
- 文件讀寫
- 目錄
- bufio
- ioutil
- gob
- 棧幀的內存布局
- shell
- 時間處理
- time詳情
- time使用
- new和make的區別
- container
- list
- heap
- ring
- 測試
- 單元測試
- Mock依賴
- delve
- 命令
- TestMain
- path和filepath包
- log日志
- 反射
- 詳解
- plugin包
- 信號
- goto
- 協程
- 簡介
- 創建
- 協程退出
- runtime
- channel
- select
- 死鎖
- 互斥鎖
- 讀寫鎖
- 條件變量
- 嵌套
- 計算單個協程占用內存
- 執行規則
- 原子操作
- WaitGroup
- 定時器
- 對象池
- sync.once
- 網絡編程
- 分層模型
- socket
- tcp
- udp
- 服務端
- 客戶端
- 并發服務器
- Http
- 簡介
- http服務器
- http客戶端
- 爬蟲
- 平滑重啟
- context
- httptest
- 優雅中止
- web服務平滑重啟
- beego
- 安裝
- 路由器
- orm
- 單表增刪改查
- 多級表
- orm使用
- 高級查詢
- 關系查詢
- SQL查詢
- 元數據二次定義
- 控制器
- 參數解析
- 過濾器
- 數據輸出
- 表單數據驗證
- 錯誤處理
- 日志
- 模塊
- cache
- task
- 調試模塊
- config
- 部署
- 一些包
- gjson
- goredis
- collection
- sjson
- redigo
- aliyunoss
- 密碼
- 對稱加密
- 非對稱加密
- 單向散列函數
- 消息認證
- 數字簽名
- mysql優化
- 常見錯誤
- go run的錯誤
- 新手常見錯誤
- 中級錯誤
- 高級錯誤
- 常用工具
- 協程-泄露
- go env
- gometalinter代碼檢查
- go build
- go clean
- go test
- 包管理器
- go mod
- gopm
- go fmt
- pprof
- 提高編譯
- go get
- 代理
- 其他的知識
- go內存對齊
- 細節總結
- nginx路由匹配
- 一些博客
- redis為什么快
- cpu高速緩存
- 常用命令
- Go 永久阻塞的方法
- 常用技巧
- 密碼加密解密
- for 循環迭代變量
- 備注
- 垃圾回收
- 協程和纖程
- tar-gz
- 紅包算法
- 解決golang.org/x 下載失敗
- 逃逸分析
- docker
- 鏡像
- 容器
- 數據卷
- 網絡管理
- 網絡模式
- dockerfile
- docker-composer
- 微服務
- protoBuf
- GRPC
- tls
- consul
- micro
- crontab
- shell調用
- gorhill/cronexpr
- raft
- go操作etcd
- mongodb