Skip to content

Commit

Permalink
Merge pull request #748 from wavesplatform/NODE-414-status-rest-api
Browse files Browse the repository at this point in the history
NODE-414 Extend API for /node/status + make it available at start
  • Loading branch information
petermz authored Dec 15, 2017
2 parents 28b7cb0 + a3b9e7f commit 20df004
Show file tree
Hide file tree
Showing 13 changed files with 153 additions and 50 deletions.
6 changes: 2 additions & 4 deletions src/it/scala/com/wavesplatform/it/Docker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class Docker(suiteConfig: Config = ConfigFactory.empty,
val all = nodeConfigs.map(startNodeInternal)
Await.result(
for {
_ <- Future.traverse(all)(waitNodeIsUp)
_ <- Future.traverse(all)(_.waitForStartup())
_ <- Future.traverse(all)(connectToAll)
} yield (),
5.minutes
Expand All @@ -113,7 +113,7 @@ class Docker(suiteConfig: Config = ConfigFactory.empty,
def startNode(nodeConfig: Config, autoConnect: Boolean = true): Node = {
val node = startNodeInternal(nodeConfig)
Await.result(
waitNodeIsUp(node).flatMap(_ => if (autoConnect) connectToAll(node) else Future.successful(())),
node.waitForStartup().flatMap(_ => if (autoConnect) connectToAll(node) else Future.successful(())),
3.minutes
)
node
Expand Down Expand Up @@ -147,8 +147,6 @@ class Docker(suiteConfig: Config = ConfigFactory.empty,
.map(_ => ())
}

private def waitNodeIsUp(node: Node): Future[Unit] = node.waitFor[Int]("node is up")(_.height, _ >= 0, 1.second).map(_ => ())

private def startNodeInternal(nodeConfig: Config): Node = try {
val javaOptions = Option(System.getenv("CONTAINER_JAVA_OPTS")).getOrElse("")
val configOverrides = s"$javaOptions ${renderProperties(asProperties(nodeConfig.withFallback(suiteConfig)))} " +
Expand Down
36 changes: 36 additions & 0 deletions src/it/scala/com/wavesplatform/it/NodeApiTestSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.wavesplatform.it

import com.wavesplatform.it.NodeApiTestSuite._
import org.scalatest.{BeforeAndAfterAll, FreeSpec, Matchers}
import scorex.utils.ScorexLogging

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._
import scala.concurrent.duration.DurationInt

class NodeApiTestSuite extends FreeSpec with Matchers with BeforeAndAfterAll with ScorexLogging {

private lazy val docker = Docker(getClass)

"/node/status should report status" in {
val node = docker.startNode(NodeConfig)
val f = for {
status <- node.status
_ = log.trace(s"#### $status")
_ = assert(status.blockchainHeight >= status.stateHeight)
} yield succeed

Await.ready(f, 2.minute)
}

override protected def afterAll(): Unit = {
super.afterAll()
docker.close()
}
}

private object NodeApiTestSuite {

private val NodeConfig = NodeConfigs.newBuilder.withDefault(1).build().head

}
17 changes: 15 additions & 2 deletions src/it/scala/com/wavesplatform/it/api/NodeApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import com.wavesplatform.it.util._
import com.wavesplatform.matcher.api.CancelOrderRequest
import com.wavesplatform.state2.{ByteStr, Portfolio}
import io.netty.util.{HashedWheelTimer, Timer}
import org.asynchttpclient.Dsl.{get => _get, post => _post}
import org.asynchttpclient.Dsl.{asyncHttpClient, get => _get, post => _post}
import org.asynchttpclient._
import org.asynchttpclient.util.HttpConstants
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -117,6 +117,19 @@ trait NodeApi {

def connect(host: String, port: Int): Future[Unit] = postJson("/peers/connect", ConnectReq(host, port)).map(_ => ())

def waitForStartup(): Future[Option[Response]] = {
def send(n: NodeApi) = asyncHttpClient()
.executeRequest(_get(s"http://$restAddress:$nodeRestPort/blocks/height")).toCompletableFuture.toScala
.map(Option(_))
.recoverWith {
case e@(_: IOException | _: TimeoutException) => Future(None)
}
def cond(ropt: Option[Response]) = ropt.exists { r =>
r.getStatusCode == HttpConstants.ResponseStatusCodes.OK_200 && (Json.parse(r.getResponseBody) \ "height").as[Int] > 0
}
waitFor("node is up")(send, cond, 1.second)
}

def waitForPeers(targetPeersCount: Int): Future[Seq[Peer]] = waitFor[Seq[Peer]](s"connectedPeers.size >= $targetPeersCount")(_.connectedPeers, _.size >= targetPeersCount, 1.second)

def height: Future[Int] = get("/blocks/height").as[JsValue].map(v => (v \ "height").as[Int])
Expand Down Expand Up @@ -325,7 +338,7 @@ object NodeApi extends ScorexLogging {
case class UnexpectedStatusCodeException(request: Request, response: Response) extends Exception(s"Request: ${request.getUrl}\n" +
s"Unexpected status code (${response.getStatusCode}): ${response.getResponseBody}")

case class Status(blockGeneratorStatus: Option[String], historySynchronizationStatus: Option[String])
case class Status(blockchainHeight: Int, stateHeight: Int, updatedTimestamp: Long, updatedDate: String)

implicit val statusFormat: Format[Status] = Json.format

Expand Down
36 changes: 25 additions & 11 deletions src/main/scala/com/wavesplatform/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.wavesplatform.metrics.Metrics
import com.wavesplatform.mining.{Miner, MinerImpl}
import com.wavesplatform.network._
import com.wavesplatform.settings._
import com.wavesplatform.state2.StateWriter
import com.wavesplatform.state2.appender.{BlockAppender, CheckpointAppender, ExtensionAppender, MicroblockAppender}
import com.wavesplatform.utils.{SystemInformationReporter, forceStopApplication}
import io.netty.channel.Channel
Expand Down Expand Up @@ -51,8 +52,23 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con

private val LocalScoreBroadcastDebounce = 1.second

// Start /node API right away
private implicit val as: ActorSystem = actorSystem
private implicit val materializer: ActorMaterializer = ActorMaterializer()

private val nodeApiLauncher = (bcu: BlockchainUpdater, state: StateWriter) =>
Option(settings.restAPISettings.enable).collect { case true =>
val tags = Seq(typeOf[NodeApiRoute])
val routes = Seq(NodeApiRoute(settings.restAPISettings, bcu, state, () => this.shutdown()))
val combinedRoute: Route = CompositeHttpService(actorSystem, tags, routes, settings.restAPISettings).compositeRoute
val httpFuture = Http().bindAndHandle(combinedRoute, settings.restAPISettings.bindAddress, settings.restAPISettings.port)
serverBinding = Await.result(httpFuture, 10.seconds)
log.info(s"Node REST API was bound on ${settings.restAPISettings.bindAddress}:${settings.restAPISettings.port}")
(tags, routes)
}

private val checkpointService = new CheckpointServiceImpl(settings.blockchainSettings.checkpointFile, settings.checkpointsSettings)
private val (history, featureProvider, stateWriter, stateReader, blockchainUpdater, blockchainDebugInfo) = StorageFactory(settings).get
private val (history, featureProvider, stateWriter, stateReader, blockchainUpdater, blockchainDebugInfo, nodeApi) = StorageFactory(settings, nodeApiLauncher).get
private lazy val upnp = new UPnP(settings.networkSettings.uPnPSettings) // don't initialize unless enabled
private val wallet: Wallet = Wallet(settings.walletSettings)
private val peerDatabase = new PeerDatabaseImpl(settings.networkSettings)
Expand Down Expand Up @@ -109,11 +125,9 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
upnp.addPort(addr.getPort)
}

implicit val as: ActorSystem = actorSystem
implicit val materializer: ActorMaterializer = ActorMaterializer()

if (settings.restAPISettings.enable) {
val apiRoutes = Seq(
// Start complete REST API. Node API is already running, so we need to re-bind
nodeApi.foreach { case (tags, routes) =>
val apiRoutes = routes ++ Seq(
BlocksApiRoute(settings.restAPISettings, history, blockchainUpdater, allChannels, c => processCheckpoint(None, c)),
TransactionsApiRoute(settings.restAPISettings, stateReader, history, utxStorage),
NxtConsensusApiRoute(settings.restAPISettings, stateReader, history, settings.blockchainSettings.functionalitySettings),
Expand All @@ -125,7 +139,6 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
DebugApiRoute(settings.restAPISettings, wallet, stateReader, history, peerDatabase, establishedConnections, blockchainUpdater, allChannels, utxStorage, blockchainDebugInfo, miner, configRoot),
WavesApiRoute(settings.restAPISettings, wallet, utxStorage, allChannels, time),
AssetsApiRoute(settings.restAPISettings, wallet, utxStorage, allChannels, stateReader, time),
NodeApiRoute(settings.restAPISettings, () => this.shutdown()),
ActivationApiRoute(settings.restAPISettings, settings.blockchainSettings.functionalitySettings, settings.featuresSettings, history, featureProvider),
AssetsBroadcastApiRoute(settings.restAPISettings, utxStorage, allChannels),
LeaseApiRoute(settings.restAPISettings, wallet, utxStorage, allChannels, time),
Expand All @@ -134,7 +147,7 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
AliasBroadcastApiRoute(settings.restAPISettings, utxStorage, allChannels)
)

val apiTypes = Seq(
val apiTypes = tags ++ Seq(
typeOf[BlocksApiRoute],
typeOf[TransactionsApiRoute],
typeOf[NxtConsensusApiRoute],
Expand All @@ -146,7 +159,6 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
typeOf[DebugApiRoute],
typeOf[WavesApiRoute],
typeOf[AssetsApiRoute],
typeOf[NodeApiRoute],
typeOf[ActivationApiRoute],
typeOf[AssetsBroadcastApiRoute],
typeOf[LeaseApiRoute],
Expand All @@ -155,8 +167,10 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
typeOf[AliasBroadcastApiRoute]
)
val combinedRoute: Route = CompositeHttpService(actorSystem, apiTypes, apiRoutes, settings.restAPISettings).compositeRoute
val httpFuture = Http().bindAndHandle(combinedRoute, settings.restAPISettings.bindAddress, settings.restAPISettings.port)
serverBinding = Await.result(httpFuture, 10.seconds)
val httpFuture = serverBinding.unbind().flatMap { _ =>
Http().bindAndHandle(combinedRoute, settings.restAPISettings.bindAddress, settings.restAPISettings.port)
}
serverBinding = Await.result(httpFuture, 20.seconds)
log.info(s"REST API was bound on ${settings.restAPISettings.bindAddress}:${settings.restAPISettings.port}")
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/com/wavesplatform/Importer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ object Importer extends ScorexLogging {
case Success(inputStream) =>
deleteFile(settings.blockchainSettings.blockchainFile)
deleteFile(settings.blockchainSettings.stateFile)
val (history, _, stateWriter, _, blockchainUpdater, _) = StorageFactory(settings).get
val (history, _, stateWriter, _, blockchainUpdater, _, _) = StorageFactory(settings).get
checkGenesis(history, settings, blockchainUpdater)
val bis = new BufferedInputStream(inputStream)
var quit = false
Expand Down
12 changes: 9 additions & 3 deletions src/main/scala/com/wavesplatform/history/StorageFactory.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,22 @@ object StorageFactory {
}
}

def apply(settings: WavesSettings, time: scorex.utils.Time = NTP): Try[(NgHistory with DebugNgHistory with AutoCloseable, FeatureProvider, AutoCloseable, StateReader, BlockchainUpdater, BlockchainDebugInfo)] = {
def apply[T](settings: WavesSettings,
beforeStateUpdate: (BlockchainUpdater, StateWriter) => T = (_: BlockchainUpdater, _: StateWriter) => (),
time: scorex.utils.Time = NTP):
Try[(NgHistory with DebugNgHistory with AutoCloseable, FeatureProvider, AutoCloseable, StateReader, BlockchainUpdater, BlockchainDebugInfo, T)] =
{
val lock = new RWL(true)
for {
historyWriter <- HistoryWriterImpl(settings.blockchainSettings.blockchainFile, lock, settings.blockchainSettings.functionalitySettings, settings.featuresSettings)
ss <- createStateStorage(historyWriter, settings.blockchainSettings.stateFile, settings.mvstorePageSplitSize)
stateWriter = new StateWriterImpl(ss, settings.blockchainSettings.storeTransactionsInState, lock)
bcu = BlockchainUpdaterImpl(stateWriter, historyWriter, settings, time, lock)
callbackResult = beforeStateUpdate(bcu, stateWriter)
} yield {
val bcu = BlockchainUpdaterImpl(stateWriter, historyWriter, settings, time, lock)
bcu.syncPersistedAndInMemory()
val history: NgHistory with DebugNgHistory with FeatureProvider = bcu.historyReader
(history, history, stateWriter, bcu.bestLiquidState, bcu, bcu)
(history, history, stateWriter, bcu.bestLiquidState, bcu, bcu, callbackResult)
}
}
}
21 changes: 19 additions & 2 deletions src/main/scala/com/wavesplatform/http/NodeApiRoute.scala
Original file line number Diff line number Diff line change
@@ -1,20 +1,29 @@
package com.wavesplatform.http

import java.time.Instant
import javax.ws.rs.Path

import akka.http.scaladsl.server.Route
import com.wavesplatform.Shutdownable
import com.wavesplatform.network.lastObserved
import com.wavesplatform.settings.{Constants, RestAPISettings}
import com.wavesplatform.state2.StateWriter
import io.swagger.annotations._
import monix.eval.Coeval
import monix.execution.Scheduler.Implicits.global
import play.api.libs.json.Json
import scorex.api.http.{ApiRoute, CommonApiFunctions}
import scorex.transaction.{BlockchainUpdater, LastBlockInfo}
import scorex.utils.ScorexLogging

@Path("/node")
@Api(value = "node")
case class NodeApiRoute(settings: RestAPISettings, application: Shutdownable)
case class NodeApiRoute(settings: RestAPISettings, blockchainUpdater: BlockchainUpdater, state: StateWriter, application: Shutdownable)
extends ApiRoute with CommonApiFunctions with ScorexLogging {

private val lastHeight: Coeval[Option[LastBlockInfo]] = lastObserved(blockchainUpdater.lastBlockInfo)
private val lastState: Coeval[Option[StateWriter.Status]] = lastObserved(state.status)

override lazy val route = pathPrefix("node") {
stop ~ status ~ version
}
Expand All @@ -39,6 +48,14 @@ case class NodeApiRoute(settings: RestAPISettings, application: Shutdownable)
@Path("/status")
@ApiOperation(value = "Status", notes = "Get status of the running core", httpMethod = "GET")
def status: Route = (get & path("status")) {
complete(Json.obj())
val (bcHeight, bcTime) = lastHeight().map { case LastBlockInfo(_, h, _, _, t) => (h, t) }.getOrElse((0, 0L))
val (stHeight, stTime) = lastState().map { case StateWriter.Status(h, t) => (h, t) }.getOrElse((0, 0L))
val lastUpdated = bcTime max stTime
complete(Json.obj(
"blockchainHeight" -> bcHeight,
"stateHeight" -> stHeight,
"updatedTimestamp" -> lastUpdated,
"updatedDate" -> Instant.ofEpochMilli(lastUpdated).toString
))
}
}
27 changes: 13 additions & 14 deletions src/main/scala/com/wavesplatform/state2/BlockchainUpdaterImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with SnapshotStateRea

// Store last block information in a cache
historyReader.lastBlock.foreach { b =>
internalLastBlockInfo.onNext(LastBlockInfo(b.uniqueId, historyReader.height(), historyReader.score(), blockchainReady))
internalLastBlockInfo.onNext(LastBlockInfo(b.uniqueId, historyReader.height(), historyReader.score(), blockchainReady, System.currentTimeMillis))
}

private def syncPersistedAndInMemory(): Unit = write { implicit l =>
private[wavesplatform] def syncPersistedAndInMemory(): Unit = write { implicit l =>
log.info(heights("State rebuild started"))

val notPersisted = historyWriter.height() - persisted.height
Expand Down Expand Up @@ -187,12 +187,13 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with SnapshotStateRea
}
}
}).map {
_ map { case ((newBlockDiff, discacrded)) =>
_ map { case ((newBlockDiff, discarded)) =>
val height = historyWriter.height() + 1
ngState.set(Some(new NgState(block, newBlockDiff, featuresApprovedWithBlock(block))))
historyReader.lastBlockId().foreach(id => internalLastBlockInfo.onNext(LastBlockInfo(id, historyReader.height(), historyReader.score(), blockchainReady)))
historyReader.lastBlockId().foreach(id =>
internalLastBlockInfo.onNext(LastBlockInfo(id, historyReader.height(), historyReader.score(), blockchainReady, System.currentTimeMillis)))
log.info(s"$block appended. New height: $height)")
discacrded
discarded
}
})
}
Expand Down Expand Up @@ -241,7 +242,8 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with SnapshotStateRea
}

val totalDiscardedBlocks: Seq[Block] = discardedHistoryBlocks ++ discardedNgBlock.toSeq
if (totalDiscardedBlocks.nonEmpty) internalLastBlockInfo.onNext(LastBlockInfo(blockId, historyReader.height(), historyReader.score(), blockchainReady))
if (totalDiscardedBlocks.nonEmpty) internalLastBlockInfo.onNext(
LastBlockInfo(blockId, historyReader.height(), historyReader.score(), blockchainReady, System.currentTimeMillis))
TxsInBlockchainStats.record(-totalDiscardedBlocks.size)
Right(totalDiscardedBlocks)
}
Expand Down Expand Up @@ -270,8 +272,9 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with SnapshotStateRea
historyWriter.lastBlock.map(_.timestamp), microBlock, ng.base.timestamp)
} yield {
log.info(s"$microBlock appended")
ng.append(microBlock, diff, System.currentTimeMillis())
internalLastBlockInfo.onNext(LastBlockInfo(microBlock.totalResBlockSig, historyReader.height(), historyReader.score(), ready = true))
val now = System.currentTimeMillis
ng.append(microBlock, diff, now)
internalLastBlockInfo.onNext(LastBlockInfo(microBlock.totalResBlockSig, historyReader.height(), historyReader.score(), ready = true, now))
}
}
}
Expand Down Expand Up @@ -300,12 +303,8 @@ object BlockchainUpdaterImpl extends ScorexLogging {
history: HistoryWriterImpl,
settings: WavesSettings,
time: scorex.utils.Time,
synchronizationToken: ReentrantReadWriteLock): BlockchainUpdaterImpl = {
val blockchainUpdater = new BlockchainUpdaterImpl(persistedState, settings, time, history, history, synchronizationToken)
log.info(blockchainUpdater.heights("Constructing BlockchainUpdaterImpl"))
blockchainUpdater.syncPersistedAndInMemory()
blockchainUpdater
}
synchronizationToken: ReentrantReadWriteLock): BlockchainUpdaterImpl =
new BlockchainUpdaterImpl(persistedState, settings, time, history, history, synchronizationToken)

