Skip to content

Commit

Permalink
Retry swaps on liquidity policy update
Browse files Browse the repository at this point in the history
We create a new `SwapInManager` that checks the wallet state and decides
whether to initiate a channel funding attempt, while keeping track of
utxos that are currently being used.

We allow unlocking those utxos once a channel funding attempt fails, and
listen to liquidity policy updates to retry swaps when a change is
detected.
  • Loading branch information
t-bast committed Jun 20, 2023
1 parent 6618b51 commit 07febbb
Show file tree
Hide file tree
Showing 6 changed files with 277 additions and 71 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package fr.acinq.lightning.blockchain.electrum

import fr.acinq.bitcoin.OutPoint
import fr.acinq.lightning.Lightning
import fr.acinq.lightning.channel.LocalFundingStatus
import fr.acinq.lightning.channel.RbfStatus
import fr.acinq.lightning.channel.SignedSharedTransaction
import fr.acinq.lightning.channel.SpliceStatus
import fr.acinq.lightning.channel.states.*
import fr.acinq.lightning.io.RequestChannelOpen
import fr.acinq.lightning.io.WalletCommand
import fr.acinq.lightning.utils.MDCLogger
import fr.acinq.lightning.utils.sat

/**
* This object selects inputs that are ready to be used for swaps and keeps track of those that are currently used in channel funding attempts.
* Those inputs should not be reused, otherwise we would double-spend ourselves.
* If the electrum server we connect to has our channel funding attempts in their mempool, those inputs wouldn't be added to our wallet at all.
* But we cannot rely only on that, since we may connect to a different electrum server after a restart, or transactions may be evicted from their mempool.
* Since we don't have an easy way of asking electrum to check for double-spends, we would end up with channels that are stuck waiting for confirmations.
* This generally wouldn't be a security issue (only one of the funding attempts would succeed), unless 0-conf is used and our LSP is malicious.
*
* Note: this object is *not* thread-safe and should be used in a dedicated coroutine.
*/
class SwapInManager(private var reservedUtxos: Set<OutPoint>, private val logger: MDCLogger) {
constructor(bootChannels: List<PersistedChannelState>, logger: MDCLogger) : this(reservedWalletInputs(bootChannels), logger)

internal fun process(cmd: WalletCommand): RequestChannelOpen? = when (cmd) {
is WalletCommand.TrySwapIn -> {
val availableWallet = cmd.wallet.withoutReservedUtxos(reservedUtxos).withConfirmations(cmd.currentBlockHeight, cmd.swapInConfirmations)
logger.info { "swap-in wallet balance (migration=${cmd.isMigrationFromLegacyApp}): deeplyConfirmed=${availableWallet.deeplyConfirmed.balance}, weaklyConfirmed=${availableWallet.weaklyConfirmed.balance}, unconfirmed=${availableWallet.unconfirmed.balance}" }
val utxos = when {
// When migrating from the legacy android app, we use all utxos, even unconfirmed ones.
cmd.isMigrationFromLegacyApp -> availableWallet.all
else -> availableWallet.deeplyConfirmed
}
if (utxos.balance > 0.sat) {
logger.info { "swap-in wallet: requesting channel using ${utxos.size} utxos with balance=${utxos.balance}" }
reservedUtxos = reservedUtxos.union(utxos.map { it.outPoint })
RequestChannelOpen(Lightning.randomBytes32(), utxos)
} else {
null
}
}
is WalletCommand.UnlockWalletInputs -> {
logger.debug { "releasing ${cmd.inputs.size} utxos" }
reservedUtxos = reservedUtxos - cmd.inputs
null
}
}

companion object {
/**
* Return the list of wallet inputs used in pending unconfirmed channel funding attempts.
* These inputs should not be reused in other funding attempts, otherwise we would double-spend ourselves.
*/
fun reservedWalletInputs(channels: List<PersistedChannelState>): Set<OutPoint> {
val unconfirmedFundingTxs: List<SignedSharedTransaction> = buildList {
for (channel in channels) {
// Add all unsigned inputs currently used to build a funding tx that isn't broadcast yet (creation, rbf, splice).
when {
channel is WaitForFundingSigned -> add(channel.signingSession.fundingTx)
channel is WaitForFundingConfirmed && channel.rbfStatus is RbfStatus.WaitingForSigs -> add(channel.rbfStatus.session.fundingTx)
channel is Normal && channel.spliceStatus is SpliceStatus.WaitingForSigs -> add(channel.spliceStatus.session.fundingTx)
else -> {}
}
// Add all inputs in unconfirmed funding txs (utxos spent by confirmed transactions will never appear in our wallet).
when (channel) {
is ChannelStateWithCommitments -> channel.commitments.all
.map { it.localFundingStatus }
.filterIsInstance<LocalFundingStatus.UnconfirmedFundingTx>()
.forEach { add(it.sharedTx) }
else -> {}
}
}
}
val localInputs = unconfirmedFundingTxs.flatMap { fundingTx -> fundingTx.tx.localInputs.map { it.outPoint } }
return localInputs.toSet()
}
}
}
32 changes: 3 additions & 29 deletions src/commonMain/kotlin/fr/acinq/lightning/channel/Helpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import fr.acinq.lightning.blockchain.fee.FeeratePerKw
import fr.acinq.lightning.blockchain.fee.FeerateTolerance
import fr.acinq.lightning.blockchain.fee.OnChainFeerates
import fr.acinq.lightning.channel.Helpers.Closing.inputsAlreadySpent
import fr.acinq.lightning.channel.states.*
import fr.acinq.lightning.channel.states.Channel
import fr.acinq.lightning.channel.states.ClosingFeerates
import fr.acinq.lightning.channel.states.ClosingFees
import fr.acinq.lightning.crypto.Bolt3Derivation.deriveForCommitment
import fr.acinq.lightning.crypto.Bolt3Derivation.deriveForRevocation
import fr.acinq.lightning.crypto.KeyManager
Expand Down Expand Up @@ -244,34 +246,6 @@ object Helpers {
}
}

