diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt index b2cad191..bbc229de 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt @@ -112,11 +112,13 @@ data class GossipParams( val seenTTL: Duration = 2.minutes, /** - * [floodPublish] is a gossipsub router option that enables flood publishing. - * When this is enabled, published messages are forwarded to all peers with score >= - * to publishThreshold + * [floodPublishMaxMessageSizeThreshold] controls the maximum size (in bytes) a message will be + * published using flood publishing mode. + * When a message size is <= [floodPublishMaxMessageSizeThreshold], published messages are forwarded + * to all peers with score >= to [GossipScoreParams.publishThreshold] + * The default is 0 KiB (never flood publish). */ - val floodPublish: Boolean = false, + val floodPublishMaxMessageSizeThreshold: Int = 0, /** * [gossipFactor] affects how many peers we will emit gossip to at each heartbeat. @@ -240,7 +242,7 @@ data class GossipParams( /** * [iDontWantMinMessageSizeThreshold] controls the minimum size (in bytes) that an incoming message needs to be so that an IDONTWANT message is sent to mesh peers. - * The default is 16 KB. + * The default is 16 KiB. */ val iDontWantMinMessageSizeThreshold: Int = 16000, @@ -260,6 +262,8 @@ data class GossipParams( check(DLow <= D, "DLow should be <= D") check(DHigh >= D, "DHigh should be >= D") check(gossipFactor in 0.0..1.0, "gossipFactor should be in range [0.0, 1.0]") + check(floodPublishMaxMessageSizeThreshold >= 0, "floodPublishMaxMessageSizeThreshold should be >= 0") + check(iDontWantMinMessageSizeThreshold >= 0, "iDontWantMinMessageSizeThreshold should be >= 0") } companion object { diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index 191c8b03..75a4413a 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -400,8 +400,10 @@ open class GossipRouter( override fun broadcastOutbound(msg: PubsubMessage): CompletableFuture { msg.topics.forEach { lastPublished[it] = currentTimeSupplier() } + val floodPublish = msg.protobufMessage.data.size() <= params.floodPublishMaxMessageSizeThreshold + val peers = - if (params.floodPublish) { + if (floodPublish) { msg.topics .flatMap { getTopicPeers(it) } .filter { score.score(it.peerId) >= scoreParams.publishThreshold } diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt index a0f0b18a..a9841271 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt @@ -76,6 +76,8 @@ class GossipParamsBuilder { private var iDontWantMinMessageSizeThreshold: Int? = null + private var floodPublishMaxMessageSizeThreshold: Int? = null + private var iDontWantTTL: Duration? = null init { @@ -90,7 +92,6 @@ class GossipParamsBuilder { this.maxPeersSentInPruneMsg = source.maxPeersSentInPruneMsg this.maxPeersAcceptedInPruneMsg = source.maxPeersAcceptedInPruneMsg this.pruneBackoff = source.pruneBackoff - this.floodPublish = source.floodPublish this.gossipFactor = source.gossipFactor this.opportunisticGraftPeers = source.opportunisticGraftPeers this.opportunisticGraftTicks = source.opportunisticGraftTicks @@ -141,8 +142,6 @@ class GossipParamsBuilder { fun pruneBackoff(value: Duration): GossipParamsBuilder = apply { pruneBackoff = value } - fun floodPublish(value: Boolean): GossipParamsBuilder = apply { floodPublish = value } - fun gossipFactor(value: Double): GossipParamsBuilder = apply { gossipFactor = value } fun opportunisticGraftPeers(value: Int): GossipParamsBuilder = apply { @@ -185,6 +184,8 @@ class GossipParamsBuilder { fun iDontWantMinMessageSizeThreshold(value: Int): GossipParamsBuilder = apply { iDontWantMinMessageSizeThreshold = value } + fun floodPublishMaxMessageSizeThreshold(value: Int): GossipParamsBuilder = apply { floodPublishMaxMessageSizeThreshold = value } + fun iDontWantTTL(value: Duration): GossipParamsBuilder = apply { iDontWantTTL = value } fun build(): GossipParams { @@ -203,7 +204,7 @@ class GossipParamsBuilder { gossipHistoryLength = gossipHistoryLength!!, heartbeatInterval = heartbeatInterval!!, seenTTL = seenTTL!!, - floodPublish = floodPublish!!, + floodPublishMaxMessageSizeThreshold = floodPublishMaxMessageSizeThreshold!!, gossipFactor = gossipFactor!!, opportunisticGraftPeers = opportunisticGraftPeers!!, opportunisticGraftTicks = opportunisticGraftTicks!!, diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt index b63fcb2a..41f5c788 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit class GossipPubsubRouterTest : PubsubRouterTest( createGossipFuzzRouterFactory { - GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublish = false)) + GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublishMaxMessageSizeThreshold = 0)) } ) { @@ -59,7 +59,7 @@ class GossipPubsubRouterTest : PubsubRouterTest( // this is to test ihave/iwant fuzz.timeController.addTime(Duration.ofMillis(1)) - val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublish = false)) } + val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublishMaxMessageSizeThreshold = 0)) } val routerCenter = fuzz.createTestGossipRouter(r) allRouters.add(0, routerCenter) diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt index f0ceb9eb..65696344 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt @@ -530,8 +530,9 @@ class GossipV1_1Tests : GossipTestsBase() { @Test fun testNotFloodPublish() { + val message = newMessage("topic1", 0L, "Hello-0".toByteArray()) val appScore = mutableMapOf().withDefault { 0.0 } - val coreParams = GossipParams(3, 3, 3, floodPublish = false) + val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.protobufMessage.data.size() - 1) val peerScoreParams = GossipPeerScoreParams(appSpecificScore = { appScore.getValue(it) }) val scoreParams = GossipScoreParams(peerScoreParams = peerScoreParams) val test = ManyRoutersTest(params = coreParams, scoreParams = scoreParams) @@ -545,7 +546,7 @@ class GossipV1_1Tests : GossipTestsBase() { val topicMesh = test.gossipRouter.mesh["topic1"]!! assertTrue(topicMesh.size > 0 && topicMesh.size < test.routers.size) - test.gossipRouter.publish(newMessage("topic1", 0L, "Hello-0".toByteArray())) + test.gossipRouter.publish(message) test.fuzz.timeController.addTime(50.millis) @@ -557,8 +558,9 @@ class GossipV1_1Tests : GossipTestsBase() { @Test fun testFloodPublish() { + val message = newMessage("topic1", 0L, "Hello-0".toByteArray()) val appScore = mutableMapOf().withDefault { 0.0 } - val coreParams = GossipParams(3, 3, 3, floodPublish = true) + val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.protobufMessage.data.size()) val peerScoreParams = GossipPeerScoreParams( appSpecificScore = { appScore.getValue(it) }, appSpecificWeight = 1.0 @@ -580,7 +582,7 @@ class GossipV1_1Tests : GossipTestsBase() { val topicMesh = test.gossipRouter.mesh["topic1"]!!.map { it.peerId } assertTrue(topicMesh.size > 0 && topicMesh.size < test.routers.size) - test.gossipRouter.publish(newMessage("topic1", 0L, "Hello-0".toByteArray())) + test.gossipRouter.publish(message) test.fuzz.timeController.addTime(50.millis) @@ -650,7 +652,7 @@ class GossipV1_1Tests : GossipTestsBase() { 3, 3, DLazy = 3, - floodPublish = false, + floodPublishMaxMessageSizeThreshold = 0, gossipFactor = 0.5 ) val peerScoreParams = GossipPeerScoreParams( @@ -714,7 +716,7 @@ class GossipV1_1Tests : GossipTestsBase() { @Test fun testOutboundMeshQuotas1() { val appScore = mutableMapOf().withDefault { 0.0 } - val coreParams = GossipParams(3, 3, 3, DLazy = 3, DOut = 1, floodPublish = false) + val coreParams = GossipParams(3, 3, 3, DLazy = 3, DOut = 1, floodPublishMaxMessageSizeThreshold = 0) val peerScoreParams = GossipPeerScoreParams(appSpecificScore = { appScore.getValue(it) }) val scoreParams = GossipScoreParams(peerScoreParams = peerScoreParams) val test = ManyRoutersTest(params = coreParams, scoreParams = scoreParams) diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt index 28b4a6f6..1e78107b 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt @@ -10,7 +10,7 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow class SubscriptionsLimitTest : TwoGossipHostTestBase() { - override val params = GossipParams(maxSubscriptions = 5, floodPublish = true) + override val params = GossipParams(maxSubscriptions = 5, floodPublishMaxMessageSizeThreshold = 0) @Test fun `new peer subscribed to many topics`() { diff --git a/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/gossip/Eth2GossipParams.kt b/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/gossip/Eth2GossipParams.kt index 25aace38..2cdffd04 100644 --- a/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/gossip/Eth2GossipParams.kt +++ b/libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/gossip/Eth2GossipParams.kt @@ -20,7 +20,7 @@ val Eth2DefaultGossipParams = GossipParams( DLazy = 8, pruneBackoff = 1.minutes, - floodPublish = true, + floodPublishMaxMessageSizeThreshold = 0, gossipFactor = 0.25, DScore = 4, DOut = 2, diff --git a/tools/simulator/src/main/kotlin/io/libp2p/simulate/gossip/Eth2GossipParams.kt b/tools/simulator/src/main/kotlin/io/libp2p/simulate/gossip/Eth2GossipParams.kt index e65abf01..8d47c0e6 100644 --- a/tools/simulator/src/main/kotlin/io/libp2p/simulate/gossip/Eth2GossipParams.kt +++ b/tools/simulator/src/main/kotlin/io/libp2p/simulate/gossip/Eth2GossipParams.kt @@ -25,7 +25,7 @@ val Eth2DefaultGossipParams = GossipParams( DLazy = 8, pruneBackoff = 1.minutes, - floodPublish = true, + floodPublishMaxMessageSizeThreshold = 16384, gossipFactor = 0.25, DScore = 4, DOut = 2, diff --git a/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt b/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt index 997a917b..3fb00355 100644 --- a/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt +++ b/tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt @@ -41,7 +41,7 @@ class BlobDecouplingSimulation( val randomSeed: Long = 3L, val rnd: Random = Random(randomSeed), - val floodPublish: Boolean = true, + val floodPublishMaxMessageSizeThreshold: Int = 0, val sendingPeerBand: Bandwidth = Bandwidth.mbitsPerSec(100), @@ -85,7 +85,7 @@ class BlobDecouplingSimulation( val gossipParams = Eth2DefaultGossipParams .copy( // heartbeatInterval = 1.minutes - floodPublish = floodPublish + floodPublishMaxMessageSizeThreshold = floodPublishMaxMessageSizeThreshold ) val gossipScoreParams = Eth2DefaultScoreParams val gossipRouterCtor = { _: Int -> @@ -294,7 +294,7 @@ fun main() { // logger = {}, nodeCount = 1000, peerBands = band, - floodPublish = false, + floodPublishMaxMessageSizeThreshold = 0, // randomSeed = 2 )