Skip to content

Commit

Permalink
Send IDONTWANT prior to publish (#386)
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov authored Oct 18, 2024
1 parent cfdff90 commit a00728e
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ data class GossipParams(
* [iDontWantMinMessageSizeThreshold] controls the minimum size (in bytes) that an incoming message needs to be so that an IDONTWANT message is sent to mesh peers.
* The default is 16 KB.
*/
val iDontWantMinMessageSizeThreshold: Int = 16000,
val iDontWantMinMessageSizeThreshold: Int = 16384,

/**
* [iDontWantTTL] Expiry time for cache of received IDONTWANT messages for peers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ open class GossipRouter(
mCache += msg

return if (peers.isNotEmpty()) {
iDontWant(msg)
val publishedMessages = peers
.filterNot { peerDoesNotWantMessage(it, msg.messageId) }
.map { submitPublishMessage(it, msg) }
Expand Down Expand Up @@ -610,15 +611,15 @@ open class GossipRouter(
enqueueIwant(peer, messageIds)
}

private fun iDontWant(msg: PubsubMessage, receivedFrom: PeerHandler) {
private fun iDontWant(msg: PubsubMessage, receivedFrom: PeerHandler? = null) {
if (!this.protocol.supportsIDontWant()) return
if (msg.protobufMessage.data.size() < params.iDontWantMinMessageSizeThreshold) return
// we need to send IDONTWANT messages to mesh peers immediately in order for them to have an effect
msg.topics
.mapNotNull { mesh[it] }
.flatten()
.distinct()
.minus(receivedFrom)
.minus(setOfNotNull(receivedFrom))
.forEach { sendIdontwant(it, msg.messageId) }
}

Expand Down
21 changes: 21 additions & 0 deletions libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_2Tests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,27 @@ class GossipV1_2Tests : GossipTestsBase() {
assertThat(receivedMessages).containsExactly(msg.protobufMessage)
}

@Test
fun iDontWantIsSentOnPublishing() {
val test = startSingleTopicNetwork(
params = GossipParams(iDontWantMinMessageSizeThreshold = 5),
mockRouterCount = 3
)

test.mockRouters.forEach { it.subscribe("topic1") }
val msgToPublish = newMessage("topic1", 0L, "Hello".toByteArray())
test.gossipRouter.publish(msgToPublish)
test.mockRouters.forEach {
// IDONTWANT is received
it.waitForMessage { msg ->
msg.control.idontwantCount == 1 &&
msg.control.idontwantList.first().messageIDsList.map { mIds -> mIds.toWBytes() }.contains(msgToPublish.messageId)
}
// msg is received
it.waitForMessage { msg -> msg.publishCount > 0 }
}
}

private fun startSingleTopicNetwork(params: GossipParams, mockRouterCount: Int): ManyRoutersTest {
val test = ManyRoutersTest(
protocol = PubsubProtocol.Gossip_V_1_2,
Expand Down

0 comments on commit a00728e

Please sign in to comment.