Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent deadlocks in Peer #484

Merged
merged 1 commit into from
Jun 21, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import fr.acinq.secp256k1.Hex
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.*
import org.kodein.log.newLogger
Expand Down Expand Up @@ -110,8 +110,10 @@ class Peer(

val remoteNodeId: PublicKey = walletParams.trampolineNode.id

private val input = Channel<PeerCommand>(BUFFERED)
private var output = Channel<ByteArray>(BUFFERED)
// We use unlimited buffers, otherwise we may end up in a deadlock since we're both
// receiving *and* sending to those channels in the same coroutine.
private val input = Channel<PeerCommand>(UNLIMITED)
private var output = Channel<ByteArray>(UNLIMITED)
val outputLightningMessages: ReceiveChannel<ByteArray> get() = output

private val logger = MDCLogger(nodeParams.loggerFactory.newLogger(this::class), staticMdc = mapOf("remoteNodeId" to remoteNodeId))
Expand Down Expand Up @@ -365,7 +367,7 @@ class Peer(

suspend fun respond() {
// Reset the output channel to avoid sending obsolete messages
output = Channel(BUFFERED)
output = Channel(UNLIMITED)
for (msg in output) send(msg)
}

Expand Down