Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flood publish max message size threshold #390

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ configure(
}
) {
group = "io.libp2p"
version = "develop"
version = "1.2.0-RELEASE"

apply(plugin = "kotlin")
apply(plugin = "idea")
Expand Down
2 changes: 1 addition & 1 deletion libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ interface PubsubMessage {
override fun equals(other: Any?): Boolean

/**
* WARNING: Use collision free functions only
* WARNING: Use collision resistant functions only
* Else the HashMap collision attack vector is open
*/
override fun hashCode(): Int
Expand Down
14 changes: 9 additions & 5 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,13 @@ data class GossipParams(
val seenTTL: Duration = 2.minutes,

/**
* [floodPublish] is a gossipsub router option that enables flood publishing.
* When this is enabled, published messages are forwarded to all peers with score >=
* to publishThreshold
* [floodPublishMaxMessageSizeThreshold] controls the maximum size (in bytes) a message will be
* published using flood publishing mode.
* When a message size is <= [floodPublishMaxMessageSizeThreshold], published messages are forwarded
* to all peers with score >= to [GossipScoreParams.publishThreshold]
* The default is 0 KiB (never flood publish).
*/
val floodPublish: Boolean = false,
val floodPublishMaxMessageSizeThreshold: Int = 0,

/**
* [gossipFactor] affects how many peers we will emit gossip to at each heartbeat.
Expand Down Expand Up @@ -240,7 +242,7 @@ data class GossipParams(

/**
* [iDontWantMinMessageSizeThreshold] controls the minimum size (in bytes) that an incoming message needs to be so that an IDONTWANT message is sent to mesh peers.
* The default is 16 KB.
* The default is 16 KiB.
*/
val iDontWantMinMessageSizeThreshold: Int = 16384,

Expand All @@ -260,6 +262,8 @@ data class GossipParams(
check(DLow <= D, "DLow should be <= D")
check(DHigh >= D, "DHigh should be >= D")
check(gossipFactor in 0.0..1.0, "gossipFactor should be in range [0.0, 1.0]")
check(floodPublishMaxMessageSizeThreshold >= 0, "floodPublishMaxMessageSizeThreshold should be >= 0")
check(iDontWantMinMessageSizeThreshold >= 0, "iDontWantMinMessageSizeThreshold should be >= 0")
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,10 @@ open class GossipRouter(
override fun broadcastOutbound(msg: PubsubMessage): CompletableFuture<Unit> {
msg.topics.forEach { lastPublished[it] = currentTimeSupplier() }

val floodPublish = msg.protobufMessage.data.size() <= params.floodPublishMaxMessageSizeThreshold

val peers =
if (params.floodPublish) {
if (floodPublish) {
msg.topics
.flatMap { getTopicPeers(it) }
.filter { score.score(it.peerId) >= scoreParams.publishThreshold }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class GossipParamsBuilder {

private var iDontWantMinMessageSizeThreshold: Int? = null

private var floodPublishMaxMessageSizeThreshold: Int? = null

private var iDontWantTTL: Duration? = null

init {
Expand All @@ -90,7 +92,6 @@ class GossipParamsBuilder {
this.maxPeersSentInPruneMsg = source.maxPeersSentInPruneMsg
this.maxPeersAcceptedInPruneMsg = source.maxPeersAcceptedInPruneMsg
this.pruneBackoff = source.pruneBackoff
this.floodPublish = source.floodPublish
this.gossipFactor = source.gossipFactor
this.opportunisticGraftPeers = source.opportunisticGraftPeers
this.opportunisticGraftTicks = source.opportunisticGraftTicks
Expand Down Expand Up @@ -141,8 +142,6 @@ class GossipParamsBuilder {

fun pruneBackoff(value: Duration): GossipParamsBuilder = apply { pruneBackoff = value }

fun floodPublish(value: Boolean): GossipParamsBuilder = apply { floodPublish = value }

fun gossipFactor(value: Double): GossipParamsBuilder = apply { gossipFactor = value }

fun opportunisticGraftPeers(value: Int): GossipParamsBuilder = apply {
Expand Down Expand Up @@ -185,6 +184,8 @@ class GossipParamsBuilder {

fun iDontWantMinMessageSizeThreshold(value: Int): GossipParamsBuilder = apply { iDontWantMinMessageSizeThreshold = value }

fun floodPublishMaxMessageSizeThreshold(value: Int): GossipParamsBuilder = apply { floodPublishMaxMessageSizeThreshold = value }

fun iDontWantTTL(value: Duration): GossipParamsBuilder = apply { iDontWantTTL = value }

fun build(): GossipParams {
Expand All @@ -203,7 +204,7 @@ class GossipParamsBuilder {
gossipHistoryLength = gossipHistoryLength!!,
heartbeatInterval = heartbeatInterval!!,
seenTTL = seenTTL!!,
floodPublish = floodPublish!!,
floodPublishMaxMessageSizeThreshold = floodPublishMaxMessageSizeThreshold!!,
gossipFactor = gossipFactor!!,
opportunisticGraftPeers = opportunisticGraftPeers!!,
opportunisticGraftTicks = opportunisticGraftTicks!!,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit

class GossipPubsubRouterTest : PubsubRouterTest(
createGossipFuzzRouterFactory {
GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublish = false))
GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublishMaxMessageSizeThreshold = 0))
}
) {

Expand Down Expand Up @@ -59,7 +59,7 @@ class GossipPubsubRouterTest : PubsubRouterTest(
// this is to test ihave/iwant
fuzz.timeController.addTime(Duration.ofMillis(1))

val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublish = false)) }
val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublishMaxMessageSizeThreshold = 0)) }
val routerCenter = fuzz.createTestGossipRouter(r)
allRouters.add(0, routerCenter)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,9 @@ class GossipV1_1Tests : GossipTestsBase() {

@Test
fun testNotFloodPublish() {
val message = newMessage("topic1", 0L, "Hello-0".toByteArray())
val appScore = mutableMapOf<PeerId, Double>().withDefault { 0.0 }
val coreParams = GossipParams(3, 3, 3, floodPublish = false)
val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.protobufMessage.data.size() - 1)
val peerScoreParams = GossipPeerScoreParams(appSpecificScore = { appScore.getValue(it) })
val scoreParams = GossipScoreParams(peerScoreParams = peerScoreParams)
val test = ManyRoutersTest(params = coreParams, scoreParams = scoreParams)
Expand All @@ -545,7 +546,7 @@ class GossipV1_1Tests : GossipTestsBase() {
val topicMesh = test.gossipRouter.mesh["topic1"]!!
assertTrue(topicMesh.size > 0 && topicMesh.size < test.routers.size)

test.gossipRouter.publish(newMessage("topic1", 0L, "Hello-0".toByteArray()))
test.gossipRouter.publish(message)

test.fuzz.timeController.addTime(50.millis)

Expand All @@ -557,8 +558,9 @@ class GossipV1_1Tests : GossipTestsBase() {

@Test
fun testFloodPublish() {
val message = newMessage("topic1", 0L, "Hello-0".toByteArray())
val appScore = mutableMapOf<PeerId, Double>().withDefault { 0.0 }
val coreParams = GossipParams(3, 3, 3, floodPublish = true)
val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.protobufMessage.data.size())
val peerScoreParams = GossipPeerScoreParams(
appSpecificScore = { appScore.getValue(it) },
appSpecificWeight = 1.0
Expand All @@ -580,7 +582,7 @@ class GossipV1_1Tests : GossipTestsBase() {
val topicMesh = test.gossipRouter.mesh["topic1"]!!.map { it.peerId }
assertTrue(topicMesh.size > 0 && topicMesh.size < test.routers.size)

test.gossipRouter.publish(newMessage("topic1", 0L, "Hello-0".toByteArray()))
test.gossipRouter.publish(message)

test.fuzz.timeController.addTime(50.millis)

Expand Down Expand Up @@ -650,7 +652,7 @@ class GossipV1_1Tests : GossipTestsBase() {
3,
3,
DLazy = 3,
floodPublish = false,
floodPublishMaxMessageSizeThreshold = 0,
gossipFactor = 0.5
)
val peerScoreParams = GossipPeerScoreParams(
Expand Down Expand Up @@ -714,7 +716,7 @@ class GossipV1_1Tests : GossipTestsBase() {
@Test
fun testOutboundMeshQuotas1() {
val appScore = mutableMapOf<PeerId, Double>().withDefault { 0.0 }
val coreParams = GossipParams(3, 3, 3, DLazy = 3, DOut = 1, floodPublish = false)
val coreParams = GossipParams(3, 3, 3, DLazy = 3, DOut = 1, floodPublishMaxMessageSizeThreshold = 0)
val peerScoreParams = GossipPeerScoreParams(appSpecificScore = { appScore.getValue(it) })
val scoreParams = GossipScoreParams(peerScoreParams = peerScoreParams)
val test = ManyRoutersTest(params = coreParams, scoreParams = scoreParams)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow

class SubscriptionsLimitTest : TwoGossipHostTestBase() {
override val params = GossipParams(maxSubscriptions = 5, floodPublish = true)
override val params = GossipParams(maxSubscriptions = 5, floodPublishMaxMessageSizeThreshold = 0)

@Test
fun `new peer subscribed to many topics`() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ val Eth2DefaultGossipParams = GossipParams(
DLazy = 8,

pruneBackoff = 1.minutes,
floodPublish = true,
floodPublishMaxMessageSizeThreshold = 0,
gossipFactor = 0.25,
DScore = 4,
DOut = 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ val Eth2DefaultGossipParams = GossipParams(
DLazy = 8,

pruneBackoff = 1.minutes,
floodPublish = true,
floodPublishMaxMessageSizeThreshold = 16384,
gossipFactor = 0.25,
DScore = 4,
DOut = 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class BlobDecouplingSimulation(
val randomSeed: Long = 3L,
val rnd: Random = Random(randomSeed),

val floodPublish: Boolean = true,
val floodPublishMaxMessageSizeThreshold: Int = 0,

val sendingPeerBand: Bandwidth = Bandwidth.mbitsPerSec(100),

Expand Down Expand Up @@ -85,7 +85,7 @@ class BlobDecouplingSimulation(
val gossipParams = Eth2DefaultGossipParams
.copy(
// heartbeatInterval = 1.minutes
floodPublish = floodPublish
floodPublishMaxMessageSizeThreshold = floodPublishMaxMessageSizeThreshold
)
val gossipScoreParams = Eth2DefaultScoreParams
val gossipRouterCtor = { _: Int ->
Expand Down Expand Up @@ -294,7 +294,7 @@ fun main() {
// logger = {},
nodeCount = 1000,
peerBands = band,
floodPublish = false,
floodPublishMaxMessageSizeThreshold = 0,
// randomSeed = 2
)

Expand Down
Loading