From 64bf461879f2d7f349425d563c60be2369f7f0bd Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Mon, 21 Oct 2024 12:36:48 +0200 Subject: [PATCH 1/7] floodPublishMaxMessageSizeThreshold --- .../kotlin/io/libp2p/pubsub/gossip/GossipParams.kt | 14 +++++++++----- .../kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt | 4 +++- .../pubsub/gossip/builders/GossipParamsBuilder.kt | 9 +++++---- .../libp2p/pubsub/gossip/GossipPubsubRouterTest.kt | 4 ++-- .../io/libp2p/pubsub/gossip/GossipV1_1Tests.kt | 14 ++++++++------ .../libp2p/pubsub/gossip/SubscriptionsLimitTest.kt | 2 +- .../io/libp2p/pubsub/gossip/Eth2GossipParams.kt | 2 +- .../io/libp2p/simulate/gossip/Eth2GossipParams.kt | 2 +- .../simulate/main/BlobDecouplingSimulation.kt | 6 +++--- 9 files changed, 33 insertions(+), 24 deletions(-) diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt index a28200708..af2a02173 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt @@ -112,11 +112,13 @@ data class GossipParams( val seenTTL: Duration = 2.minutes, /** - * [floodPublish] is a gossipsub router option that enables flood publishing. - * When this is enabled, published messages are forwarded to all peers with score >= - * to publishThreshold + * [floodPublishMaxMessageSizeThreshold] controls the maximum size (in bytes) a message will be + * published using flood publishing mode. + * When a message size is <= [floodPublishMaxMessageSizeThreshold], published messages are forwarded + * to all peers with score >= to [GossipScoreParams.publishThreshold] + * The default is 0 KiB (never flood publish). */ - val floodPublish: Boolean = false, + val floodPublishMaxMessageSizeThreshold: Int = 0, /** * [gossipFactor] affects how many peers we will emit gossip to at each heartbeat. @@ -240,7 +242,7 @@ data class GossipParams( /** * [iDontWantMinMessageSizeThreshold] controls the minimum size (in bytes) that an incoming message needs to be so that an IDONTWANT message is sent to mesh peers. - * The default is 16 KB. + * The default is 16 KiB. */ val iDontWantMinMessageSizeThreshold: Int = 16384, @@ -260,6 +262,8 @@ data class GossipParams( check(DLow <= D, "DLow should be <= D") check(DHigh >= D, "DHigh should be >= D") check(gossipFactor in 0.0..1.0, "gossipFactor should be in range [0.0, 1.0]") + check(floodPublishMaxMessageSizeThreshold >= 0, "floodPublishMaxMessageSizeThreshold should be >= 0") + check(iDontWantMinMessageSizeThreshold >= 0, "iDontWantMinMessageSizeThreshold should be >= 0") } companion object { diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index 101986888..287a97b66 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -400,8 +400,10 @@ open class GossipRouter( override fun broadcastOutbound(msg: PubsubMessage): CompletableFuture { msg.topics.forEach { lastPublished[it] = currentTimeSupplier() } + val floodPublish = msg.protobufMessage.data.size() <= params.floodPublishMaxMessageSizeThreshold + val peers = - if (params.floodPublish) { + if (floodPublish) { msg.topics .flatMap { getTopicPeers(it) } .filter { score.score(it.peerId) >= scoreParams.publishThreshold } diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt index a0f0b18ac..a98412715 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt @@ -76,6 +76,8 @@ class GossipParamsBuilder { private var iDontWantMinMessageSizeThreshold: Int? = null + private var floodPublishMaxMessageSizeThreshold: Int? = null + private var iDontWantTTL: Duration? = null init { @@ -90,7 +92,6 @@ class GossipParamsBuilder { this.maxPeersSentInPruneMsg = source.maxPeersSentInPruneMsg this.maxPeersAcceptedInPruneMsg = source.maxPeersAcceptedInPruneMsg this.pruneBackoff = source.pruneBackoff - this.floodPublish = source.floodPublish this.gossipFactor = source.gossipFactor this.opportunisticGraftPeers = source.opportunisticGraftPeers this.opportunisticGraftTicks = source.opportunisticGraftTicks @@ -141,8 +142,6 @@ class GossipParamsBuilder { fun pruneBackoff(value: Duration): GossipParamsBuilder = apply { pruneBackoff = value } - fun floodPublish(value: Boolean): GossipParamsBuilder = apply { floodPublish = value } - fun gossipFactor(value: Double): GossipParamsBuilder = apply { gossipFactor = value } fun opportunisticGraftPeers(value: Int): GossipParamsBuilder = apply { @@ -185,6 +184,8 @@ class GossipParamsBuilder { fun iDontWantMinMessageSizeThreshold(value: Int): GossipParamsBuilder = apply { iDontWantMinMessageSizeThreshold = value } + fun floodPublishMaxMessageSizeThreshold(value: Int): GossipParamsBuilder = apply { floodPublishMaxMessageSizeThreshold = value } + fun iDontWantTTL(value: Duration): GossipParamsBuilder = apply { iDontWantTTL = value } fun build(): GossipParams { @@ -203,7 +204,7 @@ class GossipParamsBuilder { gossipHistoryLength = gossipHistoryLength!!, heartbeatInterval = heartbeatInterval!!, seenTTL = seenTTL!!, - floodPublish = floodPublish!!, + floodPublishMaxMessageSizeThreshold = floodPublishMaxMessageSizeThreshold!!, gossipFactor = gossipFactor!!, opportunisticGraftPeers = opportunisticGraftPeers!!, opportunisticGraftTicks = opportunisticGraftTicks!!, diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt index b63fcb2af..41f5c7889 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit class GossipPubsubRouterTest : PubsubRouterTest( createGossipFuzzRouterFactory { - GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublish = false)) + GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublishMaxMessageSizeThreshold = 0)) } ) { @@ -59,7 +59,7 @@ class GossipPubsubRouterTest : PubsubRouterTest( // this is to test ihave/iwant fuzz.timeController.addTime(Duration.ofMillis(1)) - val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublish = false)) } + val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublishMaxMessageSizeThreshold = 0)) } val routerCenter = fuzz.createTestGossipRouter(r) allRouters.add(0, routerCenter) diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt index f0ceb9eb1..656963443 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt @@ -530,8 +530,9 @@ class GossipV1_1Tests : GossipTestsBase() { @Test fun testNotFloodPublish() { + val message = newMessage("topic1", 0L, "Hello-0".toByteArray()) val appScore = mutableMapOf().withDefault { 0.0 } - val coreParams = GossipParams(3, 3, 3, floodPublish = false) + val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.protobufMessage.data.size() - 1) val peerScoreParams = GossipPeerScoreParams(appSpecificScore = { appScore.getValue(it) }) val scoreParams = GossipScoreParams(peerScoreParams = peerScoreParams) val test = ManyRoutersTest(params = coreParams, scoreParams = scoreParams) @@ -545,7 +546,7 @@ class GossipV1_1Tests : GossipTestsBase() { val topicMesh = test.gossipRouter.mesh["topic1"]!! assertTrue(topicMesh.size > 0 && topicMesh.size < test.routers.size) - test.gossipRouter.publish(newMessage("topic1", 0L, "Hello-0".toByteArray())) + test.gossipRouter.publish(message) test.fuzz.timeController.addTime(50.millis) @@ -557,8 +558,9 @@ class GossipV1_1Tests : GossipTestsBase() { @Test fun testFloodPublish() { + val message = newMessage("topic1", 0L, "Hello-0".toByteArray()) val appScore = mutableMapOf().withDefault { 0.0 } - val coreParams = GossipParams(3, 3, 3, floodPublish = true) + val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.protobufMessage.data.size()) val peerScoreParams = GossipPeerScoreParams( appSpecificScore = { appScore.getValue(it) }, appSpecificWeight = 1.0 @@ -580,7 +582,7 @@ class GossipV1_1Tests : GossipTestsBase() { val topicMesh = test.gossipRouter.mesh["topic1"]!!.map { it.peerId } assertTrue(topicMesh.size > 0 && topicMesh.size < test.routers.size) - test.gossipRouter.publish(newMessage("topic1", 0L, "Hello-0".toByteArray())) + test.gossipRouter.publish(message) test.fuzz.timeController.addTime(50.millis) @@ -650,7 +652,7 @@ class GossipV1_1Tests : GossipTestsBase() { 3, 3, DLazy = 3, - floodPublish = false, + floodPublishMaxMessageSizeThreshold = 0, gossipFactor = 0.5 ) val peerScoreParams = GossipPeerScoreParams( @@ -714,7 +716,7 @@ class GossipV1_1Tests : GossipTestsBase() { @Test fun testOutboundMeshQuotas1() { val appScore = mutableMapOf().withDefault { 0.0 } - val coreParams = GossipParams(3, 3, 3, DLazy = 3, DOut = 1, floodPublish = false) + val coreParams = GossipParams(3, 3, 3, DLazy = 3, DOut = 1, floodPublishMaxMessageSizeThreshold = 0) val peerScoreParams = GossipPeerScoreParams(appSpecificScore = { appScore.getValue(it) }) val scoreParams = GossipScoreParams(peerScoreParams = peerScoreParams) val test = ManyRoutersTest(params = coreParams, scoreParams = scoreParams) diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt index 28b4a6f67..1e78107bc 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt @@ -10,7 +10,7 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow class SubscriptionsLimitTest : TwoGossipHostTestBase() { - override val params = GossipParams(maxSubscriptions = 5, floodPublish = true) + override val params = GossipParams(maxSubscriptions = 5, floodPublishMaxMessageSizeThreshold = 0) @Test fun `new peer subscribed to many topics`() { diff --git a/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/gossip/Eth2GossipParams.kt b/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/gossip/Eth2GossipParams.kt index 25aace38c..2cdffd044 100644 --- a/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/gossip/Eth2GossipParams.kt +++ b/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/gossip/Eth2GossipParams.kt @@ -20,7 +20,7 @@ val Eth2DefaultGossipParams = GossipParams( DLazy = 8, pruneBackoff = 1.minutes, - floodPublish = true, + floodPublishMaxMessageSizeThreshold = 0, gossipFactor = 0.25, DScore = 4, DOut = 2, diff --git a/tools/simulator/src/main/kotlin/io/libp2p/simulate/gossip/Eth2GossipParams.kt b/tools/simulator/src/main/kotlin/io/libp2p/simulate/gossip/Eth2GossipParams.kt index e65abf015..8d47c0e66 100644 --- a/tools/simulator/src/main/kotlin/io/libp2p/simulate/gossip/Eth2GossipParams.kt +++ b/tools/simulator/src/main/kotlin/io/libp2p/simulate/gossip/Eth2GossipParams.kt @@ -25,7 +25,7 @@ val Eth2DefaultGossipParams = GossipParams( DLazy = 8, pruneBackoff = 1.minutes, - floodPublish = true, + floodPublishMaxMessageSizeThreshold = 16384, gossipFactor = 0.25, DScore = 4, DOut = 2, diff --git a/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt b/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt index 997a917b3..3fb00355f 100644 --- a/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt +++ b/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt @@ -41,7 +41,7 @@ class BlobDecouplingSimulation( val randomSeed: Long = 3L, val rnd: Random = Random(randomSeed), - val floodPublish: Boolean = true, + val floodPublishMaxMessageSizeThreshold: Int = 0, val sendingPeerBand: Bandwidth = Bandwidth.mbitsPerSec(100), @@ -85,7 +85,7 @@ class BlobDecouplingSimulation( val gossipParams = Eth2DefaultGossipParams .copy( // heartbeatInterval = 1.minutes - floodPublish = floodPublish + floodPublishMaxMessageSizeThreshold = floodPublishMaxMessageSizeThreshold ) val gossipScoreParams = Eth2DefaultScoreParams val gossipRouterCtor = { _: Int -> @@ -294,7 +294,7 @@ fun main() { // logger = {}, nodeCount = 1000, peerBands = band, - floodPublish = false, + floodPublishMaxMessageSizeThreshold = 0, // randomSeed = 2 ) From 20ab78cde3c103e42c23b17e3747325ae71710c8 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Mon, 21 Oct 2024 12:47:35 +0200 Subject: [PATCH 2/7] align param values --- .../kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt | 2 +- .../kotlin/io/libp2p/pubsub/gossip/Eth2GossipParams.kt | 2 +- .../kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt index 1e78107bc..141d6c433 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt @@ -10,7 +10,7 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow class SubscriptionsLimitTest : TwoGossipHostTestBase() { - override val params = GossipParams(maxSubscriptions = 5, floodPublishMaxMessageSizeThreshold = 0) + override val params = GossipParams(maxSubscriptions = 5, floodPublishMaxMessageSizeThreshold = 16384) @Test fun `new peer subscribed to many topics`() { diff --git a/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/gossip/Eth2GossipParams.kt b/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/gossip/Eth2GossipParams.kt index 2cdffd044..4eb050f53 100644 --- a/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/gossip/Eth2GossipParams.kt +++ b/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/gossip/Eth2GossipParams.kt @@ -20,7 +20,7 @@ val Eth2DefaultGossipParams = GossipParams( DLazy = 8, pruneBackoff = 1.minutes, - floodPublishMaxMessageSizeThreshold = 0, + floodPublishMaxMessageSizeThreshold = 16384, gossipFactor = 0.25, DScore = 4, DOut = 2, diff --git a/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt b/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt index 3fb00355f..5112f5f6a 100644 --- a/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt +++ b/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt @@ -41,7 +41,7 @@ class BlobDecouplingSimulation( val randomSeed: Long = 3L, val rnd: Random = Random(randomSeed), - val floodPublishMaxMessageSizeThreshold: Int = 0, + val floodPublishMaxMessageSizeThreshold: Int = 16384, val sendingPeerBand: Bandwidth = Bandwidth.mbitsPerSec(100), From 3553637aa76b81799ecb0b103535a9cab46a2dc5 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Mon, 21 Oct 2024 12:50:22 +0200 Subject: [PATCH 3/7] fix param builder --- .../io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt index a98412715..2f107d9f8 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt @@ -40,8 +40,6 @@ class GossipParamsBuilder { private var pruneBackoff: Duration? = null - private var floodPublish: Boolean? = null - private var gossipFactor: Double? = null private var opportunisticGraftPeers: Int? = null @@ -253,7 +251,7 @@ class GossipParamsBuilder { check(seenTTL != null, { "seenTTL must not be null" }) check(maxPeersSentInPruneMsg != null, { "maxPeersSentInPruneMsg must not be null" }) check(pruneBackoff != null, { "pruneBackoff must not be null" }) - check(floodPublish != null, { "floodPublish must not be null" }) + check(floodPublishMaxMessageSizeThreshold != null, { "floodPublishMaxMessageSizeThreshold must not be null" }) check(gossipFactor != null, { "gossipFactor must not be null" }) check(opportunisticGraftPeers != null, { "opportunisticGraftPeers must not be null" }) check(opportunisticGraftTicks != null, { "opportunisticGraftTicks must not be null" }) From 3805ba744015a374e9312e1418c453ccc9f86807 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Mon, 21 Oct 2024 13:04:16 +0200 Subject: [PATCH 4/7] fix builder --- .../io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt index 2f107d9f8..9696a8f8c 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt @@ -90,6 +90,7 @@ class GossipParamsBuilder { this.maxPeersSentInPruneMsg = source.maxPeersSentInPruneMsg this.maxPeersAcceptedInPruneMsg = source.maxPeersAcceptedInPruneMsg this.pruneBackoff = source.pruneBackoff + this.floodPublishMaxMessageSizeThreshold = source.floodPublishMaxMessageSizeThreshold this.gossipFactor = source.gossipFactor this.opportunisticGraftPeers = source.opportunisticGraftPeers this.opportunisticGraftTicks = source.opportunisticGraftTicks From bd50c9cf612573c496cd727cd551df0ea3066f2d Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Mon, 21 Oct 2024 15:04:34 +0200 Subject: [PATCH 5/7] add constants and PubsubMessage size method --- .../src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt | 3 +++ .../kotlin/io/libp2p/pubsub/gossip/GossipParams.kt | 11 +++++++++-- .../kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt | 4 ++-- .../io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt | 4 ++-- .../kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt | 4 ++-- 5 files changed, 18 insertions(+), 8 deletions(-) diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt index ae3d94b16..af3f2d723 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt @@ -22,6 +22,9 @@ interface PubsubMessage { val topics: List get() = protobufMessage.topicIDsList + val size: Int + get() = protobufMessage.data.size() + fun messageSha256() = sha256(protobufMessage.toByteArray()) override fun equals(other: Any?): Boolean diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt index af2a02173..654683ab6 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt @@ -22,6 +22,10 @@ fun defaultDLazy(D: Int) = D fun defaultDScore(D: Int) = D * 2 / 3 fun defaultDOut(D: Int, DLow: Int) = min(D / 2, max(DLow - 1, 0)) +// floodPublishMaxMessageSizeThreshold shortcuts +const val NEVER_FLOOD_PUBLISH = 0 +const val ALWAYS_FLOOD_PUBLISH = Int.MAX_VALUE + /** * Parameters of Gossip 1.1 router */ @@ -116,9 +120,12 @@ data class GossipParams( * published using flood publishing mode. * When a message size is <= [floodPublishMaxMessageSizeThreshold], published messages are forwarded * to all peers with score >= to [GossipScoreParams.publishThreshold] - * The default is 0 KiB (never flood publish). + * + * [NEVER_FLOOD_PUBLISH] and [ALWAYS_FLOOD_PUBLISH] can be used as shortcuts. + * + * The default is [NEVER_FLOOD_PUBLISH] (0 KiB). */ - val floodPublishMaxMessageSizeThreshold: Int = 0, + val floodPublishMaxMessageSizeThreshold: Int = NEVER_FLOOD_PUBLISH, /** * [gossipFactor] affects how many peers we will emit gossip to at each heartbeat. diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index 287a97b66..fc03490f1 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -400,7 +400,7 @@ open class GossipRouter( override fun broadcastOutbound(msg: PubsubMessage): CompletableFuture { msg.topics.forEach { lastPublished[it] = currentTimeSupplier() } - val floodPublish = msg.protobufMessage.data.size() <= params.floodPublishMaxMessageSizeThreshold + val floodPublish = msg.size <= params.floodPublishMaxMessageSizeThreshold val peers = if (floodPublish) { @@ -615,7 +615,7 @@ open class GossipRouter( private fun iDontWant(msg: PubsubMessage, receivedFrom: PeerHandler? = null) { if (!this.protocol.supportsIDontWant()) return - if (msg.protobufMessage.data.size() < params.iDontWantMinMessageSizeThreshold) return + if (msg.size < params.iDontWantMinMessageSizeThreshold) return // we need to send IDONTWANT messages to mesh peers immediately in order for them to have an effect msg.topics .mapNotNull { mesh[it] } diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt index 41f5c7889..80297d123 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit class GossipPubsubRouterTest : PubsubRouterTest( createGossipFuzzRouterFactory { - GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublishMaxMessageSizeThreshold = 0)) + GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublishMaxMessageSizeThreshold = NEVER_FLOOD_PUBLISH)) } ) { @@ -59,7 +59,7 @@ class GossipPubsubRouterTest : PubsubRouterTest( // this is to test ihave/iwant fuzz.timeController.addTime(Duration.ofMillis(1)) - val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublishMaxMessageSizeThreshold = 0)) } + val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublishMaxMessageSizeThreshold = NEVER_FLOOD_PUBLISH)) } val routerCenter = fuzz.createTestGossipRouter(r) allRouters.add(0, routerCenter) diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt index 656963443..7843860a3 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt @@ -532,7 +532,7 @@ class GossipV1_1Tests : GossipTestsBase() { fun testNotFloodPublish() { val message = newMessage("topic1", 0L, "Hello-0".toByteArray()) val appScore = mutableMapOf().withDefault { 0.0 } - val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.protobufMessage.data.size() - 1) + val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.size - 1) val peerScoreParams = GossipPeerScoreParams(appSpecificScore = { appScore.getValue(it) }) val scoreParams = GossipScoreParams(peerScoreParams = peerScoreParams) val test = ManyRoutersTest(params = coreParams, scoreParams = scoreParams) @@ -560,7 +560,7 @@ class GossipV1_1Tests : GossipTestsBase() { fun testFloodPublish() { val message = newMessage("topic1", 0L, "Hello-0".toByteArray()) val appScore = mutableMapOf().withDefault { 0.0 } - val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.protobufMessage.data.size()) + val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.size) val peerScoreParams = GossipPeerScoreParams( appSpecificScore = { appScore.getValue(it) }, appSpecificWeight = 1.0 From 851d2c648ddde6765e3714beac9226a9cc5d03be Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Mon, 21 Oct 2024 15:16:35 +0200 Subject: [PATCH 6/7] spotless --- libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt index af3f2d723..c960fdb68 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt @@ -23,7 +23,7 @@ interface PubsubMessage { get() = protobufMessage.topicIDsList val size: Int - get() = protobufMessage.data.size() + get() = protobufMessage.data.size() fun messageSha256() = sha256(protobufMessage.toByteArray()) From 88fe8aefe640cea8780c87445daa5706689d9339 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Mon, 21 Oct 2024 15:33:54 +0200 Subject: [PATCH 7/7] cleanups --- .../test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt | 2 +- .../io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt | 2 +- .../io/libp2p/simulate/main/BlobDecouplingSimulation.kt | 7 ------- 3 files changed, 2 insertions(+), 9 deletions(-) diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt index 7843860a3..0ef84747b 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt @@ -716,7 +716,7 @@ class GossipV1_1Tests : GossipTestsBase() { @Test fun testOutboundMeshQuotas1() { val appScore = mutableMapOf().withDefault { 0.0 } - val coreParams = GossipParams(3, 3, 3, DLazy = 3, DOut = 1, floodPublishMaxMessageSizeThreshold = 0) + val coreParams = GossipParams(3, 3, 3, DLazy = 3, DOut = 1, floodPublishMaxMessageSizeThreshold = NEVER_FLOOD_PUBLISH) val peerScoreParams = GossipPeerScoreParams(appSpecificScore = { appScore.getValue(it) }) val scoreParams = GossipScoreParams(peerScoreParams = peerScoreParams) val test = ManyRoutersTest(params = coreParams, scoreParams = scoreParams) diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt index 141d6c433..bc1f02fa6 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt @@ -10,7 +10,7 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow class SubscriptionsLimitTest : TwoGossipHostTestBase() { - override val params = GossipParams(maxSubscriptions = 5, floodPublishMaxMessageSizeThreshold = 16384) + override val params = GossipParams(maxSubscriptions = 5, floodPublishMaxMessageSizeThreshold = ALWAYS_FLOOD_PUBLISH) @Test fun `new peer subscribed to many topics`() { diff --git a/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt b/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt index 5112f5f6a..ca45433cd 100644 --- a/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt +++ b/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt @@ -41,8 +41,6 @@ class BlobDecouplingSimulation( val randomSeed: Long = 3L, val rnd: Random = Random(randomSeed), - val floodPublishMaxMessageSizeThreshold: Int = 16384, - val sendingPeerBand: Bandwidth = Bandwidth.mbitsPerSec(100), val peerBands: Iterator = iterator { @@ -83,10 +81,6 @@ class BlobDecouplingSimulation( ) val gossipParams = Eth2DefaultGossipParams - .copy( -// heartbeatInterval = 1.minutes - floodPublishMaxMessageSizeThreshold = floodPublishMaxMessageSizeThreshold - ) val gossipScoreParams = Eth2DefaultScoreParams val gossipRouterCtor = { _: Int -> SimGossipRouterBuilder().also { @@ -294,7 +288,6 @@ fun main() { // logger = {}, nodeCount = 1000, peerBands = band, - floodPublishMaxMessageSizeThreshold = 0, // randomSeed = 2 )