Skip to content

Commit

Permalink
Retrieve onchain feerate when we connect to an electrum server
Browse files Browse the repository at this point in the history
When we connect to an electrum sever we perform a "handshake" that includes exchanging protocol version messages and
retrieving the server's current tip, and now we also retrieve onchain fees.

Since this is done during the connection handshake, errors will be caught by the corouting exception handler that
we use in the client and will not crash the application.
  • Loading branch information
sstone committed Jun 29, 2023
1 parent b70fb15 commit c9c826d
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 49 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ plugins {

allprojects {
group = "fr.acinq.lightning"
version = "1.5.0"
version = "1.5.1-SNAPSHOT"

repositories {
// using the local maven repository with Kotlin Multi Platform can lead to build errors that are hard to diagnose.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package fr.acinq.lightning.blockchain.electrum

import fr.acinq.bitcoin.*
import fr.acinq.lightning.blockchain.fee.FeeratePerByte
import fr.acinq.lightning.blockchain.fee.FeeratePerKw
import fr.acinq.lightning.blockchain.fee.OnChainFeerates
import fr.acinq.lightning.io.TcpSocket
import fr.acinq.lightning.io.linesFlow
import fr.acinq.lightning.io.send
import fr.acinq.lightning.utils.*
import kotlinx.coroutines.*
Expand All @@ -23,14 +25,15 @@ sealed interface ElectrumClientCommand {
sealed interface ElectrumConnectionStatus {
data class Closed(val reason: TcpSocket.IOException?) : ElectrumConnectionStatus
object Connecting : ElectrumConnectionStatus
data class Connected(val version: ServerVersionResponse, val height: Int, val header: BlockHeader) : ElectrumConnectionStatus
data class Connected(val version: ServerVersionResponse, val height: Int, val header: BlockHeader, val onchainFeeRates: OnChainFeerates) : ElectrumConnectionStatus
}

@OptIn(ExperimentalCoroutinesApi::class)
class ElectrumClient(
socketBuilder: TcpSocket.Builder?,
scope: CoroutineScope,
private val loggerFactory: LoggerFactory
private val loggerFactory: LoggerFactory,
exceptionHandler_opt: CoroutineExceptionHandler? = null
) : CoroutineScope by scope, IElectrumClient {

private val logger = loggerFactory.newLogger(this::class)
Expand Down Expand Up @@ -99,9 +102,11 @@ class ElectrumClient(
}
}

private fun establishConnection(serverAddress: ServerAddress) = launch(CoroutineExceptionHandler { _, exception ->
val exceptionHandler = exceptionHandler_opt ?: CoroutineExceptionHandler { _, exception ->
logger.error(exception) { "error starting electrum client" }
}) {
}

private fun establishConnection(serverAddress: ServerAddress) = launch(exceptionHandler) {
_connectionStatus.value = ElectrumConnectionStatus.Connecting
val socket: TcpSocket = try {
val (host, port, tls) = serverAddress
Expand Down Expand Up @@ -139,22 +144,41 @@ class ElectrumClient(
}

val flow = socket.linesFlow().map { json.decodeFromString(ElectrumResponseDeserializer, it) }
val version = ServerVersion()
sendRequest(version, 0)
val rpcFlow = flow.filterIsInstance<Either.Right<Nothing, JsonRPCResponse>>().map { it.value }
var requestId = 0

val version = ServerVersion()
sendRequest(version, requestId++)
val theirVersion = parseJsonResponse(version, rpcFlow.first())
require(theirVersion is ServerVersionResponse) { "invalid server version response $theirVersion" }
logger.info { "server version $theirVersion" }
sendRequest(HeaderSubscription, 0)

sendRequest(HeaderSubscription, requestId++)
val header = parseJsonResponse(HeaderSubscription, rpcFlow.first())
require(header is HeaderSubscriptionResponse) { "invalid header subscription response $header" }

suspend fun estimateFee(confirmations: Int): EstimateFeeResponse {
val request = EstimateFees(confirmations)
sendRequest(request, requestId++)
val response = parseJsonResponse(request, rpcFlow.first())
require(response is EstimateFeeResponse) { "invalid estimatefee response $response" }
return response
}

val fees = listOf(estimateFee(2), estimateFee(6), estimateFee(18), estimateFee(144))
logger.info { "onchain fees $fees" }
val feeRates = OnChainFeerates(
fundingFeerate = fees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)),
mutualCloseFeerate = fees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)),
claimMainFeerate = fees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)),
fastFeerate = fees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat))
)
_notifications.emit(header)
_connectionStatus.value = ElectrumConnectionStatus.Connected(theirVersion, header.blockHeight, header.header)
_connectionStatus.value = ElectrumConnectionStatus.Connected(theirVersion, header.blockHeight, header.header, feeRates)
logger.info { "server tip $header" }

// pending requests map
val requestMap = mutableMapOf<Int, Pair<ElectrumRequest, CompletableDeferred<ElectrumResponse>>>()
var requestId = 0

