Skip to content

Commit

Permalink
Retrieve onchain feerate when we connect to an electrum server
Browse files Browse the repository at this point in the history
When we connect to an electrum sever we perform a "handshake" that includes exchanging protocol version messages and
retrieving the server's current tip, and now we also retrieve onchain fees.

We also add an extra, optional CoroutineExceptionHandler to ElectrumClient's constructor, which can be used for testing or to specify a different
behaviour to the one that is currently hard-coded.

Since this is done during the connection handshake, errors will be caught by the corouting exception handler that
we use in the client and will not crash the application.
  • Loading branch information
sstone committed Jul 6, 2023
1 parent 7091e9a commit 636a768
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 46 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package fr.acinq.lightning.blockchain.electrum

import fr.acinq.bitcoin.*
import fr.acinq.lightning.blockchain.fee.FeeratePerByte
import fr.acinq.lightning.blockchain.fee.FeeratePerKw
import fr.acinq.lightning.blockchain.fee.OnChainFeerates
import fr.acinq.lightning.io.TcpSocket
import fr.acinq.lightning.io.linesFlow
import fr.acinq.lightning.io.send
import fr.acinq.lightning.utils.*
import kotlinx.coroutines.*
Expand All @@ -23,13 +25,14 @@ sealed interface ElectrumClientCommand {
sealed interface ElectrumConnectionStatus {
data class Closed(val reason: TcpSocket.IOException?) : ElectrumConnectionStatus
object Connecting : ElectrumConnectionStatus
data class Connected(val version: ServerVersionResponse, val height: Int, val header: BlockHeader) : ElectrumConnectionStatus
data class Connected(val version: ServerVersionResponse, val height: Int, val header: BlockHeader, val onchainFeeRates: OnChainFeerates) : ElectrumConnectionStatus
}

class ElectrumClient(
socketBuilder: TcpSocket.Builder?,
scope: CoroutineScope,
private val loggerFactory: LoggerFactory
private val loggerFactory: LoggerFactory,
defaultExceptionHandler: CoroutineExceptionHandler? = null
) : CoroutineScope by scope, IElectrumClient {

private val logger = loggerFactory.newLogger(this::class)
Expand Down Expand Up @@ -98,9 +101,11 @@ class ElectrumClient(
}
}

private fun establishConnection(serverAddress: ServerAddress) = launch(CoroutineExceptionHandler { _, exception ->
logger.error(exception) { "error starting electrum client: " }
}) {
val exceptionHandler = defaultExceptionHandler ?: CoroutineExceptionHandler { _, exception ->
logger.error(exception) { "error starting electrum client" }
}

private fun establishConnection(serverAddress: ServerAddress) = launch(exceptionHandler) {
_connectionStatus.value = ElectrumConnectionStatus.Connecting
val socket: TcpSocket = try {
val (host, port, tls) = serverAddress
Expand Down Expand Up @@ -138,22 +143,41 @@ class ElectrumClient(
}

val flow = socket.linesFlow().map { json.decodeFromString(ElectrumResponseDeserializer, it) }
val version = ServerVersion()
sendRequest(version, 0)
val rpcFlow = flow.filterIsInstance<Either.Right<Nothing, JsonRPCResponse>>().map { it.value }
var requestId = 0

val version = ServerVersion()
sendRequest(version, requestId++)
val theirVersion = parseJsonResponse(version, rpcFlow.first())
require(theirVersion is ServerVersionResponse) { "invalid server version response $theirVersion" }
logger.info { "server version $theirVersion" }
sendRequest(HeaderSubscription, 0)

sendRequest(HeaderSubscription, requestId++)
val header = parseJsonResponse(HeaderSubscription, rpcFlow.first())
require(header is HeaderSubscriptionResponse) { "invalid header subscription response $header" }

suspend fun estimateFee(confirmations: Int): EstimateFeeResponse {
val request = EstimateFees(confirmations)
sendRequest(request, requestId++)
val response = parseJsonResponse(request, rpcFlow.first())
require(response is EstimateFeeResponse) { "invalid estimatefee response $response" }
return response
}

val fees = listOf(estimateFee(2), estimateFee(6), estimateFee(18), estimateFee(144))
logger.info { "onchain fees $fees" }
val feeRates = OnChainFeerates(
fundingFeerate = fees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)),
mutualCloseFeerate = fees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)),
claimMainFeerate = fees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)),
fastFeerate = fees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat))
)
_notifications.emit(header)
_connectionStatus.value = ElectrumConnectionStatus.Connected(theirVersion, header.blockHeight, header.header)
_connectionStatus.value = ElectrumConnectionStatus.Connected(theirVersion, header.blockHeight, header.header, feeRates)
logger.info { "server tip $header" }

