Skip to content

Commit

Permalink
Merge pull request #813 from starius/sweepbatcher-max-inputs
Browse files Browse the repository at this point in the history
sweepbatcher: set max batch size to 1000 sweeps
  • Loading branch information
starius committed Aug 27, 2024
2 parents a6e9a2b + 7780aca commit 876ed40
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 2 deletions.
10 changes: 9 additions & 1 deletion sweepbatcher/greedy_batch_selection.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (
// greedyAddSweep selects a batch for the sweep using the greedy algorithm,
// which minimizes costs, and adds the sweep to the batch. To accomplish this,
// it first collects fee details about the sweep being added, about a potential
// new batch composed of this sweep only, and about all existing batches. Then
// new batch composed of this sweep only, and about all existing batches. It
// skips batches with at least MaxSweepsPerBatch swaps to keep tx standard. Then
// it passes the data to selectBatches() function, which emulates adding the
// sweep to each batch and creating new batch for the sweep, and calculates the
// costs of each alternative. Based on the estimates of selectBatches(), this
Expand All @@ -40,6 +41,13 @@ func (b *Batcher) greedyAddSweep(ctx context.Context, sweep *sweep) error {
// Collect weight and fee rate info about existing batches.
batches := make([]feeDetails, 0, len(b.batches))
for _, existingBatch := range b.batches {
// Enforce MaxSweepsPerBatch. If there are already too many
// sweeps in the batch, do not add another sweep to prevent the
// tx from becoming non-standard.
if len(existingBatch.sweeps) >= MaxSweepsPerBatch {
continue
}

batchFeeDetails, err := estimateBatchWeight(existingBatch)
if err != nil {
return fmt.Errorf("failed to estimate tx weight for "+
Expand Down
36 changes: 35 additions & 1 deletion sweepbatcher/sweep_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ const (
// maxFeeToSwapAmtRatio is the maximum fee to swap amount ratio that
// we allow for a batch transaction.
maxFeeToSwapAmtRatio = 0.2

// MaxSweepsPerBatch is the maximum number of sweeps in a single batch.
// It is needed to prevent sweep tx from becoming non-standard. Max
// standard transaction is 400k wu, a non-cooperative input is 393 wu.
MaxSweepsPerBatch = 1000
)

var (
Expand Down Expand Up @@ -433,6 +438,8 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) {
// If the provided sweep is nil, we can't proceed with any checks, so
// we just return early.
if sweep == nil {
b.log.Infof("the sweep is nil")

return false, nil
}

Expand Down Expand Up @@ -462,18 +469,40 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) {
return true, nil
}

// Enforce MaxSweepsPerBatch. If there are already too many sweeps in
// the batch, do not add another sweep to prevent the tx from becoming
// non-standard.
if len(b.sweeps) >= MaxSweepsPerBatch {
b.log.Infof("the batch has already too many sweeps (%d >= %d)",
len(b.sweeps), MaxSweepsPerBatch)

return false, nil
}

// Since all the actions of the batch happen sequentially, we could
// arrive here after the batch got closed because of a spend. In this
// case we cannot add the sweep to this batch.
if b.state != Open {
b.log.Infof("the batch state (%v) is not open", b.state)

return false, nil
}

// If this batch contains a single sweep that spends to a non-wallet
// address, or the incoming sweep is spending to non-wallet address,
// we cannot add this sweep to the batch.
for _, s := range b.sweeps {
if s.isExternalAddr || sweep.isExternalAddr {
if s.isExternalAddr {
b.log.Infof("the batch already has a sweep (%x) with "+
"an external address", s.swapHash[:6])

return false, nil
}

if sweep.isExternalAddr {
b.log.Infof("the batch is not empty and new sweep (%x)"+
" has an external address", sweep.swapHash[:6])

return false, nil
}
}
Expand All @@ -486,6 +515,11 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) {
int32(math.Abs(float64(sweep.timeout - s.timeout)))

if timeoutDistance > b.cfg.maxTimeoutDistance {
b.log.Infof("too long timeout distance between the "+
"batch and sweep %x: %d > %d",
sweep.swapHash[:6], timeoutDistance,
b.cfg.maxTimeoutDistance)

return false, nil
}
}
Expand Down
161 changes: 161 additions & 0 deletions sweepbatcher/sweep_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/test"
"github.com/lightninglabs/loop/utils"
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/input"
Expand Down Expand Up @@ -1176,6 +1177,161 @@ func testDelays(t *testing.T, store testStore, batcherStore testBatcherStore) {
checkBatcherError(t, runErr)
}

// testMaxSweepsPerBatch tests the limit on max number of sweeps per batch.
func testMaxSweepsPerBatch(t *testing.T, store testStore,
batcherStore testBatcherStore) {

// Disable logging, because this test is very noisy.
oldLogger := log
UseLogger(build.NewSubLogger("SWEEP", nil))
defer UseLogger(oldLogger)

defer test.Guard(t, test.WithGuardTimeout(5*time.Minute))()

lnd := test.NewMockLnd()
ctx, cancel := context.WithCancel(context.Background())

sweepStore, err := NewSweepFetcherFromSwapStore(store, lnd.ChainParams)
require.NoError(t, err)

startTime := time.Date(2018, 11, 1, 0, 0, 0, 0, time.UTC)
testClock := clock.NewTestClock(startTime)

// Create muSig2SignSweep failing all sweeps to force non-cooperative
// scenario (it increases transaction size).
muSig2SignSweep := func(ctx context.Context,
protocolVersion loopdb.ProtocolVersion, swapHash lntypes.Hash,
paymentAddr [32]byte, nonce []byte, sweepTxPsbt []byte,
prevoutMap map[wire.OutPoint]*wire.TxOut) (
[]byte, []byte, error) {

return nil, nil, fmt.Errorf("test error")
}

// Set publish delay.
const publishDelay = 3 * time.Second

batcher := NewBatcher(
lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
muSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams,
batcherStore, sweepStore, WithPublishDelay(publishDelay),
WithClock(testClock),
)

var wg sync.WaitGroup
wg.Add(1)

var runErr error
go func() {
defer wg.Done()
runErr = batcher.Run(ctx)
}()

// Wait for the batcher to be initialized.
<-batcher.initDone

const swapsNum = MaxSweepsPerBatch + 1

// Expect 2 batches to be registered.
expectedBatches := (swapsNum + MaxSweepsPerBatch - 1) /
MaxSweepsPerBatch

for i := 0; i < swapsNum; i++ {
preimage := lntypes.Preimage{2, byte(i % 256), byte(i / 256)}
swapHash := preimage.Hash()

// Create a sweep request.
sweepReq := SweepRequest{
SwapHash: swapHash,
Value: 111,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{1, 1},
Index: 1,
},
Notifier: &dummyNotifier,
}

swap := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: 1000,
AmountRequested: 111,
ProtocolVersion: loopdb.ProtocolVersionMuSig2,
HtlcKeys: htlcKeys,

// Make preimage unique to pass SQL constraints.
Preimage: preimage,
},

DestAddr: destAddr,
SwapInvoice: swapInvoice,
SweepConfTarget: 123,
}

err = store.CreateLoopOut(ctx, swapHash, swap)
require.NoError(t, err)
store.AssertLoopOutStored()

// Deliver sweep request to batcher.
require.NoError(t, batcher.AddSweep(&sweepReq))

// If this is new batch, expect a spend registration.
if i%MaxSweepsPerBatch == 0 {
<-lnd.RegisterSpendChannel
}
}

// Eventually the batches are launched and all the sweeps are added.
require.Eventually(t, func() bool {
// Make sure all the batches have started.
if len(batcher.batches) != expectedBatches {
return false
}

// Make sure all the sweeps were added.
sweepsNum := 0
for _, batch := range batcher.batches {
sweepsNum += len(batch.sweeps)
}
return sweepsNum == swapsNum
}, test.Timeout, eventuallyCheckFrequency)

// Advance the clock to publishDelay, so batches are published.
now := startTime.Add(publishDelay)
testClock.SetTime(now)

// Expect mockSigner.SignOutputRaw calls to sign non-cooperative
// sweeps.
for i := 0; i < expectedBatches; i++ {
<-lnd.SignOutputRawChannel
}

// Wait for txs to be published.
inputsNum := 0
const maxWeight = lntypes.WeightUnit(400_000)
for i := 0; i < expectedBatches; i++ {
tx := <-lnd.TxPublishChannel
inputsNum += len(tx.TxIn)

// Make sure the transaction size is standard.
weight := lntypes.WeightUnit(
blockchain.GetTransactionWeight(btcutil.NewTx(tx)),
)
require.Less(t, weight, maxWeight)
t.Logf("tx weight: %v", weight)
}

// Make sure the number of inputs in batch transactions is equal
// to the number of swaps.
require.Equal(t, swapsNum, inputsNum)

// Now make the batcher quit by canceling the context.
cancel()
wg.Wait()

// Make sure the batcher exited without an error.
checkBatcherError(t, runErr)
}

// testSweepBatcherSweepReentry tests that when an old version of the batch tx
// gets confirmed the sweep leftovers are sent back to the batcher.
func testSweepBatcherSweepReentry(t *testing.T, store testStore,
Expand Down Expand Up @@ -3468,6 +3624,11 @@ func TestDelays(t *testing.T) {
runTests(t, testDelays)
}

// TestMaxSweepsPerBatch tests the limit on max number of sweeps per batch.
func TestMaxSweepsPerBatch(t *testing.T) {
runTests(t, testMaxSweepsPerBatch)
}

// TestSweepBatcherSweepReentry tests that when an old version of the batch tx
// gets confirmed the sweep leftovers are sent back to the batcher.
func TestSweepBatcherSweepReentry(t *testing.T) {
Expand Down

0 comments on commit 876ed40

Please sign in to comment.