Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gossip: more reliable publishing #387

Merged
merged 9 commits into from
Oct 22, 2024
36 changes: 32 additions & 4 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -281,17 +281,21 @@ open class GossipRouter(
when {
isDirect(peer) ->
prune(peer, topic)

isBackOff(peer, topic) -> {
notifyRouterMisbehavior(peer, 1)
if (isBackOffFlood(peer, topic)) {
notifyRouterMisbehavior(peer, 1)
}
prune(peer, topic)
}

score.score(peer.peerId) < 0 ->
prune(peer, topic)

meshPeers.size >= params.DHigh && !peer.isOutbound() ->
prune(peer, topic)

peer !in meshPeers ->
graft(peer, topic)
}
Expand Down Expand Up @@ -410,11 +414,35 @@ open class GossipRouter(
.plus(getDirectPeers())
} else {
msg.topics
.mapNotNull { topic ->
mesh[topic] ?: fanout[topic] ?: getTopicPeers(topic).shuffled(random).take(params.D)
.also {
if (it.isNotEmpty()) fanout[topic] = it.toMutableSet()
.map { topic ->
val topicMeshPeers = mesh[topic]
if (topicMeshPeers != null) {
// we are subscribed to the topic
if (topicMeshPeers.size < params.D) {
// we need extra non-mesh peers for more reliable publishing
val nonMeshTopicPeers = getTopicPeers(topic) - topicMeshPeers
val (nonMeshTopicPeersAbovePublishThreshold, nonMeshTopicPeersBelowPublishThreshold) =
nonMeshTopicPeers.partition { score.score(it.peerId) >= scoreParams.publishThreshold }
// this deviates from the original spec but we want at least D peers for publishing
// prioritizing mesh peers, then non-mesh peers with acceptable score,
// and then underscored non-mesh peers as a last resort
listOf(
topicMeshPeers,
nonMeshTopicPeersAbovePublishThreshold.shuffled(random),
nonMeshTopicPeersBelowPublishThreshold.shuffled(random)
)
.flatten()
.take(params.D)
} else {
topicMeshPeers
}
} else {
// we are not subscribed to the topic
fanout[topic] ?: getTopicPeers(topic).shuffled(random).take(params.D)
.also {
if (it.isNotEmpty()) fanout[topic] = it.toMutableSet()
}
}
}
.flatten()
}
Expand Down
210 changes: 192 additions & 18 deletions libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ class GossipV1_1Tests : GossipTestsBase() {
super.initChannelWithHandler(streamHandler, handler)
}
}

val test = TwoRoutersTest(mockRouterFactory = { exec, _, _ -> MalformedMockRouter(exec) })
val mockRouter = test.router2.router as MalformedMockRouter

Expand Down Expand Up @@ -1128,34 +1129,207 @@ class GossipV1_1Tests : GossipTestsBase() {
// 2 heartbeats - the topic should be GRAFTed
test.fuzz.timeController.addTime(2.seconds)

fun createPruneMessage(peersCount: Int): Rpc.RPC {
val peerInfos = List(peersCount) {
Rpc.PeerInfo.newBuilder()
.setPeerID(PeerId.random().bytes.toProtobuf())
.setSignedPeerRecord(ByteString.EMPTY)
.build()
}
return Rpc.RPC.newBuilder().setControl(
Rpc.ControlMessage.newBuilder().addPrune(
Rpc.ControlPrune.newBuilder()
.setTopicID(topic)
.addAllPeers(peerInfos)
)
).build()
}

test.mockRouter.sendToSingle(
createPruneMessage(test.gossipRouter.params.maxPeersAcceptedInPruneMsg + 1)
createPruneMessage(topic, test.gossipRouter.params.maxPeersAcceptedInPruneMsg + 1)
)

// prune message should be dropped because too many peers
assertEquals(1, test.gossipRouter.mesh[topic]!!.size)

test.mockRouter.sendToSingle(
createPruneMessage(test.gossipRouter.params.maxPeersAcceptedInPruneMsg)
createPruneMessage(topic, test.gossipRouter.params.maxPeersAcceptedInPruneMsg)
)

// prune message should now be processed
assertEquals(0, test.gossipRouter.mesh[topic]!!.size)
}

@Test
fun `when a peer leaves the mesh it should still be considered for publishing`() {
val test = TwoRoutersTest()
val topic = "topic1"

test.mockRouter.subscribe(topic)
test.gossipRouter.subscribe(topic)

// 2 heartbeats - the topic should be GRAFTed
test.fuzz.timeController.addTime(2.seconds)

assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == 1)

// remote peer leaves the mesh
test.mockRouter.sendToSingle(createPruneMessage(topic))
test.fuzz.timeController.addTime(1.seconds)

assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == 0)

val message1 = newMessage(topic, 0L, "Hello-0".toByteArray())
test.gossipRouter.publish(message1)

test.mockRouter.waitForMessage { it.publishCount > 0 }
}

