Skip to content

Commit

Permalink
Merge pull request #472 from wavesplatform/node-116-peer-exchange
Browse files Browse the repository at this point in the history
NODE-116: Fixed peer exchange
  • Loading branch information
alexeykiselev authored Aug 29, 2017
2 parents c25acae + 32ab5dc commit e04fbde
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 38 deletions.
2 changes: 1 addition & 1 deletion src/it/resources/template.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ waves {
network {
known-peers = []
black-list-residence-time = 30s
peers-broadcast-interval = 5s
peers-broadcast-interval = 2s
connection-timeout = 30s
}
blockchain {
Expand Down
8 changes: 5 additions & 3 deletions src/it/scala/com/wavesplatform/it/Docker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,14 @@ class Docker(suiteConfig: Config = ConfigFactory.empty) extends AutoCloseable wi

private val client = DefaultDockerClient.fromEnv().build()
private var nodes = Map.empty[String, Node]
private var seedAddress = Option.empty[String]
private val isStopped = new AtomicBoolean(false)

sys.addShutdownHook {
close()
}

private def knownPeers = nodes.values.zipWithIndex.map {
case (n, index) => s"-Dwaves.network.known-peers.$index=${n.nodeInfo.networkIpAddress}:${n.nodeInfo.containerNetworkPort}"
} mkString " "
private def knownPeers = seedAddress.fold("")(sa => s"-Dwaves.network.known-peers.0=$sa")

private val networkName = "waves-" + this.##.toLong.toHexString

Expand Down Expand Up @@ -100,6 +99,9 @@ class Docker(suiteConfig: Config = ConfigFactory.empty) extends AutoCloseable wi
containerId,
extractHostPort(ports, matcherApiPort))
val node = new Node(actualConfig, nodeInfo, http)
if (seedAddress.isEmpty) {
seedAddress = Some(s"${nodeInfo.networkIpAddress}:${nodeInfo.containerNetworkPort}")
}
nodes += containerId -> node
Await.result(node.lastBlock, Duration.Inf)
node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import java.net.InetSocketAddress
import java.util.concurrent.ConcurrentHashMap

import com.wavesplatform.Version
import com.wavesplatform.network.{Handshake, HandshakeHandler, PeerInfo, PeerKey}
import com.wavesplatform.network.{Handshake, HandshakeHandler, PeerInfo}
import com.wavesplatform.settings._
import io.netty.bootstrap.Bootstrap
import io.netty.channel._
Expand All @@ -27,12 +27,9 @@ class NetworkClient(
Handshake(Constants.ApplicationName + chainId, Version.VersionTuple, nodeName,
nonce, None)

private val peerUniqueness = new ConcurrentHashMap[PeerKey, Channel]()

private val channels = new ConcurrentHashMap[InetSocketAddress, Channel]

private val clientHandshakeHandler =
new HandshakeHandler.Client(handshake, peerInfo, peerUniqueness, NopPeerDatabase)
private val clientHandshakeHandler = new HandshakeHandler.Client(handshake, peerInfo, NopPeerDatabase)

private val bootstrap = new Bootstrap()
.group(workerGroup)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package com.wavesplatform.it.network

import com.wavesplatform.network.AttributeKeys
import io.netty.channel.Channel

package object client {
def id(chan: Channel): String = s"[${chan.id().asShortText()}: ${chan.attr(AttributeKeys.DeclaredAddress)}]"
def id(chan: Channel): String = s"[${chan.id().asShortText()}]"
}
9 changes: 0 additions & 9 deletions src/main/scala/com/wavesplatform/network/AttributeKeys.scala

This file was deleted.

15 changes: 6 additions & 9 deletions src/main/scala/com/wavesplatform/network/HandshakeHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.wavesplatform.network

import java.net.InetSocketAddress
import java.util
import java.util.concurrent.{ConcurrentMap, TimeUnit}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, TimeUnit}

import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandler.Sharable
Expand Down Expand Up @@ -52,10 +52,11 @@ class HandshakeTimeoutHandler(handshakeTimeout: FiniteDuration) extends ChannelI
abstract class HandshakeHandler(
localHandshake: Handshake,
establishedConnections: ConcurrentMap[Channel, PeerInfo],
connections: ConcurrentMap[PeerKey, Channel],
peerDatabase: PeerDatabase) extends ChannelInboundHandlerAdapter with ScorexLogging {
import HandshakeHandler._

private val connections = new ConcurrentHashMap[PeerKey, Channel](10, 0.9f, 10)

def connectionNegotiated(ctx: ChannelHandlerContext): Unit

override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = msg match {
Expand Down Expand Up @@ -112,10 +113,9 @@ object HandshakeHandler extends ScorexLogging {
class Server(
handshake: Handshake,
establishedConnections: ConcurrentMap[Channel, PeerInfo],
connections: ConcurrentMap[PeerKey, Channel],
peerDatabase: PeerDatabase,
allChannels: ChannelGroup)
extends HandshakeHandler(handshake, establishedConnections, connections, peerDatabase) {
extends HandshakeHandler(handshake, establishedConnections, peerDatabase) {
override def connectionNegotiated(ctx: ChannelHandlerContext) = {
ctx.writeAndFlush(handshake.encode(ctx.alloc().buffer()))
ctx.channel().closeFuture().addListener((_: ChannelFuture) => allChannels.remove(ctx.channel()))
Expand All @@ -127,13 +127,10 @@ object HandshakeHandler extends ScorexLogging {
class Client(
handshake: Handshake,
establishedConnections: ConcurrentMap[Channel, PeerInfo],
connections: ConcurrentMap[PeerKey, Channel],
peerDatabase: PeerDatabase)
extends HandshakeHandler(handshake, establishedConnections, connections, peerDatabase) {
extends HandshakeHandler(handshake, establishedConnections, peerDatabase) {

override def connectionNegotiated(ctx: ChannelHandlerContext) = {
ctx.channel().attr(AttributeKeys.DeclaredAddress).set(ctx.remoteAddress)
}
override def connectionNegotiated(ctx: ChannelHandlerContext) = {}

override def channelActive(ctx: ChannelHandlerContext) = {
ctx.writeAndFlush(handshake.encode(ctx.alloc().buffer()))
Expand Down
5 changes: 2 additions & 3 deletions src/main/scala/com/wavesplatform/network/NetworkServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,9 @@ class NetworkServer(checkpointService: CheckpointService,

private val coordinatorHandler = new CoordinatorHandler(checkpointService, history, blockchainUpdater, time,
stateReader, utxPool, blockchainReadiness, miner, settings, peerDatabase, allChannels)
private val peerUniqueness = new ConcurrentHashMap[PeerKey, Channel]()

private val serverHandshakeHandler =
new HandshakeHandler.Server(handshake, peerInfo, peerUniqueness, peerDatabase, allChannels)
new HandshakeHandler.Server(handshake, peerInfo, peerDatabase, allChannels)

private val utxPoolSynchronizer = new UtxPoolSynchronizer(utxPool, allChannels)

Expand Down Expand Up @@ -124,7 +123,7 @@ class NetworkServer(checkpointService: CheckpointService,
peerInfo.values().asScala.flatMap(_.declaredAddress)

private val clientHandshakeHandler =
new HandshakeHandler.Client(handshake, peerInfo, peerUniqueness, peerDatabase)
new HandshakeHandler.Client(handshake, peerInfo, peerDatabase)


private val bootstrap = new Bootstrap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ class PeerDatabaseImpl(settings: NetworkSettings) extends PeerDatabase with Auto

override def addCandidate(socketAddress: InetSocketAddress): Unit = unverifiedPeers.synchronized {
if (!peersPersistence.containsKey(socketAddress) && !unverifiedPeers.contains(socketAddress)) {
log.trace(s"Adding candidate $socketAddress")
unverifiedPeers.add(socketAddress)
} else {
log.trace(s"NOT adding candidate $socketAddress")
}
}

Expand Down Expand Up @@ -55,6 +58,9 @@ class PeerDatabaseImpl(settings: NetworkSettings) extends PeerDatabase with Auto
log.trace(s"Excluding: $excluded")
def excludeAddress(isa: InetSocketAddress) = excluded(isa) || blacklistedHosts(isa.getAddress)

// excluded only contains local addresses, our declared address, and external declared addresses we already have
// connection to, so it's safe to filter out all matching candidates
unverifiedPeers.removeIf(isa => excluded(isa))
log.trace(s"Evicting queue: $unverifiedPeers")
val unverified = Option(unverifiedPeers.peek()).filterNot(excludeAddress)
val verified = Random.shuffle(knownPeers.keySet.diff(excluded).toSeq).headOption.filterNot(excludeAddress)
Expand Down
19 changes: 16 additions & 3 deletions src/main/scala/com/wavesplatform/network/PeerSynchronizer.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.wavesplatform.network

import java.net.InetSocketAddress

import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}
import scorex.utils.ScorexLogging

Expand All @@ -9,6 +11,7 @@ class PeerSynchronizer(peerDatabase: PeerDatabase, peerRequestInterval: FiniteDu
extends ChannelInboundHandlerAdapter with ScorexLogging {

private var peersRequested = false
private var declaredAddress = Option.empty[InetSocketAddress]

def requestPeers(ctx: ChannelHandlerContext): Unit = if (ctx.channel().isActive) {
log.trace(s"${id(ctx)} Requesting peers")
Expand All @@ -20,13 +23,23 @@ class PeerSynchronizer(peerDatabase: PeerDatabase, peerRequestInterval: FiniteDu
}
}

override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef) = {
Option(ctx.channel().attr(AttributeKeys.DeclaredAddress).get()).foreach(peerDatabase.touch)
override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = {
declaredAddress.foreach(peerDatabase.touch)
msg match {
case hs: Handshake =>
hs.declaredAddress.foreach(peerDatabase.addCandidate)
hs.declaredAddress.foreach { rda =>
if (rda.getAddress == ctx.remoteAddress.getAddress) {
log.trace(s"${id(ctx)} Touching declared address")
peerDatabase.touch(rda)
declaredAddress = Some(rda)
} else {
log.debug(s"${id(ctx)} Declared address $rda does not match actual remote address")
}
}
requestPeers(ctx)
super.channelRead(ctx, msg)
case GetPeers =>
log.debug(s"${id(ctx)} Sending known peers: ${peerDatabase.knownPeers.mkString("[", ", ", "]")}")
ctx.writeAndFlush(KnownPeers(peerDatabase.knownPeers.keys.toSeq))
case KnownPeers(peers) if peersRequested =>
peersRequested = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ class PeerDatabaseImplSpecification extends path.FreeSpecLike with Matchers {
database.randomPeer(Set()) should be(empty)
}

}

"Peer database2" - {
"random peer should return peers from both from database and buffer" in {
database2.touch(address1)
database2.addCandidate(address2)
Expand All @@ -97,6 +94,14 @@ class PeerDatabaseImplSpecification extends path.FreeSpecLike with Matchers {
set should contain(address1)
set should contain(address2)
}

"filters out excluded candidates" in {
database.addCandidate(address1)
database.addCandidate(address1)
database.addCandidate(address2)

database.randomPeer(Set(address1)) should contain(address2)
}
}

private def sleepLong() = Thread.sleep(2200)
Expand Down

0 comments on commit e04fbde

Please sign in to comment.