Skip to content

Commit

Permalink
When publishing prioritize back up non-mesh peers with acceptable score
Browse files Browse the repository at this point in the history
  • Loading branch information
Nashatyrev committed Oct 21, 2024
1 parent 1defc23 commit c0fc108
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 2 deletions.
16 changes: 14 additions & 2 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -281,17 +281,21 @@ open class GossipRouter(
when {
isDirect(peer) ->
prune(peer, topic)

isBackOff(peer, topic) -> {
notifyRouterMisbehavior(peer, 1)
if (isBackOffFlood(peer, topic)) {
notifyRouterMisbehavior(peer, 1)
}
prune(peer, topic)
}

score.score(peer.peerId) < 0 ->
prune(peer, topic)

meshPeers.size >= params.DHigh && !peer.isOutbound() ->
prune(peer, topic)

peer !in meshPeers ->
graft(peer, topic)
}
Expand Down Expand Up @@ -412,9 +416,17 @@ open class GossipRouter(
val topicMeshPeers = mesh[topic]
if (topicMeshPeers != null) {
// we are subscribed to the topic
val addFromNonMeshCount = max(0, params.D - topicMeshPeers.size)
val nonMeshTopicPeers = getTopicPeers(topic) - topicMeshPeers
topicMeshPeers + nonMeshTopicPeers.shuffled(random).take(addFromNonMeshCount)
val (nonMeshTopicPeersAbovePublishThreshold, nonMeshTopicPeersBelowPublishThreshold) =
nonMeshTopicPeers.partition { score.score(it.peerId) >= scoreParams.publishThreshold }
// this deviates from the original spec but we want at least D peers for publishing
// prioritizing mesh peers, then non-mesh peers with acceptable score,
// and then underscored non-mesh peers as a last resort
(
topicMeshPeers +
nonMeshTopicPeersAbovePublishThreshold.shuffled(random) +
nonMeshTopicPeersBelowPublishThreshold.shuffled(random)
).take(params.D)
} else {
// we are not subscribed to the topic
fanout[topic] ?: getTopicPeers(topic).shuffled(random).take(params.D)
Expand Down
60 changes: 60 additions & 0 deletions libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -1206,6 +1206,66 @@ class GossipV1_1Tests : GossipTestsBase() {
assertTrue(routerReceivedMessageCount >= params.D)
}

@Test
fun `publishing should collect at least D peers if mesh is smaller and prefer well scored peers`() {
val params = GossipParams()
val peerAppScores = mutableMapOf<PeerId, Int>()
val gossipScoreParams = GossipScoreParams(
peerScoreParams = GossipPeerScoreParams(
appSpecificScore = {
peerAppScores[it]?.toDouble() ?: 0.0
},
appSpecificWeight = 1.0
)
)

val test = ManyRoutersTest(params = params, scoreParams = gossipScoreParams, mockRouterCount = 10)
val topic = "topic1"
test.connectAll()

test.mockRouters.forEach { it.subscribe(topic) }
test.gossipRouter.subscribe(topic)

// 2 heartbeats - the topic should be GRAFTed
test.fuzz.timeController.addTime(2.seconds)

val topicMeshRouters = test.gossipRouter.mesh[topic]!!.toList()
assertTrue((topicMeshRouters.size) == params.D)

// leave just 2 peers in the mesh
topicMeshRouters.drop(2)
.forEach {
test.getMockRouter(it.peerId).sendToSingle(createPruneMessage(topic))
}
// downscore all peers except 5
val goodScoredPeers = topicMeshRouters.take(5).map { it.peerId }.toSet()
test.routers
.map { it.peerId }
.filter { it !in goodScoredPeers }
.forEach { peerAppScores[it] = -gossipScoreParams.publishThreshold.toInt() - 1 }

// for D = 6: 2 peers in the mesh + 3 peers outside of mesh + others are significantly downscored
test.fuzz.timeController.addTime(1.seconds)

assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == 2)

val message1 = newMessage(topic, 0L, "Hello-0".toByteArray())
test.gossipRouter.publish(message1)

// router should take 2 mesh peers, 3 well scored peers and 1 peer scored below publishThreshold
val peersReceivedMessage = test.routers
.filter {
val mockRouter = it.router as MockRouter
mockRouter.inboundMessages.any { msg ->
msg.publishCount > 0
}
}
.map { it.peerId }

assertTrue(peersReceivedMessage.size == params.D)
assertTrue(peersReceivedMessage.containsAll(goodScoredPeers))
}

private fun createPruneMessage(topic: String, pxPeersCount: Int = 0): Rpc.RPC {
val peerInfos = List(pxPeersCount) {
Rpc.PeerInfo.newBuilder()
Expand Down

0 comments on commit c0fc108

Please sign in to comment.