Skip to content

Commit

Permalink
sweepbatcher: add option WithWaitForAddSweep
Browse files Browse the repository at this point in the history
WithWaitForAddSweep instructs sweepbatcher to wait for all existing sweeps
to be AddSweep'ed in Run before actual starting. This is needed in setups
where AddSweep is called after setting up dependencies of FetchSweep, so
FetchSweep would fail if called before AddSweep.
  • Loading branch information
starius committed Sep 5, 2024
1 parent 6db2a39 commit 974d373
Show file tree
Hide file tree
Showing 2 changed files with 398 additions and 21 deletions.
212 changes: 191 additions & 21 deletions sweepbatcher/sweep_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,21 @@ type Batcher struct {
// initialized.
initDone chan struct{}

// waitForAddSweepDone is a channel that is closed when waitForAddSweep
// completes. If waitForAddSweepOnStart is false, the channel is always
// closed.
waitForAddSweepDone chan struct{}

// sweepsAdded is a set of swap hashes added by AddSweep calls, filled
// and used only during waitForAddSweep running (= until channel
// waitForAddSweepDone is closed).
sweepsAdded map[lntypes.Hash]struct{}

// sweepsAddedCond is a condition variable (and a mutex, its L field)
// used to protect sweepsAdded and to wait on it being filled with
// hashes of all the existing sweeps.
sweepsAddedCond *sync.Cond

// wallet is the wallet kit client that is used by batches.
wallet lndclient.WalletKitClient

Expand Down Expand Up @@ -323,6 +338,13 @@ type Batcher struct {
// error. By default, it logs all errors as warnings, but "insufficient
// fee" as Info.
publishErrorHandler PublishErrorHandler

// waitForAddSweepOnStart instructs sweepbatcher to wait for all
// existing sweeps to be AddSweep'ed in Run before actual starting.
// This is needed in setups where AddSweep is called after setting up
// dependencies of FetchSweep, so FetchSweep would fail if called before
// AddSweep.
waitForAddSweepOnStart bool
}

// BatcherConfig holds batcher configuration.
Expand Down Expand Up @@ -373,6 +395,13 @@ type BatcherConfig struct {
// error. By default, it logs all errors as warnings, but "insufficient
// fee" as Info.
publishErrorHandler PublishErrorHandler

// waitForAddSweepOnStart instructs sweepbatcher to wait for all
// existing sweeps to be AddSweep'ed in Run before actual starting.
// This is needed in setups where AddSweep is called after setting up
// dependencies of FetchSweep, so FetchSweep would fail if called before
// AddSweep.
waitForAddSweepOnStart bool
}

// BatcherOption configures batcher behaviour.
Expand Down Expand Up @@ -460,6 +489,16 @@ func WithPublishErrorHandler(handler PublishErrorHandler) BatcherOption {
}
}

// WithWaitForAddSweep instructs sweepbatcher to wait for all existing sweeps
// to be AddSweep'ed in Run before actual starting. This is needed in setups
// where AddSweep is called after setting up dependencies of FetchSweep, so
// FetchSweep would fail if called before AddSweep.
func WithWaitForAddSweep() BatcherOption {
return func(cfg *BatcherConfig) {
cfg.waitForAddSweepOnStart = true
}
}

// NewBatcher creates a new Batcher instance.
func NewBatcher(wallet lndclient.WalletKitClient,
chainNotifier lndclient.ChainNotifierClient,
Expand Down Expand Up @@ -492,33 +531,151 @@ func NewBatcher(wallet lndclient.WalletKitClient,
"musig2ServerSigner")
}

waitForAddSweepDone := make(chan struct{})
if !cfg.waitForAddSweepOnStart {
close(waitForAddSweepDone)
}

return &Batcher{
batches: make(map[int32]*batch),
sweepReqs: make(chan SweepRequest),
errChan: make(chan error, 1),
quit: make(chan struct{}),
initDone: make(chan struct{}),
wallet: wallet,
chainNotifier: chainNotifier,
signerClient: signerClient,
musig2ServerSign: musig2ServerSigner,
VerifySchnorrSig: verifySchnorrSig,
chainParams: chainparams,
store: store,
sweepStore: sweepStore,
clock: cfg.clock,
initialDelay: cfg.initialDelay,
publishDelay: cfg.publishDelay,
customFeeRate: cfg.customFeeRate,
txLabeler: cfg.txLabeler,
customMuSig2Signer: cfg.customMuSig2Signer,
mixedBatch: cfg.mixedBatch,
publishErrorHandler: cfg.publishErrorHandler,
batches: make(map[int32]*batch),
sweepReqs: make(chan SweepRequest),
errChan: make(chan error, 1),
quit: make(chan struct{}),
initDone: make(chan struct{}),
waitForAddSweepDone: waitForAddSweepDone,
sweepsAdded: make(map[lntypes.Hash]struct{}),
sweepsAddedCond: sync.NewCond(&sync.Mutex{}),
wallet: wallet,
chainNotifier: chainNotifier,
signerClient: signerClient,
musig2ServerSign: musig2ServerSigner,
VerifySchnorrSig: verifySchnorrSig,
chainParams: chainparams,
store: store,
sweepStore: sweepStore,
clock: cfg.clock,
initialDelay: cfg.initialDelay,
publishDelay: cfg.publishDelay,
customFeeRate: cfg.customFeeRate,
txLabeler: cfg.txLabeler,
customMuSig2Signer: cfg.customMuSig2Signer,
mixedBatch: cfg.mixedBatch,
publishErrorHandler: cfg.publishErrorHandler,
waitForAddSweepOnStart: cfg.waitForAddSweepOnStart,
}
}

