Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor swap-in trigger mechanism #483

Merged
merged 2 commits into from
Jun 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package fr.acinq.lightning.blockchain.electrum

import fr.acinq.bitcoin.OutPoint
import fr.acinq.lightning.Lightning
import fr.acinq.lightning.channel.LocalFundingStatus
import fr.acinq.lightning.channel.RbfStatus
import fr.acinq.lightning.channel.SignedSharedTransaction
import fr.acinq.lightning.channel.SpliceStatus
import fr.acinq.lightning.channel.states.*
import fr.acinq.lightning.io.RequestChannelOpen
import fr.acinq.lightning.utils.MDCLogger
import fr.acinq.lightning.utils.sat

internal sealed class SwapInCommand {
data class TrySwapIn(val currentBlockHeight: Int, val wallet: WalletState, val swapInConfirmations: Int, val isMigrationFromLegacyApp: Boolean) : SwapInCommand()
data class UnlockWalletInputs(val inputs: Set<OutPoint>) : SwapInCommand()
}

/**
* This object selects inputs that are ready to be used for swaps and keeps track of those that are currently used in channel funding attempts.
* Those inputs should not be reused, otherwise we would double-spend ourselves.
* If the electrum server we connect to has our channel funding attempts in their mempool, those inputs wouldn't be added to our wallet at all.
* But we cannot rely only on that, since we may connect to a different electrum server after a restart, or transactions may be evicted from their mempool.
* Since we don't have an easy way of asking electrum to check for double-spends, we would end up with channels that are stuck waiting for confirmations.
* This generally wouldn't be a security issue (only one of the funding attempts would succeed), unless 0-conf is used and our LSP is malicious.
*
* Note: this object is *not* thread-safe and should be used in a dedicated coroutine.
*/
class SwapInManager(private var reservedUtxos: Set<OutPoint>, private val logger: MDCLogger) {
constructor(bootChannels: List<PersistedChannelState>, logger: MDCLogger) : this(reservedWalletInputs(bootChannels), logger)

internal fun process(cmd: SwapInCommand): RequestChannelOpen? = when (cmd) {
is SwapInCommand.TrySwapIn -> {
val availableWallet = cmd.wallet.withoutReservedUtxos(reservedUtxos).withConfirmations(cmd.currentBlockHeight, cmd.swapInConfirmations)
logger.info { "swap-in wallet balance (migration=${cmd.isMigrationFromLegacyApp}): deeplyConfirmed=${availableWallet.deeplyConfirmed.balance}, weaklyConfirmed=${availableWallet.weaklyConfirmed.balance}, unconfirmed=${availableWallet.unconfirmed.balance}" }
val utxos = when {
// When migrating from the legacy android app, we use all utxos, even unconfirmed ones.
cmd.isMigrationFromLegacyApp -> availableWallet.all
else -> availableWallet.deeplyConfirmed
}
if (utxos.balance > 0.sat) {
logger.info { "swap-in wallet: requesting channel using ${utxos.size} utxos with balance=${utxos.balance}" }
reservedUtxos = reservedUtxos.union(utxos.map { it.outPoint })
RequestChannelOpen(Lightning.randomBytes32(), utxos)
} else {
null
}
}
is SwapInCommand.UnlockWalletInputs -> {
logger.debug { "releasing ${cmd.inputs.size} utxos" }
reservedUtxos = reservedUtxos - cmd.inputs
null
}
}

companion object {
/**
* Return the list of wallet inputs used in pending unconfirmed channel funding attempts.
* These inputs should not be reused in other funding attempts, otherwise we would double-spend ourselves.
*/
fun reservedWalletInputs(channels: List<PersistedChannelState>): Set<OutPoint> {
val unconfirmedFundingTxs: List<SignedSharedTransaction> = buildList {
for (channel in channels) {
// Add all unsigned inputs currently used to build a funding tx that isn't broadcast yet (creation, rbf, splice).
when {
channel is WaitForFundingSigned -> add(channel.signingSession.fundingTx)
channel is WaitForFundingConfirmed && channel.rbfStatus is RbfStatus.WaitingForSigs -> add(channel.rbfStatus.session.fundingTx)
channel is Normal && channel.spliceStatus is SpliceStatus.WaitingForSigs -> add(channel.spliceStatus.session.fundingTx)
else -> {}
}
// Add all inputs in unconfirmed funding txs (utxos spent by confirmed transactions will never appear in our wallet).
when (channel) {
is ChannelStateWithCommitments -> channel.commitments.all
.map { it.localFundingStatus }
.filterIsInstance<LocalFundingStatus.UnconfirmedFundingTx>()
.forEach { add(it.sharedTx) }
else -> {}
}
}
}
val localInputs = unconfirmedFundingTxs.flatMap { fundingTx -> fundingTx.tx.localInputs.map { it.outPoint } }
return localInputs.toSet()
}
}
}
32 changes: 3 additions & 29 deletions src/commonMain/kotlin/fr/acinq/lightning/channel/Helpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import fr.acinq.lightning.blockchain.fee.FeeratePerKw
import fr.acinq.lightning.blockchain.fee.FeerateTolerance
import fr.acinq.lightning.blockchain.fee.OnChainFeerates
import fr.acinq.lightning.channel.Helpers.Closing.inputsAlreadySpent
import fr.acinq.lightning.channel.states.*
import fr.acinq.lightning.channel.states.Channel
import fr.acinq.lightning.channel.states.ClosingFeerates
import fr.acinq.lightning.channel.states.ClosingFees
import fr.acinq.lightning.crypto.Bolt3Derivation.deriveForCommitment
import fr.acinq.lightning.crypto.Bolt3Derivation.deriveForRevocation
import fr.acinq.lightning.crypto.KeyManager
Expand Down Expand Up @@ -244,34 +246,6 @@ object Helpers {
}
}

