diff --git a/spv/backend.go b/spv/backend.go index 56fe43518..8ffd3d8a7 100644 --- a/spv/backend.go +++ b/spv/backend.go @@ -9,6 +9,7 @@ import ( "fmt" "runtime" "sync" + "time" "decred.org/dcrwallet/v4/errors" "decred.org/dcrwallet/v4/p2p" @@ -42,6 +43,19 @@ func pickForGetHeaders(tipHeight int32) func(rp *p2p.RemotePeer) bool { } } +// pickForGetCfilters returns a function to use in waitForRemotes which selects +// peers that should have cfilters up to the passed lastHeaderHeight. +func pickForGetCfilters(lastHeaderHeight int32) func(rp *p2p.RemotePeer) bool { + return func(rp *p2p.RemotePeer) bool { + // When performing initial sync, it could be the case that + // blocks are generated while performing the sync, therefore + // the initial advertised peer height would be lower than the + // last header height. Therefore, accept peers that are + // close, but not quite at the tip. + return rp.InitialHeight() >= lastHeaderHeight-6 + } +} + // Blocks implements the Blocks method of the wallet.Peer interface. func (s *Syncer) Blocks(ctx context.Context, blockHashes []*chainhash.Hash) ([]*wire.MsgBlock, error) { for { @@ -91,9 +105,13 @@ func (s *Syncer) CFiltersV2(ctx context.Context, blockHashes []*chainhash.Hash) } } +// errCfilterWatchdogTriggered is an internal error generated when a batch +// of cfilters takes too long to be fetched. +var errCfilterWatchdogTriggered = errors.New("getCFilters watchdog triggered") + // cfiltersV2FromNodes fetches cfilters for all the specified nodes from a // remote peer. -func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, rp *p2p.RemotePeer, nodes []*wallet.BlockNode) error { +func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, nodes []*wallet.BlockNode) error { if len(nodes) == 0 { return nil } @@ -106,10 +124,10 @@ func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, rp *p2p.RemotePeer, no g, ctx := errgroup.WithContext(ctx) for len(nodes) > cfilterBatchSize { batch := nodes[:cfilterBatchSize] - g.Go(func() error { return s.cfiltersV2FromNodes(ctx, rp, batch) }) + g.Go(func() error { return s.cfiltersV2FromNodes(ctx, batch) }) nodes = nodes[cfilterBatchSize:] } - g.Go(func() error { return s.cfiltersV2FromNodes(ctx, rp, nodes) }) + g.Go(func() error { return s.cfiltersV2FromNodes(ctx, nodes) }) return g.Wait() } @@ -117,39 +135,82 @@ func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, rp *p2p.RemotePeer, no for i := range nodes { nodeHashes[i] = nodes[i].Hash } + lastHeight := nodes[len(nodes)-1].Header.Height + + // Specially once we get close to the tip, we may have a header in the + // best sidechain that has been reorged out and thus no peer will have + // its corresponding CFilters. To recover from this case in a timely + // manner, we setup a special watchdog context that, if triggered, will + // make us clean up the sidechain forest, forcing a request of fresh + // headers from all remote peers. + // + // Peers have a 30s stall timeout protection, therefore a 2 minute + // watchdog interval means we'll try at least 4 different peers before + // resetting. + const watchdogTimeoutInterval = 2 * time.Minute + watchdogCtx, cancelWatchdog := context.WithTimeout(ctx, time.Minute) + defer cancelWatchdog() + +nextTry: + for ctx.Err() == nil { + // Select a peer that should have these cfilters. + rp, err := s.waitForRemote(watchdogCtx, pickForGetCfilters(int32(lastHeight)), true) + if watchdogCtx.Err() != nil && ctx.Err() == nil { + // Watchdog timer triggered. Reset sidechain forest. + lastNode := nodes[len(nodes)-1] + log.Warnf("Batch of CFilters ending on block %s at "+ + "height %d not received within %s. Clearing "+ + "sidechain forest to retry with different "+ + "headers", lastNode.Hash, lastNode.Header.Height, + watchdogTimeoutInterval) + s.sidechainMu.Lock() + s.sidechains.PruneAll() + s.sidechainMu.Unlock() + return errCfilterWatchdogTriggered + } + if err != nil { + return err + } - // TODO: Fetch using getcfsv2 if peer supports batched cfilter fetching. + startTime := time.Now() - filters, err := rp.CFiltersV2(ctx, nodeHashes) - if err != nil { - log.Tracef("Unable to fetch cfilter batch for "+ - "from %v: %v", rp, err) - return err - } + // TODO: Fetch using getcfsv2 if peer supports batched cfilter fetching. - for i := range nodes { - err = validate.CFilterV2HeaderCommitment(cnet, nodes[i].Header, - filters[i].Filter, filters[i].ProofIndex, filters[i].Proof) + filters, err := rp.CFiltersV2(ctx, nodeHashes) if err != nil { - errMsg := fmt.Sprintf("CFilter for block %v (height %d) "+ - "received from %v failed validation: %v", - nodes[i].Hash, nodes[i].Header.Height, - rp, err) - log.Warnf(errMsg) - err := errors.E(errors.Protocol, errMsg) - rp.Disconnect(err) - return err + log.Tracef("Unable to fetch cfilter batch for "+ + "from %v: %v", rp, err) + continue nextTry } - } - s.sidechainMu.Lock() - for i := range nodes { - nodes[i].FilterV2 = filters[i].Filter + for i := range nodes { + err = validate.CFilterV2HeaderCommitment(cnet, nodes[i].Header, + filters[i].Filter, filters[i].ProofIndex, filters[i].Proof) + if err != nil { + errMsg := fmt.Sprintf("CFilter for block %v (height %d) "+ + "received from %v failed validation: %v", + nodes[i].Hash, nodes[i].Header.Height, + rp, err) + log.Warnf(errMsg) + err := errors.E(errors.Protocol, errMsg) + rp.Disconnect(err) + continue nextTry + } + } + + s.sidechainMu.Lock() + for i := range nodes { + nodes[i].FilterV2 = filters[i].Filter + } + s.sidechainMu.Unlock() + log.Tracef("Fetched %d new cfilters(s) ending at height %d "+ + "from %v (request took %s)", + len(nodes), nodes[len(nodes)-1].Header.Height, rp, + time.Since(startTime).Truncate(time.Millisecond)) + return nil } - s.sidechainMu.Unlock() - log.Tracef("Fetched %d new cfilters(s) ending at height %d from %v", - len(nodes), nodes[len(nodes)-1].Header.Height, rp) - return nil + + return ctx.Err() } // headersBatch is a batch of headers fetched during initial sync. diff --git a/spv/sync.go b/spv/sync.go index f6167b3b4..dfcc869fb 100644 --- a/spv/sync.go +++ b/spv/sync.go @@ -1352,7 +1352,7 @@ nextbatch: s.sidechainMu.Unlock() // Fetch Missing CFilters. - err = s.cfiltersV2FromNodes(ctx, batch.rp, missingCfilter) + err = s.cfiltersV2FromNodes(ctx, missingCfilter) if err != nil { log.Debugf("Unable to fetch missing cfilters from %v: %v", batch.rp, err) diff --git a/wallet/sidechains.go b/wallet/sidechains.go index 70c6c78ca..07d563785 100644 --- a/wallet/sidechains.go +++ b/wallet/sidechains.go @@ -244,6 +244,11 @@ func (f *SidechainForest) AddBlockNode(n *BlockNode) bool { return true } +// PruneAll removes all sidechains. +func (f *SidechainForest) PruneAll() { + f.trees = nil +} + // Prune removes any sidechain trees which contain a root that is significantly // behind the current main chain tip block. func (f *SidechainForest) Prune(mainChainHeight int32, params *chaincfg.Params) {