spv: Fetch cfilters from multiple remote peers #2308

Merged (2 commits) on Nov 29, 2023
119 changes: 90 additions & 29 deletions spv/backend.go
@@ -9,6 +9,7 @@ import (
"fmt"
"runtime"
"sync"
"time"

"decred.org/dcrwallet/v4/errors"
"decred.org/dcrwallet/v4/p2p"
@@ -42,6 +43,19 @@ func pickForGetHeaders(tipHeight int32) func(rp *p2p.RemotePeer) bool {
}
}

// pickForGetCfilters returns a function to use in waitForRemotes which selects
// peers that should have cfilters up to the passed lastHeaderHeight.
func pickForGetCfilters(lastHeaderHeight int32) func(rp *p2p.RemotePeer) bool {
return func(rp *p2p.RemotePeer) bool {
// When performing initial sync, blocks may be generated while
// the sync is still in progress, so a peer's initially
// advertised height can be lower than the last header height.
// Therefore, accept peers that are close to, but not quite at,
// the tip.
return rp.InitialHeight() >= lastHeaderHeight-6
}
}
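As an illustrative aside (not part of the change itself): the predicate above simply tolerates peers that advertised a height up to six blocks behind the last header we hold. A minimal standalone sketch of that selection logic, where peerInfo and the sample heights are made up and only the 6-block tolerance and the closure shape come from the diff:

package main

import "fmt"

// peerInfo stands in for p2p.RemotePeer; only the advertised height matters here.
type peerInfo struct {
	name          string
	initialHeight int32
}

// pickForGetCfilters mirrors the predicate in the diff: accept peers whose
// advertised height is within 6 blocks of the last header height, since the
// chain may have grown while headers were being fetched.
func pickForGetCfilters(lastHeaderHeight int32) func(p peerInfo) bool {
	return func(p peerInfo) bool {
		return p.initialHeight >= lastHeaderHeight-6
	}
}

func main() {
	pick := pickForGetCfilters(1000)
	for _, p := range []peerInfo{
		{"peer-a", 997}, // advertised slightly behind our last header: accepted
		{"peer-b", 990}, // too far behind: rejected
	} {
		fmt.Printf("%s eligible: %v\n", p.name, pick(p))
	}
}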

// Blocks implements the Blocks method of the wallet.Peer interface.
func (s *Syncer) Blocks(ctx context.Context, blockHashes []*chainhash.Hash) ([]*wire.MsgBlock, error) {
for {
@@ -91,9 +105,13 @@ func (s *Syncer) CFiltersV2(ctx context.Context, blockHashes []*chainhash.Hash)
}
}

// errCfilterWatchdogTriggered is an internal error generated when a batch
// of cfilters takes too long to be fetched.
var errCfilterWatchdogTriggered = errors.New("getCFilters watchdog triggered")

// cfiltersV2FromNodes fetches cfilters for all the specified nodes from a
// remote peer.
- func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, rp *p2p.RemotePeer, nodes []*wallet.BlockNode) error {
+ func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, nodes []*wallet.BlockNode) error {
if len(nodes) == 0 {
return nil
}
@@ -106,50 +124,93 @@ func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, rp *p2p.RemotePeer, no
g, ctx := errgroup.WithContext(ctx)
for len(nodes) > cfilterBatchSize {
batch := nodes[:cfilterBatchSize]
- g.Go(func() error { return s.cfiltersV2FromNodes(ctx, rp, batch) })
+ g.Go(func() error { return s.cfiltersV2FromNodes(ctx, batch) })
nodes = nodes[cfilterBatchSize:]
}
- g.Go(func() error { return s.cfiltersV2FromNodes(ctx, rp, nodes) })
+ g.Go(func() error { return s.cfiltersV2FromNodes(ctx, nodes) })
return g.Wait()
}

nodeHashes := make([]*chainhash.Hash, len(nodes))
for i := range nodes {
nodeHashes[i] = nodes[i].Hash
}
lastHeight := nodes[len(nodes)-1].Header.Height

// Especially once we get close to the tip, we may have a header in the
// best sidechain that has been reorged out and thus no peer will have
// its corresponding CFilters. To recover from this case in a timely
// manner, we setup a special watchdog context that, if triggered, will
// make us clean up the sidechain forest, forcing a request of fresh
// headers from all remote peers.
//
// Peers have a 30s stall timeout protection, therefore a 2 minute
// watchdog interval means we'll try at least 4 different peers before
// resetting.
const watchdogTimeoutInterval = 2 * time.Minute
watchdogCtx, cancelWatchdog := context.WithTimeout(ctx, watchdogTimeoutInterval)
defer cancelWatchdog()

nextTry:
for ctx.Err() == nil {
// Select a peer that should have these cfilters.
rp, err := s.waitForRemote(watchdogCtx, pickForGetCfilters(int32(lastHeight)), true)
if watchdogCtx.Err() != nil && ctx.Err() == nil {
// Watchdog timer triggered. Reset sidechain forest.
lastNode := nodes[len(nodes)-1]
log.Warnf("Batch of CFilters ending on block %s at "+
"height %d not received within %s. Clearing "+
"sidechain forest to retry with different "+
"headers", lastNode.Hash, lastNode.Header.Height,
watchdogTimeoutInterval)
s.sidechainMu.Lock()
s.sidechains.PruneAll()
s.sidechainMu.Unlock()
return errCfilterWatchdogTriggered
}
if err != nil {
return err
}

- // TODO: Fetch using getcfsv2 if peer supports batched cfilter fetching.
-
- filters, err := rp.CFiltersV2(ctx, nodeHashes)
- if err != nil {
- log.Tracef("Unable to fetch cfilter batch for "+
- "from %v: %v", rp, err)
- return err
- }
-
- for i := range nodes {
- err = validate.CFilterV2HeaderCommitment(cnet, nodes[i].Header,
- filters[i].Filter, filters[i].ProofIndex, filters[i].Proof)
- if err != nil {
- errMsg := fmt.Sprintf("CFilter for block %v (height %d) "+
- "received from %v failed validation: %v",
- nodes[i].Hash, nodes[i].Header.Height,
- rp, err)
- log.Warnf(errMsg)
- err := errors.E(errors.Protocol, errMsg)
- rp.Disconnect(err)
- return err
- }
- }
-
- s.sidechainMu.Lock()
- for i := range nodes {
- nodes[i].FilterV2 = filters[i].Filter
- }
- s.sidechainMu.Unlock()
- log.Tracef("Fetched %d new cfilters(s) ending at height %d from %v",
- len(nodes), nodes[len(nodes)-1].Header.Height, rp)
- return nil
+ startTime := time.Now()
+
+ // TODO: Fetch using getcfsv2 if peer supports batched cfilter fetching.
+
+ filters, err := rp.CFiltersV2(ctx, nodeHashes)
+ if err != nil {
+ log.Tracef("Unable to fetch cfilter batch for "+
+ "from %v: %v", rp, err)
+ continue nextTry
+ }
+
+ for i := range nodes {
+ err = validate.CFilterV2HeaderCommitment(cnet, nodes[i].Header,
+ filters[i].Filter, filters[i].ProofIndex, filters[i].Proof)
+ if err != nil {
+ errMsg := fmt.Sprintf("CFilter for block %v (height %d) "+
+ "received from %v failed validation: %v",
+ nodes[i].Hash, nodes[i].Header.Height,
+ rp, err)
+ log.Warnf(errMsg)
+ err := errors.E(errors.Protocol, errMsg)
+ rp.Disconnect(err)
+ continue nextTry
+ }
+ }
+
+ s.sidechainMu.Lock()
+ for i := range nodes {
+ nodes[i].FilterV2 = filters[i].Filter
+ }
+ s.sidechainMu.Unlock()
+ log.Tracef("Fetched %d new cfilters(s) ending at height %d "+
+ "from %v (request took %s)",
+ len(nodes), nodes[len(nodes)-1].Header.Height, rp,
+ time.Since(startTime).Truncate(time.Millisecond))
+ return nil
+ }
+
+ return ctx.Err()
}
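The batch splitting at the top of cfiltersV2FromNodes is a common errgroup pattern: carve the slice into fixed-size batches and fetch them concurrently, returning the first error. A minimal sketch of that pattern, assuming a hypothetical processBatched/fetchBatch pair and a made-up batchSize in place of cfilterBatchSize:

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

const batchSize = 100 // stand-in for cfilterBatchSize

// processBatched splits items into batches of at most batchSize and runs
// fetchBatch for each batch concurrently, returning the first error (if any).
func processBatched(ctx context.Context, items []int, fetchBatch func(context.Context, []int) error) error {
	if len(items) <= batchSize {
		return fetchBatch(ctx, items)
	}
	g, ctx := errgroup.WithContext(ctx)
	for len(items) > batchSize {
		batch := items[:batchSize]
		g.Go(func() error { return fetchBatch(ctx, batch) })
		items = items[batchSize:]
	}
	last := items
	g.Go(func() error { return fetchBatch(ctx, last) })
	return g.Wait()
}

func main() {
	items := make([]int, 250)
	err := processBatched(context.Background(), items, func(ctx context.Context, batch []int) error {
		fmt.Println("fetching batch of", len(batch))
		return nil
	})
	fmt.Println("err:", err)
}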

// headersBatch is a batch of headers fetched during initial sync.
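The watchdog added above is just a context.WithTimeout layered over the caller's context; the key detail is the check watchdogCtx.Err() != nil && ctx.Err() == nil, which distinguishes the watchdog expiring from the caller cancelling. A self-contained sketch of that pattern, with a hypothetical runWithWatchdog helper and a simulated slow peer:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errWatchdogTriggered = errors.New("watchdog triggered")

// runWithWatchdog runs work under a watchdog timeout. If the watchdog fires
// while the parent context is still alive, it reports errWatchdogTriggered so
// the caller can reset state and retry; parent cancellation is passed through.
func runWithWatchdog(ctx context.Context, timeout time.Duration, work func(context.Context) error) error {
	watchdogCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	err := work(watchdogCtx)
	if watchdogCtx.Err() != nil && ctx.Err() == nil {
		// Watchdog fired, but the caller did not cancel: signal a retry.
		return errWatchdogTriggered
	}
	return err
}

func main() {
	err := runWithWatchdog(context.Background(), 50*time.Millisecond, func(ctx context.Context) error {
		select {
		case <-time.After(time.Second): // simulated slow peer
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	})
	fmt.Println(err) // prints the watchdog error
}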
2 changes: 1 addition & 1 deletion spv/sync.go
@@ -1352,7 +1352,7 @@ nextbatch:
s.sidechainMu.Unlock()

// Fetch Missing CFilters.
- err = s.cfiltersV2FromNodes(ctx, batch.rp, missingCfilter)
+ err = s.cfiltersV2FromNodes(ctx, missingCfilter)
if err != nil {
log.Debugf("Unable to fetch missing cfilters from %v: %v",
batch.rp, err)
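The visible part of this hunk stops at the Debugf call, but the hunk header shows it lives inside the labelled nextbatch: loop, so a watchdog failure presumably just sends the sync back for a fresh batch of headers. A rough, hypothetical sketch of that control flow; fetchBatchHeaders, fetchMissingCFilters and the attempt counter are stand-ins, not the wallet's API:

package main

import (
	"context"
	"errors"
	"fmt"
)

// errCfilterWatchdogTriggered mirrors the sentinel returned by
// cfiltersV2FromNodes when its watchdog fires.
var errCfilterWatchdogTriggered = errors.New("getCFilters watchdog triggered")

var attempts int

// fetchBatchHeaders is a stand-in for the header-fetching step of the batch loop.
func fetchBatchHeaders(ctx context.Context) (headers []string, done bool, err error) {
	return []string{"h1", "h2"}, attempts > 0, nil
}

// fetchMissingCFilters is a stand-in for cfiltersV2FromNodes; it simulates the
// watchdog firing on the first attempt.
func fetchMissingCFilters(ctx context.Context, headers []string) error {
	attempts++
	if attempts == 1 {
		return errCfilterWatchdogTriggered
	}
	return nil
}

func sync(ctx context.Context) error {
nextbatch:
	for ctx.Err() == nil {
		headers, done, err := fetchBatchHeaders(ctx)
		if err != nil {
			return err
		}
		if err := fetchMissingCFilters(ctx, headers); err != nil {
			// The watchdog already pruned the sidechain forest, so just
			// go around again and request fresh headers.
			fmt.Println("unable to fetch missing cfilters:", err)
			continue nextbatch
		}
		if done {
			return nil
		}
	}
	return ctx.Err()
}

func main() {
	fmt.Println("sync result:", sync(context.Background()))
}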
5 changes: 5 additions & 0 deletions wallet/sidechains.go
@@ -244,6 +244,11 @@ func (f *SidechainForest) AddBlockNode(n *BlockNode) bool {
return true
}

// PruneAll removes all sidechains.
func (f *SidechainForest) PruneAll() {
f.trees = nil
}

// Prune removes any sidechain trees which contain a root that is significantly
// behind the current main chain tip block.
func (f *SidechainForest) Prune(mainChainHeight int32, params *chaincfg.Params) {
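For contrast with the height-based Prune above, a toy model of the new PruneAll behaviour; toyForest is a simplification, and only the idea of dropping every tree (the real method sets f.trees to nil) comes from the diff:

package main

import "fmt"

// toyForest is a simplified stand-in for wallet.SidechainForest: a set of
// sidechain trees, each identified here only by its root height.
type toyForest struct {
	rootHeights []int32
}

// Prune drops trees whose root is far behind the main chain tip.
func (f *toyForest) Prune(mainChainHeight, maxReorgDepth int32) {
	kept := f.rootHeights[:0]
	for _, h := range f.rootHeights {
		if h >= mainChainHeight-maxReorgDepth {
			kept = append(kept, h)
		}
	}
	f.rootHeights = kept
}

// PruneAll drops every tree, mirroring the new SidechainForest.PruneAll.
func (f *toyForest) PruneAll() {
	f.rootHeights = nil
}

func main() {
	f := &toyForest{rootHeights: []int32{100, 950, 995}}
	f.Prune(1000, 100) // keeps the roots at 950 and 995
	fmt.Println("after Prune:", f.rootHeights)
	f.PruneAll() // forces fresh headers to be requested
	fmt.Println("after PruneAll:", f.rootHeights)
}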