// pending requests map
val requestMap = mutableMapOf<Int, Pair<ElectrumRequest, CompletableDeferred<ElectrumResponse>>>()
var requestId = 0

// reset mailbox
mailbox.cancel(CancellationException("connection in progress"))
Expand Down
28 changes: 5 additions & 23 deletions src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,11 @@ class Peer(
}
}
launch {
watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.collect {
// onchain fees are retrieved punctually, when electrum status moves to Connection.ESTABLISHED
// since the application is not running most of the time, and when it is, it will be only for a few minutes, this is good enough.
// (for a node that is online most of the time things would be different and we would need to re-evaluate onchain fee estimates on a regular basis)
updateEstimateFees()
watcher.client.connectionStatus.filterIsInstance<ElectrumConnectionStatus.Connected>().collect {
// Onchain fees are retrieved once when we establish a connection to an electrum server.
// It is acceptable since the application will typically not be running more than a few minutes at a time.
// (for a node that is online most of the time things would be different, and we would need to re-evaluate onchain fee estimates on a regular basis)
onChainFeeratesFlow.value = it.onchainFeeRates
}
}
launch {
Expand Down Expand Up @@ -255,24 +255,6 @@ class Peer(
}
}

private suspend fun updateEstimateFees() {
watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.first()
val sortedFees = listOf(
watcher.client.estimateFees(2),
watcher.client.estimateFees(6),
watcher.client.estimateFees(18),
watcher.client.estimateFees(144),
)
logger.info { "on-chain fees: $sortedFees" }
// TODO: If some feerates are null, we may implement a retry
onChainFeeratesFlow.value = OnChainFeerates(
fundingFeerate = sortedFees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)),
mutualCloseFeerate = sortedFees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)),
claimMainFeerate = sortedFees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)),
fastFeerate = sortedFees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat))
)
}

