diff --git a/discovery/src/main/resources/application.conf b/discovery/src/main/resources/application.conf new file mode 100644 index 00000000000..9ea933ba4c2 --- /dev/null +++ b/discovery/src/main/resources/application.conf @@ -0,0 +1,16 @@ +discovery { + chains = [ + { + chain-id = W + initial-peers = ["138.201.152.163:6868", "138.201.152.164:6868", "138.201.152.165:6868", "35.156.19.4:6868", "52.50.69.247:6868", "52.57.147.71:6868"] + }, + { + chain-id = T + initial-peers = ["52.30.47.67:6863", "52.28.66.217:6863", "52.77.111.219:6863", "52.51.92.182:6863"] + } + ] + web-socket-host = "localhost" + web-socket-port = 8080 + workers-count = 10 + discovery-interval = 500ms +} diff --git a/discovery/src/main/scala/com.wavesplatform.discovery/CancellableExt.scala b/discovery/src/main/scala/com.wavesplatform.discovery/CancellableExt.scala new file mode 100644 index 00000000000..8eec3d34d55 --- /dev/null +++ b/discovery/src/main/scala/com.wavesplatform.discovery/CancellableExt.scala @@ -0,0 +1,12 @@ +package com.wavesplatform.discovery + +import akka.actor.Cancellable + +object CancellableExt { + implicit def Ext(self: Cancellable) = new { + def combine(other: Cancellable): Cancellable = new Cancellable { + override def cancel() = self.cancel() & other.cancel() + override def isCancelled = self.isCancelled && other.isCancelled + } + } +} \ No newline at end of file diff --git a/discovery/src/main/scala/com.wavesplatform.discovery/DiscoveryApp.scala b/discovery/src/main/scala/com.wavesplatform.discovery/DiscoveryApp.scala new file mode 100644 index 00000000000..8f2998b9391 --- /dev/null +++ b/discovery/src/main/scala/com.wavesplatform.discovery/DiscoveryApp.scala @@ -0,0 +1,59 @@ +package com.wavesplatform.discovery + +import java.util.concurrent.TimeUnit + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.ws.TextMessage +import akka.stream.scaladsl.{Flow, Sink, Source} +import akka.stream.{ActorMaterializer, OverflowStrategy} +import com.wavesplatform.discovery.actors.MainActor +import com.wavesplatform.discovery.actors.MainActor.WebSocketConnected +import com.wavesplatform.discovery.CancellableExt._ +import scorex.utils.ScorexLogging + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration.FiniteDuration + +object DiscoveryApp extends App with ScorexLogging { + + implicit val ec: ExecutionContext = ExecutionContext.global + implicit val system: ActorSystem = ActorSystem("Default") + implicit val flowMaterializer: ActorMaterializer = ActorMaterializer() + import akka.http.scaladsl.server.Directives._ + + val (route, timer) = Settings.default.chains.map {cs => + val mainActor = MainActor(cs.chainId, Settings.default.workersCount) + mainActor ! MainActor.Peers(cs.initialPeers.toSet) + + val route = get { + path(cs.chainId.toLower.toString) { + + val sink: Sink[akka.http.scaladsl.model.ws.Message, _] = Sink.ignore + val source: Source[akka.http.scaladsl.model.ws.Message, NotUsed] = + Source.actorRef[String](1, OverflowStrategy.dropTail) + .mapMaterializedValue { actor => + mainActor ! WebSocketConnected(actor) + NotUsed + }.map( + (outMsg: String) => TextMessage(outMsg)) + + handleWebSocketMessages(Flow.fromSinkAndSource(sink, source)) + } + } + + (route, system.scheduler.schedule(FiniteDuration(0, TimeUnit.SECONDS), Settings.default.discoveryInterval, mainActor, MainActor.Discover)) + }.reduce((a,b) => (a._1 ~ b._1, a._2.combine(b._2))) + + val binding = Http().bindAndHandle(route, Settings.default.webSocketHost, Settings.default.webSocketPort) + + sys.addShutdownHook { + binding.flatMap(_.unbind()).onComplete(_ => { + timer.cancel() + system.terminate() + }) + } + + log.info(s"Server is now online at http://${Settings.default.webSocketHost}:${Settings.default.webSocketPort}") +} diff --git a/discovery/src/main/scala/com.wavesplatform.discovery/Settings.scala b/discovery/src/main/scala/com.wavesplatform.discovery/Settings.scala new file mode 100644 index 00000000000..90f701b132b --- /dev/null +++ b/discovery/src/main/scala/com.wavesplatform.discovery/Settings.scala @@ -0,0 +1,35 @@ +package com.wavesplatform.discovery + +import java.net.InetSocketAddress + +import net.ceedubs.ficus.Ficus._ +import com.typesafe.config.{Config, ConfigFactory} +import net.ceedubs.ficus.readers.{NameMapper, ValueReader} + +import scala.concurrent.duration.FiniteDuration + +case class ChainSettings(chainId: Char, initialPeers: Seq[InetSocketAddress]) + +case class Settings(chains: Seq[ChainSettings], + webSocketHost: String, + webSocketPort: Int, + workersCount: Int, + discoveryInterval: FiniteDuration) + +object Settings{ + implicit val readConfigInHyphen: NameMapper = net.ceedubs.ficus.readers.namemappers.implicits.hyphenCase // IDEA bug + + implicit val inetSocketAddressReader: ValueReader[InetSocketAddress] = { (config: Config, path: String) => + val value = config.as[String](s"$path").split(":") + new InetSocketAddress( + value(0), + value(1).toInt + ) + } + + implicit val charReader: ValueReader[Char] = (config: Config, path: String) => config.as[String](s"$path").head + + import net.ceedubs.ficus.readers.ArbitraryTypeReader._ + + lazy val default: Settings = ConfigFactory.load().as[Settings]("discovery") +} \ No newline at end of file diff --git a/discovery/src/main/scala/com.wavesplatform.discovery/actors/MainActor.scala b/discovery/src/main/scala/com.wavesplatform.discovery/actors/MainActor.scala new file mode 100644 index 00000000000..9349563c9c1 --- /dev/null +++ b/discovery/src/main/scala/com.wavesplatform.discovery/actors/MainActor.scala @@ -0,0 +1,84 @@ +package com.wavesplatform.discovery.actors + +import java.net.InetSocketAddress + +import akka.actor.SupervisorStrategy.Resume +import akka.actor.{Actor, ActorRef, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy} +import akka.routing.{ActorRefRoutee, Router} +import com.wavesplatform.discovery.collections.{ExpirationSet, Pool} +import com.wavesplatform.discovery.routers.SmallestMailboxWithThresholdRoutingLogic +import play.api.libs.json._ + +class MainActor(chainId: Char, workersCount: Int) extends Actor { + import MainActor._ + + private val mailboxThreshold = 5 + private val router = { + val routes = Vector.fill(workersCount) { + ActorRefRoutee(context.actorOf(Props(classOf[PeerDiscoveryActor], chainId))) + } + Router(SmallestMailboxWithThresholdRoutingLogic(mailboxThreshold), routes) + } + + private val alivePeers = new Pool[InetSocketAddress] + private val deadPeersCacheTimeout = 5 + private val deadPeers = new ExpirationSet[InetSocketAddress](1000*60*60*1) + private val peerResponses = scala.collection.mutable.Map.empty[InetSocketAddress, Set[InetSocketAddress]] + private val connections = scala.collection.mutable.Set.empty[ActorRef] + + override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { + case _: Exception => Resume + } + + def receive: PartialFunction[Any, Unit] = { + + case Peers(p) => (p -- deadPeers).foreach(alivePeers.add) + + case Discover => alivePeers.next().foreach(peer => router.route(PeerDiscoveryActor.GetPeersFrom(peer), self)) + + case PeerInfo(peer, peers) => { + self ! Peers(peers) + deadPeers.remove(peer) + peerResponses.put(peer, peers) match { + case Some(oldValue) if oldValue == peers => //nothing changed + case _ if (peers -- deadPeers).nonEmpty => broadcastPeerInfo(peer, peers -- deadPeers) + case _ => + } + } + + case PeerProblem(peer) => { + println("PeerProblem") + alivePeers.remove(peer) + deadPeers.add(peer) + } + + case WebSocketConnected(client) => { + connections.add(client) + client ! jsonPeersData + } + } + + private def jsonPeersData = peerResponses.foldLeft(Json.obj())((json, keyValue) => json + (keyValue._1.getHostString, JsArray(keyValue._2.map(v => JsString(v.getHostString)).toSeq))).toString() + + private def broadcastPeerInfo(peer: InetSocketAddress, peers: Set[InetSocketAddress]): Unit = { + val response = Json.obj(peer.getHostString -> JsArray(peers.map(p => JsString(p.getHostString)).toSeq)).toString() + connections.foreach(c => c ! response) + } +} + +object MainActor { + + case class PeerInfo(peer: InetSocketAddress, peers: Set[InetSocketAddress]) + + case class PeerProblem(peer: InetSocketAddress) + + case class Peers(peers: Set[InetSocketAddress]) + + case class WebSocketConnected(actor: ActorRef) + + case object Discover + + def apply(chainId: Char, workersCount: Int)(implicit system: ActorSystem): ActorRef = { + system.actorOf(Props(classOf[MainActor], chainId, workersCount)) + } +} \ No newline at end of file diff --git a/discovery/src/main/scala/com.wavesplatform.discovery/actors/PeerDiscoveryActor.scala b/discovery/src/main/scala/com.wavesplatform.discovery/actors/PeerDiscoveryActor.scala new file mode 100644 index 00000000000..5786087581b --- /dev/null +++ b/discovery/src/main/scala/com.wavesplatform.discovery/actors/PeerDiscoveryActor.scala @@ -0,0 +1,89 @@ +package com.wavesplatform.discovery.actors + +import java.net.{InetAddress, InetSocketAddress} +import java.util.concurrent.TimeUnit + +import akka.actor.Actor +import com.wavesplatform.discovery._ +import com.wavesplatform.discovery.network._ +import com.wavesplatform.network.{GetPeers, Handshake, KnownPeers, LegacyFrameCodec, PeerDatabase, PipelineInitializer} +import io.netty.bootstrap.Bootstrap +import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.socket.SocketChannel +import io.netty.channel.socket.nio.NioSocketChannel +import io.netty.handler.codec.{LengthFieldBasedFrameDecoder, LengthFieldPrepender} + +import scala.concurrent.Await +import scala.concurrent.duration.FiniteDuration + +object PeerDiscoveryActor { + case class GetPeersFrom(peer: InetSocketAddress) + + val peerDatabaseStub = new PeerDatabase {override def suspend(host: InetAddress): Unit = {} + + override def knownPeers: Map[InetSocketAddress, Long] = Map.empty + + override def randomPeer(excluded: Set[InetSocketAddress]): Option[InetSocketAddress] = None + + override def blacklist(host: InetAddress, reason: String): Unit = {} + + override def touch(socketAddress: InetSocketAddress): Unit = {} + + override def suspendedHosts: Set[InetAddress] = Set.empty + + override def blacklistedHosts: Set[InetAddress] = Set.empty + + override def detailedBlacklist: Map[InetAddress, (Long, String)] = Map.empty + + override def clearBlacklist(): Unit = {} + + override def detailedSuspended: Map[InetAddress, Long] = Map.empty + + override def addCandidate(socketAddress: InetSocketAddress): Unit = {} + } +} + +class PeerDiscoveryActor(chainId: Char) extends Actor { + + import PeerDiscoveryActor._ + + def receive: PartialFunction[Any, Unit] = { + case GetPeersFrom(peer) => context.parent ! MainActor.PeerInfo(peer, getPeersFromNode(peer)) + } + + private val getPeersTimeout = 10 + + private def getPeersFromNode(address: InetSocketAddress): Set[InetSocketAddress]= { + var peers: Set[InetSocketAddress] = Set.empty + + val exceptionHandler = new ExceptionHandler() + + implicit val workerGroup: NioEventLoopGroup = new NioEventLoopGroup + + new Bootstrap() + .group(workerGroup) + .channel(classOf[NioSocketChannel]) + .handler(new PipelineInitializer[SocketChannel](Seq( + exceptionHandler, + new HandshakeHandler(chainId), + new LengthFieldPrepender(4), + new LengthFieldBasedFrameDecoder(100 * 1024 * 1024, 0, 4, 0, 4), + new LegacyFrameCodec(peerDatabaseStub), + new MessageCodec(), + new MessageHandler({ case (msg, ctx) => + msg match { + case hs: Handshake => ctx.writeAndFlush(GetPeers) + case KnownPeers(p) => peers = p.toSet; ctx.close() + case _ => + } + }) + ))) + .remoteAddress(address.getAddress, address.getPort) + .connect() + + Await.result(exceptionHandler.closed, new FiniteDuration(getPeersTimeout, TimeUnit.SECONDS)) + workerGroup.shutdownGracefully() + peers + } + +} \ No newline at end of file diff --git a/discovery/src/main/scala/com.wavesplatform.discovery/collections/ExpirationSet.scala b/discovery/src/main/scala/com.wavesplatform.discovery/collections/ExpirationSet.scala new file mode 100644 index 00000000000..915b3c2815e --- /dev/null +++ b/discovery/src/main/scala/com.wavesplatform.discovery/collections/ExpirationSet.scala @@ -0,0 +1,28 @@ +package com.wavesplatform.discovery.collections + +import java.util.concurrent.TimeUnit +import scala.collection.JavaConverters._ + +import com.google.common.cache.CacheBuilder + +class ExpirationSet[T <: Object](expirationTimeMilis: Long) extends scala.collection.mutable.Set[T]{ + private val emptyItem = new Object() + + private var inner = CacheBuilder.newBuilder() + .expireAfterWrite(expirationTimeMilis, TimeUnit.MILLISECONDS) + .build[T, Object]() + + override def +=(elem: T): ExpirationSet.this.type = { + inner.put(elem, emptyItem) + this + } + + override def -=(elem: T): ExpirationSet.this.type = { + inner.invalidate(elem) + this + } + + override def contains(elem: T): Boolean = inner.asMap().containsKey(elem) + + override def iterator: Iterator[T] = inner.asMap().keySet().iterator().asScala +} diff --git a/discovery/src/main/scala/com.wavesplatform.discovery/collections/Pool.scala b/discovery/src/main/scala/com.wavesplatform.discovery/collections/Pool.scala new file mode 100644 index 00000000000..ecfd2197289 --- /dev/null +++ b/discovery/src/main/scala/com.wavesplatform.discovery/collections/Pool.scala @@ -0,0 +1,27 @@ +package com.wavesplatform.discovery.collections + +class Pool[T] { + private val queue = scala.collection.mutable.Queue.empty[T] + private val items = scala.collection.mutable.Set.empty[T] + + def add(item: T): Unit = { + if (!items.contains(item)) { + items.add(item) + queue.enqueue(item) + } + } + + def next(): Option[T] = { + if (queue.nonEmpty) { + val item = queue.dequeue() + queue.enqueue(item) + Some(item) + } + else None + } + + def remove(item: T): Unit = { + items.remove(item) + queue.dequeueFirst(i => i == item) + } +} diff --git a/discovery/src/main/scala/com.wavesplatform.discovery/network/ExceptionHandler.scala b/discovery/src/main/scala/com.wavesplatform.discovery/network/ExceptionHandler.scala new file mode 100644 index 00000000000..00a2bff5877 --- /dev/null +++ b/discovery/src/main/scala/com.wavesplatform.discovery/network/ExceptionHandler.scala @@ -0,0 +1,39 @@ +package com.wavesplatform.discovery.network + +import java.net.SocketAddress + +import io.netty.channel.{ChannelDuplexHandler, ChannelFuture, ChannelHandlerContext, ChannelPromise} + +import scala.concurrent.{Future, Promise} + +class ExceptionHandler extends ChannelDuplexHandler { + + private val p = Promise[Boolean]() + val closed: Future[Boolean] = p.future + + override def close(ctx: ChannelHandlerContext, promise: ChannelPromise): Unit = { + super.close(ctx, promise) + if (!p.isCompleted) + p.success(true) + } + + override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = { + p.failure(cause) + } + + override def connect(ctx: ChannelHandlerContext, remoteAddress: SocketAddress, localAddress: SocketAddress, promise: ChannelPromise): Unit = { + ctx.connect(remoteAddress, localAddress, promise.addListener((future: ChannelFuture) => { + if (!future.isSuccess) { + p.failure(future.cause()) + } + })) + } + + override def write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise): Unit = { + ctx.write(msg, promise.addListener((future: ChannelFuture) => { + if (!future.isSuccess) { + p.failure(future.cause()) + } + })) + } +} diff --git a/discovery/src/main/scala/com.wavesplatform.discovery/network/HandshakeHandler.scala b/discovery/src/main/scala/com.wavesplatform.discovery/network/HandshakeHandler.scala new file mode 100644 index 00000000000..99b777a15f6 --- /dev/null +++ b/discovery/src/main/scala/com.wavesplatform.discovery/network/HandshakeHandler.scala @@ -0,0 +1,28 @@ +package com.wavesplatform.discovery.network + +import java.util + +import com.wavesplatform.Version +import com.wavesplatform.network.Handshake +import com.wavesplatform.settings.Constants +import io.netty.buffer.ByteBuf +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.codec.ReplayingDecoder +import scorex.utils.ScorexLogging + +import scala.util.Random + +class HandshakeHandler(chainId: Char) extends ReplayingDecoder[Void] with ScorexLogging { + private val handshake = + Handshake(Constants.ApplicationName + chainId, Version.VersionTuple, + "discovery", new Random().nextLong(), None) + + override def decode(ctx: ChannelHandlerContext, in: ByteBuf, out: util.List[AnyRef]): Unit = { + out.add(Handshake.decode(in)) + ctx.pipeline().remove(this) + } + + override def channelActive(ctx: ChannelHandlerContext): Unit = { + ctx.writeAndFlush(handshake.encode(ctx.alloc().buffer())) + } +} diff --git a/discovery/src/main/scala/com.wavesplatform.discovery/network/MessageCodec.scala b/discovery/src/main/scala/com.wavesplatform.discovery/network/MessageCodec.scala new file mode 100644 index 00000000000..c3b3dd47425 --- /dev/null +++ b/discovery/src/main/scala/com.wavesplatform.discovery/network/MessageCodec.scala @@ -0,0 +1,31 @@ +package com.wavesplatform.discovery.network + +import java.util + +import com.wavesplatform.network.{BasicMessagesRepo, BlockForged, BlockMessageSpec, GetBlock, GetBlockSpec, GetPeers, GetPeersSpec, GetSignatures, GetSignaturesSpec, KnownPeers, LocalScoreChanged, Message, MicroBlockInv, MicroBlockInvMessageSpec, MicroBlockRequest, MicroBlockRequestMessageSpec, MicroBlockResponse, MicroBlockResponseMessageSpec, PeersSpec, RawBytes, ScoreMessageSpec, Signatures, SignaturesSpec} +import io.netty.channel.ChannelHandler.Sharable +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.codec.MessageToMessageCodec +import scorex.network.message.MessageSpec +import scorex.utils.ScorexLogging + +import scala.util.{Failure, Success} + +@Sharable +class MessageCodec() extends MessageToMessageCodec[RawBytes, Message] with ScorexLogging { + + private val specs: Map[Byte, MessageSpec[_ <: AnyRef]] = BasicMessagesRepo.specs.map(s => s.messageCode -> s).toMap + + override def encode(ctx: ChannelHandlerContext, msg: Message, out: util.List[AnyRef]): Unit = msg match { + case GetPeers => out.add(RawBytes(GetPeersSpec.messageCode, Array[Byte]())) + case r: RawBytes => out.add(r) + case _ => + } + + override def decode(ctx: ChannelHandlerContext, msg: RawBytes, out: util.List[AnyRef]): Unit = { + specs(msg.code).deserializeData(msg.data) match { + case Success(x) => out.add(x) + case Failure(e) => log.error(e.getMessage) + } + } +} diff --git a/discovery/src/main/scala/com.wavesplatform.discovery/network/MessageHandler.scala b/discovery/src/main/scala/com.wavesplatform.discovery/network/MessageHandler.scala new file mode 100644 index 00000000000..1f9e0184182 --- /dev/null +++ b/discovery/src/main/scala/com.wavesplatform.discovery/network/MessageHandler.scala @@ -0,0 +1,9 @@ +package com.wavesplatform.discovery.network + +import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter} + +class MessageHandler(handler: PartialFunction[(Any, ChannelHandlerContext), Unit]) extends ChannelInboundHandlerAdapter{ + override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = { + handler((msg, ctx)) + } +} diff --git a/discovery/src/main/scala/com.wavesplatform.discovery/routers/SmallestMailboxWithThresholdRoutingLogic.scala b/discovery/src/main/scala/com.wavesplatform.discovery/routers/SmallestMailboxWithThresholdRoutingLogic.scala new file mode 100644 index 00000000000..6875ed182c2 --- /dev/null +++ b/discovery/src/main/scala/com.wavesplatform.discovery/routers/SmallestMailboxWithThresholdRoutingLogic.scala @@ -0,0 +1,18 @@ +package com.wavesplatform.discovery.routers + +import akka.actor.ActorRef +import akka.routing.{Routee, SmallestMailboxRoutingLogic} + +import scala.collection.immutable + +case class SmallestMailboxWithThresholdRoutingLogic(mailboxThreshold: Int) extends SmallestMailboxRoutingLogic { + override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee = { + + if (routees.exists(numberOfMessages(_) < mailboxThreshold)) { + super.select(message, routees) + } + else { + (_: Any, _: ActorRef) => () + } + } +} diff --git a/discovery/src/main/web/app.css b/discovery/src/main/web/app.css new file mode 100644 index 00000000000..cdfc26e57a4 --- /dev/null +++ b/discovery/src/main/web/app.css @@ -0,0 +1,57 @@ +svg { + background-color: #FFF; + cursor: default; + -webkit-user-select: none; + -moz-user-select: none; + -ms-user-select: none; + -o-user-select: none; + user-select: none; +} + +svg:not(.active):not(.ctrl) { + cursor: crosshair; +} + +path.link { + fill: none; + stroke: #000; + stroke-width: 0.1px; + cursor: default; +} + +svg:not(.active):not(.ctrl) path.link { + cursor: pointer; +} + +path.link.selected { + stroke-dasharray: 10,2; +} + +path.link.dragline { + pointer-events: none; +} + +path.link.hidden { + stroke-width: 0; +} + +circle.node { + stroke-width: 1.5px; + cursor: pointer; +} + +circle.node.reflexive { + stroke: #000 !important; + stroke-width: 2.5px; +} + +text { + font: 6px sans-serif; + font-weight:lighter; + pointer-events: none; +} + +text.id { + text-anchor: middle; + font-weight: bold; +} \ No newline at end of file diff --git a/discovery/src/main/web/data.js b/discovery/src/main/web/data.js new file mode 100644 index 00000000000..5e9d0b08953 --- /dev/null +++ b/discovery/src/main/web/data.js @@ -0,0 +1,76 @@ +Array.prototype.except = function(arr) { + var self = this.slice() + return this.filter(x => !arr.includes(x)) +} + +function isEmpty(obj) { + for(var key in obj) { + if(obj.hasOwnProperty(key)) + return false; + } + return true; +} + +function PeersInfo() +{ + var peerSet = {} + var peerStats = {} + var peerHops = {} + + this.peers = function(peer) { + if(!peerSet[peer]) + return [] + + return peerSet[peer] + } + + + this.hopsByLevel = function(peerHops) { + byLevel = {} + for(hop in peerHops) { + var level = peerHops[hop] + + if(!byLevel[level]) + byLevel[level] = [] + + byLevel[level].push(hop) + } + + return byLevel + } + + this.addPeerWithPeers = function(peer, peers) { + peerSet[peer] = peers + } + + this.calculatePeerHops = function(peer, peerHops = {}, depth = 1, max = 1) { + + if(depth > max) + return peerHops + + if(isEmpty(peerHops)) { + peerHops = {} + this.peers(peer).forEach(hop => peerHops[hop] = depth) + } else { + for(hop in peerHops) { + if(peerHops[hop] == depth - 1) { + this.peers(hop).filter(p => peerHops[p] ? false : true).forEach(hop => peerHops[hop] = depth) + } + } + } + + return this.calculatePeerHops(peer, peerHops, depth + 1, max) + } + + this.calculateHops = function() { + var result = [] + for(peer in peerSet) { + var byLevel = this.hopsByLevel(this.calculatePeerHops(peer, 0, 1, 6)) + byLevel.peer = peer + result.push(byLevel) + } + + return result + } +} + diff --git a/discovery/src/main/web/graph.js b/discovery/src/main/web/graph.js new file mode 100644 index 00000000000..4da6e3c1053 --- /dev/null +++ b/discovery/src/main/web/graph.js @@ -0,0 +1,351 @@ +var width = 960, + height = 500, + colors = d3.scale.category10(); +var svg = d3.select('svg').attr('oncontextmenu', 'return false;').attr('width', width).attr('height', height); +var nodes = [], + lastNodeId = 0, + links = []; + +var nodesSet = {} +var linksSet = {} + +function addPeerWithPeers(peer, peers) { + addPeer(peer) + peers.forEach(function(p) { + addPeer(p) + addLink(peer,p) + }); +} + +function addPeer(peer) { + if(nodesSet[peer]) + return + + node = { + id: peer, + reflexive: false + }; + + nodesSet[peer] = node + nodes.push(node); +} + +function addLink(from, to) { + if(from == to) + return + + if(linksSet[from] && linksSet[from][to]) + return + + if(!linksSet[from]) + linksSet[from] = {} + + if(linksSet[to] && linksSet[to][from]) { + linksSet[to][from].left = true + } + else if(linksSet[from][to]) { + } + else { + link = { + source: nodesSet[from], + target: nodesSet[to], + left: false, + right: true + }; + links.push(link); + linksSet[from][to] = link + } +} + +var radius = 3 + +var force = d3.layout.force().nodes(nodes).links(links).size([width, height]).linkDistance(3).linkStrength(0.1).friction(0.1).theta(0.8).charge(-3).on('tick', tick) +svg.append('svg:defs').append('svg:marker').attr('id', 'end-arrow').attr('viewBox', '0 -5 10 10').attr('refX', 6).attr('markerWidth', 3).attr('markerHeight', 3).attr('orient', 'auto').append('svg:path').attr('d', 'M0,-5L10,0L0,5').attr('fill', '#000'); +svg.append('svg:defs').append('svg:marker').attr('id', 'start-arrow').attr('viewBox', '0 -5 10 10').attr('refX', 4).attr('markerWidth', 3).attr('markerHeight', 3).attr('orient', 'auto').append('svg:path').attr('d', 'M10,-5L0,0L10,5').attr('fill', '#000'); +var drag_line = svg.append('svg:path').attr('class', 'link dragline hidden').attr('d', 'M0,0L0,0'); + + var g = svg.append("g"); + +var x = d3.scale.linear() + .domain([0, width]) + .range([0, width]); + +var y = d3.scale.linear() + .domain([0, height]) + .range([height, 0]); + + var zoom = d3.behavior.zoom().x(x).y(y) + .scaleExtent([1, 50]) + .on("zoom", function() { + var e = d3.event, + tx = Math.min(0, Math.max(e.translate[0], width - width * e.scale)), + ty = Math.min(0, Math.max(e.translate[1], height - height * e.scale)); + zoom.translate([tx, ty]); + g.attr("transform", [ + "translate(" + [tx, ty] + ")", + "scale(" + e.scale + ")" + ].join(" ")); + + g.selectAll(".node").attr("transform", "scale(" + 1.0 / e.scale + ")"); + g.selectAll(".link").style("stroke-width", 0.1 / e.scale + "px"); + svg.selectAll("text").attr("transform", "scale(" + 1.0 / e.scale + ")"); + }); + svg.call(zoom); + +var path = g.selectAll('path'), + circle = g.selectAll('g'); +var selected_node = null, + selected_link = null, + mousedown_link = null, + mousedown_node = null, + mouseup_node = null; + +function resetMouseVars() { + mousedown_node = null; + mouseup_node = null; + mousedown_link = null; +} + +function tick() { + path.attr('d', function(d) { + var deltaX = d.target.x - d.source.x, + deltaY = d.target.y - d.source.y, + dist = Math.sqrt(deltaX * deltaX + deltaY * deltaY), + normX = deltaX / dist, + normY = deltaY / dist, + sourcePadding = 0, // d.left ? radius * 1.3 : radius, + targetPadding = 0 // d.right ? radius * 1.3 : radius, + sourceX = d.source.x + (sourcePadding * normX), + sourceY = d.source.y + (sourcePadding * normY), + targetX = d.target.x - (targetPadding * normX), + targetY = d.target.y - (targetPadding * normY); + return 'M' + sourceX + ',' + sourceY + 'L' + targetX + ',' + targetY; + }); + circle.attr('transform', function(d) { + return 'translate(' + d.x + ',' + d.y + ')'; + }); +} + +function restart() { + path = path.data(links); + path.classed('selected', function(d) { + return d === selected_link; + }).style('marker-start', function(d) { + return d.left ? 'url(#start-arrow)' : ''; + }).style('marker-end', function(d) { + return d.right ? 'url(#end-arrow)' : ''; + }); + path.enter().append('svg:path').attr('class', 'link').classed('selected', function(d) { + return d === selected_link; + }).style('marker-start', function(d) { + return d.left ? 'url(#start-arrow)' : ''; + }).style('marker-end', function(d) { + return d.right ? 'url(#end-arrow)' : ''; + }).on('mousedown', function(d) { + if (d3.event.ctrlKey) return; + mousedown_link = d; + if (mousedown_link === selected_link) selected_link = null; + else selected_link = mousedown_link; + selected_node = null; + restart(); + }); + path.exit().remove(); + circle = circle.data(nodes, function(d) { + return d.id; + }); + circle.selectAll('circle').style('fill', function(d) { + return (d === selected_node) ? d3.rgb(colors(d.id)).brighter().toString() : colors(d.id); + }).classed('reflexive', function(d) { + return d.reflexive; + }); + var g = circle.enter().append('svg:g'); + + + + g.append('svg:circle').attr('class', 'node').attr('r', radius).style('fill', function(d) { + return (d === selected_node) ? d3.rgb(colors(d.id)).brighter().toString() : colors(d.id); + }).style('stroke', function(d) { + return d3.rgb(colors(d.id)).darker().toString(); + }).classed('reflexive', function(d) { + return d.reflexive; + }).on('mouseover', function(d) { + if (!mousedown_node || d === mousedown_node) return; + d3.select(this).attr('transform', 'scale(1.1)'); + }).on('mouseout', function(d) { + if (!mousedown_node || d === mousedown_node) return; + d3.select(this).attr('transform', ''); + }).on('mousedown', function(d) { + if (d3.event.ctrlKey) return; + mousedown_node = d; + if (mousedown_node === selected_node) selected_node = null; + else selected_node = mousedown_node; + selected_link = null; + drag_line.style('marker-end', 'url(#end-arrow)').classed('hidden', false).attr('d', 'M' + mousedown_node.x + ',' + mousedown_node.y + 'L' + mousedown_node.x + ',' + mousedown_node.y); + restart(); + }).on('mouseup', function(d) { + if (!mousedown_node) return; + drag_line.classed('hidden', true).style('marker-end', ''); + mouseup_node = d; + if (mouseup_node === mousedown_node) { + resetMouseVars(); + return; + } + d3.select(this).attr('transform', ''); + var source, target, direction; + if (mousedown_node.id < mouseup_node.id) { + source = mousedown_node; + target = mouseup_node; + direction = 'right'; + } else { + source = mouseup_node; + target = mousedown_node; + direction = 'left'; + } + var link; + link = links.filter(function(l) { + return (l.source === source && l.target === target); + })[0]; + if (link) { + link[direction] = true; + } else { + link = { + source: source, + target: target, + left: false, + right: false + }; + link[direction] = true; + links.push(link); + } + selected_link = link; + selected_node = null; + restart(); + }); + g.append('svg:text').attr('x', 0).attr('y', -radius * 1.8).attr('class', 'id').text(function(d) { + return d.id; + }); + circle.exit().remove(); + force.start(); + //for (var i = 0; i < 100; ++i) force.tick(); +} +Array.prototype.remByVal = function(val) { + for (var i = 0; i < this.length; i++) { + if (this[i] === val) { + this.splice(i, 1); + i--; + } + } + return this; +} +//Call like + +function mousedown() { + +//var foo = []; +//var n = 70 +//for (var i = 1; i <= n; i++) { +// foo.push(i); +//} +// +//for (var j = 1; j <= n; j++) { +// addPeerWithPeers(j, foo.slice()) +//} +// +//restart() + +return + svg.classed('active', true); + if (d3.event.ctrlKey || mousedown_node || mousedown_link) return; + var point = d3.mouse(this), + node = { + id: ++lastNodeId, + reflexive: false + }; + node.x = point[0]; + node.y = point[1]; + nodes.push(node); + restart(); +} + +function mousemove() { + if (!mousedown_node) return; + drag_line.attr('d', 'M' + mousedown_node.x + ',' + mousedown_node.y + 'L' + d3.mouse(this)[0] + ',' + d3.mouse(this)[1]); + restart(); +} + +function mouseup() { + if (mousedown_node) { + drag_line.classed('hidden', true).style('marker-end', ''); + } + svg.classed('active', false); + resetMouseVars(); +} + +function spliceLinksForNode(node) { + var toSplice = links.filter(function(l) { + return (l.source === node || l.target === node); + }); + toSplice.map(function(l) { + links.splice(links.indexOf(l), 1); + }); +} +var lastKeyDown = -1; + +function keydown() { + d3.event.preventDefault(); + if (lastKeyDown !== -1) return; + lastKeyDown = d3.event.keyCode; + if (d3.event.keyCode === 17) { + circle.call(force.drag); + svg.classed('ctrl', true); + } + if (!selected_node && !selected_link) return; + switch (d3.event.keyCode) { + case 8: + case 46: + if (selected_node) { + nodes.splice(nodes.indexOf(selected_node), 1); + spliceLinksForNode(selected_node); + } else if (selected_link) { + links.splice(links.indexOf(selected_link), 1); + } + selected_link = null; + selected_node = null; + restart(); + break; + case 66: + if (selected_link) { + selected_link.left = true; + selected_link.right = true; + } + restart(); + break; + case 76: + if (selected_link) { + selected_link.left = true; + selected_link.right = false; + } + restart(); + break; + case 82: + if (selected_node) { + selected_node.reflexive = !selected_node.reflexive; + } else if (selected_link) { + selected_link.left = false; + selected_link.right = true; + } + restart(); + break; + } +} + +function keyup() { + lastKeyDown = -1; + if (d3.event.keyCode === 17) { + circle.on('mousedown.drag', null).on('touchstart.drag', null); + svg.classed('ctrl', false); + } +} +svg.on('mousedown', mousedown).on('mousemove', mousemove).on('mouseup', mouseup); +d3.select(window).on('keydown', keydown).on('keyup', keyup); +restart(); \ No newline at end of file diff --git a/discovery/src/main/web/index.html b/discovery/src/main/web/index.html new file mode 100644 index 00000000000..85ef1fc81cb --- /dev/null +++ b/discovery/src/main/web/index.html @@ -0,0 +1,39 @@ + + +
+ + + + + + + +