/**
* Return the list of wallet inputs used in pending unconfirmed channel funding attempts.
* These inputs should not be reused in other funding attempts, otherwise we would double-spend ourselves.
*/
fun reservedWalletInputs(channels: List<PersistedChannelState>): Set<OutPoint> {
val unconfirmedFundingTxs: List<SignedSharedTransaction> = buildList {
for (channel in channels) {
// Add all unsigned inputs currently used to build a funding tx that isn't broadcast yet (creation, rbf, splice).
when {
channel is WaitForFundingSigned -> add(channel.signingSession.fundingTx)
channel is WaitForFundingConfirmed && channel.rbfStatus is RbfStatus.WaitingForSigs -> add(channel.rbfStatus.session.fundingTx)
channel is Normal && channel.spliceStatus is SpliceStatus.WaitingForSigs -> add(channel.spliceStatus.session.fundingTx)
else -> {}
}
// Add all inputs in unconfirmed funding txs (utxos spent by confirmed transactions will never appear in our wallet).
when (channel) {
is ChannelStateWithCommitments -> channel.commitments.all
.map { it.localFundingStatus }
.filterIsInstance<LocalFundingStatus.UnconfirmedFundingTx>()
.forEach { add(it.sharedTx) }
else -> {}
}
}
}
val localInputs = unconfirmedFundingTxs.flatMap { fundingTx -> fundingTx.tx.localInputs.map { it.outPoint } }
return localInputs.toSet()
}

