diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index f5e9c45d..7843564a 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -416,17 +416,24 @@ open class GossipRouter( val topicMeshPeers = mesh[topic] if (topicMeshPeers != null) { // we are subscribed to the topic - val nonMeshTopicPeers = getTopicPeers(topic) - topicMeshPeers - val (nonMeshTopicPeersAbovePublishThreshold, nonMeshTopicPeersBelowPublishThreshold) = - nonMeshTopicPeers.partition { score.score(it.peerId) >= scoreParams.publishThreshold } - // this deviates from the original spec but we want at least D peers for publishing - // prioritizing mesh peers, then non-mesh peers with acceptable score, - // and then underscored non-mesh peers as a last resort - ( - topicMeshPeers + - nonMeshTopicPeersAbovePublishThreshold.shuffled(random) + + if (topicMeshPeers.size < params.D) { + // we need extra non-mesh peers for more reliable publishing + val nonMeshTopicPeers = getTopicPeers(topic) - topicMeshPeers + val (nonMeshTopicPeersAbovePublishThreshold, nonMeshTopicPeersBelowPublishThreshold) = + nonMeshTopicPeers.partition { score.score(it.peerId) >= scoreParams.publishThreshold } + // this deviates from the original spec but we want at least D peers for publishing + // prioritizing mesh peers, then non-mesh peers with acceptable score, + // and then underscored non-mesh peers as a last resort + listOf( + topicMeshPeers, + nonMeshTopicPeersAbovePublishThreshold.shuffled(random), nonMeshTopicPeersBelowPublishThreshold.shuffled(random) - ).take(params.D) + ) + .flatten() + .take(params.D) + } else { + topicMeshPeers + } } else { // we are not subscribed to the topic fanout[topic] ?: getTopicPeers(topic).shuffled(random).take(params.D) diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt index ec2c98ad..9c33c948 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt @@ -1167,6 +1167,46 @@ class GossipV1_1Tests : GossipTestsBase() { test.mockRouter.waitForMessage { it.publishCount > 0 } } + @Test + fun `should publish to all mesh peers when mesh exceeds D`() { + val gossipParams = GossipParams(D = 6, DHigh = 10) + val test = ManyRoutersTest(params = gossipParams, mockRouterCount = gossipParams.DHigh) + val topic = "topic1" + test.connectAll() + + test.mockRouters.forEach { + it.subscribe(topic) +// it.sendToSingle(createGraftMessage(topic)) + } + test.gossipRouter.subscribe(topic) + + // 2 heartbeats - the topic should be GRAFTed + test.fuzz.timeController.addTime(2.seconds) + + assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == gossipParams.D) + + test.mockRouters.forEach { + it.sendToSingle(createGraftMessage(topic)) + } + + test.fuzz.timeController.addTime(2.seconds) + + assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == gossipParams.DHigh) + + // remote peer leaves the mesh + val message1 = newMessage(topic, 0L, "Hello-0".toByteArray()) + test.gossipRouter.publish(message1) + + val routerReceivedMessageCount = + test.mockRouters.count { mockRouter -> + mockRouter.inboundMessages.any { msg -> + msg.publishCount > 0 + } + } + + assertTrue(routerReceivedMessageCount == gossipParams.DHigh) + } + @Test fun `publishing should collect at least D peers if mesh is smaller`() { val params = GossipParams() @@ -1266,6 +1306,15 @@ class GossipV1_1Tests : GossipTestsBase() { assertTrue(peersReceivedMessage.containsAll(goodScoredPeers)) } + private fun createGraftMessage(topic: String): Rpc.RPC { + return Rpc.RPC.newBuilder().setControl( + Rpc.ControlMessage.newBuilder().addGraft( + Rpc.ControlGraft.newBuilder() + .setTopicID(topic) + ) + ).build() + } + private fun createPruneMessage(topic: String, pxPeersCount: Int = 0): Rpc.RPC { val peerInfos = List(pxPeersCount) { Rpc.PeerInfo.newBuilder()