diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index fc03490f..1bd43193 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -281,6 +281,7 @@ open class GossipRouter( when { isDirect(peer) -> prune(peer, topic) + isBackOff(peer, topic) -> { notifyRouterMisbehavior(peer, 1) if (isBackOffFlood(peer, topic)) { @@ -288,10 +289,13 @@ open class GossipRouter( } prune(peer, topic) } + score.score(peer.peerId) < 0 -> prune(peer, topic) + meshPeers.size >= params.DHigh && !peer.isOutbound() -> prune(peer, topic) + peer !in meshPeers -> graft(peer, topic) } @@ -410,11 +414,35 @@ open class GossipRouter( .plus(getDirectPeers()) } else { msg.topics - .mapNotNull { topic -> - mesh[topic] ?: fanout[topic] ?: getTopicPeers(topic).shuffled(random).take(params.D) - .also { - if (it.isNotEmpty()) fanout[topic] = it.toMutableSet() + .map { topic -> + val topicMeshPeers = mesh[topic] + if (topicMeshPeers != null) { + // we are subscribed to the topic + if (topicMeshPeers.size < params.D) { + // we need extra non-mesh peers for more reliable publishing + val nonMeshTopicPeers = getTopicPeers(topic) - topicMeshPeers + val (nonMeshTopicPeersAbovePublishThreshold, nonMeshTopicPeersBelowPublishThreshold) = + nonMeshTopicPeers.partition { score.score(it.peerId) >= scoreParams.publishThreshold } + // this deviates from the original spec but we want at least D peers for publishing + // prioritizing mesh peers, then non-mesh peers with acceptable score, + // and then underscored non-mesh peers as a last resort + listOf( + topicMeshPeers, + nonMeshTopicPeersAbovePublishThreshold.shuffled(random), + nonMeshTopicPeersBelowPublishThreshold.shuffled(random) + ) + .flatten() + .take(params.D) + } else { + topicMeshPeers } + } else { + // we are not subscribed to the topic + fanout[topic] ?: getTopicPeers(topic).shuffled(random).take(params.D) + .also { + if (it.isNotEmpty()) fanout[topic] = it.toMutableSet() + } + } } .flatten() } diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt index 0ef84747..dc45bf63 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt @@ -125,6 +125,7 @@ class GossipV1_1Tests : GossipTestsBase() { super.initChannelWithHandler(streamHandler, handler) } } + val test = TwoRoutersTest(mockRouterFactory = { exec, _, _ -> MalformedMockRouter(exec) }) val mockRouter = test.router2.router as MalformedMockRouter @@ -1128,34 +1129,207 @@ class GossipV1_1Tests : GossipTestsBase() { // 2 heartbeats - the topic should be GRAFTed test.fuzz.timeController.addTime(2.seconds) - fun createPruneMessage(peersCount: Int): Rpc.RPC { - val peerInfos = List(peersCount) { - Rpc.PeerInfo.newBuilder() - .setPeerID(PeerId.random().bytes.toProtobuf()) - .setSignedPeerRecord(ByteString.EMPTY) - .build() - } - return Rpc.RPC.newBuilder().setControl( - Rpc.ControlMessage.newBuilder().addPrune( - Rpc.ControlPrune.newBuilder() - .setTopicID(topic) - .addAllPeers(peerInfos) - ) - ).build() - } - test.mockRouter.sendToSingle( - createPruneMessage(test.gossipRouter.params.maxPeersAcceptedInPruneMsg + 1) + createPruneMessage(topic, test.gossipRouter.params.maxPeersAcceptedInPruneMsg + 1) ) // prune message should be dropped because too many peers assertEquals(1, test.gossipRouter.mesh[topic]!!.size) test.mockRouter.sendToSingle( - createPruneMessage(test.gossipRouter.params.maxPeersAcceptedInPruneMsg) + createPruneMessage(topic, test.gossipRouter.params.maxPeersAcceptedInPruneMsg) ) // prune message should now be processed assertEquals(0, test.gossipRouter.mesh[topic]!!.size) } + + @Test + fun `when a peer leaves the mesh it should still be considered for publishing`() { + val test = TwoRoutersTest() + val topic = "topic1" + + test.mockRouter.subscribe(topic) + test.gossipRouter.subscribe(topic) + + // 2 heartbeats - the topic should be GRAFTed + test.fuzz.timeController.addTime(2.seconds) + + assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == 1) + + // remote peer leaves the mesh + test.mockRouter.sendToSingle(createPruneMessage(topic)) + test.fuzz.timeController.addTime(1.seconds) + + assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == 0) + + val message1 = newMessage(topic, 0L, "Hello-0".toByteArray()) + test.gossipRouter.publish(message1) + + test.mockRouter.waitForMessage { it.publishCount > 0 } + } + + @Test + fun `should publish to all mesh peers when mesh exceeds D`() { + val gossipParams = GossipParams(D = 6, DHigh = 10) + val test = ManyRoutersTest(params = gossipParams, mockRouterCount = gossipParams.DHigh) + val topic = "topic1" + test.connectAll() + + test.mockRouters.forEach { + it.subscribe(topic) + } + test.gossipRouter.subscribe(topic) + + // 2 heartbeats - the topic should be GRAFTed + test.fuzz.timeController.addTime(2.seconds) + + assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == gossipParams.D) + + test.mockRouters.forEach { + it.sendToSingle(createGraftMessage(topic)) + } + + test.fuzz.timeController.addTime(2.seconds) + + assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == gossipParams.DHigh) + + // remote peer leaves the mesh + val message1 = newMessage(topic, 0L, "Hello-0".toByteArray()) + test.gossipRouter.publish(message1) + + val routerReceivedMessageCount = + test.mockRouters.count { mockRouter -> + mockRouter.inboundMessages.any { msg -> + msg.publishCount > 0 + } + } + + assertTrue(routerReceivedMessageCount == gossipParams.DHigh) + } + + @Test + fun `publishing should collect at least D peers if mesh is smaller`() { + val params = GossipParams() + + val test = ManyRoutersTest(params = params, mockRouterCount = params.D) + val topic = "topic1" + test.connectAll() + + test.mockRouters.forEach { it.subscribe(topic) } + test.gossipRouter.subscribe(topic) + + // 2 heartbeats - the topic should be GRAFTed + test.fuzz.timeController.addTime(2.seconds) + + val topicMeshRouters = test.gossipRouter.mesh[topic]!! + assertTrue((topicMeshRouters.size) >= params.DLow) + + // leave just 2 peers in the mesh + topicMeshRouters.drop(2) + .forEach { + test.getMockRouter(it.peerId).sendToSingle(createPruneMessage(topic)) + } + test.fuzz.timeController.addTime(1.seconds) + + assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == 2) + + val message1 = newMessage(topic, 0L, "Hello-0".toByteArray()) + test.gossipRouter.publish(message1) + + val routerReceivedMessageCount = + test.mockRouters.count { mockRouter -> + mockRouter.inboundMessages.any { msg -> + msg.publishCount > 0 + } + } + + assertTrue(routerReceivedMessageCount >= params.D) + } + + @Test + fun `publishing should collect at least D peers if mesh is smaller and prefer well scored peers`() { + val params = GossipParams() + val peerAppScores = mutableMapOf() + val gossipScoreParams = GossipScoreParams( + peerScoreParams = GossipPeerScoreParams( + appSpecificScore = { + peerAppScores[it]?.toDouble() ?: 0.0 + }, + appSpecificWeight = 1.0 + ) + ) + + val test = ManyRoutersTest(params = params, scoreParams = gossipScoreParams, mockRouterCount = 10) + val topic = "topic1" + test.connectAll() + + test.mockRouters.forEach { it.subscribe(topic) } + test.gossipRouter.subscribe(topic) + + // 2 heartbeats - the topic should be GRAFTed + test.fuzz.timeController.addTime(2.seconds) + + val topicMeshRouters = test.gossipRouter.mesh[topic]!!.toList() + assertTrue((topicMeshRouters.size) == params.D) + + // leave just 2 peers in the mesh + topicMeshRouters.drop(2) + .forEach { + test.getMockRouter(it.peerId).sendToSingle(createPruneMessage(topic)) + } + // downscore all peers except 5 + val goodScoredPeers = topicMeshRouters.take(5).map { it.peerId }.toSet() + test.routers + .map { it.peerId } + .filter { it !in goodScoredPeers } + .forEach { peerAppScores[it] = -gossipScoreParams.publishThreshold.toInt() - 1 } + + // for D = 6: 2 peers in the mesh + 3 peers outside of mesh + others are significantly downscored + test.fuzz.timeController.addTime(1.seconds) + + assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == 2) + + val message1 = newMessage(topic, 0L, "Hello-0".toByteArray()) + test.gossipRouter.publish(message1) + + // router should take 2 mesh peers, 3 well scored peers and 1 peer scored below publishThreshold + val peersReceivedMessage = test.routers + .filter { + val mockRouter = it.router as MockRouter + mockRouter.inboundMessages.any { msg -> + msg.publishCount > 0 + } + } + .map { it.peerId } + + assertTrue(peersReceivedMessage.size == params.D) + assertTrue(peersReceivedMessage.containsAll(goodScoredPeers)) + } + + private fun createGraftMessage(topic: String): Rpc.RPC { + return Rpc.RPC.newBuilder().setControl( + Rpc.ControlMessage.newBuilder().addGraft( + Rpc.ControlGraft.newBuilder() + .setTopicID(topic) + ) + ).build() + } + + private fun createPruneMessage(topic: String, pxPeersCount: Int = 0): Rpc.RPC { + val peerInfos = List(pxPeersCount) { + Rpc.PeerInfo.newBuilder() + .setPeerID(PeerId.random().bytes.toProtobuf()) + .setSignedPeerRecord(ByteString.EMPTY) + .build() + } + return Rpc.RPC.newBuilder().setControl( + Rpc.ControlMessage.newBuilder().addPrune( + Rpc.ControlPrune.newBuilder() + .setTopicID(topic) + .setBackoff(10) + .addAllPeers(peerInfos) + ) + ).build() + } }