From c9c826db1253dceb969f625e2c0e82649d09e1c7 Mon Sep 17 00:00:00 2001 From: sstone Date: Thu, 29 Jun 2023 19:43:39 +0200 Subject: [PATCH] Retrieve onchain feerate when we connect to an electrum server When we connect to an electrum sever we perform a "handshake" that includes exchanging protocol version messages and retrieving the server's current tip, and now we also retrieve onchain fees. Since this is done during the connection handshake, errors will be caught by the corouting exception handler that we use in the client and will not crash the application. --- build.gradle.kts | 2 +- .../blockchain/electrum/ElectrumClient.kt | 44 ++++++++++++---- .../kotlin/fr/acinq/lightning/io/Peer.kt | 28 ++--------- .../kotlin/fr/acinq/lightning/io/TcpSocket.kt | 23 +++++---- .../blockchain/electrum/ElectrumClientTest.kt | 50 +++++++++++++++++-- 5 files changed, 98 insertions(+), 49 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index 6ef7cbb70..621c0124d 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -11,7 +11,7 @@ plugins { allprojects { group = "fr.acinq.lightning" - version = "1.5.0" + version = "1.5.1-SNAPSHOT" repositories { // using the local maven repository with Kotlin Multi Platform can lead to build errors that are hard to diagnose. diff --git a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClient.kt b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClient.kt index 31af75ba6..162520f96 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClient.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClient.kt @@ -1,8 +1,10 @@ package fr.acinq.lightning.blockchain.electrum import fr.acinq.bitcoin.* +import fr.acinq.lightning.blockchain.fee.FeeratePerByte +import fr.acinq.lightning.blockchain.fee.FeeratePerKw +import fr.acinq.lightning.blockchain.fee.OnChainFeerates import fr.acinq.lightning.io.TcpSocket -import fr.acinq.lightning.io.linesFlow import fr.acinq.lightning.io.send import fr.acinq.lightning.utils.* import kotlinx.coroutines.* @@ -23,14 +25,15 @@ sealed interface ElectrumClientCommand { sealed interface ElectrumConnectionStatus { data class Closed(val reason: TcpSocket.IOException?) : ElectrumConnectionStatus object Connecting : ElectrumConnectionStatus - data class Connected(val version: ServerVersionResponse, val height: Int, val header: BlockHeader) : ElectrumConnectionStatus + data class Connected(val version: ServerVersionResponse, val height: Int, val header: BlockHeader, val onchainFeeRates: OnChainFeerates) : ElectrumConnectionStatus } @OptIn(ExperimentalCoroutinesApi::class) class ElectrumClient( socketBuilder: TcpSocket.Builder?, scope: CoroutineScope, - private val loggerFactory: LoggerFactory + private val loggerFactory: LoggerFactory, + exceptionHandler_opt: CoroutineExceptionHandler? = null ) : CoroutineScope by scope, IElectrumClient { private val logger = loggerFactory.newLogger(this::class) @@ -99,9 +102,11 @@ class ElectrumClient( } } - private fun establishConnection(serverAddress: ServerAddress) = launch(CoroutineExceptionHandler { _, exception -> + val exceptionHandler = exceptionHandler_opt ?: CoroutineExceptionHandler { _, exception -> logger.error(exception) { "error starting electrum client" } - }) { + } + + private fun establishConnection(serverAddress: ServerAddress) = launch(exceptionHandler) { _connectionStatus.value = ElectrumConnectionStatus.Connecting val socket: TcpSocket = try { val (host, port, tls) = serverAddress @@ -139,22 +144,41 @@ class ElectrumClient( } val flow = socket.linesFlow().map { json.decodeFromString(ElectrumResponseDeserializer, it) } - val version = ServerVersion() - sendRequest(version, 0) val rpcFlow = flow.filterIsInstance>().map { it.value } + var requestId = 0 + + val version = ServerVersion() + sendRequest(version, requestId++) val theirVersion = parseJsonResponse(version, rpcFlow.first()) require(theirVersion is ServerVersionResponse) { "invalid server version response $theirVersion" } logger.info { "server version $theirVersion" } - sendRequest(HeaderSubscription, 0) + + sendRequest(HeaderSubscription, requestId++) val header = parseJsonResponse(HeaderSubscription, rpcFlow.first()) require(header is HeaderSubscriptionResponse) { "invalid header subscription response $header" } + + suspend fun estimateFee(confirmations: Int): EstimateFeeResponse { + val request = EstimateFees(confirmations) + sendRequest(request, requestId++) + val response = parseJsonResponse(request, rpcFlow.first()) + require(response is EstimateFeeResponse) { "invalid estimatefee response $response" } + return response + } + + val fees = listOf(estimateFee(2), estimateFee(6), estimateFee(18), estimateFee(144)) + logger.info { "onchain fees $fees" } + val feeRates = OnChainFeerates( + fundingFeerate = fees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)), + mutualCloseFeerate = fees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)), + claimMainFeerate = fees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)), + fastFeerate = fees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat)) + ) _notifications.emit(header) - _connectionStatus.value = ElectrumConnectionStatus.Connected(theirVersion, header.blockHeight, header.header) + _connectionStatus.value = ElectrumConnectionStatus.Connected(theirVersion, header.blockHeight, header.header, feeRates) logger.info { "server tip $header" } // pending requests map val requestMap = mutableMapOf>>() - var requestId = 0 // reset mailbox mailbox.cancel(CancellationException("connection in progress")) diff --git a/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt b/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt index ac58377af..355865217 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt @@ -195,11 +195,11 @@ class Peer( } } launch { - watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.collect { - // onchain fees are retrieved punctually, when electrum status moves to Connection.ESTABLISHED - // since the application is not running most of the time, and when it is, it will be only for a few minutes, this is good enough. - // (for a node that is online most of the time things would be different and we would need to re-evaluate onchain fee estimates on a regular basis) - updateEstimateFees() + watcher.client.connectionStatus.filterIsInstance().collect { + // Onchain fees are retrieved once when we establish a connection to an electrum server. + // It is acceptable since the application will typically not be running more than a few minutes at a time. + // (for a node that is online most of the time things would be different, and we would need to re-evaluate onchain fee estimates on a regular basis) + onChainFeeratesFlow.value = it.onchainFeeRates } } launch { @@ -257,24 +257,6 @@ class Peer( } } - private suspend fun updateEstimateFees() { - watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.first() - val sortedFees = listOf( - watcher.client.estimateFees(2), - watcher.client.estimateFees(6), - watcher.client.estimateFees(18), - watcher.client.estimateFees(144), - ) - logger.info { "on-chain fees: $sortedFees" } - // TODO: If some feerates are null, we may implement a retry - onChainFeeratesFlow.value = OnChainFeerates( - fundingFeerate = sortedFees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)), - mutualCloseFeerate = sortedFees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)), - claimMainFeerate = sortedFees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)), - fastFeerate = sortedFees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat)) - ) - } - fun connect() { if (connectionState.value is Connection.CLOSED) establishConnection() else logger.warning { "Peer is already connecting / connected" } diff --git a/src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt b/src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt index 7d9e01f0e..68aab9ba4 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt @@ -21,6 +21,18 @@ interface TcpSocket { suspend fun receiveFully(buffer: ByteArray, offset: Int, length: Int) suspend fun receiveAvailable(buffer: ByteArray, offset: Int, length: Int): Int + fun linesFlow(): Flow { + return flow { + val buffer = ByteArray(8192) + while (true) { + val size = receiveAvailable(buffer) + emit(buffer.subArray(size)) + } + } + .decodeToString() + .splitByLines() + } + suspend fun startTls(tls: TLS): TcpSocket fun close() @@ -71,14 +83,3 @@ internal expect object PlatformSocketBuilder : TcpSocket.Builder suspend fun TcpSocket.receiveFully(size: Int): ByteArray = ByteArray(size).also { receiveFully(it) } - -fun TcpSocket.linesFlow(): Flow = - flow { - val buffer = ByteArray(8192) - while (true) { - val size = receiveAvailable(buffer) - emit(buffer.subArray(size)) - } - } - .decodeToString() - .splitByLines() \ No newline at end of file diff --git a/src/commonTest/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientTest.kt b/src/commonTest/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientTest.kt index b68bcd5b4..4718f5b4c 100644 --- a/src/commonTest/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientTest.kt +++ b/src/commonTest/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientTest.kt @@ -2,15 +2,16 @@ package fr.acinq.lightning.blockchain.electrum import fr.acinq.bitcoin.* import fr.acinq.lightning.blockchain.fee.FeeratePerKw +import fr.acinq.lightning.io.TcpSocket import fr.acinq.lightning.tests.utils.LightningTestSuite import fr.acinq.lightning.tests.utils.runSuspendTest import fr.acinq.lightning.utils.Connection +import fr.acinq.lightning.utils.ServerAddress import fr.acinq.lightning.utils.toByteVector32 import fr.acinq.secp256k1.Hex -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.flow.first -import kotlinx.coroutines.joinAll -import kotlinx.coroutines.launch +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.kodein.log.LoggerFactory import kotlin.test.* import kotlin.time.Duration.Companion.seconds @@ -177,4 +178,45 @@ class ElectrumClientTest : LightningTestSuite() { client.stop() } + + @OptIn(DelicateCoroutinesApi::class) + @Test + fun `catch coroutine errors`() { + val myCustomError = "this is a test error" + + class MyTcpSopcket(val socket: TcpSocket) : TcpSocket by socket { + override fun linesFlow(): Flow { + return super.linesFlow().map { + // during the handshake with the electrum server we first ask for the server version, then headers, fee rates + // so id == 2 means we're asking for fee rates, and here we return an error + val sendError = it.contains("\"id\": 2") + if (sendError) { + """{"jsonrpc": "2.0", "error": {"code": 42, "message": "$myCustomError"}, "id": 2}""" + } else { + it + } + } + } + } + + class MyBuilder(val builder: TcpSocket.Builder) : TcpSocket.Builder { + override suspend fun connect(host: String, port: Int, tls: TcpSocket.TLS, loggerFactory: LoggerFactory): TcpSocket { + val socket = builder.connect(host, port, tls, loggerFactory) + return MyTcpSopcket(socket) + } + } + + runBlocking { + val builder = MyBuilder(TcpSocket.Builder()) + val errorFlow = MutableStateFlow(null) + val myErrorHandler = CoroutineExceptionHandler { _, e -> errorFlow.value = e } + val client = ElectrumClient(builder, GlobalScope, LoggerFactory.default, myErrorHandler) + client.connect(ServerAddress("electrum.acinq.co", 50002, TcpSocket.TLS.UNSAFE_CERTIFICATES)) + client.connectionState.first { it is Connection.CLOSED } + client.connectionState.first { it is Connection.ESTABLISHING } + val error = errorFlow.filterNotNull().first() + assertTrue(error.message!!.contains(myCustomError)) + client.stop() + } + } }