Skip to content

Commit

Permalink
spv: Split cfilter fetching in smaller batches
Browse files Browse the repository at this point in the history
This splits cfilter fetching during initial sync to use smaller sized
batches of requests.

This makes the code ready to switch to batched cfilter fetching on a
per-peer basis when a new protocol version is introduced that supports
it.
  • Loading branch information
matheusd committed Nov 23, 2023
1 parent 51c6c57 commit c61d89b
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 32 deletions.
70 changes: 38 additions & 32 deletions spv/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,49 +99,55 @@ func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, rp *p2p.RemotePeer, no
}

cnet := s.wallet.ChainParams().Net
g, ctx := errgroup.WithContext(ctx)
res := make([]*gcs.FilterV2, len(nodes))
for i := range nodes {
i := i
g.Go(func() error {
node := nodes[i]
filter, proofIndex, proof, err := rp.CFilterV2(ctx, node.Hash)
if err != nil {
log.Tracef("Unable to fetch cfilter for "+
"block %v (height %d) from %v: %v",
node.Hash, node.Header.Height,
rp, err)
return err
}

err = validate.CFilterV2HeaderCommitment(cnet, node.Header,
filter, proofIndex, proof)
if err != nil {
errMsg := fmt.Sprintf("CFilter for block %v (height %d) "+
"received from %v failed validation: %v",
node.Hash, node.Header.Height,
rp, err)
log.Warnf(errMsg)
err := errors.E(errors.Protocol, errMsg)
rp.Disconnect(err)
return err
}
// Split fetching into batches of a max size.
const cfilterBatchSize = 100
if len(nodes) > cfilterBatchSize {
g, ctx := errgroup.WithContext(ctx)
for len(nodes) > cfilterBatchSize {
batch := nodes[:cfilterBatchSize]
g.Go(func() error { return s.cfiltersV2FromNodes(ctx, rp, batch) })
nodes = nodes[cfilterBatchSize:]
}
g.Go(func() error { return s.cfiltersV2FromNodes(ctx, rp, nodes) })
return g.Wait()
}

res[i] = filter
return nil
})
nodeHashes := make([]*chainhash.Hash, len(nodes))
for i := range nodes {
nodeHashes[i] = nodes[i].Hash
}
err := g.Wait()

// TODO: Fetch using getcfsv2 if peer supports batched cfilter fetching.

filters, err := rp.CFiltersV2(ctx, nodeHashes)
if err != nil {
log.Tracef("Unable to fetch cfilter batch for "+
"from %v: %v", rp, err)
return err
}

for i := range nodes {
err = validate.CFilterV2HeaderCommitment(cnet, nodes[i].Header,
filters[i].Filter, filters[i].ProofIndex, filters[i].Proof)
if err != nil {
errMsg := fmt.Sprintf("CFilter for block %v (height %d) "+
"received from %v failed validation: %v",
nodes[i].Hash, nodes[i].Header.Height,
rp, err)
log.Warnf(errMsg)
err := errors.E(errors.Protocol, errMsg)
rp.Disconnect(err)
return err
}
}

s.sidechainMu.Lock()
for i := range nodes {
nodes[i].FilterV2 = res[i]
nodes[i].FilterV2 = filters[i].Filter
}
s.sidechainMu.Unlock()
log.Debugf("Fetched %d new cfilters(s) ending at height %d from %v",
log.Tracef("Fetched %d new cfilters(s) ending at height %d from %v",
len(nodes), nodes[len(nodes)-1].Header.Height, rp)
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions spv/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -1358,6 +1358,10 @@ nextbatch:
batch.rp, err)
continue nextbatch
}
if len(missingCfilter) > 0 {
log.Debugf("Fetched %d new cfilters(s) ending at height %d",
len(missingCfilter), missingCfilter[len(missingCfilter)-1].Header.Height)
}

// Switch the best chain, now that all cfilters have been
// fetched for it.
Expand Down

0 comments on commit c61d89b

Please sign in to comment.