From ce4a4d97a55ed3df946be61aa36e04d0a744ef08 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Tue, 30 Jul 2024 23:39:06 -0300 Subject: [PATCH 1/7] sweepbatcher: rename a private const Renamed defaultPublishDelay to defaultTestnetPublishDelay. Its godoc was already for defaultTestnetPublishDelay, but the const was named defaultPublishDelay. --- sweepbatcher/sweep_batcher.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index 5029cf7d4..ac5c31e94 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -44,7 +44,7 @@ const ( // defaultTestnetPublishDelay is the default publish delay that is used // for testnet. - defaultPublishDelay = 500 * time.Millisecond + defaultTestnetPublishDelay = 500 * time.Millisecond ) type BatcherStore interface { @@ -536,7 +536,7 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) { cfg.batchPublishDelay = defaultMainnetPublishDelay default: - cfg.batchPublishDelay = defaultPublishDelay + cfg.batchPublishDelay = defaultTestnetPublishDelay } batchKit := b.newBatchKit() From be69653bf029461d356aef4a3423f31796a1eb8e Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Sat, 3 Aug 2024 17:46:00 -0300 Subject: [PATCH 2/7] sweepbatcher/store: remove unused field clock --- sweepbatcher/store.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/sweepbatcher/store.go b/sweepbatcher/store.go index f429b987c..510015cc3 100644 --- a/sweepbatcher/store.go +++ b/sweepbatcher/store.go @@ -11,7 +11,6 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/lightninglabs/loop/loopdb" "github.com/lightninglabs/loop/loopdb/sqlc" - "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/lntypes" ) @@ -72,7 +71,6 @@ type SQLStore struct { baseDb BaseDB network *chaincfg.Params - clock clock.Clock } // NewSQLStore creates a new SQLStore. @@ -80,7 +78,6 @@ func NewSQLStore(db BaseDB, network *chaincfg.Params) *SQLStore { return &SQLStore{ baseDb: db, network: network, - clock: clock.NewDefaultClock(), } } From d7fa4ab94d0f288aae4dc004337ce9053649cb7f Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Wed, 31 Jul 2024 14:07:51 -0300 Subject: [PATCH 3/7] test.Guard: make timeout configurable --- test/timeout.go | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/test/timeout.go b/test/timeout.go index 52d47624b..4370cedde 100644 --- a/test/timeout.go +++ b/test/timeout.go @@ -9,12 +9,34 @@ import ( "github.com/fortytw2/leaktest" ) +// GuardConfig stores options for Guard function. +type GuardConfig struct { + timeout time.Duration +} + +// GuardOption is an option for Guard function. +type GuardOption func(*GuardConfig) + +// WithGuardTimeout sets timeout for the guard. Default is 5s. +func WithGuardTimeout(timeout time.Duration) GuardOption { + return func(c *GuardConfig) { + c.timeout = timeout + } +} + // Guard implements a test level timeout. -func Guard(t *testing.T) func() { +func Guard(t *testing.T, opts ...GuardOption) func() { + cfg := GuardConfig{ + timeout: 5 * time.Second, + } + for _, opt := range opts { + opt(&cfg) + } + done := make(chan struct{}) go func() { select { - case <-time.After(5 * time.Second): + case <-time.After(cfg.timeout): err := pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) if err != nil { panic(err) From 429eb85e14479fc0552db15b7d41cc56a4cf331d Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Sat, 3 Aug 2024 23:18:55 -0300 Subject: [PATCH 4/7] sweepbatcher: use lnd/clock for timers Added option: WithClock. Use it for publishDelay timer. It will be used for testing. --- sweepbatcher/sweep_batch.go | 9 ++++++++- sweepbatcher/sweep_batcher.go | 22 ++++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index 709c85299..0c308c41c 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -24,6 +24,7 @@ import ( "github.com/lightninglabs/loop/swap" sweeppkg "github.com/lightninglabs/loop/sweep" "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lnrpc/walletrpc" @@ -134,6 +135,9 @@ type batchConfig struct { // batchConfTarget is the confirmation target of the batch transaction. batchConfTarget int32 + // clock provides methods to work with time and timers. + clock clock.Clock + // batchPublishDelay is the delay between receiving a new block and // publishing the batch transaction. batchPublishDelay time.Duration @@ -527,6 +531,9 @@ func (b *batch) Run(ctx context.Context) error { return fmt.Errorf("both musig2 signers provided") } + // Cache clock variable. + clock := b.cfg.clock + blockChan, blockErrChan, err := b.chainNotifier.RegisterBlockEpochNtfn(runCtx) if err != nil { @@ -562,7 +569,7 @@ func (b *batch) Run(ctx context.Context) error { // Set the timer to publish the batch transaction after // the configured delay. - timerChan = time.After(b.cfg.batchPublishDelay) + timerChan = clock.TickAfter(b.cfg.batchPublishDelay) b.currentHeight = height case <-timerChan: diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index ac5c31e94..a2ee99f3f 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -16,6 +16,7 @@ import ( "github.com/lightninglabs/loop/loopdb" "github.com/lightninglabs/loop/swap" "github.com/lightninglabs/loop/utils" + "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwallet/chainfee" @@ -253,6 +254,9 @@ type Batcher struct { // exit. wg sync.WaitGroup + // clock provides methods to work with time and timers. + clock clock.Clock + // customFeeRate provides custom min fee rate per swap. The batch uses // max of the fee rates of its swaps. In this mode confTarget is // ignored and fee bumping by sweepbatcher is disabled. @@ -267,6 +271,9 @@ type Batcher struct { // BatcherConfig holds batcher configuration. type BatcherConfig struct { + // clock provides methods to work with time and timers. + clock clock.Clock + // customFeeRate provides custom min fee rate per swap. The batch uses // max of the fee rates of its swaps. In this mode confTarget is // ignored and fee bumping by sweepbatcher is disabled. @@ -282,6 +289,14 @@ type BatcherConfig struct { // BatcherOption configures batcher behaviour. type BatcherOption func(*BatcherConfig) +// WithClock sets the clock used by sweepbatcher and its batches. It is needed +// to manipulate time in tests. +func WithClock(clock clock.Clock) BatcherOption { + return func(cfg *BatcherConfig) { + cfg.clock = clock + } +} + // WithCustomFeeRate instructs sweepbatcher not to fee bump itself and rely on // external source of fee rates (FeeRateProvider). To apply a fee rate change, // the caller should re-add the sweep by calling AddSweep. @@ -315,6 +330,11 @@ func NewBatcher(wallet lndclient.WalletKitClient, opt(&cfg) } + // If WithClock was not provided, use default clock. + if cfg.clock == nil { + cfg.clock = clock.NewDefaultClock() + } + if cfg.customMuSig2Signer != nil && musig2ServerSigner != nil { panic("customMuSig2Signer must not be used with " + "musig2ServerSigner") @@ -334,6 +354,7 @@ func NewBatcher(wallet lndclient.WalletKitClient, chainParams: chainparams, store: store, sweepStore: sweepStore, + clock: cfg.clock, customFeeRate: cfg.customFeeRate, customMuSig2Signer: cfg.customMuSig2Signer, } @@ -934,6 +955,7 @@ func (b *Batcher) newBatchConfig(maxTimeoutDistance int32) batchConfig { maxTimeoutDistance: maxTimeoutDistance, noBumping: b.customFeeRate != nil, customMuSig2Signer: b.customMuSig2Signer, + clock: b.clock, } } From e178d32717f3186fe26e2eb9ec9943e210537d11 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Wed, 31 Jul 2024 12:07:18 -0300 Subject: [PATCH 5/7] sweepbatcher: add option WithInitialDelay WithInitialDelay instructs sweepbatcher to wait for the duration provided after new batch creation before it is first published. This facilitates better grouping. It only affects newly created batches, not batches loaded from DB, so publishing does happen in case of a daemon restart (especially important in case of a crashloop). Defaults to 0s. If a sweep is about to expire (time until timeout is less that 2x initialDelay), then waiting is skipped. --- sweepbatcher/sweep_batch.go | 121 +++++++++++++++++++++++++++++++--- sweepbatcher/sweep_batcher.go | 37 +++++++++++ 2 files changed, 150 insertions(+), 8 deletions(-) diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index 0c308c41c..1f444d530 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -138,8 +138,14 @@ type batchConfig struct { // clock provides methods to work with time and timers. clock clock.Clock - // batchPublishDelay is the delay between receiving a new block and - // publishing the batch transaction. + // initialDelay is the delay of first batch publishing after creation. + // It only affects newly created batches, not batches loaded from DB, + // so publishing does happen in case of a daemon restart (especially + // important in case of a crashloop). + initialDelay time.Duration + + // batchPublishDelay is the delay between receiving a new block or + // initial delay completion and publishing the batch transaction. batchPublishDelay time.Duration // noBumping instructs sweepbatcher not to fee bump itself and rely on @@ -511,6 +517,11 @@ func (b *batch) Wait() { <-b.finished } +// stillWaitingMsg is the format of the message printed if the batch is about +// to publish, but initial delay has not ended yet. +const stillWaitingMsg = "Skipping publishing, initial delay will end at " + + "%v, now is %v." + // Run is the batch's main event loop. func (b *batch) Run(ctx context.Context) error { runCtx, cancel := context.WithCancel(ctx) @@ -550,10 +561,25 @@ func (b *batch) Run(ctx context.Context) error { } } + // skipBefore is the time before which we skip batch publishing. + // This is needed to facilitate better grouping of sweeps. + // For batches loaded from DB initialDelay should be 0. + skipBefore := clock.Now().Add(b.cfg.initialDelay) + + // initialDelayChan is a timer which fires upon initial delay end. + // If initialDelay is 0, it does not fire to prevent race with + // blockChan which also fires immediately with current tip. Such a race + // may result in double publishing if batchPublishDelay is also 0. + var initialDelayChan <-chan time.Time + if b.cfg.initialDelay > 0 { + initialDelayChan = clock.TickAfter(b.cfg.initialDelay) + } + // We use a timer in order to not publish new transactions at the same // time as the block epoch notification. This is done to prevent // unnecessary transaction publishments when a spend is detected on that - // block. + // block. This timer starts after new block arrives or initialDelay + // completes. var timerChan <-chan time.Time b.log.Infof("started, primary %x, total sweeps %v", @@ -564,6 +590,7 @@ func (b *batch) Run(ctx context.Context) error { case <-b.callEnter: <-b.callLeave + // blockChan provides immediately the current tip. case height := <-blockChan: b.log.Debugf("received block %v", height) @@ -572,12 +599,39 @@ func (b *batch) Run(ctx context.Context) error { timerChan = clock.TickAfter(b.cfg.batchPublishDelay) b.currentHeight = height + case <-initialDelayChan: + b.log.Debugf("initial delay of duration %v has ended", + b.cfg.initialDelay) + + // Set the timer to publish the batch transaction after + // the configured delay. + timerChan = clock.TickAfter(b.cfg.batchPublishDelay) + case <-timerChan: - if b.state == Open { - err := b.publish(ctx) - if err != nil { - return err - } + // Check that batch is still open. + if b.state != Open { + b.log.Debugf("Skipping publishing, because the"+ + "batch is not open (%v).", b.state) + continue + } + + // If the batch became urgent, skipBefore is set to now. + if b.isUrgent(skipBefore) { + skipBefore = clock.Now() + } + + // Check that the initial delay has ended. We have also + // batchPublishDelay on top of initialDelay, so if + // initialDelayChan has just fired, this check passes. + now := clock.Now() + if skipBefore.After(now) { + b.log.Debugf(stillWaitingMsg, skipBefore, now) + continue + } + + err := b.publish(ctx) + if err != nil { + return err } case spend := <-b.spendChan: @@ -611,6 +665,57 @@ func (b *batch) Run(ctx context.Context) error { } } +// timeout returns minimum timeout as block height among sweeps of the batch. +// If the batch is empty, return -1. +func (b *batch) timeout() int32 { + // Find minimum among sweeps' timeouts. + minTimeout := int32(-1) + for _, sweep := range b.sweeps { + if minTimeout == -1 || minTimeout > sweep.timeout { + minTimeout = sweep.timeout + } + } + + return minTimeout +} + +// isUrgent checks if the batch became urgent. This is determined by comparing +// the remaining number of blocks until timeout to the initial delay remained, +// given one block is 10 minutes. +func (b *batch) isUrgent(skipBefore time.Time) bool { + timeout := b.timeout() + if timeout <= 0 { + b.log.Warnf("Method timeout() returned %v. Number of"+ + " sweeps: %d. It may be an empty batch.", + timeout, len(b.sweeps)) + return false + } + + if b.currentHeight == 0 { + // currentHeight is not initiated yet. + return false + } + + blocksToTimeout := timeout - b.currentHeight + const blockTime = 10 * time.Minute + timeBank := time.Duration(blocksToTimeout) * blockTime + + // We want to have at least 2x as much time to be safe. + const safetyFactor = 2 + remainingWaiting := skipBefore.Sub(b.cfg.clock.Now()) + + if timeBank >= safetyFactor*remainingWaiting { + // There is enough time, keep waiting. + return false + } + + b.log.Debugf("cancelling waiting for urgent sweep (timeBank is %v, "+ + "remainingWaiting is %v)", timeBank, remainingWaiting) + + // Signal to the caller to cancel initialDelay. + return true +} + // publish creates and publishes the latest batch transaction to the network. func (b *batch) publish(ctx context.Context) error { var ( diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index a2ee99f3f..5f1a11387 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -257,6 +257,14 @@ type Batcher struct { // clock provides methods to work with time and timers. clock clock.Clock + // initialDelay is the delay of first batch publishing after creation. + // It only affects newly created batches, not batches loaded from DB, + // so publishing does happen in case of a daemon restart (especially + // important in case of a crashloop). If a sweep is about to expire + // (time until timeout is less that 2x initialDelay), then waiting is + // skipped. + initialDelay time.Duration + // customFeeRate provides custom min fee rate per swap. The batch uses // max of the fee rates of its swaps. In this mode confTarget is // ignored and fee bumping by sweepbatcher is disabled. @@ -274,6 +282,14 @@ type BatcherConfig struct { // clock provides methods to work with time and timers. clock clock.Clock + // initialDelay is the delay of first batch publishing after creation. + // It only affects newly created batches, not batches loaded from DB, + // so publishing does happen in case of a daemon restart (especially + // important in case of a crashloop). If a sweep is about to expire + // (time until timeout is less that 2x initialDelay), then waiting is + // skipped. + initialDelay time.Duration + // customFeeRate provides custom min fee rate per swap. The batch uses // max of the fee rates of its swaps. In this mode confTarget is // ignored and fee bumping by sweepbatcher is disabled. @@ -297,6 +313,17 @@ func WithClock(clock clock.Clock) BatcherOption { } } +// WithInitialDelay instructs sweepbatcher to wait for the duration provided +// after new batch creation before it is first published. This facilitates +// better grouping. Defaults to 0s (no initial delay). If a sweep is about +// to expire (time until timeout is less that 2x initialDelay), then waiting +// is skipped. +func WithInitialDelay(initialDelay time.Duration) BatcherOption { + return func(cfg *BatcherConfig) { + cfg.initialDelay = initialDelay + } +} + // WithCustomFeeRate instructs sweepbatcher not to fee bump itself and rely on // external source of fee rates (FeeRateProvider). To apply a fee rate change, // the caller should re-add the sweep by calling AddSweep. @@ -355,6 +382,7 @@ func NewBatcher(wallet lndclient.WalletKitClient, store: store, sweepStore: sweepStore, clock: cfg.clock, + initialDelay: cfg.initialDelay, customFeeRate: cfg.customFeeRate, customMuSig2Signer: cfg.customMuSig2Signer, } @@ -560,6 +588,12 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) { cfg.batchPublishDelay = defaultTestnetPublishDelay } + if b.initialDelay < 0 { + return nil, fmt.Errorf("negative initialDelay: %v", + b.initialDelay) + } + cfg.initialDelay = b.initialDelay + batchKit := b.newBatchKit() batch := NewBatch(cfg, batchKit) @@ -647,6 +681,9 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error { cfg := b.newBatchConfig(batch.cfg.maxTimeoutDistance) + // Note that initialDelay and batchPublishDelay are 0 for batches + // recovered from DB so publishing happen in case of a daemon restart + // (especially important in case of a crashloop). newBatch, err := NewBatchFromDB(cfg, batchKit) if err != nil { return fmt.Errorf("failed in NewBatchFromDB: %w", err) From 3f563454926830399b321ebc346bc1ec5e53720f Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Wed, 31 Jul 2024 14:08:49 -0300 Subject: [PATCH 6/7] sweepbatcher: add option WithPublishDelay WithPublishDelay sets the delay of batch publishing that is applied in the beginning, after the appearance of a new block in the network or after the end of initial delay (see WithInitialDelay). It is needed to prevent unnecessary transaction publishments when a spend is detected on that block. Default value depends on the network: 5 seconds in mainnet, 0.5s in testnet. For batches recovered from DB this value is always 0s. --- sweepbatcher/sweep_batcher.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index 5f1a11387..7818c6858 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -265,6 +265,12 @@ type Batcher struct { // skipped. initialDelay time.Duration + // publishDelay is the delay of batch publishing that is applied in the + // beginning, after the appearance of a new block in the network or + // after the end of initial delay. For batches recovered from DB this + // value is always 0s, regardless of this setting. + publishDelay time.Duration + // customFeeRate provides custom min fee rate per swap. The batch uses // max of the fee rates of its swaps. In this mode confTarget is // ignored and fee bumping by sweepbatcher is disabled. @@ -290,6 +296,12 @@ type BatcherConfig struct { // skipped. initialDelay time.Duration + // publishDelay is the delay of batch publishing that is applied in the + // beginning, after the appearance of a new block in the network or + // after the end of initial delay. For batches recovered from DB this + // value is always 0s, regardless of this setting. + publishDelay time.Duration + // customFeeRate provides custom min fee rate per swap. The batch uses // max of the fee rates of its swaps. In this mode confTarget is // ignored and fee bumping by sweepbatcher is disabled. @@ -324,6 +336,18 @@ func WithInitialDelay(initialDelay time.Duration) BatcherOption { } } +// WithPublishDelay sets the delay of batch publishing that is applied in the +// beginning, after the appearance of a new block in the network or after the +// end of initial delay (see WithInitialDelay). It is needed to prevent +// unnecessary transaction publishments when a spend is detected on that block. +// Default value depends on the network: 5 seconds in mainnet, 0.5s in testnet. +// For batches recovered from DB this value is always 0s. +func WithPublishDelay(publishDelay time.Duration) BatcherOption { + return func(cfg *BatcherConfig) { + cfg.publishDelay = publishDelay + } +} + // WithCustomFeeRate instructs sweepbatcher not to fee bump itself and rely on // external source of fee rates (FeeRateProvider). To apply a fee rate change, // the caller should re-add the sweep by calling AddSweep. @@ -383,6 +407,7 @@ func NewBatcher(wallet lndclient.WalletKitClient, sweepStore: sweepStore, clock: cfg.clock, initialDelay: cfg.initialDelay, + publishDelay: cfg.publishDelay, customFeeRate: cfg.customFeeRate, customMuSig2Signer: cfg.customMuSig2Signer, } @@ -588,6 +613,14 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) { cfg.batchPublishDelay = defaultTestnetPublishDelay } + if b.publishDelay != 0 { + if b.publishDelay < 0 { + return nil, fmt.Errorf("negative publishDelay: %v", + b.publishDelay) + } + cfg.batchPublishDelay = b.publishDelay + } + if b.initialDelay < 0 { return nil, fmt.Errorf("negative initialDelay: %v", b.initialDelay) From 9af6718089e6e1b8b02a1d9f130dd1e9638f4403 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Fri, 9 Aug 2024 17:19:38 -0300 Subject: [PATCH 7/7] sweepbatcher: test InitialDelay and PublishDelay Tested scenarios: 1. For a regular sweep newly added it waits for initialDelay + publishDelay before publishing a transaction. 2. For a sweep recovered from DB it does not wait before publishing tx. 3. If a sweep is about to expire, initialDelay is skipped. --- sweepbatcher/sweep_batcher_test.go | 494 +++++++++++++++++++++++++++++ 1 file changed, 494 insertions(+) diff --git a/sweepbatcher/sweep_batcher_test.go b/sweepbatcher/sweep_batcher_test.go index 96b77fc51..18fe747e1 100644 --- a/sweepbatcher/sweep_batcher_test.go +++ b/sweepbatcher/sweep_batcher_test.go @@ -19,9 +19,11 @@ import ( "github.com/lightninglabs/loop/test" "github.com/lightninglabs/loop/utils" "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwallet/chainfee" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -548,6 +550,493 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, }, test.Timeout, eventuallyCheckFrequency) } +// wrappedLogger implements btclog.Logger, recording last debug message format. +// It is needed to watch for messages in tests. +type wrappedLogger struct { + btclog.Logger + + debugMessages []string + infoMessages []string +} + +// Debugf logs debug message. +func (l *wrappedLogger) Debugf(format string, params ...interface{}) { + l.debugMessages = append(l.debugMessages, format) + l.Logger.Debugf(format, params...) +} + +// Infof logs info message. +func (l *wrappedLogger) Infof(format string, params ...interface{}) { + l.infoMessages = append(l.infoMessages, format) + l.Logger.Infof(format, params...) +} + +// testDelays tests that WithInitialDelay and WithPublishDelay work. +func testDelays(t *testing.T, store testStore, batcherStore testBatcherStore) { + // Set initial delay and publish delay. + const ( + initialDelay = 4 * time.Second + publishDelay = 3 * time.Second + ) + + defer test.Guard(t)() + + lnd := test.NewMockLnd() + ctx, cancel := context.WithCancel(context.Background()) + + sweepStore, err := NewSweepFetcherFromSwapStore(store, lnd.ChainParams) + require.NoError(t, err) + + startTime := time.Date(2018, 11, 1, 0, 0, 0, 0, time.UTC) + tickSignal := make(chan time.Duration) + testClock := clock.NewTestClockWithTickSignal(startTime, tickSignal) + + batcher := NewBatcher( + lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, + testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, + batcherStore, sweepStore, WithInitialDelay(initialDelay), + WithPublishDelay(publishDelay), WithClock(testClock), + ) + + var wg sync.WaitGroup + wg.Add(1) + + var runErr error + go func() { + defer wg.Done() + runErr = batcher.Run(ctx) + }() + + // Wait for the batcher to be initialized. + <-batcher.initDone + + // Create a sweep request. + sweepReq := SweepRequest{ + SwapHash: lntypes.Hash{1, 1, 1}, + Value: 111, + Outpoint: wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + }, + Notifier: &dummyNotifier, + } + + swap := &loopdb.LoopOutContract{ + SwapContract: loopdb.SwapContract{ + CltvExpiry: 1000, + AmountRequested: 111, + ProtocolVersion: loopdb.ProtocolVersionMuSig2, + HtlcKeys: htlcKeys, + }, + + DestAddr: destAddr, + SwapInvoice: swapInvoice, + SweepConfTarget: 123, + } + + err = store.CreateLoopOut(ctx, sweepReq.SwapHash, swap) + require.NoError(t, err) + store.AssertLoopOutStored() + + // Deliver sweep request to batcher. + require.NoError(t, batcher.AddSweep(&sweepReq)) + + // Expect two timers to be set: initialDelay and publishDelay, + // and RegisterSpend to be called. The order is not determined, + // so catch these actions from two separate goroutines. + var wg2 sync.WaitGroup + + wg2.Add(1) + go func() { + defer wg2.Done() + + // Since a batch was created we check that it registered for its + // primary sweep's spend. + <-lnd.RegisterSpendChannel + }() + + wg2.Add(1) + var delays []time.Duration + go func() { + defer wg2.Done() + + // Expect two timers: initialDelay and publishDelay. + delays = append(delays, <-tickSignal) + delays = append(delays, <-tickSignal) + }() + + // Wait for RegisterSpend and for timer registrations. + wg2.Wait() + + // Expect timer for initialDelay and publishDelay to be registered. + wantDelays := []time.Duration{initialDelay, publishDelay} + require.Equal(t, wantDelays, delays) + + // Eventually the batch is launched. + require.Eventually(t, func() bool { + return len(batcher.batches) == 1 + }, test.Timeout, eventuallyCheckFrequency) + + // Replace the logger in the batch with wrappedLogger to watch messages. + var batch1 *batch + for _, batch := range batcher.batches { + batch1 = batch + } + require.NotNil(t, batch1) + testLogger := &wrappedLogger{Logger: batch1.log} + batch1.log = testLogger + + // Advance the clock to publishDelay. It will trigger the publishDelay + // timer, but won't result in publishing, because of initialDelay. + now := startTime.Add(publishDelay) + testClock.SetTime(now) + + // Wait for batch publishing to be skipped, because initialDelay has not + // ended. + require.EventuallyWithT(t, func(c *assert.CollectT) { + require.Contains(t, testLogger.debugMessages, stillWaitingMsg) + }, test.Timeout, eventuallyCheckFrequency) + + // Advance the clock to the end of initialDelay. + now = startTime.Add(initialDelay) + testClock.SetTime(now) + + // Expect timer for publishDelay to be registered. + require.Equal(t, publishDelay, <-tickSignal) + + // Advance the clock. + now = now.Add(publishDelay) + testClock.SetTime(now) + + // Wait for tx to be published. + <-lnd.TxPublishChannel + + // Once batcher receives sweep request it will eventually spin up a + // batch. + require.Eventually(t, func() bool { + // Make sure that the sweep was stored + if !batcherStore.AssertSweepStored(sweepReq.SwapHash) { + return false + } + + // Make sure there is exactly one active batch. + if len(batcher.batches) != 1 { + return false + } + + // Get the batch. + batch := getOnlyBatch(batcher) + + // Make sure the batch has one sweep. + return len(batch.sweeps) == 1 + }, test.Timeout, eventuallyCheckFrequency) + + // Make sure we have stored the batch. + batches, err := batcherStore.FetchUnconfirmedSweepBatches(ctx) + require.NoError(t, err) + require.Len(t, batches, 1) + + // Now make the batcher quit by canceling the context. + cancel() + wg.Wait() + + // Make sure the batcher exited without an error. + checkBatcherError(t, runErr) + + // Advance the clock by 1 second. + now = now.Add(time.Second) + testClock.SetTime(now) + + // Now launch it again. + batcher = NewBatcher( + lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, + testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, + batcherStore, sweepStore, WithInitialDelay(initialDelay), + WithPublishDelay(publishDelay), WithClock(testClock), + ) + ctx, cancel = context.WithCancel(context.Background()) + wg.Add(1) + go func() { + defer wg.Done() + runErr = batcher.Run(ctx) + }() + + // Wait for the batcher to be initialized. + <-batcher.initDone + + // Wait for batch to load. + require.Eventually(t, func() bool { + // Make sure that the sweep was stored + if !batcherStore.AssertSweepStored(sweepReq.SwapHash) { + return false + } + + // Make sure there is exactly one active batch. + if len(batcher.batches) != 1 { + return false + } + + // Get the batch. + batch := getOnlyBatch(batcher) + + // Make sure the batch has one sweep. + return len(batch.sweeps) == 1 + }, test.Timeout, eventuallyCheckFrequency) + + // Expect a timer to be set: 0 (instead of publishDelay), and + // RegisterSpend to be called. The order is not determined, so catch + // these actions from two separate goroutines. + var wg3 sync.WaitGroup + + wg3.Add(1) + go func() { + defer wg3.Done() + + // Since a batch was created we check that it registered for its + // primary sweep's spend. + <-lnd.RegisterSpendChannel + }() + + wg3.Add(1) + delays = nil + go func() { + defer wg3.Done() + + // Expect one timer: publishDelay (0). + delays = append(delays, <-tickSignal) + }() + + // Wait for RegisterSpend and for timer registration. + wg3.Wait() + + // Expect one timer: publishDelay (0). + wantDelays = []time.Duration{0} + require.Equal(t, wantDelays, delays) + + // Advance the clock. + now = now.Add(time.Millisecond) + testClock.SetTime(now) + + // Wait for tx to be published. + <-lnd.TxPublishChannel + + // Tick tock next block. + err = lnd.NotifyHeight(601) + require.NoError(t, err) + + // Expect timer for publishDelay (0) to be registered. Make sure + // sweepbatcher does not wait for recovered batches after new block + // arrives as well. + require.Equal(t, time.Duration(0), <-tickSignal) + + // Advance the clock. + now = now.Add(time.Millisecond) + testClock.SetTime(now) + + // Wait for tx to be published. + <-lnd.TxPublishChannel + + // Now make the batcher quit by canceling the context. + cancel() + wg.Wait() + + // Make sure the batcher exited without an error. + checkBatcherError(t, runErr) + + // Advance the clock by 1 second. + now = now.Add(time.Second) + testClock.SetTime(now) + + // Now test for large initialDelay and make sure it is cancelled + // for an urgent sweep. + const largeInitialDelay = 6 * time.Hour + + batcher = NewBatcher( + lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, + testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, + batcherStore, sweepStore, WithInitialDelay(largeInitialDelay), + WithPublishDelay(publishDelay), WithClock(testClock), + ) + ctx, cancel = context.WithCancel(context.Background()) + wg.Add(1) + go func() { + defer wg.Done() + runErr = batcher.Run(ctx) + }() + + // Wait for the batcher to be initialized. + <-batcher.initDone + + // Expect spend notification and publication for the first batch. + // Expect a timer to be set: 0 (instead of publishDelay), and + // RegisterSpend to be called. The order is not determined, so catch + // these actions from two separate goroutines. + var wg4 sync.WaitGroup + + wg4.Add(1) + go func() { + defer wg4.Done() + + // Since a batch was created we check that it registered for its + // primary sweep's spend. + <-lnd.RegisterSpendChannel + }() + + wg4.Add(1) + delays = nil + go func() { + defer wg4.Done() + + // Expect one timer: publishDelay (0). + delays = append(delays, <-tickSignal) + }() + + // Wait for RegisterSpend and for timer registration. + wg4.Wait() + + // Expect one timer: publishDelay (0). + wantDelays = []time.Duration{0} + require.Equal(t, wantDelays, delays) + + // Get spend notification and tx publication for the first batch. + <-lnd.TxPublishChannel + + // Create a sweep request which is not urgent, but close to. + sweepReq2 := SweepRequest{ + SwapHash: lntypes.Hash{2, 2, 2}, + Value: 111, + Outpoint: wire.OutPoint{ + Hash: chainhash.Hash{2, 2}, + Index: 1, + }, + Notifier: &dummyNotifier, + } + + const blocksInDelay = int32(largeInitialDelay / (10 * time.Minute)) + swap2 := &loopdb.LoopOutContract{ + SwapContract: loopdb.SwapContract{ + // CltvExpiry is not urgent, but close. + CltvExpiry: 600 + blocksInDelay*2 + 5, + + AmountRequested: 111, + ProtocolVersion: loopdb.ProtocolVersionMuSig2, + HtlcKeys: htlcKeys, + + // Make preimage unique to pass SQL constraints. + Preimage: lntypes.Preimage{2}, + }, + + DestAddr: destAddr, + SwapInvoice: swapInvoice, + SweepConfTarget: 123, + } + + err = store.CreateLoopOut(ctx, sweepReq2.SwapHash, swap2) + require.NoError(t, err) + store.AssertLoopOutStored() + + // Deliver sweep request to batcher. + require.NoError(t, batcher.AddSweep(&sweepReq2)) + + // Expect the sweep to be added to new batch. Expect two timers: + // largeInitialDelay and publishDelay. RegisterSpend is called in + // parallel, so catch these actions from two separate goroutines. + var wg5 sync.WaitGroup + + wg5.Add(1) + go func() { + defer wg5.Done() + + // Since a batch was created we check that it registered for its + // primary sweep's spend. + <-lnd.RegisterSpendChannel + }() + + wg5.Add(1) + delays = nil + go func() { + defer wg5.Done() + + // Expect two timer: largeInitialDelay, publishDelay. + delays = append(delays, <-tickSignal) + delays = append(delays, <-tickSignal) + }() + + // Wait for RegisterSpend and for timers' registrations. + wg5.Wait() + + // Expect two timers: largeInitialDelay, publishDelay. + wantDelays = []time.Duration{largeInitialDelay, publishDelay} + require.Equal(t, wantDelays, delays) + + // Replace the logger in the batch with wrappedLogger to watch messages. + var batch2 *batch + for _, batch := range batcher.batches { + if batch.id != batch1.id { + batch2 = batch + } + } + require.NotNil(t, batch2) + testLogger2 := &wrappedLogger{Logger: batch2.log} + batch2.log = testLogger2 + + // Add another sweep which is urgent. It will go to the same batch + // to make sure minimum timeout is calculated properly. + sweepReq3 := SweepRequest{ + SwapHash: lntypes.Hash{3, 3, 3}, + Value: 111, + Outpoint: wire.OutPoint{ + Hash: chainhash.Hash{2, 2}, + Index: 1, + }, + Notifier: &dummyNotifier, + } + swap3 := &loopdb.LoopOutContract{ + SwapContract: loopdb.SwapContract{ + // CltvExpiry is urgent. + CltvExpiry: 600 + blocksInDelay*2 - 5, + + AmountRequested: 111, + ProtocolVersion: loopdb.ProtocolVersionMuSig2, + HtlcKeys: htlcKeys, + + // Make preimage unique to pass SQL constraints. + Preimage: lntypes.Preimage{3}, + }, + + DestAddr: destAddr, + SwapInvoice: swapInvoice, + SweepConfTarget: 123, + } + + err = store.CreateLoopOut(ctx, sweepReq3.SwapHash, swap3) + require.NoError(t, err) + store.AssertLoopOutStored() + + // Deliver sweep request to batcher. + require.NoError(t, batcher.AddSweep(&sweepReq3)) + + // Wait for sweep to be added to the batch. + require.EventuallyWithT(t, func(c *assert.CollectT) { + require.Contains(t, testLogger2.infoMessages, "adding sweep %x") + }, test.Timeout, eventuallyCheckFrequency) + + // Advance the clock by publishDelay. Don't wait largeInitialDelay. + now = now.Add(publishDelay) + testClock.SetTime(now) + + // Wait for tx to be published. + tx := <-lnd.TxPublishChannel + require.Equal(t, 2, len(tx.TxIn)) + + // Now make the batcher quit by canceling the context. + cancel() + wg.Wait() + + // Make sure the batcher exited without an error. + checkBatcherError(t, runErr) +} + // testSweepBatcherSweepReentry tests that when an old version of the batch tx // gets confirmed the sweep leftovers are sent back to the batcher. func testSweepBatcherSweepReentry(t *testing.T, store testStore, @@ -2284,6 +2773,11 @@ func TestSweepBatcherSimpleLifecycle(t *testing.T) { runTests(t, testSweepBatcherSimpleLifecycle) } +// TestDelays tests that WithInitialDelay and WithPublishDelay work. +func TestDelays(t *testing.T) { + runTests(t, testDelays) +} + // TestSweepBatcherSweepReentry tests that when an old version of the batch tx // gets confirmed the sweep leftovers are sent back to the batcher. func TestSweepBatcherSweepReentry(t *testing.T) {