Skip to content

Commit

Permalink
Store liquidity purchases in AuditDb
Browse files Browse the repository at this point in the history
We store every liquidity purchase (whether we're buyer or seller).
This is important information when choosing which peers are worth keeping
channels with.
  • Loading branch information
t-bast committed Mar 29, 2024
1 parent 301ba99 commit 73325fc
Show file tree
Hide file tree
Showing 11 changed files with 196 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ package fr.acinq.eclair.channel

import akka.actor.ActorRef
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, Transaction}
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, Transaction, TxId}
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.channel.Helpers.Closing.ClosingType
import fr.acinq.eclair.io.Peer.OpenChannelResponse
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate}
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, LiquidityAds}
import fr.acinq.eclair.{BlockHeight, Features, ShortChannelId}

/**
Expand Down Expand Up @@ -79,6 +78,8 @@ case class ChannelSignatureSent(channel: ActorRef, commitments: Commitments) ext

case class ChannelSignatureReceived(channel: ActorRef, commitments: Commitments) extends ChannelEvent

case class LiquidityPurchased(channel: ActorRef, channelId: ByteVector32, remoteNodeId: PublicKey, fundingTxId: TxId, purchase: LiquidityAds.LiquidityPurchased) extends ChannelEvent

case class ChannelErrorOccurred(channel: ActorRef, channelId: ByteVector32, remoteNodeId: PublicKey, error: ChannelError, isFatal: Boolean) extends ChannelEvent

// NB: the fee should be set to 0 when we're not paying it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package fr.acinq.eclair.channel.fund

import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.adapter.TypedActorRefOps
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, StashBuffer}
import akka.actor.typed.{ActorRef, Behavior}
import akka.event.LoggingAdapter
Expand Down Expand Up @@ -778,6 +780,7 @@ private class InteractiveTxBuilder(replyTo: ActorRef[InteractiveTxBuilder.Respon
Behaviors.receiveMessagePartial {
case SignTransactionResult(signedTx) =>
log.info(s"interactive-tx txid=${signedTx.txId} partially signed with {} local inputs, {} remote inputs, {} local outputs and {} remote outputs", signedTx.tx.localInputs.length, signedTx.tx.remoteInputs.length, signedTx.tx.localOutputs.length, signedTx.tx.remoteOutputs.length)
liquidityPurchased_opt.foreach(purchase => context.system.eventStream ! EventStream.Publish(LiquidityPurchased(replyTo.toClassic, channelParams.channelId, remoteNodeId, signedTx.txId, purchase)))
replyTo ! Succeeded(InteractiveTxSigningSession.WaitingForSigs(fundingParams, purpose.fundingTxIndex, signedTx, Left(localCommit), remoteCommit), commitSig)
Behaviors.stopped
case WalletFailure(t) =>
Expand Down
5 changes: 5 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/AuditDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.AuditDb.{NetworkFee, PublishedTransaction, Stats}
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.payment.{PathFindingExperimentMetrics, PaymentReceived, PaymentRelayed, PaymentSent}
import fr.acinq.eclair.wire.protocol.LiquidityAds
import fr.acinq.eclair.{Paginated, TimestampMilli}

trait AuditDb {
Expand All @@ -34,6 +35,8 @@ trait AuditDb {

def add(paymentRelayed: PaymentRelayed): Unit

def add(liquidityPurchase: LiquidityPurchased): Unit

def add(txPublished: TransactionPublished): Unit

def add(txConfirmed: TransactionConfirmed): Unit
Expand All @@ -52,6 +55,8 @@ trait AuditDb {

def listRelayed(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[PaymentRelayed]

def listLiquidityPurchases(remoteNodeId: PublicKey): Seq[LiquidityAds.LiquidityPurchased]

def listNetworkFees(from: TimestampMilli, to: TimestampMilli): Seq[NetworkFee]

def stats(from: TimestampMilli, to: TimestampMilli): Seq[Stats]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL
context.system.eventStream.subscribe(self, classOf[PaymentFailed])
context.system.eventStream.subscribe(self, classOf[PaymentReceived])
context.system.eventStream.subscribe(self, classOf[PaymentRelayed])
context.system.eventStream.subscribe(self, classOf[LiquidityPurchased])
context.system.eventStream.subscribe(self, classOf[TransactionPublished])
context.system.eventStream.subscribe(self, classOf[TransactionConfirmed])
context.system.eventStream.subscribe(self, classOf[ChannelErrorOccurred])
Expand Down Expand Up @@ -92,6 +93,8 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL
}
auditDb.add(e)

case e: LiquidityPurchased => auditDb.add(e)

case e: TransactionPublished =>
log.info(s"paying mining fee=${e.miningFee} for txid=${e.tx.txid} desc=${e.desc}")
auditDb.add(e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import fr.acinq.eclair.db.DualDatabases.runAsync
import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
import fr.acinq.eclair.router.Router
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAddress, NodeAnnouncement}
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli}
import grizzled.slf4j.Logging

Expand Down Expand Up @@ -151,6 +151,11 @@ case class DualAuditDb(primary: AuditDb, secondary: AuditDb) extends AuditDb {
primary.add(paymentRelayed)
}

override def add(liquidityPurchase: LiquidityPurchased): Unit = {
runAsync(secondary.add(liquidityPurchase))
primary.add(liquidityPurchase)
}

override def add(txPublished: TransactionPublished): Unit = {
runAsync(secondary.add(txPublished))
primary.add(txPublished)
Expand Down Expand Up @@ -196,6 +201,11 @@ case class DualAuditDb(primary: AuditDb, secondary: AuditDb) extends AuditDb {
primary.listRelayed(from, to, paginated_opt)
}

override def listLiquidityPurchases(remoteNodeId: PublicKey): Seq[LiquidityAds.LiquidityPurchased] = {
runAsync(secondary.listLiquidityPurchases(remoteNodeId))
primary.listLiquidityPurchases(remoteNodeId)
}

override def listNetworkFees(from: TimestampMilli, to: TimestampMilli): Seq[AuditDb.NetworkFee] = {
runAsync(secondary.listNetworkFees(from, to))
primary.listNetworkFees(from, to)
Expand Down
61 changes: 58 additions & 3 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package fr.acinq.eclair.db.pg

import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, SatoshiLong, TxId}
import fr.acinq.bitcoin.scalacompat.{ByteVector32, ByteVector64, Satoshi, SatoshiLong, TxId}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.AuditDb.{NetworkFee, PublishedTransaction, Stats}
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
Expand All @@ -26,6 +26,7 @@ import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
import fr.acinq.eclair.db._
import fr.acinq.eclair.payment._
import fr.acinq.eclair.transactions.Transactions.PlaceHolderPubKey
import fr.acinq.eclair.wire.protocol.LiquidityAds
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, Paginated, TimestampMilli}
import grizzled.slf4j.Logging

Expand All @@ -36,7 +37,7 @@ import javax.sql.DataSource

object PgAuditDb {
val DB_NAME = "audit"
val CURRENT_VERSION = 12
val CURRENT_VERSION = 13
}

class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
Expand Down Expand Up @@ -114,6 +115,11 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.executeUpdate("CREATE INDEX transactions_published_channel_id_idx ON audit.transactions_published(channel_id)")
}

def migration1213(statement: Statement): Unit = {
statement.executeUpdate("CREATE TABLE IF NOT EXISTS audit.liquidity_purchases (tx_id TEXT NOT NULL, channel_id TEXT NOT NULL, node_id TEXT NOT NULL, confirmed BOOLEAN NOT NULL, is_buyer BOOLEAN NOT NULL, amount_sat BIGINT NOT NULL, fee_sat BIGINT NOT NULL, seller_sig TEXT NOT NULL, witness TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS liquidity_purchases_node_id_idx ON audit.liquidity_purchases(node_id)")
}

getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE SCHEMA audit")
Expand All @@ -125,6 +131,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.executeUpdate("CREATE TABLE audit.channel_events (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, capacity_sat BIGINT NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.channel_updates (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, fee_base_msat BIGINT NOT NULL, fee_proportional_millionths BIGINT NOT NULL, cltv_expiry_delta BIGINT NOT NULL, htlc_minimum_msat BIGINT NOT NULL, htlc_maximum_msat BIGINT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.path_finding_metrics (amount_msat BIGINT NOT NULL, fees_msat BIGINT NOT NULL, status TEXT NOT NULL, duration_ms BIGINT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL, is_mpp BOOLEAN NOT NULL, experiment_name TEXT NOT NULL, recipient_node_id TEXT NOT NULL, payment_hash TEXT, routing_hints JSONB)")
statement.executeUpdate("CREATE TABLE audit.liquidity_purchases (tx_id TEXT NOT NULL, channel_id TEXT NOT NULL, node_id TEXT NOT NULL, confirmed BOOLEAN NOT NULL, is_buyer BOOLEAN NOT NULL, amount_sat BIGINT NOT NULL, fee_sat BIGINT NOT NULL, seller_sig TEXT NOT NULL, witness TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.transactions_published (tx_id TEXT NOT NULL PRIMARY KEY, channel_id TEXT NOT NULL, node_id TEXT NOT NULL, mining_fee_sat BIGINT NOT NULL, tx_type TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.transactions_confirmed (tx_id TEXT NOT NULL PRIMARY KEY, channel_id TEXT NOT NULL, node_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")

Expand All @@ -147,9 +154,10 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.executeUpdate("CREATE INDEX metrics_recipient_idx ON audit.path_finding_metrics(recipient_node_id)")
statement.executeUpdate("CREATE INDEX metrics_hash_idx ON audit.path_finding_metrics(payment_hash)")
statement.executeUpdate("CREATE INDEX transactions_published_channel_id_idx ON audit.transactions_published(channel_id)")
statement.executeUpdate("CREATE INDEX liquidity_purchases_node_id_idx ON audit.liquidity_purchases(node_id)")
statement.executeUpdate("CREATE INDEX transactions_published_timestamp_idx ON audit.transactions_published(timestamp)")
statement.executeUpdate("CREATE INDEX transactions_confirmed_timestamp_idx ON audit.transactions_confirmed(timestamp)")
case Some(v@(4 | 5 | 6 | 7 | 8 | 9 | 10 | 11)) =>
case Some(v@(4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12)) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
if (v < 5) {
migration45(statement)
Expand All @@ -175,6 +183,9 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
if (v < 12) {
migration1112(statement)
}
if (v < 13) {
migration1213(statement)
}
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
Expand Down Expand Up @@ -264,6 +275,24 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
}
}

override def add(e: LiquidityPurchased): Unit = withMetrics("audit/add-liquidity-purchase", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO audit.liquidity_purchases VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) { statement =>
statement.setString(1, e.fundingTxId.value.toHex)
statement.setString(2, e.channelId.toHex)
statement.setString(3, e.remoteNodeId.toHex)
statement.setBoolean(4, false) // transaction isn't confirmed yet, we just published it
statement.setBoolean(5, e.purchase.isBuyer)
statement.setLong(6, e.purchase.lease.amount.toLong)
statement.setLong(7, e.purchase.lease.fees.toLong)
statement.setString(8, e.purchase.lease.sellerSig.toHex)
statement.setString(9, LiquidityAds.leaseWitnessCodec.encode(e.purchase.lease.witness).require.bytes.toHex)
statement.setTimestamp(10, Timestamp.from(Instant.now()))
statement.executeUpdate()
}
}
}

override def add(e: TransactionPublished): Unit = withMetrics("audit/add-transaction-published", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO audit.transactions_published VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING")) { statement =>
Expand All @@ -287,6 +316,12 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.setTimestamp(4, Timestamp.from(Instant.now()))
statement.executeUpdate()
}
using(pg.prepareStatement("UPDATE audit.liquidity_purchases SET confirmed=? WHERE node_id=? AND tx_id=?")) { statement =>
statement.setBoolean(1, true)
statement.setString(2, e.remoteNodeId.toHex)
statement.setString(3, e.tx.txid.value.toHex)
statement.executeUpdate()
}
}
}

Expand Down Expand Up @@ -462,6 +497,26 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
}
}

override def listLiquidityPurchases(remoteNodeId: PublicKey): Seq[LiquidityAds.LiquidityPurchased] = {
inTransaction { pg =>
using(pg.prepareStatement("SELECT * FROM audit.liquidity_purchases WHERE confirmed=? AND node_id=?")) { statement =>
statement.setBoolean(1, true)
statement.setString(2, remoteNodeId.toHex)
statement.executeQuery().map { rs =>
LiquidityAds.LiquidityPurchased(
isBuyer = rs.getBoolean("is_buyer"),
lease = LiquidityAds.Lease(
amount = Satoshi(rs.getLong("amount_sat")),
fees = Satoshi(rs.getLong("fee_sat")),
sellerSig = ByteVector64.fromValidHex(rs.getString("seller_sig")),
witness = LiquidityAds.leaseWitnessCodec.decode(rs.getByteVectorFromHex("witness").bits).require.value,
)
)
}.toSeq
}
}
}

override def listNetworkFees(from: TimestampMilli, to: TimestampMilli): Seq[NetworkFee] =
inTransaction { pg =>
using(pg.prepareStatement("SELECT * FROM audit.transactions_confirmed INNER JOIN audit.transactions_published ON audit.transactions_published.tx_id = audit.transactions_confirmed.tx_id WHERE audit.transactions_confirmed.timestamp BETWEEN ? and ? ORDER BY audit.transactions_confirmed.timestamp")) { statement =>
Expand Down
Loading

0 comments on commit 73325fc

Please sign in to comment.