diff --git a/p2p/peering.go b/p2p/peering.go
index 20a7cc63a..c35d76b30 100644
--- a/p2p/peering.go
+++ b/p2p/peering.go
@@ -1515,29 +1515,110 @@ type filterProof = struct {
 // CFiltersV2 requests version 2 cfilters for all blocks described by
 // blockHashes. This is currently implemented by making many separate
 // getcfilter requests concurrently and waiting on every result.
-//
-// Note: returning a []func() is an ugly hack to prevent a cyclical dependency
-// between the rpc package and the wallet package.
 func (rp *RemotePeer) CFiltersV2(ctx context.Context, blockHashes []*chainhash.Hash) ([]filterProof, error) {
-	// TODO: this is spammy and would be better implemented with a single
-	// request/response.
+	const opf = "remotepeer(%v).CFiltersV2(%v)"
+
+	ctxSend, cancelSend := context.WithCancel(ctx)
+	defer cancelSend()
+
+	type request struct {
+		t time.Time
+		c chan *wire.MsgCFilterV2
+	}
+
+	// Send the requests on a separate goroutine, as fast as the network
+	// accepts them.
+	errChan := make(chan error, 1)
+	requests := make(chan request, len(blockHashes))
+	go func() {
+		defer close(requests)
+		for _, blockHash := range blockHashes {
+			m := wire.NewMsgGetCFilterV2(blockHash)
+			c := make(chan *wire.MsgCFilterV2, 1)
+			if !rp.addRequestedCFilterV2(blockHash, c) {
+				op := errors.Opf(opf, rp.raddr, blockHash)
+				errChan <- errors.E(op, errors.Invalid, "cfilterv2 is already being requested from this peer for this block")
+				return
+			}
+			now := time.Now()
+			select {
+			case rp.out <- &msgAck{m, nil}:
+				requests <- request{t: now, c: c}
+			case <-ctxSend.Done():
+				return
+			case <-rp.errc:
+				return
+			}
+		}
+	}()
+
+	stalled := time.NewTimer(stallTimeout)
+
+	// Helper func that stops the sending goroutine and removes all requests
+	// made starting at index `start`.
+	cleanup := func(start int, stopStalled bool) {
+		cancelSend()
+		for range requests { // Drain until it signals closed.
+		}
+		for i := start; i < len(blockHashes); i++ {
+			rp.deleteRequestedCFilterV2(blockHashes[i])
+		}
+		if stopStalled && !stalled.Stop() {
+			<-stalled.C
+		}
+	}
+
+	// Receive the responses.
 	filters := make([]filterProof, len(blockHashes))
-	g, ctx := errgroup.WithContext(ctx)
 	for i := range blockHashes {
-		i := i
-		g.Go(func() error {
-			f, pi, prf, err := rp.CFilterV2(ctx, blockHashes[i])
-			filters[i] = filterProof{
-				Filter:     f,
-				ProofIndex: pi,
-				Proof:      prf,
+		// Alternate between waiting for the next request to be sent
+		// and waiting for its response by switching which of req.c
+		// and q channels are not nil.
+		var req request
+		q := requests
+		for req.c != nil || q != nil {
+			select {
+			case <-ctx.Done():
+				cleanup(i, true)
+				return nil, ctx.Err()
+			case <-stalled.C:
+				cleanup(i, false)
+				op := errors.Opf(opf, rp.raddr, blockHashes[i])
+				err := errors.E(op, errors.IO, "peer appears stalled")
+				rp.Disconnect(err)
+				return nil, err
+			case <-rp.errc:
+				cleanup(i, true)
+				return nil, rp.err
+			case err := <-errChan:
+				cleanup(i, true)
+				return nil, err
+			case req = <-q:
+				q = nil
+
+				// Request was sent. Reset the stall timer to
+				// be relative to the sending time.
+				if !stalled.Stop() {
+					<-stalled.C
+				}
+				stalled.Reset(stallTimeout - time.Now().Sub(req.t))
+			case m := <-req.c:
+				var f *gcs.FilterV2
+				var err error
+				f, err = gcs.FromBytesV2(blockcf.B, blockcf.M, m.Data)
+				if err != nil {
+					cleanup(i, true)
+					op := errors.Opf(opf, rp.raddr, blockHashes[i])
+					return nil, errors.E(op, err)
+				}
+				filters[i] = filterProof{
+					Filter:     f,
+					ProofIndex: m.ProofIndex,
+					Proof:      m.ProofHashes,
+				}
+				req = request{}
 			}
-			return err
-		})
-	}
-	err := g.Wait()
-	if err != nil {
-		return nil, err
+		}
 	}
 	return filters, nil
 }
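
Note on the receive loop: it relies on the Go idiom that a receive from a nil channel blocks forever, so a select case can be switched off by setting its channel to nil. By flipping q and req.c between nil and non-nil, a single select alternates between "wait for the next request to be handed off by the sender goroutine" and "wait for that request's response". The following is a minimal, self-contained sketch of the same pattern; all names (pending, requests, reply) are illustrative and not part of the patch.

package main

import (
	"fmt"
	"time"
)

// pending pairs a send time with the channel that will carry the reply.
// Illustrative sketch only; it mirrors the patch's request struct.
type pending struct {
	t time.Time
	c chan string
}

func main() {
	requests := make(chan pending, 3)

	// Sender: hand off three requests and arrange for their replies to
	// arrive a little later, mimicking responses from a remote peer.
	go func() {
		defer close(requests)
		for i := 0; i < 3; i++ {
			c := make(chan string, 1)
			requests <- pending{t: time.Now(), c: c}
			go func(i int, c chan string) {
				time.Sleep(10 * time.Millisecond)
				c <- fmt.Sprintf("reply %d", i)
			}(i, c)
		}
	}()

	// Receiver: for each expected reply, alternate between waiting for
	// the next handoff and waiting for its reply.  A receive from a nil
	// channel blocks forever, so setting q or req.c to nil disables that
	// select case until the other one has fired.
	for i := 0; i < 3; i++ {
		var req pending
		q := requests
		for req.c != nil || q != nil {
			select {
			case req = <-q:
				q = nil // handoff received; now wait only for the reply
			case reply := <-req.c:
				fmt.Println(i, reply, "after", time.Since(req.t))
				req = pending{} // reply consumed; leave the inner loop
			}
		}
	}
}

Compared with the removed errgroup version, the pipelined loop issues every getcfilterv2 message without waiting on earlier responses, while the single stalled timer, reset relative to each request's send time, bounds how long the peer may sit on the next response before being disconnected.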