From cfdff90a2d9f865bfd065401b55bfdab7587e348 Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Wed, 16 Oct 2024 16:25:06 +0800 Subject: [PATCH 01/11] Don't throw NoPeersForOutboundMessageException if peers DONTWANT message (#385) --- .../io/libp2p/pubsub/gossip/GossipRouter.kt | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index 191c8b03..d139609f 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -416,15 +416,20 @@ open class GossipRouter( } .flatten() } - val list = peers - .filterNot { peerDoesNotWantMessage(it, msg.messageId) } - .map { submitPublishMessage(it, msg) } mCache += msg - flushAllPending() - return if (list.isNotEmpty()) { - anyComplete(list) + return if (peers.isNotEmpty()) { + val publishedMessages = peers + .filterNot { peerDoesNotWantMessage(it, msg.messageId) } + .map { submitPublishMessage(it, msg) } + if (publishedMessages.isEmpty()) { + // all peers have sent IDONTWANT for this message id + CompletableFuture.completedFuture(Unit) + } else { + flushAllPending() + anyComplete(publishedMessages) + } } else { completedExceptionally( NoPeersForOutboundMessageException("No peers for message topics ${msg.topics} found") @@ -614,7 +619,7 @@ open class GossipRouter( .flatten() .distinct() .minus(receivedFrom) - .forEach { peer -> sendIdontwant(peer, msg.messageId) } + .forEach { sendIdontwant(it, msg.messageId) } } private fun enqueuePrune(peer: PeerHandler, topic: Topic) { From a00728e3ab469c46f3c915e8f5783aa92377f29e Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Fri, 18 Oct 2024 18:28:46 +0800 Subject: [PATCH 02/11] Send IDONTWANT prior to publish (#386) --- .../io/libp2p/pubsub/gossip/GossipParams.kt | 2 +- .../io/libp2p/pubsub/gossip/GossipRouter.kt | 5 +++-- .../libp2p/pubsub/gossip/GossipV1_2Tests.kt | 21 +++++++++++++++++++ 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt index b2cad191..a2820070 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt @@ -242,7 +242,7 @@ data class GossipParams( * [iDontWantMinMessageSizeThreshold] controls the minimum size (in bytes) that an incoming message needs to be so that an IDONTWANT message is sent to mesh peers. * The default is 16 KB. */ - val iDontWantMinMessageSizeThreshold: Int = 16000, + val iDontWantMinMessageSizeThreshold: Int = 16384, /** * [iDontWantTTL] Expiry time for cache of received IDONTWANT messages for peers diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index d139609f..10198688 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -420,6 +420,7 @@ open class GossipRouter( mCache += msg return if (peers.isNotEmpty()) { + iDontWant(msg) val publishedMessages = peers .filterNot { peerDoesNotWantMessage(it, msg.messageId) } .map { submitPublishMessage(it, msg) } @@ -610,7 +611,7 @@ open class GossipRouter( enqueueIwant(peer, messageIds) } - private fun iDontWant(msg: PubsubMessage, receivedFrom: PeerHandler) { + private fun iDontWant(msg: PubsubMessage, receivedFrom: PeerHandler? = null) { if (!this.protocol.supportsIDontWant()) return if (msg.protobufMessage.data.size() < params.iDontWantMinMessageSizeThreshold) return // we need to send IDONTWANT messages to mesh peers immediately in order for them to have an effect @@ -618,7 +619,7 @@ open class GossipRouter( .mapNotNull { mesh[it] } .flatten() .distinct() - .minus(receivedFrom) + .minus(setOfNotNull(receivedFrom)) .forEach { sendIdontwant(it, msg.messageId) } } diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_2Tests.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_2Tests.kt index cc7cd4d5..8ce1919c 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_2Tests.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_2Tests.kt @@ -150,6 +150,27 @@ class GossipV1_2Tests : GossipTestsBase() { assertThat(receivedMessages).containsExactly(msg.protobufMessage) } + @Test + fun iDontWantIsSentOnPublishing() { + val test = startSingleTopicNetwork( + params = GossipParams(iDontWantMinMessageSizeThreshold = 5), + mockRouterCount = 3 + ) + + test.mockRouters.forEach { it.subscribe("topic1") } + val msgToPublish = newMessage("topic1", 0L, "Hello".toByteArray()) + test.gossipRouter.publish(msgToPublish) + test.mockRouters.forEach { + // IDONTWANT is received + it.waitForMessage { msg -> + msg.control.idontwantCount == 1 && + msg.control.idontwantList.first().messageIDsList.map { mIds -> mIds.toWBytes() }.contains(msgToPublish.messageId) + } + // msg is received + it.waitForMessage { msg -> msg.publishCount > 0 } + } + } + private fun startSingleTopicNetwork(params: GossipParams, mockRouterCount: Int): ManyRoutersTest { val test = ManyRoutersTest( protocol = PubsubProtocol.Gossip_V_1_2, From 64bf461879f2d7f349425d563c60be2369f7f0bd Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Mon, 21 Oct 2024 12:36:48 +0200 Subject: [PATCH 03/11] floodPublishMaxMessageSizeThreshold --- .../kotlin/io/libp2p/pubsub/gossip/GossipParams.kt | 14 +++++++++----- .../kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt | 4 +++- .../pubsub/gossip/builders/GossipParamsBuilder.kt | 9 +++++---- .../libp2p/pubsub/gossip/GossipPubsubRouterTest.kt | 4 ++-- .../io/libp2p/pubsub/gossip/GossipV1_1Tests.kt | 14 ++++++++------ .../libp2p/pubsub/gossip/SubscriptionsLimitTest.kt | 2 +- .../io/libp2p/pubsub/gossip/Eth2GossipParams.kt | 2 +- .../io/libp2p/simulate/gossip/Eth2GossipParams.kt | 2 +- .../simulate/main/BlobDecouplingSimulation.kt | 6 +++--- 9 files changed, 33 insertions(+), 24 deletions(-) diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt index a2820070..af2a0217 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt @@ -112,11 +112,13 @@ data class GossipParams( val seenTTL: Duration = 2.minutes, /** - * [floodPublish] is a gossipsub router option that enables flood publishing. - * When this is enabled, published messages are forwarded to all peers with score >= - * to publishThreshold + * [floodPublishMaxMessageSizeThreshold] controls the maximum size (in bytes) a message will be + * published using flood publishing mode. + * When a message size is <= [floodPublishMaxMessageSizeThreshold], published messages are forwarded + * to all peers with score >= to [GossipScoreParams.publishThreshold] + * The default is 0 KiB (never flood publish). */ - val floodPublish: Boolean = false, + val floodPublishMaxMessageSizeThreshold: Int = 0, /** * [gossipFactor] affects how many peers we will emit gossip to at each heartbeat. @@ -240,7 +242,7 @@ data class GossipParams( /** * [iDontWantMinMessageSizeThreshold] controls the minimum size (in bytes) that an incoming message needs to be so that an IDONTWANT message is sent to mesh peers. - * The default is 16 KB. + * The default is 16 KiB. */ val iDontWantMinMessageSizeThreshold: Int = 16384, @@ -260,6 +262,8 @@ data class GossipParams( check(DLow <= D, "DLow should be <= D") check(DHigh >= D, "DHigh should be >= D") check(gossipFactor in 0.0..1.0, "gossipFactor should be in range [0.0, 1.0]") + check(floodPublishMaxMessageSizeThreshold >= 0, "floodPublishMaxMessageSizeThreshold should be >= 0") + check(iDontWantMinMessageSizeThreshold >= 0, "iDontWantMinMessageSizeThreshold should be >= 0") } companion object { diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index 10198688..287a97b6 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -400,8 +400,10 @@ open class GossipRouter( override fun broadcastOutbound(msg: PubsubMessage): CompletableFuture { msg.topics.forEach { lastPublished[it] = currentTimeSupplier() } + val floodPublish = msg.protobufMessage.data.size() <= params.floodPublishMaxMessageSizeThreshold + val peers = - if (params.floodPublish) { + if (floodPublish) { msg.topics .flatMap { getTopicPeers(it) } .filter { score.score(it.peerId) >= scoreParams.publishThreshold } diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt index a0f0b18a..a9841271 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt @@ -76,6 +76,8 @@ class GossipParamsBuilder { private var iDontWantMinMessageSizeThreshold: Int? = null + private var floodPublishMaxMessageSizeThreshold: Int? = null + private var iDontWantTTL: Duration? = null init { @@ -90,7 +92,6 @@ class GossipParamsBuilder { this.maxPeersSentInPruneMsg = source.maxPeersSentInPruneMsg this.maxPeersAcceptedInPruneMsg = source.maxPeersAcceptedInPruneMsg this.pruneBackoff = source.pruneBackoff - this.floodPublish = source.floodPublish this.gossipFactor = source.gossipFactor this.opportunisticGraftPeers = source.opportunisticGraftPeers this.opportunisticGraftTicks = source.opportunisticGraftTicks @@ -141,8 +142,6 @@ class GossipParamsBuilder { fun pruneBackoff(value: Duration): GossipParamsBuilder = apply { pruneBackoff = value } - fun floodPublish(value: Boolean): GossipParamsBuilder = apply { floodPublish = value } - fun gossipFactor(value: Double): GossipParamsBuilder = apply { gossipFactor = value } fun opportunisticGraftPeers(value: Int): GossipParamsBuilder = apply { @@ -185,6 +184,8 @@ class GossipParamsBuilder { fun iDontWantMinMessageSizeThreshold(value: Int): GossipParamsBuilder = apply { iDontWantMinMessageSizeThreshold = value } + fun floodPublishMaxMessageSizeThreshold(value: Int): GossipParamsBuilder = apply { floodPublishMaxMessageSizeThreshold = value } + fun iDontWantTTL(value: Duration): GossipParamsBuilder = apply { iDontWantTTL = value } fun build(): GossipParams { @@ -203,7 +204,7 @@ class GossipParamsBuilder { gossipHistoryLength = gossipHistoryLength!!, heartbeatInterval = heartbeatInterval!!, seenTTL = seenTTL!!, - floodPublish = floodPublish!!, + floodPublishMaxMessageSizeThreshold = floodPublishMaxMessageSizeThreshold!!, gossipFactor = gossipFactor!!, opportunisticGraftPeers = opportunisticGraftPeers!!, opportunisticGraftTicks = opportunisticGraftTicks!!, diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt index b63fcb2a..41f5c788 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit class GossipPubsubRouterTest : PubsubRouterTest( createGossipFuzzRouterFactory { - GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublish = false)) + GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublishMaxMessageSizeThreshold = 0)) } ) { @@ -59,7 +59,7 @@ class GossipPubsubRouterTest : PubsubRouterTest( // this is to test ihave/iwant fuzz.timeController.addTime(Duration.ofMillis(1)) - val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublish = false)) } + val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublishMaxMessageSizeThreshold = 0)) } val routerCenter = fuzz.createTestGossipRouter(r) allRouters.add(0, routerCenter) diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt index f0ceb9eb..65696344 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt @@ -530,8 +530,9 @@ class GossipV1_1Tests : GossipTestsBase() { @Test fun testNotFloodPublish() { + val message = newMessage("topic1", 0L, "Hello-0".toByteArray()) val appScore = mutableMapOf().withDefault { 0.0 } - val coreParams = GossipParams(3, 3, 3, floodPublish = false) + val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.protobufMessage.data.size() - 1) val peerScoreParams = GossipPeerScoreParams(appSpecificScore = { appScore.getValue(it) }) val scoreParams = GossipScoreParams(peerScoreParams = peerScoreParams) val test = ManyRoutersTest(params = coreParams, scoreParams = scoreParams) @@ -545,7 +546,7 @@ class GossipV1_1Tests : GossipTestsBase() { val topicMesh = test.gossipRouter.mesh["topic1"]!! assertTrue(topicMesh.size > 0 && topicMesh.size < test.routers.size) - test.gossipRouter.publish(newMessage("topic1", 0L, "Hello-0".toByteArray())) + test.gossipRouter.publish(message) test.fuzz.timeController.addTime(50.millis) @@ -557,8 +558,9 @@ class GossipV1_1Tests : GossipTestsBase() { @Test fun testFloodPublish() { + val message = newMessage("topic1", 0L, "Hello-0".toByteArray()) val appScore = mutableMapOf().withDefault { 0.0 } - val coreParams = GossipParams(3, 3, 3, floodPublish = true) + val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.protobufMessage.data.size()) val peerScoreParams = GossipPeerScoreParams( appSpecificScore = { appScore.getValue(it) }, appSpecificWeight = 1.0 @@ -580,7 +582,7 @@ class GossipV1_1Tests : GossipTestsBase() { val topicMesh = test.gossipRouter.mesh["topic1"]!!.map { it.peerId } assertTrue(topicMesh.size > 0 && topicMesh.size < test.routers.size) - test.gossipRouter.publish(newMessage("topic1", 0L, "Hello-0".toByteArray())) + test.gossipRouter.publish(message) test.fuzz.timeController.addTime(50.millis) @@ -650,7 +652,7 @@ class GossipV1_1Tests : GossipTestsBase() { 3, 3, DLazy = 3, - floodPublish = false, + floodPublishMaxMessageSizeThreshold = 0, gossipFactor = 0.5 ) val peerScoreParams = GossipPeerScoreParams( @@ -714,7 +716,7 @@ class GossipV1_1Tests : GossipTestsBase() { @Test fun testOutboundMeshQuotas1() { val appScore = mutableMapOf().withDefault { 0.0 } - val coreParams = GossipParams(3, 3, 3, DLazy = 3, DOut = 1, floodPublish = false) + val coreParams = GossipParams(3, 3, 3, DLazy = 3, DOut = 1, floodPublishMaxMessageSizeThreshold = 0) val peerScoreParams = GossipPeerScoreParams(appSpecificScore = { appScore.getValue(it) }) val scoreParams = GossipScoreParams(peerScoreParams = peerScoreParams) val test = ManyRoutersTest(params = coreParams, scoreParams = scoreParams) diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt index 28b4a6f6..1e78107b 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt @@ -10,7 +10,7 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow class SubscriptionsLimitTest : TwoGossipHostTestBase() { - override val params = GossipParams(maxSubscriptions = 5, floodPublish = true) + override val params = GossipParams(maxSubscriptions = 5, floodPublishMaxMessageSizeThreshold = 0) @Test fun `new peer subscribed to many topics`() { diff --git a/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/gossip/Eth2GossipParams.kt b/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/gossip/Eth2GossipParams.kt index 25aace38..2cdffd04 100644 --- a/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/gossip/Eth2GossipParams.kt +++ b/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/gossip/Eth2GossipParams.kt @@ -20,7 +20,7 @@ val Eth2DefaultGossipParams = GossipParams( DLazy = 8, pruneBackoff = 1.minutes, - floodPublish = true, + floodPublishMaxMessageSizeThreshold = 0, gossipFactor = 0.25, DScore = 4, DOut = 2, diff --git a/tools/simulator/src/main/kotlin/io/libp2p/simulate/gossip/Eth2GossipParams.kt b/tools/simulator/src/main/kotlin/io/libp2p/simulate/gossip/Eth2GossipParams.kt index e65abf01..8d47c0e6 100644 --- a/tools/simulator/src/main/kotlin/io/libp2p/simulate/gossip/Eth2GossipParams.kt +++ b/tools/simulator/src/main/kotlin/io/libp2p/simulate/gossip/Eth2GossipParams.kt @@ -25,7 +25,7 @@ val Eth2DefaultGossipParams = GossipParams( DLazy = 8, pruneBackoff = 1.minutes, - floodPublish = true, + floodPublishMaxMessageSizeThreshold = 16384, gossipFactor = 0.25, DScore = 4, DOut = 2, diff --git a/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt b/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt index 997a917b..3fb00355 100644 --- a/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt +++ b/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt @@ -41,7 +41,7 @@ class BlobDecouplingSimulation( val randomSeed: Long = 3L, val rnd: Random = Random(randomSeed), - val floodPublish: Boolean = true, + val floodPublishMaxMessageSizeThreshold: Int = 0, val sendingPeerBand: Bandwidth = Bandwidth.mbitsPerSec(100), @@ -85,7 +85,7 @@ class BlobDecouplingSimulation( val gossipParams = Eth2DefaultGossipParams .copy( // heartbeatInterval = 1.minutes - floodPublish = floodPublish + floodPublishMaxMessageSizeThreshold = floodPublishMaxMessageSizeThreshold ) val gossipScoreParams = Eth2DefaultScoreParams val gossipRouterCtor = { _: Int -> @@ -294,7 +294,7 @@ fun main() { // logger = {}, nodeCount = 1000, peerBands = band, - floodPublish = false, + floodPublishMaxMessageSizeThreshold = 0, // randomSeed = 2 ) From 20ab78cde3c103e42c23b17e3747325ae71710c8 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Mon, 21 Oct 2024 12:47:35 +0200 Subject: [PATCH 04/11] align param values --- .../kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt | 2 +- .../kotlin/io/libp2p/pubsub/gossip/Eth2GossipParams.kt | 2 +- .../kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt index 1e78107b..141d6c43 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt @@ -10,7 +10,7 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow class SubscriptionsLimitTest : TwoGossipHostTestBase() { - override val params = GossipParams(maxSubscriptions = 5, floodPublishMaxMessageSizeThreshold = 0) + override val params = GossipParams(maxSubscriptions = 5, floodPublishMaxMessageSizeThreshold = 16384) @Test fun `new peer subscribed to many topics`() { diff --git a/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/gossip/Eth2GossipParams.kt b/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/gossip/Eth2GossipParams.kt index 2cdffd04..4eb050f5 100644 --- a/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/gossip/Eth2GossipParams.kt +++ b/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/gossip/Eth2GossipParams.kt @@ -20,7 +20,7 @@ val Eth2DefaultGossipParams = GossipParams( DLazy = 8, pruneBackoff = 1.minutes, - floodPublishMaxMessageSizeThreshold = 0, + floodPublishMaxMessageSizeThreshold = 16384, gossipFactor = 0.25, DScore = 4, DOut = 2, diff --git a/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt b/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt index 3fb00355..5112f5f6 100644 --- a/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt +++ b/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt @@ -41,7 +41,7 @@ class BlobDecouplingSimulation( val randomSeed: Long = 3L, val rnd: Random = Random(randomSeed), - val floodPublishMaxMessageSizeThreshold: Int = 0, + val floodPublishMaxMessageSizeThreshold: Int = 16384, val sendingPeerBand: Bandwidth = Bandwidth.mbitsPerSec(100), From 3553637aa76b81799ecb0b103535a9cab46a2dc5 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Mon, 21 Oct 2024 12:50:22 +0200 Subject: [PATCH 05/11] fix param builder --- .../io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt index a9841271..2f107d9f 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt @@ -40,8 +40,6 @@ class GossipParamsBuilder { private var pruneBackoff: Duration? = null - private var floodPublish: Boolean? = null - private var gossipFactor: Double? = null private var opportunisticGraftPeers: Int? = null @@ -253,7 +251,7 @@ class GossipParamsBuilder { check(seenTTL != null, { "seenTTL must not be null" }) check(maxPeersSentInPruneMsg != null, { "maxPeersSentInPruneMsg must not be null" }) check(pruneBackoff != null, { "pruneBackoff must not be null" }) - check(floodPublish != null, { "floodPublish must not be null" }) + check(floodPublishMaxMessageSizeThreshold != null, { "floodPublishMaxMessageSizeThreshold must not be null" }) check(gossipFactor != null, { "gossipFactor must not be null" }) check(opportunisticGraftPeers != null, { "opportunisticGraftPeers must not be null" }) check(opportunisticGraftTicks != null, { "opportunisticGraftTicks must not be null" }) From 3805ba744015a374e9312e1418c453ccc9f86807 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Mon, 21 Oct 2024 13:04:16 +0200 Subject: [PATCH 06/11] fix builder --- .../io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt index 2f107d9f..9696a8f8 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt @@ -90,6 +90,7 @@ class GossipParamsBuilder { this.maxPeersSentInPruneMsg = source.maxPeersSentInPruneMsg this.maxPeersAcceptedInPruneMsg = source.maxPeersAcceptedInPruneMsg this.pruneBackoff = source.pruneBackoff + this.floodPublishMaxMessageSizeThreshold = source.floodPublishMaxMessageSizeThreshold this.gossipFactor = source.gossipFactor this.opportunisticGraftPeers = source.opportunisticGraftPeers this.opportunisticGraftTicks = source.opportunisticGraftTicks From bd50c9cf612573c496cd727cd551df0ea3066f2d Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Mon, 21 Oct 2024 15:04:34 +0200 Subject: [PATCH 07/11] add constants and PubsubMessage size method --- .../src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt | 3 +++ .../kotlin/io/libp2p/pubsub/gossip/GossipParams.kt | 11 +++++++++-- .../kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt | 4 ++-- .../io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt | 4 ++-- .../kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt | 4 ++-- 5 files changed, 18 insertions(+), 8 deletions(-) diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt index ae3d94b1..af3f2d72 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt @@ -22,6 +22,9 @@ interface PubsubMessage { val topics: List get() = protobufMessage.topicIDsList + val size: Int + get() = protobufMessage.data.size() + fun messageSha256() = sha256(protobufMessage.toByteArray()) override fun equals(other: Any?): Boolean diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt index af2a0217..654683ab 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt @@ -22,6 +22,10 @@ fun defaultDLazy(D: Int) = D fun defaultDScore(D: Int) = D * 2 / 3 fun defaultDOut(D: Int, DLow: Int) = min(D / 2, max(DLow - 1, 0)) +// floodPublishMaxMessageSizeThreshold shortcuts +const val NEVER_FLOOD_PUBLISH = 0 +const val ALWAYS_FLOOD_PUBLISH = Int.MAX_VALUE + /** * Parameters of Gossip 1.1 router */ @@ -116,9 +120,12 @@ data class GossipParams( * published using flood publishing mode. * When a message size is <= [floodPublishMaxMessageSizeThreshold], published messages are forwarded * to all peers with score >= to [GossipScoreParams.publishThreshold] - * The default is 0 KiB (never flood publish). + * + * [NEVER_FLOOD_PUBLISH] and [ALWAYS_FLOOD_PUBLISH] can be used as shortcuts. + * + * The default is [NEVER_FLOOD_PUBLISH] (0 KiB). */ - val floodPublishMaxMessageSizeThreshold: Int = 0, + val floodPublishMaxMessageSizeThreshold: Int = NEVER_FLOOD_PUBLISH, /** * [gossipFactor] affects how many peers we will emit gossip to at each heartbeat. diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index 287a97b6..fc03490f 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -400,7 +400,7 @@ open class GossipRouter( override fun broadcastOutbound(msg: PubsubMessage): CompletableFuture { msg.topics.forEach { lastPublished[it] = currentTimeSupplier() } - val floodPublish = msg.protobufMessage.data.size() <= params.floodPublishMaxMessageSizeThreshold + val floodPublish = msg.size <= params.floodPublishMaxMessageSizeThreshold val peers = if (floodPublish) { @@ -615,7 +615,7 @@ open class GossipRouter( private fun iDontWant(msg: PubsubMessage, receivedFrom: PeerHandler? = null) { if (!this.protocol.supportsIDontWant()) return - if (msg.protobufMessage.data.size() < params.iDontWantMinMessageSizeThreshold) return + if (msg.size < params.iDontWantMinMessageSizeThreshold) return // we need to send IDONTWANT messages to mesh peers immediately in order for them to have an effect msg.topics .mapNotNull { mesh[it] } diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt index 41f5c788..80297d12 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit class GossipPubsubRouterTest : PubsubRouterTest( createGossipFuzzRouterFactory { - GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublishMaxMessageSizeThreshold = 0)) + GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublishMaxMessageSizeThreshold = NEVER_FLOOD_PUBLISH)) } ) { @@ -59,7 +59,7 @@ class GossipPubsubRouterTest : PubsubRouterTest( // this is to test ihave/iwant fuzz.timeController.addTime(Duration.ofMillis(1)) - val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublishMaxMessageSizeThreshold = 0)) } + val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublishMaxMessageSizeThreshold = NEVER_FLOOD_PUBLISH)) } val routerCenter = fuzz.createTestGossipRouter(r) allRouters.add(0, routerCenter) diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt index 65696344..7843860a 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt @@ -532,7 +532,7 @@ class GossipV1_1Tests : GossipTestsBase() { fun testNotFloodPublish() { val message = newMessage("topic1", 0L, "Hello-0".toByteArray()) val appScore = mutableMapOf().withDefault { 0.0 } - val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.protobufMessage.data.size() - 1) + val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.size - 1) val peerScoreParams = GossipPeerScoreParams(appSpecificScore = { appScore.getValue(it) }) val scoreParams = GossipScoreParams(peerScoreParams = peerScoreParams) val test = ManyRoutersTest(params = coreParams, scoreParams = scoreParams) @@ -560,7 +560,7 @@ class GossipV1_1Tests : GossipTestsBase() { fun testFloodPublish() { val message = newMessage("topic1", 0L, "Hello-0".toByteArray()) val appScore = mutableMapOf().withDefault { 0.0 } - val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.protobufMessage.data.size()) + val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.size) val peerScoreParams = GossipPeerScoreParams( appSpecificScore = { appScore.getValue(it) }, appSpecificWeight = 1.0 From 851d2c648ddde6765e3714beac9226a9cc5d03be Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Mon, 21 Oct 2024 15:16:35 +0200 Subject: [PATCH 08/11] spotless --- libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt index af3f2d72..c960fdb6 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt @@ -23,7 +23,7 @@ interface PubsubMessage { get() = protobufMessage.topicIDsList val size: Int - get() = protobufMessage.data.size() + get() = protobufMessage.data.size() fun messageSha256() = sha256(protobufMessage.toByteArray()) From 88fe8aefe640cea8780c87445daa5706689d9339 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Mon, 21 Oct 2024 15:33:54 +0200 Subject: [PATCH 09/11] cleanups --- .../test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt | 2 +- .../io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt | 2 +- .../io/libp2p/simulate/main/BlobDecouplingSimulation.kt | 7 ------- 3 files changed, 2 insertions(+), 9 deletions(-) diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt index 7843860a..0ef84747 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt @@ -716,7 +716,7 @@ class GossipV1_1Tests : GossipTestsBase() { @Test fun testOutboundMeshQuotas1() { val appScore = mutableMapOf().withDefault { 0.0 } - val coreParams = GossipParams(3, 3, 3, DLazy = 3, DOut = 1, floodPublishMaxMessageSizeThreshold = 0) + val coreParams = GossipParams(3, 3, 3, DLazy = 3, DOut = 1, floodPublishMaxMessageSizeThreshold = NEVER_FLOOD_PUBLISH) val peerScoreParams = GossipPeerScoreParams(appSpecificScore = { appScore.getValue(it) }) val scoreParams = GossipScoreParams(peerScoreParams = peerScoreParams) val test = ManyRoutersTest(params = coreParams, scoreParams = scoreParams) diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt index 141d6c43..bc1f02fa 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt @@ -10,7 +10,7 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow class SubscriptionsLimitTest : TwoGossipHostTestBase() { - override val params = GossipParams(maxSubscriptions = 5, floodPublishMaxMessageSizeThreshold = 16384) + override val params = GossipParams(maxSubscriptions = 5, floodPublishMaxMessageSizeThreshold = ALWAYS_FLOOD_PUBLISH) @Test fun `new peer subscribed to many topics`() { diff --git a/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt b/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt index 5112f5f6..ca45433c 100644 --- a/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt +++ b/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt @@ -41,8 +41,6 @@ class BlobDecouplingSimulation( val randomSeed: Long = 3L, val rnd: Random = Random(randomSeed), - val floodPublishMaxMessageSizeThreshold: Int = 16384, - val sendingPeerBand: Bandwidth = Bandwidth.mbitsPerSec(100), val peerBands: Iterator = iterator { @@ -83,10 +81,6 @@ class BlobDecouplingSimulation( ) val gossipParams = Eth2DefaultGossipParams - .copy( -// heartbeatInterval = 1.minutes - floodPublishMaxMessageSizeThreshold = floodPublishMaxMessageSizeThreshold - ) val gossipScoreParams = Eth2DefaultScoreParams val gossipRouterCtor = { _: Int -> SimGossipRouterBuilder().also { @@ -294,7 +288,6 @@ fun main() { // logger = {}, nodeCount = 1000, peerBands = band, - floodPublishMaxMessageSizeThreshold = 0, // randomSeed = 2 ) From 269af486859c6c2486878bae1ab3b04e83a013a2 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Tue, 22 Oct 2024 16:27:12 +0200 Subject: [PATCH 10/11] Logging and other small warnings removal (#392) --- .../kotlin/io/libp2p/pubsub/AbstractRouter.kt | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt index c5ad1f9d..d5e16401 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt @@ -85,7 +85,6 @@ abstract class AbstractRouter( /** * Flushes all pending message parts for all peers - * @see addPendingRpcPart */ protected fun flushAllPending() { pendingRpcParts.pendingPeers.forEach(::flushPending) @@ -163,7 +162,7 @@ abstract class AbstractRouter( // Validate message if (!validateMessageListLimits(msg)) { - logger.debug("Dropping msg with lists exceeding limits from peer $peer") + logger.debug("Dropping msg with lists exceeding limits from peer {}", peer) return } @@ -173,7 +172,7 @@ abstract class AbstractRouter( .filterIncomingSubscriptions(subscriptions, peersTopics.getByFirst(peer)) .forEach { handleMessageSubscriptions(peer, it) } } catch (e: Exception) { - logger.debug("Subscription filter error, ignoring message from peer $peer", e) + logger.debug("Subscription filter error, ignoring message from peer {}", peer, e) return } @@ -182,7 +181,7 @@ abstract class AbstractRouter( } val (msgSubscribed, nonSubscribed) = msg.publishList - .partition { it.topicIDsList.any { it in subscribedTopics } } + .partition { rpcMsg -> rpcMsg.topicIDsList.any { it in subscribedTopics } } nonSubscribed.forEach { notifyNonSubscribedMessage(peer, it) } @@ -207,7 +206,7 @@ abstract class AbstractRouter( messageValidator.validate(it) true } catch (e: Exception) { - logger.debug("Invalid pubsub message from peer $peer: $it", e) + logger.debug("Invalid pubsub message from peer {}: {}", peer, it, e) seenMessages[it] = Optional.of(ValidationResult.Invalid) notifyUnseenInvalidMessage(peer, it) false @@ -248,8 +247,16 @@ abstract class AbstractRouter( { res, err -> when { err != null -> logger.warn("Exception while handling message from peer $peer: ${it.first}", err) - res == ValidationResult.Invalid -> logger.debug("Invalid pubsub message from peer $peer: ${it.first}") - res == ValidationResult.Ignore -> logger.trace("Ignoring pubsub message from peer $peer: ${it.first}") + res == ValidationResult.Invalid -> logger.debug( + "Invalid pubsub message from peer {}: {}", + peer, + it.first + ) + res == ValidationResult.Ignore -> logger.trace( + "Ignoring pubsub message from peer {}: {}", + peer, + it.first + ) else -> { newValidatedMessages(singletonList(it.first), peer) flushAllPending() @@ -273,15 +280,15 @@ abstract class AbstractRouter( override fun onPeerWireException(peer: PeerHandler?, cause: Throwable) { // exception occurred in protobuf decoders - logger.debug("Malformed message from $peer : $cause") + logger.debug("Malformed message from {} : {}", peer, cause) peer?.also { notifyMalformedMessage(it) } } override fun onServiceException(peer: PeerHandler?, msg: Any?, cause: Throwable) { if (cause is BadPeerException) { - logger.debug("Remote peer ($peer) misbehaviour on message $msg: $cause") + logger.debug("Remote peer ({}) misbehaviour on message {} : {}", peer, msg, cause) } else { - logger.warn("AbstractRouter internal error on message $msg from peer $peer", cause) + logger.warn("AbstractRouter internal error on message {} from peer {}", msg, peer, cause) } } From 1cde874089d45f90e2f6ca20f39a77b69cd7e66e Mon Sep 17 00:00:00 2001 From: Anton Nashatyrev Date: Tue, 22 Oct 2024 19:08:02 +0400 Subject: [PATCH 11/11] Gossip: more reliable publishing (#387) * When publishing take extra nodes out of mesh when mesh is not large enough * Add tests for publish extra nodes * When publishing prioritize back up non-mesh peers with acceptable score --- .../io/libp2p/pubsub/gossip/GossipRouter.kt | 36 ++- .../libp2p/pubsub/gossip/GossipV1_1Tests.kt | 210 ++++++++++++++++-- 2 files changed, 224 insertions(+), 22 deletions(-) diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index fc03490f..1bd43193 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -281,6 +281,7 @@ open class GossipRouter( when { isDirect(peer) -> prune(peer, topic) + isBackOff(peer, topic) -> { notifyRouterMisbehavior(peer, 1) if (isBackOffFlood(peer, topic)) { @@ -288,10 +289,13 @@ open class GossipRouter( } prune(peer, topic) } + score.score(peer.peerId) < 0 -> prune(peer, topic) + meshPeers.size >= params.DHigh && !peer.isOutbound() -> prune(peer, topic) + peer !in meshPeers -> graft(peer, topic) } @@ -410,11 +414,35 @@ open class GossipRouter( .plus(getDirectPeers()) } else { msg.topics - .mapNotNull { topic -> - mesh[topic] ?: fanout[topic] ?: getTopicPeers(topic).shuffled(random).take(params.D) - .also { - if (it.isNotEmpty()) fanout[topic] = it.toMutableSet() + .map { topic -> + val topicMeshPeers = mesh[topic] + if (topicMeshPeers != null) { + // we are subscribed to the topic + if (topicMeshPeers.size < params.D) { + // we need extra non-mesh peers for more reliable publishing + val nonMeshTopicPeers = getTopicPeers(topic) - topicMeshPeers + val (nonMeshTopicPeersAbovePublishThreshold, nonMeshTopicPeersBelowPublishThreshold) = + nonMeshTopicPeers.partition { score.score(it.peerId) >= scoreParams.publishThreshold } + // this deviates from the original spec but we want at least D peers for publishing + // prioritizing mesh peers, then non-mesh peers with acceptable score, + // and then underscored non-mesh peers as a last resort + listOf( + topicMeshPeers, + nonMeshTopicPeersAbovePublishThreshold.shuffled(random), + nonMeshTopicPeersBelowPublishThreshold.shuffled(random) + ) + .flatten() + .take(params.D) + } else { + topicMeshPeers } + } else { + // we are not subscribed to the topic + fanout[topic] ?: getTopicPeers(topic).shuffled(random).take(params.D) + .also { + if (it.isNotEmpty()) fanout[topic] = it.toMutableSet() + } + } } .flatten() } diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt index 0ef84747..dc45bf63 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt @@ -125,6 +125,7 @@ class GossipV1_1Tests : GossipTestsBase() { super.initChannelWithHandler(streamHandler, handler) } } + val test = TwoRoutersTest(mockRouterFactory = { exec, _, _ -> MalformedMockRouter(exec) }) val mockRouter = test.router2.router as MalformedMockRouter @@ -1128,34 +1129,207 @@ class GossipV1_1Tests : GossipTestsBase() { // 2 heartbeats - the topic should be GRAFTed test.fuzz.timeController.addTime(2.seconds) - fun createPruneMessage(peersCount: Int): Rpc.RPC { - val peerInfos = List(peersCount) { - Rpc.PeerInfo.newBuilder() - .setPeerID(PeerId.random().bytes.toProtobuf()) - .setSignedPeerRecord(ByteString.EMPTY) - .build() - } - return Rpc.RPC.newBuilder().setControl( - Rpc.ControlMessage.newBuilder().addPrune( - Rpc.ControlPrune.newBuilder() - .setTopicID(topic) - .addAllPeers(peerInfos) - ) - ).build() - } - test.mockRouter.sendToSingle( - createPruneMessage(test.gossipRouter.params.maxPeersAcceptedInPruneMsg + 1) + createPruneMessage(topic, test.gossipRouter.params.maxPeersAcceptedInPruneMsg + 1) ) // prune message should be dropped because too many peers assertEquals(1, test.gossipRouter.mesh[topic]!!.size) test.mockRouter.sendToSingle( - createPruneMessage(test.gossipRouter.params.maxPeersAcceptedInPruneMsg) + createPruneMessage(topic, test.gossipRouter.params.maxPeersAcceptedInPruneMsg) ) // prune message should now be processed assertEquals(0, test.gossipRouter.mesh[topic]!!.size) } + + @Test + fun `when a peer leaves the mesh it should still be considered for publishing`() { + val test = TwoRoutersTest() + val topic = "topic1" + + test.mockRouter.subscribe(topic) + test.gossipRouter.subscribe(topic) + + // 2 heartbeats - the topic should be GRAFTed + test.fuzz.timeController.addTime(2.seconds) + + assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == 1) + + // remote peer leaves the mesh + test.mockRouter.sendToSingle(createPruneMessage(topic)) + test.fuzz.timeController.addTime(1.seconds) + + assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == 0) + + val message1 = newMessage(topic, 0L, "Hello-0".toByteArray()) + test.gossipRouter.publish(message1) + + test.mockRouter.waitForMessage { it.publishCount > 0 } + } + + @Test + fun `should publish to all mesh peers when mesh exceeds D`() { + val gossipParams = GossipParams(D = 6, DHigh = 10) + val test = ManyRoutersTest(params = gossipParams, mockRouterCount = gossipParams.DHigh) + val topic = "topic1" + test.connectAll() + + test.mockRouters.forEach { + it.subscribe(topic) + } + test.gossipRouter.subscribe(topic) + + // 2 heartbeats - the topic should be GRAFTed + test.fuzz.timeController.addTime(2.seconds) + + assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == gossipParams.D) + + test.mockRouters.forEach { + it.sendToSingle(createGraftMessage(topic)) + } + + test.fuzz.timeController.addTime(2.seconds) + + assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == gossipParams.DHigh) + + // remote peer leaves the mesh + val message1 = newMessage(topic, 0L, "Hello-0".toByteArray()) + test.gossipRouter.publish(message1) + + val routerReceivedMessageCount = + test.mockRouters.count { mockRouter -> + mockRouter.inboundMessages.any { msg -> + msg.publishCount > 0 + } + } + + assertTrue(routerReceivedMessageCount == gossipParams.DHigh) + } + + @Test + fun `publishing should collect at least D peers if mesh is smaller`() { + val params = GossipParams() + + val test = ManyRoutersTest(params = params, mockRouterCount = params.D) + val topic = "topic1" + test.connectAll() + + test.mockRouters.forEach { it.subscribe(topic) } + test.gossipRouter.subscribe(topic) + + // 2 heartbeats - the topic should be GRAFTed + test.fuzz.timeController.addTime(2.seconds) + + val topicMeshRouters = test.gossipRouter.mesh[topic]!! + assertTrue((topicMeshRouters.size) >= params.DLow) + + // leave just 2 peers in the mesh + topicMeshRouters.drop(2) + .forEach { + test.getMockRouter(it.peerId).sendToSingle(createPruneMessage(topic)) + } + test.fuzz.timeController.addTime(1.seconds) + + assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == 2) + + val message1 = newMessage(topic, 0L, "Hello-0".toByteArray()) + test.gossipRouter.publish(message1) + + val routerReceivedMessageCount = + test.mockRouters.count { mockRouter -> + mockRouter.inboundMessages.any { msg -> + msg.publishCount > 0 + } + } + + assertTrue(routerReceivedMessageCount >= params.D) + } + + @Test + fun `publishing should collect at least D peers if mesh is smaller and prefer well scored peers`() { + val params = GossipParams() + val peerAppScores = mutableMapOf() + val gossipScoreParams = GossipScoreParams( + peerScoreParams = GossipPeerScoreParams( + appSpecificScore = { + peerAppScores[it]?.toDouble() ?: 0.0 + }, + appSpecificWeight = 1.0 + ) + ) + + val test = ManyRoutersTest(params = params, scoreParams = gossipScoreParams, mockRouterCount = 10) + val topic = "topic1" + test.connectAll() + + test.mockRouters.forEach { it.subscribe(topic) } + test.gossipRouter.subscribe(topic) + + // 2 heartbeats - the topic should be GRAFTed + test.fuzz.timeController.addTime(2.seconds) + + val topicMeshRouters = test.gossipRouter.mesh[topic]!!.toList() + assertTrue((topicMeshRouters.size) == params.D) + + // leave just 2 peers in the mesh + topicMeshRouters.drop(2) + .forEach { + test.getMockRouter(it.peerId).sendToSingle(createPruneMessage(topic)) + } + // downscore all peers except 5 + val goodScoredPeers = topicMeshRouters.take(5).map { it.peerId }.toSet() + test.routers + .map { it.peerId } + .filter { it !in goodScoredPeers } + .forEach { peerAppScores[it] = -gossipScoreParams.publishThreshold.toInt() - 1 } + + // for D = 6: 2 peers in the mesh + 3 peers outside of mesh + others are significantly downscored + test.fuzz.timeController.addTime(1.seconds) + + assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == 2) + + val message1 = newMessage(topic, 0L, "Hello-0".toByteArray()) + test.gossipRouter.publish(message1) + + // router should take 2 mesh peers, 3 well scored peers and 1 peer scored below publishThreshold + val peersReceivedMessage = test.routers + .filter { + val mockRouter = it.router as MockRouter + mockRouter.inboundMessages.any { msg -> + msg.publishCount > 0 + } + } + .map { it.peerId } + + assertTrue(peersReceivedMessage.size == params.D) + assertTrue(peersReceivedMessage.containsAll(goodScoredPeers)) + } + + private fun createGraftMessage(topic: String): Rpc.RPC { + return Rpc.RPC.newBuilder().setControl( + Rpc.ControlMessage.newBuilder().addGraft( + Rpc.ControlGraft.newBuilder() + .setTopicID(topic) + ) + ).build() + } + + private fun createPruneMessage(topic: String, pxPeersCount: Int = 0): Rpc.RPC { + val peerInfos = List(pxPeersCount) { + Rpc.PeerInfo.newBuilder() + .setPeerID(PeerId.random().bytes.toProtobuf()) + .setSignedPeerRecord(ByteString.EMPTY) + .build() + } + return Rpc.RPC.newBuilder().setControl( + Rpc.ControlMessage.newBuilder().addPrune( + Rpc.ControlPrune.newBuilder() + .setTopicID(topic) + .setBackoff(10) + .addAllPeers(peerInfos) + ) + ).build() + } }