Skip to content

Commit

Permalink
Introduce a SwapInManager
Browse files Browse the repository at this point in the history
We create a new `SwapInManager` that checks the wallet state and decides
whether to initiate a channel funding attempt, while keeping track of
utxos that are currently being used.

We allow unlocking those utxos once a channel funding attempt fails,
which ensures that if we retry later (because another input is confirmed)
we will reuse the previously failed inputs (instead of waiting for a
restart to use them).
  • Loading branch information
t-bast committed Jun 23, 2023
1 parent e7a38da commit 73db384
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package fr.acinq.lightning.blockchain.electrum

import fr.acinq.bitcoin.OutPoint
import fr.acinq.lightning.Lightning
import fr.acinq.lightning.channel.LocalFundingStatus
import fr.acinq.lightning.channel.RbfStatus
import fr.acinq.lightning.channel.SignedSharedTransaction
import fr.acinq.lightning.channel.SpliceStatus
import fr.acinq.lightning.channel.states.*
import fr.acinq.lightning.io.RequestChannelOpen
import fr.acinq.lightning.utils.MDCLogger
import fr.acinq.lightning.utils.sat

internal sealed class SwapInCommand {
data class TrySwapIn(val currentBlockHeight: Int, val wallet: WalletState, val swapInConfirmations: Int, val isMigrationFromLegacyApp: Boolean) : SwapInCommand()
data class UnlockWalletInputs(val inputs: Set<OutPoint>) : SwapInCommand()
}

/**
* This object selects inputs that are ready to be used for swaps and keeps track of those that are currently used in channel funding attempts.
* Those inputs should not be reused, otherwise we would double-spend ourselves.
* If the electrum server we connect to has our channel funding attempts in their mempool, those inputs wouldn't be added to our wallet at all.
* But we cannot rely only on that, since we may connect to a different electrum server after a restart, or transactions may be evicted from their mempool.
* Since we don't have an easy way of asking electrum to check for double-spends, we would end up with channels that are stuck waiting for confirmations.
* This generally wouldn't be a security issue (only one of the funding attempts would succeed), unless 0-conf is used and our LSP is malicious.
*
* Note: this object is *not* thread-safe and should be used in a dedicated coroutine.
*/
class SwapInManager(private var reservedUtxos: Set<OutPoint>, private val logger: MDCLogger) {
constructor(bootChannels: List<PersistedChannelState>, logger: MDCLogger) : this(reservedWalletInputs(bootChannels), logger)

internal fun process(cmd: SwapInCommand): RequestChannelOpen? = when (cmd) {
is SwapInCommand.TrySwapIn -> {
val availableWallet = cmd.wallet.withoutReservedUtxos(reservedUtxos).withConfirmations(cmd.currentBlockHeight, cmd.swapInConfirmations)
logger.info { "swap-in wallet balance (migration=${cmd.isMigrationFromLegacyApp}): deeplyConfirmed=${availableWallet.deeplyConfirmed.balance}, weaklyConfirmed=${availableWallet.weaklyConfirmed.balance}, unconfirmed=${availableWallet.unconfirmed.balance}" }
val utxos = when {
// When migrating from the legacy android app, we use all utxos, even unconfirmed ones.
cmd.isMigrationFromLegacyApp -> availableWallet.all
else -> availableWallet.deeplyConfirmed
}
if (utxos.balance > 0.sat) {
logger.info { "swap-in wallet: requesting channel using ${utxos.size} utxos with balance=${utxos.balance}" }
reservedUtxos = reservedUtxos.union(utxos.map { it.outPoint })
RequestChannelOpen(Lightning.randomBytes32(), utxos)
} else {
null
}
}
is SwapInCommand.UnlockWalletInputs -> {
logger.debug { "releasing ${cmd.inputs.size} utxos" }
reservedUtxos = reservedUtxos - cmd.inputs
null
}
}

companion object {
/**
* Return the list of wallet inputs used in pending unconfirmed channel funding attempts.
* These inputs should not be reused in other funding attempts, otherwise we would double-spend ourselves.
*/
fun reservedWalletInputs(channels: List<PersistedChannelState>): Set<OutPoint> {
val unconfirmedFundingTxs: List<SignedSharedTransaction> = buildList {
for (channel in channels) {
// Add all unsigned inputs currently used to build a funding tx that isn't broadcast yet (creation, rbf, splice).
when {
channel is WaitForFundingSigned -> add(channel.signingSession.fundingTx)
channel is WaitForFundingConfirmed && channel.rbfStatus is RbfStatus.WaitingForSigs -> add(channel.rbfStatus.session.fundingTx)
channel is Normal && channel.spliceStatus is SpliceStatus.WaitingForSigs -> add(channel.spliceStatus.session.fundingTx)
else -> {}
}
// Add all inputs in unconfirmed funding txs (utxos spent by confirmed transactions will never appear in our wallet).
when (channel) {
is ChannelStateWithCommitments -> channel.commitments.all
.map { it.localFundingStatus }
.filterIsInstance<LocalFundingStatus.UnconfirmedFundingTx>()
.forEach { add(it.sharedTx) }
else -> {}
}
}
}
val localInputs = unconfirmedFundingTxs.flatMap { fundingTx -> fundingTx.tx.localInputs.map { it.outPoint } }
return localInputs.toSet()
}
}
}
32 changes: 3 additions & 29 deletions src/commonMain/kotlin/fr/acinq/lightning/channel/Helpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import fr.acinq.lightning.blockchain.fee.FeeratePerKw
import fr.acinq.lightning.blockchain.fee.FeerateTolerance
import fr.acinq.lightning.blockchain.fee.OnChainFeerates
import fr.acinq.lightning.channel.Helpers.Closing.inputsAlreadySpent
import fr.acinq.lightning.channel.states.*
import fr.acinq.lightning.channel.states.Channel
import fr.acinq.lightning.channel.states.ClosingFeerates
import fr.acinq.lightning.channel.states.ClosingFees
import fr.acinq.lightning.crypto.Bolt3Derivation.deriveForCommitment
import fr.acinq.lightning.crypto.Bolt3Derivation.deriveForRevocation
import fr.acinq.lightning.crypto.KeyManager
Expand Down Expand Up @@ -244,34 +246,6 @@ object Helpers {
}
}

