diff --git a/spv/backend.go b/spv/backend.go index 6e5abf330..56fe43518 100644 --- a/spv/backend.go +++ b/spv/backend.go @@ -99,49 +99,55 @@ func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, rp *p2p.RemotePeer, no } cnet := s.wallet.ChainParams().Net - g, ctx := errgroup.WithContext(ctx) - res := make([]*gcs.FilterV2, len(nodes)) - for i := range nodes { - i := i - g.Go(func() error { - node := nodes[i] - filter, proofIndex, proof, err := rp.CFilterV2(ctx, node.Hash) - if err != nil { - log.Tracef("Unable to fetch cfilter for "+ - "block %v (height %d) from %v: %v", - node.Hash, node.Header.Height, - rp, err) - return err - } - err = validate.CFilterV2HeaderCommitment(cnet, node.Header, - filter, proofIndex, proof) - if err != nil { - errMsg := fmt.Sprintf("CFilter for block %v (height %d) "+ - "received from %v failed validation: %v", - node.Hash, node.Header.Height, - rp, err) - log.Warnf(errMsg) - err := errors.E(errors.Protocol, errMsg) - rp.Disconnect(err) - return err - } + // Split fetching into batches of a max size. + const cfilterBatchSize = 100 + if len(nodes) > cfilterBatchSize { + g, ctx := errgroup.WithContext(ctx) + for len(nodes) > cfilterBatchSize { + batch := nodes[:cfilterBatchSize] + g.Go(func() error { return s.cfiltersV2FromNodes(ctx, rp, batch) }) + nodes = nodes[cfilterBatchSize:] + } + g.Go(func() error { return s.cfiltersV2FromNodes(ctx, rp, nodes) }) + return g.Wait() + } - res[i] = filter - return nil - }) + nodeHashes := make([]*chainhash.Hash, len(nodes)) + for i := range nodes { + nodeHashes[i] = nodes[i].Hash } - err := g.Wait() + + // TODO: Fetch using getcfsv2 if peer supports batched cfilter fetching. + + filters, err := rp.CFiltersV2(ctx, nodeHashes) if err != nil { + log.Tracef("Unable to fetch cfilter batch for "+ + "from %v: %v", rp, err) return err } + for i := range nodes { + err = validate.CFilterV2HeaderCommitment(cnet, nodes[i].Header, + filters[i].Filter, filters[i].ProofIndex, filters[i].Proof) + if err != nil { + errMsg := fmt.Sprintf("CFilter for block %v (height %d) "+ + "received from %v failed validation: %v", + nodes[i].Hash, nodes[i].Header.Height, + rp, err) + log.Warnf(errMsg) + err := errors.E(errors.Protocol, errMsg) + rp.Disconnect(err) + return err + } + } + s.sidechainMu.Lock() for i := range nodes { - nodes[i].FilterV2 = res[i] + nodes[i].FilterV2 = filters[i].Filter } s.sidechainMu.Unlock() - log.Debugf("Fetched %d new cfilters(s) ending at height %d from %v", + log.Tracef("Fetched %d new cfilters(s) ending at height %d from %v", len(nodes), nodes[len(nodes)-1].Header.Height, rp) return nil } diff --git a/spv/sync.go b/spv/sync.go index 94d7d989b..f6167b3b4 100644 --- a/spv/sync.go +++ b/spv/sync.go @@ -1358,6 +1358,10 @@ nextbatch: batch.rp, err) continue nextbatch } + if len(missingCfilter) > 0 { + log.Debugf("Fetched %d new cfilters(s) ending at height %d", + len(missingCfilter), missingCfilter[len(missingCfilter)-1].Header.Height) + } // Switch the best chain, now that all cfilters have been // fetched for it.