Skip to content

Commit

Permalink
Merge pull request #641 from wavesplatform/node-discovery-tool
Browse files Browse the repository at this point in the history
Node discovery tool
  • Loading branch information
Tolsi authored Nov 7, 2017
2 parents b27f5e7 + d495b95 commit 3803d5a
Show file tree
Hide file tree
Showing 18 changed files with 1,089 additions and 0 deletions.
16 changes: 16 additions & 0 deletions discovery/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
discovery {
chains = [
{
chain-id = W
initial-peers = ["138.201.152.163:6868", "138.201.152.164:6868", "138.201.152.165:6868", "35.156.19.4:6868", "52.50.69.247:6868", "52.57.147.71:6868"]
},
{
chain-id = T
initial-peers = ["52.30.47.67:6863", "52.28.66.217:6863", "52.77.111.219:6863", "52.51.92.182:6863"]
}
]
web-socket-host = "localhost"
web-socket-port = 8080
workers-count = 10
discovery-interval = 500ms
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.wavesplatform.discovery

import akka.actor.Cancellable

object CancellableExt {
implicit def Ext(self: Cancellable) = new {
def combine(other: Cancellable): Cancellable = new Cancellable {
override def cancel() = self.cancel() & other.cancel()
override def isCancelled = self.isCancelled && other.isCancelled
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.wavesplatform.discovery

import java.util.concurrent.TimeUnit

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.TextMessage
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import com.wavesplatform.discovery.actors.MainActor
import com.wavesplatform.discovery.actors.MainActor.WebSocketConnected
import com.wavesplatform.discovery.CancellableExt._
import scorex.utils.ScorexLogging

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

object DiscoveryApp extends App with ScorexLogging {

implicit val ec: ExecutionContext = ExecutionContext.global
implicit val system: ActorSystem = ActorSystem("Default")
implicit val flowMaterializer: ActorMaterializer = ActorMaterializer()
import akka.http.scaladsl.server.Directives._

val (route, timer) = Settings.default.chains.map {cs =>
val mainActor = MainActor(cs.chainId, Settings.default.workersCount)
mainActor ! MainActor.Peers(cs.initialPeers.toSet)

val route = get {
path(cs.chainId.toLower.toString) {

val sink: Sink[akka.http.scaladsl.model.ws.Message, _] = Sink.ignore
val source: Source[akka.http.scaladsl.model.ws.Message, NotUsed] =
Source.actorRef[String](1, OverflowStrategy.dropTail)
.mapMaterializedValue { actor =>
mainActor ! WebSocketConnected(actor)
NotUsed
}.map(
(outMsg: String) => TextMessage(outMsg))

handleWebSocketMessages(Flow.fromSinkAndSource(sink, source))
}
}

(route, system.scheduler.schedule(FiniteDuration(0, TimeUnit.SECONDS), Settings.default.discoveryInterval, mainActor, MainActor.Discover))
}.reduce((a,b) => (a._1 ~ b._1, a._2.combine(b._2)))

val binding = Http().bindAndHandle(route, Settings.default.webSocketHost, Settings.default.webSocketPort)

sys.addShutdownHook {
binding.flatMap(_.unbind()).onComplete(_ => {
timer.cancel()
system.terminate()
})
}

log.info(s"Server is now online at http://${Settings.default.webSocketHost}:${Settings.default.webSocketPort}")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.wavesplatform.discovery

import java.net.InetSocketAddress

import net.ceedubs.ficus.Ficus._
import com.typesafe.config.{Config, ConfigFactory}
import net.ceedubs.ficus.readers.{NameMapper, ValueReader}

import scala.concurrent.duration.FiniteDuration

case class ChainSettings(chainId: Char, initialPeers: Seq[InetSocketAddress])

case class Settings(chains: Seq[ChainSettings],
webSocketHost: String,
webSocketPort: Int,
workersCount: Int,
discoveryInterval: FiniteDuration)

object Settings{
implicit val readConfigInHyphen: NameMapper = net.ceedubs.ficus.readers.namemappers.implicits.hyphenCase // IDEA bug

implicit val inetSocketAddressReader: ValueReader[InetSocketAddress] = { (config: Config, path: String) =>
val value = config.as[String](s"$path").split(":")
new InetSocketAddress(
value(0),
value(1).toInt
)
}

implicit val charReader: ValueReader[Char] = (config: Config, path: String) => config.as[String](s"$path").head

import net.ceedubs.ficus.readers.ArbitraryTypeReader._

lazy val default: Settings = ConfigFactory.load().as[Settings]("discovery")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package com.wavesplatform.discovery.actors

import java.net.InetSocketAddress

import akka.actor.SupervisorStrategy.Resume
import akka.actor.{Actor, ActorRef, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.routing.{ActorRefRoutee, Router}
import com.wavesplatform.discovery.collections.{ExpirationSet, Pool}
import com.wavesplatform.discovery.routers.SmallestMailboxWithThresholdRoutingLogic
import play.api.libs.json._

class MainActor(chainId: Char, workersCount: Int) extends Actor {
import MainActor._

private val mailboxThreshold = 5
private val router = {
val routes = Vector.fill(workersCount) {
ActorRefRoutee(context.actorOf(Props(classOf[PeerDiscoveryActor], chainId)))
}
Router(SmallestMailboxWithThresholdRoutingLogic(mailboxThreshold), routes)
}

private val alivePeers = new Pool[InetSocketAddress]
private val deadPeersCacheTimeout = 5
private val deadPeers = new ExpirationSet[InetSocketAddress](1000*60*60*1)
private val peerResponses = scala.collection.mutable.Map.empty[InetSocketAddress, Set[InetSocketAddress]]
private val connections = scala.collection.mutable.Set.empty[ActorRef]

override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
case _: Exception => Resume
}

def receive: PartialFunction[Any, Unit] = {

case Peers(p) => (p -- deadPeers).foreach(alivePeers.add)

case Discover => alivePeers.next().foreach(peer => router.route(PeerDiscoveryActor.GetPeersFrom(peer), self))

case PeerInfo(peer, peers) => {
self ! Peers(peers)
deadPeers.remove(peer)
peerResponses.put(peer, peers) match {
case Some(oldValue) if oldValue == peers => //nothing changed
case _ if (peers -- deadPeers).nonEmpty => broadcastPeerInfo(peer, peers -- deadPeers)
case _ =>
}
}

case PeerProblem(peer) => {
println("PeerProblem")
alivePeers.remove(peer)
deadPeers.add(peer)
}

case WebSocketConnected(client) => {
connections.add(client)
client ! jsonPeersData
}
}

private def jsonPeersData = peerResponses.foldLeft(Json.obj())((json, keyValue) => json + (keyValue._1.getHostString, JsArray(keyValue._2.map(v => JsString(v.getHostString)).toSeq))).toString()

private def broadcastPeerInfo(peer: InetSocketAddress, peers: Set[InetSocketAddress]): Unit = {
val response = Json.obj(peer.getHostString -> JsArray(peers.map(p => JsString(p.getHostString)).toSeq)).toString()
connections.foreach(c => c ! response)
}
}

object MainActor {

case class PeerInfo(peer: InetSocketAddress, peers: Set[InetSocketAddress])

case class PeerProblem(peer: InetSocketAddress)

case class Peers(peers: Set[InetSocketAddress])

case class WebSocketConnected(actor: ActorRef)

case object Discover

def apply(chainId: Char, workersCount: Int)(implicit system: ActorSystem): ActorRef = {
system.actorOf(Props(classOf[MainActor], chainId, workersCount))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.wavesplatform.discovery.actors

import java.net.{InetAddress, InetSocketAddress}
import java.util.concurrent.TimeUnit

import akka.actor.Actor
import com.wavesplatform.discovery._
import com.wavesplatform.discovery.network._
import com.wavesplatform.network.{GetPeers, Handshake, KnownPeers, LegacyFrameCodec, PeerDatabase, PipelineInitializer}
import io.netty.bootstrap.Bootstrap
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.{LengthFieldBasedFrameDecoder, LengthFieldPrepender}

import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration

object PeerDiscoveryActor {
case class GetPeersFrom(peer: InetSocketAddress)

val peerDatabaseStub = new PeerDatabase {override def suspend(host: InetAddress): Unit = {}

override def knownPeers: Map[InetSocketAddress, Long] = Map.empty

override def randomPeer(excluded: Set[InetSocketAddress]): Option[InetSocketAddress] = None

override def blacklist(host: InetAddress, reason: String): Unit = {}

override def touch(socketAddress: InetSocketAddress): Unit = {}

override def suspendedHosts: Set[InetAddress] = Set.empty

override def blacklistedHosts: Set[InetAddress] = Set.empty

override def detailedBlacklist: Map[InetAddress, (Long, String)] = Map.empty

override def clearBlacklist(): Unit = {}

override def detailedSuspended: Map[InetAddress, Long] = Map.empty

override def addCandidate(socketAddress: InetSocketAddress): Unit = {}
}
}

class PeerDiscoveryActor(chainId: Char) extends Actor {

import PeerDiscoveryActor._

def receive: PartialFunction[Any, Unit] = {
case GetPeersFrom(peer) => context.parent ! MainActor.PeerInfo(peer, getPeersFromNode(peer))
}

private val getPeersTimeout = 10

private def getPeersFromNode(address: InetSocketAddress): Set[InetSocketAddress]= {
var peers: Set[InetSocketAddress] = Set.empty

val exceptionHandler = new ExceptionHandler()

implicit val workerGroup: NioEventLoopGroup = new NioEventLoopGroup

new Bootstrap()
.group(workerGroup)
.channel(classOf[NioSocketChannel])
.handler(new PipelineInitializer[SocketChannel](Seq(
exceptionHandler,
new HandshakeHandler(chainId),
new LengthFieldPrepender(4),
new LengthFieldBasedFrameDecoder(100 * 1024 * 1024, 0, 4, 0, 4),
new LegacyFrameCodec(peerDatabaseStub),
new MessageCodec(),
new MessageHandler({ case (msg, ctx) =>
msg match {
case hs: Handshake => ctx.writeAndFlush(GetPeers)
case KnownPeers(p) => peers = p.toSet; ctx.close()
case _ =>
}
})
)))
.remoteAddress(address.getAddress, address.getPort)
.connect()

Await.result(exceptionHandler.closed, new FiniteDuration(getPeersTimeout, TimeUnit.SECONDS))
workerGroup.shutdownGracefully()
peers
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.wavesplatform.discovery.collections

import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._

import com.google.common.cache.CacheBuilder

class ExpirationSet[T <: Object](expirationTimeMilis: Long) extends scala.collection.mutable.Set[T]{
private val emptyItem = new Object()

private var inner = CacheBuilder.newBuilder()
.expireAfterWrite(expirationTimeMilis, TimeUnit.MILLISECONDS)
.build[T, Object]()

override def +=(elem: T): ExpirationSet.this.type = {
inner.put(elem, emptyItem)
this
}

override def -=(elem: T): ExpirationSet.this.type = {
inner.invalidate(elem)
this
}

override def contains(elem: T): Boolean = inner.asMap().containsKey(elem)

override def iterator: Iterator[T] = inner.asMap().keySet().iterator().asScala
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.wavesplatform.discovery.collections

class Pool[T] {
private val queue = scala.collection.mutable.Queue.empty[T]
private val items = scala.collection.mutable.Set.empty[T]

def add(item: T): Unit = {
if (!items.contains(item)) {
items.add(item)
queue.enqueue(item)
}
}

def next(): Option[T] = {
if (queue.nonEmpty) {
val item = queue.dequeue()
queue.enqueue(item)
Some(item)
}
else None
}

def remove(item: T): Unit = {
items.remove(item)
queue.dequeueFirst(i => i == item)
}
}
Loading

0 comments on commit 3803d5a

Please sign in to comment.