Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.2.1 release #394

Merged
merged 12 commits into from
Oct 24, 2024
27 changes: 17 additions & 10 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ abstract class AbstractRouter(

/**
* Flushes all pending message parts for all peers
* @see addPendingRpcPart
*/
protected fun flushAllPending() {
pendingRpcParts.pendingPeers.forEach(::flushPending)
Expand Down Expand Up @@ -163,7 +162,7 @@ abstract class AbstractRouter(

// Validate message
if (!validateMessageListLimits(msg)) {
logger.debug("Dropping msg with lists exceeding limits from peer $peer")
logger.debug("Dropping msg with lists exceeding limits from peer {}", peer)
return
}

Expand All @@ -173,7 +172,7 @@ abstract class AbstractRouter(
.filterIncomingSubscriptions(subscriptions, peersTopics.getByFirst(peer))
.forEach { handleMessageSubscriptions(peer, it) }
} catch (e: Exception) {
logger.debug("Subscription filter error, ignoring message from peer $peer", e)
logger.debug("Subscription filter error, ignoring message from peer {}", peer, e)
return
}

Expand All @@ -182,7 +181,7 @@ abstract class AbstractRouter(
}

val (msgSubscribed, nonSubscribed) = msg.publishList
.partition { it.topicIDsList.any { it in subscribedTopics } }
.partition { rpcMsg -> rpcMsg.topicIDsList.any { it in subscribedTopics } }

nonSubscribed.forEach { notifyNonSubscribedMessage(peer, it) }

Expand All @@ -207,7 +206,7 @@ abstract class AbstractRouter(
messageValidator.validate(it)
true
} catch (e: Exception) {
logger.debug("Invalid pubsub message from peer $peer: $it", e)
logger.debug("Invalid pubsub message from peer {}: {}", peer, it, e)
seenMessages[it] = Optional.of(ValidationResult.Invalid)
notifyUnseenInvalidMessage(peer, it)
false
Expand Down Expand Up @@ -248,8 +247,16 @@ abstract class AbstractRouter(
{ res, err ->
when {
err != null -> logger.warn("Exception while handling message from peer $peer: ${it.first}", err)
res == ValidationResult.Invalid -> logger.debug("Invalid pubsub message from peer $peer: ${it.first}")
res == ValidationResult.Ignore -> logger.trace("Ignoring pubsub message from peer $peer: ${it.first}")
res == ValidationResult.Invalid -> logger.debug(
"Invalid pubsub message from peer {}: {}",
peer,
it.first
)
res == ValidationResult.Ignore -> logger.trace(
"Ignoring pubsub message from peer {}: {}",
peer,
it.first
)
else -> {
newValidatedMessages(singletonList(it.first), peer)
flushAllPending()
Expand All @@ -273,15 +280,15 @@ abstract class AbstractRouter(

override fun onPeerWireException(peer: PeerHandler?, cause: Throwable) {
// exception occurred in protobuf decoders
logger.debug("Malformed message from $peer : $cause")
logger.debug("Malformed message from {} : {}", peer, cause)
peer?.also { notifyMalformedMessage(it) }
}

override fun onServiceException(peer: PeerHandler?, msg: Any?, cause: Throwable) {
if (cause is BadPeerException) {
logger.debug("Remote peer ($peer) misbehaviour on message $msg: $cause")
logger.debug("Remote peer ({}) misbehaviour on message {} : {}", peer, msg, cause)
} else {
logger.warn("AbstractRouter internal error on message $msg from peer $peer", cause)
logger.warn("AbstractRouter internal error on message {} from peer {}", msg, peer, cause)
}
}

Expand Down
3 changes: 3 additions & 0 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ interface PubsubMessage {
val topics: List<Topic>
get() = protobufMessage.topicIDsList

val size: Int
get() = protobufMessage.data.size()

fun messageSha256() = sha256(protobufMessage.toByteArray())

override fun equals(other: Any?): Boolean
Expand Down
23 changes: 17 additions & 6 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ fun defaultDLazy(D: Int) = D
fun defaultDScore(D: Int) = D * 2 / 3
fun defaultDOut(D: Int, DLow: Int) = min(D / 2, max(DLow - 1, 0))

// floodPublishMaxMessageSizeThreshold shortcuts
const val NEVER_FLOOD_PUBLISH = 0
const val ALWAYS_FLOOD_PUBLISH = Int.MAX_VALUE

/**
* Parameters of Gossip 1.1 router
*/
Expand Down Expand Up @@ -112,11 +116,16 @@ data class GossipParams(
val seenTTL: Duration = 2.minutes,

/**
* [floodPublish] is a gossipsub router option that enables flood publishing.
* When this is enabled, published messages are forwarded to all peers with score >=
* to publishThreshold
* [floodPublishMaxMessageSizeThreshold] controls the maximum size (in bytes) a message will be
* published using flood publishing mode.
* When a message size is <= [floodPublishMaxMessageSizeThreshold], published messages are forwarded
* to all peers with score >= to [GossipScoreParams.publishThreshold]
*
* [NEVER_FLOOD_PUBLISH] and [ALWAYS_FLOOD_PUBLISH] can be used as shortcuts.
*
* The default is [NEVER_FLOOD_PUBLISH] (0 KiB).
*/
val floodPublish: Boolean = false,
val floodPublishMaxMessageSizeThreshold: Int = NEVER_FLOOD_PUBLISH,

/**
* [gossipFactor] affects how many peers we will emit gossip to at each heartbeat.
Expand Down Expand Up @@ -240,9 +249,9 @@ data class GossipParams(

/**
* [iDontWantMinMessageSizeThreshold] controls the minimum size (in bytes) that an incoming message needs to be so that an IDONTWANT message is sent to mesh peers.
* The default is 16 KB.
* The default is 16 KiB.
*/
val iDontWantMinMessageSizeThreshold: Int = 16000,
val iDontWantMinMessageSizeThreshold: Int = 16384,

/**
* [iDontWantTTL] Expiry time for cache of received IDONTWANT messages for peers
Expand All @@ -260,6 +269,8 @@ data class GossipParams(
check(DLow <= D, "DLow should be <= D")
check(DHigh >= D, "DHigh should be >= D")
check(gossipFactor in 0.0..1.0, "gossipFactor should be in range [0.0, 1.0]")
check(floodPublishMaxMessageSizeThreshold >= 0, "floodPublishMaxMessageSizeThreshold should be >= 0")
check(iDontWantMinMessageSizeThreshold >= 0, "iDontWantMinMessageSizeThreshold should be >= 0")
}

companion object {
Expand Down
66 changes: 51 additions & 15 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -281,17 +281,21 @@ open class GossipRouter(
when {
isDirect(peer) ->
prune(peer, topic)

isBackOff(peer, topic) -> {
notifyRouterMisbehavior(peer, 1)
if (isBackOffFlood(peer, topic)) {
notifyRouterMisbehavior(peer, 1)
}
prune(peer, topic)
}

score.score(peer.peerId) < 0 ->
prune(peer, topic)

meshPeers.size >= params.DHigh && !peer.isOutbound() ->
prune(peer, topic)

peer !in meshPeers ->
graft(peer, topic)
}
Expand Down Expand Up @@ -400,31 +404,63 @@ open class GossipRouter(
override fun broadcastOutbound(msg: PubsubMessage): CompletableFuture<Unit> {
msg.topics.forEach { lastPublished[it] = currentTimeSupplier() }

val floodPublish = msg.size <= params.floodPublishMaxMessageSizeThreshold

val peers =
if (params.floodPublish) {
if (floodPublish) {
msg.topics
.flatMap { getTopicPeers(it) }
.filter { score.score(it.peerId) >= scoreParams.publishThreshold }
.plus(getDirectPeers())
} else {
msg.topics
.mapNotNull { topic ->
mesh[topic] ?: fanout[topic] ?: getTopicPeers(topic).shuffled(random).take(params.D)
.also {
if (it.isNotEmpty()) fanout[topic] = it.toMutableSet()
.map { topic ->
val topicMeshPeers = mesh[topic]
if (topicMeshPeers != null) {
// we are subscribed to the topic
if (topicMeshPeers.size < params.D) {
// we need extra non-mesh peers for more reliable publishing
val nonMeshTopicPeers = getTopicPeers(topic) - topicMeshPeers
val (nonMeshTopicPeersAbovePublishThreshold, nonMeshTopicPeersBelowPublishThreshold) =
nonMeshTopicPeers.partition { score.score(it.peerId) >= scoreParams.publishThreshold }
// this deviates from the original spec but we want at least D peers for publishing
// prioritizing mesh peers, then non-mesh peers with acceptable score,
// and then underscored non-mesh peers as a last resort
listOf(
topicMeshPeers,
nonMeshTopicPeersAbovePublishThreshold.shuffled(random),
nonMeshTopicPeersBelowPublishThreshold.shuffled(random)
)
.flatten()
.take(params.D)
} else {
topicMeshPeers
}
} else {
// we are not subscribed to the topic
fanout[topic] ?: getTopicPeers(topic).shuffled(random).take(params.D)
.also {
if (it.isNotEmpty()) fanout[topic] = it.toMutableSet()
}
}
}
.flatten()
}
val list = peers
.filterNot { peerDoesNotWantMessage(it, msg.messageId) }
.map { submitPublishMessage(it, msg) }

mCache += msg
flushAllPending()

return if (list.isNotEmpty()) {
anyComplete(list)
return if (peers.isNotEmpty()) {
iDontWant(msg)
val publishedMessages = peers
.filterNot { peerDoesNotWantMessage(it, msg.messageId) }
.map { submitPublishMessage(it, msg) }
if (publishedMessages.isEmpty()) {
// all peers have sent IDONTWANT for this message id
CompletableFuture.completedFuture(Unit)
} else {
flushAllPending()
anyComplete(publishedMessages)
}
} else {
completedExceptionally(
NoPeersForOutboundMessageException("No peers for message topics ${msg.topics} found")
Expand Down Expand Up @@ -605,16 +641,16 @@ open class GossipRouter(
enqueueIwant(peer, messageIds)
}

private fun iDontWant(msg: PubsubMessage, receivedFrom: PeerHandler) {
private fun iDontWant(msg: PubsubMessage, receivedFrom: PeerHandler? = null) {
if (!this.protocol.supportsIDontWant()) return
if (msg.protobufMessage.data.size() < params.iDontWantMinMessageSizeThreshold) return
if (msg.size < params.iDontWantMinMessageSizeThreshold) return
// we need to send IDONTWANT messages to mesh peers immediately in order for them to have an effect
msg.topics
.mapNotNull { mesh[it] }
.flatten()
.distinct()
.minus(receivedFrom)
.forEach { peer -> sendIdontwant(peer, msg.messageId) }
.minus(setOfNotNull(receivedFrom))
.forEach { sendIdontwant(it, msg.messageId) }
}

private fun enqueuePrune(peer: PeerHandler, topic: Topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ class GossipParamsBuilder {

private var pruneBackoff: Duration? = null

private var floodPublish: Boolean? = null

private var gossipFactor: Double? = null

private var opportunisticGraftPeers: Int? = null
Expand Down Expand Up @@ -76,6 +74,8 @@ class GossipParamsBuilder {

private var iDontWantMinMessageSizeThreshold: Int? = null

private var floodPublishMaxMessageSizeThreshold: Int? = null

private var iDontWantTTL: Duration? = null

init {
Expand All @@ -90,7 +90,7 @@ class GossipParamsBuilder {
this.maxPeersSentInPruneMsg = source.maxPeersSentInPruneMsg
this.maxPeersAcceptedInPruneMsg = source.maxPeersAcceptedInPruneMsg
this.pruneBackoff = source.pruneBackoff
this.floodPublish = source.floodPublish
this.floodPublishMaxMessageSizeThreshold = source.floodPublishMaxMessageSizeThreshold
this.gossipFactor = source.gossipFactor
this.opportunisticGraftPeers = source.opportunisticGraftPeers
this.opportunisticGraftTicks = source.opportunisticGraftTicks
Expand Down Expand Up @@ -141,8 +141,6 @@ class GossipParamsBuilder {

fun pruneBackoff(value: Duration): GossipParamsBuilder = apply { pruneBackoff = value }

fun floodPublish(value: Boolean): GossipParamsBuilder = apply { floodPublish = value }

fun gossipFactor(value: Double): GossipParamsBuilder = apply { gossipFactor = value }

fun opportunisticGraftPeers(value: Int): GossipParamsBuilder = apply {
Expand Down Expand Up @@ -185,6 +183,8 @@ class GossipParamsBuilder {

fun iDontWantMinMessageSizeThreshold(value: Int): GossipParamsBuilder = apply { iDontWantMinMessageSizeThreshold = value }

fun floodPublishMaxMessageSizeThreshold(value: Int): GossipParamsBuilder = apply { floodPublishMaxMessageSizeThreshold = value }

fun iDontWantTTL(value: Duration): GossipParamsBuilder = apply { iDontWantTTL = value }

fun build(): GossipParams {
Expand All @@ -203,7 +203,7 @@ class GossipParamsBuilder {
gossipHistoryLength = gossipHistoryLength!!,
heartbeatInterval = heartbeatInterval!!,
seenTTL = seenTTL!!,
floodPublish = floodPublish!!,
floodPublishMaxMessageSizeThreshold = floodPublishMaxMessageSizeThreshold!!,
gossipFactor = gossipFactor!!,
opportunisticGraftPeers = opportunisticGraftPeers!!,
opportunisticGraftTicks = opportunisticGraftTicks!!,
Expand Down Expand Up @@ -252,7 +252,7 @@ class GossipParamsBuilder {
check(seenTTL != null, { "seenTTL must not be null" })
check(maxPeersSentInPruneMsg != null, { "maxPeersSentInPruneMsg must not be null" })
check(pruneBackoff != null, { "pruneBackoff must not be null" })
check(floodPublish != null, { "floodPublish must not be null" })
check(floodPublishMaxMessageSizeThreshold != null, { "floodPublishMaxMessageSizeThreshold must not be null" })
check(gossipFactor != null, { "gossipFactor must not be null" })
check(opportunisticGraftPeers != null, { "opportunisticGraftPeers must not be null" })
check(opportunisticGraftTicks != null, { "opportunisticGraftTicks must not be null" })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit

class GossipPubsubRouterTest : PubsubRouterTest(
createGossipFuzzRouterFactory {
GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublish = false))
GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublishMaxMessageSizeThreshold = NEVER_FLOOD_PUBLISH))
}
) {

Expand Down Expand Up @@ -59,7 +59,7 @@ class GossipPubsubRouterTest : PubsubRouterTest(
// this is to test ihave/iwant
fuzz.timeController.addTime(Duration.ofMillis(1))

val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublish = false)) }
val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublishMaxMessageSizeThreshold = NEVER_FLOOD_PUBLISH)) }
val routerCenter = fuzz.createTestGossipRouter(r)
allRouters.add(0, routerCenter)

Expand Down
Loading
Loading