Skip to content

Commit

Permalink
Prevent deadlocks in Peer (#484)
Browse files Browse the repository at this point in the history
We were using BUFFERED channels, with the default behavior where the
sender suspends when the buffer is full (64 elements by default).

The issue is that the same coroutine receives from the `input` channel
and emits potentially multiple events to that same channel. We would thus
fill the buffer, which causes us to `suspend`, which stops dequeuing
events, leading to a complete deadlock.
  • Loading branch information
t-bast committed Jun 21, 2023
1 parent 9b0f37b commit 4b65d7a
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import fr.acinq.secp256k1.Hex
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.*
import org.kodein.log.newLogger
Expand Down Expand Up @@ -113,8 +113,10 @@ class Peer(

val remoteNodeId: PublicKey = walletParams.trampolineNode.id

private val input = Channel<PeerCommand>(BUFFERED)
private var output = Channel<ByteArray>(BUFFERED)
// We use unlimited buffers, otherwise we may end up in a deadlock since we're both
// receiving *and* sending to those channels in the same coroutine.
private val input = Channel<PeerCommand>(UNLIMITED)
private var output = Channel<ByteArray>(UNLIMITED)
val outputLightningMessages: ReceiveChannel<ByteArray> get() = output

private val logger = MDCLogger(nodeParams.loggerFactory.newLogger(this::class), staticMdc = mapOf("remoteNodeId" to remoteNodeId))
Expand Down Expand Up @@ -368,7 +370,7 @@ class Peer(

suspend fun respond() {
// Reset the output channel to avoid sending obsolete messages
output = Channel(BUFFERED)
output = Channel(UNLIMITED)
for (msg in output) send(msg)
}

Expand Down

0 comments on commit 4b65d7a

Please sign in to comment.