/**
* Return the list of wallet inputs used in pending unconfirmed channel funding attempts.
* These inputs should not be reused in other funding attempts, otherwise we would double-spend ourselves.
*/
fun reservedWalletInputs(channels: List<PersistedChannelState>): Set<OutPoint> {
val unconfirmedFundingTxs: List<SignedSharedTransaction> = buildList {
for (channel in channels) {
// Add all unsigned inputs currently used to build a funding tx that isn't broadcast yet (creation, rbf, splice).
when {
channel is WaitForFundingSigned -> add(channel.signingSession.fundingTx)
channel is WaitForFundingConfirmed && channel.rbfStatus is RbfStatus.WaitingForSigs -> add(channel.rbfStatus.session.fundingTx)
channel is Normal && channel.spliceStatus is SpliceStatus.WaitingForSigs -> add(channel.spliceStatus.session.fundingTx)
else -> {}
}
// Add all inputs in unconfirmed funding txs (utxos spent by confirmed transactions will never appear in our wallet).
when (channel) {
is ChannelStateWithCommitments -> channel.commitments.all
.map { it.localFundingStatus }
.filterIsInstance<LocalFundingStatus.UnconfirmedFundingTx>()
.forEach { add(it.sharedTx) }
else -> {}
}
}
}
val localInputs = unconfirmedFundingTxs.flatMap { fundingTx -> fundingTx.tx.localInputs.map { it.outPoint } }
return localInputs.toSet()
}