def areVersionsOfSameBlock(b1: Block, b2: Block): Boolean =
b1.signerData.generator == b2.signerData.generator &&
Expand Down
24 changes: 22 additions & 2 deletions src/main/scala/com/wavesplatform/state2/StateWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,37 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import cats.Monoid
import cats.implicits._
import com.wavesplatform.metrics.Instrumented
import com.wavesplatform.state2.StateWriter.Status
import com.wavesplatform.state2.reader.StateReaderImpl
import monix.reactive.Observable
import monix.reactive.subjects.BehaviorSubject
import scorex.transaction.PaymentTransaction
import scorex.transaction.assets.TransferTransaction
import scorex.transaction.assets.exchange.ExchangeTransaction
import scorex.utils.ScorexLogging
import scorex.utils.Synchronized.ReadLock

trait StateWriter {
def applyBlockDiff(blockDiff: BlockDiff): Unit

def clear(): Unit

def status: Observable[Status]
}

object StateWriter {
case class Status(height: Int, lastUpdated: Long)
}

class StateWriterImpl(p: StateStorage, storeTransactions: Boolean, synchronizationToken: ReentrantReadWriteLock)
extends StateReaderImpl(p, synchronizationToken) with StateWriter with AutoCloseable with ScorexLogging with Instrumented {

import StateStorage._

override val status = read { implicit l =>
BehaviorSubject(Status(sp().getHeight, System.currentTimeMillis))
}

override def close(): Unit = p.close()

override def applyBlockDiff(blockDiff: BlockDiff): Unit = write { implicit l =>
Expand Down Expand Up @@ -106,7 +120,8 @@ class StateWriterImpl(p: StateStorage, storeTransactions: Boolean, synchronizati
measureSizeLog("lease info")(blockDiff.txsDiff.leaseState)(
_.foreach { case (id, isActive) => sp().leaseState.put(id, isActive) })

sp().setHeight(newHeight)
setHeight(newHeight)

val nextChunkOfBlocks = !sameQuotient(newHeight, oldHeight, 1000)
sp().commit(nextChunkOfBlocks)
log.info(s"BlockDiff commit complete. Persisted height = $newHeight")
Expand All @@ -125,7 +140,12 @@ class StateWriterImpl(p: StateStorage, storeTransactions: Boolean, synchronizati
sp().aliasToAddress.clear()
sp().leaseState.clear()
sp().lastBalanceSnapshotHeight.clear()
sp().setHeight(0)
setHeight(0)
sp().commit(compact = true)
}

private def setHeight(newHeight: Int)(implicit lock: ReadLock) = {
sp().setHeight(newHeight)
status.onNext(Status(newHeight, System.currentTimeMillis))
}
}
Loading

0 comments on commit 20df004

Please sign in to comment.