Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sweepbatcher: add options WithInitialDelay and WithPublishDelay #801

Merged
merged 7 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions sweepbatcher/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/loopdb/sqlc"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/lntypes"
)

Expand Down Expand Up @@ -72,15 +71,13 @@ type SQLStore struct {
baseDb BaseDB

network *chaincfg.Params
clock clock.Clock
}

// NewSQLStore creates a new SQLStore.
func NewSQLStore(db BaseDB, network *chaincfg.Params) *SQLStore {
return &SQLStore{
baseDb: db,
network: network,
clock: clock.NewDefaultClock(),
}
}

Expand Down
130 changes: 121 additions & 9 deletions sweepbatcher/sweep_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/lightninglabs/loop/swap"
sweeppkg "github.com/lightninglabs/loop/sweep"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnrpc/walletrpc"
Expand Down Expand Up @@ -134,8 +135,17 @@ type batchConfig struct {
// batchConfTarget is the confirmation target of the batch transaction.
batchConfTarget int32

// batchPublishDelay is the delay between receiving a new block and
// publishing the batch transaction.
// clock provides methods to work with time and timers.
clock clock.Clock

// initialDelay is the delay of first batch publishing after creation.
// It only affects newly created batches, not batches loaded from DB,
// so publishing does happen in case of a daemon restart (especially
// important in case of a crashloop).
initialDelay time.Duration

// batchPublishDelay is the delay between receiving a new block or
// initial delay completion and publishing the batch transaction.
batchPublishDelay time.Duration

// noBumping instructs sweepbatcher not to fee bump itself and rely on
Expand Down Expand Up @@ -507,6 +517,11 @@ func (b *batch) Wait() {
<-b.finished
}

// stillWaitingMsg is the format of the message printed if the batch is about
// to publish, but initial delay has not ended yet.
const stillWaitingMsg = "Skipping publishing, initial delay will end at " +
"%v, now is %v."

// Run is the batch's main event loop.
func (b *batch) Run(ctx context.Context) error {
runCtx, cancel := context.WithCancel(ctx)
Expand All @@ -527,6 +542,9 @@ func (b *batch) Run(ctx context.Context) error {
return fmt.Errorf("both musig2 signers provided")
}

// Cache clock variable.
clock := b.cfg.clock

blockChan, blockErrChan, err :=
b.chainNotifier.RegisterBlockEpochNtfn(runCtx)
if err != nil {
Expand All @@ -543,10 +561,25 @@ func (b *batch) Run(ctx context.Context) error {
}
}

// skipBefore is the time before which we skip batch publishing.
// This is needed to facilitate better grouping of sweeps.
// For batches loaded from DB initialDelay should be 0.
skipBefore := clock.Now().Add(b.cfg.initialDelay)

// initialDelayChan is a timer which fires upon initial delay end.
// If initialDelay is 0, it does not fire to prevent race with
// blockChan which also fires immediately with current tip. Such a race
// may result in double publishing if batchPublishDelay is also 0.
var initialDelayChan <-chan time.Time
if b.cfg.initialDelay > 0 {
initialDelayChan = clock.TickAfter(b.cfg.initialDelay)
}

// We use a timer in order to not publish new transactions at the same
// time as the block epoch notification. This is done to prevent
// unnecessary transaction publishments when a spend is detected on that
// block.
// block. This timer starts after new block arrives or initialDelay
// completes.
var timerChan <-chan time.Time

b.log.Infof("started, primary %x, total sweeps %v",
Expand All @@ -557,20 +590,48 @@ func (b *batch) Run(ctx context.Context) error {
case <-b.callEnter:
<-b.callLeave

// blockChan provides immediately the current tip.
case height := <-blockChan:
b.log.Debugf("received block %v", height)

// Set the timer to publish the batch transaction after
// the configured delay.
timerChan = time.After(b.cfg.batchPublishDelay)
timerChan = clock.TickAfter(b.cfg.batchPublishDelay)
b.currentHeight = height

case <-initialDelayChan:
b.log.Debugf("initial delay of duration %v has ended",
b.cfg.initialDelay)

// Set the timer to publish the batch transaction after
// the configured delay.
timerChan = clock.TickAfter(b.cfg.batchPublishDelay)

case <-timerChan:
if b.state == Open {
err := b.publish(ctx)
if err != nil {
return err
}
// Check that batch is still open.
if b.state != Open {
b.log.Debugf("Skipping publishing, because the"+
"batch is not open (%v).", b.state)
continue
}

// If the batch became urgent, skipBefore is set to now.
if b.isUrgent(skipBefore) {
skipBefore = clock.Now()
}

// Check that the initial delay has ended. We have also
// batchPublishDelay on top of initialDelay, so if
// initialDelayChan has just fired, this check passes.
now := clock.Now()
if skipBefore.After(now) {
b.log.Debugf(stillWaitingMsg, skipBefore, now)
continue
}

err := b.publish(ctx)
if err != nil {
return err
}

case spend := <-b.spendChan:
Expand Down Expand Up @@ -604,6 +665,57 @@ func (b *batch) Run(ctx context.Context) error {
}
}

// timeout returns minimum timeout as block height among sweeps of the batch.
// If the batch is empty, return -1.
func (b *batch) timeout() int32 {
// Find minimum among sweeps' timeouts.
minTimeout := int32(-1)
for _, sweep := range b.sweeps {
if minTimeout == -1 || minTimeout > sweep.timeout {
minTimeout = sweep.timeout
}
}

return minTimeout
}

// isUrgent checks if the batch became urgent. This is determined by comparing
// the remaining number of blocks until timeout to the initial delay remained,
// given one block is 10 minutes.
func (b *batch) isUrgent(skipBefore time.Time) bool {
timeout := b.timeout()
if timeout <= 0 {
b.log.Warnf("Method timeout() returned %v. Number of"+
" sweeps: %d. It may be an empty batch.",
timeout, len(b.sweeps))
return false
}

if b.currentHeight == 0 {
// currentHeight is not initiated yet.
return false
}

blocksToTimeout := timeout - b.currentHeight
const blockTime = 10 * time.Minute
timeBank := time.Duration(blocksToTimeout) * blockTime

// We want to have at least 2x as much time to be safe.
const safetyFactor = 2
remainingWaiting := skipBefore.Sub(b.cfg.clock.Now())

if timeBank >= safetyFactor*remainingWaiting {
// There is enough time, keep waiting.
return false
}

b.log.Debugf("cancelling waiting for urgent sweep (timeBank is %v, "+
"remainingWaiting is %v)", timeBank, remainingWaiting)

// Signal to the caller to cancel initialDelay.
return true
}

// publish creates and publishes the latest batch transaction to the network.
func (b *batch) publish(ctx context.Context) error {
var (
Expand Down
96 changes: 94 additions & 2 deletions sweepbatcher/sweep_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/swap"
"github.com/lightninglabs/loop/utils"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
Expand Down Expand Up @@ -44,7 +45,7 @@ const (

// defaultTestnetPublishDelay is the default publish delay that is used
// for testnet.
defaultPublishDelay = 500 * time.Millisecond
defaultTestnetPublishDelay = 500 * time.Millisecond
)

type BatcherStore interface {
Expand Down Expand Up @@ -253,6 +254,23 @@ type Batcher struct {
// exit.
wg sync.WaitGroup

// clock provides methods to work with time and timers.
clock clock.Clock

// initialDelay is the delay of first batch publishing after creation.
// It only affects newly created batches, not batches loaded from DB,
// so publishing does happen in case of a daemon restart (especially
// important in case of a crashloop). If a sweep is about to expire
// (time until timeout is less that 2x initialDelay), then waiting is
// skipped.
initialDelay time.Duration

// publishDelay is the delay of batch publishing that is applied in the
// beginning, after the appearance of a new block in the network or
// after the end of initial delay. For batches recovered from DB this
// value is always 0s, regardless of this setting.
publishDelay time.Duration

// customFeeRate provides custom min fee rate per swap. The batch uses
// max of the fee rates of its swaps. In this mode confTarget is
// ignored and fee bumping by sweepbatcher is disabled.
Expand All @@ -267,6 +285,23 @@ type Batcher struct {

// BatcherConfig holds batcher configuration.
type BatcherConfig struct {
// clock provides methods to work with time and timers.
clock clock.Clock

// initialDelay is the delay of first batch publishing after creation.
// It only affects newly created batches, not batches loaded from DB,
// so publishing does happen in case of a daemon restart (especially
// important in case of a crashloop). If a sweep is about to expire
// (time until timeout is less that 2x initialDelay), then waiting is
// skipped.
initialDelay time.Duration

// publishDelay is the delay of batch publishing that is applied in the
// beginning, after the appearance of a new block in the network or
// after the end of initial delay. For batches recovered from DB this
// value is always 0s, regardless of this setting.
publishDelay time.Duration

// customFeeRate provides custom min fee rate per swap. The batch uses
// max of the fee rates of its swaps. In this mode confTarget is
// ignored and fee bumping by sweepbatcher is disabled.
Expand All @@ -282,6 +317,37 @@ type BatcherConfig struct {
// BatcherOption configures batcher behaviour.
type BatcherOption func(*BatcherConfig)

// WithClock sets the clock used by sweepbatcher and its batches. It is needed
// to manipulate time in tests.
func WithClock(clock clock.Clock) BatcherOption {
return func(cfg *BatcherConfig) {
cfg.clock = clock
}
}

// WithInitialDelay instructs sweepbatcher to wait for the duration provided
// after new batch creation before it is first published. This facilitates
// better grouping. Defaults to 0s (no initial delay). If a sweep is about
// to expire (time until timeout is less that 2x initialDelay), then waiting
// is skipped.
func WithInitialDelay(initialDelay time.Duration) BatcherOption {
return func(cfg *BatcherConfig) {
cfg.initialDelay = initialDelay
}
}

// WithPublishDelay sets the delay of batch publishing that is applied in the
// beginning, after the appearance of a new block in the network or after the
// end of initial delay (see WithInitialDelay). It is needed to prevent
// unnecessary transaction publishments when a spend is detected on that block.
// Default value depends on the network: 5 seconds in mainnet, 0.5s in testnet.
// For batches recovered from DB this value is always 0s.
func WithPublishDelay(publishDelay time.Duration) BatcherOption {
return func(cfg *BatcherConfig) {
cfg.publishDelay = publishDelay
}
}

// WithCustomFeeRate instructs sweepbatcher not to fee bump itself and rely on
// external source of fee rates (FeeRateProvider). To apply a fee rate change,
// the caller should re-add the sweep by calling AddSweep.
Expand Down Expand Up @@ -315,6 +381,11 @@ func NewBatcher(wallet lndclient.WalletKitClient,
opt(&cfg)
}

// If WithClock was not provided, use default clock.
if cfg.clock == nil {
cfg.clock = clock.NewDefaultClock()
}

if cfg.customMuSig2Signer != nil && musig2ServerSigner != nil {
panic("customMuSig2Signer must not be used with " +
"musig2ServerSigner")
Expand All @@ -334,6 +405,9 @@ func NewBatcher(wallet lndclient.WalletKitClient,
chainParams: chainparams,
store: store,
sweepStore: sweepStore,
clock: cfg.clock,
initialDelay: cfg.initialDelay,
publishDelay: cfg.publishDelay,
customFeeRate: cfg.customFeeRate,
customMuSig2Signer: cfg.customMuSig2Signer,
}
Expand Down Expand Up @@ -536,8 +610,22 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) {
cfg.batchPublishDelay = defaultMainnetPublishDelay

default:
cfg.batchPublishDelay = defaultPublishDelay
cfg.batchPublishDelay = defaultTestnetPublishDelay
}

if b.publishDelay != 0 {
bhandras marked this conversation as resolved.
Show resolved Hide resolved
if b.publishDelay < 0 {
return nil, fmt.Errorf("negative publishDelay: %v",
b.publishDelay)
}
cfg.batchPublishDelay = b.publishDelay
}

if b.initialDelay < 0 {
return nil, fmt.Errorf("negative initialDelay: %v",
b.initialDelay)
}
cfg.initialDelay = b.initialDelay

batchKit := b.newBatchKit()

Expand Down Expand Up @@ -626,6 +714,9 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {

cfg := b.newBatchConfig(batch.cfg.maxTimeoutDistance)

// Note that initialDelay and batchPublishDelay are 0 for batches
// recovered from DB so publishing happen in case of a daemon restart
// (especially important in case of a crashloop).
newBatch, err := NewBatchFromDB(cfg, batchKit)
if err != nil {
return fmt.Errorf("failed in NewBatchFromDB: %w", err)
Expand Down Expand Up @@ -934,6 +1025,7 @@ func (b *Batcher) newBatchConfig(maxTimeoutDistance int32) batchConfig {
maxTimeoutDistance: maxTimeoutDistance,
noBumping: b.customFeeRate != nil,
customMuSig2Signer: b.customMuSig2Signer,
clock: b.clock,
}
}

Expand Down
Loading
Loading