// missingSweeps returns the list of sweeps presenf in existing, but not present
// in sweepsAdded.
func missingSweeps(existing []lntypes.Hash,
sweepsAdded map[lntypes.Hash]struct{}) []lntypes.Hash {

missing := []lntypes.Hash{}
for _, h := range existing {
if _, has := sweepsAdded[h]; !has {
missing = append(missing, h)
}
}

return missing
}

// waitForAddSweep waits until all the existing sweeps from the DB are added to
// the sweeper by AddSweep calls, or until ctx is cancelled.
func (b *Batcher) waitForAddSweep(ctx context.Context) error {
// Close channel waitForAddSweepDone in the end of this function, so
// subsequent AddSweep calls do not add hashes to sweepsAdded.
defer close(b.waitForAddSweepDone)

// Collect existing sweeps from DB.
batches, err := b.FetchUnconfirmedBatches(ctx)
if err != nil {
return fmt.Errorf("b.FetchUnconfirmedBatches failed: %w", err)
}
var existing []lntypes.Hash
for _, batch := range batches {
dbSweeps, err := b.store.FetchBatchSweeps(ctx, batch.id)
if err != nil {
return fmt.Errorf("store.FetchBatchSweeps failed: %w",
err)
}
for _, dbSweep := range dbSweeps {
existing = append(existing, dbSweep.SwapHash)
}
}

// If it doesn't complete in 1 minute, log an error.
logTimer := time.AfterFunc(time.Minute, func() {
select {
case <-b.waitForAddSweepDone:
// waitForAddSweepDone is closed. Do nothing.
return

default:
// waitForAddSweepDone is open. Still waiting.
}

// Collect missing sweeps.
b.sweepsAddedCond.L.Lock()
missing := missingSweeps(existing, b.sweepsAdded)
b.sweepsAddedCond.L.Unlock()

log.Errorf("SweepBatcher hasn't started yet, because some "+
"sweeps (%v) haven't been AddSweep'ed. Maybe they "+
"confirmed on-chain, in this case they should be "+
"removed manually from the database, table 'sweeps'.",
missing)
})

go func() {
select {
case <-ctx.Done():
// Wake cond var up to finish the loop above.
b.sweepsAddedCond.Signal()

case <-b.waitForAddSweepDone:
// waitForAddSweep has stopped, stop the goroutine.
}
}()

// Wait on cond var for all existing sweeps to be added in AddSweep or
// for the context to expire.
b.sweepsAddedCond.L.Lock()

for {
// If all the existing sweeps were added, stop waiting.
if len(missingSweeps(existing, b.sweepsAdded)) == 0 {
break
}

// It the context expired, stop waiting.
if ctx.Err() != nil {
break
}

// Wait for next AddSweep or ctx cancellation.
b.sweepsAddedCond.Wait()
}

// Clear sweepsAdded, it is not needed anymore.
b.sweepsAdded = make(map[lntypes.Hash]struct{})

b.sweepsAddedCond.L.Unlock()

// Cancel the delayed check.
logTimer.Stop()

return ctx.Err()
}

// Run starts the batcher and processes incoming sweep requests.
func (b *Batcher) Run(ctx context.Context) error {
if b.waitForAddSweepOnStart {
if err := b.waitForAddSweep(ctx); err != nil {
return fmt.Errorf("waitForAddSweep failed: %w", err)
}
}

runCtx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
Expand Down Expand Up @@ -577,6 +734,19 @@ func (b *Batcher) Run(ctx context.Context) error {
// AddSweep adds a sweep request to the batcher for handling. This will either
// place the sweep in an existing batch or create a new one.
func (b *Batcher) AddSweep(sweepReq *SweepRequest) error {
// If waitForAddSweep is running (indicated by waitForAddSweepDone being
// open), add the swap hash to sweepsAdded.
select {
case <-b.waitForAddSweepDone:
// waitForAddSweepDone is closed. Do not add to sweepsAdded.
default:
// waitForAddSweepDone is open.
b.sweepsAddedCond.L.Lock()
b.sweepsAdded[sweepReq.SwapHash] = struct{}{}
b.sweepsAddedCond.L.Unlock()
b.sweepsAddedCond.Signal()
}

select {
case b.sweepReqs <- *sweepReq:
return nil
Expand Down
Loading

0 comments on commit 974d373

Please sign in to comment.