From d2e603be0a8a08c4ab90af32f0512e2c501c541d Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Tue, 10 Sep 2024 14:35:37 -0500 Subject: [PATCH] netsync: Track peer for requested blocks. This modifies the map that tracks requests for blocks to keep track of which peer the block was requested from. This is currently not used since blocks are only downloaded from a single sync peer, but it helps pave the way to handling downloading blocks from multiple peers in parallel. --- internal/netsync/manager.go | 40 ++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index 3f0911668..2b3504101 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -338,7 +338,7 @@ type SyncManager struct { rejectedTxns *apbf.Filter rejectedMixMsgs *apbf.Filter requestedTxns map[chainhash.Hash]struct{} - requestedBlocks map[chainhash.Hash]struct{} + requestedBlocks map[chainhash.Hash]*Peer requestedMixMsgs map[chainhash.Hash]struct{} progressLogger *progresslog.Logger syncPeer *Peer @@ -424,6 +424,16 @@ func (m *SyncManager) maybeUpdateNextNeededBlocks() { } } +// isRequestedBlock returns whether or not the given block hash has already been +// requested from any remote peer. +// +// This function is NOT safe for concurrent access. It must be called from the +// event handler goroutine. +func (m *SyncManager) isRequestedBlock(hash *chainhash.Hash) bool { + _, ok := m.requestedBlocks[*hash] + return ok +} + // fetchNextBlocks creates and sends a request to the provided peer for the next // blocks to be downloaded based on the current headers. func (m *SyncManager) fetchNextBlocks(peer *Peer) { @@ -458,12 +468,12 @@ func (m *SyncManager) fetchNextBlocks(peer *Peer) { // Skip blocks that have already been requested. The needed blocks // might have been updated above thereby potentially repopulating some // blocks that are still in flight. - if _, ok := m.requestedBlocks[*hash]; ok { + if m.isRequestedBlock(hash) { continue } iv := wire.NewInvVect(wire.InvTypeBlock, hash) - m.requestedBlocks[*hash] = struct{}{} + m.requestedBlocks[*hash] = peer peer.requestedBlocks[*hash] = struct{}{} gdmsg.AddInvVect(iv) } @@ -1454,14 +1464,19 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) { // Skip the block when it has already been requested or is otherwise // already known. hash := &headerHashes[i] - _, isRequestedBlock := m.requestedBlocks[*hash] - if isRequestedBlock || chain.HaveBlock(hash) { + if m.isRequestedBlock(hash) || chain.HaveBlock(hash) { continue } + // Stop requesting when the request would exceed the max size of the + // map used to track requests. + if len(m.requestedBlocks)+1 > maxRequestedBlocks { + break + } + + m.requestedBlocks[*hash] = peer + peer.requestedBlocks[*hash] = struct{}{} iv := wire.NewInvVect(wire.InvTypeBlock, hash) - limitAdd(m.requestedBlocks, *hash, maxRequestedBlocks) - limitAdd(peer.requestedBlocks, *hash, maxRequestedBlocks) gdmsg.AddInvVect(iv) } if len(gdmsg.InvList) > 0 { @@ -1920,12 +1935,9 @@ func (m *SyncManager) requestFromPeer(peer *Peer, blocks, voteHashes, // Add the blocks to the request. msgResp := wire.NewMsgGetData() for i := range blocks { - // If we've already requested this block, skip it. + // Skip the block when it has already been requested. bh := &blocks[i] - _, alreadyReqP := peer.requestedBlocks[*bh] - _, alreadyReqB := m.requestedBlocks[*bh] - - if alreadyReqP || alreadyReqB { + if m.isRequestedBlock(bh) { continue } @@ -1942,7 +1954,7 @@ func (m *SyncManager) requestFromPeer(peer *Peer, blocks, voteHashes, } peer.requestedBlocks[*bh] = struct{}{} - m.requestedBlocks[*bh] = struct{}{} + m.requestedBlocks[*bh] = peer } addTxsToRequest := func(txs []chainhash.Hash, txType stake.TxType) error { @@ -2169,7 +2181,7 @@ func New(config *Config) *SyncManager { rejectedTxns: apbf.NewFilter(maxRejectedTxns, rejectedTxnsFPRate), rejectedMixMsgs: apbf.NewFilter(maxRejectedMixMsgs, rejectedMixMsgsFPRate), requestedTxns: make(map[chainhash.Hash]struct{}), - requestedBlocks: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]*Peer), requestedMixMsgs: make(map[chainhash.Hash]struct{}), peers: make(map[*Peer]struct{}), minKnownWork: minKnownWork,