# 網絡層設計與實現
## Node
一個網絡節點(Node)命名為Network。
Node用Network來定義和實現,特指P2P網絡節點,更體現Node的本質。
```
//?Network?節點的數據結構
type Network struct?{
????Host?????????????host.Host//主機
????GeneralChannel???*Channel//通用節點
????MiningChannel????*Channel//挖礦節點
????FullNodesChannel?*Channel//全節點
????Blockchain???????*blockchain.Blockchain
????Blocks???????????chan?*blockchain.Block//Block類型的通道
????Transactions?????chan?*blockchain.Transaction//Transaction類型的通道
????Miner????????????bool
}
```
## Channel
Channel為通信通道,每個host有三個通信通道,但根據其節點的類別,一般一個節點只用到其中一個通信通道。
```
//?Channel?的數據結構
type Channel struct?{
????ctx???context.Context
????pub???*pubsub.PubSub//發布者
????topic?*pubsub.Topic
????sub???*pubsub.Subscription//訂閱者
????channelName?string//構成Topic名稱字符串的組成部分(TopicName="channel:"?+?channelName)
????self????????peer.ID
????Content?????chan?*ChannelContent//ChannelContent類型的通道
}
```
GeneralChannel為通用節點,負責列舉所有連接到主機(host)的所有peer,這也是所有連接到host的peer,處理除了tx之外的所有命令消息。
FullNodesChannel為全節點,處理與交易相關的tx及gettxfrompool命令,即將新交易放到內存池,以及每秒不斷將交易從交易池中取出(這里我們每秒只取出一條交易,可以優化為每次取出多條交易)給挖礦節點進行挖礦。
MiningChannel為挖礦節點,處理來自交易池的inv命令及來自交易池的tx命令。
## Host
P2P的host package定義了Host這一interface。
```
//?為本主機(host)創建一對新的?RSA?密鑰
prvKey,?_,?err?:=?crypto.GenerateKeyPairWithReader(crypto.RSA,?2048,?r)
if?err?!=?nil?{
panic(err)
????}
transports?:=?libp2p.ChainOptions(
????????libp2p.Transport(tcp.NewTCPTransport),//支持TCP傳輸協議
????????libp2p.Transport(ws.New),//支持websorcket傳輸協議
????)
muxers?:=?libp2p.ChainOptions(
????????libp2p.Muxer("/yamux/1.0.0",?yamux.DefaultTransport),//支持"/yamux/1.0.0"流連接(基于可靠連接的多路I/O復用)
????????libp2p.Muxer("/mplex/6.7.0",?mplex.DefaultTransport),//支持"/mplex/6.7.0"流連接(二進制流多路I/O復用),由LibP2P基于multiplex創建
????)
if len(listenPort)?==?0?{
listenPort?=?"0"
????}
listenAddrs?:=?libp2p.ListenAddrStrings(
????????fmt.Sprintf("/ip4/0.0.0.0/tcp/%s",?listenPort),//支持tcp傳輸
????????fmt.Sprintf("/ip4/0.0.0.0/tcp/%s/ws",?listenPort),//支持websorket傳輸
????)
// Host是參與p2p網絡的對象,它實現協議或提供服務。
// 它像服務器一樣處理請求,像客戶端一樣發出請求。
// 之所以稱為 Host,是因為它既是 Server 又是 Client(而 Peer 可能會混淆)。
// 1、創建host
// 重要:創建主機host
//-如果沒有提供transport和listen addresses,節點將監聽在多地址(mutiaddresses): "/ip4/0.0.0.0/tcp/0" 和 "/ip6/::/tcp/0";
//-如果沒有提供transport的選項,節點使用TCP和websorcket傳輸協議
//-如果multiplexer配置沒有提供,節點缺省使用"yamux/1.0.0" 和 "mplux/6.7.0"流連接配置
//-如果沒有提供security transport,主機使用go-libp2p的noise和/或tls加密的transport來加密所有的traffic(新版本libp2p已經不再支持security transport參數設置)
//-如果沒有提供peer的identity,它產生一個隨機RSA 2048鍵值對,并由它導出一個新的identity
//-如果沒有提供peerstore,主機使用一個空的peerstore來進行初始化
host,?err?:=?libp2p.New(
????????ctx,
????????transports,
????????listenAddrs,
????????muxers,
????????libp2p.Identity(prvKey),
????)
```
上述代碼中第一步創建Host:
```
host,?err?:=?libp2p.New(...)
```
我們追溯New函數,它來自于libp2p.go,最終調用的是:
```
func NewWithoutDefaults(ctx?context.Context,?opts?...Option)?(host.Host,?error)?{
varcfg?Config
if err?:=?cfg.Apply(opts...);?err?!=?nil?{
returnnil,?err
????}
return?cfg.NewNode(ctx)
}
```
我們繼續追溯cfg.NewNode(ctx),在P2Plib的config.go,關鍵代碼如下:
```
func?(cfg?*Config)?NewNode(ctx?context.Context)?(host.Host,?error)?{
swrm,?err?:=?cfg.makeSwarm(ctx)
if?err?!=?nil?{
returnnil,?err
????}
h,?err?:=?bhost.NewHost(ctx,?swrm,?&bhost.HostOpts{
????????ConnManager:??cfg.ConnManager,
????????AddrsFactory:?cfg.AddrsFactory,
????????NATManager:???cfg.NATManager,
????????EnablePing:???!cfg.DisablePing,
????????UserAgent:????cfg.UserAgent,
????})
...
h.Start()
if?router?!=?nil?{
return?outed.Wrap(h,?router),?nil
????}
return?h,?nil
}
```
security transport,默認的值為:
```
var DefaultSecurity =?libp2p.ChainOptions(
Security(noise.ID, noise.New),
Security(tls.ID, tls.New),
)
```
上述代碼的第一個關鍵是:
```
swrm,?err?:=?cfg.makeSwarm(ctx)
```
我們追溯進去,看看cfg.makeSwarm(ctx):
```
func?(cfg?*Config)?makeSwarm(ctx?context.Context)?(*swarm.Swarm,?error)?{
//從config保存的公鑰得到pid
pid,?err?:=?peer.IDFromPublicKey(cfg.PeerKey.GetPublic())
...
swrm?:=?swarm.NewSwarm(ctx,?pid,?cfg.Peerstore,?cfg.Reporter,?cfg.ConnectionGater)
return?swrm,?nil
```
我們繼續追溯swarm.NewSwarm(ctx,?pid,?cfg.Peerstore,?cfg.Reporter,?cfg.ConnectionGater):
```
func NewSwarm(ctx?context.Context,?local?peer.ID,?peers?peerstore.Peerstore,?bwc?metrics.Reporter,?extra?...interface{})?*Swarm?{
s?:=?&Swarm{
????????local:?local,
????????peers:?peers,
????????bwc:???bwc,
????}
...
return?s
```
可見,peer.ID被賦值到Swarm對象的local變量。
我們回到函數:
```
func?(cfg?*Config)?NewNode(ctx?context.Context)?(host.Host,?error)?{
swrm,?err?:=?cfg.makeSwarm(ctx)
if?err?!=?nil?{
returnnil,?err
????}
h,?err?:=?bhost.NewHost(ctx,?swrm,?&bhost.HostOpts{
????????ConnManager:??cfg.ConnManager,
????????AddrsFactory:?cfg.AddrsFactory,
????????NATManager:???cfg.NATManager,
????????EnablePing:???!cfg.DisablePing,
????????UserAgent:????cfg.UserAgent,
????})
...
h.Start()
if?router?!=?nil?{
return?outed.Wrap(h,?router),?nil
????}
return?h,?nil
}
```
前面已經討論完了swrm,?err?:=?cfg.makeSwarm(ctx),我們繼續往下看,swrm成為創建Host的一個參數:
```
h,?err?:=?bhost.NewHost(ctx,?swrm,?&bhost.HostOpts{
????????ConnManager:??cfg.ConnManager,
????????AddrsFactory:?cfg.AddrsFactory,
????????NATManager:???cfg.NATManager,
????????EnablePing:???!cfg.DisablePing,
????????UserAgent:????cfg.UserAgent,
????})
```
bhost是一個package:
```
bhost?"github.com/libp2p/go-libp2p/p2p/host/basic"
```
我們查看上面的NewHost,進入到basichost package(basic_host.go):
定義了basichost:
```
type BasicHost struct?
```
然后BasicHost實現了Host的所有接口方法,其中NewHost接口實現如下:
```
func NewHost(ctx?context.Context,?n?network.Network,?opts?\*HostOpts)?(\*BasicHost,?error)?{
hostCtx,?cancel?:=?context.WithCancel(ctx)
h?:=?&BasicHost{
????????network:?????????????????n,
????????mux:?????????????????????msmux.NewMultistreamMuxer(),
????????negtimeout:??????????????DefaultNegotiationTimeout,
????????AddrsFactory:????????????DefaultAddrsFactory,
????????maResolver:??????????????madns.DefaultResolver,
????????eventbus:????????????????eventbus.NewBus(),
????????addrChangeChan:??????????make(chanstruct{},?1),
????????ctx:?????????????????????hostCtx,
????????ctxCancel:???????????????cancel,
????????disableSignedPeerRecord:?opts.DisableSignedPeerRecord,
????}
...
return?h,?nil
```
swrm作為參數傳給了n?network.Network。而實現的接口:
```
func?(h?*BasicHost)?ID()?peer.ID?{
return?h.Network().LocalPeer()
}
```
h.Network()返回Swarm對象(Swarm是一個struct,實現了接口network.Network(Network是一個interface))
```
func?(h?*BasicHost)?Network()?network.Network?{
return?h.network
}
```
我們看看Swarm的函數LocalPeer(),正好返回的是local(即peer的ID):
```
func?(s?*Swarm)?LocalPeer()?peer.ID?{
return?s.local
}
```
### 小結
1、主機實際上是BasicHost struct,它實現了Host interface,peer.ID在創建host時候已經在Host中得到了(host.ID()得到的即是peer.ID)。
2、同時Swarm struct實現了libp2p的network.Network interface。
3、BasicHost和Swarm均由p2plib提供。
## Peer
Peer為對等端,是host的第三方視覺的概念。
Peer以ID為唯一標識,peer.ID是通過哈希peer的公鑰而派生,并編碼其哈希輸出為multihash的結果。
peer.ID是往后不同節點之間進行通信的重要參數,它代表一個Host,或者說,我們可以通過peer.ID獲得一個具體的Host對象。如發送虛擬幣:
```
func?(net?*Network)?SendTx(peerId?string,?transaction?*blockchain.Transaction)?{
?memoryPool.Add(*transaction)
tnx?:=?Tx{net.Host.ID().Pretty(),?transaction.Serializer()}
payload?:=?GobEncode(tnx)
request?:=?append(CmdToBytes("tx"),?payload...)
//?給全節點(FullNodes)第通信通道發布此消息,全節點將進行處理
net.FullNodesChannel.Publish("接收到?Send?transaction?命令",?request,?peerId)
}
```
如同Host一樣,peer package也是在libp2p庫中定義,所在的文件是peer.go,不同的是,在peer.go中并沒有定義一個peer的struct,而是直接在peer package中定義ID:
```
type ID string
```
但顯然ID是一個mutihash的值,如需要對外呈現需要使用base58編碼后得到人可以識別的字符串:
```
func?(id?ID)?String()?string?{
return?id.Pretty()
}
```
Pretty方法如下:
```
func?(id?ID)?Pretty()?string?{
returnIDB58Encode(id)
}
```
## 網絡通信流程
一切從startNode開始。
main.go:
```
cli.StartNode(listenPort,?minerAddress,?miner,?fullNode,?func(net?*p2p.Network)?{//最后一個參數是回調函數,獲得net實例
if?rpc?{
cli.P2p?=?net//啟動節點后設置cli的P2P實例,net為啟動節點函數的回調函數參數被回調后返回的Network實例
go?jsonrpc.StartServer(cli,?rpc,?rpcPort,?rpcAddr)
????????????????}
?})
```
其中cli的結構:
```
type CommandLinestruct?{
????Blockchain????*blockchain.Blockchain
????P2p???????????*p2p.Network
????CloseDbAlways?bool//每次命令執行完畢是否關閉數據庫
}
```
其中istenPort,?minerAddress,?miner,?fullNode等參數的值來自于命令startnode執行時獲得的命令行參數。
cli.StartNode實現:
```
//?StartNode?啟動節點,其中fn為回調函數,p2p.StartNode調用過程中調用fn,設置p2p.Network實例
func?(cli?*CommandLine)?StartNode(listenPort,?minerAddress?string,?miner,?fullNode?bool,?fn?func(*p2p.Network))?{
if?miner?{
????????log.Infof("作為礦工正在啟動節點:?%s\\n",?listenPort)
iflen(minerAddress)?>?0?{
if?wallet.ValidateAddress(minerAddress)?{
????????????????log.Info("正在挖礦,接收獎勵的地址是:",?minerAddress)
????????????}?else?{
????????????????log.Fatal("請提供一個合法的礦工地址")
????????????}
????????}
????}?else?{
????????log.Infof("在:?%s\\n端口上啟動節點",?listenPort)
????}
chain?:=?cli.Blockchain.ContinueBlockchain()
????p2p.StartNode(chain,?listenPort,?minerAddress,?miner,?fullNode,?fn)
}
```
在獲得了blockchain實例后,調用p2p package的StartNode函數:
```
//?StartNode?啟動一個節點
func StartNode(chain?*blockchain.Blockchain,?listenPort,?minerAddress?string,?miner,?fullNode?bool,?callback?func(*Network))?{
var r?io.Reader
r?=?rand.Reader//沒有指定seed,使用隨機種子
MinerAddress?=?minerAddress
ctx,?cancel?:=?context.WithCancel(context.Background())
defercancel()
defer?chain.Database.Close()//函數運行結束,關閉區塊鏈數據庫
go?appUtils.CloseDB(chain)//啟動協程,遇到程序強行終止信號時關閉數據庫,退出程序
//?為本主機(host)創建一對新的?RSA?密鑰
prvKey,?_,?err?:=?crypto.GenerateKeyPairWithReader(crypto.RSA,?2048,?r)
if?err?!=?nil?{
panic(err)
????}
transports?:=?libp2p.ChainOptions(
????????libp2p.Transport(tcp.NewTCPTransport),//支持TCP傳輸協議
????????libp2p.Transport(ws.New),//支持websorcket傳輸協議
????)
muxers?:=?libp2p.ChainOptions(
????????libp2p.Muxer("/yamux/1.0.0",?yamux.DefaultTransport),//支持"/yamux/1.0.0"流連接(基于可靠連接的多路I/O復用)
????????libp2p.Muxer("/mplex/6.7.0",?mplex.DefaultTransport),//支持"/mplex/6.7.0"流連接(二進制流多路I/O復用),由LibP2P基于multiplex創建
????)
if len(listenPort)?==?0?{
listenPort?=?"0"
????}
listenAddrs?:=?libp2p.ListenAddrStrings(
????????fmt.Sprintf("/ip4/0.0.0.0/tcp/%s",?listenPort),//支持tcp傳輸
????????fmt.Sprintf("/ip4/0.0.0.0/tcp/%s/ws",?listenPort),//支持websorket傳輸
????)
// Host是參與p2p網絡的對象,它實現協議或提供服務。
// 它像服務器一樣處理請求,像客戶端一樣發出請求。
// 之所以稱為 Host,是因為它既是 Server 又是 Client(而 Peer 可能會混淆)。
// 1、創建host
// 重要:創建主機host
//-如果沒有提供transport和listen addresses,節點將監聽在多地址(mutiaddresses): "/ip4/0.0.0.0/tcp/0" 和 "/ip6/::/tcp/0";
//-如果沒有提供transport的選項,節點使用TCP和websorcket傳輸協議
//-如果multiplexer配置沒有提供,節點缺省使用"yamux/1.0.0" 和 "mplux/6.7.0"流連接配置
//-如果沒有提供security transport,主機使用go-libp2p的noise和/或tls加密的transport來加密所有的traffic(新版本libp2p已經不再支持security transport參數設置)
//-如果沒有提供peer的identity,它產生一個隨機RSA 2048鍵值對,并由它導出一個新的identity
//-如果沒有提供peerstore,主機使用一個空的peerstore來進行初始化
host,?err?:=?libp2p.New(
????????ctx,
????????transports,
????????listenAddrs,
????????muxers,
????????libp2p.Identity(prvKey),
????)
if?err?!=?nil?{
panic(err)
????}
for _,?addr?:=?range?host.Addrs()?{
????????fmt.Println("正在監聽在",?addr)
????}
????log.Info("主機已創建:?",?host.ID())
//?2、使用GossipSub路由,創建一個新的基于Gossip?協議的?PubSub?服務系統
//?任何一個主機節點,都是一個訂閱發布服務系統
//?這是整個區塊鏈網絡運行的關鍵所在
pub,?err?:=?pubsub.NewGossipSub(ctx,?host)
if?err?!=?nil?{
panic(err)
????}
// 3、構建三個通信通道,通信通道使用發布-訂閱系統,在不同節點之間傳遞信息
// 之所以需要三個通道,是因為未來規劃不同節點擁有不同的功能,不同功能的節點完成不同類型的任務。
// 三個通道的消息獨立,只有訂閱了該通道消息的節點,才能收到該通道的消息,然后進行處理,以完成相應的任務。
// 任何一個節點,均創建了三個通道實例,這意味著人一個節點都可以根據需要,選擇任意一個通道發送消息
// 在訂閱上,一個具體的節點, GeneralChannel 訂閱將消息,如果是采礦節點(miner==true),miningChannel 會接收到消息,
// 如果是全節點(fullNode==true),fullNodesChannel會接受到消息
//GeneralChannel?通道訂閱消息
generalChannel,?_?:=?JoinChannel(ctx,?pub,?host.ID(),?GeneralChannel,?true)
subscribe?:=?false
if?miner?{
subscribe?=?true
????}
//如果是挖礦節點,?miningChannel?訂閱消息,否則?miningChannel?不訂閱消息
miningChannel,?_?:=?JoinChannel(ctx,?pub,?host.ID(),?MiningChannel,?subscribe)
subscribe?=?false
if?fullNode?{
subscribe?=?true
????}
//如果是全節點,?fullNodesChannel?訂閱消息,否則?fullNodesChannel?不訂閱消息
fullNodesChannel,?_?:=?JoinChannel(ctx,?pub,?host.ID(),?FullNodesChannel,?subscribe)
//?3、為各通信通道建立命令行界面對象
ui?:=?NewCLIUI(generalChannel,?miningChannel,?fullNodesChannel)
//?4、建立對等端(peer)發現機制(discovery),使得本節點可以被網絡上的其它節點發現
//同時將主機(host)連接到所有已經發現的對等端(peer)
err?=?SetupDiscovery(ctx,?host)
if?err?!=?nil?{
panic(err)
????}
network?:=?&Network{
????????Host:?????????????host,
????????GeneralChannel:???generalChannel,
????????MiningChannel:????miningChannel,
????????FullNodesChannel:?fullNodesChannel,
????????Blockchain:???????chain,
????????Blocks:???????????make(chan?*blockchain.Block,?200),
????????Transactions:?????make(chan?*blockchain.Transaction,?200),
????????Miner:????????????miner,
????}
//?5、回調,將節點(network)實例傳回
callback(network)
//?6、向全網請求區塊信息,以補全本地區塊鏈
//?每一個節點均有區塊鏈的一個完整副本
err?=?RequestBlocks(network)
//?7、啟用協程,處理節點事件
goHandleEvents(network)
//?8、如果是礦工節點,啟用協程,不斷發送ping命令給全節點
if?miner?{
//?礦工事件循環,以不斷地發送一個ping給全節點,目的是得到新的交易,為它挖礦,并添加到區塊鏈
go?network.MinersEventLoop()
????}
if?err?!=?nil?{
panic(err)
????}
//?9、運行UI界面,將在Run函數體中啟動協程,循環接收并處理全網節點publish的消息
iferr?=?ui.Run(network);?err?!=?nil?{
????????log.Error("運行文字UI發生錯誤:?%s",?err)
????}
}
```
- 重要更新說明
- linechain發布
- linechain新版設計
- 引言一
- 引言二
- 引言三
- vs-code設置及開發環境設置
- BoltDB數據庫應用
- 關于Go語言、VS-code的一些Tips
- 區塊鏈的架構
- 網絡通信與區塊鏈
- 單元測試
- 比特幣腳本語言
- 關于區塊鏈的一些概念
- 區塊鏈組件
- 區塊鏈第一版:基本原型
- 區塊鏈第二版:增加工作量證明
- 區塊鏈第三版:持久化
- 區塊鏈第四版:交易
- 區塊鏈第五版:實現錢包
- 區塊鏈第六版:實現UTXO集
- 區塊鏈第七版:網絡
- 階段小結
- 區塊鏈第八版:P2P
- P2P網絡架構
- 區塊鏈網絡層
- P2P區塊鏈最簡體驗
- libp2p建立P2P網絡的關鍵概念
- 區塊鏈結構層設計與實現
- 用戶交互層設計與實現
- 網絡層設計與實現
- 建立節點發現機制
- 向區塊鏈網絡請求區塊信息
- 向區塊鏈網絡發布消息
- 運行區塊鏈
- LineChain
- 系統運行流程
- Multihash
- 區塊鏈網絡的節點發現機制深入探討
- DHT
- Bootstrap
- 連接到所有引導節點
- Advertise
- 搜索其它peers
- 連接到搜到的其它peers
- 區塊鏈網絡的消息訂發布-訂閱機制深入探討
- LineChain:適用于智能合約編程的腳本語言支持
- LineChain:解決分叉問題
- LineChain:多重簽名
- libp2p升級到v0.22版本
- 以太坊基礎
- 重溫以太坊的樹結構
- 世界狀態樹
- (智能合約)賬戶存儲樹
- 交易樹
- 交易收據樹
- 小結
- 以太坊的存儲結構
- 以太坊狀態數據庫
- MPT
- 以太坊POW共識算法
- 智能合約存儲
- Polygon Edge
- block結構
- transaction數據結構
- 數據結構小結
- 關于本區塊鏈的一些說明
- UML工具-PlantUML
- libp2p介紹
- JSON-RPC
- docker制作:啟動多個應用系統
- Dockerfile
- docker-entrypoint.sh
- supervisord.conf
- docker run
- nginx.conf
- docker基礎操作整理
- jupyter計算交互環境
- git技巧一
- git技巧二
- 使用github項目的最佳實踐
- windows下package管理工具