首先思考下Etcd是什么?可能很多人第一反應可能是一個鍵值存儲倉庫,卻沒有重視官方定義的后半句,用于配置共享和服務發現。
```
A highly-available key value store for shared configuration and service discovery.
```
實際上,etcd 作為一個受到 ZooKeeper 與 doozer 啟發而催生的項目,除了擁有與之類似的功能外,更專注于以下四點。
* 簡單:基于 HTTP+JSON 的 API 讓你用 curl 就可以輕松使用。
* 安全:可選 SSL 客戶認證機制。
* 快速:每個實例每秒支持一千次寫操作。
* 可信:使用 Raft 算法充分實現了分布式。
因為 Etcd 使用 Raft 算法保持了數據的強一致性,某次操作存儲到集群中的值必然是全局一致的,所以很容易實現分布式鎖。鎖服務有兩種使用方式,一是保持獨占,二是控制時序。
* 保持獨占即所有獲取鎖的用戶最終只有一個可以得到。etcd 為此提供了一套實現分布式鎖原子操作 CAS(CompareAndSwap)的 API。通過設置prevExist值,可以保證在多個節點同時去創建某個目錄時,只有一個成功。而創建成功的用戶就可以認為是獲得了鎖。
* 控制時序,即所有想要獲得鎖的用戶都會被安排執行,但是獲得鎖的順序也是全局唯一的,同時決定了執行順序。etcd 為此也提供了一套 API(自動創建有序鍵),對一個目錄建值時指定為POST動作,這樣 etcd 會自動在目錄下生成一個當前最大的值為鍵,存儲這個新的值(客戶端編號)。同時還可以使用 API 按順序列出所有當前目錄下的鍵值。此時這些鍵的值就是客戶端的時序,而這些鍵中存儲的值可以是代表客戶端的編號。
在這里Ectd實現分布式鎖基本實現原理為:
1. 在ectd系統里創建一個key。
2. 如果創建失敗,key存在,則監聽該key的變化事件,直到該key被刪除,回到1。
3. 如果創建成功,則認為我獲得了鎖。
應用示例:
```
package etcdsync
import (
"fmt"
"io"
"os"
"sync"
"time"
"github.com/coreos/etcd/client"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
)
const (
defaultTTL = 60
defaultTry = 3
deleteAction = "delete"
expireAction = "expire"
)
// A Mutex is a mutual exclusion lock which is distributed across a cluster.
type Mutex struct {
key string
id string // The identity of the caller
client client.Client
kapi client.KeysAPI
ctx context.Context
ttl time.Duration
mutex *sync.Mutex
logger io.Writer
}
// New creates a Mutex with the given key which must be the same
// across the cluster nodes.
// machines are the ectd cluster addresses
func New(key string, ttl int, machines []string) *Mutex {
cfg := client.Config{
Endpoints: machines,
Transport: client.DefaultTransport,
HeaderTimeoutPerRequest: time.Second,
}
c, err := client.New(cfg)
if err != nil {
return nil
}
hostname, err := os.Hostname()
if err != nil {
return nil
}
if len(key) == 0 || len(machines) == 0 {
return nil
}
if key[0] != '/' {
key = "/" + key
}
if ttl < 1 {
ttl = defaultTTL
}
return &Mutex{
key: key,
id: fmt.Sprintf("%v-%v-%v", hostname, os.Getpid(), time.Now().Format("20060102-15:04:05.999999999")),
client: c,
kapi: client.NewKeysAPI(c),
ctx: context.TODO(),
ttl: time.Second * time.Duration(ttl),
mutex: new(sync.Mutex),
}
}
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() (err error) {
m.mutex.Lock()
for try := 1; try <= defaultTry; try++ {
if m.lock() == nil {
return nil
}
m.debug("Lock node %v ERROR %v", m.key, err)
if try < defaultTry {
m.debug("Try to lock node %v again", m.key, err)
}
}
return err
}
func (m *Mutex) lock() (err error) {
m.debug("Trying to create a node : key=%v", m.key)
setOptions := &client.SetOptions{
PrevExist:client.PrevNoExist,
TTL: m.ttl,
}
resp, err := m.kapi.Set(m.ctx, m.key, m.id, setOptions)
if err == nil {
m.debug("Create node %v OK [%q]", m.key, resp)
return nil
}
m.debug("Create node %v failed [%v]", m.key, err)
e, ok := err.(client.Error)
if !ok {
return err
}
if e.Code != client.ErrorCodeNodeExist {
return err
}
// Get the already node's value.
resp, err = m.kapi.Get(m.ctx, m.key, nil)
if err != nil {
return err
}
m.debug("Get node %v OK", m.key)
watcherOptions := &client.WatcherOptions{
AfterIndex : resp.Index,
Recursive:false,
}
watcher := m.kapi.Watcher(m.key, watcherOptions)
for {
m.debug("Watching %v ...", m.key)
resp, err = watcher.Next(m.ctx)
if err != nil {
return err
}
m.debug("Received an event : %q", resp)
if resp.Action == deleteAction || resp.Action == expireAction {
return nil
}
}
}
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() (err error) {
defer m.mutex.Unlock()
for i := 1; i <= defaultTry; i++ {
var resp *client.Response
resp, err = m.kapi.Delete(m.ctx, m.key, nil)
if err == nil {
m.debug("Delete %v OK", m.key)
return nil
}
m.debug("Delete %v falied: %q", m.key, resp)
e, ok := err.(client.Error)
if ok && e.Code == client.ErrorCodeKeyNotFound {
return nil
}
}
return err
}
func (m *Mutex) debug(format string, v ...interface{}) {
if m.logger != nil {
m.logger.Write([]byte(m.id))
m.logger.Write([]byte(" "))
m.logger.Write([]byte(fmt.Sprintf(format, v...)))
m.logger.Write([]byte("\n"))
}
}
func (m *Mutex) SetDebugLogger(w io.Writer) {
m.logger = w
}
```
其實類似的實現有很多,但目前都已經過時,使用的都是被官方標記為deprecated的項目。且大部分接口都不如上述代碼簡單。 使用上,跟Golang官方sync包的Mutex接口非常類似,先New(),然后調用Lock(),使用完后調用Unlock(),就三個接口,就是這么簡單。示例代碼如下:
```
package main
import (
"github.com/zieckey/etcdsync"
"log"
)
func main() {
//etcdsync.SetDebug(true)
log.SetFlags(log.Ldate|log.Ltime|log.Lshortfile)
m := etcdsync.New("/etcdsync", "123", []string{"http://127.0.0.1:2379"})
if m == nil {
log.Printf("etcdsync.NewMutex failed")
}
err := m.Lock()
if err != nil {
log.Printf("etcdsync.Lock failed")
} else {
log.Printf("etcdsync.Lock OK")
}
log.Printf("Get the lock. Do something here.")
err = m.Unlock()
if err != nil {
log.Printf("etcdsync.Unlock failed")
} else {
log.Printf("etcdsync.Unlock OK")
}
}
```
- 一、經典(一)
- 二、經典(二)
- 三、經典(三)
- 四、經典(四)
- 五、經典(五)
- 六、經典(六)
- 七、經典(七)
- 八、經典(八)
- 九、經典(九)
- 十、經典(十)
- 十一、經典(十一)
- 十二、經典(十二)
- 其他
- 1、知識點一
- 2、面試集
- 3、負載均衡原理
- 4、LVS相關了解
- 5、微服務架構
- 6、分布式鎖實現原理
- 7、Etcd怎么實現分布式鎖
- 8、Redis的數據結構有哪些,以及實現場景
- 9、Mysql高可用方案有哪些
- 10、Go語言的棧空間管理是怎么樣的
- 11、Goroutine和Channel的作用分別是什么
- 12、Go中的鎖有哪些?三種鎖,讀寫鎖,互斥鎖,還有map的安全的鎖?
- 13、怎么限制Goroutine的數量
- 14、Goroutine和線程的區別?
- 15、中間件原理