object Funding {

/** Compute the channelId of a dual-funded channel. */
Expand Down
65 changes: 35 additions & 30 deletions src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ data class SendPayment(val paymentId: UUID, val amount: MilliSatoshi, val recipi

data class PurgeExpiredPayments(val fromCreatedAt: Long, val toCreatedAt: Long) : PaymentCommand()

internal sealed class WalletCommand : PeerCommand() {
data class TrySwapIn(val currentBlockHeight: Int, val wallet: WalletState, val swapInConfirmations: Int, val isMigrationFromLegacyApp: Boolean) : WalletCommand()
data class UnlockWalletInputs(val inputs: Set<OutPoint>) : WalletCommand()
}

sealed class PeerEvent
data class PaymentRequestGenerated(val receivePayment: ReceivePayment, val request: String) : PeerEvent()
data class PaymentReceived(val incomingPayment: IncomingPayment, val received: IncomingPayment.Received) : PeerEvent()
Expand Down Expand Up @@ -220,9 +225,8 @@ class Peer(
it.channelId
}
logger.info { "restored ${channelIds.size} channels" }
launch {
watchSwapInWallet(bootChannels)
}
launch { watchSwapInWallet() }
launch { watchLiquidityPolicyChanges() }
launch {
// If we have some htlcs that have timed out, we may need to close channels to ensure we don't lose funds.
// But maybe we were offline for too long and it is why our peer couldn't settle these htlcs in time.
Expand All @@ -231,7 +235,8 @@ class Peer(
logger.info { "checking for timed out htlcs for channels: ${channelIds.joinToString(", ")}" }
channelIds.forEach { input.send(WrappedChannelCommand(it, ChannelCommand.CheckHtlcTimeout)) }
}
run()
val swapInManager = SwapInManager(bootChannels, logger)
run(swapInManager)
}
launch {
var previousState = connectionState.value
Expand All @@ -241,7 +246,6 @@ class Peer(
previousState = it
}
}

}

private suspend fun updateEstimateFees() {
Expand Down Expand Up @@ -376,34 +380,29 @@ class Peer(
listen() // This suspends until the coroutines is cancelled or the socket is closed
}

private suspend fun watchSwapInWallet(channels: List<PersistedChannelState>) {
// Wallet utxos that are already used in channel funding attempts should be ignored, otherwise we would double-spend ourselves.
// If the electrum server we connect to has our channel funding attempts in their mempool, those utxos wouldn't be added to our wallet anyway.
// But we cannot rely only on that, since we may connect to a different electrum server after a restart, or transactions may be evicted from their mempool.
// Since we don't have an easy way of asking electrum to check for double-spends, we would end up with channels that are stuck waiting for confirmations.
// This generally wouldn't be a security issue (only one of the funding attempts would succeed), unless 0-conf is used and our LSP is malicious.
val initiallyReservedUtxos: Set<OutPoint> = Helpers.reservedWalletInputs(channels)
private suspend fun watchSwapInWallet() {
swapInWallet.walletStateFlow
.filter { it.consistent }
.fold(initiallyReservedUtxos) { reservedUtxos, wallet ->
.collect {
val currentBlockHeight = currentTipFlow.filterNotNull().first().first
val availableWallet = wallet.withoutReservedUtxos(reservedUtxos).withConfirmations(currentBlockHeight, walletParams.swapInConfirmations)
logger.info { "swap-in wallet balance (migration=$isMigrationFromLegacyApp): deeplyConfirmed=${availableWallet.deeplyConfirmed.balance}, weaklyConfirmed=${availableWallet.weaklyConfirmed.balance}, unconfirmed=${availableWallet.unconfirmed.balance}" }
val utxos = when {
// When migrating from the legacy android app, we use all utxos, even unconfirmed ones.
isMigrationFromLegacyApp -> availableWallet.all
else -> availableWallet.deeplyConfirmed
}
if (utxos.balance > 0.sat) {
logger.info { "swap-in wallet: requesting channel using ${utxos.size} utxos with balance=${utxos.balance}" }
input.send(RequestChannelOpen(Lightning.randomBytes32(), utxos))
reservedUtxos.union(utxos.map { it.outPoint })
} else {
reservedUtxos
}
input.send(WalletCommand.TrySwapIn(currentBlockHeight, it, walletParams.swapInConfirmations, isMigrationFromLegacyApp))
}
}

private suspend fun watchLiquidityPolicyChanges() {
nodeParams.liquidityPolicy.collect {
val walletState = swapInWallet.walletStateFlow.first()
if (walletState.consistent) {
logger.info { "liquidity policy changed, retrying swaps" }
val currentBlockHeight = currentTipFlow.filterNotNull().first().first
input.send(WalletCommand.TrySwapIn(currentBlockHeight, walletState, walletParams.swapInConfirmations, isMigrationFromLegacyApp))
} else {
// If the wallet is inconsistent, it should become consistent soon, which will trigger the normal swap flow.
logger.info { "liquidity policy changed, waiting for wallet to be consistent" }
}
}
}

suspend fun send(cmd: PeerCommand) {
input.send(cmd)
}
Expand Down Expand Up @@ -696,14 +695,14 @@ class Peer(
Secp256k1DHFunctions, Chacha20Poly1305CipherFunctions, SHA256HashFunctions
)

private suspend fun run() {
private suspend fun run(swapInManager: SwapInManager) {
logger.info { "peer is active" }
for (event in input) {
processEvent(event)
processEvent(swapInManager, event)
}
}

private suspend fun processEvent(cmd: PeerCommand) {
private suspend fun processEvent(swapInManager: SwapInManager, cmd: PeerCommand) {
when {
cmd is BytesReceived -> {
val msg = try {
Expand Down Expand Up @@ -774,6 +773,7 @@ class Peer(
nodeParams.liquidityPolicy.value.maybeReject(request.walletInputs.balance.toMilliSatoshi(), totalFee, LiquidityEvents.Source.OnChainWallet, logger)?.let { rejected ->
logger.info { "rejecting open_channel2: reason=${rejected.reason}" }
nodeParams._nodeEvents.emit(rejected)
input.send(WalletCommand.UnlockWalletInputs(request.walletInputs.map { it.outPoint }.toSet()))
sendToPeer(Error(msg.temporaryChannelId, "cancelling open due to local liquidity policy"))
return@withMDC
}
Expand Down Expand Up @@ -965,6 +965,7 @@ class Peer(
nodeParams.liquidityPolicy.value.maybeReject(cmd.walletInputs.balance.toMilliSatoshi(), fee.toMilliSatoshi(), LiquidityEvents.Source.OnChainWallet, logger)?.let { rejected ->
logger.info { "rejecting splice: reason=${rejected.reason}" }
nodeParams._nodeEvents.emit(rejected)
input.send(WalletCommand.UnlockWalletInputs(cmd.walletInputs.map { it.outPoint }.toSet()))
return
}

Expand Down Expand Up @@ -1094,6 +1095,10 @@ class Peer(
}
}

cmd is WalletCommand -> {
swapInManager.process(cmd)?.let { input.send(it) }
}

cmd is Disconnected -> {
logger.warning { "disconnecting channels" }
_channels.forEach { (key, value) ->
Expand Down
Loading

0 comments on commit 07febbb

Please sign in to comment.