/**
* Return the list of wallet inputs used in pending unconfirmed channel funding attempts.
* These inputs should not be reused in other funding attempts, otherwise we would double-spend ourselves.
*/
fun reservedWalletInputs(channels: List<PersistedChannelState>): Set<OutPoint> {
val unconfirmedFundingTxs: List<SignedSharedTransaction> = buildList {
for (channel in channels) {
// Add all unsigned inputs currently used to build a funding tx that isn't broadcast yet (creation, rbf, splice).
when {
channel is WaitForFundingSigned -> add(channel.signingSession.fundingTx)
channel is WaitForFundingConfirmed && channel.rbfStatus is RbfStatus.WaitingForSigs -> add(channel.rbfStatus.session.fundingTx)
channel is Normal && channel.spliceStatus is SpliceStatus.WaitingForSigs -> add(channel.spliceStatus.session.fundingTx)
else -> {}
}
// Add all inputs in unconfirmed funding txs (utxos spent by confirmed transactions will never appear in our wallet).
when (channel) {
is ChannelStateWithCommitments -> channel.commitments.all
.map { it.localFundingStatus }
.filterIsInstance<LocalFundingStatus.UnconfirmedFundingTx>()
.forEach { add(it.sharedTx) }
else -> {}
}
}
}
val localInputs = unconfirmedFundingTxs.flatMap { fundingTx -> fundingTx.tx.localInputs.map { it.outPoint } }
return localInputs.toSet()
}

