Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retrieve onchain feerate when we connect to an electrum server #490

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package fr.acinq.lightning.blockchain.electrum

import fr.acinq.bitcoin.*
import fr.acinq.lightning.blockchain.fee.FeeratePerByte
import fr.acinq.lightning.blockchain.fee.FeeratePerKw
import fr.acinq.lightning.blockchain.fee.OnChainFeerates
import fr.acinq.lightning.io.TcpSocket
import fr.acinq.lightning.io.linesFlow
import fr.acinq.lightning.io.send
import fr.acinq.lightning.utils.*
import kotlinx.coroutines.*
Expand All @@ -23,13 +25,14 @@ sealed interface ElectrumClientCommand {
sealed interface ElectrumConnectionStatus {
data class Closed(val reason: TcpSocket.IOException?) : ElectrumConnectionStatus
object Connecting : ElectrumConnectionStatus
data class Connected(val version: ServerVersionResponse, val height: Int, val header: BlockHeader) : ElectrumConnectionStatus
data class Connected(val version: ServerVersionResponse, val height: Int, val header: BlockHeader, val onchainFeeRates: OnChainFeerates) : ElectrumConnectionStatus
}

class ElectrumClient(
socketBuilder: TcpSocket.Builder?,
scope: CoroutineScope,
private val loggerFactory: LoggerFactory
private val loggerFactory: LoggerFactory,
defaultExceptionHandler: CoroutineExceptionHandler? = null
) : CoroutineScope by scope, IElectrumClient {

private val logger = loggerFactory.newLogger(this::class)
Expand Down Expand Up @@ -98,9 +101,11 @@ class ElectrumClient(
}
}

private fun establishConnection(serverAddress: ServerAddress) = launch(CoroutineExceptionHandler { _, exception ->
logger.error(exception) { "error starting electrum client: " }
}) {
val exceptionHandler = defaultExceptionHandler ?: CoroutineExceptionHandler { _, exception ->
logger.error(exception) { "error starting electrum client" }
}

private fun establishConnection(serverAddress: ServerAddress) = launch(exceptionHandler) {
_connectionStatus.value = ElectrumConnectionStatus.Connecting
val socket: TcpSocket = try {
val (host, port, tls) = serverAddress
Expand Down Expand Up @@ -138,22 +143,41 @@ class ElectrumClient(
}

val flow = socket.linesFlow().map { json.decodeFromString(ElectrumResponseDeserializer, it) }
val version = ServerVersion()
sendRequest(version, 0)
val rpcFlow = flow.filterIsInstance<Either.Right<Nothing, JsonRPCResponse>>().map { it.value }
var requestId = 0

val version = ServerVersion()
sendRequest(version, requestId++)
val theirVersion = parseJsonResponse(version, rpcFlow.first())
require(theirVersion is ServerVersionResponse) { "invalid server version response $theirVersion" }
logger.info { "server version $theirVersion" }
sendRequest(HeaderSubscription, 0)

sendRequest(HeaderSubscription, requestId++)
val header = parseJsonResponse(HeaderSubscription, rpcFlow.first())
require(header is HeaderSubscriptionResponse) { "invalid header subscription response $header" }

suspend fun estimateFee(confirmations: Int): EstimateFeeResponse {
val request = EstimateFees(confirmations)
sendRequest(request, requestId++)
val response = parseJsonResponse(request, rpcFlow.first())
require(response is EstimateFeeResponse) { "invalid estimatefee response $response" }
return response
}

val fees = listOf(estimateFee(2), estimateFee(6), estimateFee(18), estimateFee(144))
logger.info { "onchain fees $fees" }
val feeRates = OnChainFeerates(
fundingFeerate = fees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)),
mutualCloseFeerate = fees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)),
claimMainFeerate = fees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)),
fastFeerate = fees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat))
)
_notifications.emit(header)
_connectionStatus.value = ElectrumConnectionStatus.Connected(theirVersion, header.blockHeight, header.header)
_connectionStatus.value = ElectrumConnectionStatus.Connected(theirVersion, header.blockHeight, header.header, feeRates)
logger.info { "server tip $header" }

