Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gossip: direct peers handling fix #398

Merged
merged 3 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 60 additions & 39 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ open class GossipRouter(
return currentTimeSupplier() < expire - (params.pruneBackoff + params.graftFloodThreshold).toMillis()
}

private fun getDirectPeers() = peers.filter(::isDirect)
private fun getDirectPeers(topic: Topic): List<PeerHandler> {
return getTopicPeers(topic).filter(::isDirect)
}
private fun isDirect(peer: PeerHandler) = scoreParams.peerScoreParams.isDirect(peer.peerId)
private fun isConnected(peerId: PeerId) = peers.any { it.peerId == peerId }

Expand Down Expand Up @@ -387,12 +389,18 @@ open class GossipRouter(

override fun broadcastInbound(msgs: List<PubsubMessage>, receivedFrom: PeerHandler) {
msgs.forEach { pubMsg ->
pubMsg.topics
val topics = pubMsg.topics
.asSequence()

val peersFromMesh = topics
.mapNotNull { mesh[it] }
.flatten()

val peersFromDirectPeers = topics.flatMap { getDirectPeers(it) }

peersFromDirectPeers
.plus(peersFromMesh)
.distinct()
.plus(getDirectPeers())
.minus(receivedFrom)
.filterNot { peerDoesNotWantMessage(it, pubMsg.messageId) }
.forEach { submitPublishMessage(it, pubMsg) }
Expand All @@ -408,43 +416,9 @@ open class GossipRouter(

val peers =
if (floodPublish) {
msg.topics
.flatMap { getTopicPeers(it) }
.filter { score.score(it.peerId) >= scoreParams.publishThreshold }
.plus(getDirectPeers())
selectPeersForOutboundBroadcastingInFloodPublish(msg)
} else {
msg.topics
.map { topic ->
val topicMeshPeers = mesh[topic]
if (topicMeshPeers != null) {
// we are subscribed to the topic
if (topicMeshPeers.size < params.D) {
// we need extra non-mesh peers for more reliable publishing
val nonMeshTopicPeers = getTopicPeers(topic) - topicMeshPeers
val (nonMeshTopicPeersAbovePublishThreshold, nonMeshTopicPeersBelowPublishThreshold) =
nonMeshTopicPeers.partition { score.score(it.peerId) >= scoreParams.publishThreshold }
// this deviates from the original spec but we want at least D peers for publishing
// prioritizing mesh peers, then non-mesh peers with acceptable score,
// and then underscored non-mesh peers as a last resort
listOf(
topicMeshPeers,
nonMeshTopicPeersAbovePublishThreshold.shuffled(random),
nonMeshTopicPeersBelowPublishThreshold.shuffled(random)
)
.flatten()
.take(params.D)
} else {
topicMeshPeers
}
} else {
// we are not subscribed to the topic
fanout[topic] ?: getTopicPeers(topic).shuffled(random).take(params.D)
.also {
if (it.isNotEmpty()) fanout[topic] = it.toMutableSet()
}
}
}
.flatten()
selectPeersForOutboundBroadcasting(msg)
}

mCache += msg
Expand All @@ -468,6 +442,53 @@ open class GossipRouter(
}
}

private fun selectPeersForOutboundBroadcastingInFloodPublish(msg: PubsubMessage): List<PeerHandler> {
return msg.topics
.flatMap { getTopicPeers(it) }
.filter { isDirect(it) || score.score(it.peerId) >= scoreParams.publishThreshold }
}

private fun selectPeersForOutboundBroadcasting(msg: PubsubMessage): List<PeerHandler> {
val fromMesh = msg.topics
.map { topic ->
val topicMeshPeers = mesh[topic]
if (topicMeshPeers != null) {
// we are subscribed to the topic
if (topicMeshPeers.size < params.D) {
// we need extra non-mesh peers for more reliable publishing
val nonMeshTopicPeers = getTopicPeers(topic) - topicMeshPeers
val (nonMeshTopicPeersAbovePublishThreshold, nonMeshTopicPeersBelowPublishThreshold) =
nonMeshTopicPeers.partition { score.score(it.peerId) >= scoreParams.publishThreshold }
// this deviates from the original spec but we want at least D peers for publishing
// prioritizing mesh peers, then non-mesh peers with acceptable score,
// and then underscored non-mesh peers as a last resort
listOf(
topicMeshPeers,
nonMeshTopicPeersAbovePublishThreshold.shuffled(random),
nonMeshTopicPeersBelowPublishThreshold.shuffled(random)
)
.flatten()
.take(params.D)
} else {
topicMeshPeers
}
} else {
// we are not subscribed to the topic
fanout[topic] ?: getTopicPeers(topic).shuffled(random).take(params.D)
.also {
if (it.isNotEmpty()) fanout[topic] = it.toMutableSet()
}
}
}
.flatten()

val fromDirectPeers = msg.topics.flatMap { getDirectPeers(it) }

return fromMesh
.plus(fromDirectPeers)
.distinct()
}

override fun subscribe(topic: Topic) {
super.subscribe(topic)
val fanoutPeers = (fanout[topic] ?: mutableSetOf())
Expand Down
108 changes: 104 additions & 4 deletions libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class GossipV1_1Tests : GossipTestsBase() {

val api = createPubsubApi(test.gossipRouter)
val apiMessages = mutableListOf<MessageApi>()
api.subscribe(Subscriber { apiMessages += it }, io.libp2p.core.pubsub.Topic("topic2"))
api.subscribe(Subscriber { apiMessages += it }, Topic("topic2"))

val msg1 = Rpc.RPC.newBuilder()
.addPublish(newProtoMessage("topic2", 0L, "Hello-1".toByteArray()))
Expand Down Expand Up @@ -131,7 +131,7 @@ class GossipV1_1Tests : GossipTestsBase() {

val api = createPubsubApi(test.gossipRouter)
val apiMessages = mutableListOf<MessageApi>()
api.subscribe(Subscriber { apiMessages += it }, io.libp2p.core.pubsub.Topic("topic1"))
api.subscribe(Subscriber { apiMessages += it }, Topic("topic1"))

val msg1 = Rpc.RPC.newBuilder()
.addPublish(newProtoMessage("topic1", 0L, "Hello-1".toByteArray()))
Expand Down Expand Up @@ -653,7 +653,7 @@ class GossipV1_1Tests : GossipTestsBase() {
3,
3,
DLazy = 3,
floodPublishMaxMessageSizeThreshold = 0,
floodPublishMaxMessageSizeThreshold = NEVER_FLOOD_PUBLISH,
gossipFactor = 0.5
)
val peerScoreParams = GossipPeerScoreParams(
Expand Down Expand Up @@ -936,7 +936,7 @@ class GossipV1_1Tests : GossipTestsBase() {
receivedMessages += it
validationResult
}
api.subscribe(slowValidator, io.libp2p.core.pubsub.Topic("topic1"))
api.subscribe(slowValidator, Topic("topic1"))
test.mockRouters.forEach { it.subscribe("topic1") }

val gossiper = test.mockRouters[0]
Expand Down Expand Up @@ -1307,6 +1307,106 @@ class GossipV1_1Tests : GossipTestsBase() {
assertTrue(peersReceivedMessage.containsAll(goodScoredPeers))
}

@Test
fun `should always flood publish to subscribed direct peers`() {
val message = newMessage("topic1", 0L, "Hello-0".toByteArray())
val appScore = mutableMapOf<PeerId, Double>().withDefault { 0.0 }
val directPeers = mutableSetOf<PeerId>()
val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = ALWAYS_FLOOD_PUBLISH)
val peerScoreParams = GossipPeerScoreParams(
appSpecificScore = { appScore.getValue(it) },
appSpecificWeight = 1.0,
isDirect = { directPeers.contains(it) }
)
val scoreParams = GossipScoreParams(
peerScoreParams = peerScoreParams,
graylistThreshold = -15.0,
publishThreshold = -10.0,
)
val test = ManyRoutersTest(mockRouterCount = 10, params = coreParams, scoreParams = scoreParams)
test.connectAll()

test.gossipRouter.subscribe("topic1")
test.routers.slice(0..5).forEach {
it.router.subscribe("topic1")
}

test.routers.slice(1..6).forEach {
directPeers.add(it.peerId)
}

// now only peers from 1 to 5 are direct peers subscribed to the topic

test.fuzz.timeController.addTime(2.seconds)

// let's down score all peers
test.routers.forEach {
appScore[it.peerId] = -20.0
}
test.gossipRouter.publish(message)

test.fuzz.timeController.addTime(50.millis)

val publishedCount = test.mockRouters.flatMap { it.inboundMessages }.count { it.publishCount > 0 }

// only subscribed direct peers should receive the message
assertEquals(5, publishedCount)
}

@Test
fun `should always publish to subscribed direct peers`() {
val message = newMessage("topic1", 0L, "Hello-0".toByteArray())
val appScore = mutableMapOf<PeerId, Double>().withDefault { 0.0 }
val directPeers = mutableSetOf<PeerId>()
val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = NEVER_FLOOD_PUBLISH)
val peerScoreParams = GossipPeerScoreParams(
appSpecificScore = { appScore.getValue(it) },
appSpecificWeight = 1.0,
isDirect = { directPeers.contains(it) }
)
val scoreParams = GossipScoreParams(
peerScoreParams = peerScoreParams,
graylistThreshold = -15.0,
publishThreshold = -10.0,
)
val test = ManyRoutersTest(mockRouterCount = 10, params = coreParams, scoreParams = scoreParams)
test.connectAll()

test.gossipRouter.subscribe("topic1")

test.routers.slice(0..5).forEach {
it.router.subscribe("topic1")
}
test.routers.slice(1..6).forEach {
directPeers.add(it.peerId)
}

// now only peers from 1 to 5 are direct peers subscribed to the topic
val subscribedDirectPeers = test.routers.slice(1..5).map { it.peerId }

test.fuzz.timeController.addTime(2.seconds)

// let's down score all direct peers
directPeers.forEach {
appScore[it] = -20.0
}

val topicMeshRouters = test.gossipRouter.mesh["topic1"]!!.toList()

// the mesh is strictly smaller than the number of subscribed direct peers
assertTrue(topicMeshRouters.size < subscribedDirectPeers.size)

val expectedPublishedCount = topicMeshRouters.map { it.peerId }.plus(subscribedDirectPeers).distinct().size

test.gossipRouter.publish(message)

test.fuzz.timeController.addTime(50.millis)

val publishedCount = test.mockRouters.flatMap { it.inboundMessages }.count { it.publishCount > 0 }

assertEquals(expectedPublishedCount, publishedCount)
}

private fun createGraftMessage(topic: String): Rpc.RPC {
return Rpc.RPC.newBuilder().setControl(
Rpc.ControlMessage.newBuilder().addGraft(
Expand Down
Loading