# 前言
作為最簡準備,旨在熟悉libp2p library的使用方法。一個節點的區塊鏈每隔5秒廣播本地的區塊鏈,啟動監聽,并在對端節點連接到后,將本地的區塊鏈數據以json字符串形式寫入網絡,對端節點讀取后,解析為區塊鏈,如果解析而來的區塊鏈長度長于本地的區塊鏈,則直接更新本地區塊鏈,否則丟棄。
libp2p很特別,采用mutiaddress,我們可以自定義系統的網絡地址形式(稱為網絡協議),比如,在本文中,我們設計為:/ip4/127.0.0.1/tcp/listenPort,這個地址用于服務器監聽,其中listenPort為服務器監聽端口;另外一個地址是:/ipfs/ID,其中ID為全網唯一,唯一標識本服務器,用于被其它節點發現,例如:/ipfs/QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N
實際改造我們的區塊鏈時候,會有更細粒度的控制。
# 導入所需的包
其中大部分的包來自于`go-libp2p`:
~~~go
"github.com/davecgh/go-spew/spew"
golog "github.com/ipfs/go-log"
libp2p "github.com/libp2p/go-libp2p"
crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host"
net "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
gologging "github.com/whyrusleeping/go-logging"
~~~
`spew`包是為了能夠友好地打印區塊鏈數據。
# 創建一個LibP2P節點主機
~~~go
// makeBasicHost 創建一個LibP2P主機
//randseed:一個隨機數,提供隨機數創建主機,程序會更健壯
//listenPort:監聽端口
// secio:是否對數據流進行加密,推薦打開
func makeBasicHost(listenPort int, secio bool, randseed int64) (host.Host, error) {
// 如果seed為0,使用真實的密碼隨機源,
//否則,使用確定性隨機性源,以使生成的密鑰在多次運行中保持不變
var r io.Reader
if randseed == 0 {
r = rand.Reader
} else {
r = mrand.New(mrand.NewSource(randseed))
}
// 為主機產生一對鑰匙. 我們將使用它 來獲得一個合法的主機ID
priv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
if err != nil {
return nil, err
}
//選項
opts := []libp2p.Option{
libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)),//監聽地址和端口
libp2p.Identity(priv),
}
//默認是開啟的
if !secio {
opts = append(opts, libp2p.NoSecurity)
}
//利用相關參數,創建主機basicHost,獲得全網唯一的IPFS ID,唯一標識本服務器節點
basicHost, err := libp2p.New(context.Background(), opts...)
if err != nil {
return nil, err
}
// 建立主機的Multiaddr,libp2p使用一種獨特的Mutliaddr,而非傳統的IP+端口,用于節點之間互相發現
hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", basicHost.ID().Pretty()))//服務器的ipfs地址,用于被其它節點發現
// 現在,我們可以通過封裝兩個地址來構建一個可抵達主機的完整的Multiaddr
addr := basicHost.Addrs()[0]
fullAddr := addr.Encapsulate(hostAddr)
log.Printf("I am %s\n", fullAddr)
if secio {
log.Printf("現在運行命令: \"go run main.go -l %d -d %s -secio\" 在一個不同的終端\n", listenPort+1, fullAddr)
} else {
log.Printf("現在運行命令: \"go run main.go -l %d -d %s\" 在一個不同的終端\n", listenPort+1, fullAddr)
}
return basicHost, nil
}
~~~
# 流處理
我們需要讓我們的主機處理傳入的數據流。既要處理讀取,也要處理寫操作。
在流處理中,本地需要對Blockchain使用互斥鎖進行讀寫保護,此外,將數據寫入到網絡上也需要使用互斥鎖
~~~go
func handleStream(s net.Stream) {
log.Println("Got a new stream!")
// bufio為非阻塞的讀和寫操作創建一個讀寫緩沖流
rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
go readData(rw)
go writeData(rw)
//流將保持打開直到你關閉它(或者其它方關閉它)
}
~~~
# 讀取
~~~go
func readData(rw *bufio.ReadWriter) {
for {//無限循環,永不停歇地讀取外面進來的數據
//我們使用`ReadString`解析從其它節點發送過來的新的區塊鏈(JSON字符串)。
str, err := rw.ReadString('\n')//以'\n'為分隔符讀取。返回的是字符串拷貝。
if err != nil {//只有一種情況會返回err:沒有遇到分隔符。
log.Fatal(err)
}
if str == "" {//沒有讀取到任何數據
return
}
if str != "\n" {
chain := make([]Block, 0)//make只用于映射、切片和程道,不返回指針,這里創建一個[]Block類型的切片
if err := json.Unmarshal([]byte(str), &chain); err != nil {//json轉為結構對象
log.Fatal(err)
}
mutex.Lock()//獨占互斥鎖,保護的是Blockchain
if len(chain) > len(Blockchain) {//如果流中解析的區塊鏈長度大于本地區塊鏈的長度,替換本地區塊鏈為讀取的區塊鏈
Blockchain = chain
bytes, err := json.MarshalIndent(Blockchain, "", " ")//縮進一個空格,對象Blockchain轉為json
if err != nil {
log.Fatal(err)
}
fmt.Printf("\x1b[32m%s\x1b[0m> ", string(bytes))
}
mutex.Unlock()
}
}
}
~~~
# 寫
我們用一個Go例程啟動函數,它每隔5秒廣播我們的區塊鏈的最新狀態給我們的對等體。如果長度比他們的短,它們會接受并扔掉。如果更長,他們會接受的。無論是哪種方式,所有的對等體都在不斷地通過網絡的最新狀態來更新他們的鏈鏈。
~~~go
func writeData(rw *bufio.ReadWriter) {
go func() {//每隔5秒廣播我們的區塊鏈的最新狀態給我們的對等體
for {
time.Sleep(5 * time.Second)
mutex.Lock()//互斥鎖,保護Blockchain
bytes, err := json.Marshal(Blockchain)//本地Blockchain轉為json字符串
if err != nil {
log.Println(err)
}
mutex.Unlock()
mutex.Lock()//互斥鎖,獨占rw
rw.WriteString(fmt.Sprintf("%s\n", string(bytes)))
rw.Flush()//將緩存的數據真正寫入到網絡上
mutex.Unlock()
}
}()
stdReader := bufio.NewReader(os.Stdin)//從終端讀取待發送到信息(心率數)
for {//無限循環,隨時讀取終端填寫的心律數據,發送到網上
fmt.Print("> ")
sendData, err := stdReader.ReadString('\n')
if err != nil {
log.Fatal(err)
}
sendData = strings.Replace(sendData, "\n", "", -1)
bpm, err := strconv.Atoi(sendData)//將讀取的字符串轉為數字(心率為數字)
if err != nil {
log.Fatal(err)
}
newBlock := generateBlock(Blockchain[len(Blockchain)-1], bpm)//創建區塊
if isBlockValid(newBlock, Blockchain[len(Blockchain)-1]) {
mutex.Lock()//凡是讀或寫Blockchain,均需要開啟互斥鎖
Blockchain = append(Blockchain, newBlock)//將新區塊加入到區塊鏈
mutex.Unlock()
}
mutex.Lock()
bytes, err := json.Marshal(Blockchain)//讀取本地區塊,轉為json
if err != nil {
log.Println(err)
}
mutex.Unlock()
//使用spew.Dump 這個函數可以以非常美觀和方便閱讀的方式將 struct、slice 等數據打印在控制臺里,方便我們調試
spew.Dump(Blockchain)
mutex.Lock()//互斥鎖,獨占rw
rw.WriteString(fmt.Sprintf("%s\n", string(bytes)))
rw.Flush()//將本地區塊數據寫入到網絡
mutex.Unlock()
}
}
~~~
* `secio` 是否允許安全傳輸。我們會一直把這個開關打開的。
* `target` 指定想要連接的host地址,這里我們其實扮演的節點去連接其他host。
* `listenF`打開指定端口讓其他節點連接,這里我們扮演的host。
我們用一個Go例程啟動函數,它每隔5秒廣播我們的區塊鏈的最新狀態給我們的對等體。如果長度比他們的短,他們會接受并扔掉。如果更長,他們會接受并更新本地的區塊鏈。無論是哪種方式,所有的對等體都在不斷地通過網絡的最新狀態來更新他們的鏈鏈。
我們進行一些字符串操作,以確保輸入的BPM是一個整數,并且格式正確,可以添加為新塊。我們通過我們的標準BangLink函數(見上面的“Blockchain stuff”部分)。然后,我們`Marshal`它,使它看起來漂亮,打印到我們的控制臺,用`spew.Dump`驗證。然后我們用`rw.WriteString`將它廣播到我們的連接的對等體。
創建了我們的處理程序和讀寫邏輯來處理輸入和輸出的塊鏈。通過這些函數,我們已經為每個對等點創建了一種方法,以連續地相互檢查其塊鏈的狀態,并且在同一時間,它們都被更新到最新狀態(最長的有效塊鏈)。
# main
~~~go
t := time.Now()
genesisBlock := Block{}
genesisBlock = Block{0, t.String(), 0, calculateHash(genesisBlock), ""}
Blockchain = append(Blockchain, genesisBlock)
// LibP2P 使用 golog記錄消息日志. 我們可以控制日志的詳細程度
golog.SetAllLoggers(gologging.INFO) // 變更為 DEBUG 可以獲得額外的信息
// 從命令行解析出選項
listenF := flag.Int("l", 0, "等待到來的連接")
target := flag.String("d", "", "連接的目標節點")
secio := flag.Bool("secio", false, "啟用 secio")
seed := flag.Int64("seed", 0, "設定產生ID的隨機種子")
flag.Parse()
if *listenF == 0 {
log.Fatal("請提供綁定的端口 -l")
}
// 創建一個主機
ha, err := makeBasicHost(*listenF, *secio, *seed)
if err != nil {
log.Fatal(err)
}
//target為我們指定要連接的另一主機的地址,這意味著如果使用此標志,我們將充當主機的對等方
if *target == "" {//只充當主機
log.Println("listening for connections")
// 將流處理器設定在主機: A。 /p2p/1.0.0 是一個用戶自定義的協議
ha.SetStreamHandler("/p2p/1.0.0", handleStream)
select {} // 一直掛起
} else {//作為主機的對等方,連接到target主機
ha.SetStreamHandler("/p2p/1.0.0", handleStream)//對等端仍然要開啟監聽,接受其它節點的連接
//**下面的代碼,是作為對等端B主機所做的工作:連接到A主機節點,執行讀和寫,實現兩個節點之間的網絡通信**
// 下面的代碼從目標節點的mutiaddress中展開獲得節點的ID
ipfsaddr, err := ma.NewMultiaddr(*target)
if err != nil {
log.Fatalln(err)
}
pid, err := ipfsaddr.ValueForProtocol(ma.P_IPFS)
if err != nil {
log.Fatalln(err)
}
peerid, err := peer.IDB58Decode(pid)//將58位的string轉化為id
if err != nil {
log.Fatalln(err)
}
// 從目標主機解封裝 /ipfs/<peerID>
// /ip4/<a.b.c.d>/ipfs/<peer> 變成 /ip4/<a.b.c.d>
targetPeerAddr, _ := ma.NewMultiaddr(
fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(peerid)))
targetAddr := ipfsaddr.Decapsulate(targetPeerAddr)
// 我們有了一個節點ID和targetAddr,將它添加到peerstore
// 這樣LibP2就知道如何聯系到它
ha.Peerstore().AddAddr(peerid, targetAddr, pstore.PermanentAddrTTL)
log.Println("正在打開B到A的網絡流...")
// 創建一個新的從主機B到A的流
// 它應當被主機A通過我們上面設定的處理器進行處理
// 因為我們使用相同的 /p2p/1.0.0 協議
s, err := ha.NewStream(context.Background(), peerid, "/p2p/1.0.0")
if err != nil {
log.Fatalln(err)
}
// 創建一個新的緩沖流,這樣讀和寫將不會阻塞
rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
// 創建一個線程讀和寫數據
go writeData(rw)
go readData(rw)
select {} // 永遠掛起
}
~~~
我們設置所有的命令行標志。
* `secio` 我們以前覆蓋并允許安全流。我們將確保在運行程序時始終使用這個標志。
* `target` 讓我們指定要連接的另一主機的地址,這意味著如果使用此標志,我們將充當主機的對等方。
* `listenF`打開了我們希望允許連接的端口,這意味著我們作為主機。我們既可以是主機(接收連接),也可以是對等體(連接到其他主機)。這就是這個系統真正成為P2P的原因!
* `seed` 是可選的隨機播種器,用來構造我們的地址,其他節點可以用來連接我們。
然后,我們創建了一個新的主機,我們之前創建了`makeBasicHost`函數。如果我們只充當主機(即,我們沒有連接到其他主機),我們指定如果`*target==“”`,則使用我們之前創建的`setStreamHandle`函數啟動處理程序,這是我們的監聽器代碼的結束。
如果我們確實想要連接到另一個主機,我們移動到`else`部分。我們再次設置我們的處理程序,因為我們作為一個主機和一個連接的對等體。
接下來的幾行解構了我們提供給目標的字符串,這樣我們就可以找到我們想要連接的主機。這也被稱為解封裝。
我們最終得到要連接的主機的`peerID`和目標地址`targetAddr`,并將該記錄添加到“存儲”中,以便跟蹤我們與誰連接。
然后,我們使用`ha.NewStream`創建想要連接到的對等體連接。我們希望能夠接收和發送數據流(我們的區塊鏈),因此就像我們在處理程序中做的那樣,我們創建一個`ReadWriter`,并為`readData`和`writeData`創建單獨的Go例程。最后我們通過空的`select`來阻塞程序,這樣程序不會停止。
- 重要更新說明
- linechain發布
- linechain新版設計
- 引言一
- 引言二
- 引言三
- vs-code設置及開發環境設置
- BoltDB數據庫應用
- 關于Go語言、VS-code的一些Tips
- 區塊鏈的架構
- 網絡通信與區塊鏈
- 單元測試
- 比特幣腳本語言
- 關于區塊鏈的一些概念
- 區塊鏈組件
- 區塊鏈第一版:基本原型
- 區塊鏈第二版:增加工作量證明
- 區塊鏈第三版:持久化
- 區塊鏈第四版:交易
- 區塊鏈第五版:實現錢包
- 區塊鏈第六版:實現UTXO集
- 區塊鏈第七版:網絡
- 階段小結
- 區塊鏈第八版:P2P
- P2P網絡架構
- 區塊鏈網絡層
- P2P區塊鏈最簡體驗
- libp2p建立P2P網絡的關鍵概念
- 區塊鏈結構層設計與實現
- 用戶交互層設計與實現
- 網絡層設計與實現
- 建立節點發現機制
- 向區塊鏈網絡請求區塊信息
- 向區塊鏈網絡發布消息
- 運行區塊鏈
- LineChain
- 系統運行流程
- Multihash
- 區塊鏈網絡的節點發現機制深入探討
- DHT
- Bootstrap
- 連接到所有引導節點
- Advertise
- 搜索其它peers
- 連接到搜到的其它peers
- 區塊鏈網絡的消息訂發布-訂閱機制深入探討
- LineChain:適用于智能合約編程的腳本語言支持
- LineChain:解決分叉問題
- LineChain:多重簽名
- libp2p升級到v0.22版本
- 以太坊基礎
- 重溫以太坊的樹結構
- 世界狀態樹
- (智能合約)賬戶存儲樹
- 交易樹
- 交易收據樹
- 小結
- 以太坊的存儲結構
- 以太坊狀態數據庫
- MPT
- 以太坊POW共識算法
- 智能合約存儲
- Polygon Edge
- block結構
- transaction數據結構
- 數據結構小結
- 關于本區塊鏈的一些說明
- UML工具-PlantUML
- libp2p介紹
- JSON-RPC
- docker制作:啟動多個應用系統
- Dockerfile
- docker-entrypoint.sh
- supervisord.conf
- docker run
- nginx.conf
- docker基礎操作整理
- jupyter計算交互環境
- git技巧一
- git技巧二
- 使用github項目的最佳實踐
- windows下package管理工具