死磕以太坊源码分析之区块和交易广播
死磕以太坊源码分析之区块和交易广播
> 死磕以太坊源码分析之区块和交易广播 > ## ProtocolManager详解 `ProtocolManager`,从字面上看是协议管理器,负责着`p2p`通信协议的管理。它连接了`p2p`的逻辑层`peer`与顶层`peer`之间的调用,从顶层将协议传递至逻辑层,再从逻辑层得到`message`传递到顶层。 ![image-20201202142450663](https://img.learnblockchain.cn/2021/02/22_/604279520.jpg) 1. `fastSync`规定了同步的模式 ; 2. `acceptTxs`是节点是否接受交易的阀门,只有当`pm.acceptTxs == 1`时,节点才会接受交易。这个操作只会在同步结束后再开始,即同步的时候节点是不会接受交易的; 3. `SubProtocols`中是以太坊的通讯协议,通常只有一个值,即`eth63`。 4. `downloader`是一个下载器,用于主动从远程节点中获取`hashes`和`blocks`。 5. `fetcher`则被动的收集网络其他以太坊节点发过来的同步通知,进行验证,并做出相应的处理。 ------ `ProtocolManager.Start()`启动了四条`go`程,分别是交易订阅广播协程(`txBroadcastLoop`)、挖矿订阅协程(`minedBroadcastLoop`)、节点定期同步协程(`syncer`)和交易同步协程(txsyncLoop) ![image-20201202143101376](https://img.learnblockchain.cn/2021/02/22_/786415939.jpg) 1. ==txBroadcastLoop==:广播新出现的交易对象。`txBroadcastLoop()`会在`txCh`通道的收端持续等待,一旦接收到有关新交易的事件,会立即调用`BroadcastTx()`函数广播给那些尚无该交易对象的相邻个体。 2. ==minedBroadcastLoop==:广播新挖掘出的区块。`minedBroadcastLoop()`持续等待本节点的新挖掘出区块事件,然后立即广播给需要的相邻个体。当不再订阅新挖掘区块事件时,这个函数才会结束等待并返回。 3. ==syncer==:**定时的和网络其他节点同步,并处理网络节点的相关通知**。定时与相邻个体进行区块全链的强制同步。syncer()首先启动fetcher成员,然后进入一个无限循环,每次循环中都会向相邻peer列表中“最优”的那个peer作一次区块全链同步。发起上述同步的理由分两种:如果有新登记(加入)的相邻个体,则在整个peer列表数目大于5时,发起之;如果没有新peer到达,则以10s为间隔定时的发起之。这里所谓"最优"指的是peer中所维护区块链的TotalDifficulty(td)最高,由于Td是全链中从创世块到最新头块的Difficulty值总和,所以Td值最高就意味着它的区块链是最新的,跟这样的peer作区块全链同步,显然改动量是最小的,此即"最优"。 4. ==txsyncLoop==:**把新的交易均匀的同步给网路节点**。 ----- ## 广播的情形 1. `minedBroadcastLoop()`监听到新区块事件后,把新区块和区块`hash`分别广播出去; 2. 从远程节点同步完成后,将`CurrentBlock`广播出去,此时广播的是区块`hash`; 3. `txBlockcastLoop()`监听到区块池的新增交易事件时会广播交易; ------ ## 广播区块及区块哈希 广播区块的入口在`pm.minedBroadcastLoop()`,进入到`BroadcastBlock`,这里的参数为`bool`值,如果传入的为true,则将区块block和总难度td发送给一部分节点,节点数为根号n;如果传入的为false,则将区块的hash发送给所有的节点。**需要注意的是两个广播函数都执行**。 进入到`true`分支:**代表只传播区块给一部分节点** ①:首先计算一个临时的TD ```go if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil { td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1)) } ``` ②:发送块到peers的子集 对节点数进行开方,16开方得4,然后取前4个节点。 ```GO transferLen := int(math.Sqrt(float64(len(peers)))) if transferLen < minBroadcastPeers { transferLen = minBroadcastPeers } if transferLen > len(peers) { transferLen = len(peers) } transfer := peers[:transferLen] for _, peer := range transfer { peer.AsyncSendNewBlock(block, td) // 块传播 } ``` 执行完之后直接return出去,再次执行此函数,此时不会走ture分支,直接判断判断本地是否有区块,如果有则发送区区块哈希给剩下的节点,如果没有,则不做发送哈希的操作。 > 如果本地存在这个要广播的区块(很可能就是出块节点,或者接受块的节点已经插入到区块链中),那就还要像其他没有被广播到区块的节点发送区块哈希。 ![image-20201125150810051](https://img.learnblockchain.cn/2021/02/22_/128897535.jpg) > 如果本地不存在这个要广播的区块哈希(应该是还没接收到区块或者区块哈希的节点),那它只要向它的节点列表里发送区块即可。 ![image-20201125151546035](https://img.learnblockchain.cn/2021/02/22_/504148558.jpg) -------- 接下来就是重点分析`AsyncSendNewBlock`和`AsyncSendNewBlockHash`两个函数了。 ### AsyncSendNewBlock > 发送块到需要广播的节点的广播队列中 ```GO select { case p.queuedProps <- &propEvent{block: block, td: td}: p.knownBlocks.Add(block.Hash()) for p.knownBlocks.Cardinality() >= maxKnownBlocks { p.knownBlocks.Pop() } ``` 这里的`queuedProps`是用来存放要广播的块的队列,同时,要把广播的块标记为已知,还不能超过1024(**maxKnownBlocks**)个。超过就会弹出队列第一个`propEvent()` 。接下来就是处理队列中的块了。 在`eth/peer.go`中,有个专门处理广播的循环`brodcast` ```go func (p *peer) broadcast() { for { select { case txs := <-p.queuedTxs: if err := p.SendTransactions(txs); err != nil { return } p.Log().Trace("Broadcast transactions", "count", len(txs)) case prop := <-p.queuedProps: if err := p.SendNewBlock(prop.block, prop.td); err != nil { return } p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td) case block := <-p.queuedAnns: if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil { return } p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash()) case <-p.term: return } } } ``` > 广播新块到远程节点 ```go p.SendNewBlock(prop.block, prop.td); ``` 远程节点收到块后同样也会标记哈希存入队列,并且不会超过最大,同时发送一个`NewBlockMsg`,`msgcode`为`0x07`,同时数据会被RLP编码。 ```go p.knownBlocks.Add(block.Hash()) for p.knownBlocks.Cardinality() >= maxKnownBlocks { p.knownBlocks.Pop() } return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td}) ``` ```go func Send(w MsgWriter, msgcode uint64, data interface{}) error { size, r, err := rlp.EncodeToReader(data) if err != nil { return err } return w.WriteMsg(Msg{Code: msgcode, Size: uint32(size), Payload: r}) } ``` 到此广播区块的过程结束,交由远程节点去处理`NewBlockMsg`消息。 ------ ### AsyncSendNewBlockHash 广播哈希的过程跟广播区块的过程非常的类似,最终是由远程节点去处理`NewBlockHashesMsg`消息。 广播区块的过程完毕之后,会直接进入下一个阶段,调用`fetcher`模块去同步这些广播的区块,接下的文章会讲到。 --------- ## 广播交易 广播交易的入口在`pm.txBroadcastLoop()`,直接进入到`pm.BroadcastTxs(event.Txs)`,大概做了以下几件事: ①:将交易广播给一批没有这个交易的节点 ```go for _, tx := range txs { peers := pm.peers.PeersWithoutTx(tx.Hash()) for _, peer := range peers { txset[peer] = append(txset[peer], tx) } log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers)) } ``` ②:异步发送交易给这些节点 ```go for peer, txs := range txset { peer.AsyncSendTransactions(txs) } ``` 接着进入到`AsyncSendTransactions`: 将所有交易标记为已知交易,同时还要保证没有超过最大的已知交易(32768笔) ```go case p.queuedTxs <- txs: for _, tx := range txs { p.knownTxs.Add(tx.Hash()) } for p.knownTxs.Cardinality() >= maxKnownTxs { p.knownTxs.Pop() } ``` ```go case txs := <-p.queuedTxs: if err := p.SendTransactions(txs); err != nil { return } ``` ```go func (p *peer) SendTransactions(txs types.Transactions) error { ... return p2p.Send(p.rw, TxMsg, txs) } ``` 发送交易最终会发送一个`TxMsg`消息,接收到这个消息的节点会通过`pm.txpool.AddRemotes(txs)`处理交易。 ------- ## 消息处理(handleMsg) `handleMsg`从对方连接中读取消息,根据消息码的不同进行处理,从而将广播和同步之间来回的消息进行处理。 ```go func (pm *ProtocolManager) handleMsg(p *peer) error { msg, err := p.rw.ReadMsg() if err != nil { return err } if msg.Size > ProtocolMaxMsgSize { return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize) } defer msg.Discard() switch { case msg.Code == StatusMsg: ...... case msg.Code == GetBlockHeadersMsg: ...... case msg.Code == BlockHeadersMsg: ...... case msg.Code == GetBlockBodiesMsg: ...... case msg.Code == BlockBodiesMsg: ...... case p.version >= eth63 && msg.Code == GetNodeDataMsg: ...... case p.version >= eth63 && msg.Code == NodeDataMsg: ...... case p.version >= eth63 && msg.Code == GetReceiptsMsg: ...... case p.version >= eth63 && msg.Code == ReceiptsMsg: ...... case msg.Code == NewBlockHashesMsg: ...... case msg.Code == NewBlockMsg: ...... case msg.Code == TxMsg: ...... default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) } return nil } ``` ----- ## 参考 > https://mindcarver.cn 最新发布 > https://github.com/blockchainGuide (文章及学习资料,给个star哦) >公众号:区块链技术栈
死磕以太坊源码分析之区块和交易广播
ProtocolManager详解
ProtocolManager
,从字面上看是协议管理器,负责着p2p
通信协议的管理。它连接了p2p
的逻辑层peer
与顶层peer
之间的调用,从顶层将协议传递至逻辑层,再从逻辑层得到message
传递到顶层。
fastSync
规定了同步的模式 ;acceptTxs
是节点是否接受交易的阀门,只有当pm.acceptTxs == 1
时,节点才会接受交易。这个操作只会在同步结束后再开始,即同步的时候节点是不会接受交易的;SubProtocols
中是以太坊的通讯协议,通常只有一个值,即eth63
。downloader
是一个下载器,用于主动从远程节点中获取hashes
和blocks
。fetcher
则被动的收集网络其他以太坊节点发过来的同步通知,进行验证,并做出相应的处理。
ProtocolManager.Start()
启动了四条go
程,分别是交易订阅广播协程(txBroadcastLoop
)、挖矿订阅协程(minedBroadcastLoop
)、节点定期同步协程(syncer
)和交易同步协程(txsyncLoop)
- ==txBroadcastLoop==:广播新出现的交易对象。
txBroadcastLoop()
会在txCh
通道的收端持续等待,一旦接收到有关新交易的事件,会立即调用BroadcastTx()
函数广播给那些尚无该交易对象的相邻个体。 - ==minedBroadcastLoop==:广播新挖掘出的区块。
minedBroadcastLoop()
持续等待本节点的新挖掘出区块事件,然后立即广播给需要的相邻个体。当不再订阅新挖掘区块事件时,这个函数才会结束等待并返回。 - ==syncer==:定时的和网络其他节点同步,并处理网络节点的相关通知。定时与相邻个体进行区块全链的强制同步。syncer()首先启动fetcher成员,然后进入一个无限循环,每次循环中都会向相邻peer列表中“最优”的那个peer作一次区块全链同步。发起上述同步的理由分两种:如果有新登记(加入)的相邻个体,则在整个peer列表数目大于5时,发起之;如果没有新peer到达,则以10s为间隔定时的发起之。这里所谓"最优"指的是peer中所维护区块链的TotalDifficulty(td)最高,由于Td是全链中从创世块到最新头块的Difficulty值总和,所以Td值最高就意味着它的区块链是最新的,跟这样的peer作区块全链同步,显然改动量是最小的,此即"最优"。
- ==txsyncLoop==:把新的交易均匀的同步给网路节点。
广播的情形
minedBroadcastLoop()
监听到新区块事件后,把新区块和区块hash
分别广播出去;- 从远程节点同步完成后,将
CurrentBlock
广播出去,此时广播的是区块hash
; txBlockcastLoop()
监听到区块池的新增交易事件时会广播交易;
广播区块及区块哈希
广播区块的入口在pm.minedBroadcastLoop()
,进入到BroadcastBlock
,这里的参数为bool
值,如果传入的为true,则将区块block和总难度td发送给一部分节点,节点数为根号n;如果传入的为false,则将区块的hash发送给所有的节点。需要注意的是两个广播函数都执行。
进入到true
分支:代表只传播区块给一部分节点
①:首先计算一个临时的TD
if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
}
②:发送块到peers的子集
对节点数进行开方,16开方得4,然后取前4个节点。
transferLen := int(math.Sqrt(float64(len(peers))))
if transferLen < minBroadcastPeers {
transferLen = minBroadcastPeers
}
if transferLen > len(peers) {
transferLen = len(peers)
}
transfer := peers[:transferLen]
for _, peer := range transfer {
peer.AsyncSendNewBlock(block, td) // 块传播
}
执行完之后直接return出去,再次执行此函数,此时不会走ture分支,直接判断判断本地是否有区块,如果有则发送区区块哈希给剩下的节点,如果没有,则不做发送哈希的操作。
如果本地存在这个要广播的区块(很可能就是出块节点,或者接受块的节点已经插入到区块链中),那就还要像其他没有被广播到区块的节点发送区块哈希。
如果本地不存在这个要广播的区块哈希(应该是还没接收到区块或者区块哈希的节点),那它只要向它的节点列表里发送区块即可。
接下来就是重点分析AsyncSendNewBlock
和AsyncSendNewBlockHash
两个函数了。
AsyncSendNewBlock
发送块到需要广播的节点的广播队列中
select {
case p.queuedProps <- &propEvent{block: block, td: td}:
p.knownBlocks.Add(block.Hash())
for p.knownBlocks.Cardinality() >= maxKnownBlocks {
p.knownBlocks.Pop()
}
这里的queuedProps
是用来存放要广播的块的队列,同时,要把广播的块标记为已知,还不能超过1024(maxKnownBlocks)个。超过就会弹出队列第一个propEvent()
。接下来就是处理队列中的块了。
在eth/peer.go
中,有个专门处理广播的循环brodcast
func (p *peer) broadcast() {
for {
select {
case txs := <-p.queuedTxs:
if err := p.SendTransactions(txs); err != nil {
return
}
p.Log().Trace("Broadcast transactions", "count", len(txs))
case prop := <-p.queuedProps:
if err := p.SendNewBlock(prop.block, prop.td); err != nil {
return
}
p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td)
case block := <-p.queuedAnns:
if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {
return
}
p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())
case <-p.term:
return
}
}
}
广播新块到远程节点
p.SendNewBlock(prop.block, prop.td);
远程节点收到块后同样也会标记哈希存入队列,并且不会超过最大,同时发送一个NewBlockMsg
,msgcode
为0x07
,同时数据会被RLP编码。
p.knownBlocks.Add(block.Hash())
for p.knownBlocks.Cardinality() >= maxKnownBlocks {
p.knownBlocks.Pop()
}
return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td})
func Send(w MsgWriter, msgcode uint64, data interface{}) error {
size, r, err := rlp.EncodeToReader(data)
if err != nil {
return err
}
return w.WriteMsg(Msg{Code: msgcode, Size: uint32(size), Payload: r})
}
到此广播区块的过程结束,交由远程节点去处理NewBlockMsg
消息。
AsyncSendNewBlockHash
广播哈希的过程跟广播区块的过程非常的类似,最终是由远程节点去处理NewBlockHashesMsg
消息。
广播区块的过程完毕之后,会直接进入下一个阶段,调用fetcher
模块去同步这些广播的区块,接下的文章会讲到。
广播交易
广播交易的入口在pm.txBroadcastLoop()
,直接进入到pm.BroadcastTxs(event.Txs)
,大概做了以下几件事:
①:将交易广播给一批没有这个交易的节点
for _, tx := range txs {
peers := pm.peers.PeersWithoutTx(tx.Hash())
for _, peer := range peers {
txset[peer] = append(txset[peer], tx)
}
log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers))
}
②:异步发送交易给这些节点
for peer, txs := range txset {
peer.AsyncSendTransactions(txs)
}
接着进入到AsyncSendTransactions
:
将所有交易标记为已知交易,同时还要保证没有超过最大的已知交易(32768笔)
case p.queuedTxs <- txs:
for _, tx := range txs {
p.knownTxs.Add(tx.Hash())
}
for p.knownTxs.Cardinality() >= maxKnownTxs {
p.knownTxs.Pop()
}
case txs := <-p.queuedTxs:
if err := p.SendTransactions(txs); err != nil {
return
}
func (p *peer) SendTransactions(txs types.Transactions) error {
...
return p2p.Send(p.rw, TxMsg, txs)
}
发送交易最终会发送一个TxMsg
消息,接收到这个消息的节点会通过pm.txpool.AddRemotes(txs)
处理交易。
消息处理(handleMsg)
handleMsg
从对方连接中读取消息,根据消息码的不同进行处理,从而将广播和同步之间来回的消息进行处理。
func (pm *ProtocolManager) handleMsg(p *peer) error {
msg, err := p.rw.ReadMsg()
if err != nil {
return err
}
if msg.Size > ProtocolMaxMsgSize {
return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
}
defer msg.Discard()
switch {
case msg.Code == StatusMsg: ......
case msg.Code == GetBlockHeadersMsg: ......
case msg.Code == BlockHeadersMsg: ......
case msg.Code == GetBlockBodiesMsg: ......
case msg.Code == BlockBodiesMsg: ......
case p.version >= eth63 && msg.Code == GetNodeDataMsg: ......
case p.version >= eth63 && msg.Code == NodeDataMsg: ......
case p.version >= eth63 && msg.Code == GetReceiptsMsg: ......
case p.version >= eth63 && msg.Code == ReceiptsMsg: ......
case msg.Code == NewBlockHashesMsg: ......
case msg.Code == NewBlockMsg: ......
case msg.Code == TxMsg: ......
default: return errResp(ErrInvalidMsgCode, "%v", msg.Code)
}
return nil
}
参考
https://mindcarver.cn 最新发布 https://github.com/blockchainGuide (文章及学习资料,给个star哦) 公众号:区块链技术栈
区块链技术网。
- 发表于 2021-02-15 10:56
- 阅读 ( 760 )
- 学分 ( 5 )
- 分类:以太坊
评论