// reset mailbox
mailbox.cancel(CancellationException("connection in progress"))
Expand Down
28 changes: 5 additions & 23 deletions src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,11 @@ class Peer(
}
}
launch {
watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.collect {
// onchain fees are retrieved punctually, when electrum status moves to Connection.ESTABLISHED
// since the application is not running most of the time, and when it is, it will be only for a few minutes, this is good enough.
// (for a node that is online most of the time things would be different and we would need to re-evaluate onchain fee estimates on a regular basis)
updateEstimateFees()
watcher.client.connectionStatus.filterIsInstance<ElectrumConnectionStatus.Connected>().collect {
// Onchain fees are retrieved once when we establish a connection to an electrum server.
// It is acceptable since the application will typically not be running more than a few minutes at a time.
// (for a node that is online most of the time things would be different, and we would need to re-evaluate onchain fee estimates on a regular basis)
onChainFeeratesFlow.value = it.onchainFeeRates
}
}
launch {
Expand Down Expand Up @@ -257,24 +257,6 @@ class Peer(
}
}

private suspend fun updateEstimateFees() {
watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.first()
val sortedFees = listOf(
watcher.client.estimateFees(2),
watcher.client.estimateFees(6),
watcher.client.estimateFees(18),
watcher.client.estimateFees(144),
)
logger.info { "on-chain fees: $sortedFees" }
// TODO: If some feerates are null, we may implement a retry
onChainFeeratesFlow.value = OnChainFeerates(
fundingFeerate = sortedFees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)),
mutualCloseFeerate = sortedFees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)),
claimMainFeerate = sortedFees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)),
fastFeerate = sortedFees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat))
)
}

fun connect() {
if (connectionState.value is Connection.CLOSED) establishConnection()
else logger.warning { "Peer is already connecting / connected" }
Expand Down
23 changes: 12 additions & 11 deletions src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ interface TcpSocket {
suspend fun receiveFully(buffer: ByteArray, offset: Int, length: Int)
suspend fun receiveAvailable(buffer: ByteArray, offset: Int, length: Int): Int

fun linesFlow(): Flow<String> {
return flow {
val buffer = ByteArray(8192)
while (true) {
val size = receiveAvailable(buffer)
emit(buffer.subArray(size))
}
}
.decodeToString()
.splitByLines()
}

suspend fun startTls(tls: TLS): TcpSocket

fun close()
Expand Down Expand Up @@ -71,14 +83,3 @@ internal expect object PlatformSocketBuilder : TcpSocket.Builder

suspend fun TcpSocket.receiveFully(size: Int): ByteArray =
ByteArray(size).also { receiveFully(it) }

fun TcpSocket.linesFlow(): Flow<String> =
flow {
val buffer = ByteArray(8192)
while (true) {
val size = receiveAvailable(buffer)
emit(buffer.subArray(size))
}
}
.decodeToString()
.splitByLines()
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package fr.acinq.lightning.blockchain.electrum

import fr.acinq.bitcoin.*
import fr.acinq.lightning.blockchain.fee.FeeratePerKw
import fr.acinq.lightning.io.TcpSocket
import fr.acinq.lightning.tests.utils.LightningTestSuite
import fr.acinq.lightning.tests.utils.runSuspendTest
import fr.acinq.lightning.utils.Connection
import fr.acinq.lightning.utils.ServerAddress
import fr.acinq.lightning.utils.toByteVector32
import fr.acinq.secp256k1.Hex
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.kodein.log.LoggerFactory
import kotlin.test.*
import kotlin.time.Duration.Companion.seconds

Expand Down Expand Up @@ -177,4 +178,45 @@ class ElectrumClientTest : LightningTestSuite() {

client.stop()
}

@OptIn(DelicateCoroutinesApi::class)
@Test
fun `catch coroutine errors`() {
val myCustomError = "this is a test error"

class MyTcpSopcket(val socket: TcpSocket) : TcpSocket by socket {
override fun linesFlow(): Flow<String> {
return super.linesFlow().map {
// during the handshake with the electrum server we first ask for the server version, then headers, fee rates
// so id == 2 means we're asking for fee rates, and here we return an error
val sendError = it.contains("\"id\": 2")
if (sendError) {
"""{"jsonrpc": "2.0", "error": {"code": 42, "message": "$myCustomError"}, "id": 2}"""
} else {
it
}
}
}
}

class MyBuilder(val builder: TcpSocket.Builder) : TcpSocket.Builder {
override suspend fun connect(host: String, port: Int, tls: TcpSocket.TLS, loggerFactory: LoggerFactory): TcpSocket {
val socket = builder.connect(host, port, tls, loggerFactory)
return MyTcpSopcket(socket)
}
}

runBlocking {
val builder = MyBuilder(TcpSocket.Builder())
val errorFlow = MutableStateFlow<Throwable?>(null)
val myErrorHandler = CoroutineExceptionHandler { _, e -> errorFlow.value = e }
val client = ElectrumClient(builder, GlobalScope, LoggerFactory.default, myErrorHandler)
client.connect(ServerAddress("electrum.acinq.co", 50002, TcpSocket.TLS.UNSAFE_CERTIFICATES))
client.connectionState.first { it is Connection.CLOSED }
client.connectionState.first { it is Connection.ESTABLISHING }
val error = errorFlow.filterNotNull().first()
assertTrue(error.message!!.contains(myCustomError))
client.stop()
}
}
}

0 comments on commit c9c826d

Please sign in to comment.