Skip to content

Commit

Permalink
Don't throw NoPeersForOutboundMessageException if peers DONTWANT mess…
Browse files Browse the repository at this point in the history
…age (#385)
  • Loading branch information
StefanBratanov authored Oct 16, 2024
1 parent a32f486 commit cfdff90
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -416,15 +416,20 @@ open class GossipRouter(
}
.flatten()
}
val list = peers
.filterNot { peerDoesNotWantMessage(it, msg.messageId) }
.map { submitPublishMessage(it, msg) }

mCache += msg
flushAllPending()

return if (list.isNotEmpty()) {
anyComplete(list)
return if (peers.isNotEmpty()) {
val publishedMessages = peers
.filterNot { peerDoesNotWantMessage(it, msg.messageId) }
.map { submitPublishMessage(it, msg) }
if (publishedMessages.isEmpty()) {
// all peers have sent IDONTWANT for this message id
CompletableFuture.completedFuture(Unit)
} else {
flushAllPending()
anyComplete(publishedMessages)
}
} else {
completedExceptionally(
NoPeersForOutboundMessageException("No peers for message topics ${msg.topics} found")
Expand Down Expand Up @@ -614,7 +619,7 @@ open class GossipRouter(
.flatten()
.distinct()
.minus(receivedFrom)
.forEach { peer -> sendIdontwant(peer, msg.messageId) }
.forEach { sendIdontwant(it, msg.messageId) }
}

private fun enqueuePrune(peer: PeerHandler, topic: Topic) {
Expand Down

0 comments on commit cfdff90

Please sign in to comment.