diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt index b2cad191..a2820070 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt @@ -242,7 +242,7 @@ data class GossipParams( * [iDontWantMinMessageSizeThreshold] controls the minimum size (in bytes) that an incoming message needs to be so that an IDONTWANT message is sent to mesh peers. * The default is 16 KB. */ - val iDontWantMinMessageSizeThreshold: Int = 16000, + val iDontWantMinMessageSizeThreshold: Int = 16384, /** * [iDontWantTTL] Expiry time for cache of received IDONTWANT messages for peers diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index d139609f..10198688 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -420,6 +420,7 @@ open class GossipRouter( mCache += msg return if (peers.isNotEmpty()) { + iDontWant(msg) val publishedMessages = peers .filterNot { peerDoesNotWantMessage(it, msg.messageId) } .map { submitPublishMessage(it, msg) } @@ -610,7 +611,7 @@ open class GossipRouter( enqueueIwant(peer, messageIds) } - private fun iDontWant(msg: PubsubMessage, receivedFrom: PeerHandler) { + private fun iDontWant(msg: PubsubMessage, receivedFrom: PeerHandler? = null) { if (!this.protocol.supportsIDontWant()) return if (msg.protobufMessage.data.size() < params.iDontWantMinMessageSizeThreshold) return // we need to send IDONTWANT messages to mesh peers immediately in order for them to have an effect @@ -618,7 +619,7 @@ open class GossipRouter( .mapNotNull { mesh[it] } .flatten() .distinct() - .minus(receivedFrom) + .minus(setOfNotNull(receivedFrom)) .forEach { sendIdontwant(it, msg.messageId) } } diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_2Tests.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_2Tests.kt index cc7cd4d5..8ce1919c 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_2Tests.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_2Tests.kt @@ -150,6 +150,27 @@ class GossipV1_2Tests : GossipTestsBase() { assertThat(receivedMessages).containsExactly(msg.protobufMessage) } + @Test + fun iDontWantIsSentOnPublishing() { + val test = startSingleTopicNetwork( + params = GossipParams(iDontWantMinMessageSizeThreshold = 5), + mockRouterCount = 3 + ) + + test.mockRouters.forEach { it.subscribe("topic1") } + val msgToPublish = newMessage("topic1", 0L, "Hello".toByteArray()) + test.gossipRouter.publish(msgToPublish) + test.mockRouters.forEach { + // IDONTWANT is received + it.waitForMessage { msg -> + msg.control.idontwantCount == 1 && + msg.control.idontwantList.first().messageIDsList.map { mIds -> mIds.toWBytes() }.contains(msgToPublish.messageId) + } + // msg is received + it.waitForMessage { msg -> msg.publishCount > 0 } + } + } + private fun startSingleTopicNetwork(params: GossipParams, mockRouterCount: Int): ManyRoutersTest { val test = ManyRoutersTest( protocol = PubsubProtocol.Gossip_V_1_2,