Skip to content

Commit

Permalink
sweepbatcher: use lnd/clock for timers
Browse files Browse the repository at this point in the history
Added option: WithClock. Use it for waitingPeriod and publishDelay timers.
The test now runs in 0.5s instead of 10s.

Testing for urgent batch failed sometimes, because the check happen to run
before addSweep and the batch was empty at that point (timeout unknown). To fix
this, the check was moved to timerChan handler, just before the code that checks
if waiting period has ended, before publishing. By that time the sweep is added.
  • Loading branch information
starius committed Aug 4, 2024
1 parent 196815a commit 7d34e2c
Show file tree
Hide file tree
Showing 3 changed files with 307 additions and 80 deletions.
120 changes: 77 additions & 43 deletions sweepbatcher/sweep_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/lightninglabs/loop/swap"
sweeppkg "github.com/lightninglabs/loop/sweep"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnrpc/walletrpc"
Expand Down Expand Up @@ -139,6 +140,9 @@ type batchConfig struct {
// so publishing does happen in case of a crashloop.
waitingPeriod time.Duration

// clock provides methods to work with time and timers.
clock clock.Clock

// batchPublishDelay is the delay between receiving a new block or
// waiting period completion and publishing the batch transaction.
batchPublishDelay time.Duration
Expand Down Expand Up @@ -512,6 +516,11 @@ func (b *batch) Wait() {
<-b.finished
}

// stillWaitingMsg is the format of the message printed if the batch is about
// to publish, but waiting period has not ended yet.
const stillWaitingMsg = "Skipping publishing, waiting period will end at " +
"%v, now is %v."

// Run is the batch's main event loop.
func (b *batch) Run(ctx context.Context) error {
runCtx, cancel := context.WithCancel(ctx)
Expand All @@ -532,6 +541,9 @@ func (b *batch) Run(ctx context.Context) error {
return fmt.Errorf("both musig2 signers provided")
}

// Cache clock variable.
clock := b.cfg.clock

blockChan, blockErrChan, err :=
b.chainNotifier.RegisterBlockEpochNtfn(runCtx)
if err != nil {
Expand All @@ -551,15 +563,15 @@ func (b *batch) Run(ctx context.Context) error {
// skipBefore is the time before which we skip batch publishing.
// This is needed to facilitate better grouping of sweeps.
// For batches loaded from DB waitingPeriod should be 0.
skipBefore := time.Now().Add(b.cfg.waitingPeriod)
skipBefore := clock.Now().Add(b.cfg.waitingPeriod)

// waitingEndChan is a timer which fires upon waiting period end.
// If waitingPeriod is 0, it does not fire to prevent race with
// blockChan which also fires immediately with current tip. Such a race
// may result in double publishing if batchPublishDelay is also 0.
var waitingEndChan <-chan time.Time
if b.cfg.waitingPeriod > 0 {
waitingEndChan = time.After(b.cfg.waitingPeriod)
waitingEndChan = clock.TickAfter(b.cfg.waitingPeriod)
}

// We use a timer in order to not publish new transactions at the same
Expand All @@ -572,6 +584,45 @@ func (b *batch) Run(ctx context.Context) error {
b.log.Infof("started, primary %x, total sweeps %v",
b.primarySweepID[0:6], len(b.sweeps))

// checkUrgent checks if the batch became urgent. This is determined
// by comparing the remaining number of blocks until timeout to the
// waiting period remained, given one block is 10 minutes. If the batch
// is found to be urgent, waitingPeriod is cancelled.
checkUrgent := func() {
timeout := b.timeout()
if timeout <= 0 {
b.log.Warnf("Method timeout() returned %v. Number of"+
" sweeps: %d. It may be an empty batch.",
timeout, len(b.sweeps))
return
}

if b.currentHeight == 0 {
// currentHeight is not initiated yet.
return
}

blocksToTimeout := timeout - b.currentHeight
const blockTime = 10 * time.Minute
timeBank := time.Duration(blocksToTimeout) * blockTime

// We want to have at least 2x as much time to be safe.
const safetyFactor = 2
remainingWaiting := skipBefore.Sub(clock.Now())
if timeBank >= safetyFactor*remainingWaiting {
// There is enough time, keep waiting.
return
}

// Reset waitingPeriod by setting skipBefore to now. It will be
// published when timerChan fires (in batchPublishDelay seconds).
skipBefore = clock.Now()

b.log.Debugf("cancelling waiting for urgent sweep "+
"(timeBank is %v, remainingWaiting is %v)",
timeBank, remainingWaiting)
}

for {
select {
case <-b.callEnter:
Expand All @@ -583,57 +634,40 @@ func (b *batch) Run(ctx context.Context) error {

// Set the timer to publish the batch transaction after
// the configured delay.
timerChan = time.After(b.cfg.batchPublishDelay)
timerChan = clock.TickAfter(b.cfg.batchPublishDelay)
b.currentHeight = height

// Check if the batch became urgent.
timeout := b.timeout()
if timeout <= 0 {
b.log.Warnf("Method timeout() returned %v. "+
"Number of sweeps: %d. It may be an"+
" empty batch.", timeout, len(b.sweeps))
continue
}

blocksToTimeout := timeout - height
const blockTime = 10 * time.Minute
timeBank := time.Duration(blocksToTimeout) * blockTime

// We want to have at least 2x as much time to be safe.
const safetyFactor = 2
remainingWaiting := time.Until(skipBefore)
if timeBank >= safetyFactor*remainingWaiting {
// There is enough time, keep waiting.
continue
}

// Reset waitingPeriod by setting skipBefore to now.
// It will be published when timerChan fires (in
// batchPublishDelay seconds).
skipBefore = time.Now()

b.log.Debugf("cancelling waiting for urgent sweep "+
"(timeBank is %v, remainingWaiting is %v)",
timeBank, remainingWaiting)

case <-waitingEndChan:
b.log.Debugf("waiting period of duration %v has ended",
b.cfg.waitingPeriod)

// Set the timer to publish the batch transaction after
// the configured delay.
timerChan = time.After(b.cfg.batchPublishDelay)
timerChan = clock.TickAfter(b.cfg.batchPublishDelay)

case <-timerChan:
// Check that batch is still open and that the waiting
// period has ended. We have also batchPublishDelay on
// top of waitingPeriod, so if waitingEndChan has just
// fired, this time check must pass.
if b.state == Open && !skipBefore.After(time.Now()) {
err := b.publish(ctx)
if err != nil {
return err
}
// Check that batch is still open.
if b.state != Open {
b.log.Debugf("Skipping publishing, because the"+
"batch is not open (%v).", b.state)
continue
}

// If the batch became urgent, skipBefore is set to now.
checkUrgent()

// Check that the waiting period has ended. We have also
// batchPublishDelay on top of waitingPeriod, so if
// waitingEndChan has just fired, this check must pass.
now := clock.Now()
if skipBefore.After(now) {
b.log.Debugf(stillWaitingMsg, skipBefore, now)
continue
}

err := b.publish(ctx)
if err != nil {
return err
}

case spend := <-b.spendChan:
Expand Down
22 changes: 22 additions & 0 deletions sweepbatcher/sweep_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/swap"
"github.com/lightninglabs/loop/utils"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
Expand Down Expand Up @@ -266,6 +267,9 @@ type Batcher struct {
// value is always 0s, regardless of this setting.
publishDelay time.Duration

// clock provides methods to work with time and timers.
clock clock.Clock

// customFeeRate provides custom min fee rate per swap. The batch uses
// max of the fee rates of its swaps. In this mode confTarget is
// ignored and fee bumping by sweepbatcher is disabled.
Expand Down Expand Up @@ -293,6 +297,9 @@ type BatcherConfig struct {
// value is always 0s, regardless of this setting.
publishDelay time.Duration

// clock provides methods to work with time and timers.
clock clock.Clock

// customFeeRate provides custom min fee rate per swap. The batch uses
// max of the fee rates of its swaps. In this mode confTarget is
// ignored and fee bumping by sweepbatcher is disabled.
Expand Down Expand Up @@ -331,6 +338,14 @@ func WithPublishDelay(publishDelay time.Duration) BatcherOption {
}
}

// WithClock sets the clock used by sweepbatcher and its batches. It is needed
// to manipulate time in tests.
func WithClock(clock clock.Clock) BatcherOption {
return func(cfg *BatcherConfig) {
cfg.clock = clock
}
}

// WithCustomFeeRate instructs sweepbatcher not to fee bump itself and rely on
// external source of fee rates (FeeRateProvider). To apply a fee rate change,
// the caller should re-add the sweep by calling AddSweep.
Expand Down Expand Up @@ -364,6 +379,11 @@ func NewBatcher(wallet lndclient.WalletKitClient,
opt(&cfg)
}

// If WithClock was not provided, use default clock.
if cfg.clock == nil {
cfg.clock = clock.NewDefaultClock()
}

if cfg.customMuSig2Signer != nil && musig2ServerSigner != nil {
panic("customMuSig2Signer must not be used with " +
"musig2ServerSigner")
Expand All @@ -385,6 +405,7 @@ func NewBatcher(wallet lndclient.WalletKitClient,
sweepStore: sweepStore,
waitingPeriod: cfg.waitingPeriod,
publishDelay: cfg.publishDelay,
clock: cfg.clock,
customFeeRate: cfg.customFeeRate,
customMuSig2Signer: cfg.customMuSig2Signer,
}
Expand Down Expand Up @@ -1001,6 +1022,7 @@ func (b *Batcher) newBatchConfig(maxTimeoutDistance int32) batchConfig {
maxTimeoutDistance: maxTimeoutDistance,
noBumping: b.customFeeRate != nil,
customMuSig2Signer: b.customMuSig2Signer,
clock: b.clock,
}
}

Expand Down
Loading

0 comments on commit 7d34e2c

Please sign in to comment.