From fed39c79e797c42f7de02170aab706939408e07c Mon Sep 17 00:00:00 2001 From: sstone Date: Thu, 29 Jun 2023 19:43:39 +0200 Subject: [PATCH] Retrieve onchain feerate when we connect to an electrum server When we connect to an electrum sever we perform a "handshake" that includes exchanging protocol version messages and retrieving the server's current tip, and now we also retrieve onchain fees. We also add an extra, optional CoroutineExceptionHandler to ElectrumClient's constructor, which can be used for testing or to specify a different behaviour to the one that is currently hard-coded. Since this is done during the connection handshake, errors will be caught by the corouting exception handler that we use in the client and will not crash the application. --- .../blockchain/electrum/ElectrumClient.kt | 46 +++++++--- .../kotlin/fr/acinq/lightning/io/Peer.kt | 28 ++---- .../kotlin/fr/acinq/lightning/io/TcpSocket.kt | 20 +++-- .../blockchain/electrum/ElectrumClientTest.kt | 85 ++++++++++++++++++- 4 files changed, 133 insertions(+), 46 deletions(-) diff --git a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClient.kt b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClient.kt index 612d39f1c..e59a5d450 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClient.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClient.kt @@ -1,8 +1,10 @@ package fr.acinq.lightning.blockchain.electrum import fr.acinq.bitcoin.* +import fr.acinq.lightning.blockchain.fee.FeeratePerByte +import fr.acinq.lightning.blockchain.fee.FeeratePerKw +import fr.acinq.lightning.blockchain.fee.OnChainFeerates import fr.acinq.lightning.io.TcpSocket -import fr.acinq.lightning.io.linesFlow import fr.acinq.lightning.io.send import fr.acinq.lightning.utils.* import kotlinx.coroutines.* @@ -23,13 +25,14 @@ sealed interface ElectrumClientCommand { sealed interface ElectrumConnectionStatus { data class Closed(val reason: TcpSocket.IOException?) : ElectrumConnectionStatus object Connecting : ElectrumConnectionStatus - data class Connected(val version: ServerVersionResponse, val height: Int, val header: BlockHeader) : ElectrumConnectionStatus + data class Connected(val version: ServerVersionResponse, val height: Int, val header: BlockHeader, val onchainFeeRates: OnChainFeerates) : ElectrumConnectionStatus } class ElectrumClient( socketBuilder: TcpSocket.Builder?, scope: CoroutineScope, - private val loggerFactory: LoggerFactory + private val loggerFactory: LoggerFactory, + defaultExceptionHandler: CoroutineExceptionHandler? = null ) : CoroutineScope by scope, IElectrumClient { private val logger = loggerFactory.newLogger(this::class) @@ -98,9 +101,11 @@ class ElectrumClient( } } - private fun establishConnection(serverAddress: ServerAddress) = launch(CoroutineExceptionHandler { _, exception -> - logger.error(exception) { "error starting electrum client: " } - }) { + val exceptionHandler = defaultExceptionHandler ?: CoroutineExceptionHandler { _, exception -> + logger.error(exception) { "error starting electrum client" } + } + + private fun establishConnection(serverAddress: ServerAddress) = launch(exceptionHandler) { _connectionStatus.value = ElectrumConnectionStatus.Connecting val socket: TcpSocket = try { val (host, port, tls) = serverAddress @@ -138,22 +143,41 @@ class ElectrumClient( } val flow = socket.linesFlow().map { json.decodeFromString(ElectrumResponseDeserializer, it) } - val version = ServerVersion() - sendRequest(version, 0) val rpcFlow = flow.filterIsInstance>().map { it.value } + var requestId = 0 + + val version = ServerVersion() + sendRequest(version, requestId++) val theirVersion = parseJsonResponse(version, rpcFlow.first()) require(theirVersion is ServerVersionResponse) { "invalid server version response $theirVersion" } logger.info { "server version $theirVersion" } - sendRequest(HeaderSubscription, 0) + + sendRequest(HeaderSubscription, requestId++) val header = parseJsonResponse(HeaderSubscription, rpcFlow.first()) require(header is HeaderSubscriptionResponse) { "invalid header subscription response $header" } + + suspend fun estimateFee(confirmations: Int): EstimateFeeResponse { + val request = EstimateFees(confirmations) + sendRequest(request, requestId++) + val response = parseJsonResponse(request, rpcFlow.first()) + require(response is EstimateFeeResponse) { "invalid estimatefee response $response" } + return response + } + + val fees = listOf(estimateFee(2), estimateFee(6), estimateFee(18), estimateFee(144)) + logger.info { "onchain fees $fees" } + val feeRates = OnChainFeerates( + fundingFeerate = fees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)), + mutualCloseFeerate = fees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)), + claimMainFeerate = fees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)), + fastFeerate = fees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat)) + ) _notifications.emit(header) - _connectionStatus.value = ElectrumConnectionStatus.Connected(theirVersion, header.blockHeight, header.header) + _connectionStatus.value = ElectrumConnectionStatus.Connected(theirVersion, header.blockHeight, header.header, feeRates) logger.info { "server tip $header" } // pending requests map val requestMap = mutableMapOf>>() - var requestId = 0 // reset mailbox mailbox.cancel(CancellationException("connection in progress")) diff --git a/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt b/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt index f43feb44c..862eb5922 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt @@ -193,11 +193,11 @@ class Peer( } } launch { - watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.collect { - // onchain fees are retrieved punctually, when electrum status moves to Connection.ESTABLISHED - // since the application is not running most of the time, and when it is, it will be only for a few minutes, this is good enough. - // (for a node that is online most of the time things would be different and we would need to re-evaluate onchain fee estimates on a regular basis) - updateEstimateFees() + watcher.client.connectionStatus.filterIsInstance().collect { + // Onchain fees are retrieved once when we establish a connection to an electrum server. + // It is acceptable since the application will typically not be running more than a few minutes at a time. + // (for a node that is online most of the time things would be different, and we would need to re-evaluate onchain fee estimates on a regular basis) + onChainFeeratesFlow.value = it.onchainFeeRates } } launch { @@ -255,24 +255,6 @@ class Peer( } } - private suspend fun updateEstimateFees() { - watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.first() - val sortedFees = listOf( - watcher.client.estimateFees(2), - watcher.client.estimateFees(6), - watcher.client.estimateFees(18), - watcher.client.estimateFees(144), - ) - logger.info { "on-chain fees: $sortedFees" } - // TODO: If some feerates are null, we may implement a retry - onChainFeeratesFlow.value = OnChainFeerates( - fundingFeerate = sortedFees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)), - mutualCloseFeerate = sortedFees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)), - claimMainFeerate = sortedFees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)), - fastFeerate = sortedFees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat)) - ) - } - fun connect() { if (connectionState.value is Connection.CLOSED) establishConnection() else logger.warning { "Peer is already connecting / connected" } diff --git a/src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt b/src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt index 6e9f6c4f7..0336b8164 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt @@ -20,6 +20,18 @@ interface TcpSocket { suspend fun receiveFully(buffer: ByteArray, offset: Int, length: Int) suspend fun receiveAvailable(buffer: ByteArray, offset: Int, length: Int): Int + fun linesFlow(): Flow { + return flow { + val buffer = ByteArray(8192) + while (true) { + val size = receiveAvailable(buffer) + emit(buffer.subArray(size)) + } + } + .decodeToString() + .splitByLines() + } + suspend fun startTls(tls: TLS): TcpSocket fun close() @@ -69,11 +81,3 @@ suspend fun TcpSocket.receiveAvailable(buffer: ByteArray) = receiveAvailable(buf internal expect object PlatformSocketBuilder : TcpSocket.Builder suspend fun TcpSocket.receiveFully(size: Int): ByteArray = ByteArray(size).also { receiveFully(it) } - -fun TcpSocket.linesFlow(): Flow = flow { - val buffer = ByteArray(8192) - while (true) { - val size = receiveAvailable(buffer) - emit(buffer.subArray(size)) - } -}.decodeToString().splitByLines() \ No newline at end of file diff --git a/src/commonTest/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientTest.kt b/src/commonTest/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientTest.kt index b68bcd5b4..7bd605ebe 100644 --- a/src/commonTest/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientTest.kt +++ b/src/commonTest/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientTest.kt @@ -2,15 +2,20 @@ package fr.acinq.lightning.blockchain.electrum import fr.acinq.bitcoin.* import fr.acinq.lightning.blockchain.fee.FeeratePerKw +import fr.acinq.lightning.io.TcpSocket import fr.acinq.lightning.tests.utils.LightningTestSuite import fr.acinq.lightning.tests.utils.runSuspendTest import fr.acinq.lightning.utils.Connection +import fr.acinq.lightning.utils.ServerAddress import fr.acinq.lightning.utils.toByteVector32 import fr.acinq.secp256k1.Hex -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.flow.first -import kotlinx.coroutines.joinAll -import kotlinx.coroutines.launch +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.jsonObject +import kotlinx.serialization.json.jsonPrimitive +import org.kodein.log.LoggerFactory +import org.kodein.log.newLogger import kotlin.test.* import kotlin.time.Duration.Companion.seconds @@ -177,4 +182,76 @@ class ElectrumClientTest : LightningTestSuite() { client.stop() } + + @OptIn(DelicateCoroutinesApi::class) + @Test + fun `catch coroutine errors`() { + val myCustomError = "this is a test error" + + class MyTcpSocket() : TcpSocket { + val output = MutableSharedFlow() + override suspend fun send(bytes: ByteArray?, offset: Int, length: Int, flush: Boolean) { + if (bytes != null) { + CoroutineScope(Dispatchers.IO).launch { + val encoded = bytes.decodeToString(offset, offset + length) + val request = Json.parseToJsonElement(encoded) + val response = when (request.jsonObject["method"]!!.jsonPrimitive.content) { + "server.version" -> """{"jsonrpc": "2.0", "result": ["ElectrumX 1.15.0", "1.4"], "id": 0}""" + "blockchain.headers.subscribe" -> """{"jsonrpc": "2.0", "result": {"hex": "000080209a35ef4422bc37b0e1c3df9d32cfaaef6a6d31047c0202000000000000000000b9f14c32922d305844c739829ef13df9d188953e74a392720c02eeadd93acbf9ae22a464be8e05174bc5c367", "height": 797144}, "id": 1}""" + "blockchain.estimatefee" -> """{"jsonrpc": "2.0", "error": {"code": 42, "message": "$myCustomError"}, "id": 2}""" // we return an error, as if estimatefee had failed + else -> """{"jsonrpc": "2.0", "error": {"code": 43, "message": "unhandled request"}, "id": 2}""" + } + output.emit(response) + } + } + } + + override suspend fun receiveFully(buffer: ByteArray, offset: Int, length: Int) = TODO("Not yet implemented") + override suspend fun receiveAvailable(buffer: ByteArray, offset: Int, length: Int): Int = TODO("Not yet implemented") + override suspend fun startTls(tls: TcpSocket.TLS): TcpSocket = TODO("Not yet implemented") + override fun close() {} + override fun linesFlow(): Flow = output.asSharedFlow() + } + + class MyBuilder() : TcpSocket.Builder { + override suspend fun connect(host: String, port: Int, tls: TcpSocket.TLS, loggerFactory: LoggerFactory): TcpSocket { + return MyTcpSocket() + } + } + + val errorFlow = MutableStateFlow(null) + val loggerFactory = LoggerFactory.default + val logger = loggerFactory.newLogger(this::class) + val myErrorHandler = CoroutineExceptionHandler { _, e -> + logger.error(e) { "error caught in custom exception handler" } + errorFlow.value = e + } + + runBlocking(Dispatchers.IO) { + withTimeout(15.seconds) { + val builder = MyBuilder() + // from Kotlin's documentation: + // all children coroutines (coroutines created in the context of another Job) delegate handling of their exceptions to their parent coroutine, which + // also delegates to the parent, and so on until the root, so the CoroutineExceptionHandler installed in their context is never used + // => here we need to create a new root scope otherwise our exception handler will not be used + val client = ElectrumClient(builder, GlobalScope, LoggerFactory.default, myErrorHandler) + client.connect(ServerAddress("my-test-node", 50002, TcpSocket.TLS.DISABLED)) // address and port do not matter, but we cannot use TLS (not implemented, see above) + errorFlow.filterNotNull().first { it.message!!.contains(myCustomError) } + client.stop() + } + + // if we use runBlocking's scope, our exception handler will not be used + errorFlow.value = null + val error = assertFails { + withTimeout(15.seconds) { + val builder = MyBuilder() + val client = ElectrumClient(builder, this, LoggerFactory.default, myErrorHandler) + client.connect(ServerAddress("my-test-node", 50002, TcpSocket.TLS.DISABLED)) // address and port do not matter, but we cannot use TLS (not implemented, see above) + errorFlow.filterNotNull().first { it.message!!.contains(myCustomError) } + client.stop() + } + } + assertTrue(error.message!!.contains(myCustomError)) + } + } }