Skip to content

Commit

Permalink
netsync: Track peer for requested blocks.
Browse files Browse the repository at this point in the history
This modifies the map that tracks requests for blocks to keep track of
which peer the block was requested from.  This is currently not used
since blocks are only downloaded from a single sync peer, but it helps
pave the way to handling downloading blocks from multiple peers in
parallel.
  • Loading branch information
davecgh committed Sep 11, 2024
1 parent 01dfc85 commit d2e603b
Showing 1 changed file with 26 additions and 14 deletions.
40 changes: 26 additions & 14 deletions internal/netsync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ type SyncManager struct {
rejectedTxns *apbf.Filter
rejectedMixMsgs *apbf.Filter
requestedTxns map[chainhash.Hash]struct{}
requestedBlocks map[chainhash.Hash]struct{}
requestedBlocks map[chainhash.Hash]*Peer
requestedMixMsgs map[chainhash.Hash]struct{}
progressLogger *progresslog.Logger
syncPeer *Peer
Expand Down Expand Up @@ -424,6 +424,16 @@ func (m *SyncManager) maybeUpdateNextNeededBlocks() {
}
}

// isRequestedBlock returns whether or not the given block hash has already been
// requested from any remote peer.
//
// This function is NOT safe for concurrent access. It must be called from the
// event handler goroutine.
func (m *SyncManager) isRequestedBlock(hash *chainhash.Hash) bool {
_, ok := m.requestedBlocks[*hash]
return ok
}

// fetchNextBlocks creates and sends a request to the provided peer for the next
// blocks to be downloaded based on the current headers.
func (m *SyncManager) fetchNextBlocks(peer *Peer) {
Expand Down Expand Up @@ -458,12 +468,12 @@ func (m *SyncManager) fetchNextBlocks(peer *Peer) {
// Skip blocks that have already been requested. The needed blocks
// might have been updated above thereby potentially repopulating some
// blocks that are still in flight.
if _, ok := m.requestedBlocks[*hash]; ok {
if m.isRequestedBlock(hash) {
continue
}

iv := wire.NewInvVect(wire.InvTypeBlock, hash)
m.requestedBlocks[*hash] = struct{}{}
m.requestedBlocks[*hash] = peer
peer.requestedBlocks[*hash] = struct{}{}
gdmsg.AddInvVect(iv)
}
Expand Down Expand Up @@ -1454,14 +1464,19 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
// Skip the block when it has already been requested or is otherwise
// already known.
hash := &headerHashes[i]
_, isRequestedBlock := m.requestedBlocks[*hash]
if isRequestedBlock || chain.HaveBlock(hash) {
if m.isRequestedBlock(hash) || chain.HaveBlock(hash) {
continue
}

// Stop requesting when the request would exceed the max size of the
// map used to track requests.
if len(m.requestedBlocks)+1 > maxRequestedBlocks {
break
}

m.requestedBlocks[*hash] = peer
peer.requestedBlocks[*hash] = struct{}{}
iv := wire.NewInvVect(wire.InvTypeBlock, hash)
limitAdd(m.requestedBlocks, *hash, maxRequestedBlocks)
limitAdd(peer.requestedBlocks, *hash, maxRequestedBlocks)
gdmsg.AddInvVect(iv)
}
if len(gdmsg.InvList) > 0 {
Expand Down Expand Up @@ -1920,12 +1935,9 @@ func (m *SyncManager) requestFromPeer(peer *Peer, blocks, voteHashes,
// Add the blocks to the request.
msgResp := wire.NewMsgGetData()
for i := range blocks {
// If we've already requested this block, skip it.
// Skip the block when it has already been requested.
bh := &blocks[i]
_, alreadyReqP := peer.requestedBlocks[*bh]
_, alreadyReqB := m.requestedBlocks[*bh]

if alreadyReqP || alreadyReqB {
if m.isRequestedBlock(bh) {
continue
}

Expand All @@ -1942,7 +1954,7 @@ func (m *SyncManager) requestFromPeer(peer *Peer, blocks, voteHashes,
}

peer.requestedBlocks[*bh] = struct{}{}
m.requestedBlocks[*bh] = struct{}{}
m.requestedBlocks[*bh] = peer
}

addTxsToRequest := func(txs []chainhash.Hash, txType stake.TxType) error {
Expand Down Expand Up @@ -2169,7 +2181,7 @@ func New(config *Config) *SyncManager {
rejectedTxns: apbf.NewFilter(maxRejectedTxns, rejectedTxnsFPRate),
rejectedMixMsgs: apbf.NewFilter(maxRejectedMixMsgs, rejectedMixMsgsFPRate),
requestedTxns: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]*Peer),
requestedMixMsgs: make(map[chainhash.Hash]struct{}),
peers: make(map[*Peer]struct{}),
minKnownWork: minKnownWork,
Expand Down

0 comments on commit d2e603b

Please sign in to comment.