diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala index d106914e32..dbbef9613d 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala @@ -18,11 +18,10 @@ package fr.acinq.eclair.channel import akka.actor.ActorRef import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey -import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, Transaction} +import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, Transaction, TxId} import fr.acinq.eclair.blockchain.fee.FeeratePerKw import fr.acinq.eclair.channel.Helpers.Closing.ClosingType -import fr.acinq.eclair.io.Peer.OpenChannelResponse -import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate} +import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, LiquidityAds} import fr.acinq.eclair.{BlockHeight, Features, ShortChannelId} /** @@ -79,6 +78,8 @@ case class ChannelSignatureSent(channel: ActorRef, commitments: Commitments) ext case class ChannelSignatureReceived(channel: ActorRef, commitments: Commitments) extends ChannelEvent +case class LiquidityPurchased(channel: ActorRef, channelId: ByteVector32, remoteNodeId: PublicKey, fundingTxId: TxId, purchase: LiquidityAds.LiquidityPurchased) extends ChannelEvent + case class ChannelErrorOccurred(channel: ActorRef, channelId: ByteVector32, remoteNodeId: PublicKey, error: ChannelError, isFatal: Boolean) extends ChannelEvent // NB: the fee should be set to 0 when we're not paying it. diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala index dc56e82e62..74a36e0e0a 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala @@ -16,6 +16,8 @@ package fr.acinq.eclair.channel.fund +import akka.actor.typed.eventstream.EventStream +import akka.actor.typed.scaladsl.adapter.TypedActorRefOps import akka.actor.typed.scaladsl.{ActorContext, Behaviors, StashBuffer} import akka.actor.typed.{ActorRef, Behavior} import akka.event.LoggingAdapter @@ -778,6 +780,7 @@ private class InteractiveTxBuilder(replyTo: ActorRef[InteractiveTxBuilder.Respon Behaviors.receiveMessagePartial { case SignTransactionResult(signedTx) => log.info(s"interactive-tx txid=${signedTx.txId} partially signed with {} local inputs, {} remote inputs, {} local outputs and {} remote outputs", signedTx.tx.localInputs.length, signedTx.tx.remoteInputs.length, signedTx.tx.localOutputs.length, signedTx.tx.remoteOutputs.length) + liquidityPurchased_opt.foreach(purchase => context.system.eventStream ! EventStream.Publish(LiquidityPurchased(replyTo.toClassic, channelParams.channelId, remoteNodeId, signedTx.txId, purchase))) replyTo ! Succeeded(InteractiveTxSigningSession.WaitingForSigs(fundingParams, purpose.fundingTxIndex, signedTx, Left(localCommit), remoteCommit), commitSig) Behaviors.stopped case WalletFailure(t) => diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/AuditDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/AuditDb.scala index f9cf8cf5ff..d77898692f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/AuditDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/AuditDb.scala @@ -22,6 +22,7 @@ import fr.acinq.eclair.channel._ import fr.acinq.eclair.db.AuditDb.{NetworkFee, PublishedTransaction, Stats} import fr.acinq.eclair.db.DbEventHandler.ChannelEvent import fr.acinq.eclair.payment.{PathFindingExperimentMetrics, PaymentReceived, PaymentRelayed, PaymentSent} +import fr.acinq.eclair.wire.protocol.LiquidityAds import fr.acinq.eclair.{Paginated, TimestampMilli} trait AuditDb { @@ -34,6 +35,8 @@ trait AuditDb { def add(paymentRelayed: PaymentRelayed): Unit + def add(liquidityPurchase: LiquidityPurchased): Unit + def add(txPublished: TransactionPublished): Unit def add(txConfirmed: TransactionConfirmed): Unit @@ -52,6 +55,8 @@ trait AuditDb { def listRelayed(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[PaymentRelayed] + def listLiquidityPurchases(remoteNodeId: PublicKey): Seq[LiquidityAds.LiquidityPurchased] + def listNetworkFees(from: TimestampMilli, to: TimestampMilli): Seq[NetworkFee] def stats(from: TimestampMilli, to: TimestampMilli): Seq[Stats] diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala index e241db7a47..93a1a5b6ee 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala @@ -45,6 +45,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL context.system.eventStream.subscribe(self, classOf[PaymentFailed]) context.system.eventStream.subscribe(self, classOf[PaymentReceived]) context.system.eventStream.subscribe(self, classOf[PaymentRelayed]) + context.system.eventStream.subscribe(self, classOf[LiquidityPurchased]) context.system.eventStream.subscribe(self, classOf[TransactionPublished]) context.system.eventStream.subscribe(self, classOf[TransactionConfirmed]) context.system.eventStream.subscribe(self, classOf[ChannelErrorOccurred]) @@ -92,6 +93,8 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL } auditDb.add(e) + case e: LiquidityPurchased => auditDb.add(e) + case e: TransactionPublished => log.info(s"paying mining fee=${e.miningFee} for txid=${e.tx.txid} desc=${e.desc}") auditDb.add(e) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala index bf6eb6e669..f6cb64e681 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala @@ -11,7 +11,7 @@ import fr.acinq.eclair.db.DualDatabases.runAsync import fr.acinq.eclair.payment._ import fr.acinq.eclair.payment.relay.Relayer.RelayFees import fr.acinq.eclair.router.Router -import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAddress, NodeAnnouncement} +import fr.acinq.eclair.wire.protocol._ import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli} import grizzled.slf4j.Logging @@ -151,6 +151,11 @@ case class DualAuditDb(primary: AuditDb, secondary: AuditDb) extends AuditDb { primary.add(paymentRelayed) } + override def add(liquidityPurchase: LiquidityPurchased): Unit = { + runAsync(secondary.add(liquidityPurchase)) + primary.add(liquidityPurchase) + } + override def add(txPublished: TransactionPublished): Unit = { runAsync(secondary.add(txPublished)) primary.add(txPublished) @@ -196,6 +201,11 @@ case class DualAuditDb(primary: AuditDb, secondary: AuditDb) extends AuditDb { primary.listRelayed(from, to, paginated_opt) } + override def listLiquidityPurchases(remoteNodeId: PublicKey): Seq[LiquidityAds.LiquidityPurchased] = { + runAsync(secondary.listLiquidityPurchases(remoteNodeId)) + primary.listLiquidityPurchases(remoteNodeId) + } + override def listNetworkFees(from: TimestampMilli, to: TimestampMilli): Seq[AuditDb.NetworkFee] = { runAsync(secondary.listNetworkFees(from, to)) primary.listNetworkFees(from, to) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala index 0b62cb7af3..d1ba7663c1 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala @@ -17,7 +17,7 @@ package fr.acinq.eclair.db.pg import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey -import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, SatoshiLong, TxId} +import fr.acinq.bitcoin.scalacompat.{ByteVector32, ByteVector64, Satoshi, SatoshiLong, TxId} import fr.acinq.eclair.channel._ import fr.acinq.eclair.db.AuditDb.{NetworkFee, PublishedTransaction, Stats} import fr.acinq.eclair.db.DbEventHandler.ChannelEvent @@ -26,6 +26,7 @@ import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.db._ import fr.acinq.eclair.payment._ import fr.acinq.eclair.transactions.Transactions.PlaceHolderPubKey +import fr.acinq.eclair.wire.protocol.LiquidityAds import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, Paginated, TimestampMilli} import grizzled.slf4j.Logging @@ -36,7 +37,7 @@ import javax.sql.DataSource object PgAuditDb { val DB_NAME = "audit" - val CURRENT_VERSION = 12 + val CURRENT_VERSION = 13 } class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { @@ -114,6 +115,11 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { statement.executeUpdate("CREATE INDEX transactions_published_channel_id_idx ON audit.transactions_published(channel_id)") } + def migration1213(statement: Statement): Unit = { + statement.executeUpdate("CREATE TABLE IF NOT EXISTS audit.liquidity_purchases (tx_id TEXT NOT NULL, channel_id TEXT NOT NULL, node_id TEXT NOT NULL, confirmed BOOLEAN NOT NULL, is_buyer BOOLEAN NOT NULL, amount_sat BIGINT NOT NULL, fee_sat BIGINT NOT NULL, seller_sig TEXT NOT NULL, witness TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS liquidity_purchases_node_id_idx ON audit.liquidity_purchases(node_id)") + } + getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE SCHEMA audit") @@ -125,6 +131,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { statement.executeUpdate("CREATE TABLE audit.channel_events (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, capacity_sat BIGINT NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)") statement.executeUpdate("CREATE TABLE audit.channel_updates (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, fee_base_msat BIGINT NOT NULL, fee_proportional_millionths BIGINT NOT NULL, cltv_expiry_delta BIGINT NOT NULL, htlc_minimum_msat BIGINT NOT NULL, htlc_maximum_msat BIGINT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)") statement.executeUpdate("CREATE TABLE audit.path_finding_metrics (amount_msat BIGINT NOT NULL, fees_msat BIGINT NOT NULL, status TEXT NOT NULL, duration_ms BIGINT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL, is_mpp BOOLEAN NOT NULL, experiment_name TEXT NOT NULL, recipient_node_id TEXT NOT NULL, payment_hash TEXT, routing_hints JSONB)") + statement.executeUpdate("CREATE TABLE audit.liquidity_purchases (tx_id TEXT NOT NULL, channel_id TEXT NOT NULL, node_id TEXT NOT NULL, confirmed BOOLEAN NOT NULL, is_buyer BOOLEAN NOT NULL, amount_sat BIGINT NOT NULL, fee_sat BIGINT NOT NULL, seller_sig TEXT NOT NULL, witness TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)") statement.executeUpdate("CREATE TABLE audit.transactions_published (tx_id TEXT NOT NULL PRIMARY KEY, channel_id TEXT NOT NULL, node_id TEXT NOT NULL, mining_fee_sat BIGINT NOT NULL, tx_type TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)") statement.executeUpdate("CREATE TABLE audit.transactions_confirmed (tx_id TEXT NOT NULL PRIMARY KEY, channel_id TEXT NOT NULL, node_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)") @@ -147,9 +154,10 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { statement.executeUpdate("CREATE INDEX metrics_recipient_idx ON audit.path_finding_metrics(recipient_node_id)") statement.executeUpdate("CREATE INDEX metrics_hash_idx ON audit.path_finding_metrics(payment_hash)") statement.executeUpdate("CREATE INDEX transactions_published_channel_id_idx ON audit.transactions_published(channel_id)") + statement.executeUpdate("CREATE INDEX liquidity_purchases_node_id_idx ON audit.liquidity_purchases(node_id)") statement.executeUpdate("CREATE INDEX transactions_published_timestamp_idx ON audit.transactions_published(timestamp)") statement.executeUpdate("CREATE INDEX transactions_confirmed_timestamp_idx ON audit.transactions_confirmed(timestamp)") - case Some(v@(4 | 5 | 6 | 7 | 8 | 9 | 10 | 11)) => + case Some(v@(4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") if (v < 5) { migration45(statement) @@ -175,6 +183,9 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { if (v < 12) { migration1112(statement) } + if (v < 13) { + migration1213(statement) + } case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } @@ -264,6 +275,24 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { } } + override def add(e: LiquidityPurchased): Unit = withMetrics("audit/add-liquidity-purchase", DbBackends.Postgres) { + inTransaction { pg => + using(pg.prepareStatement("INSERT INTO audit.liquidity_purchases VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) { statement => + statement.setString(1, e.fundingTxId.value.toHex) + statement.setString(2, e.channelId.toHex) + statement.setString(3, e.remoteNodeId.toHex) + statement.setBoolean(4, false) // transaction isn't confirmed yet, we just published it + statement.setBoolean(5, e.purchase.isBuyer) + statement.setLong(6, e.purchase.lease.amount.toLong) + statement.setLong(7, e.purchase.lease.fees.toLong) + statement.setString(8, e.purchase.lease.sellerSig.toHex) + statement.setString(9, LiquidityAds.leaseWitnessCodec.encode(e.purchase.lease.witness).require.bytes.toHex) + statement.setTimestamp(10, Timestamp.from(Instant.now())) + statement.executeUpdate() + } + } + } + override def add(e: TransactionPublished): Unit = withMetrics("audit/add-transaction-published", DbBackends.Postgres) { inTransaction { pg => using(pg.prepareStatement("INSERT INTO audit.transactions_published VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING")) { statement => @@ -287,6 +316,12 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { statement.setTimestamp(4, Timestamp.from(Instant.now())) statement.executeUpdate() } + using(pg.prepareStatement("UPDATE audit.liquidity_purchases SET confirmed=? WHERE node_id=? AND tx_id=?")) { statement => + statement.setBoolean(1, true) + statement.setString(2, e.remoteNodeId.toHex) + statement.setString(3, e.tx.txid.value.toHex) + statement.executeUpdate() + } } } @@ -462,6 +497,26 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging { } } + override def listLiquidityPurchases(remoteNodeId: PublicKey): Seq[LiquidityAds.LiquidityPurchased] = { + inTransaction { pg => + using(pg.prepareStatement("SELECT * FROM audit.liquidity_purchases WHERE confirmed=? AND node_id=?")) { statement => + statement.setBoolean(1, true) + statement.setString(2, remoteNodeId.toHex) + statement.executeQuery().map { rs => + LiquidityAds.LiquidityPurchased( + isBuyer = rs.getBoolean("is_buyer"), + lease = LiquidityAds.Lease( + amount = Satoshi(rs.getLong("amount_sat")), + fees = Satoshi(rs.getLong("fee_sat")), + sellerSig = ByteVector64.fromValidHex(rs.getString("seller_sig")), + witness = LiquidityAds.leaseWitnessCodec.decode(rs.getByteVectorFromHex("witness").bits).require.value, + ) + ) + }.toSeq + } + } + } + override def listNetworkFees(from: TimestampMilli, to: TimestampMilli): Seq[NetworkFee] = inTransaction { pg => using(pg.prepareStatement("SELECT * FROM audit.transactions_confirmed INNER JOIN audit.transactions_published ON audit.transactions_published.tx_id = audit.transactions_confirmed.tx_id WHERE audit.transactions_confirmed.timestamp BETWEEN ? and ? ORDER BY audit.transactions_confirmed.timestamp")) { statement => diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala index 81c0da3e77..accfb822b2 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala @@ -17,7 +17,7 @@ package fr.acinq.eclair.db.sqlite import fr.acinq.bitcoin.scalacompat.Crypto.{PrivateKey, PublicKey} -import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, SatoshiLong, TxId} +import fr.acinq.bitcoin.scalacompat.{ByteVector32, ByteVector64, Satoshi, SatoshiLong, TxId} import fr.acinq.eclair.channel._ import fr.acinq.eclair.db.AuditDb.{NetworkFee, PublishedTransaction, Stats} import fr.acinq.eclair.db.DbEventHandler.ChannelEvent @@ -26,6 +26,7 @@ import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.db._ import fr.acinq.eclair.payment._ import fr.acinq.eclair.transactions.Transactions.PlaceHolderPubKey +import fr.acinq.eclair.wire.protocol.LiquidityAds import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, Paginated, TimestampMilli} import grizzled.slf4j.Logging @@ -34,7 +35,7 @@ import java.util.UUID object SqliteAuditDb { val DB_NAME = "audit" - val CURRENT_VERSION = 9 + val CURRENT_VERSION = 10 } class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { @@ -114,6 +115,11 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { statement.executeUpdate("CREATE INDEX transactions_published_channel_id_idx ON transactions_published(channel_id)") } + def migration910(statement: Statement): Unit = { + statement.executeUpdate("CREATE TABLE IF NOT EXISTS liquidity_purchases (tx_id BLOB NOT NULL, channel_id BLOB NOT NULL, node_id BLOB NOT NULL, confirmed BOOLEAN NOT NULL, is_buyer BOOLEAN NOT NULL, amount_sat INTEGER NOT NULL, fee_sat INTEGER NOT NULL, seller_sig BLOB NOT NULL, witness BLOB NOT NULL, timestamp INTEGER NOT NULL)") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS liquidity_purchases_node_id_idx ON liquidity_purchases(node_id)") + } + getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE TABLE sent (amount_msat INTEGER NOT NULL, fees_msat INTEGER NOT NULL, recipient_amount_msat INTEGER NOT NULL, payment_id TEXT NOT NULL, parent_payment_id TEXT NOT NULL, payment_hash BLOB NOT NULL, payment_preimage BLOB NOT NULL, recipient_node_id BLOB NOT NULL, to_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)") @@ -124,6 +130,7 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { statement.executeUpdate("CREATE TABLE channel_errors (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal INTEGER NOT NULL, timestamp INTEGER NOT NULL)") statement.executeUpdate("CREATE TABLE channel_updates (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, fee_base_msat INTEGER NOT NULL, fee_proportional_millionths INTEGER NOT NULL, cltv_expiry_delta INTEGER NOT NULL, htlc_minimum_msat INTEGER NOT NULL, htlc_maximum_msat INTEGER NOT NULL, timestamp INTEGER NOT NULL)") statement.executeUpdate("CREATE TABLE path_finding_metrics (amount_msat INTEGER NOT NULL, fees_msat INTEGER NOT NULL, status TEXT NOT NULL, duration_ms INTEGER NOT NULL, timestamp INTEGER NOT NULL, is_mpp INTEGER NOT NULL, experiment_name TEXT NOT NULL, recipient_node_id BLOB NOT NULL)") + statement.executeUpdate("CREATE TABLE liquidity_purchases (tx_id BLOB NOT NULL, channel_id BLOB NOT NULL, node_id BLOB NOT NULL, confirmed BOOLEAN NOT NULL, is_buyer BOOLEAN NOT NULL, amount_sat INTEGER NOT NULL, fee_sat INTEGER NOT NULL, seller_sig BLOB NOT NULL, witness BLOB NOT NULL, timestamp INTEGER NOT NULL)") statement.executeUpdate("CREATE TABLE transactions_published (tx_id BLOB NOT NULL PRIMARY KEY, channel_id BLOB NOT NULL, node_id BLOB NOT NULL, mining_fee_sat INTEGER NOT NULL, tx_type TEXT NOT NULL, timestamp INTEGER NOT NULL)") statement.executeUpdate("CREATE TABLE transactions_confirmed (tx_id BLOB NOT NULL PRIMARY KEY, channel_id BLOB NOT NULL, node_id BLOB NOT NULL, timestamp INTEGER NOT NULL)") @@ -143,9 +150,10 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { statement.executeUpdate("CREATE INDEX metrics_mpp_idx ON path_finding_metrics(is_mpp)") statement.executeUpdate("CREATE INDEX metrics_name_idx ON path_finding_metrics(experiment_name)") statement.executeUpdate("CREATE INDEX transactions_published_channel_id_idx ON transactions_published(channel_id)") + statement.executeUpdate("CREATE INDEX liquidity_purchases_node_id_idx ON liquidity_purchases(node_id)") statement.executeUpdate("CREATE INDEX transactions_published_timestamp_idx ON transactions_published(timestamp)") statement.executeUpdate("CREATE INDEX transactions_confirmed_timestamp_idx ON transactions_confirmed(timestamp)") - case Some(v@(1 | 2 | 3 | 4 | 5 | 6 | 7 | 8)) => + case Some(v@(1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") if (v < 2) { migration12(statement) @@ -171,6 +179,9 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { if (v < 9) { migration89(statement) } + if (v < 10) { + migration910(statement) + } case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } @@ -252,6 +263,22 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { } } + override def add(e: LiquidityPurchased): Unit = withMetrics("audit/add-liquidity-purchase", DbBackends.Sqlite) { + using(sqlite.prepareStatement("INSERT INTO liquidity_purchases VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) { statement => + statement.setBytes(1, e.fundingTxId.value.toArray) + statement.setBytes(2, e.channelId.toArray) + statement.setBytes(3, e.remoteNodeId.value.toArray) + statement.setBoolean(4, false) // transaction isn't confirmed yet, we just published it + statement.setBoolean(5, e.purchase.isBuyer) + statement.setLong(6, e.purchase.lease.amount.toLong) + statement.setLong(7, e.purchase.lease.fees.toLong) + statement.setBytes(8, e.purchase.lease.sellerSig.toArray) + statement.setBytes(9, LiquidityAds.leaseWitnessCodec.encode(e.purchase.lease.witness).require.bytes.toArray) + statement.setLong(10, TimestampMilli.now().toLong) + statement.executeUpdate() + } + } + override def add(e: TransactionPublished): Unit = withMetrics("audit/add-transaction-published", DbBackends.Sqlite) { using(sqlite.prepareStatement("INSERT OR IGNORE INTO transactions_published VALUES (?, ?, ?, ?, ?, ?)")) { statement => statement.setBytes(1, e.tx.txid.value.toArray) @@ -272,6 +299,12 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { statement.setLong(4, TimestampMilli.now().toLong) statement.executeUpdate() } + using(sqlite.prepareStatement("UPDATE liquidity_purchases SET confirmed=? WHERE node_id=? AND tx_id=?")) { statement => + statement.setBoolean(1, true) + statement.setBytes(2, e.remoteNodeId.value.toArray) + statement.setBytes(3, e.tx.txid.value.toArray) + statement.executeUpdate() + } } override def add(e: ChannelErrorOccurred): Unit = withMetrics("audit/add-channel-error", DbBackends.Sqlite) { @@ -432,6 +465,24 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging { } } + override def listLiquidityPurchases(remoteNodeId: PublicKey): Seq[LiquidityAds.LiquidityPurchased] = { + using(sqlite.prepareStatement("SELECT * FROM liquidity_purchases WHERE confirmed=? AND node_id=?")) { statement => + statement.setBoolean(1, true) + statement.setBytes(2, remoteNodeId.value.toArray) + statement.executeQuery().map { rs => + LiquidityAds.LiquidityPurchased( + isBuyer = rs.getBoolean("is_buyer"), + lease = LiquidityAds.Lease( + amount = Satoshi(rs.getLong("amount_sat")), + fees = Satoshi(rs.getLong("fee_sat")), + sellerSig = ByteVector64(rs.getByteVector("seller_sig")), + witness = LiquidityAds.leaseWitnessCodec.decode(rs.getByteVector("witness").bits).require.value, + ) + ) + }.toSeq + } + } + override def listNetworkFees(from: TimestampMilli, to: TimestampMilli): Seq[NetworkFee] = using(sqlite.prepareStatement("SELECT * FROM transactions_confirmed INNER JOIN transactions_published ON transactions_published.tx_id = transactions_confirmed.tx_id WHERE transactions_confirmed.timestamp >= ? AND transactions_confirmed.timestamp < ? ORDER BY transactions_confirmed.timestamp")) { statement => statement.setLong(1, from.toLong) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LiquidityAds.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LiquidityAds.scala index 6fc0a6ac11..28d44f1aef 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LiquidityAds.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LiquidityAds.scala @@ -191,7 +191,7 @@ object LiquidityAds { } } - private val leaseWitnessCodec: Codec[LeaseWitness] = ( + val leaseWitnessCodec: Codec[LeaseWitness] = ( ("tag" | constant(ByteVector("option_will_fund".getBytes(StandardCharsets.US_ASCII)))) :: ("funding_pubkey" | publicKey) :: ("lease_end" | blockHeight) :: diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForDualFundingConfirmedStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForDualFundingConfirmedStateSpec.scala index 011e708582..ff5cfa6e7e 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForDualFundingConfirmedStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForDualFundingConfirmedStateSpec.scala @@ -391,12 +391,20 @@ class WaitForDualFundingConfirmedStateSpec extends TestKitBaseClass with Fixture val balanceBob1 = bob.stateData.asInstanceOf[DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED].commitments.latest.localCommit.spec.toLocal assert(alice.stateData.asInstanceOf[DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED].previousFundingTxs.isEmpty) + val eventListener = TestProbe() + systemA.eventStream.subscribe(eventListener.ref, classOf[LiquidityPurchased]) + val feerate2 = FeeratePerKw(12_500 sat) - testBumpFundingFees(f, Some(feerate2), Some(LiquidityAds.RequestRemoteFunding(remoteFunding, 20_000 sat, alice.underlyingActor.nodeParams.currentBlockHeight, 2016))) + val rbfTx = testBumpFundingFees(f, Some(feerate2), Some(LiquidityAds.RequestRemoteFunding(remoteFunding, 20_000 sat, alice.underlyingActor.nodeParams.currentBlockHeight, 2016))) val liquidityFee2 = bob.underlyingActor.nodeParams.liquidityRates_opt.get.fees(feerate2, remoteFunding, remoteFunding) val balanceBob2 = bob.stateData.asInstanceOf[DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED].commitments.latest.localCommit.spec.toLocal assert(liquidityFee1 < liquidityFee2) assert(balanceBob1 + liquidityFee2 - liquidityFee1 == balanceBob2) + val event = eventListener.expectMsgType[LiquidityPurchased] + assert(event.fundingTxId == rbfTx.txId) + assert(event.purchase.isBuyer) + assert(event.purchase.lease.amount == remoteFunding) + assert(event.purchase.lease.fees == liquidityFee2) // The second RBF attempt removes the liquidity request. val feerate3 = FeeratePerKw(15_000 sat) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala index a1634e7cdf..22f27d4026 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala @@ -307,6 +307,11 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik test("recv CMD_SPLICE (splice-in, liquidity ads)", Tag(ChannelStateTestsTags.Quiescence)) { f => import f._ + val eventListenerA = TestProbe() + systemA.eventStream.subscribe(eventListenerA.ref, classOf[LiquidityPurchased]) + val eventListenerB = TestProbe() + systemB.eventStream.subscribe(eventListenerB.ref, classOf[LiquidityPurchased]) + val sender = TestProbe() val fundingRequest = LiquidityAds.RequestRemoteFunding(400_000 sat, 10_000 sat, alice.nodeParams.currentBlockHeight, 2016) val cmd = CMD_SPLICE(sender.ref, Some(SpliceIn(500_000 sat)), None, Some(fundingRequest)) @@ -335,12 +340,21 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik bob2alice.forward(alice) alice2bob.expectMsgType[TxComplete] alice2bob.forward(bob) - exchangeSpliceSigs(alice, bob, alice2bob, bob2alice, sender) + val spliceTx = exchangeSpliceSigs(alice, bob, alice2bob, bob2alice, sender) // Alice paid fees to Bob for the additional liquidity. assert(alice.stateData.asInstanceOf[DATA_NORMAL].commitments.latest.capacity == 2_400_000.sat) assert(alice.stateData.asInstanceOf[DATA_NORMAL].commitments.latest.localCommit.spec.toLocal < 1_300_000_000.msat) assert(alice.stateData.asInstanceOf[DATA_NORMAL].commitments.latest.localCommit.spec.toRemote > 1_100_000_000.msat) + + val eventA = eventListenerA.expectMsgType[LiquidityPurchased] + assert(eventA.fundingTxId == spliceTx.txid) + assert(eventA.purchase.isBuyer) + assert(eventA.purchase.lease.amount == fundingRequest.fundingAmount) + assert(eventA.purchase.lease.fees > 0.sat) + val eventB = eventListenerB.expectMsgType[LiquidityPurchased] + assert(!eventB.purchase.isBuyer) + assert(eventB.purchase.copy(isBuyer = true) == eventA.purchase) } test("recv CMD_SPLICE (splice-in, liquidity ads, fee too high)", Tag(ChannelStateTestsTags.Quiescence)) { f => diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala index a66d344b1e..d961f5ed16 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala @@ -32,7 +32,7 @@ import fr.acinq.eclair.payment.Bolt11Invoice.ExtraHop import fr.acinq.eclair.payment._ import fr.acinq.eclair.router.Announcements import fr.acinq.eclair.transactions.Transactions.PlaceHolderPubKey -import fr.acinq.eclair.wire.protocol.Error +import fr.acinq.eclair.wire.protocol.{Error, LiquidityAds} import org.scalatest.Tag import org.scalatest.funsuite.AnyFunSuite import scodec.bits.HexStringSyntax @@ -132,6 +132,38 @@ class AuditDbSpec extends AnyFunSuite { } } + test("add/list liquidity events") { + forAllDbs { dbs => + val db = dbs.audit + val (nodeId1, nodeId2) = (randomKey().publicKey, randomKey().publicKey) + val confirmedFundingTx = Transaction(2, Nil, Seq(TxOut(150_000 sat, Script.pay2wpkh(randomKey().publicKey))), 0) + val unconfirmedFundingTx = Transaction(2, Nil, Seq(TxOut(100_000 sat, Script.pay2wpkh(randomKey().publicKey))), 0) + val e1a = LiquidityPurchased(null, randomBytes32(), nodeId1, confirmedFundingTx.txid, LiquidityAds.LiquidityPurchased(isBuyer = true, LiquidityAds.Lease(250_000 sat, 5_000 sat, randomBytes64(), LiquidityAds.LeaseWitness(randomKey().publicKey, BlockHeight(500_000), 1000, 100, 5 msat)))) + val e1b = LiquidityPurchased(null, randomBytes32(), nodeId1, confirmedFundingTx.txid, LiquidityAds.LiquidityPurchased(isBuyer = false, LiquidityAds.Lease(50_000 sat, 1_000 sat, randomBytes64(), LiquidityAds.LeaseWitness(randomKey().publicKey, BlockHeight(600_000), 2000, 150, 10 msat)))) + val e1c = LiquidityPurchased(null, e1b.channelId, nodeId1, confirmedFundingTx.txid, LiquidityAds.LiquidityPurchased(isBuyer = false, LiquidityAds.Lease(150_000 sat, 2_000 sat, randomBytes64(), LiquidityAds.LeaseWitness(randomKey().publicKey, BlockHeight(610_000), 1500, 100, 0 msat)))) + val e1d = LiquidityPurchased(null, randomBytes32(), nodeId1, unconfirmedFundingTx.txid, LiquidityAds.LiquidityPurchased(isBuyer = true, LiquidityAds.Lease(250_000 sat, 5_000 sat, randomBytes64(), LiquidityAds.LeaseWitness(randomKey().publicKey, BlockHeight(625_000), 500, 50, 25 msat)))) + val e2a = LiquidityPurchased(null, randomBytes32(), nodeId2, confirmedFundingTx.txid, LiquidityAds.LiquidityPurchased(isBuyer = false, LiquidityAds.Lease(200_000 sat, 2_500 sat, randomBytes64(), LiquidityAds.LeaseWitness(randomKey().publicKey, BlockHeight(500_000), 2016, 0, 1 msat)))) + val e2b = LiquidityPurchased(null, randomBytes32(), nodeId2, unconfirmedFundingTx.txid, LiquidityAds.LiquidityPurchased(isBuyer = false, LiquidityAds.Lease(200_000 sat, 2_500 sat, randomBytes64(), LiquidityAds.LeaseWitness(randomKey().publicKey, BlockHeight(500_000), 2016, 0, 1 msat)))) + + db.add(e1a) + db.add(e1b) + db.add(e1c) + db.add(e1d) + db.add(e2a) + db.add(e2b) + + // The liquidity purchase is confirmed only once the corresponding transaction confirms. + assert(db.listLiquidityPurchases(nodeId1).isEmpty) + assert(db.listLiquidityPurchases(nodeId2).isEmpty) + + db.add(TransactionConfirmed(randomBytes32(), nodeId1, confirmedFundingTx)) + db.add(TransactionConfirmed(randomBytes32(), nodeId2, confirmedFundingTx)) + + assert(db.listLiquidityPurchases(nodeId1).toSet == Set(e1a, e1b, e1c).map(_.purchase)) + assert(db.listLiquidityPurchases(nodeId2) == Seq(e2a.purchase)) + } + } + test("stats") { forAllDbs { dbs => val db = dbs.audit