Skip to content

Commit

Permalink
[GossipSub 1.2] Add IDONTWANT support (#374)
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov authored Sep 25, 2024
1 parent 601bda7 commit e9e1d33
Show file tree
Hide file tree
Showing 13 changed files with 444 additions and 125 deletions.
6 changes: 2 additions & 4 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import java.util.Collections.singletonList
import java.util.Optional
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ScheduledExecutorService
import java.util.function.BiConsumer
import java.util.function.Consumer

// 1 MB default max message size
const val DEFAULT_MAX_PUBSUB_MESSAGE_SIZE = 1 shl 20
Expand Down Expand Up @@ -223,7 +221,7 @@ abstract class AbstractRouter(

validFuts.forEach { (msg, validationFut) ->
validationFut.thenAcceptAsync(
Consumer { res ->
{ res ->
seenMessages[msg] = Optional.of(res)
if (res == ValidationResult.Invalid) notifyUnseenInvalidMessage(peer, msg)
},
Expand All @@ -247,7 +245,7 @@ abstract class AbstractRouter(
// broadcast others on completion
undone.forEach {
it.second.whenCompleteAsync(
BiConsumer { res, err ->
{ res, err ->
when {
err != null -> logger.warn("Exception while handling message from peer $peer: ${it.first}", err)
res == ValidationResult.Invalid -> logger.debug("Invalid pubsub message from peer $peer: ${it.first}")
Expand Down
15 changes: 15 additions & 0 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubProtocol.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,25 @@ enum class PubsubProtocol(val announceStr: ProtocolId) {

Gossip_V_1_0("/meshsub/1.0.0"),
Gossip_V_1_1("/meshsub/1.1.0"),
Gossip_V_1_2("/meshsub/1.2.0"),
Floodsub("/floodsub/1.0.0");

companion object {
fun fromProtocol(protocol: ProtocolId) = PubsubProtocol.values().find { protocol == it.announceStr }
?: throw NoSuchElementException("No PubsubProtocol found with protocol $protocol")
}

/**
* https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#prune-backoff-and-peer-exchange
*/
fun supportsBackoffAndPX(): Boolean {
return this == Gossip_V_1_1 || this == Gossip_V_1_2
}

/**
* https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md#idontwant-message
*/
fun supportsIDontWant(): Boolean {
return this == Gossip_V_1_2
}
}
24 changes: 17 additions & 7 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,23 @@ class Gossip @JvmOverloads constructor(
}

override val protocolDescriptor =
if (router.protocol == PubsubProtocol.Gossip_V_1_1) {
ProtocolDescriptor(
PubsubProtocol.Gossip_V_1_1.announceStr,
PubsubProtocol.Gossip_V_1_0.announceStr
)
} else {
ProtocolDescriptor(PubsubProtocol.Gossip_V_1_0.announceStr)
when (router.protocol) {
PubsubProtocol.Gossip_V_1_2 -> {
ProtocolDescriptor(
PubsubProtocol.Gossip_V_1_2.announceStr,
PubsubProtocol.Gossip_V_1_1.announceStr,
PubsubProtocol.Gossip_V_1_0.announceStr
)
}
PubsubProtocol.Gossip_V_1_1 -> {
ProtocolDescriptor(
PubsubProtocol.Gossip_V_1_1.announceStr,
PubsubProtocol.Gossip_V_1_0.announceStr
)
}
else -> {
ProtocolDescriptor(PubsubProtocol.Gossip_V_1_0.announceStr)
}
}

override fun handleConnection(conn: Connection) {
Expand Down
19 changes: 18 additions & 1 deletion libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,24 @@ data class GossipParams(
* callback to notify outer system to which peers Gossip wants to be connected
* The second parameter is a signed peer record: https://github.com/libp2p/specs/pull/217
*/
val connectCallback: (PeerId, ByteArray) -> Unit = { _: PeerId, _: ByteArray -> }
val connectCallback: (PeerId, ByteArray) -> Unit = { _: PeerId, _: ByteArray -> },

/**
* [maxIDontWantMessageIds] is the maximum number of IDONTWANT message ids allowed per heartbeat per peer
*/
val maxIDontWantMessageIds: Int = maxIHaveLength * maxIHaveMessages,

/**
* [iDontWantMinMessageSizeThreshold] controls the minimum size (in bytes) that an incoming message needs to be so that an IDONTWANT message is sent to mesh peers.
* The default is 16 KB.
*/
val iDontWantMinMessageSizeThreshold: Int = 16000,

/**
* [iDontWantTTL] Expiry time for cache of received IDONTWANT messages for peers
*/
val iDontWantTTL: Duration = 3.seconds

) {
init {
check(D >= 0, "D should be >= 0")
Expand Down
91 changes: 77 additions & 14 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import kotlin.collections.any
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.collections.count
import kotlin.collections.distinct
import kotlin.collections.drop
import kotlin.collections.filter
import kotlin.collections.filterNot
Expand All @@ -46,7 +45,6 @@ import kotlin.collections.reversed
import kotlin.collections.set
import kotlin.collections.shuffled
import kotlin.collections.sortedBy
import kotlin.collections.sum
import kotlin.collections.take
import kotlin.collections.toMutableSet
import kotlin.math.max
Expand All @@ -56,6 +54,7 @@ const val MaxBackoffEntries = 10 * 1024
const val MaxIAskedEntries = 256
const val MaxPeerIHaveEntries = 256
const val MaxIWantRequestsEntries = 10 * 1024
const val MaxPeerIDontWantEntries = 256

typealias CurrentTimeSupplier = () -> Long

Expand Down Expand Up @@ -122,6 +121,7 @@ open class GossipRouter(
private val iAsked = createLRUMap<PeerHandler, AtomicInteger>(MaxIAskedEntries)
private val peerIHave = createLRUMap<PeerHandler, AtomicInteger>(MaxPeerIHaveEntries)
private val iWantRequests = createLRUMap<Pair<PeerHandler, MessageId>, Long>(MaxIWantRequestsEntries)
private val peerIDontWant = createLRUMap<PeerHandler, IDontWantCacheEntry>(MaxPeerIDontWantEntries)
private val heartbeatTask by lazy {
executor.scheduleWithFixedDelay(
::catchingHeartbeat,
Expand Down Expand Up @@ -166,6 +166,7 @@ open class GossipRouter(
}

override fun notifyUnseenMessage(peer: PeerHandler, msg: PubsubMessage) {
iDontWant(msg, peer)
eventBroadcaster.notifyUnseenMessage(peer.peerId, msg)
notifyAnyMessage(peer, msg)
}
Expand Down Expand Up @@ -250,8 +251,8 @@ open class GossipRouter(
}

override fun validateMessageListLimits(msg: Rpc.RPCOrBuilder): Boolean {
val iWantMessageIdCount = msg.control?.iwantList?.map { w -> w.messageIDsCount }?.sum() ?: 0
val iHaveMessageIdCount = msg.control?.ihaveList?.map { w -> w.messageIDsCount }?.sum() ?: 0
val iWantMessageIdCount = msg.control?.iwantList?.sumOf { w -> w.messageIDsCount } ?: 0
val iHaveMessageIdCount = msg.control?.ihaveList?.sumOf { w -> w.messageIDsCount } ?: 0

return params.maxPublishedMessages?.let { msg.publishCount <= it } ?: true &&
params.maxTopicsPerPublishedMessage?.let { msg.publishList.none { m -> m.topicIDsCount > it } } ?: true &&
Expand All @@ -269,6 +270,7 @@ open class GossipRouter(
is Rpc.ControlPrune -> handlePrune(controlMsg, receivedFrom)
is Rpc.ControlIHave -> handleIHave(controlMsg, receivedFrom)
is Rpc.ControlIWant -> handleIWant(controlMsg, receivedFrom)
is Rpc.ControlIDontWant -> handleIDontWant(controlMsg, receivedFrom)
}
}

Expand Down Expand Up @@ -300,7 +302,7 @@ open class GossipRouter(
mesh[topic]?.remove(peer)?.also {
notifyPruned(peer, topic)
}
if (this.protocol == PubsubProtocol.Gossip_V_1_1) {
if (this.protocol.supportsBackoffAndPX()) {
if (msg.hasBackoff()) {
setBackOff(peer, topic, msg.backoff.seconds.toMillis())
} else {
Expand Down Expand Up @@ -348,8 +350,22 @@ open class GossipRouter(
msg.messageIDsList
.mapNotNull { mCache.getMessageForPeer(peer.peerId, it.toWBytes()) }
.filter { it.sentCount < params.gossipRetransmission }
.map { it.msg }
.forEach { submitPublishMessage(peer, it) }
.forEach { submitPublishMessage(peer, it.msg) }
}

private fun handleIDontWant(msg: Rpc.ControlIDontWant, peer: PeerHandler) {
if (!this.protocol.supportsIDontWant()) return
val peerScore = score.score(peer.peerId)
if (peerScore < scoreParams.gossipThreshold) return
val iDontWantCacheEntry = peerIDontWant.computeIfAbsent(peer) { IDontWantCacheEntry() }
iDontWantCacheEntry.heartbeatMessageIdsCount += msg.messageIDsCount
if (iDontWantCacheEntry.heartbeatMessageIdsCount > params.maxIDontWantMessageIds) {
return
}
val timeReceived = currentTimeSupplier()
msg.messageIDsList
.map { it.toWBytes() }
.associateWithTo(iDontWantCacheEntry.messageIdsAndTimeReceived) { timeReceived }
}

private fun processPrunePeers(peersList: List<Rpc.PeerInfo>) {
Expand All @@ -361,18 +377,20 @@ open class GossipRouter(

override fun processControl(ctrl: Rpc.ControlMessage, receivedFrom: PeerHandler) {
ctrl.run {
(graftList + pruneList + ihaveList + iwantList)
(graftList + pruneList + ihaveList + iwantList + idontwantList)
}.forEach { processControlMessage(it, receivedFrom) }
}

override fun broadcastInbound(msgs: List<PubsubMessage>, receivedFrom: PeerHandler) {
msgs.forEach { pubMsg ->
pubMsg.topics
.asSequence()
.mapNotNull { mesh[it] }
.flatten()
.distinct()
.plus(getDirectPeers())
.filter { it != receivedFrom }
.minus(receivedFrom)
.filterNot { peerDoesNotWantMessage(it, pubMsg.messageId) }
.forEach { submitPublishMessage(it, pubMsg) }
mCache += pubMsg
}
Expand All @@ -398,15 +416,17 @@ open class GossipRouter(
}
.flatten()
}
val list = peers.map { submitPublishMessage(it, msg) }
val list = peers
.filterNot { peerDoesNotWantMessage(it, msg.messageId) }
.map { submitPublishMessage(it, msg) }

mCache += msg
flushAllPending()

if (list.isNotEmpty()) {
return anyComplete(list)
return if (list.isNotEmpty()) {
anyComplete(list)
} else {
return completedExceptionally(
completedExceptionally(
NoPeersForOutboundMessageException("No peers for message topics ${msg.topics} found")
)
}
Expand Down Expand Up @@ -459,6 +479,15 @@ open class GossipRouter(
.whenTrue { notifyIWantTimeout(key.first, key.second) }
}

val staleIDontWantTime = this.currentTimeSupplier() - params.iDontWantTTL.toMillis()
peerIDontWant.entries.removeIf { (_, cacheEntry) ->
// reset on heartbeat
cacheEntry.heartbeatMessageIdsCount = 0
cacheEntry.messageIdsAndTimeReceived.values.removeIf { timeReceived -> timeReceived < staleIDontWantTime }
// remove entry for peer if no IDONTWANT message ids are left in the cache
cacheEntry.messageIdsAndTimeReceived.isEmpty()
}

try {
mesh.entries.forEach { (topic, peers) ->

Expand Down Expand Up @@ -565,16 +594,32 @@ open class GossipRouter(
}
}

private fun peerDoesNotWantMessage(peer: PeerHandler, messageId: MessageId): Boolean {
return peerIDontWant[peer]?.messageIdsAndTimeReceived?.contains(messageId) == true
}

private fun iWant(peer: PeerHandler, messageIds: List<MessageId>) {
if (messageIds.isEmpty()) return
messageIds[random.nextInt(messageIds.size)]
.also { iWantRequests[peer to it] = currentTimeSupplier() }
enqueueIwant(peer, messageIds)
}

private fun iDontWant(msg: PubsubMessage, receivedFrom: PeerHandler) {
if (!this.protocol.supportsIDontWant()) return
if (msg.protobufMessage.data.size() < params.iDontWantMinMessageSizeThreshold) return
// we need to send IDONTWANT messages to mesh peers immediately in order for them to have an effect
msg.topics
.mapNotNull { mesh[it] }
.flatten()
.distinct()
.minus(receivedFrom)
.forEach { peer -> sendIdontwant(peer, msg.messageId) }
}

private fun enqueuePrune(peer: PeerHandler, topic: Topic) {
val peerQueue = pendingRpcParts.getQueue(peer)
if (peer.getPeerProtocol() == PubsubProtocol.Gossip_V_1_1 && this.protocol == PubsubProtocol.Gossip_V_1_1) {
if (peer.getPeerProtocol().supportsBackoffAndPX() && this.protocol.supportsBackoffAndPX()) {
val backoffPeers = (getTopicPeers(topic) - peer)
.take(params.maxPeersSentInPruneMsg)
.filter { score.score(it.peerId) >= 0 }
Expand All @@ -594,7 +639,25 @@ open class GossipRouter(
private fun enqueueIhave(peer: PeerHandler, messageIds: List<MessageId>, topic: Topic) =
pendingRpcParts.getQueue(peer).addIHaves(messageIds, topic)

private fun sendIdontwant(peer: PeerHandler, messageId: MessageId) {
if (!peer.getPeerProtocol().supportsIDontWant()) {
return
}
val iDontWant = Rpc.RPC.newBuilder().setControl(
Rpc.ControlMessage.newBuilder().addIdontwant(
Rpc.ControlIDontWant.newBuilder()
.addMessageIDs(messageId.toProtobuf())
)
).build()
send(peer, iDontWant)
}

data class AcceptRequestsWhitelistEntry(val whitelistedTill: Long, val messagesAccepted: Int = 0) {
fun incrementMessageCount() = AcceptRequestsWhitelistEntry(whitelistedTill, messagesAccepted + 1)
}

data class IDontWantCacheEntry(
var heartbeatMessageIdsCount: Int = 0,
val messageIdsAndTimeReceived: MutableMap<MessageId, Long> = mutableMapOf()
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ class GossipParamsBuilder {

private var connectCallback: Function2<PeerId, ByteArray, Unit>? = null

private var maxIDontWantMessageIds: Int? = null

private var iDontWantMinMessageSizeThreshold: Int? = null

private var iDontWantTTL: Duration? = null

init {
val source = GossipParams()
this.D = source.D
Expand Down Expand Up @@ -100,6 +106,9 @@ class GossipParamsBuilder {
this.maxPruneMessages = source.maxPruneMessages
this.gossipRetransmission = source.gossipRetransmission
this.connectCallback = source.connectCallback
this.maxIDontWantMessageIds = source.maxIDontWantMessageIds
this.iDontWantMinMessageSizeThreshold = source.iDontWantMinMessageSizeThreshold
this.iDontWantTTL = source.iDontWantTTL
}

fun D(value: Int): GossipParamsBuilder = apply { D = value }
Expand Down Expand Up @@ -172,6 +181,12 @@ class GossipParamsBuilder {
connectCallback = value
}

fun maxIDontWantMessageIds(value: Int): GossipParamsBuilder = apply { maxIDontWantMessageIds = value }

fun iDontWantMinMessageSizeThreshold(value: Int): GossipParamsBuilder = apply { iDontWantMinMessageSizeThreshold = value }

fun iDontWantTTL(value: Duration): GossipParamsBuilder = apply { iDontWantTTL = value }

fun build(): GossipParams {
calculateMissing()
checkRequiredFields()
Expand Down Expand Up @@ -206,7 +221,10 @@ class GossipParamsBuilder {
pruneBackoff = pruneBackoff!!,
maxPruneMessages = maxPruneMessages,
gossipRetransmission = gossipRetransmission!!,
connectCallback = connectCallback!!
connectCallback = connectCallback!!,
maxIDontWantMessageIds = maxIDontWantMessageIds!!,
iDontWantMinMessageSizeThreshold = iDontWantMinMessageSizeThreshold!!,
iDontWantTTL = iDontWantTTL!!
)
}

Expand Down Expand Up @@ -244,5 +262,8 @@ class GossipParamsBuilder {
check(iWantFollowupTime != null, { "iWantFollowupTime must not be null" })
check(gossipRetransmission != null, { "gossipRetransmission must not be null" })
check(connectCallback != null, { "connectCallback must not be null" })
check(maxIDontWantMessageIds != null, { "maxIDontWantMessageIds must not be null" })
check(iDontWantMinMessageSizeThreshold != null, { "iDontWantMinMessageSizeThreshold must not be null" })
check(iDontWantTTL != null, { "iDontWantTTL must not be null" })
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ typealias GossipScoreFactory =
open class GossipRouterBuilder(

var name: String = "GossipRouter",
var protocol: PubsubProtocol = PubsubProtocol.Gossip_V_1_1,
var protocol: PubsubProtocol = PubsubProtocol.Gossip_V_1_2,

var params: GossipParams = GossipParams(),
var scoreParams: GossipScoreParams = GossipScoreParams(),
Expand Down
Loading

0 comments on commit e9e1d33

Please sign in to comment.