// pending requests map
val requestMap = mutableMapOf<Int, Pair<ElectrumRequest, CompletableDeferred<ElectrumResponse>>>()
var requestId = 0

// reset mailbox
mailbox.cancel(CancellationException("connection in progress"))
Expand Down
28 changes: 5 additions & 23 deletions src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,11 @@ class Peer(
}
}
launch {
watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.collect {
// onchain fees are retrieved punctually, when electrum status moves to Connection.ESTABLISHED
// since the application is not running most of the time, and when it is, it will be only for a few minutes, this is good enough.
// (for a node that is online most of the time things would be different and we would need to re-evaluate onchain fee estimates on a regular basis)
updateEstimateFees()
watcher.client.connectionStatus.filterIsInstance<ElectrumConnectionStatus.Connected>().collect {
// Onchain fees are retrieved once when we establish a connection to an electrum server.
// It is acceptable since the application will typically not be running more than a few minutes at a time.
// (for a node that is online most of the time things would be different, and we would need to re-evaluate onchain fee estimates on a regular basis)
onChainFeeratesFlow.value = it.onchainFeeRates
}
}
launch {
Expand Down Expand Up @@ -255,24 +255,6 @@ class Peer(
}
}

private suspend fun updateEstimateFees() {
watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.first()
val sortedFees = listOf(
watcher.client.estimateFees(2),
watcher.client.estimateFees(6),
watcher.client.estimateFees(18),
watcher.client.estimateFees(144),
)
logger.info { "on-chain fees: $sortedFees" }
// TODO: If some feerates are null, we may implement a retry
onChainFeeratesFlow.value = OnChainFeerates(
fundingFeerate = sortedFees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)),
mutualCloseFeerate = sortedFees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)),
claimMainFeerate = sortedFees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)),
fastFeerate = sortedFees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat))
)
}

