From 28daefad04aa7c1a1d0a329d865abc6b66c7b0af Mon Sep 17 00:00:00 2001 From: Dr Ian Preston Date: Wed, 12 Jun 2024 18:34:48 +0100 Subject: [PATCH 1/8] More mdns fixes (#368) --- .../io/libp2p/discovery/MDnsDiscovery.kt | 91 +++++++++++++++++-- .../io/libp2p/discovery/MDnsDiscoveryTest.kt | 23 +++++ 2 files changed, 104 insertions(+), 10 deletions(-) diff --git a/libp2p/src/main/kotlin/io/libp2p/discovery/MDnsDiscovery.kt b/libp2p/src/main/kotlin/io/libp2p/discovery/MDnsDiscovery.kt index 9838f4ad..8c7b1918 100644 --- a/libp2p/src/main/kotlin/io/libp2p/discovery/MDnsDiscovery.kt +++ b/libp2p/src/main/kotlin/io/libp2p/discovery/MDnsDiscovery.kt @@ -1,23 +1,21 @@ package io.libp2p.discovery -import io.libp2p.core.Discoverer -import io.libp2p.core.Host -import io.libp2p.core.PeerId -import io.libp2p.core.PeerInfo -import io.libp2p.core.PeerListener +import io.libp2p.core.* import io.libp2p.core.multiformats.Multiaddr +import io.libp2p.core.multiformats.MultiaddrComponent import io.libp2p.core.multiformats.Protocol import io.libp2p.discovery.mdns.AnswerListener import io.libp2p.discovery.mdns.JmDNS import io.libp2p.discovery.mdns.ServiceInfo import io.libp2p.discovery.mdns.impl.DNSRecord import io.libp2p.discovery.mdns.impl.constants.DNSRecordType -import java.net.Inet4Address -import java.net.Inet6Address -import java.net.InetAddress +import java.net.* +import java.util.* import java.util.concurrent.CompletableFuture import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.ForkJoinPool +import java.util.stream.Collectors +import java.util.stream.Stream class MDnsDiscovery( private val host: Host, @@ -61,6 +59,10 @@ class MDnsDiscovery( newPeerFoundListeners.forEach { it(peerInfo) } } + fun addHandler(h: PeerListener) { + newPeerFoundListeners += h + } + private fun ipfsDiscoveryInfo(): ServiceInfo { return ServiceInfo.create( serviceTag, @@ -87,11 +89,80 @@ class MDnsDiscovery( return Integer.parseInt(str) } + /* /ip6/::/tcp/4001 should expand to the following for example: + "/ip6/0:0:0:0:0:0:0:1/udp/4001/quic" + "/ip4/50.116.48.246/tcp/4001" + "/ip4/127.0.0.1/tcp/4001" + "/ip6/2600:3c03:0:0:f03c:92ff:fee7:bc1c/tcp/4001" + "/ip6/0:0:0:0:0:0:0:1/tcp/4001" + "/ip4/50.116.48.246/udp/4001/quic" + "/ip4/127.0.0.1/udp/4001/quic" + "/ip6/2600:3c03:0:0:f03c:92ff:fee7:bc1c/udp/4001/quic" + */ + fun expandWildcardAddresses(addr: Multiaddr): List { + // Do not include /p2p or /ipfs components which are superfluous here + if (!isWildcard(addr)) { + return java.util.List.of( + Multiaddr( + addr.components + .stream() + .filter { c: MultiaddrComponent -> + ( + c.protocol !== Protocol.P2P && + c.protocol !== Protocol.IPFS + ) + } + .collect(Collectors.toList()) + ) + ) + } + if (addr.has(Protocol.IP4)) return listNetworkAddresses(false, addr) + return if (addr.has(Protocol.IP6)) listNetworkAddresses(true, addr) else emptyList() + } + + fun listNetworkAddresses(includeIp6: Boolean, addr: Multiaddr): List { + return try { + Collections.list(NetworkInterface.getNetworkInterfaces()).stream() + .flatMap { net: NetworkInterface -> + net.interfaceAddresses.stream() + .map { obj: InterfaceAddress -> obj.address } + .filter { ip: InetAddress? -> includeIp6 || ip is Inet4Address } + } + .map { ip: InetAddress -> + Multiaddr( + Stream.concat( + Stream.of( + MultiaddrComponent( + if (ip is Inet4Address) Protocol.IP4 else Protocol.IP6, + ip.address + ) + ), + addr.components.stream() + .filter { c: MultiaddrComponent -> + c.protocol !== Protocol.IP4 && c.protocol !== Protocol.IP6 && c.protocol !== Protocol.P2P && c.protocol !== Protocol.IPFS + } + ) + .collect(Collectors.toList()) + ) + } + .collect(Collectors.toList()) + } catch (e: SocketException) { + throw RuntimeException(e) + } + } + + fun isWildcard(addr: Multiaddr): Boolean { + val s = addr.toString() + return s.contains("/::/") || s.contains("/0:0:0:0/") + } + private fun ip4Addresses() = ipAddresses(Protocol.IP4, Inet4Address::class.java) private fun ip6Addresses() = ipAddresses(Protocol.IP6, Inet6Address::class.java) private fun ipAddresses(protocol: Protocol, klass: Class): List { - return host.listenAddresses().map { + return host.listenAddresses().flatMap { + expandWildcardAddresses(it) + }.map { it.getFirstComponent(protocol) }.filterNotNull().map { InetAddress.getByAddress(localhost.hostName, it.value) @@ -112,7 +183,7 @@ class MDnsDiscovery( val aRecords = answers.filter { DNSRecordType.TYPE_A.equals(it.recordType) } val aaaaRecords = answers.filter { DNSRecordType.TYPE_AAAA.equals(it.recordType) } - if (txtRecord == null || srvRecord == null || aRecords.isEmpty()) { + if (txtRecord == null || srvRecord == null || (aRecords.isEmpty() && aaaaRecords.isEmpty())) { return // incomplete answers } diff --git a/libp2p/src/test/kotlin/io/libp2p/discovery/MDnsDiscoveryTest.kt b/libp2p/src/test/kotlin/io/libp2p/discovery/MDnsDiscoveryTest.kt index f0acd579..e0bddd94 100644 --- a/libp2p/src/test/kotlin/io/libp2p/discovery/MDnsDiscoveryTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/discovery/MDnsDiscoveryTest.kt @@ -6,6 +6,7 @@ import io.libp2p.core.multiformats.Multiaddr import io.libp2p.crypto.keys.generateEcdsaKeyPair import io.libp2p.tools.NullHost import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test import java.util.concurrent.TimeUnit @@ -90,6 +91,28 @@ class MDnsDiscoveryTest { assertEquals(host.listenAddresses().size, peerInfo?.addresses?.size) } + @Test + fun `start discovery and listen for self ipv6`() { + var peerInfo: PeerInfo? = null + val discoverer = MDnsDiscovery(hostIpv6, testServiceTag) + + discoverer.newPeerFoundListeners += { + peerInfo = it + } + + discoverer.start().get(1, TimeUnit.SECONDS) + for (i in 0..50) { + if (peerInfo != null) { + break + } + TimeUnit.MILLISECONDS.sleep(100) + } + discoverer.stop().get(5, TimeUnit.SECONDS) + + assertEquals(hostIpv6.peerId, peerInfo?.peerId) + assertTrue(hostIpv6.listenAddresses().size <= peerInfo?.addresses?.size!!) + } + @Test fun `start discovery and listen for other`() { var peerInfo: PeerInfo? = null From 3374fb4a81a790d6dc9dbc608dffe4ce4759332c Mon Sep 17 00:00:00 2001 From: Dave Huseby Date: Mon, 12 Aug 2024 18:39:36 -0600 Subject: [PATCH 2/8] Create funding.json --- funding.json | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 funding.json diff --git a/funding.json b/funding.json new file mode 100644 index 00000000..bcf7fc27 --- /dev/null +++ b/funding.json @@ -0,0 +1,5 @@ +{ + "opRetro": { + "projectId": "0x966804cb492e1a4bde5d781a676a44a23d69aa5dd2562fa7a4f95bb606021c8b" + } +} From 62fa8bbe89cfb8285b96f94fdb3edc15f47c969a Mon Sep 17 00:00:00 2001 From: Prithvi Shahi Date: Wed, 4 Sep 2024 15:44:04 -0700 Subject: [PATCH 3/8] chore: Update funding.json --- funding.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/funding.json b/funding.json index bcf7fc27..020558ba 100644 --- a/funding.json +++ b/funding.json @@ -1,5 +1,5 @@ { "opRetro": { - "projectId": "0x966804cb492e1a4bde5d781a676a44a23d69aa5dd2562fa7a4f95bb606021c8b" + "projectId": "0x0be3a0fa062180bdfbfdefa993b09acd9edcae93ba0d8d5829dd01c138268f40" } } From 7cf9e990c77be65f549ecfe9d4e908f59796c2dc Mon Sep 17 00:00:00 2001 From: Lucas Saldanha Date: Sat, 21 Sep 2024 00:29:24 +1200 Subject: [PATCH 4/8] Updating com.google.protobuf to 3.25.5 (#373) --- versions.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/versions.gradle b/versions.gradle index 497ef091..222ee08a 100644 --- a/versions.gradle +++ b/versions.gradle @@ -27,7 +27,7 @@ dependencyManagement { entry 'jmh-generator-annprocess' } - dependencySet(group: "com.google.protobuf", version: "3.24.3") { + dependencySet(group: "com.google.protobuf", version: "3.25.5") { entry 'protobuf-java' entry 'protoc' } From 601bda7cbb6bcb6adbfaa7cfa9c4646d38054e89 Mon Sep 17 00:00:00 2001 From: Anton Nashatyrev Date: Wed, 25 Sep 2024 17:16:14 +0400 Subject: [PATCH 5/8] Fix Gossip simulator issue (#375) * Fix the simulator issue when subsequent message on the same channel is delayed for extra connection latency * Add ChannelMessageDelayerTest --- .../simulate/delay/ChannelMessageDelayer.kt | 29 +++++++ .../simulate/stream/StreamNettyChannel.kt | 20 +---- .../delay/ChannelMessageDelayerTest.kt | 83 +++++++++++++++++++ 3 files changed, 116 insertions(+), 16 deletions(-) create mode 100644 tools/simulator/src/main/kotlin/io/libp2p/simulate/delay/ChannelMessageDelayer.kt create mode 100644 tools/simulator/src/test/kotlin/io/libp2p/simulate/delay/ChannelMessageDelayerTest.kt diff --git a/tools/simulator/src/main/kotlin/io/libp2p/simulate/delay/ChannelMessageDelayer.kt b/tools/simulator/src/main/kotlin/io/libp2p/simulate/delay/ChannelMessageDelayer.kt new file mode 100644 index 00000000..a69a095f --- /dev/null +++ b/tools/simulator/src/main/kotlin/io/libp2p/simulate/delay/ChannelMessageDelayer.kt @@ -0,0 +1,29 @@ +package io.libp2p.simulate.delay + +import io.libp2p.simulate.BandwidthDelayer +import io.libp2p.simulate.MessageDelayer +import io.libp2p.simulate.delay.SequentialDelayer.Companion.sequential +import java.util.concurrent.CompletableFuture +import java.util.concurrent.ScheduledExecutorService + +class ChannelMessageDelayer( + executor: ScheduledExecutorService, + localOutboundBandwidthDelayer: BandwidthDelayer, + connectionLatencyDelayer: MessageDelayer, + remoteInboundBandwidthDelayer: BandwidthDelayer, +) : MessageDelayer { + + private val sequentialOutboundBandwidthDelayer = localOutboundBandwidthDelayer.sequential(executor) + private val sequentialInboundBandwidthDelayer = remoteInboundBandwidthDelayer.sequential(executor) + + private val delayer = MessageDelayer { size -> + CompletableFuture.allOf( + sequentialOutboundBandwidthDelayer.delay(size) + .thenCompose { connectionLatencyDelayer.delay(size) }, + connectionLatencyDelayer.delay(size) + .thenCompose { sequentialInboundBandwidthDelayer.delay(size) } + ).thenApply { } + } + + override fun delay(size: Long): CompletableFuture = delayer.delay(size) +} diff --git a/tools/simulator/src/main/kotlin/io/libp2p/simulate/stream/StreamNettyChannel.kt b/tools/simulator/src/main/kotlin/io/libp2p/simulate/stream/StreamNettyChannel.kt index dc83f905..b90d7014 100644 --- a/tools/simulator/src/main/kotlin/io/libp2p/simulate/stream/StreamNettyChannel.kt +++ b/tools/simulator/src/main/kotlin/io/libp2p/simulate/stream/StreamNettyChannel.kt @@ -2,6 +2,7 @@ package io.libp2p.simulate.stream import io.libp2p.etc.types.lazyVar import io.libp2p.simulate.* +import io.libp2p.simulate.delay.ChannelMessageDelayer import io.libp2p.simulate.delay.SequentialDelayer.Companion.sequential import io.libp2p.simulate.util.GeneralSizeEstimator import io.netty.channel.Channel @@ -13,7 +14,6 @@ import io.netty.channel.DefaultChannelPromise import io.netty.channel.EventLoop import io.netty.channel.embedded.EmbeddedChannel import io.netty.util.internal.ObjectUtil -import java.util.concurrent.CompletableFuture import java.util.concurrent.Executors import java.util.concurrent.ScheduledExecutorService @@ -37,29 +37,17 @@ class StreamNettyChannel( var currentTime: () -> Long = System::currentTimeMillis var msgSizeEstimator = GeneralSizeEstimator private var msgDelayer: MessageDelayer by lazyVar { - createMessageDelayer(outboundBandwidth, MessageDelayer.NO_DELAYER, inboundBandwidth) + createMessageDelayer(MessageDelayer.NO_DELAYER) .sequential(executor) } fun setLatency(latency: MessageDelayer) { - msgDelayer = createMessageDelayer(outboundBandwidth, latency, inboundBandwidth) + msgDelayer = createMessageDelayer(latency) } private fun createMessageDelayer( - outboundBandwidthDelayer: BandwidthDelayer, connectionLatencyDelayer: MessageDelayer, - inboundBandwidthDelayer: BandwidthDelayer, - ): MessageDelayer { - return MessageDelayer { size -> - CompletableFuture.allOf( - outboundBandwidthDelayer.delay(size) - .thenCompose { connectionLatencyDelayer.delay(size) }, - connectionLatencyDelayer.delay(size) - .thenCompose { inboundBandwidthDelayer.delay(size) } - ).thenApply { } - } - .sequential(executor) - } + ): MessageDelayer = ChannelMessageDelayer(executor, outboundBandwidth, connectionLatencyDelayer, inboundBandwidth) @Synchronized fun connect(other: StreamNettyChannel) { diff --git a/tools/simulator/src/test/kotlin/io/libp2p/simulate/delay/ChannelMessageDelayerTest.kt b/tools/simulator/src/test/kotlin/io/libp2p/simulate/delay/ChannelMessageDelayerTest.kt new file mode 100644 index 00000000..84294c4d --- /dev/null +++ b/tools/simulator/src/test/kotlin/io/libp2p/simulate/delay/ChannelMessageDelayerTest.kt @@ -0,0 +1,83 @@ +package io.libp2p.simulate.delay + +import io.libp2p.simulate.Bandwidth +import io.libp2p.tools.schedulers.ControlledExecutorServiceImpl +import io.libp2p.tools.schedulers.TimeControllerImpl +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource +import kotlin.time.Duration.Companion.milliseconds + +class ChannelMessageDelayerTest { + val timeController = TimeControllerImpl() + val executor = ControlledExecutorServiceImpl(timeController) + + private val Int.bytesPerSecond get() = Bandwidth(this.toLong()) + private fun Bandwidth.simpleDelayer() = SimpleBandwidthTracker(this, executor) + private fun Int.millisLatencyDelayer() = TimeDelayer(executor) { this.milliseconds } + + @Test + fun `slow outbound bandwidth prevails`() { + val delayer = ChannelMessageDelayer( + executor, + 1000.bytesPerSecond.simpleDelayer(), + 300.millisLatencyDelayer(), + 1000000.bytesPerSecond.simpleDelayer() + ) + + val delay = delayer.delay(1000).thenApply { timeController.time } + + timeController.addTime(2000) + assertThat(delay).isCompletedWithValue(1300) + } + + @Test + fun `slow inbound bandwidth prevails`() { + val delayer = ChannelMessageDelayer( + executor, + 1000000.bytesPerSecond.simpleDelayer(), + 300.millisLatencyDelayer(), + 1000.bytesPerSecond.simpleDelayer() + ) + + val delay = delayer.delay(1000).thenApply { timeController.time } + timeController.addTime(2000) + + assertThat(delay).isCompletedWithValue(1300) + } + + @ParameterizedTest + @ValueSource(booleans = [true, false]) + fun `subsequent messages ordered and timely`(outboundBandwidthSlower: Boolean) { + val delayer = + when (outboundBandwidthSlower) { + true -> ChannelMessageDelayer( + executor, + 1000.bytesPerSecond.simpleDelayer(), + 300.millisLatencyDelayer(), + 1000000.bytesPerSecond.simpleDelayer() + ) + + false -> ChannelMessageDelayer( + executor, + 1000000.bytesPerSecond.simpleDelayer(), + 300.millisLatencyDelayer(), + 1000.bytesPerSecond.simpleDelayer() + ) + } + + val delay1 = delayer.delay(1000).thenApply { timeController.time } + val delay2 = delayer.delay(10).thenApply { timeController.time } + timeController.addTime(200) + val delay3 = delayer.delay(10).thenApply { timeController.time } + timeController.addTime(1099) + val delay4 = delayer.delay(10).thenApply { timeController.time } + timeController.addTime(10000) + + assertThat(delay1).isCompletedWithValue(1300) + assertThat(delay2).isCompletedWithValue(1310) + assertThat(delay3).isCompletedWithValue(1320) + assertThat(delay4).isCompletedWithValue(1609) + } +} From e9e1d3392db113dfdb182d4bc51e1f0b6003ef77 Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Wed, 25 Sep 2024 16:35:31 +0100 Subject: [PATCH 6/8] [GossipSub 1.2] Add IDONTWANT support (#374) --- .../kotlin/io/libp2p/pubsub/AbstractRouter.kt | 6 +- .../kotlin/io/libp2p/pubsub/PubsubProtocol.kt | 15 ++ .../kotlin/io/libp2p/pubsub/gossip/Gossip.kt | 24 ++- .../io/libp2p/pubsub/gossip/GossipParams.kt | 19 +- .../io/libp2p/pubsub/gossip/GossipRouter.kt | 91 ++++++++-- .../gossip/builders/GossipParamsBuilder.kt | 23 ++- .../gossip/builders/GossipRouterBuilder.kt | 2 +- libp2p/src/main/proto/rpc.proto | 5 + .../gossip/GossipBackwardCompatibilityTest.kt | 24 +-- .../libp2p/pubsub/gossip/GossipTestsBase.kt | 76 ++++++++ .../libp2p/pubsub/gossip/GossipV1_1Tests.kt | 112 +++--------- .../libp2p/pubsub/gossip/GossipV1_2Tests.kt | 170 ++++++++++++++++++ .../libp2p/simulate/gossip/GossipSimPeer.kt | 2 +- 13 files changed, 444 insertions(+), 125 deletions(-) create mode 100644 libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipTestsBase.kt create mode 100644 libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_2Tests.kt diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt index 4dd76409..c5ad1f9d 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt @@ -17,8 +17,6 @@ import java.util.Collections.singletonList import java.util.Optional import java.util.concurrent.CompletableFuture import java.util.concurrent.ScheduledExecutorService -import java.util.function.BiConsumer -import java.util.function.Consumer // 1 MB default max message size const val DEFAULT_MAX_PUBSUB_MESSAGE_SIZE = 1 shl 20 @@ -223,7 +221,7 @@ abstract class AbstractRouter( validFuts.forEach { (msg, validationFut) -> validationFut.thenAcceptAsync( - Consumer { res -> + { res -> seenMessages[msg] = Optional.of(res) if (res == ValidationResult.Invalid) notifyUnseenInvalidMessage(peer, msg) }, @@ -247,7 +245,7 @@ abstract class AbstractRouter( // broadcast others on completion undone.forEach { it.second.whenCompleteAsync( - BiConsumer { res, err -> + { res, err -> when { err != null -> logger.warn("Exception while handling message from peer $peer: ${it.first}", err) res == ValidationResult.Invalid -> logger.debug("Invalid pubsub message from peer $peer: ${it.first}") diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubProtocol.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubProtocol.kt index afab564d..49cf9523 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubProtocol.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubProtocol.kt @@ -6,10 +6,25 @@ enum class PubsubProtocol(val announceStr: ProtocolId) { Gossip_V_1_0("/meshsub/1.0.0"), Gossip_V_1_1("/meshsub/1.1.0"), + Gossip_V_1_2("/meshsub/1.2.0"), Floodsub("/floodsub/1.0.0"); companion object { fun fromProtocol(protocol: ProtocolId) = PubsubProtocol.values().find { protocol == it.announceStr } ?: throw NoSuchElementException("No PubsubProtocol found with protocol $protocol") } + + /** + * https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#prune-backoff-and-peer-exchange + */ + fun supportsBackoffAndPX(): Boolean { + return this == Gossip_V_1_1 || this == Gossip_V_1_2 + } + + /** + * https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md#idontwant-message + */ + fun supportsIDontWant(): Boolean { + return this == Gossip_V_1_2 + } } diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt index b413c03d..ae5f3c5e 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt @@ -30,13 +30,23 @@ class Gossip @JvmOverloads constructor( } override val protocolDescriptor = - if (router.protocol == PubsubProtocol.Gossip_V_1_1) { - ProtocolDescriptor( - PubsubProtocol.Gossip_V_1_1.announceStr, - PubsubProtocol.Gossip_V_1_0.announceStr - ) - } else { - ProtocolDescriptor(PubsubProtocol.Gossip_V_1_0.announceStr) + when (router.protocol) { + PubsubProtocol.Gossip_V_1_2 -> { + ProtocolDescriptor( + PubsubProtocol.Gossip_V_1_2.announceStr, + PubsubProtocol.Gossip_V_1_1.announceStr, + PubsubProtocol.Gossip_V_1_0.announceStr + ) + } + PubsubProtocol.Gossip_V_1_1 -> { + ProtocolDescriptor( + PubsubProtocol.Gossip_V_1_1.announceStr, + PubsubProtocol.Gossip_V_1_0.announceStr + ) + } + else -> { + ProtocolDescriptor(PubsubProtocol.Gossip_V_1_0.announceStr) + } } override fun handleConnection(conn: Connection) { diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt index e63e780d..b2cad191 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt @@ -231,7 +231,24 @@ data class GossipParams( * callback to notify outer system to which peers Gossip wants to be connected * The second parameter is a signed peer record: https://github.com/libp2p/specs/pull/217 */ - val connectCallback: (PeerId, ByteArray) -> Unit = { _: PeerId, _: ByteArray -> } + val connectCallback: (PeerId, ByteArray) -> Unit = { _: PeerId, _: ByteArray -> }, + + /** + * [maxIDontWantMessageIds] is the maximum number of IDONTWANT message ids allowed per heartbeat per peer + */ + val maxIDontWantMessageIds: Int = maxIHaveLength * maxIHaveMessages, + + /** + * [iDontWantMinMessageSizeThreshold] controls the minimum size (in bytes) that an incoming message needs to be so that an IDONTWANT message is sent to mesh peers. + * The default is 16 KB. + */ + val iDontWantMinMessageSizeThreshold: Int = 16000, + + /** + * [iDontWantTTL] Expiry time for cache of received IDONTWANT messages for peers + */ + val iDontWantTTL: Duration = 3.seconds + ) { init { check(D >= 0, "D should be >= 0") diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index d548c517..191c8b03 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -23,7 +23,6 @@ import kotlin.collections.any import kotlin.collections.component1 import kotlin.collections.component2 import kotlin.collections.count -import kotlin.collections.distinct import kotlin.collections.drop import kotlin.collections.filter import kotlin.collections.filterNot @@ -46,7 +45,6 @@ import kotlin.collections.reversed import kotlin.collections.set import kotlin.collections.shuffled import kotlin.collections.sortedBy -import kotlin.collections.sum import kotlin.collections.take import kotlin.collections.toMutableSet import kotlin.math.max @@ -56,6 +54,7 @@ const val MaxBackoffEntries = 10 * 1024 const val MaxIAskedEntries = 256 const val MaxPeerIHaveEntries = 256 const val MaxIWantRequestsEntries = 10 * 1024 +const val MaxPeerIDontWantEntries = 256 typealias CurrentTimeSupplier = () -> Long @@ -122,6 +121,7 @@ open class GossipRouter( private val iAsked = createLRUMap(MaxIAskedEntries) private val peerIHave = createLRUMap(MaxPeerIHaveEntries) private val iWantRequests = createLRUMap, Long>(MaxIWantRequestsEntries) + private val peerIDontWant = createLRUMap(MaxPeerIDontWantEntries) private val heartbeatTask by lazy { executor.scheduleWithFixedDelay( ::catchingHeartbeat, @@ -166,6 +166,7 @@ open class GossipRouter( } override fun notifyUnseenMessage(peer: PeerHandler, msg: PubsubMessage) { + iDontWant(msg, peer) eventBroadcaster.notifyUnseenMessage(peer.peerId, msg) notifyAnyMessage(peer, msg) } @@ -250,8 +251,8 @@ open class GossipRouter( } override fun validateMessageListLimits(msg: Rpc.RPCOrBuilder): Boolean { - val iWantMessageIdCount = msg.control?.iwantList?.map { w -> w.messageIDsCount }?.sum() ?: 0 - val iHaveMessageIdCount = msg.control?.ihaveList?.map { w -> w.messageIDsCount }?.sum() ?: 0 + val iWantMessageIdCount = msg.control?.iwantList?.sumOf { w -> w.messageIDsCount } ?: 0 + val iHaveMessageIdCount = msg.control?.ihaveList?.sumOf { w -> w.messageIDsCount } ?: 0 return params.maxPublishedMessages?.let { msg.publishCount <= it } ?: true && params.maxTopicsPerPublishedMessage?.let { msg.publishList.none { m -> m.topicIDsCount > it } } ?: true && @@ -269,6 +270,7 @@ open class GossipRouter( is Rpc.ControlPrune -> handlePrune(controlMsg, receivedFrom) is Rpc.ControlIHave -> handleIHave(controlMsg, receivedFrom) is Rpc.ControlIWant -> handleIWant(controlMsg, receivedFrom) + is Rpc.ControlIDontWant -> handleIDontWant(controlMsg, receivedFrom) } } @@ -300,7 +302,7 @@ open class GossipRouter( mesh[topic]?.remove(peer)?.also { notifyPruned(peer, topic) } - if (this.protocol == PubsubProtocol.Gossip_V_1_1) { + if (this.protocol.supportsBackoffAndPX()) { if (msg.hasBackoff()) { setBackOff(peer, topic, msg.backoff.seconds.toMillis()) } else { @@ -348,8 +350,22 @@ open class GossipRouter( msg.messageIDsList .mapNotNull { mCache.getMessageForPeer(peer.peerId, it.toWBytes()) } .filter { it.sentCount < params.gossipRetransmission } - .map { it.msg } - .forEach { submitPublishMessage(peer, it) } + .forEach { submitPublishMessage(peer, it.msg) } + } + + private fun handleIDontWant(msg: Rpc.ControlIDontWant, peer: PeerHandler) { + if (!this.protocol.supportsIDontWant()) return + val peerScore = score.score(peer.peerId) + if (peerScore < scoreParams.gossipThreshold) return + val iDontWantCacheEntry = peerIDontWant.computeIfAbsent(peer) { IDontWantCacheEntry() } + iDontWantCacheEntry.heartbeatMessageIdsCount += msg.messageIDsCount + if (iDontWantCacheEntry.heartbeatMessageIdsCount > params.maxIDontWantMessageIds) { + return + } + val timeReceived = currentTimeSupplier() + msg.messageIDsList + .map { it.toWBytes() } + .associateWithTo(iDontWantCacheEntry.messageIdsAndTimeReceived) { timeReceived } } private fun processPrunePeers(peersList: List) { @@ -361,18 +377,20 @@ open class GossipRouter( override fun processControl(ctrl: Rpc.ControlMessage, receivedFrom: PeerHandler) { ctrl.run { - (graftList + pruneList + ihaveList + iwantList) + (graftList + pruneList + ihaveList + iwantList + idontwantList) }.forEach { processControlMessage(it, receivedFrom) } } override fun broadcastInbound(msgs: List, receivedFrom: PeerHandler) { msgs.forEach { pubMsg -> pubMsg.topics + .asSequence() .mapNotNull { mesh[it] } .flatten() .distinct() .plus(getDirectPeers()) - .filter { it != receivedFrom } + .minus(receivedFrom) + .filterNot { peerDoesNotWantMessage(it, pubMsg.messageId) } .forEach { submitPublishMessage(it, pubMsg) } mCache += pubMsg } @@ -398,15 +416,17 @@ open class GossipRouter( } .flatten() } - val list = peers.map { submitPublishMessage(it, msg) } + val list = peers + .filterNot { peerDoesNotWantMessage(it, msg.messageId) } + .map { submitPublishMessage(it, msg) } mCache += msg flushAllPending() - if (list.isNotEmpty()) { - return anyComplete(list) + return if (list.isNotEmpty()) { + anyComplete(list) } else { - return completedExceptionally( + completedExceptionally( NoPeersForOutboundMessageException("No peers for message topics ${msg.topics} found") ) } @@ -459,6 +479,15 @@ open class GossipRouter( .whenTrue { notifyIWantTimeout(key.first, key.second) } } + val staleIDontWantTime = this.currentTimeSupplier() - params.iDontWantTTL.toMillis() + peerIDontWant.entries.removeIf { (_, cacheEntry) -> + // reset on heartbeat + cacheEntry.heartbeatMessageIdsCount = 0 + cacheEntry.messageIdsAndTimeReceived.values.removeIf { timeReceived -> timeReceived < staleIDontWantTime } + // remove entry for peer if no IDONTWANT message ids are left in the cache + cacheEntry.messageIdsAndTimeReceived.isEmpty() + } + try { mesh.entries.forEach { (topic, peers) -> @@ -565,6 +594,10 @@ open class GossipRouter( } } + private fun peerDoesNotWantMessage(peer: PeerHandler, messageId: MessageId): Boolean { + return peerIDontWant[peer]?.messageIdsAndTimeReceived?.contains(messageId) == true + } + private fun iWant(peer: PeerHandler, messageIds: List) { if (messageIds.isEmpty()) return messageIds[random.nextInt(messageIds.size)] @@ -572,9 +605,21 @@ open class GossipRouter( enqueueIwant(peer, messageIds) } + private fun iDontWant(msg: PubsubMessage, receivedFrom: PeerHandler) { + if (!this.protocol.supportsIDontWant()) return + if (msg.protobufMessage.data.size() < params.iDontWantMinMessageSizeThreshold) return + // we need to send IDONTWANT messages to mesh peers immediately in order for them to have an effect + msg.topics + .mapNotNull { mesh[it] } + .flatten() + .distinct() + .minus(receivedFrom) + .forEach { peer -> sendIdontwant(peer, msg.messageId) } + } + private fun enqueuePrune(peer: PeerHandler, topic: Topic) { val peerQueue = pendingRpcParts.getQueue(peer) - if (peer.getPeerProtocol() == PubsubProtocol.Gossip_V_1_1 && this.protocol == PubsubProtocol.Gossip_V_1_1) { + if (peer.getPeerProtocol().supportsBackoffAndPX() && this.protocol.supportsBackoffAndPX()) { val backoffPeers = (getTopicPeers(topic) - peer) .take(params.maxPeersSentInPruneMsg) .filter { score.score(it.peerId) >= 0 } @@ -594,7 +639,25 @@ open class GossipRouter( private fun enqueueIhave(peer: PeerHandler, messageIds: List, topic: Topic) = pendingRpcParts.getQueue(peer).addIHaves(messageIds, topic) + private fun sendIdontwant(peer: PeerHandler, messageId: MessageId) { + if (!peer.getPeerProtocol().supportsIDontWant()) { + return + } + val iDontWant = Rpc.RPC.newBuilder().setControl( + Rpc.ControlMessage.newBuilder().addIdontwant( + Rpc.ControlIDontWant.newBuilder() + .addMessageIDs(messageId.toProtobuf()) + ) + ).build() + send(peer, iDontWant) + } + data class AcceptRequestsWhitelistEntry(val whitelistedTill: Long, val messagesAccepted: Int = 0) { fun incrementMessageCount() = AcceptRequestsWhitelistEntry(whitelistedTill, messagesAccepted + 1) } + + data class IDontWantCacheEntry( + var heartbeatMessageIdsCount: Int = 0, + val messageIdsAndTimeReceived: MutableMap = mutableMapOf() + ) } diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt index 7e9c9ebf..a0f0b18a 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt @@ -72,6 +72,12 @@ class GossipParamsBuilder { private var connectCallback: Function2? = null + private var maxIDontWantMessageIds: Int? = null + + private var iDontWantMinMessageSizeThreshold: Int? = null + + private var iDontWantTTL: Duration? = null + init { val source = GossipParams() this.D = source.D @@ -100,6 +106,9 @@ class GossipParamsBuilder { this.maxPruneMessages = source.maxPruneMessages this.gossipRetransmission = source.gossipRetransmission this.connectCallback = source.connectCallback + this.maxIDontWantMessageIds = source.maxIDontWantMessageIds + this.iDontWantMinMessageSizeThreshold = source.iDontWantMinMessageSizeThreshold + this.iDontWantTTL = source.iDontWantTTL } fun D(value: Int): GossipParamsBuilder = apply { D = value } @@ -172,6 +181,12 @@ class GossipParamsBuilder { connectCallback = value } + fun maxIDontWantMessageIds(value: Int): GossipParamsBuilder = apply { maxIDontWantMessageIds = value } + + fun iDontWantMinMessageSizeThreshold(value: Int): GossipParamsBuilder = apply { iDontWantMinMessageSizeThreshold = value } + + fun iDontWantTTL(value: Duration): GossipParamsBuilder = apply { iDontWantTTL = value } + fun build(): GossipParams { calculateMissing() checkRequiredFields() @@ -206,7 +221,10 @@ class GossipParamsBuilder { pruneBackoff = pruneBackoff!!, maxPruneMessages = maxPruneMessages, gossipRetransmission = gossipRetransmission!!, - connectCallback = connectCallback!! + connectCallback = connectCallback!!, + maxIDontWantMessageIds = maxIDontWantMessageIds!!, + iDontWantMinMessageSizeThreshold = iDontWantMinMessageSizeThreshold!!, + iDontWantTTL = iDontWantTTL!! ) } @@ -244,5 +262,8 @@ class GossipParamsBuilder { check(iWantFollowupTime != null, { "iWantFollowupTime must not be null" }) check(gossipRetransmission != null, { "gossipRetransmission must not be null" }) check(connectCallback != null, { "connectCallback must not be null" }) + check(maxIDontWantMessageIds != null, { "maxIDontWantMessageIds must not be null" }) + check(iDontWantMinMessageSizeThreshold != null, { "iDontWantMinMessageSizeThreshold must not be null" }) + check(iDontWantTTL != null, { "iDontWantTTL must not be null" }) } } diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipRouterBuilder.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipRouterBuilder.kt index 06b7db40..5c783ce5 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipRouterBuilder.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipRouterBuilder.kt @@ -16,7 +16,7 @@ typealias GossipScoreFactory = open class GossipRouterBuilder( var name: String = "GossipRouter", - var protocol: PubsubProtocol = PubsubProtocol.Gossip_V_1_1, + var protocol: PubsubProtocol = PubsubProtocol.Gossip_V_1_2, var params: GossipParams = GossipParams(), var scoreParams: GossipScoreParams = GossipScoreParams(), diff --git a/libp2p/src/main/proto/rpc.proto b/libp2p/src/main/proto/rpc.proto index 479e73b8..080eef47 100644 --- a/libp2p/src/main/proto/rpc.proto +++ b/libp2p/src/main/proto/rpc.proto @@ -28,6 +28,7 @@ message ControlMessage { repeated ControlIWant iwant = 2; repeated ControlGraft graft = 3; repeated ControlPrune prune = 4; + repeated ControlIDontWant idontwant = 5; } message ControlIHave { @@ -49,6 +50,10 @@ message ControlPrune { optional uint64 backoff = 3; } +message ControlIDontWant { + repeated bytes messageIDs = 1; +} + message PeerInfo { optional bytes peerID = 1; optional bytes signedPeerRecord = 2; diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipBackwardCompatibilityTest.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipBackwardCompatibilityTest.kt index 53031820..66b6f1c2 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipBackwardCompatibilityTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipBackwardCompatibilityTest.kt @@ -6,49 +6,49 @@ import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test class GossipBackwardCompatibilityTest : TwoGossipHostTestBase() { - override val router1 = GossipRouterBuilder(protocol = PubsubProtocol.Gossip_V_1_0).build() - override val router2 = GossipRouterBuilder(protocol = PubsubProtocol.Gossip_V_1_1).build() + override val router1 = GossipRouterBuilder(protocol = PubsubProtocol.Gossip_V_1_1).build() + override val router2 = GossipRouterBuilder(protocol = PubsubProtocol.Gossip_V_1_2).build() @Test - fun testConnect_1_0_to_1_1() { + fun testConnect_1_1_to_1_2() { connect() Assertions.assertEquals( - PubsubProtocol.Gossip_V_1_0.announceStr, + PubsubProtocol.Gossip_V_1_1.announceStr, router1.peers[0].getInboundHandler()!!.stream.getProtocol().get() ) Assertions.assertEquals( - PubsubProtocol.Gossip_V_1_0.announceStr, + PubsubProtocol.Gossip_V_1_1.announceStr, router1.peers[0].getOutboundHandler()!!.stream.getProtocol().get() ) Assertions.assertEquals( - PubsubProtocol.Gossip_V_1_0.announceStr, + PubsubProtocol.Gossip_V_1_1.announceStr, router2.peers[0].getInboundHandler()!!.stream.getProtocol().get() ) Assertions.assertEquals( - PubsubProtocol.Gossip_V_1_0.announceStr, + PubsubProtocol.Gossip_V_1_1.announceStr, router2.peers[0].getOutboundHandler()!!.stream.getProtocol().get() ) } @Test - fun testConnect_1_1_to_1_0() { + fun testConnect_1_2_to_1_1() { connect() Assertions.assertEquals( - PubsubProtocol.Gossip_V_1_0.announceStr, + PubsubProtocol.Gossip_V_1_1.announceStr, router1.peers[0].getInboundHandler()!!.stream.getProtocol().get() ) Assertions.assertEquals( - PubsubProtocol.Gossip_V_1_0.announceStr, + PubsubProtocol.Gossip_V_1_1.announceStr, router1.peers[0].getOutboundHandler()!!.stream.getProtocol().get() ) Assertions.assertEquals( - PubsubProtocol.Gossip_V_1_0.announceStr, + PubsubProtocol.Gossip_V_1_1.announceStr, router2.peers[0].getInboundHandler()!!.stream.getProtocol().get() ) Assertions.assertEquals( - PubsubProtocol.Gossip_V_1_0.announceStr, + PubsubProtocol.Gossip_V_1_1.announceStr, router2.peers[0].getOutboundHandler()!!.stream.getProtocol().get() ) } diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipTestsBase.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipTestsBase.kt new file mode 100644 index 00000000..ecc91225 --- /dev/null +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipTestsBase.kt @@ -0,0 +1,76 @@ +package io.libp2p.pubsub.gossip + +import io.libp2p.core.PeerId +import io.libp2p.etc.types.toBytesBigEndian +import io.libp2p.etc.types.toProtobuf +import io.libp2p.etc.types.toWBytes +import io.libp2p.pubsub.* +import io.libp2p.pubsub.DeterministicFuzz.Companion.createGossipFuzzRouterFactory +import io.libp2p.pubsub.DeterministicFuzz.Companion.createMockFuzzRouterFactory +import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder +import io.netty.handler.logging.LogLevel +import pubsub.pb.Rpc + +abstract class GossipTestsBase { + + protected val GossipScore.testPeerScores get() = (this as DefaultGossipScore).peerScores + + protected fun newProtoMessage(topic: Topic, seqNo: Long, data: ByteArray) = + Rpc.Message.newBuilder() + .addTopicIDs(topic) + .setSeqno(seqNo.toBytesBigEndian().toProtobuf()) + .setData(data.toProtobuf()) + .build() + + protected fun newMessage(topic: Topic, seqNo: Long, data: ByteArray) = + DefaultPubsubMessage(newProtoMessage(topic, seqNo, data)) + + protected fun getMessageId(msg: Rpc.Message): MessageId = msg.from.toWBytes() + msg.seqno.toWBytes() + + class ManyRoutersTest( + val mockRouterCount: Int = 10, + val params: GossipParams = GossipParams(), + val scoreParams: GossipScoreParams = GossipScoreParams(), + val protocol: PubsubProtocol = PubsubProtocol.Gossip_V_1_1 + ) { + val fuzz = DeterministicFuzz() + val gossipRouterBuilderFactory = { GossipRouterBuilder(protocol = protocol, params = params, scoreParams = scoreParams) } + val router0 = fuzz.createTestRouter(createGossipFuzzRouterFactory(gossipRouterBuilderFactory)) + val routers = (0 until mockRouterCount).map { fuzz.createTestRouter(createMockFuzzRouterFactory()) } + val connections = mutableListOf() + val gossipRouter = router0.router as GossipRouter + val mockRouters = routers.map { it.router as MockRouter } + + fun connectAll() = connect(routers.indices) + fun connect(routerIndexes: IntRange, outbound: Boolean = true): List { + val list = + routers.slice(routerIndexes).map { + if (outbound) { + router0.connectSemiDuplex(it, null, LogLevel.ERROR) + } else { + it.connectSemiDuplex(router0, null, LogLevel.ERROR) + } + } + connections += list + return list + } + + fun getMockRouter(peerId: PeerId) = mockRouters[routers.indexOfFirst { it.peerId == peerId }] + } + + class TwoRoutersTest( + val coreParams: GossipParams = GossipParams(), + val scoreParams: GossipScoreParams = GossipScoreParams(), + val mockRouterFactory: DeterministicFuzzRouterFactory = createMockFuzzRouterFactory(), + val protocol: PubsubProtocol = PubsubProtocol.Gossip_V_1_1 + ) { + val fuzz = DeterministicFuzz() + val gossipRouterBuilderFactory = { GossipRouterBuilder(protocol = protocol, params = coreParams, scoreParams = scoreParams) } + val router1 = fuzz.createTestRouter(createGossipFuzzRouterFactory(gossipRouterBuilderFactory)) + val router2 = fuzz.createTestRouter(mockRouterFactory) + val gossipRouter = router1.router as GossipRouter + val mockRouter = router2.router as MockRouter + + val connection = router1.connectSemiDuplex(router2, null, LogLevel.ERROR) + } +} diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt index 3750803e..f0ceb9eb 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt @@ -5,25 +5,9 @@ package io.libp2p.pubsub.gossip import com.google.common.util.concurrent.AtomicDouble import com.google.protobuf.ByteString import io.libp2p.core.PeerId -import io.libp2p.core.pubsub.MessageApi -import io.libp2p.core.pubsub.RESULT_IGNORE -import io.libp2p.core.pubsub.RESULT_INVALID -import io.libp2p.core.pubsub.RESULT_VALID -import io.libp2p.core.pubsub.Subscriber -import io.libp2p.core.pubsub.ValidationResult -import io.libp2p.core.pubsub.Validator -import io.libp2p.core.pubsub.createPubsubApi -import io.libp2p.etc.types.millis -import io.libp2p.etc.types.minutes -import io.libp2p.etc.types.seconds -import io.libp2p.etc.types.times -import io.libp2p.etc.types.toBytesBigEndian -import io.libp2p.etc.types.toProtobuf -import io.libp2p.etc.types.toWBytes -import io.libp2p.pubsub.* -import io.libp2p.pubsub.DeterministicFuzz.Companion.createGossipFuzzRouterFactory -import io.libp2p.pubsub.DeterministicFuzz.Companion.createMockFuzzRouterFactory -import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder +import io.libp2p.core.pubsub.* +import io.libp2p.etc.types.* +import io.libp2p.pubsub.MockRouter import io.netty.buffer.ByteBuf import io.netty.buffer.Unpooled import io.netty.channel.ChannelHandler @@ -31,10 +15,7 @@ import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelOutboundHandlerAdapter import io.netty.channel.ChannelPromise import io.netty.handler.logging.LogLevel -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertNotNull -import org.junit.jupiter.api.Assertions.assertNull -import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import pubsub.pb.Rpc import java.nio.charset.StandardCharsets @@ -43,52 +24,30 @@ import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference - -class GossipV1_1Tests { - - private val GossipScore.testPeerScores get() = (this as DefaultGossipScore).peerScores - - private fun newProtoMessage(topic: Topic, seqNo: Long, data: ByteArray) = - Rpc.Message.newBuilder() - .addTopicIDs(topic) - .setSeqno(seqNo.toBytesBigEndian().toProtobuf()) - .setData(data.toProtobuf()) - .build() - private fun newMessage(topic: Topic, seqNo: Long, data: ByteArray) = - DefaultPubsubMessage(newProtoMessage(topic, seqNo, data)) - - protected fun getMessageId(msg: Rpc.Message): MessageId = msg.from.toWBytes() + msg.seqno.toWBytes() - - class ManyRoutersTest( - val mockRouterCount: Int = 10, - val params: GossipParams = GossipParams(), - val scoreParams: GossipScoreParams = GossipScoreParams(), -// mockRouters: () -> List = { (0 until mockRouterCount).map { MockRouter() } } - ) { - val fuzz = DeterministicFuzz() - val gossipRouterBuilderFactory = { GossipRouterBuilder(params = params, scoreParams = scoreParams) } - val router0 = fuzz.createTestRouter(createGossipFuzzRouterFactory(gossipRouterBuilderFactory)) - val routers = (0 until mockRouterCount).map { fuzz.createTestRouter(createMockFuzzRouterFactory()) } - val connections = mutableListOf() - val gossipRouter = router0.router as GossipRouter - val mockRouters = routers.map { it.router as MockRouter } - - fun connectAll() = connect(routers.indices) - fun connect(routerIndexes: IntRange, outbound: Boolean = true): List { - val list = - routers.slice(routerIndexes).map { - if (outbound) { - router0.connectSemiDuplex(it, null, LogLevel.ERROR) - } else { - it.connectSemiDuplex(router0, null, LogLevel.ERROR) - } - } - connections += list - return list - } - - fun getMockRouter(peerId: PeerId) = mockRouters[routers.indexOfFirst { it.peerId == peerId }] - } +import kotlin.collections.List +import kotlin.collections.component1 +import kotlin.collections.component2 +import kotlin.collections.count +import kotlin.collections.distinct +import kotlin.collections.filter +import kotlin.collections.first +import kotlin.collections.flatMap +import kotlin.collections.forEach +import kotlin.collections.getValue +import kotlin.collections.intersect +import kotlin.collections.map +import kotlin.collections.mapValues +import kotlin.collections.minus +import kotlin.collections.mutableListOf +import kotlin.collections.mutableMapOf +import kotlin.collections.plusAssign +import kotlin.collections.set +import kotlin.collections.slice +import kotlin.collections.take +import kotlin.collections.toMap +import kotlin.collections.withDefault + +class GossipV1_1Tests : GossipTestsBase() { @Test fun selfSanityTest() { @@ -100,21 +59,6 @@ class GossipV1_1Tests { test.mockRouter.waitForMessage { it.publishCount > 0 } } - class TwoRoutersTest( - val coreParams: GossipParams = GossipParams(), - val scoreParams: GossipScoreParams = GossipScoreParams(), - val mockRouterFactory: DeterministicFuzzRouterFactory = createMockFuzzRouterFactory() - ) { - val fuzz = DeterministicFuzz() - val gossipRouterBuilderFactory = { GossipRouterBuilder(params = coreParams, scoreParams = scoreParams) } - val router1 = fuzz.createTestRouter(createGossipFuzzRouterFactory(gossipRouterBuilderFactory)) - val router2 = fuzz.createTestRouter(mockRouterFactory) - val gossipRouter = router1.router as GossipRouter - val mockRouter = router2.router as MockRouter - - val connection = router1.connectSemiDuplex(router2, null, LogLevel.ERROR) - } - @Test fun testSeenTTL() { val test = TwoRoutersTest(GossipParams(seenTTL = 1.minutes)) diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_2Tests.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_2Tests.kt new file mode 100644 index 00000000..cc7cd4d5 --- /dev/null +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_2Tests.kt @@ -0,0 +1,170 @@ +@file:Suppress("ktlint:standard:class-naming") + +package io.libp2p.pubsub.gossip + +import io.libp2p.etc.types.millis +import io.libp2p.etc.types.seconds +import io.libp2p.etc.types.toProtobuf +import io.libp2p.etc.types.toWBytes +import io.libp2p.pubsub.PubsubProtocol +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import pubsub.pb.Rpc + +class GossipV1_2Tests : GossipTestsBase() { + + @Test + fun selfSanityTest() { + val test = TwoRoutersTest(protocol = PubsubProtocol.Gossip_V_1_2) + + test.mockRouter.subscribe("topic1") + val msg = newMessage("topic1", 0L, "Hello".toByteArray()) + test.gossipRouter.publish(msg) + test.mockRouter.waitForMessage { it.publishCount > 0 } + } + + @Test + fun iDontWantIsBroadcastToMeshPeers() { + val test = startSingleTopicNetwork( + params = GossipParams(iDontWantMinMessageSizeThreshold = 5), + mockRouterCount = 3 + ) + + val publisher = test.mockRouters[0] + val gossipers = listOf(test.mockRouters[1], test.mockRouters[2]) + + val msg = newMessage("topic1", 0L, "Hello".toByteArray()) + + publisher.sendToSingle( + Rpc.RPC.newBuilder().addPublish(msg.protobufMessage).build() + ) + + test.fuzz.timeController.addTime(100.millis) + + val iDontWants = + gossipers.flatMap { it.inboundMessages }.filter { it.hasControl() }.flatMap { it.control.idontwantList } + + // both gossipers should have received IDONTWANT from the GossipRouter + assertTrue(iDontWants.size == 2) + + iDontWants.forEach { iDontWant -> + assertThat(iDontWant.messageIDsList.map { it.toWBytes() }).containsExactly(msg.messageId) + } + } + + @Test + fun messageIsNotBroadcastIfPeerHasSentIDONTWANT() { + val test = startSingleTopicNetwork( + params = GossipParams(iDontWantMinMessageSizeThreshold = 5), + mockRouterCount = 2 + ) + + val publisher = test.mockRouters[0] + val iDontWantPeer = test.mockRouters[1] + + val msg = newMessage("topic1", 0L, "Hello".toByteArray()) + + // sending IDONTWANT + iDontWantPeer.sendToSingle( + Rpc.RPC.newBuilder().setControl( + Rpc.ControlMessage.newBuilder().addIdontwant( + Rpc.ControlIDontWant.newBuilder().addMessageIDs(msg.messageId.toProtobuf()) + ) + ).build() + ) + + test.fuzz.timeController.addTime(100.millis) + + publisher.sendToSingle( + Rpc.RPC.newBuilder().addPublish(msg.protobufMessage).build() + ) + + test.fuzz.timeController.addTime(100.millis) + + val receivedMessages = iDontWantPeer.inboundMessages.flatMap { it.publishList } + + // message shouldn't have been received + assertThat(receivedMessages).isEmpty() + } + + @Test + fun iDontWantIsNotSentIfSizeIsLessThanTheMinimumConfigured() { + val test = startSingleTopicNetwork( + params = GossipParams(iDontWantMinMessageSizeThreshold = 5), + mockRouterCount = 3 + ) + + val publisher = test.mockRouters[0] + val gossipers = listOf(test.mockRouters[1], test.mockRouters[2]) + + // 4 bytes and minimum is 5, so IDONTWANT shouldn't be sent + val msg = newMessage("topic1", 0L, "Hell".toByteArray()) + + publisher.sendToSingle( + Rpc.RPC.newBuilder().addPublish(msg.protobufMessage).build() + ) + + test.fuzz.timeController.addTime(100.millis) + + val iDontWants = + gossipers.flatMap { it.inboundMessages }.filter { it.hasControl() }.flatMap { it.control.idontwantList } + + assertThat(iDontWants).isEmpty() + } + + @Test + fun testIDontWantTTL() { + val test = startSingleTopicNetwork( + // set TTL to 700ms + params = GossipParams(iDontWantMinMessageSizeThreshold = 5, iDontWantTTL = 700.millis), + mockRouterCount = 2 + ) + + val publisher = test.mockRouters[0] + val iDontWantPeer = test.mockRouters[1] + + val msg = newMessage("topic1", 0L, "Hello".toByteArray()) + + // sending IDONTWANT + iDontWantPeer.sendToSingle( + Rpc.RPC.newBuilder().setControl( + Rpc.ControlMessage.newBuilder().addIdontwant( + Rpc.ControlIDontWant.newBuilder().addMessageIDs(msg.messageId.toProtobuf()) + ) + ).build() + ) + + // 1 heartbeat - the IDONTWANT should have expired + test.fuzz.timeController.addTime(1.seconds) + + publisher.sendToSingle( + Rpc.RPC.newBuilder().addPublish(msg.protobufMessage).build() + ) + + test.fuzz.timeController.addTime(100.millis) + + val receivedMessages = iDontWantPeer.inboundMessages.flatMap { it.publishList } + + // message shouldn't have been received + assertThat(receivedMessages).containsExactly(msg.protobufMessage) + } + + private fun startSingleTopicNetwork(params: GossipParams, mockRouterCount: Int): ManyRoutersTest { + val test = ManyRoutersTest( + protocol = PubsubProtocol.Gossip_V_1_2, + params = params, + mockRouterCount = mockRouterCount + ) + + test.connectAll() + + test.gossipRouter.subscribe("topic1") + test.mockRouters.forEach { it.subscribe("topic1") } + + // 2 heartbeats - the topic should be GRAFTed + test.fuzz.timeController.addTime(2.seconds) + + return test + } +} diff --git a/tools/simulator/src/main/kotlin/io/libp2p/simulate/gossip/GossipSimPeer.kt b/tools/simulator/src/main/kotlin/io/libp2p/simulate/gossip/GossipSimPeer.kt index 4a60444f..08eadfaa 100644 --- a/tools/simulator/src/main/kotlin/io/libp2p/simulate/gossip/GossipSimPeer.kt +++ b/tools/simulator/src/main/kotlin/io/libp2p/simulate/gossip/GossipSimPeer.kt @@ -15,7 +15,7 @@ import java.util.concurrent.CompletableFuture class GossipSimPeer( override val simPeerId: Int, override val random: Random, - protocol: PubsubProtocol = PubsubProtocol.Gossip_V_1_1 + protocol: PubsubProtocol = PubsubProtocol.Gossip_V_1_2 ) : StreamSimPeer(true, protocol.announceStr) { var routerBuilder = SimGossipRouterBuilder() From a32f48666ab8acd5bd6d1004a6f588e63d2b440b Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Thu, 26 Sep 2024 13:53:20 +0100 Subject: [PATCH 7/8] Dependencies sweep (#376) --- build.gradle.kts | 12 ++++++++---- examples/android-chatter/build.gradle | 1 + .../kotlin/io/libp2p/example/chat/ChatProtocol.kt | 4 ++-- libp2p/build.gradle.kts | 2 +- .../libp2p/core/multistream/NegotiatedProtocol.kt | 2 +- .../main/kotlin/io/libp2p/crypto/Libp2pCrypto.kt | 4 ++-- .../src/main/kotlin/io/libp2p/crypto/keys/Rsa.kt | 4 ++-- .../libp2p/etc/util/netty/AbstractChildChannel.kt | 5 ++++- .../src/main/kotlin/io/libp2p/pubsub/SeenCache.kt | 4 +++- .../implementation/ConnectionOverNetty.kt | 8 ++++++-- .../kotlin/io/libp2p/tools/P2pdRunner.kt | 2 +- .../io/libp2p/simulate/stats/StatsFactory.kt | 2 +- .../libp2p/simulate/stream/Libp2pConnectionImpl.kt | 3 +-- .../main/kotlin/io/libp2p/simulate/util/NumExt.kt | 12 +++++++++--- versions.gradle | 14 +++++++------- 15 files changed, 49 insertions(+), 30 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index 631b7434..0b002b33 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -13,15 +13,15 @@ plugins { id("org.jetbrains.kotlin.jvm") version kotlinVersion apply false - id("com.github.ben-manes.versions").version("0.48.0") + id("com.github.ben-manes.versions").version("0.51.0") id("idea") id("io.gitlab.arturbosch.detekt").version("1.22.0") id("java") id("maven-publish") id("org.jetbrains.dokka").version("1.9.20") - id("com.diffplug.spotless").version("6.21.0") + id("com.diffplug.spotless").version("6.25.0") id("java-test-fixtures") - id("io.spring.dependency-management").version("1.1.3") + id("io.spring.dependency-management").version("1.1.6") id("org.jetbrains.kotlin.android") version kotlinVersion apply false id("com.android.application") version "7.4.2" apply false @@ -115,7 +115,11 @@ configure( "ktlint_standard_no-wildcard-imports" to "disabled", "ktlint_standard_enum-entry-name-case" to "disabled", "ktlint_standard_trailing-comma-on-call-site" to "disabled", - "ktlint_standard_trailing-comma-on-declaration-site" to "disabled" + "ktlint_standard_trailing-comma-on-declaration-site" to "disabled", + "ktlint_standard_value-parameter-comment" to "disabled", + "ktlint_standard_value-argument-comment" to "disabled", + "ktlint_standard_property-naming" to "disabled", + "ktlint_standard_function-naming" to "disabled" ) ) } diff --git a/examples/android-chatter/build.gradle b/examples/android-chatter/build.gradle index fdf82983..41dd1bfc 100644 --- a/examples/android-chatter/build.gradle +++ b/examples/android-chatter/build.gradle @@ -24,6 +24,7 @@ android { packagingOptions { exclude 'META-INF/io.netty.versions.properties' exclude 'META-INF/INDEX.LIST' + exclude 'META-INF/versions/9/OSGI-INF/MANIFEST.MF' } kotlinOptions { jvmTarget = "11" diff --git a/examples/chatter/src/main/kotlin/io/libp2p/example/chat/ChatProtocol.kt b/examples/chatter/src/main/kotlin/io/libp2p/example/chat/ChatProtocol.kt index 4517cde8..6674f8db 100644 --- a/examples/chatter/src/main/kotlin/io/libp2p/example/chat/ChatProtocol.kt +++ b/examples/chatter/src/main/kotlin/io/libp2p/example/chat/ChatProtocol.kt @@ -19,9 +19,9 @@ typealias OnChatMessage = (PeerId, String) -> Unit class Chat(chatCallback: OnChatMessage) : ChatBinding(ChatProtocol(chatCallback)) -const val protocolId: ProtocolId = "/example/chat/0.1.0" +const val PROTOCOL_ID: ProtocolId = "/example/chat/0.1.0" -open class ChatBinding(echo: ChatProtocol) : StrictProtocolBinding(protocolId, echo) +open class ChatBinding(echo: ChatProtocol) : StrictProtocolBinding(PROTOCOL_ID, echo) open class ChatProtocol( private val chatCallback: OnChatMessage diff --git a/libp2p/build.gradle.kts b/libp2p/build.gradle.kts index 7b8a8e98..4810f674 100644 --- a/libp2p/build.gradle.kts +++ b/libp2p/build.gradle.kts @@ -1,6 +1,6 @@ plugins { id("com.google.protobuf").version("0.9.4") - id("me.champeau.jmh").version("0.7.1") + id("me.champeau.jmh").version("0.7.2") } // https://docs.gradle.org/current/userguide/java_testing.html#ex-disable-publishing-of-test-fixtures-variants diff --git a/libp2p/src/main/kotlin/io/libp2p/core/multistream/NegotiatedProtocol.kt b/libp2p/src/main/kotlin/io/libp2p/core/multistream/NegotiatedProtocol.kt index e0b04d65..294fd1f6 100644 --- a/libp2p/src/main/kotlin/io/libp2p/core/multistream/NegotiatedProtocol.kt +++ b/libp2p/src/main/kotlin/io/libp2p/core/multistream/NegotiatedProtocol.kt @@ -6,7 +6,7 @@ import java.util.concurrent.CompletableFuture /** * Represents [ProtocolBinding] with exact protocol version which was agreed on */ -open class NegotiatedProtocol> ( +open class NegotiatedProtocol>( val binding: TBinding, val protocol: ProtocolId ) { diff --git a/libp2p/src/main/kotlin/io/libp2p/crypto/Libp2pCrypto.kt b/libp2p/src/main/kotlin/io/libp2p/crypto/Libp2pCrypto.kt index daa6ff4d..7cb7b0e7 100644 --- a/libp2p/src/main/kotlin/io/libp2p/crypto/Libp2pCrypto.kt +++ b/libp2p/src/main/kotlin/io/libp2p/crypto/Libp2pCrypto.kt @@ -19,11 +19,11 @@ import org.bouncycastle.crypto.macs.HMac import org.bouncycastle.crypto.params.KeyParameter /** - * ErrRsaKeyTooSmall is returned when trying to generate or parse an RSA key + * ERR_RSA_KEY_TOO_SMALL is returned when trying to generate or parse an RSA key * that's smaller than 512 bits. Keys need to be larger enough to sign a 256bit * hash so this is a reasonable absolute minimum. */ -const val ErrRsaKeyTooSmall = "rsa keys must be >= 512 bits to be useful" +const val ERR_RSA_KEY_TOO_SMALL = "rsa keys must be >= 512 bits to be useful" const val RSA_ALGORITHM = "RSA" const val SHA_ALGORITHM = "SHA-256" diff --git a/libp2p/src/main/kotlin/io/libp2p/crypto/keys/Rsa.kt b/libp2p/src/main/kotlin/io/libp2p/crypto/keys/Rsa.kt index 4e0e14f4..752a23cf 100644 --- a/libp2p/src/main/kotlin/io/libp2p/crypto/keys/Rsa.kt +++ b/libp2p/src/main/kotlin/io/libp2p/crypto/keys/Rsa.kt @@ -16,7 +16,7 @@ import crypto.pb.Crypto import io.libp2p.core.Libp2pException import io.libp2p.core.crypto.PrivKey import io.libp2p.core.crypto.PubKey -import io.libp2p.crypto.ErrRsaKeyTooSmall +import io.libp2p.crypto.ERR_RSA_KEY_TOO_SMALL import io.libp2p.crypto.KEY_PKCS8 import io.libp2p.crypto.Libp2pCrypto import io.libp2p.crypto.RSA_ALGORITHM @@ -100,7 +100,7 @@ class RsaPublicKey(private val k: JavaPublicKey) : PubKey(Crypto.KeyType.RSA) { @JvmOverloads fun generateRsaKeyPair(bits: Int, random: SecureRandom = SecureRandom()): Pair { if (bits < 2048) { - throw Libp2pException(ErrRsaKeyTooSmall) + throw Libp2pException(ERR_RSA_KEY_TOO_SMALL) } val kp: KeyPair = with( diff --git a/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/AbstractChildChannel.kt b/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/AbstractChildChannel.kt index d914e2a4..26060624 100644 --- a/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/AbstractChildChannel.kt +++ b/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/AbstractChildChannel.kt @@ -20,7 +20,10 @@ import java.net.SocketAddress */ abstract class AbstractChildChannel(parent: Channel, id: ChannelId?) : AbstractChannel(parent, id) { private enum class State { - OPEN, ACTIVE, INACTIVE, CLOSED + OPEN, + ACTIVE, + INACTIVE, + CLOSED } private val parentCloseFuture = parent.closeFuture() diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/SeenCache.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/SeenCache.kt index 676ba5f2..9136899a 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/SeenCache.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/SeenCache.kt @@ -51,7 +51,9 @@ class SimpleSeenCache : SeenCache { override fun put(msg: PubsubMessage, value: TValue) { map[msg.messageId] = value } - override fun remove(messageId: MessageId) { map -= messageId } + override fun remove(messageId: MessageId) { + map -= messageId + } } class LRUSeenCache(val delegate: SeenCache, private val maxSize: Int) : SeenCache by delegate { diff --git a/libp2p/src/main/kotlin/io/libp2p/transport/implementation/ConnectionOverNetty.kt b/libp2p/src/main/kotlin/io/libp2p/transport/implementation/ConnectionOverNetty.kt index f6617cc0..90c1d824 100644 --- a/libp2p/src/main/kotlin/io/libp2p/transport/implementation/ConnectionOverNetty.kt +++ b/libp2p/src/main/kotlin/io/libp2p/transport/implementation/ConnectionOverNetty.kt @@ -30,8 +30,12 @@ open class ConnectionOverNetty( ch.attr(CONNECTION).set(this) } - fun setMuxerSession(ms: StreamMuxer.Session) { muxerSession = ms } - fun setSecureSession(ss: SecureChannel.Session) { secureSession = ss } + fun setMuxerSession(ms: StreamMuxer.Session) { + muxerSession = ms + } + fun setSecureSession(ss: SecureChannel.Session) { + secureSession = ss + } override fun muxerSession() = muxerSession override fun secureSession() = secureSession diff --git a/libp2p/src/testFixtures/kotlin/io/libp2p/tools/P2pdRunner.kt b/libp2p/src/testFixtures/kotlin/io/libp2p/tools/P2pdRunner.kt index dd204b7d..2045ff67 100644 --- a/libp2p/src/testFixtures/kotlin/io/libp2p/tools/P2pdRunner.kt +++ b/libp2p/src/testFixtures/kotlin/io/libp2p/tools/P2pdRunner.kt @@ -14,7 +14,7 @@ class P2pdRunner(val execNames: List = listOf("p2pd", "p2pd.exe"), val e fun findP2pdExe(): String? = (predefinedSearchPaths + execSearchPaths) .flatMap { path -> execNames.map { File(path, it) } } - .firstOrNull() { it.canExecute() } + .firstOrNull { it.canExecute() } ?.absoluteFile?.canonicalPath fun launcher() = findP2pdExe()?.let { DaemonLauncher(it) } diff --git a/tools/simulator/src/main/kotlin/io/libp2p/simulate/stats/StatsFactory.kt b/tools/simulator/src/main/kotlin/io/libp2p/simulate/stats/StatsFactory.kt index ded5f14a..5eb94f54 100644 --- a/tools/simulator/src/main/kotlin/io/libp2p/simulate/stats/StatsFactory.kt +++ b/tools/simulator/src/main/kotlin/io/libp2p/simulate/stats/StatsFactory.kt @@ -17,7 +17,7 @@ interface StatsFactory { override fun toString() = "" } - var DEFAULT: StatsFactory = object : StatsFactory { + val DEFAULT: StatsFactory = object : StatsFactory { override fun createStats(name: String) = DescriptiveStatsImpl() } } diff --git a/tools/simulator/src/main/kotlin/io/libp2p/simulate/stream/Libp2pConnectionImpl.kt b/tools/simulator/src/main/kotlin/io/libp2p/simulate/stream/Libp2pConnectionImpl.kt index 3976b9c6..3ca75270 100644 --- a/tools/simulator/src/main/kotlin/io/libp2p/simulate/stream/Libp2pConnectionImpl.kt +++ b/tools/simulator/src/main/kotlin/io/libp2p/simulate/stream/Libp2pConnectionImpl.kt @@ -9,8 +9,7 @@ import io.libp2p.simulate.util.NullTransport import io.libp2p.transport.implementation.ConnectionOverNetty class Libp2pConnectionImpl( - val remoteAddr: - Multiaddr, + val remoteAddr: Multiaddr, isInitiator: Boolean, localPubkey: PubKey, remotePubkey: PubKey, diff --git a/tools/simulator/src/main/kotlin/io/libp2p/simulate/util/NumExt.kt b/tools/simulator/src/main/kotlin/io/libp2p/simulate/util/NumExt.kt index fae6fd79..be3113ac 100644 --- a/tools/simulator/src/main/kotlin/io/libp2p/simulate/util/NumExt.kt +++ b/tools/simulator/src/main/kotlin/io/libp2p/simulate/util/NumExt.kt @@ -22,11 +22,15 @@ fun Collection.groupByRangesBy( where TKey : Number, TKey : Comparable = groupByRangesBy(keyExtractor, { it }, *ranges) -fun Collection>.groupByRanges(vararg ranges: ClosedRange): Map, List> +fun Collection>.groupByRanges( + vararg ranges: ClosedRange +): Map, List> where T : Number, T : Comparable = groupByRangesBy({ it.first }, { it.second }, *ranges) -fun Collection.countByRanges(vararg ranges: ClosedRange): List +fun Collection.countByRanges( + vararg ranges: ClosedRange +): List where T : Number, T : Comparable { val v = this .map { it to it } @@ -35,7 +39,9 @@ fun Collection.countByRanges(vararg ranges: ClosedRange): List return ranges.map { v[it]?.size ?: 0 } } -fun Collection.countByRanges(ranges: List>): List +fun Collection.countByRanges( + ranges: List> +): List where T : Number, T : Comparable = countByRanges(*ranges.toTypedArray()) diff --git a/versions.gradle b/versions.gradle index 222ee08a..67743992 100644 --- a/versions.gradle +++ b/versions.gradle @@ -6,23 +6,23 @@ dependencyManagement { dependencies { dependency "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.4" - dependency "com.google.guava:guava:32.1.2-jre" + dependency "com.google.guava:guava:33.3.1-jre" dependency "org.slf4j:slf4j-api:2.0.9" - dependencySet(group: 'org.apache.logging.log4j', version: '2.20.0') { + dependencySet(group: 'org.apache.logging.log4j', version: '2.24.0') { entry 'log4j-core' entry 'log4j-slf4j2-impl' } - dependencySet(group: 'org.junit.jupiter', version: '5.10.0') { + dependencySet(group: 'org.junit.jupiter', version: '5.11.1') { entry 'junit-jupiter-api' entry 'junit-jupiter-engine' entry 'junit-jupiter-params' } dependency "io.mockk:mockk:1.13.3" - dependency "org.assertj:assertj-core:3.24.2" + dependency "org.assertj:assertj-core:3.26.3" - dependencySet(group: "org.openjdk.jmh", version: "1.36") { + dependencySet(group: "org.openjdk.jmh", version: "1.37") { entry 'jmh-core' entry 'jmh-generator-annprocess' } @@ -31,7 +31,7 @@ dependencyManagement { entry 'protobuf-java' entry 'protoc' } - dependencySet(group: "io.netty", version: "4.1.108.Final") { + dependencySet(group: "io.netty", version: "4.1.113.Final") { entry 'netty-common' entry 'netty-handler' entry 'netty-transport' @@ -41,7 +41,7 @@ dependencyManagement { } dependency "com.github.multiformats:java-multibase:v1.1.1" dependency "tech.pegasys:noise-java:22.1.0" - dependencySet(group: "org.bouncycastle", version: "1.76") { + dependencySet(group: "org.bouncycastle", version: "1.78.1") { entry 'bcprov-jdk18on' entry 'bcpkix-jdk18on' entry 'bctls-jdk18on' From 312a18cebfde72942439b2874ab6db68e2c10d03 Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Thu, 26 Sep 2024 14:37:50 +0100 Subject: [PATCH 8/8] 1.2.0 release --- build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle.kts b/build.gradle.kts index 0fcafa17..15ce9e5a 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -37,7 +37,7 @@ configure( } ) { group = "io.libp2p" - version = "1.1.1-RELEASE" + version = "1.2.0-RELEASE" apply(plugin = "kotlin") apply(plugin = "idea")