fun connect() {
if (connectionState.value is Connection.CLOSED) establishConnection()
else logger.warning { "Peer is already connecting / connected" }
Expand Down
20 changes: 12 additions & 8 deletions src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,18 @@ interface TcpSocket {
suspend fun receiveFully(buffer: ByteArray, offset: Int, length: Int)
suspend fun receiveAvailable(buffer: ByteArray, offset: Int, length: Int): Int

fun linesFlow(): Flow<String> {
return flow {
val buffer = ByteArray(8192)
while (true) {
val size = receiveAvailable(buffer)
emit(buffer.subArray(size))
}
}
.decodeToString()
.splitByLines()
}

suspend fun startTls(tls: TLS): TcpSocket

fun close()
Expand Down Expand Up @@ -69,11 +81,3 @@ suspend fun TcpSocket.receiveAvailable(buffer: ByteArray) = receiveAvailable(buf
internal expect object PlatformSocketBuilder : TcpSocket.Builder

suspend fun TcpSocket.receiveFully(size: Int): ByteArray = ByteArray(size).also { receiveFully(it) }

fun TcpSocket.linesFlow(): Flow<String> = flow {
val buffer = ByteArray(8192)
while (true) {
val size = receiveAvailable(buffer)
emit(buffer.subArray(size))
}
}.decodeToString().splitByLines()
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@ package fr.acinq.lightning.blockchain.electrum

import fr.acinq.bitcoin.*
import fr.acinq.lightning.blockchain.fee.FeeratePerKw
import fr.acinq.lightning.io.TcpSocket
import fr.acinq.lightning.tests.utils.LightningTestSuite
import fr.acinq.lightning.tests.utils.runSuspendTest
import fr.acinq.lightning.utils.Connection
import fr.acinq.lightning.utils.ServerAddress
import fr.acinq.lightning.utils.toByteVector32
import fr.acinq.secp256k1.Hex
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.jsonObject
import kotlinx.serialization.json.jsonPrimitive
import org.kodein.log.LoggerFactory
import org.kodein.log.newLogger
import kotlin.test.*
import kotlin.time.Duration.Companion.seconds

Expand Down Expand Up @@ -177,4 +182,76 @@ class ElectrumClientTest : LightningTestSuite() {

client.stop()
}

@OptIn(DelicateCoroutinesApi::class)
@Test
fun `catch coroutine errors`() {
val myCustomError = "this is a test error"

class MyTcpSocket() : TcpSocket {
val output = MutableSharedFlow<String>()
override suspend fun send(bytes: ByteArray?, offset: Int, length: Int, flush: Boolean) {
if (bytes != null) {
CoroutineScope(Dispatchers.IO).launch {
val encoded = bytes.decodeToString(offset, offset + length)
val request = Json.parseToJsonElement(encoded)
val response = when (request.jsonObject["method"]!!.jsonPrimitive.content) {
"server.version" -> """{"jsonrpc": "2.0", "result": ["ElectrumX 1.15.0", "1.4"], "id": 0}"""
"blockchain.headers.subscribe" -> """{"jsonrpc": "2.0", "result": {"hex": "000080209a35ef4422bc37b0e1c3df9d32cfaaef6a6d31047c0202000000000000000000b9f14c32922d305844c739829ef13df9d188953e74a392720c02eeadd93acbf9ae22a464be8e05174bc5c367", "height": 797144}, "id": 1}"""
"blockchain.estimatefee" -> """{"jsonrpc": "2.0", "error": {"code": 42, "message": "$myCustomError"}, "id": 2}""" // we return an error, as if estimatefee had failed
else -> """{"jsonrpc": "2.0", "error": {"code": 43, "message": "unhandled request"}, "id": 2}"""
}
output.emit(response)
}
}
}

override suspend fun receiveFully(buffer: ByteArray, offset: Int, length: Int) = TODO("Not yet implemented")
override suspend fun receiveAvailable(buffer: ByteArray, offset: Int, length: Int): Int = TODO("Not yet implemented")
override suspend fun startTls(tls: TcpSocket.TLS): TcpSocket = TODO("Not yet implemented")
override fun close() {}
override fun linesFlow(): Flow<String> = output.asSharedFlow()
}

class MyBuilder() : TcpSocket.Builder {
override suspend fun connect(host: String, port: Int, tls: TcpSocket.TLS, loggerFactory: LoggerFactory): TcpSocket {
return MyTcpSocket()
}
}

val errorFlow = MutableStateFlow<Throwable?>(null)
val loggerFactory = LoggerFactory.default
val logger = loggerFactory.newLogger(this::class)
val myErrorHandler = CoroutineExceptionHandler { _, e ->
logger.error(e) { "error caught in custom exception handler" }
errorFlow.value = e
}

runBlocking(Dispatchers.IO) {
withTimeout(15.seconds) {
val builder = MyBuilder()
// from Kotlin's documentation:
// all children coroutines (coroutines created in the context of another Job) delegate handling of their exceptions to their parent coroutine, which
// also delegates to the parent, and so on until the root, so the CoroutineExceptionHandler installed in their context is never used
// => here we need to create a new root scope (or we could use GlobalScope) otherwise our exception handler will not be used
val client = ElectrumClient(builder, GlobalScope, LoggerFactory.default, myErrorHandler)
client.connect(ServerAddress("my-test-node", 50002, TcpSocket.TLS.DISABLED)) // address and port do not matter, but we cannot use TLS (not implemented, see above)
errorFlow.filterNotNull().first { it.message!!.contains(myCustomError) }
client.stop()
}

// if we use runBlocking's scope, our exception handler will not be used
errorFlow.value = null
val error = assertFails {
withTimeout(15.seconds) {
val builder = MyBuilder()
val client = ElectrumClient(builder, this, LoggerFactory.default, myErrorHandler)
client.connect(ServerAddress("my-test-node", 50002, TcpSocket.TLS.DISABLED)) // address and port do not matter, but we cannot use TLS (not implemented, see above)
errorFlow.filterNotNull().first { it.message!!.contains(myCustomError) }
client.stop()
}
}
assertTrue(error.message!!.contains(myCustomError))
}
}
}

0 comments on commit 636a768

Please sign in to comment.