From c0fc1084e73c893077d2246e918a64ec6e67fb01 Mon Sep 17 00:00:00 2001 From: Anton Nashatyrev Date: Mon, 21 Oct 2024 17:51:16 +0400 Subject: [PATCH] When publishing prioritize back up non-mesh peers with acceptable score --- .../io/libp2p/pubsub/gossip/GossipRouter.kt | 16 ++++- .../libp2p/pubsub/gossip/GossipV1_1Tests.kt | 60 +++++++++++++++++++ 2 files changed, 74 insertions(+), 2 deletions(-) diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index 3a9f5c9a..f5e9c45d 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -281,6 +281,7 @@ open class GossipRouter( when { isDirect(peer) -> prune(peer, topic) + isBackOff(peer, topic) -> { notifyRouterMisbehavior(peer, 1) if (isBackOffFlood(peer, topic)) { @@ -288,10 +289,13 @@ open class GossipRouter( } prune(peer, topic) } + score.score(peer.peerId) < 0 -> prune(peer, topic) + meshPeers.size >= params.DHigh && !peer.isOutbound() -> prune(peer, topic) + peer !in meshPeers -> graft(peer, topic) } @@ -412,9 +416,17 @@ open class GossipRouter( val topicMeshPeers = mesh[topic] if (topicMeshPeers != null) { // we are subscribed to the topic - val addFromNonMeshCount = max(0, params.D - topicMeshPeers.size) val nonMeshTopicPeers = getTopicPeers(topic) - topicMeshPeers - topicMeshPeers + nonMeshTopicPeers.shuffled(random).take(addFromNonMeshCount) + val (nonMeshTopicPeersAbovePublishThreshold, nonMeshTopicPeersBelowPublishThreshold) = + nonMeshTopicPeers.partition { score.score(it.peerId) >= scoreParams.publishThreshold } + // this deviates from the original spec but we want at least D peers for publishing + // prioritizing mesh peers, then non-mesh peers with acceptable score, + // and then underscored non-mesh peers as a last resort + ( + topicMeshPeers + + nonMeshTopicPeersAbovePublishThreshold.shuffled(random) + + nonMeshTopicPeersBelowPublishThreshold.shuffled(random) + ).take(params.D) } else { // we are not subscribed to the topic fanout[topic] ?: getTopicPeers(topic).shuffled(random).take(params.D) diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt index cf4cb03b..ec2c98ad 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt @@ -1206,6 +1206,66 @@ class GossipV1_1Tests : GossipTestsBase() { assertTrue(routerReceivedMessageCount >= params.D) } + @Test + fun `publishing should collect at least D peers if mesh is smaller and prefer well scored peers`() { + val params = GossipParams() + val peerAppScores = mutableMapOf() + val gossipScoreParams = GossipScoreParams( + peerScoreParams = GossipPeerScoreParams( + appSpecificScore = { + peerAppScores[it]?.toDouble() ?: 0.0 + }, + appSpecificWeight = 1.0 + ) + ) + + val test = ManyRoutersTest(params = params, scoreParams = gossipScoreParams, mockRouterCount = 10) + val topic = "topic1" + test.connectAll() + + test.mockRouters.forEach { it.subscribe(topic) } + test.gossipRouter.subscribe(topic) + + // 2 heartbeats - the topic should be GRAFTed + test.fuzz.timeController.addTime(2.seconds) + + val topicMeshRouters = test.gossipRouter.mesh[topic]!!.toList() + assertTrue((topicMeshRouters.size) == params.D) + + // leave just 2 peers in the mesh + topicMeshRouters.drop(2) + .forEach { + test.getMockRouter(it.peerId).sendToSingle(createPruneMessage(topic)) + } + // downscore all peers except 5 + val goodScoredPeers = topicMeshRouters.take(5).map { it.peerId }.toSet() + test.routers + .map { it.peerId } + .filter { it !in goodScoredPeers } + .forEach { peerAppScores[it] = -gossipScoreParams.publishThreshold.toInt() - 1 } + + // for D = 6: 2 peers in the mesh + 3 peers outside of mesh + others are significantly downscored + test.fuzz.timeController.addTime(1.seconds) + + assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == 2) + + val message1 = newMessage(topic, 0L, "Hello-0".toByteArray()) + test.gossipRouter.publish(message1) + + // router should take 2 mesh peers, 3 well scored peers and 1 peer scored below publishThreshold + val peersReceivedMessage = test.routers + .filter { + val mockRouter = it.router as MockRouter + mockRouter.inboundMessages.any { msg -> + msg.publishCount > 0 + } + } + .map { it.peerId } + + assertTrue(peersReceivedMessage.size == params.D) + assertTrue(peersReceivedMessage.containsAll(goodScoredPeers)) + } + private fun createPruneMessage(topic: String, pxPeersCount: Int = 0): Rpc.RPC { val peerInfos = List(pxPeersCount) { Rpc.PeerInfo.newBuilder()