區塊鏈以消息為中心,通過發布/訂閱服務,所有節點之間的數據同步成為可能。
**注意publish執行后,消息將發給全網,包括本地節點,所以本地節點在接收到自己發的消息后,需要特別做好識別(自己發布的消息自己不應該處理)。**
一般情況下,startNode啟動一個節點時候,miner, fullNode應該是為true,miner可以為true可以為false。一般作為挖礦節點,不執行send交易命令,只有全節點才執行send交易命令,但這只是分工,事實上它也可以發起交易,分工的好處是,它可以專注于挖礦,將更多區塊放一起集中挖礦。
FullNodesChanne 通道支持處理的消息支持類型:gettxfrompool,tx;
MiningChannel 通道支持處理的消息類型:inv(type為tx),tx;
GeneralChannel 通道支持處理的消息類型:block,getdata,inv(type為block),getblocks,version;
## 1、開始
每一個網絡節點,可能是挖礦節點,也可能是全節點,或者既是挖礦節點,也是全節點。
```
// Network 節點的數據結構
type Network struct {
? ? Host ? ? ? ? ? ? host.Host//主機
GeneralChannel *Channel//通用節點
? ? MiningChannel ? ?*Channel//挖礦節點
FullNodesChannel *Channel//全節點
Blockchain *blockchain.Blockchain
//Blocks和Transactions消息隊列,用于存儲新產生的Block或Transaction
//一般而言,我們在主程序執行send交易過程中,根據send參數mineNow決定是否立即挖礦,mineNow為true則存儲Block消息隊列(立即挖礦)
//mineNow為false則存儲Transaction消息隊列(不立即挖礦)。兩個消息由節點在startNode后啟動消息處理協程進行處理
//一般情況下,為提高挖礦效率,我們會匯聚幾個交易在一起,然后挖礦一次
Blocks chan *blockchain.Block//Block類型的通道(帶緩沖的通道:在StartNode函數中構建Network實例,緩沖數量200)
Transactions chan *blockchain.Transaction//Transaction類型的通道(帶緩沖的通道:在StartNode函數中構建Network實例,緩沖數量200)
//是否是挖礦節點
? Miner ? ? ? ? ? ?bool
}
```
通過startNode函數,實例化Network時候,將根據參數,對GeneralChannel、MiningChannel、FullNodesChannel全部進行實例化。
```
//GeneralChannel 通道訂閱消息
generalChannel, _ := JoinChannel(ctx, pubsub, host.ID(), GeneralChannel, true)
//如果是挖礦節點, miningChannel 訂閱消息,否則 miningChannel 不訂閱消息
subscribe := false
if miner {
subscribe = true
}
miningChannel, _ := JoinChannel(ctx, pubsub, host.ID(), MiningChannel, subscribe)
//如果是全節點, fullNodesChannel 訂閱消息,否則 fullNodesChannel 不訂閱消息
subscribe = false
if fullNode {
subscribe = true
}
? ? fullNodesChannel, _ := JoinChannel(ctx, pubsub, host.ID(), FullNodesChannel, subscribe)
```
主程序為有界面命令控制程序,三個實例將作為參數傳遞給cliui:
```
// 各通信通道建立命令行界面對象,監控來自三個通道實例的消息
ui := NewCLIUI(generalChannel, miningChannel, fullNodesChannel)
```
## 2、所有對network的操作命令,將會通過generalChannel實例(其它實例可能沒有實例化),publish到全網
對network發送的消息指令類型有:
1. block
2. inv
3. getblocks
4. getdata
5. tx
6. gettxfrompool
7. version
如:發送inv命令:
```
func (net *Network) SendInv(peerId string, _type string, items [][]byte) {
inventory := Inv{net.Host.ID().Pretty(), _type, items}
payload := GobEncode(inventory)
request := append(CmdToBytes("inv"), payload...)
? ? net.GeneralChannel.Publish("接收到 inventory 命令", request, peerId)
}
```
## 3、這些事件消息,將在cliui中開協程進行處理:
```
// HandleStream 處理來自全網發來的且可以處理的消息內容
func (ui *CLIUI) HandleStream(net *Network, content *ChannelContent) {
// ui.displayContent(content)
if content.Payload != nil {
command := BytesToCmd(content.Payload\[:commandLength\])
? ? ? ? log.Infof("Received ?%s command \\n", command)
switch command {
case "block":
? ? ? ? ? ? net.HandleBlocks(content)
case "inv":
? ? ? ? ? ? net.HandleInv(content)
case "getblocks":
? ? ? ? ? ? net.HandleGetBlocks(content)
case "getdata":
? ? ? ? ? ? net.HandleGetData(content)
case "tx":
? ? ? ? ? ? net.HandleTx(content)
case "gettxfrompool":
? ? ? ? ? ? net.HandleGetTxFromPool(content)
case "version":
? ? ? ? ? ? net.HandleVersion(content)
default:
? ? ? ? ? ? log.Warn("Unknown Command")
}
}
}
```
cliui的RUN函數,需要處理來自network的各種事件消息:
```
// Run 開啟協程處理各種事件消息
func (ui *CLIUI) Run(net *Network) error {
go ui.handleEvents(net)
defer ui.end()
return ui.app.Run()
}
```
handleEvents:
```
func (ui *CLIUI) handleEvents(net *Network) {
peerRefreshTicker := time.NewTicker(time.Second)
defer peerRefreshTicker.Stop()
go ui.readFromLogs(net.Blockchain.InstanceId)
?log.Info("HOST ADDR: ", net.Host.Addrs())
for {
select {
case input := <-ui.inputCh:
err := ui.GeneralChannel.Publish(input, nil, "")//未指定消息接收者,意味著所有節點(peer)均會收到
if err != nil {
? ? ? ? ? ? ? ? log.Errorf("Publish error: %s", err)
}
ui.displaySelfMessage(input)
case <-peerRefreshTicker.C:
// 定期刷新peers list
? ui.refreshPeers()
case m := <-ui.GeneralChannel.Content://如果 GeneralChannel 收到消息
? ? ? ? ? ? ui.HandleStream(net, m)
case m := <-ui.MiningChannel.Content://如果 MiningChannel 收到消息
? ? ? ? ? ? ui.HandleStream(net, m)
case m := <-ui.FullNodesChannel.Content://如果 FullNodesChannel 收到消息
? ? ? ? ? ? ui.HandleStream(net, m)
case <-ui.GeneralChannel.ctx.Done():
return
case <-ui.doneCh:
return
}
}
}
```
## 4、一切從Inv(block)消息開始
每次挖出新區塊,節點向全網廣播inv(資產,在區塊鏈里面inv包括block和交易池中的tx兩類,但狹義上說,只有block)消息:
```
func (net *Network) MineTx(memopoolTxs map[string]blockchain.Transaction) {
var txs []\*blockchain.Transaction
? ? log.Infof("挖礦的交易數: %d", len(memopoolTxs))
chain := net.Blockchain.ContinueBlockchain()
for id := range memopoolTxs {
? ? ? ? log.Infof("tx: %s \\n", memopoolTxs[id].ID)
tx := memopoolTxs[id\]
? ? ? ? log.Info("tx校驗: ", chain.VerifyTransaction(&tx))
if chain.VerifyTransaction(&tx) {
txs = append(txs, &tx)
}
}
if len(txs) == 0 {
? ? ? ? log.Info("無合法的交易")
}
cbTx := blockchain.MinerTx(MinerAddress, "")
txs = append(txs, cbTx)
newBlock := chain.MineBlock(txs)
UTXOs := blockchain.UTXOSet{Blockchain:chain}
? ? UTXOs.Compute()
? ? log.Info("挖出新的區塊")
** //peerId為空,SendInv發布給全網**
? ? net.SendInv("", "block", [][]byte{newBlock.Hash})
? ? memoryPool.ClearAll()//清除內存池中的全部交易
? ? memoryPool.Wg.Done()
}
```
當然,收到請求消息getblocks后,節點應向請求方發一個block類型的inv:
```
func (net *Network) HandleGetBlocks(content *ChannelContent) {
var buff bytes.Buffer
var payload GetBlocks
?buff.Write(content.Payload[commandLength:])
dec := gob.NewDecoder(&buff)
err := dec.Decode(&payload)
if err != nil {
? ? ? ? log.Panic(err)
}
chain := net.Blockchain.ContinueBlockchain()
blockHashes := chain.GetBlockHashes(payload.Height)
log.Info("LENGTH:", len(blockHashes))
net.SendInv(payload.SendFrom, "block", blockHashes)
}
```
### 4、每個network節點啟動后做什么
```
1、向全網請求區塊信息,以補全本地區塊鏈
// 每一個節點均有區塊鏈的一個完整副本
err = RequestBlocks(network)//全網發布version命令,接收到該命令的節點,要么將本地version回給它(接收者的區塊鏈height更大),要么向它發送getblocks命令請求自己沒有的區塊(接收者的區塊鏈height更小)
2、啟用協程,處理來自自身網絡節點事件**(只有啟用了rpc才會有相關事件發生)**
//本地的Blocks有了新的block加入隊列,或者本地的Transactions有了新的tx加入隊列,則將新的block或tx發布給請求者或全網(根據請求者是否為空)
go HandleEvents(network)
3、如果是礦工節點,啟用協程,不斷發送ping命令給全節點
if miner {
// 礦工事件循環,以不斷地發送一個 ping 給全節點,目的是得到新的交易,為新交易挖礦,并添加到區塊鏈
//礦工節點將定時向全網發送gettxfrompool指令,請求每個節點交易內存池(pending)的待挖礦交易
go network.MinersEventLoop()
}
if err != nil {
panic(err)
}
4、運行UI界面,將在Run函數體中啟動協程,通過三個通道,循環接收并處理全網節點publish的消息
//(包括generalChannel, miningChannel, fullNodesChannel節點)
if err = ui.Run(network); err != nil {
? ? ? ? log.Error("運行文字UI發生錯誤: %s", err)
}
```
- 重要更新說明
- linechain發布
- linechain新版設計
- 引言一
- 引言二
- 引言三
- vs-code設置及開發環境設置
- BoltDB數據庫應用
- 關于Go語言、VS-code的一些Tips
- 區塊鏈的架構
- 網絡通信與區塊鏈
- 單元測試
- 比特幣腳本語言
- 關于區塊鏈的一些概念
- 區塊鏈組件
- 區塊鏈第一版:基本原型
- 區塊鏈第二版:增加工作量證明
- 區塊鏈第三版:持久化
- 區塊鏈第四版:交易
- 區塊鏈第五版:實現錢包
- 區塊鏈第六版:實現UTXO集
- 區塊鏈第七版:網絡
- 階段小結
- 區塊鏈第八版:P2P
- P2P網絡架構
- 區塊鏈網絡層
- P2P區塊鏈最簡體驗
- libp2p建立P2P網絡的關鍵概念
- 區塊鏈結構層設計與實現
- 用戶交互層設計與實現
- 網絡層設計與實現
- 建立節點發現機制
- 向區塊鏈網絡請求區塊信息
- 向區塊鏈網絡發布消息
- 運行區塊鏈
- LineChain
- 系統運行流程
- Multihash
- 區塊鏈網絡的節點發現機制深入探討
- DHT
- Bootstrap
- 連接到所有引導節點
- Advertise
- 搜索其它peers
- 連接到搜到的其它peers
- 區塊鏈網絡的消息訂發布-訂閱機制深入探討
- LineChain:適用于智能合約編程的腳本語言支持
- LineChain:解決分叉問題
- LineChain:多重簽名
- libp2p升級到v0.22版本
- 以太坊基礎
- 重溫以太坊的樹結構
- 世界狀態樹
- (智能合約)賬戶存儲樹
- 交易樹
- 交易收據樹
- 小結
- 以太坊的存儲結構
- 以太坊狀態數據庫
- MPT
- 以太坊POW共識算法
- 智能合約存儲
- Polygon Edge
- block結構
- transaction數據結構
- 數據結構小結
- 關于本區塊鏈的一些說明
- UML工具-PlantUML
- libp2p介紹
- JSON-RPC
- docker制作:啟動多個應用系統
- Dockerfile
- docker-entrypoint.sh
- supervisord.conf
- docker run
- nginx.conf
- docker基礎操作整理
- jupyter計算交互環境
- git技巧一
- git技巧二
- 使用github項目的最佳實踐
- windows下package管理工具