Skip to content

Commit

Permalink
Gossip: direct peers handling fix (#398)
Browse files Browse the repository at this point in the history
  • Loading branch information
tbenr authored Nov 5, 2024
1 parent 1cde874 commit 461d8c8
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 43 deletions.
99 changes: 60 additions & 39 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ open class GossipRouter(
return currentTimeSupplier() < expire - (params.pruneBackoff + params.graftFloodThreshold).toMillis()
}

private fun getDirectPeers() = peers.filter(::isDirect)
private fun getDirectPeers(topic: Topic): List<PeerHandler> {
return getTopicPeers(topic).filter(::isDirect)
}
private fun isDirect(peer: PeerHandler) = scoreParams.peerScoreParams.isDirect(peer.peerId)
private fun isConnected(peerId: PeerId) = peers.any { it.peerId == peerId }

Expand Down Expand Up @@ -387,12 +389,18 @@ open class GossipRouter(

override fun broadcastInbound(msgs: List<PubsubMessage>, receivedFrom: PeerHandler) {
msgs.forEach { pubMsg ->
pubMsg.topics
val topics = pubMsg.topics
.asSequence()

val peersFromMesh = topics
.mapNotNull { mesh[it] }
.flatten()

val peersFromDirectPeers = topics.flatMap { getDirectPeers(it) }

peersFromDirectPeers
.plus(peersFromMesh)
.distinct()
.plus(getDirectPeers())
.minus(receivedFrom)
.filterNot { peerDoesNotWantMessage(it, pubMsg.messageId) }
.forEach { submitPublishMessage(it, pubMsg) }
Expand All @@ -408,43 +416,9 @@ open class GossipRouter(

val peers =
if (floodPublish) {
msg.topics
.flatMap { getTopicPeers(it) }
.filter { score.score(it.peerId) >= scoreParams.publishThreshold }
.plus(getDirectPeers())
selectPeersForOutboundBroadcastingInFloodPublish(msg)
} else {
msg.topics
.map { topic ->
val topicMeshPeers = mesh[topic]
if (topicMeshPeers != null) {
// we are subscribed to the topic
if (topicMeshPeers.size < params.D) {
// we need extra non-mesh peers for more reliable publishing
val nonMeshTopicPeers = getTopicPeers(topic) - topicMeshPeers
val (nonMeshTopicPeersAbovePublishThreshold, nonMeshTopicPeersBelowPublishThreshold) =
nonMeshTopicPeers.partition { score.score(it.peerId) >= scoreParams.publishThreshold }
// this deviates from the original spec but we want at least D peers for publishing
// prioritizing mesh peers, then non-mesh peers with acceptable score,
// and then underscored non-mesh peers as a last resort
listOf(
topicMeshPeers,
nonMeshTopicPeersAbovePublishThreshold.shuffled(random),
nonMeshTopicPeersBelowPublishThreshold.shuffled(random)
)
.flatten()
.take(params.D)
} else {
topicMeshPeers
}
} else {
// we are not subscribed to the topic
fanout[topic] ?: getTopicPeers(topic).shuffled(random).take(params.D)
.also {
if (it.isNotEmpty()) fanout[topic] = it.toMutableSet()
}
}
}
.flatten()
selectPeersForOutboundBroadcasting(msg)
}

mCache += msg
Expand All @@ -468,6 +442,53 @@ open class GossipRouter(
}
}

private fun selectPeersForOutboundBroadcastingInFloodPublish(msg: PubsubMessage): List<PeerHandler> {
return msg.topics
.flatMap { getTopicPeers(it) }
.filter { isDirect(it) || score.score(it.peerId) >= scoreParams.publishThreshold }
}

private fun selectPeersForOutboundBroadcasting(msg: PubsubMessage): List<PeerHandler> {
val fromMesh = msg.topics
.map { topic ->
val topicMeshPeers = mesh[topic]
if (topicMeshPeers != null) {
// we are subscribed to the topic
if (topicMeshPeers.size < params.D) {
// we need extra non-mesh peers for more reliable publishing
val nonMeshTopicPeers = getTopicPeers(topic) - topicMeshPeers
val (nonMeshTopicPeersAbovePublishThreshold, nonMeshTopicPeersBelowPublishThreshold) =
nonMeshTopicPeers.partition { score.score(it.peerId) >= scoreParams.publishThreshold }
// this deviates from the original spec but we want at least D peers for publishing
// prioritizing mesh peers, then non-mesh peers with acceptable score,
// and then underscored non-mesh peers as a last resort
listOf(
topicMeshPeers,
nonMeshTopicPeersAbovePublishThreshold.shuffled(random),
nonMeshTopicPeersBelowPublishThreshold.shuffled(random)
)
.flatten()
.take(params.D)
} else {
topicMeshPeers
}
} else {
// we are not subscribed to the topic
fanout[topic] ?: getTopicPeers(topic).shuffled(random).take(params.D)
.also {
if (it.isNotEmpty()) fanout[topic] = it.toMutableSet()
}
}
}
.flatten()

val fromDirectPeers = msg.topics.flatMap { getDirectPeers(it) }

return fromMesh
.plus(fromDirectPeers)
.distinct()
}

override fun subscribe(topic: Topic) {
super.subscribe(topic)
val fanoutPeers = (fanout[topic] ?: mutableSetOf())
Expand Down
108 changes: 104 additions & 4 deletions libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class GossipV1_1Tests : GossipTestsBase() {

val api = createPubsubApi(test.gossipRouter)
val apiMessages = mutableListOf<MessageApi>()
api.subscribe(Subscriber { apiMessages += it }, io.libp2p.core.pubsub.Topic("topic2"))
api.subscribe(Subscriber { apiMessages += it }, Topic("topic2"))

val msg1 = Rpc.RPC.newBuilder()
.addPublish(newProtoMessage("topic2", 0L, "Hello-1".toByteArray()))
Expand Down Expand Up @@ -131,7 +131,7 @@ class GossipV1_1Tests : GossipTestsBase() {

val api = createPubsubApi(test.gossipRouter)
val apiMessages = mutableListOf<MessageApi>()
api.subscribe(Subscriber { apiMessages += it }, io.libp2p.core.pubsub.Topic("topic1"))
api.subscribe(Subscriber { apiMessages += it }, Topic("topic1"))

val msg1 = Rpc.RPC.newBuilder()
.addPublish(newProtoMessage("topic1", 0L, "Hello-1".toByteArray()))
Expand Down Expand Up @@ -653,7 +653,7 @@ class GossipV1_1Tests : GossipTestsBase() {
3,
3,
DLazy = 3,
floodPublishMaxMessageSizeThreshold = 0,
floodPublishMaxMessageSizeThreshold = NEVER_FLOOD_PUBLISH,
gossipFactor = 0.5
)
val peerScoreParams = GossipPeerScoreParams(
Expand Down Expand Up @@ -936,7 +936,7 @@ class GossipV1_1Tests : GossipTestsBase() {
receivedMessages += it
validationResult
}
api.subscribe(slowValidator, io.libp2p.core.pubsub.Topic("topic1"))
api.subscribe(slowValidator, Topic("topic1"))
test.mockRouters.forEach { it.subscribe("topic1") }

val gossiper = test.mockRouters[0]
Expand Down Expand Up @@ -1307,6 +1307,106 @@ class GossipV1_1Tests : GossipTestsBase() {
assertTrue(peersReceivedMessage.containsAll(goodScoredPeers))
}

@Test
fun `should always flood publish to subscribed direct peers`() {
val message = newMessage("topic1", 0L, "Hello-0".toByteArray())
val appScore = mutableMapOf<PeerId, Double>().withDefault { 0.0 }
val directPeers = mutableSetOf<PeerId>()
val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = ALWAYS_FLOOD_PUBLISH)
val peerScoreParams = GossipPeerScoreParams(
appSpecificScore = { appScore.getValue(it) },
appSpecificWeight = 1.0,
isDirect = { directPeers.contains(it) }
)
val scoreParams = GossipScoreParams(
peerScoreParams = peerScoreParams,
graylistThreshold = -15.0,
publishThreshold = -10.0,
)
val test = ManyRoutersTest(mockRouterCount = 10, params = coreParams, scoreParams = scoreParams)
test.connectAll()

test.gossipRouter.subscribe("topic1")
test.routers.slice(0..5).forEach {
it.router.subscribe("topic1")
}

test.routers.slice(1..6).forEach {
directPeers.add(it.peerId)
}

// now only peers from 1 to 5 are direct peers subscribed to the topic

test.fuzz.timeController.addTime(2.seconds)

// let's down score all peers
test.routers.forEach {
appScore[it.peerId] = -20.0
}
test.gossipRouter.publish(message)

test.fuzz.timeController.addTime(50.millis)

val publishedCount = test.mockRouters.flatMap { it.inboundMessages }.count { it.publishCount > 0 }

// only subscribed direct peers should receive the message
assertEquals(5, publishedCount)
}

@Test
fun `should always publish to subscribed direct peers`() {
val message = newMessage("topic1", 0L, "Hello-0".toByteArray())
val appScore = mutableMapOf<PeerId, Double>().withDefault { 0.0 }
val directPeers = mutableSetOf<PeerId>()
val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = NEVER_FLOOD_PUBLISH)
val peerScoreParams = GossipPeerScoreParams(
appSpecificScore = { appScore.getValue(it) },
appSpecificWeight = 1.0,
isDirect = { directPeers.contains(it) }
)
val scoreParams = GossipScoreParams(
peerScoreParams = peerScoreParams,
graylistThreshold = -15.0,
publishThreshold = -10.0,
)
val test = ManyRoutersTest(mockRouterCount = 10, params = coreParams, scoreParams = scoreParams)
test.connectAll()

test.gossipRouter.subscribe("topic1")

test.routers.slice(0..5).forEach {
it.router.subscribe("topic1")
}
test.routers.slice(1..6).forEach {
directPeers.add(it.peerId)
}

// now only peers from 1 to 5 are direct peers subscribed to the topic
val subscribedDirectPeers = test.routers.slice(1..5).map { it.peerId }

test.fuzz.timeController.addTime(2.seconds)

// let's down score all direct peers
directPeers.forEach {
appScore[it] = -20.0
}

val topicMeshRouters = test.gossipRouter.mesh["topic1"]!!.toList()

// the mesh is strictly smaller than the number of subscribed direct peers
assertTrue(topicMeshRouters.size < subscribedDirectPeers.size)

val expectedPublishedCount = topicMeshRouters.map { it.peerId }.plus(subscribedDirectPeers).distinct().size

test.gossipRouter.publish(message)

test.fuzz.timeController.addTime(50.millis)

val publishedCount = test.mockRouters.flatMap { it.inboundMessages }.count { it.publishCount > 0 }

assertEquals(expectedPublishedCount, publishedCount)
}

private fun createGraftMessage(topic: String): Rpc.RPC {
return Rpc.RPC.newBuilder().setControl(
Rpc.ControlMessage.newBuilder().addGraft(
Expand Down

0 comments on commit 461d8c8

Please sign in to comment.