From adba0d1c45daefea0ab16883c623a669a01af59c Mon Sep 17 00:00:00 2001
From: Matheus Degiovani
Date: Wed, 29 Nov 2023 07:25:27 -0300
Subject: [PATCH 1/2] spv: Fetch CFilters from multiple peers

This modifies the initial cfilter fetching stage to use multiple peers
and to keep trying to fetch the cfilters even when the peer from which
headers were received disconnects.
---
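Note, for illustration only (not part of the change itself): the sketch
below shows the general shape of the retry strategy this patch adopts.
A cfilter batch is no longer tied to the peer that served the headers;
it is retried against any peer whose advertised height suggests it can
serve the filters, until it succeeds or the caller's context is done.
The names used here (peer, pickPeer, fetchBatch, fetchWithRetry) are
placeholders for the sketch, not dcrwallet APIs.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// peer is a stand-in for p2p.RemotePeer; only the advertised height
// matters for peer selection in this sketch.
type peer struct{ height int32 }

// pickPeer mirrors the pickForGetCfilters predicate: accept any peer
// whose advertised height is within 6 blocks of the batch's last header.
func pickPeer(peers []peer, lastHeight int32) (*peer, error) {
	for i := range peers {
		if peers[i].height >= lastHeight-6 {
			return &peers[i], nil
		}
	}
	return nil, errors.New("no suitable peer")
}

// fetchBatch is a placeholder for one cfilter batch round trip plus
// validation; it returns an error when the peer cannot serve the batch.
func fetchBatch(ctx context.Context, p *peer) error {
	return nil // pretend the filters were fetched and validated
}

// fetchWithRetry keeps trying the same batch against different peers
// until it succeeds or the caller's context is canceled.
func fetchWithRetry(ctx context.Context, peers []peer, lastHeight int32) error {
	for ctx.Err() == nil {
		p, err := pickPeer(peers, lastHeight)
		if err != nil {
			return err
		}
		if err := fetchBatch(ctx, p); err != nil {
			continue // failed or misbehaving peer: try another one
		}
		return nil
	}
	return ctx.Err()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(fetchWithRetry(ctx, []peer{{height: 1000}}, 1000))
}
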
 spv/backend.go | 83 ++++++++++++++++++++++++++++++++------------------
 spv/sync.go    |  2 +-
 2 files changed, 55 insertions(+), 30 deletions(-)

diff --git a/spv/backend.go b/spv/backend.go
index 56fe43518..e3e88d55b 100644
--- a/spv/backend.go
+++ b/spv/backend.go
@@ -42,6 +42,19 @@ func pickForGetHeaders(tipHeight int32) func(rp *p2p.RemotePeer) bool {
 	}
 }
 
+// pickForGetCfilters returns a function to use in waitForRemotes which selects
+// peers that should have cfilters up to the passed lastHeaderHeight.
+func pickForGetCfilters(lastHeaderHeight int32) func(rp *p2p.RemotePeer) bool {
+	return func(rp *p2p.RemotePeer) bool {
+		// When performing initial sync, it could be the case that
+		// blocks are generated while performing the sync, so the
+		// initially advertised peer height would be lower than the
+		// last header height. Therefore, accept peers that are
+		// close to, but not quite at, the tip.
+		return rp.InitialHeight() >= lastHeaderHeight-6
+	}
+}
+
 // Blocks implements the Blocks method of the wallet.Peer interface.
 func (s *Syncer) Blocks(ctx context.Context, blockHashes []*chainhash.Hash) ([]*wire.MsgBlock, error) {
 	for {
@@ -93,7 +106,7 @@ func (s *Syncer) CFiltersV2(ctx context.Context, blockHashes []*chainhash.Hash)
 
 // cfiltersV2FromNodes fetches cfilters for all the specified nodes from a
 // remote peer.
-func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, rp *p2p.RemotePeer, nodes []*wallet.BlockNode) error {
+func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, nodes []*wallet.BlockNode) error {
 	if len(nodes) == 0 {
 		return nil
 	}
@@ -106,10 +119,10 @@ func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, rp *p2p.RemotePeer, no
 		g, ctx := errgroup.WithContext(ctx)
 		for len(nodes) > cfilterBatchSize {
 			batch := nodes[:cfilterBatchSize]
-			g.Go(func() error { return s.cfiltersV2FromNodes(ctx, rp, batch) })
+			g.Go(func() error { return s.cfiltersV2FromNodes(ctx, batch) })
 			nodes = nodes[cfilterBatchSize:]
 		}
-		g.Go(func() error { return s.cfiltersV2FromNodes(ctx, rp, nodes) })
+		g.Go(func() error { return s.cfiltersV2FromNodes(ctx, nodes) })
 		return g.Wait()
 	}
 
@@ -117,39 +130,51 @@ func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, rp *p2p.RemotePeer, no
 	for i := range nodes {
 		nodeHashes[i] = nodes[i].Hash
 	}
+	lastHeight := nodes[len(nodes)-1].Header.Height
 
-	// TODO: Fetch using getcfsv2 if peer supports batched cfilter fetching.
+nextTry:
+	for ctx.Err() == nil {
+		// Select a peer that should have these cfilters.
+		rp, err := s.waitForRemote(ctx, pickForGetCfilters(int32(lastHeight)), true)
+		if err != nil {
+			return err
+		}
 
-	filters, err := rp.CFiltersV2(ctx, nodeHashes)
-	if err != nil {
-		log.Tracef("Unable to fetch cfilter batch for "+
-			"from %v: %v", rp, err)
-		return err
-	}
+		// TODO: Fetch using getcfsv2 if peer supports batched cfilter fetching.
 
-	for i := range nodes {
-		err = validate.CFilterV2HeaderCommitment(cnet, nodes[i].Header,
-			filters[i].Filter, filters[i].ProofIndex, filters[i].Proof)
+		filters, err := rp.CFiltersV2(ctx, nodeHashes)
 		if err != nil {
-			errMsg := fmt.Sprintf("CFilter for block %v (height %d) "+
-				"received from %v failed validation: %v",
-				nodes[i].Hash, nodes[i].Header.Height,
-				rp, err)
-			log.Warnf(errMsg)
-			err := errors.E(errors.Protocol, errMsg)
-			rp.Disconnect(err)
-			return err
+			log.Tracef("Unable to fetch cfilter batch for "+
+				"from %v: %v", rp, err)
+			continue nextTry
 		}
-	}
 
-	s.sidechainMu.Lock()
-	for i := range nodes {
-		nodes[i].FilterV2 = filters[i].Filter
+		for i := range nodes {
+			err = validate.CFilterV2HeaderCommitment(cnet, nodes[i].Header,
+				filters[i].Filter, filters[i].ProofIndex, filters[i].Proof)
+			if err != nil {
+				errMsg := fmt.Sprintf("CFilter for block %v (height %d) "+
+					"received from %v failed validation: %v",
+					nodes[i].Hash, nodes[i].Header.Height,
+					rp, err)
+				log.Warnf(errMsg)
+				err := errors.E(errors.Protocol, errMsg)
+				rp.Disconnect(err)
+				continue nextTry
+			}
+		}
+
+		s.sidechainMu.Lock()
+		for i := range nodes {
+			nodes[i].FilterV2 = filters[i].Filter
+		}
+		s.sidechainMu.Unlock()
+		log.Tracef("Fetched %d new cfilters(s) ending at height %d from %v",
+			len(nodes), nodes[len(nodes)-1].Header.Height, rp)
+		return nil
 	}
-	s.sidechainMu.Unlock()
-	log.Tracef("Fetched %d new cfilters(s) ending at height %d from %v",
-		len(nodes), nodes[len(nodes)-1].Header.Height, rp)
-	return nil
+
+	return ctx.Err()
 }
 
 // headersBatch is a batch of headers fetched during initial sync.
diff --git a/spv/sync.go b/spv/sync.go
index f6167b3b4..dfcc869fb 100644
--- a/spv/sync.go
+++ b/spv/sync.go
@@ -1352,7 +1352,7 @@ nextbatch:
 		s.sidechainMu.Unlock()
 
 		// Fetch Missing CFilters.
-		err = s.cfiltersV2FromNodes(ctx, batch.rp, missingCfilter)
+		err = s.cfiltersV2FromNodes(ctx, missingCfilter)
 		if err != nil {
 			log.Debugf("Unable to fetch missing cfilters from %v: %v",
 				batch.rp, err)

From 66a3e69d0437578628c5b209956912dc78ad423c Mon Sep 17 00:00:00 2001
From: Matheus Degiovani
Date: Wed, 29 Nov 2023 09:35:41 -0300
Subject: [PATCH 2/2] spv: Add watchdog timer to cfilter fetching

This commit adds a watchdog timer to the initial cfilter fetching,
triggered when a batch takes too long to be successfully fetched.

In some situations, especially close to the tip, the sidechain forest
may be populated with one (or more) headers for which no cfilter is
available in the network. In this situation, the syncer could end up in
a state where it does not advance because it is waiting for cfilters
that will never be sent by any peer.

To alleviate this issue, a watchdog timer is started for every batch of
cfilters. If that timer triggers, the entire sidechain forest is
pruned, ensuring that the next batch of headers is fetched using fresh
block locators and that the best chain selected for fetching cfilters
is built from these fresh headers.

The timer interval is set to 2 minutes, which, given the 30 second
stall timer per peer, means at least 4 different peers will be tried
for the cfilters before giving up and resetting the forest.
---
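Note, for illustration only (not part of the change itself): a minimal
sketch of the watchdog pattern used below. The watchdog is a context
derived from the caller's context with a deadline that only governs how
long we wait for a usable peer; checking watchdogCtx.Err() together
with the caller's ctx.Err() distinguishes "the watchdog fired" from
"the caller canceled". waitForPeer and fetchWithWatchdog are
placeholders, not dcrwallet APIs, and the interval is shortened so the
example finishes quickly.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errWatchdog = errors.New("watchdog triggered")

// waitForPeer stands in for waiting on a suitable remote peer; here it
// simply blocks until the passed context is done.
func waitForPeer(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// fetchWithWatchdog derives a watchdog context and uses it only for the
// wait. If the watchdog expires while the caller is still interested,
// the batch is abandoned (the patch additionally prunes the sidechain
// forest at that point so fresh headers are fetched next).
func fetchWithWatchdog(ctx context.Context) error {
	const watchdogInterval = 50 * time.Millisecond // 2 * time.Minute in the patch
	watchdogCtx, cancel := context.WithTimeout(ctx, watchdogInterval)
	defer cancel()

	err := waitForPeer(watchdogCtx)
	if watchdogCtx.Err() != nil && ctx.Err() == nil {
		// Only the watchdog expired; the caller was not canceled.
		return errWatchdog
	}
	return err
}

func main() {
	// The parent context is never canceled, so only the watchdog fires.
	fmt.Println(fetchWithWatchdog(context.Background()))
}
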
 spv/backend.go       | 42 +++++++++++++++++++++++++++++++++++++++---
 wallet/sidechains.go |  5 +++++
 2 files changed, 44 insertions(+), 3 deletions(-)

diff --git a/spv/backend.go b/spv/backend.go
index e3e88d55b..8ffd3d8a7 100644
--- a/spv/backend.go
+++ b/spv/backend.go
@@ -9,6 +9,7 @@ import (
 	"fmt"
 	"runtime"
 	"sync"
+	"time"
 
 	"decred.org/dcrwallet/v4/errors"
 	"decred.org/dcrwallet/v4/p2p"
@@ -104,6 +105,10 @@ func (s *Syncer) CFiltersV2(ctx context.Context, blockHashes []*chainhash.Hash)
 	}
 }
 
+// errCfilterWatchdogTriggered is an internal error generated when a batch
+// of cfilters takes too long to be fetched.
+var errCfilterWatchdogTriggered = errors.New("getCFilters watchdog triggered")
+
 // cfiltersV2FromNodes fetches cfilters for all the specified nodes from a
 // remote peer.
 func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, nodes []*wallet.BlockNode) error {
@@ -132,14 +137,43 @@ func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, nodes []*wallet.BlockN
 	}
 	lastHeight := nodes[len(nodes)-1].Header.Height
 
+	// Especially once we get close to the tip, we may have a header in
+	// the best sidechain that has been reorged out and thus no peer will
+	// have its corresponding CFilters. To recover from this case in a
+	// timely manner, we set up a special watchdog context that, if
+	// triggered, will make us clean up the sidechain forest, forcing a
+	// request of fresh headers from all remote peers.
+	//
+	// Peers have a 30s stall timeout protection, so a 2 minute watchdog
+	// interval means we'll try at least 4 different peers before
+	// resetting.
+	const watchdogTimeoutInterval = 2 * time.Minute
+	watchdogCtx, cancelWatchdog := context.WithTimeout(ctx, watchdogTimeoutInterval)
+	defer cancelWatchdog()
+
 nextTry:
 	for ctx.Err() == nil {
 		// Select a peer that should have these cfilters.
-		rp, err := s.waitForRemote(ctx, pickForGetCfilters(int32(lastHeight)), true)
+		rp, err := s.waitForRemote(watchdogCtx, pickForGetCfilters(int32(lastHeight)), true)
+		if watchdogCtx.Err() != nil && ctx.Err() == nil {
+			// Watchdog timer triggered. Reset sidechain forest.
+			lastNode := nodes[len(nodes)-1]
+			log.Warnf("Batch of CFilters ending on block %s at "+
+				"height %d not received within %s. Clearing "+
+				"sidechain forest to retry with different "+
+				"headers", lastNode.Hash, lastNode.Header.Height,
+				watchdogTimeoutInterval)
+			s.sidechainMu.Lock()
+			s.sidechains.PruneAll()
+			s.sidechainMu.Unlock()
+			return errCfilterWatchdogTriggered
+		}
 		if err != nil {
 			return err
 		}
 
+		startTime := time.Now()
+
 		// TODO: Fetch using getcfsv2 if peer supports batched cfilter fetching.
 
 		filters, err := rp.CFiltersV2(ctx, nodeHashes)
@@ -169,8 +203,10 @@ nextTry:
 			nodes[i].FilterV2 = filters[i].Filter
 		}
 		s.sidechainMu.Unlock()
-		log.Tracef("Fetched %d new cfilters(s) ending at height %d from %v",
-			len(nodes), nodes[len(nodes)-1].Header.Height, rp)
+		log.Tracef("Fetched %d new cfilters(s) ending at height %d "+
+			"from %v (request took %s)",
+			len(nodes), nodes[len(nodes)-1].Header.Height, rp,
+			time.Since(startTime).Truncate(time.Millisecond))
 		return nil
 	}
 
diff --git a/wallet/sidechains.go b/wallet/sidechains.go
index 70c6c78ca..07d563785 100644
--- a/wallet/sidechains.go
+++ b/wallet/sidechains.go
@@ -244,6 +244,11 @@ func (f *SidechainForest) AddBlockNode(n *BlockNode) bool {
 	return true
 }
 
+// PruneAll removes all sidechains.
+func (f *SidechainForest) PruneAll() {
+	f.trees = nil
+}
+
 // Prune removes any sidechain trees which contain a root that is significantly
 // behind the current main chain tip block.
 func (f *SidechainForest) Prune(mainChainHeight int32, params *chaincfg.Params) {