Skip to content

Commit

Permalink
Fix regression, add corresponding test
Browse files Browse the repository at this point in the history
  • Loading branch information
Nashatyrev committed Oct 21, 2024
1 parent c0fc108 commit f4313a8
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 10 deletions.
27 changes: 17 additions & 10 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -416,17 +416,24 @@ open class GossipRouter(
val topicMeshPeers = mesh[topic]
if (topicMeshPeers != null) {
// we are subscribed to the topic
val nonMeshTopicPeers = getTopicPeers(topic) - topicMeshPeers
val (nonMeshTopicPeersAbovePublishThreshold, nonMeshTopicPeersBelowPublishThreshold) =
nonMeshTopicPeers.partition { score.score(it.peerId) >= scoreParams.publishThreshold }
// this deviates from the original spec but we want at least D peers for publishing
// prioritizing mesh peers, then non-mesh peers with acceptable score,
// and then underscored non-mesh peers as a last resort
(
topicMeshPeers +
nonMeshTopicPeersAbovePublishThreshold.shuffled(random) +
if (topicMeshPeers.size < params.D) {
// we need extra non-mesh peers for more reliable publishing
val nonMeshTopicPeers = getTopicPeers(topic) - topicMeshPeers
val (nonMeshTopicPeersAbovePublishThreshold, nonMeshTopicPeersBelowPublishThreshold) =
nonMeshTopicPeers.partition { score.score(it.peerId) >= scoreParams.publishThreshold }
// this deviates from the original spec but we want at least D peers for publishing
// prioritizing mesh peers, then non-mesh peers with acceptable score,
// and then underscored non-mesh peers as a last resort
listOf(
topicMeshPeers,
nonMeshTopicPeersAbovePublishThreshold.shuffled(random),
nonMeshTopicPeersBelowPublishThreshold.shuffled(random)
).take(params.D)
)
.flatten()
.take(params.D)
} else {
topicMeshPeers
}
} else {
// we are not subscribed to the topic
fanout[topic] ?: getTopicPeers(topic).shuffled(random).take(params.D)
Expand Down
49 changes: 49 additions & 0 deletions libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,46 @@ class GossipV1_1Tests : GossipTestsBase() {
test.mockRouter.waitForMessage { it.publishCount > 0 }
}

@Test
fun `should publish to all mesh peers when mesh exceeds D`() {
val gossipParams = GossipParams(D = 6, DHigh = 10)
val test = ManyRoutersTest(params = gossipParams, mockRouterCount = gossipParams.DHigh)
val topic = "topic1"
test.connectAll()

test.mockRouters.forEach {
it.subscribe(topic)
// it.sendToSingle(createGraftMessage(topic))
}
test.gossipRouter.subscribe(topic)

// 2 heartbeats - the topic should be GRAFTed
test.fuzz.timeController.addTime(2.seconds)

assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == gossipParams.D)

test.mockRouters.forEach {
it.sendToSingle(createGraftMessage(topic))
}

test.fuzz.timeController.addTime(2.seconds)

assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == gossipParams.DHigh)

// remote peer leaves the mesh
val message1 = newMessage(topic, 0L, "Hello-0".toByteArray())
test.gossipRouter.publish(message1)

val routerReceivedMessageCount =
test.mockRouters.count { mockRouter ->
mockRouter.inboundMessages.any { msg ->
msg.publishCount > 0
}
}

assertTrue(routerReceivedMessageCount == gossipParams.DHigh)
}

@Test
fun `publishing should collect at least D peers if mesh is smaller`() {
val params = GossipParams()
Expand Down Expand Up @@ -1266,6 +1306,15 @@ class GossipV1_1Tests : GossipTestsBase() {
assertTrue(peersReceivedMessage.containsAll(goodScoredPeers))
}

private fun createGraftMessage(topic: String): Rpc.RPC {
return Rpc.RPC.newBuilder().setControl(
Rpc.ControlMessage.newBuilder().addGraft(
Rpc.ControlGraft.newBuilder()
.setTopicID(topic)
)
).build()
}

private fun createPruneMessage(topic: String, pxPeersCount: Int = 0): Rpc.RPC {
val peerInfos = List(pxPeersCount) {
Rpc.PeerInfo.newBuilder()
Expand Down

0 comments on commit f4313a8

Please sign in to comment.