Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/publish-to-d-peers' into publish…
Browse files Browse the repository at this point in the history
…-to-d-peers
  • Loading branch information
Nashatyrev committed Oct 22, 2024
2 parents 1900b83 + cf71d1b commit 38b5e44
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 32 deletions.
3 changes: 3 additions & 0 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ interface PubsubMessage {
val topics: List<Topic>
get() = protobufMessage.topicIDsList

val size: Int
get() = protobufMessage.data.size()

fun messageSha256() = sha256(protobufMessage.toByteArray())

override fun equals(other: Any?): Boolean
Expand Down
21 changes: 16 additions & 5 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ fun defaultDLazy(D: Int) = D
fun defaultDScore(D: Int) = D * 2 / 3
fun defaultDOut(D: Int, DLow: Int) = min(D / 2, max(DLow - 1, 0))

// floodPublishMaxMessageSizeThreshold shortcuts
const val NEVER_FLOOD_PUBLISH = 0
const val ALWAYS_FLOOD_PUBLISH = Int.MAX_VALUE

/**
* Parameters of Gossip 1.1 router
*/
Expand Down Expand Up @@ -112,11 +116,16 @@ data class GossipParams(
val seenTTL: Duration = 2.minutes,

/**
* [floodPublish] is a gossipsub router option that enables flood publishing.
* When this is enabled, published messages are forwarded to all peers with score >=
* to publishThreshold
* [floodPublishMaxMessageSizeThreshold] controls the maximum size (in bytes) a message will be
* published using flood publishing mode.
* When a message size is <= [floodPublishMaxMessageSizeThreshold], published messages are forwarded
* to all peers with score >= to [GossipScoreParams.publishThreshold]
*
* [NEVER_FLOOD_PUBLISH] and [ALWAYS_FLOOD_PUBLISH] can be used as shortcuts.
*
* The default is [NEVER_FLOOD_PUBLISH] (0 KiB).
*/
val floodPublish: Boolean = false,
val floodPublishMaxMessageSizeThreshold: Int = NEVER_FLOOD_PUBLISH,

/**
* [gossipFactor] affects how many peers we will emit gossip to at each heartbeat.
Expand Down Expand Up @@ -240,7 +249,7 @@ data class GossipParams(

/**
* [iDontWantMinMessageSizeThreshold] controls the minimum size (in bytes) that an incoming message needs to be so that an IDONTWANT message is sent to mesh peers.
* The default is 16 KB.
* The default is 16 KiB.
*/
val iDontWantMinMessageSizeThreshold: Int = 16384,

Expand All @@ -260,6 +269,8 @@ data class GossipParams(
check(DLow <= D, "DLow should be <= D")
check(DHigh >= D, "DHigh should be >= D")
check(gossipFactor in 0.0..1.0, "gossipFactor should be in range [0.0, 1.0]")
check(floodPublishMaxMessageSizeThreshold >= 0, "floodPublishMaxMessageSizeThreshold should be >= 0")
check(iDontWantMinMessageSizeThreshold >= 0, "iDontWantMinMessageSizeThreshold should be >= 0")
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,10 @@ open class GossipRouter(
override fun broadcastOutbound(msg: PubsubMessage): CompletableFuture<Unit> {
msg.topics.forEach { lastPublished[it] = currentTimeSupplier() }

val floodPublish = msg.size <= params.floodPublishMaxMessageSizeThreshold

val peers =
if (params.floodPublish) {
if (floodPublish) {
msg.topics
.flatMap { getTopicPeers(it) }
.filter { score.score(it.peerId) >= scoreParams.publishThreshold }
Expand Down Expand Up @@ -641,7 +643,7 @@ open class GossipRouter(

private fun iDontWant(msg: PubsubMessage, receivedFrom: PeerHandler? = null) {
if (!this.protocol.supportsIDontWant()) return
if (msg.protobufMessage.data.size() < params.iDontWantMinMessageSizeThreshold) return
if (msg.size < params.iDontWantMinMessageSizeThreshold) return
// we need to send IDONTWANT messages to mesh peers immediately in order for them to have an effect
msg.topics
.mapNotNull { mesh[it] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ class GossipParamsBuilder {

private var pruneBackoff: Duration? = null

private var floodPublish: Boolean? = null

private var gossipFactor: Double? = null

private var opportunisticGraftPeers: Int? = null
Expand Down Expand Up @@ -76,6 +74,8 @@ class GossipParamsBuilder {

private var iDontWantMinMessageSizeThreshold: Int? = null

private var floodPublishMaxMessageSizeThreshold: Int? = null

private var iDontWantTTL: Duration? = null

init {
Expand All @@ -90,7 +90,7 @@ class GossipParamsBuilder {
this.maxPeersSentInPruneMsg = source.maxPeersSentInPruneMsg
this.maxPeersAcceptedInPruneMsg = source.maxPeersAcceptedInPruneMsg
this.pruneBackoff = source.pruneBackoff
this.floodPublish = source.floodPublish
this.floodPublishMaxMessageSizeThreshold = source.floodPublishMaxMessageSizeThreshold
this.gossipFactor = source.gossipFactor
this.opportunisticGraftPeers = source.opportunisticGraftPeers
this.opportunisticGraftTicks = source.opportunisticGraftTicks
Expand Down Expand Up @@ -141,8 +141,6 @@ class GossipParamsBuilder {

fun pruneBackoff(value: Duration): GossipParamsBuilder = apply { pruneBackoff = value }

fun floodPublish(value: Boolean): GossipParamsBuilder = apply { floodPublish = value }

fun gossipFactor(value: Double): GossipParamsBuilder = apply { gossipFactor = value }

fun opportunisticGraftPeers(value: Int): GossipParamsBuilder = apply {
Expand Down Expand Up @@ -185,6 +183,8 @@ class GossipParamsBuilder {

fun iDontWantMinMessageSizeThreshold(value: Int): GossipParamsBuilder = apply { iDontWantMinMessageSizeThreshold = value }

fun floodPublishMaxMessageSizeThreshold(value: Int): GossipParamsBuilder = apply { floodPublishMaxMessageSizeThreshold = value }

fun iDontWantTTL(value: Duration): GossipParamsBuilder = apply { iDontWantTTL = value }

fun build(): GossipParams {
Expand All @@ -203,7 +203,7 @@ class GossipParamsBuilder {
gossipHistoryLength = gossipHistoryLength!!,
heartbeatInterval = heartbeatInterval!!,
seenTTL = seenTTL!!,
floodPublish = floodPublish!!,
floodPublishMaxMessageSizeThreshold = floodPublishMaxMessageSizeThreshold!!,
gossipFactor = gossipFactor!!,
opportunisticGraftPeers = opportunisticGraftPeers!!,
opportunisticGraftTicks = opportunisticGraftTicks!!,
Expand Down Expand Up @@ -252,7 +252,7 @@ class GossipParamsBuilder {
check(seenTTL != null, { "seenTTL must not be null" })
check(maxPeersSentInPruneMsg != null, { "maxPeersSentInPruneMsg must not be null" })
check(pruneBackoff != null, { "pruneBackoff must not be null" })
check(floodPublish != null, { "floodPublish must not be null" })
check(floodPublishMaxMessageSizeThreshold != null, { "floodPublishMaxMessageSizeThreshold must not be null" })
check(gossipFactor != null, { "gossipFactor must not be null" })
check(opportunisticGraftPeers != null, { "opportunisticGraftPeers must not be null" })
check(opportunisticGraftTicks != null, { "opportunisticGraftTicks must not be null" })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit

class GossipPubsubRouterTest : PubsubRouterTest(
createGossipFuzzRouterFactory {
GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublish = false))
GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublishMaxMessageSizeThreshold = NEVER_FLOOD_PUBLISH))
}
) {

Expand Down Expand Up @@ -59,7 +59,7 @@ class GossipPubsubRouterTest : PubsubRouterTest(
// this is to test ihave/iwant
fuzz.timeController.addTime(Duration.ofMillis(1))

val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublish = false)) }
val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublishMaxMessageSizeThreshold = NEVER_FLOOD_PUBLISH)) }
val routerCenter = fuzz.createTestGossipRouter(r)
allRouters.add(0, routerCenter)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,8 +531,9 @@ class GossipV1_1Tests : GossipTestsBase() {

@Test
fun testNotFloodPublish() {
val message = newMessage("topic1", 0L, "Hello-0".toByteArray())
val appScore = mutableMapOf<PeerId, Double>().withDefault { 0.0 }
val coreParams = GossipParams(3, 3, 3, floodPublish = false)
val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.size - 1)
val peerScoreParams = GossipPeerScoreParams(appSpecificScore = { appScore.getValue(it) })
val scoreParams = GossipScoreParams(peerScoreParams = peerScoreParams)
val test = ManyRoutersTest(params = coreParams, scoreParams = scoreParams)
Expand All @@ -546,7 +547,7 @@ class GossipV1_1Tests : GossipTestsBase() {
val topicMesh = test.gossipRouter.mesh["topic1"]!!
assertTrue(topicMesh.size > 0 && topicMesh.size < test.routers.size)

test.gossipRouter.publish(newMessage("topic1", 0L, "Hello-0".toByteArray()))
test.gossipRouter.publish(message)

test.fuzz.timeController.addTime(50.millis)

Expand All @@ -558,8 +559,9 @@ class GossipV1_1Tests : GossipTestsBase() {

@Test
fun testFloodPublish() {
val message = newMessage("topic1", 0L, "Hello-0".toByteArray())
val appScore = mutableMapOf<PeerId, Double>().withDefault { 0.0 }
val coreParams = GossipParams(3, 3, 3, floodPublish = true)
val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.size)
val peerScoreParams = GossipPeerScoreParams(
appSpecificScore = { appScore.getValue(it) },
appSpecificWeight = 1.0
Expand All @@ -581,7 +583,7 @@ class GossipV1_1Tests : GossipTestsBase() {
val topicMesh = test.gossipRouter.mesh["topic1"]!!.map { it.peerId }
assertTrue(topicMesh.size > 0 && topicMesh.size < test.routers.size)

test.gossipRouter.publish(newMessage("topic1", 0L, "Hello-0".toByteArray()))
test.gossipRouter.publish(message)

test.fuzz.timeController.addTime(50.millis)

Expand Down Expand Up @@ -651,7 +653,7 @@ class GossipV1_1Tests : GossipTestsBase() {
3,
3,
DLazy = 3,
floodPublish = false,
floodPublishMaxMessageSizeThreshold = 0,
gossipFactor = 0.5
)
val peerScoreParams = GossipPeerScoreParams(
Expand Down Expand Up @@ -715,7 +717,7 @@ class GossipV1_1Tests : GossipTestsBase() {
@Test
fun testOutboundMeshQuotas1() {
val appScore = mutableMapOf<PeerId, Double>().withDefault { 0.0 }
val coreParams = GossipParams(3, 3, 3, DLazy = 3, DOut = 1, floodPublish = false)
val coreParams = GossipParams(3, 3, 3, DLazy = 3, DOut = 1, floodPublishMaxMessageSizeThreshold = NEVER_FLOOD_PUBLISH)
val peerScoreParams = GossipPeerScoreParams(appSpecificScore = { appScore.getValue(it) })
val scoreParams = GossipScoreParams(peerScoreParams = peerScoreParams)
val test = ManyRoutersTest(params = coreParams, scoreParams = scoreParams)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow

class SubscriptionsLimitTest : TwoGossipHostTestBase() {
override val params = GossipParams(maxSubscriptions = 5, floodPublish = true)
override val params = GossipParams(maxSubscriptions = 5, floodPublishMaxMessageSizeThreshold = ALWAYS_FLOOD_PUBLISH)

@Test
fun `new peer subscribed to many topics`() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ val Eth2DefaultGossipParams = GossipParams(
DLazy = 8,

pruneBackoff = 1.minutes,
floodPublish = true,
floodPublishMaxMessageSizeThreshold = 16384,
gossipFactor = 0.25,
DScore = 4,
DOut = 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ val Eth2DefaultGossipParams = GossipParams(
DLazy = 8,

pruneBackoff = 1.minutes,
floodPublish = true,
floodPublishMaxMessageSizeThreshold = 16384,
gossipFactor = 0.25,
DScore = 4,
DOut = 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ class BlobDecouplingSimulation(
val randomSeed: Long = 3L,
val rnd: Random = Random(randomSeed),

val floodPublish: Boolean = true,

val sendingPeerBand: Bandwidth = Bandwidth.mbitsPerSec(100),

val peerBands: Iterator<Bandwidth> = iterator {
Expand Down Expand Up @@ -83,10 +81,6 @@ class BlobDecouplingSimulation(
)

val gossipParams = Eth2DefaultGossipParams
.copy(
// heartbeatInterval = 1.minutes
floodPublish = floodPublish
)
val gossipScoreParams = Eth2DefaultScoreParams
val gossipRouterCtor = { _: Int ->
SimGossipRouterBuilder().also {
Expand Down Expand Up @@ -294,7 +288,6 @@ fun main() {
// logger = {},
nodeCount = 1000,
peerBands = band,
floodPublish = false,
// randomSeed = 2
)

Expand Down

0 comments on commit 38b5e44

Please sign in to comment.