Skip to content

Commit

Permalink
p2p: Reduce number of goroutines in CFiltersV2()
Browse files Browse the repository at this point in the history
This refactors the CFiltersV2 function to reduce the number of
goroutines used, therefore slightly reducing total cpu and memory load
and making it easier to run the race detector and debug issues.

The number of goroutines is reduced from one per block hash in the
request to two per call.  This is done by refactoring the code to use
one goroutine to send all requests and one to read the responses, while
ensuring that the requests are still sent as fast as the network stack
will allow it.
  • Loading branch information
matheusd committed Nov 23, 2023
1 parent c61d89b commit 41fc943
Showing 1 changed file with 100 additions and 19 deletions.
119 changes: 100 additions & 19 deletions p2p/peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -1515,29 +1515,110 @@ type filterProof = struct {
// CFiltersV2 requests version 2 cfilters for all blocks described by
// blockHashes. This is currently implemented by making many separate
// getcfilter requests concurrently and waiting on every result.
//
// Note: returning a []func() is an ugly hack to prevent a cyclical dependency
// between the rpc package and the wallet package.
func (rp *RemotePeer) CFiltersV2(ctx context.Context, blockHashes []*chainhash.Hash) ([]filterProof, error) {
// TODO: this is spammy and would be better implemented with a single
// request/response.
const opf = "remotepeer(%v).CFiltersV2(%v)"

ctxSend, cancelSend := context.WithCancel(ctx)
defer cancelSend()

type request struct {
t time.Time
c chan *wire.MsgCFilterV2
}

// Send the requests on a separate goroutine, as fast as the network
// accepts them.
errChan := make(chan error, 1)
requests := make(chan request, len(blockHashes))
go func() {
defer close(requests)
for _, blockHash := range blockHashes {
m := wire.NewMsgGetCFilterV2(blockHash)
c := make(chan *wire.MsgCFilterV2, 1)
if !rp.addRequestedCFilterV2(blockHash, c) {
op := errors.Opf(opf, rp.raddr, blockHash)
errChan <- errors.E(op, errors.Invalid, "cfilterv2 is already being requested from this peer for this block")
return
}
now := time.Now()
select {
case rp.out <- &msgAck{m, nil}:
requests <- request{t: now, c: c}
case <-ctxSend.Done():
return
case <-rp.errc:
return
}
}
}()

stalled := time.NewTimer(stallTimeout)

// Helper func that stops the sending goroutine and removes all requests
// made starting at index `start`.
cleanup := func(start int, stopStalled bool) {
cancelSend()
for range requests { // Drain until it signals closed.
}
for i := start; i < len(blockHashes); i++ {
rp.deleteRequestedCFilterV2(blockHashes[i])
}
if stopStalled && !stalled.Stop() {
<-stalled.C
}
}

// Receive the responses.
filters := make([]filterProof, len(blockHashes))
g, ctx := errgroup.WithContext(ctx)
for i := range blockHashes {
i := i
g.Go(func() error {
f, pi, prf, err := rp.CFilterV2(ctx, blockHashes[i])
filters[i] = filterProof{
Filter: f,
ProofIndex: pi,
Proof: prf,
// Alternate between waiting for the next request to be sent
// and waiting for its response by switching which of req.c
// and q channels are not nil.
var req request
q := requests
for req.c != nil || q != nil {
select {
case <-ctx.Done():
cleanup(i, true)
return nil, ctx.Err()
case <-stalled.C:
cleanup(i, false)
op := errors.Opf(opf, rp.raddr, blockHashes[i])
err := errors.E(op, errors.IO, "peer appears stalled")
rp.Disconnect(err)
return nil, err
case <-rp.errc:
cleanup(i, true)
return nil, rp.err
case err := <-errChan:
cleanup(i, true)
return nil, err
case req = <-q:
q = nil

// Request was sent. Reset the stall timer to
// be relative to the sending time.
if !stalled.Stop() {
<-stalled.C
}
stalled.Reset(stallTimeout - time.Now().Sub(req.t))
case m := <-req.c:
var f *gcs.FilterV2
var err error
f, err = gcs.FromBytesV2(blockcf.B, blockcf.M, m.Data)
if err != nil {
cleanup(i, true)
op := errors.Opf(opf, rp.raddr, blockHashes[i])
return nil, errors.E(op, err)
}
filters[i] = filterProof{
Filter: f,
ProofIndex: m.ProofIndex,
Proof: m.ProofHashes,
}
req = request{}
}
return err
})
}
err := g.Wait()
if err != nil {
return nil, err
}
}
return filters, nil
}
Expand Down

0 comments on commit 41fc943

Please sign in to comment.