@Test
fun `should publish to all mesh peers when mesh exceeds D`() {
val gossipParams = GossipParams(D = 6, DHigh = 10)
val test = ManyRoutersTest(params = gossipParams, mockRouterCount = gossipParams.DHigh)
val topic = "topic1"
test.connectAll()

test.mockRouters.forEach {
it.subscribe(topic)
}
test.gossipRouter.subscribe(topic)

// 2 heartbeats - the topic should be GRAFTed
test.fuzz.timeController.addTime(2.seconds)

assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == gossipParams.D)

test.mockRouters.forEach {
it.sendToSingle(createGraftMessage(topic))
}

test.fuzz.timeController.addTime(2.seconds)

assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == gossipParams.DHigh)

// remote peer leaves the mesh
val message1 = newMessage(topic, 0L, "Hello-0".toByteArray())
test.gossipRouter.publish(message1)

val routerReceivedMessageCount =
test.mockRouters.count { mockRouter ->
mockRouter.inboundMessages.any { msg ->
msg.publishCount > 0
}
}

assertTrue(routerReceivedMessageCount == gossipParams.DHigh)
}

@Test
fun `publishing should collect at least D peers if mesh is smaller`() {
val params = GossipParams()

val test = ManyRoutersTest(params = params, mockRouterCount = params.D)
val topic = "topic1"
test.connectAll()

test.mockRouters.forEach { it.subscribe(topic) }
test.gossipRouter.subscribe(topic)

// 2 heartbeats - the topic should be GRAFTed
test.fuzz.timeController.addTime(2.seconds)

val topicMeshRouters = test.gossipRouter.mesh[topic]!!
assertTrue((topicMeshRouters.size) >= params.DLow)

// leave just 2 peers in the mesh
topicMeshRouters.drop(2)
.forEach {
test.getMockRouter(it.peerId).sendToSingle(createPruneMessage(topic))
}
test.fuzz.timeController.addTime(1.seconds)

assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == 2)

val message1 = newMessage(topic, 0L, "Hello-0".toByteArray())
test.gossipRouter.publish(message1)

val routerReceivedMessageCount =
test.mockRouters.count { mockRouter ->
mockRouter.inboundMessages.any { msg ->
msg.publishCount > 0
}
}

assertTrue(routerReceivedMessageCount >= params.D)
}

@Test
fun `publishing should collect at least D peers if mesh is smaller and prefer well scored peers`() {
val params = GossipParams()
val peerAppScores = mutableMapOf<PeerId, Int>()
val gossipScoreParams = GossipScoreParams(
peerScoreParams = GossipPeerScoreParams(
appSpecificScore = {
peerAppScores[it]?.toDouble() ?: 0.0
},
appSpecificWeight = 1.0
)
)

val test = ManyRoutersTest(params = params, scoreParams = gossipScoreParams, mockRouterCount = 10)
val topic = "topic1"
test.connectAll()

test.mockRouters.forEach { it.subscribe(topic) }
test.gossipRouter.subscribe(topic)

// 2 heartbeats - the topic should be GRAFTed
test.fuzz.timeController.addTime(2.seconds)

val topicMeshRouters = test.gossipRouter.mesh[topic]!!.toList()
assertTrue((topicMeshRouters.size) == params.D)

// leave just 2 peers in the mesh
topicMeshRouters.drop(2)
.forEach {
test.getMockRouter(it.peerId).sendToSingle(createPruneMessage(topic))
}
// downscore all peers except 5
val goodScoredPeers = topicMeshRouters.take(5).map { it.peerId }.toSet()
test.routers
.map { it.peerId }
.filter { it !in goodScoredPeers }
.forEach { peerAppScores[it] = -gossipScoreParams.publishThreshold.toInt() - 1 }

// for D = 6: 2 peers in the mesh + 3 peers outside of mesh + others are significantly downscored
test.fuzz.timeController.addTime(1.seconds)

assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == 2)

val message1 = newMessage(topic, 0L, "Hello-0".toByteArray())
test.gossipRouter.publish(message1)

// router should take 2 mesh peers, 3 well scored peers and 1 peer scored below publishThreshold
val peersReceivedMessage = test.routers
.filter {
val mockRouter = it.router as MockRouter
mockRouter.inboundMessages.any { msg ->
msg.publishCount > 0
}
}
.map { it.peerId }

assertTrue(peersReceivedMessage.size == params.D)
assertTrue(peersReceivedMessage.containsAll(goodScoredPeers))
}

private fun createGraftMessage(topic: String): Rpc.RPC {
return Rpc.RPC.newBuilder().setControl(
Rpc.ControlMessage.newBuilder().addGraft(
Rpc.ControlGraft.newBuilder()
.setTopicID(topic)
)
).build()
}

private fun createPruneMessage(topic: String, pxPeersCount: Int = 0): Rpc.RPC {
val peerInfos = List(pxPeersCount) {
Rpc.PeerInfo.newBuilder()
.setPeerID(PeerId.random().bytes.toProtobuf())
.setSignedPeerRecord(ByteString.EMPTY)
.build()
}
return Rpc.RPC.newBuilder().setControl(
Rpc.ControlMessage.newBuilder().addPrune(
Rpc.ControlPrune.newBuilder()
.setTopicID(topic)
.setBackoff(10)
.addAllPeers(peerInfos)
)
).build()
}
}
Loading