fun connect() {
if (connectionState.value is Connection.CLOSED) establishConnection()
else logger.warning { "Peer is already connecting / connected" }
Expand Down
20 changes: 12 additions & 8 deletions src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,18 @@ interface TcpSocket {
suspend fun receiveFully(buffer: ByteArray, offset: Int, length: Int)
suspend fun receiveAvailable(buffer: ByteArray, offset: Int, length: Int): Int

fun linesFlow(): Flow<String> {
return flow {
val buffer = ByteArray(8192)
while (true) {
val size = receiveAvailable(buffer)
emit(buffer.subArray(size))
}
}
.decodeToString()
.splitByLines()
}

suspend fun startTls(tls: TLS): TcpSocket

fun close()
Expand Down Expand Up @@ -69,11 +81,3 @@ suspend fun TcpSocket.receiveAvailable(buffer: ByteArray) = receiveAvailable(buf
internal expect object PlatformSocketBuilder : TcpSocket.Builder

suspend fun TcpSocket.receiveFully(size: Int): ByteArray = ByteArray(size).also { receiveFully(it) }

fun TcpSocket.linesFlow(): Flow<String> = flow {
val buffer = ByteArray(8192)
while (true) {
val size = receiveAvailable(buffer)
emit(buffer.subArray(size))
}
}.decodeToString().splitByLines()
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@ package fr.acinq.lightning.blockchain.electrum

import fr.acinq.bitcoin.*
import fr.acinq.lightning.blockchain.fee.FeeratePerKw
import fr.acinq.lightning.io.TcpSocket
import fr.acinq.lightning.tests.utils.LightningTestSuite
import fr.acinq.lightning.tests.utils.runSuspendTest
import fr.acinq.lightning.utils.Connection
import fr.acinq.lightning.utils.ServerAddress
import fr.acinq.lightning.utils.toByteVector32
import fr.acinq.secp256k1.Hex
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.jsonObject
import kotlinx.serialization.json.jsonPrimitive
import org.kodein.log.LoggerFactory
import org.kodein.log.newLogger
import kotlin.test.*
import kotlin.time.Duration.Companion.seconds

Expand Down Expand Up @@ -177,4 +182,76 @@ class ElectrumClientTest : LightningTestSuite() {

client.stop()
}

@OptIn(DelicateCoroutinesApi::class)
@Test
fun `catch coroutine errors`() {
val myCustomError = "this is a test error"

class MyTcpSocket() : TcpSocket {
val output = MutableSharedFlow<String>()
override suspend fun send(bytes: ByteArray?, offset: Int, length: Int, flush: Boolean) {
if (bytes != null) {
CoroutineScope(Dispatchers.IO).launch {
val encoded = bytes.decodeToString(offset, offset + length)
val request = Json.parseToJsonElement(encoded)
val response = when (request.jsonObject["method"]!!.jsonPrimitive.content) {
"server.version" -> """{"jsonrpc": "2.0", "result": ["ElectrumX 1.15.0", "1.4"], "id": 0}"""
"blockchain.headers.subscribe" -> """{"jsonrpc": "2.0", "result": {"hex": "000080209a35ef4422bc37b0e1c3df9d32cfaaef6a6d31047c0202000000000000000000b9f14c32922d305844c739829ef13df9d188953e74a392720c02eeadd93acbf9ae22a464be8e05174bc5c367", "height": 797144}, "id": 1}"""
"blockchain.estimatefee" -> """{"jsonrpc": "2.0", "error": {"code": 42, "message": "$myCustomError"}, "id": 2}""" // we return an error, as if estimatefee had failed
else -> """{"jsonrpc": "2.0", "error": {"code": 43, "message": "unhandled request"}, "id": 2}"""
}
output.emit(response)
}
}
}

override suspend fun receiveFully(buffer: ByteArray, offset: Int, length: Int) = TODO("Not yet implemented")
override suspend fun receiveAvailable(buffer: ByteArray, offset: Int, length: Int): Int = TODO("Not yet implemented")
override suspend fun startTls(tls: TcpSocket.TLS): TcpSocket = TODO("Not yet implemented")
override fun close() {}
override fun linesFlow(): Flow<String> = output.asSharedFlow()
}

class MyBuilder() : TcpSocket.Builder {
override suspend fun connect(host: String, port: Int, tls: TcpSocket.TLS, loggerFactory: LoggerFactory): TcpSocket {
return MyTcpSocket()
}
}

val errorFlow = MutableStateFlow<Throwable?>(null)
val loggerFactory = LoggerFactory.default
val logger = loggerFactory.newLogger(this::class)
val myErrorHandler = CoroutineExceptionHandler { _, e ->
logger.error(e) { "error caught in custom exception handler" }
errorFlow.value = e
}

runBlocking(Dispatchers.IO) {
withTimeout(15.seconds) {
val builder = MyBuilder()
// from Kotlin's documentation:
// all children coroutines (coroutines created in the context of another Job) delegate handling of their exceptions to their parent coroutine, which
// also delegates to the parent, and so on until the root, so the CoroutineExceptionHandler installed in their context is never used
// => here we need to create a new root scope otherwise our exception handler will not be used
val client = ElectrumClient(builder, GlobalScope, LoggerFactory.default, myErrorHandler)
client.connect(ServerAddress("my-test-node", 50002, TcpSocket.TLS.DISABLED)) // address and port do not matter, but we cannot use TLS (not implemented, see above)
errorFlow.filterNotNull().first { it.message!!.contains(myCustomError) }
client.stop()
}

// if we use runBlocking's scope, our exception handler will not be used
errorFlow.value = null
val error = assertFails {
withTimeout(15.seconds) {
val builder = MyBuilder()
val client = ElectrumClient(builder, this, LoggerFactory.default, myErrorHandler)
client.connect(ServerAddress("my-test-node", 50002, TcpSocket.TLS.DISABLED)) // address and port do not matter, but we cannot use TLS (not implemented, see above)
errorFlow.filterNotNull().first { it.message!!.contains(myCustomError) }
client.stop()
}
}
assertTrue(error.message!!.contains(myCustomError))
}
}
}