object Funding {

/** Compute the channelId of a dual-funded channel. */
Expand Down
42 changes: 18 additions & 24 deletions src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ class Peer(
private var output = Channel<ByteArray>(UNLIMITED)
val outputLightningMessages: ReceiveChannel<ByteArray> get() = output

private val swapInCommands = Channel<SwapInCommand>(Channel.UNLIMITED)

private val logger = MDCLogger(nodeParams.loggerFactory.newLogger(this::class), staticMdc = mapOf("remoteNodeId" to remoteNodeId))

// The channels map, as initially loaded from the database at "boot" (on Peer.init).
Expand Down Expand Up @@ -227,9 +229,13 @@ class Peer(
}
logger.info { "restored ${channelIds.size} channels" }
launch {
val swapInManager = SwapInManager(bootChannels, logger)
// wait to have a swap-in feerate available
swapInFeeratesFlow.filterNotNull().first()
watchSwapInWallet(bootChannels)
processSwapInCommands(swapInManager)
}
launch {
watchSwapInWallet()
}
launch {
// If we have some htlcs that have timed out, we may need to close channels to ensure we don't lose funds.
Expand All @@ -249,7 +255,6 @@ class Peer(
previousState = it
}
}

}

private suspend fun updateEstimateFees() {
Expand Down Expand Up @@ -384,34 +389,21 @@ class Peer(
listen() // This suspends until the coroutines is cancelled or the socket is closed
}

private suspend fun watchSwapInWallet(channels: List<PersistedChannelState>) {
// Wallet utxos that are already used in channel funding attempts should be ignored, otherwise we would double-spend ourselves.
// If the electrum server we connect to has our channel funding attempts in their mempool, those utxos wouldn't be added to our wallet anyway.
// But we cannot rely only on that, since we may connect to a different electrum server after a restart, or transactions may be evicted from their mempool.
// Since we don't have an easy way of asking electrum to check for double-spends, we would end up with channels that are stuck waiting for confirmations.
// This generally wouldn't be a security issue (only one of the funding attempts would succeed), unless 0-conf is used and our LSP is malicious.
val initiallyReservedUtxos: Set<OutPoint> = Helpers.reservedWalletInputs(channels)
private suspend fun watchSwapInWallet() {
swapInWallet.walletStateFlow
.filter { it.consistent }
.fold(initiallyReservedUtxos) { reservedUtxos, wallet ->
.collect {
val currentBlockHeight = currentTipFlow.filterNotNull().first().first
val availableWallet = wallet.withoutReservedUtxos(reservedUtxos).withConfirmations(currentBlockHeight, walletParams.swapInConfirmations)
logger.info { "swap-in wallet balance (migration=$isMigrationFromLegacyApp): deeplyConfirmed=${availableWallet.deeplyConfirmed.balance}, weaklyConfirmed=${availableWallet.weaklyConfirmed.balance}, unconfirmed=${availableWallet.unconfirmed.balance}" }
val utxos = when {
// When migrating from the legacy android app, we use all utxos, even unconfirmed ones.
isMigrationFromLegacyApp -> availableWallet.all
else -> availableWallet.deeplyConfirmed
}
if (utxos.balance > 0.sat) {
logger.info { "swap-in wallet: requesting channel using ${utxos.size} utxos with balance=${utxos.balance}" }
input.send(RequestChannelOpen(Lightning.randomBytes32(), utxos))
reservedUtxos.union(utxos.map { it.outPoint })
} else {
reservedUtxos
}
swapInCommands.send(SwapInCommand.TrySwapIn(currentBlockHeight, it, walletParams.swapInConfirmations, isMigrationFromLegacyApp))
}
}

private suspend fun processSwapInCommands(swapInManager: SwapInManager) {
for (command in swapInCommands) {
swapInManager.process(command)?.let { requestChannelOpen -> input.send(requestChannelOpen) }
}
}

suspend fun send(cmd: PeerCommand) {
input.send(cmd)
}
Expand Down Expand Up @@ -770,6 +762,7 @@ class Peer(
nodeParams.liquidityPolicy.value.maybeReject(request.walletInputs.balance.toMilliSatoshi(), totalFee, LiquidityEvents.Source.OnChainWallet, logger)?.let { rejected ->
logger.info { "rejecting open_channel2: reason=${rejected.reason}" }
nodeParams._nodeEvents.emit(rejected)
swapInCommands.send(SwapInCommand.UnlockWalletInputs(request.walletInputs.map { it.outPoint }.toSet()))
sendToPeer(Error(msg.temporaryChannelId, "cancelling open due to local liquidity policy"))
return@withMDC
}
Expand Down Expand Up @@ -953,6 +946,7 @@ class Peer(
nodeParams.liquidityPolicy.value.maybeReject(cmd.walletInputs.balance.toMilliSatoshi(), fee.toMilliSatoshi(), LiquidityEvents.Source.OnChainWallet, logger)?.let { rejected ->
logger.info { "rejecting splice: reason=${rejected.reason}" }
nodeParams._nodeEvents.emit(rejected)
swapInCommands.send(SwapInCommand.UnlockWalletInputs(cmd.walletInputs.map { it.outPoint }.toSet()))
return
}

Expand Down
Loading

0 comments on commit 73db384

Please sign in to comment.