object Funding {

/** Compute the channelId of a dual-funded channel. */
Expand Down
42 changes: 18 additions & 24 deletions src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ class Peer(
private var output = Channel<ByteArray>(UNLIMITED)
val outputLightningMessages: ReceiveChannel<ByteArray> get() = output

private val swapInCommands = Channel<SwapInCommand>(Channel.UNLIMITED)

private val logger = MDCLogger(nodeParams.loggerFactory.newLogger(this::class), staticMdc = mapOf("remoteNodeId" to remoteNodeId))

// The channels map, as initially loaded from the database at "boot" (on Peer.init).
Expand Down Expand Up @@ -226,10 +228,14 @@ class Peer(
it.channelId
}
logger.info { "restored ${channelIds.size} channels" }
launch {
val swapInManager = SwapInManager(bootChannels, logger)
processSwapInCommands(swapInManager)
}
launch {
// wait to have a swap-in feerate available
swapInFeeratesFlow.filterNotNull().first()
watchSwapInWallet(bootChannels)
watchSwapInWallet()
}
launch {
// If we have some htlcs that have timed out, we may need to close channels to ensure we don't lose funds.
Expand All @@ -249,7 +255,6 @@ class Peer(
previousState = it
}
}

}

private suspend fun updateEstimateFees() {
Expand Down Expand Up @@ -384,34 +389,21 @@ class Peer(
listen() // This suspends until the coroutines is cancelled or the socket is closed
}

private suspend fun watchSwapInWallet(channels: List<PersistedChannelState>) {
// Wallet utxos that are already used in channel funding attempts should be ignored, otherwise we would double-spend ourselves.
// If the electrum server we connect to has our channel funding attempts in their mempool, those utxos wouldn't be added to our wallet anyway.
// But we cannot rely only on that, since we may connect to a different electrum server after a restart, or transactions may be evicted from their mempool.
// Since we don't have an easy way of asking electrum to check for double-spends, we would end up with channels that are stuck waiting for confirmations.
// This generally wouldn't be a security issue (only one of the funding attempts would succeed), unless 0-conf is used and our LSP is malicious.
val initiallyReservedUtxos: Set<OutPoint> = Helpers.reservedWalletInputs(channels)
private suspend fun watchSwapInWallet() {
swapInWallet.walletStateFlow
.filter { it.consistent }
.fold(initiallyReservedUtxos) { reservedUtxos, wallet ->
.collect {
val currentBlockHeight = currentTipFlow.filterNotNull().first().first
val availableWallet = wallet.withoutReservedUtxos(reservedUtxos).withConfirmations(currentBlockHeight, walletParams.swapInConfirmations)
logger.info { "swap-in wallet balance (migration=$isMigrationFromLegacyApp): deeplyConfirmed=${availableWallet.deeplyConfirmed.balance}, weaklyConfirmed=${availableWallet.weaklyConfirmed.balance}, unconfirmed=${availableWallet.unconfirmed.balance}" }
val utxos = when {
// When migrating from the legacy android app, we use all utxos, even unconfirmed ones.
isMigrationFromLegacyApp -> availableWallet.all
else -> availableWallet.deeplyConfirmed
}
if (utxos.balance > 0.sat) {
logger.info { "swap-in wallet: requesting channel using ${utxos.size} utxos with balance=${utxos.balance}" }
input.send(RequestChannelOpen(Lightning.randomBytes32(), utxos))
reservedUtxos.union(utxos.map { it.outPoint })
} else {
reservedUtxos
}
swapInCommands.send(SwapInCommand.TrySwapIn(currentBlockHeight, it, walletParams.swapInConfirmations, isMigrationFromLegacyApp))
}
}

private suspend fun processSwapInCommands(swapInManager: SwapInManager) {
for (command in swapInCommands) {
swapInManager.process(command)?.let { requestChannelOpen -> input.send(requestChannelOpen) }
}
}

suspend fun send(cmd: PeerCommand) {
input.send(cmd)
}
Expand Down Expand Up @@ -770,6 +762,7 @@ class Peer(
nodeParams.liquidityPolicy.value.maybeReject(request.walletInputs.balance.toMilliSatoshi(), totalFee, LiquidityEvents.Source.OnChainWallet, logger)?.let { rejected ->
logger.info { "rejecting open_channel2: reason=${rejected.reason}" }
nodeParams._nodeEvents.emit(rejected)
swapInCommands.send(SwapInCommand.UnlockWalletInputs(request.walletInputs.map { it.outPoint }.toSet()))
sendToPeer(Error(msg.temporaryChannelId, "cancelling open due to local liquidity policy"))
return@withMDC
}
Expand Down Expand Up @@ -953,6 +946,7 @@ class Peer(
nodeParams.liquidityPolicy.value.maybeReject(cmd.walletInputs.balance.toMilliSatoshi(), fee.toMilliSatoshi(), LiquidityEvents.Source.OnChainWallet, logger)?.let { rejected ->
logger.info { "rejecting splice: reason=${rejected.reason}" }
nodeParams._nodeEvents.emit(rejected)
swapInCommands.send(SwapInCommand.UnlockWalletInputs(cmd.walletInputs.map { it.outPoint }.toSet()))
return
}

Expand Down
Loading