From 1b0253aa2d64d87885458aeb186df9f497f013b7 Mon Sep 17 00:00:00 2001 From: Sergey Nazarov Date: Mon, 28 Aug 2017 20:27:36 +0300 Subject: [PATCH 1/3] Excluded peer in the head of queue no longer blocks selection process --- .../com/wavesplatform/network/PeerDatabaseImpl.scala | 6 ++++++ .../network/peer/PeerDatabaseImplSpecification.scala | 11 ++++++++--- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/main/scala/com/wavesplatform/network/PeerDatabaseImpl.scala b/src/main/scala/com/wavesplatform/network/PeerDatabaseImpl.scala index e534d204115..bc53a8d8498 100755 --- a/src/main/scala/com/wavesplatform/network/PeerDatabaseImpl.scala +++ b/src/main/scala/com/wavesplatform/network/PeerDatabaseImpl.scala @@ -25,7 +25,10 @@ class PeerDatabaseImpl(settings: NetworkSettings) extends PeerDatabase with Auto override def addCandidate(socketAddress: InetSocketAddress): Unit = unverifiedPeers.synchronized { if (!peersPersistence.containsKey(socketAddress) && !unverifiedPeers.contains(socketAddress)) { + log.trace(s"Adding candidate $socketAddress") unverifiedPeers.add(socketAddress) + } else { + log.trace(s"NOT adding candidate $socketAddress") } } @@ -55,6 +58,9 @@ class PeerDatabaseImpl(settings: NetworkSettings) extends PeerDatabase with Auto log.trace(s"Excluding: $excluded") def excludeAddress(isa: InetSocketAddress) = excluded(isa) || blacklistedHosts(isa.getAddress) + // excluded only contains local addresses, our declared address, and external declared addresses we already have + // connection to, so it's safe to filter out all matching candidates + unverifiedPeers.removeIf(isa => excluded(isa)) log.trace(s"Evicting queue: $unverifiedPeers") val unverified = Option(unverifiedPeers.peek()).filterNot(excludeAddress) val verified = Random.shuffle(knownPeers.keySet.diff(excluded).toSeq).headOption.filterNot(excludeAddress) diff --git a/src/test/scala/scorex/network/peer/PeerDatabaseImplSpecification.scala b/src/test/scala/scorex/network/peer/PeerDatabaseImplSpecification.scala index 2300a29c942..a8409aa0340 100755 --- a/src/test/scala/scorex/network/peer/PeerDatabaseImplSpecification.scala +++ b/src/test/scala/scorex/network/peer/PeerDatabaseImplSpecification.scala @@ -82,9 +82,6 @@ class PeerDatabaseImplSpecification extends path.FreeSpecLike with Matchers { database.randomPeer(Set()) should be(empty) } - } - - "Peer database2" - { "random peer should return peers from both from database and buffer" in { database2.touch(address1) database2.addCandidate(address2) @@ -97,6 +94,14 @@ class PeerDatabaseImplSpecification extends path.FreeSpecLike with Matchers { set should contain(address1) set should contain(address2) } + + "filters out excluded candidates" in { + database.addCandidate(address1) + database.addCandidate(address1) + database.addCandidate(address2) + + database.randomPeer(Set(address1)) should contain(address2) + } } private def sleepLong() = Thread.sleep(2200) From 192b54234aa6c93bfd33667c6604e69012e0d83f Mon Sep 17 00:00:00 2001 From: Sergey Nazarov Date: Mon, 28 Aug 2017 20:29:20 +0300 Subject: [PATCH 2/3] Trust declared address when it matches actual remote address --- .../it/network/client/package.scala | 3 +-- .../wavesplatform/network/AttributeKeys.scala | 9 --------- .../network/HandshakeHandler.scala | 15 ++++++--------- .../wavesplatform/network/NetworkServer.scala | 5 ++--- .../network/PeerSynchronizer.scala | 19 ++++++++++++++++--- 5 files changed, 25 insertions(+), 26 deletions(-) delete mode 100644 src/main/scala/com/wavesplatform/network/AttributeKeys.scala diff --git a/src/it/scala/com/wavesplatform/it/network/client/package.scala b/src/it/scala/com/wavesplatform/it/network/client/package.scala index 6d35420111a..5c1e6bf6e6c 100644 --- a/src/it/scala/com/wavesplatform/it/network/client/package.scala +++ b/src/it/scala/com/wavesplatform/it/network/client/package.scala @@ -1,8 +1,7 @@ package com.wavesplatform.it.network -import com.wavesplatform.network.AttributeKeys import io.netty.channel.Channel package object client { - def id(chan: Channel): String = s"[${chan.id().asShortText()}: ${chan.attr(AttributeKeys.DeclaredAddress)}]" + def id(chan: Channel): String = s"[${chan.id().asShortText()}]" } diff --git a/src/main/scala/com/wavesplatform/network/AttributeKeys.scala b/src/main/scala/com/wavesplatform/network/AttributeKeys.scala deleted file mode 100644 index 840dba60f6a..00000000000 --- a/src/main/scala/com/wavesplatform/network/AttributeKeys.scala +++ /dev/null @@ -1,9 +0,0 @@ -package com.wavesplatform.network - -import java.net.InetSocketAddress - -import io.netty.util.AttributeKey - -object AttributeKeys { - val DeclaredAddress = AttributeKey.newInstance[InetSocketAddress]("declared-address") -} diff --git a/src/main/scala/com/wavesplatform/network/HandshakeHandler.scala b/src/main/scala/com/wavesplatform/network/HandshakeHandler.scala index 043aebb7a78..4f5e0c80920 100644 --- a/src/main/scala/com/wavesplatform/network/HandshakeHandler.scala +++ b/src/main/scala/com/wavesplatform/network/HandshakeHandler.scala @@ -2,7 +2,7 @@ package com.wavesplatform.network import java.net.InetSocketAddress import java.util -import java.util.concurrent.{ConcurrentMap, TimeUnit} +import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, TimeUnit} import io.netty.buffer.ByteBuf import io.netty.channel.ChannelHandler.Sharable @@ -52,10 +52,11 @@ class HandshakeTimeoutHandler(handshakeTimeout: FiniteDuration) extends ChannelI abstract class HandshakeHandler( localHandshake: Handshake, establishedConnections: ConcurrentMap[Channel, PeerInfo], - connections: ConcurrentMap[PeerKey, Channel], peerDatabase: PeerDatabase) extends ChannelInboundHandlerAdapter with ScorexLogging { import HandshakeHandler._ + private val connections = new ConcurrentHashMap[PeerKey, Channel](10, 0.9f, 10) + def connectionNegotiated(ctx: ChannelHandlerContext): Unit override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = msg match { @@ -112,10 +113,9 @@ object HandshakeHandler extends ScorexLogging { class Server( handshake: Handshake, establishedConnections: ConcurrentMap[Channel, PeerInfo], - connections: ConcurrentMap[PeerKey, Channel], peerDatabase: PeerDatabase, allChannels: ChannelGroup) - extends HandshakeHandler(handshake, establishedConnections, connections, peerDatabase) { + extends HandshakeHandler(handshake, establishedConnections, peerDatabase) { override def connectionNegotiated(ctx: ChannelHandlerContext) = { ctx.writeAndFlush(handshake.encode(ctx.alloc().buffer())) ctx.channel().closeFuture().addListener((_: ChannelFuture) => allChannels.remove(ctx.channel())) @@ -127,13 +127,10 @@ object HandshakeHandler extends ScorexLogging { class Client( handshake: Handshake, establishedConnections: ConcurrentMap[Channel, PeerInfo], - connections: ConcurrentMap[PeerKey, Channel], peerDatabase: PeerDatabase) - extends HandshakeHandler(handshake, establishedConnections, connections, peerDatabase) { + extends HandshakeHandler(handshake, establishedConnections, peerDatabase) { - override def connectionNegotiated(ctx: ChannelHandlerContext) = { - ctx.channel().attr(AttributeKeys.DeclaredAddress).set(ctx.remoteAddress) - } + override def connectionNegotiated(ctx: ChannelHandlerContext) = {} override def channelActive(ctx: ChannelHandlerContext) = { ctx.writeAndFlush(handshake.encode(ctx.alloc().buffer())) diff --git a/src/main/scala/com/wavesplatform/network/NetworkServer.scala b/src/main/scala/com/wavesplatform/network/NetworkServer.scala index 20b02ca48d0..f0760cbb4b1 100644 --- a/src/main/scala/com/wavesplatform/network/NetworkServer.scala +++ b/src/main/scala/com/wavesplatform/network/NetworkServer.scala @@ -81,10 +81,9 @@ class NetworkServer(checkpointService: CheckpointService, private val coordinatorHandler = new CoordinatorHandler(checkpointService, history, blockchainUpdater, time, stateReader, utxPool, blockchainReadiness, miner, settings, peerDatabase, allChannels) - private val peerUniqueness = new ConcurrentHashMap[PeerKey, Channel]() private val serverHandshakeHandler = - new HandshakeHandler.Server(handshake, peerInfo, peerUniqueness, peerDatabase, allChannels) + new HandshakeHandler.Server(handshake, peerInfo, peerDatabase, allChannels) private val utxPoolSynchronizer = new UtxPoolSynchronizer(utxPool, allChannels) @@ -124,7 +123,7 @@ class NetworkServer(checkpointService: CheckpointService, peerInfo.values().asScala.flatMap(_.declaredAddress) private val clientHandshakeHandler = - new HandshakeHandler.Client(handshake, peerInfo, peerUniqueness, peerDatabase) + new HandshakeHandler.Client(handshake, peerInfo, peerDatabase) private val bootstrap = new Bootstrap() diff --git a/src/main/scala/com/wavesplatform/network/PeerSynchronizer.scala b/src/main/scala/com/wavesplatform/network/PeerSynchronizer.scala index e96c623e412..26794c7bacd 100644 --- a/src/main/scala/com/wavesplatform/network/PeerSynchronizer.scala +++ b/src/main/scala/com/wavesplatform/network/PeerSynchronizer.scala @@ -1,5 +1,7 @@ package com.wavesplatform.network +import java.net.InetSocketAddress + import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter} import scorex.utils.ScorexLogging @@ -9,6 +11,7 @@ class PeerSynchronizer(peerDatabase: PeerDatabase, peerRequestInterval: FiniteDu extends ChannelInboundHandlerAdapter with ScorexLogging { private var peersRequested = false + private var declaredAddress = Option.empty[InetSocketAddress] def requestPeers(ctx: ChannelHandlerContext): Unit = if (ctx.channel().isActive) { log.trace(s"${id(ctx)} Requesting peers") @@ -20,13 +23,23 @@ class PeerSynchronizer(peerDatabase: PeerDatabase, peerRequestInterval: FiniteDu } } - override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef) = { - Option(ctx.channel().attr(AttributeKeys.DeclaredAddress).get()).foreach(peerDatabase.touch) + override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = { + declaredAddress.foreach(peerDatabase.touch) msg match { case hs: Handshake => - hs.declaredAddress.foreach(peerDatabase.addCandidate) + hs.declaredAddress.foreach { rda => + if (rda.getAddress == ctx.remoteAddress.getAddress) { + log.trace(s"${id(ctx)} Touching declared address") + peerDatabase.touch(rda) + declaredAddress = Some(rda) + } else { + log.debug(s"${id(ctx)} Declared address $rda does not match actual remote address") + } + } + requestPeers(ctx) super.channelRead(ctx, msg) case GetPeers => + log.debug(s"${id(ctx)} Sending known peers: ${peerDatabase.knownPeers.mkString("[", ", ", "]")}") ctx.writeAndFlush(KnownPeers(peerDatabase.knownPeers.keys.toSeq)) case KnownPeers(peers) if peersRequested => peersRequested = false From 32ab5dc40a7fb0cf2736a795cd64416d7d02c413 Mon Sep 17 00:00:00 2001 From: Sergey Nazarov Date: Mon, 28 Aug 2017 20:29:59 +0300 Subject: [PATCH 3/3] Changed integration tests to start from a star topology --- src/it/resources/template.conf | 2 +- src/it/scala/com/wavesplatform/it/Docker.scala | 8 +++++--- .../wavesplatform/it/network/client/NetworkClient.scala | 7 ++----- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/it/resources/template.conf b/src/it/resources/template.conf index 01f339affdf..d86244f9a6a 100644 --- a/src/it/resources/template.conf +++ b/src/it/resources/template.conf @@ -4,7 +4,7 @@ waves { network { known-peers = [] black-list-residence-time = 30s - peers-broadcast-interval = 5s + peers-broadcast-interval = 2s connection-timeout = 30s } blockchain { diff --git a/src/it/scala/com/wavesplatform/it/Docker.scala b/src/it/scala/com/wavesplatform/it/Docker.scala index be55be9179e..775ba931921 100644 --- a/src/it/scala/com/wavesplatform/it/Docker.scala +++ b/src/it/scala/com/wavesplatform/it/Docker.scala @@ -39,15 +39,14 @@ class Docker(suiteConfig: Config = ConfigFactory.empty) extends AutoCloseable wi private val client = DefaultDockerClient.fromEnv().build() private var nodes = Map.empty[String, Node] + private var seedAddress = Option.empty[String] private val isStopped = new AtomicBoolean(false) sys.addShutdownHook { close() } - private def knownPeers = nodes.values.zipWithIndex.map { - case (n, index) => s"-Dwaves.network.known-peers.$index=${n.nodeInfo.networkIpAddress}:${n.nodeInfo.containerNetworkPort}" - } mkString " " + private def knownPeers = seedAddress.fold("")(sa => s"-Dwaves.network.known-peers.0=$sa") private val networkName = "waves-" + this.##.toLong.toHexString @@ -100,6 +99,9 @@ class Docker(suiteConfig: Config = ConfigFactory.empty) extends AutoCloseable wi containerId, extractHostPort(ports, matcherApiPort)) val node = new Node(actualConfig, nodeInfo, http) + if (seedAddress.isEmpty) { + seedAddress = Some(s"${nodeInfo.networkIpAddress}:${nodeInfo.containerNetworkPort}") + } nodes += containerId -> node Await.result(node.lastBlock, Duration.Inf) node diff --git a/src/it/scala/com/wavesplatform/it/network/client/NetworkClient.scala b/src/it/scala/com/wavesplatform/it/network/client/NetworkClient.scala index f55db882aab..20db8dbb295 100644 --- a/src/it/scala/com/wavesplatform/it/network/client/NetworkClient.scala +++ b/src/it/scala/com/wavesplatform/it/network/client/NetworkClient.scala @@ -4,7 +4,7 @@ import java.net.InetSocketAddress import java.util.concurrent.ConcurrentHashMap import com.wavesplatform.Version -import com.wavesplatform.network.{Handshake, HandshakeHandler, PeerInfo, PeerKey} +import com.wavesplatform.network.{Handshake, HandshakeHandler, PeerInfo} import com.wavesplatform.settings._ import io.netty.bootstrap.Bootstrap import io.netty.channel._ @@ -27,12 +27,9 @@ class NetworkClient( Handshake(Constants.ApplicationName + chainId, Version.VersionTuple, nodeName, nonce, None) - private val peerUniqueness = new ConcurrentHashMap[PeerKey, Channel]() - private val channels = new ConcurrentHashMap[InetSocketAddress, Channel] - private val clientHandshakeHandler = - new HandshakeHandler.Client(handshake, peerInfo, peerUniqueness, NopPeerDatabase) + private val clientHandshakeHandler = new HandshakeHandler.Client(handshake, peerInfo, NopPeerDatabase) private val bootstrap = new Bootstrap() .group(workerGroup)