From a3b9e7f75d89708323715dad70a9caf51d439eba Mon Sep 17 00:00:00 2001 From: peterz Date: Wed, 13 Dec 2017 15:40:33 +0300 Subject: [PATCH] NODE-414 Extend API for /node/status + make it available at start --- .../scala/com/wavesplatform/it/Docker.scala | 6 ++-- .../wavesplatform/it/NodeApiTestSuite.scala | 36 +++++++++++++++++++ .../com/wavesplatform/it/api/NodeApi.scala | 17 +++++++-- .../scala/com/wavesplatform/Application.scala | 36 +++++++++++++------ .../scala/com/wavesplatform/Importer.scala | 2 +- .../history/StorageFactory.scala | 12 +++++-- .../com/wavesplatform/http/NodeApiRoute.scala | 21 +++++++++-- .../state2/BlockchainUpdaterImpl.scala | 27 +++++++------- .../wavesplatform/state2/StateWriter.scala | 24 +++++++++++-- .../transaction/BlockchainUpdater.scala | 2 +- .../wavesplatform/UtxPoolSpecification.scala | 2 +- .../com/wavesplatform/history/package.scala | 2 +- .../transaction/BlockchainUpdaterTest.scala | 16 ++++----- 13 files changed, 153 insertions(+), 50 deletions(-) create mode 100644 src/it/scala/com/wavesplatform/it/NodeApiTestSuite.scala diff --git a/src/it/scala/com/wavesplatform/it/Docker.scala b/src/it/scala/com/wavesplatform/it/Docker.scala index 9650ec97e68..dd92ea15801 100644 --- a/src/it/scala/com/wavesplatform/it/Docker.scala +++ b/src/it/scala/com/wavesplatform/it/Docker.scala @@ -102,7 +102,7 @@ class Docker(suiteConfig: Config = ConfigFactory.empty, val all = nodeConfigs.map(startNodeInternal) Await.result( for { - _ <- Future.traverse(all)(waitNodeIsUp) + _ <- Future.traverse(all)(_.waitForStartup()) _ <- Future.traverse(all)(connectToAll) } yield (), 5.minutes @@ -113,7 +113,7 @@ class Docker(suiteConfig: Config = ConfigFactory.empty, def startNode(nodeConfig: Config, autoConnect: Boolean = true): Node = { val node = startNodeInternal(nodeConfig) Await.result( - waitNodeIsUp(node).flatMap(_ => if (autoConnect) connectToAll(node) else Future.successful(())), + node.waitForStartup().flatMap(_ => if (autoConnect) connectToAll(node) else Future.successful(())), 3.minutes ) node @@ -147,8 +147,6 @@ class Docker(suiteConfig: Config = ConfigFactory.empty, .map(_ => ()) } - private def waitNodeIsUp(node: Node): Future[Unit] = node.waitFor[Int]("node is up")(_.height, _ >= 0, 1.second).map(_ => ()) - private def startNodeInternal(nodeConfig: Config): Node = try { val javaOptions = Option(System.getenv("CONTAINER_JAVA_OPTS")).getOrElse("") val configOverrides = s"$javaOptions ${renderProperties(asProperties(nodeConfig.withFallback(suiteConfig)))} " + diff --git a/src/it/scala/com/wavesplatform/it/NodeApiTestSuite.scala b/src/it/scala/com/wavesplatform/it/NodeApiTestSuite.scala new file mode 100644 index 00000000000..b6921635e4f --- /dev/null +++ b/src/it/scala/com/wavesplatform/it/NodeApiTestSuite.scala @@ -0,0 +1,36 @@ +package com.wavesplatform.it + +import com.wavesplatform.it.NodeApiTestSuite._ +import org.scalatest.{BeforeAndAfterAll, FreeSpec, Matchers} +import scorex.utils.ScorexLogging + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent._ +import scala.concurrent.duration.DurationInt + +class NodeApiTestSuite extends FreeSpec with Matchers with BeforeAndAfterAll with ScorexLogging { + + private lazy val docker = Docker(getClass) + + "/node/status should report status" in { + val node = docker.startNode(NodeConfig) + val f = for { + status <- node.status + _ = log.trace(s"#### $status") + _ = assert(status.blockchainHeight >= status.stateHeight) + } yield succeed + + Await.ready(f, 2.minute) + } + + override protected def afterAll(): Unit = { + super.afterAll() + docker.close() + } +} + +private object NodeApiTestSuite { + + private val NodeConfig = NodeConfigs.newBuilder.withDefault(1).build().head + +} diff --git a/src/it/scala/com/wavesplatform/it/api/NodeApi.scala b/src/it/scala/com/wavesplatform/it/api/NodeApi.scala index e8332dd9171..44f0dcf8b83 100644 --- a/src/it/scala/com/wavesplatform/it/api/NodeApi.scala +++ b/src/it/scala/com/wavesplatform/it/api/NodeApi.scala @@ -8,7 +8,7 @@ import com.wavesplatform.it.util._ import com.wavesplatform.matcher.api.CancelOrderRequest import com.wavesplatform.state2.{ByteStr, Portfolio} import io.netty.util.{HashedWheelTimer, Timer} -import org.asynchttpclient.Dsl.{get => _get, post => _post} +import org.asynchttpclient.Dsl.{asyncHttpClient, get => _get, post => _post} import org.asynchttpclient._ import org.asynchttpclient.util.HttpConstants import org.slf4j.LoggerFactory @@ -117,6 +117,19 @@ trait NodeApi { def connect(host: String, port: Int): Future[Unit] = postJson("/peers/connect", ConnectReq(host, port)).map(_ => ()) + def waitForStartup(): Future[Option[Response]] = { + def send(n: NodeApi) = asyncHttpClient() + .executeRequest(_get(s"http://$restAddress:$nodeRestPort/blocks/height")).toCompletableFuture.toScala + .map(Option(_)) + .recoverWith { + case e@(_: IOException | _: TimeoutException) => Future(None) + } + def cond(ropt: Option[Response]) = ropt.exists { r => + r.getStatusCode == HttpConstants.ResponseStatusCodes.OK_200 && (Json.parse(r.getResponseBody) \ "height").as[Int] > 0 + } + waitFor("node is up")(send, cond, 1.second) + } + def waitForPeers(targetPeersCount: Int): Future[Seq[Peer]] = waitFor[Seq[Peer]](s"connectedPeers.size >= $targetPeersCount")(_.connectedPeers, _.size >= targetPeersCount, 1.second) def height: Future[Int] = get("/blocks/height").as[JsValue].map(v => (v \ "height").as[Int]) @@ -325,7 +338,7 @@ object NodeApi extends ScorexLogging { case class UnexpectedStatusCodeException(request: Request, response: Response) extends Exception(s"Request: ${request.getUrl}\n" + s"Unexpected status code (${response.getStatusCode}): ${response.getResponseBody}") - case class Status(blockGeneratorStatus: Option[String], historySynchronizationStatus: Option[String]) + case class Status(blockchainHeight: Int, stateHeight: Int, updatedTimestamp: Long, updatedDate: String) implicit val statusFormat: Format[Status] = Json.format diff --git a/src/main/scala/com/wavesplatform/Application.scala b/src/main/scala/com/wavesplatform/Application.scala index b70405d9775..bfa3378fde5 100644 --- a/src/main/scala/com/wavesplatform/Application.scala +++ b/src/main/scala/com/wavesplatform/Application.scala @@ -20,6 +20,7 @@ import com.wavesplatform.metrics.Metrics import com.wavesplatform.mining.{Miner, MinerImpl} import com.wavesplatform.network._ import com.wavesplatform.settings._ +import com.wavesplatform.state2.StateWriter import com.wavesplatform.state2.appender.{BlockAppender, CheckpointAppender, ExtensionAppender, MicroblockAppender} import com.wavesplatform.utils.{SystemInformationReporter, forceStopApplication} import io.netty.channel.Channel @@ -51,8 +52,23 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con private val LocalScoreBroadcastDebounce = 1.second + // Start /node API right away + private implicit val as: ActorSystem = actorSystem + private implicit val materializer: ActorMaterializer = ActorMaterializer() + + private val nodeApiLauncher = (bcu: BlockchainUpdater, state: StateWriter) => + Option(settings.restAPISettings.enable).collect { case true => + val tags = Seq(typeOf[NodeApiRoute]) + val routes = Seq(NodeApiRoute(settings.restAPISettings, bcu, state, () => this.shutdown())) + val combinedRoute: Route = CompositeHttpService(actorSystem, tags, routes, settings.restAPISettings).compositeRoute + val httpFuture = Http().bindAndHandle(combinedRoute, settings.restAPISettings.bindAddress, settings.restAPISettings.port) + serverBinding = Await.result(httpFuture, 10.seconds) + log.info(s"Node REST API was bound on ${settings.restAPISettings.bindAddress}:${settings.restAPISettings.port}") + (tags, routes) + } + private val checkpointService = new CheckpointServiceImpl(settings.blockchainSettings.checkpointFile, settings.checkpointsSettings) - private val (history, featureProvider, stateWriter, stateReader, blockchainUpdater, blockchainDebugInfo) = StorageFactory(settings).get + private val (history, featureProvider, stateWriter, stateReader, blockchainUpdater, blockchainDebugInfo, nodeApi) = StorageFactory(settings, nodeApiLauncher).get private lazy val upnp = new UPnP(settings.networkSettings.uPnPSettings) // don't initialize unless enabled private val wallet: Wallet = Wallet(settings.walletSettings) private val peerDatabase = new PeerDatabaseImpl(settings.networkSettings) @@ -109,11 +125,9 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con upnp.addPort(addr.getPort) } - implicit val as: ActorSystem = actorSystem - implicit val materializer: ActorMaterializer = ActorMaterializer() - - if (settings.restAPISettings.enable) { - val apiRoutes = Seq( + // Start complete REST API. Node API is already running, so we need to re-bind + nodeApi.foreach { case (tags, routes) => + val apiRoutes = routes ++ Seq( BlocksApiRoute(settings.restAPISettings, history, blockchainUpdater, allChannels, c => processCheckpoint(None, c)), TransactionsApiRoute(settings.restAPISettings, stateReader, history, utxStorage), NxtConsensusApiRoute(settings.restAPISettings, stateReader, history, settings.blockchainSettings.functionalitySettings), @@ -125,7 +139,6 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con DebugApiRoute(settings.restAPISettings, wallet, stateReader, history, peerDatabase, establishedConnections, blockchainUpdater, allChannels, utxStorage, blockchainDebugInfo, miner, configRoot), WavesApiRoute(settings.restAPISettings, wallet, utxStorage, allChannels, time), AssetsApiRoute(settings.restAPISettings, wallet, utxStorage, allChannels, stateReader, time), - NodeApiRoute(settings.restAPISettings, () => this.shutdown()), ActivationApiRoute(settings.restAPISettings, settings.blockchainSettings.functionalitySettings, settings.featuresSettings, history, featureProvider), AssetsBroadcastApiRoute(settings.restAPISettings, utxStorage, allChannels), LeaseApiRoute(settings.restAPISettings, wallet, utxStorage, allChannels, time), @@ -134,7 +147,7 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con AliasBroadcastApiRoute(settings.restAPISettings, utxStorage, allChannels) ) - val apiTypes = Seq( + val apiTypes = tags ++ Seq( typeOf[BlocksApiRoute], typeOf[TransactionsApiRoute], typeOf[NxtConsensusApiRoute], @@ -146,7 +159,6 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con typeOf[DebugApiRoute], typeOf[WavesApiRoute], typeOf[AssetsApiRoute], - typeOf[NodeApiRoute], typeOf[ActivationApiRoute], typeOf[AssetsBroadcastApiRoute], typeOf[LeaseApiRoute], @@ -155,8 +167,10 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con typeOf[AliasBroadcastApiRoute] ) val combinedRoute: Route = CompositeHttpService(actorSystem, apiTypes, apiRoutes, settings.restAPISettings).compositeRoute - val httpFuture = Http().bindAndHandle(combinedRoute, settings.restAPISettings.bindAddress, settings.restAPISettings.port) - serverBinding = Await.result(httpFuture, 10.seconds) + val httpFuture = serverBinding.unbind().flatMap { _ => + Http().bindAndHandle(combinedRoute, settings.restAPISettings.bindAddress, settings.restAPISettings.port) + } + serverBinding = Await.result(httpFuture, 20.seconds) log.info(s"REST API was bound on ${settings.restAPISettings.bindAddress}:${settings.restAPISettings.port}") } diff --git a/src/main/scala/com/wavesplatform/Importer.scala b/src/main/scala/com/wavesplatform/Importer.scala index 69311c31280..6cd7b0b1bea 100644 --- a/src/main/scala/com/wavesplatform/Importer.scala +++ b/src/main/scala/com/wavesplatform/Importer.scala @@ -33,7 +33,7 @@ object Importer extends ScorexLogging { case Success(inputStream) => deleteFile(settings.blockchainSettings.blockchainFile) deleteFile(settings.blockchainSettings.stateFile) - val (history, _, stateWriter, _, blockchainUpdater, _) = StorageFactory(settings).get + val (history, _, stateWriter, _, blockchainUpdater, _, _) = StorageFactory(settings).get checkGenesis(history, settings, blockchainUpdater) val bis = new BufferedInputStream(inputStream) var quit = false diff --git a/src/main/scala/com/wavesplatform/history/StorageFactory.scala b/src/main/scala/com/wavesplatform/history/StorageFactory.scala index ea674b7c835..e9676da568a 100644 --- a/src/main/scala/com/wavesplatform/history/StorageFactory.scala +++ b/src/main/scala/com/wavesplatform/history/StorageFactory.scala @@ -21,16 +21,22 @@ object StorageFactory { } } - def apply(settings: WavesSettings, time: scorex.utils.Time = NTP): Try[(NgHistory with DebugNgHistory with AutoCloseable, FeatureProvider, AutoCloseable, StateReader, BlockchainUpdater, BlockchainDebugInfo)] = { + def apply[T](settings: WavesSettings, + beforeStateUpdate: (BlockchainUpdater, StateWriter) => T = (_: BlockchainUpdater, _: StateWriter) => (), + time: scorex.utils.Time = NTP): + Try[(NgHistory with DebugNgHistory with AutoCloseable, FeatureProvider, AutoCloseable, StateReader, BlockchainUpdater, BlockchainDebugInfo, T)] = + { val lock = new RWL(true) for { historyWriter <- HistoryWriterImpl(settings.blockchainSettings.blockchainFile, lock, settings.blockchainSettings.functionalitySettings, settings.featuresSettings) ss <- createStateStorage(historyWriter, settings.blockchainSettings.stateFile, settings.mvstorePageSplitSize) stateWriter = new StateWriterImpl(ss, settings.blockchainSettings.storeTransactionsInState, lock) + bcu = BlockchainUpdaterImpl(stateWriter, historyWriter, settings, time, lock) + callbackResult = beforeStateUpdate(bcu, stateWriter) } yield { - val bcu = BlockchainUpdaterImpl(stateWriter, historyWriter, settings, time, lock) + bcu.syncPersistedAndInMemory() val history: NgHistory with DebugNgHistory with FeatureProvider = bcu.historyReader - (history, history, stateWriter, bcu.bestLiquidState, bcu, bcu) + (history, history, stateWriter, bcu.bestLiquidState, bcu, bcu, callbackResult) } } } diff --git a/src/main/scala/com/wavesplatform/http/NodeApiRoute.scala b/src/main/scala/com/wavesplatform/http/NodeApiRoute.scala index 4f3a924c24b..2ce8f138020 100644 --- a/src/main/scala/com/wavesplatform/http/NodeApiRoute.scala +++ b/src/main/scala/com/wavesplatform/http/NodeApiRoute.scala @@ -1,20 +1,29 @@ package com.wavesplatform.http +import java.time.Instant import javax.ws.rs.Path import akka.http.scaladsl.server.Route import com.wavesplatform.Shutdownable +import com.wavesplatform.network.lastObserved import com.wavesplatform.settings.{Constants, RestAPISettings} +import com.wavesplatform.state2.StateWriter import io.swagger.annotations._ +import monix.eval.Coeval +import monix.execution.Scheduler.Implicits.global import play.api.libs.json.Json import scorex.api.http.{ApiRoute, CommonApiFunctions} +import scorex.transaction.{BlockchainUpdater, LastBlockInfo} import scorex.utils.ScorexLogging @Path("/node") @Api(value = "node") -case class NodeApiRoute(settings: RestAPISettings, application: Shutdownable) +case class NodeApiRoute(settings: RestAPISettings, blockchainUpdater: BlockchainUpdater, state: StateWriter, application: Shutdownable) extends ApiRoute with CommonApiFunctions with ScorexLogging { + private val lastHeight: Coeval[Option[LastBlockInfo]] = lastObserved(blockchainUpdater.lastBlockInfo) + private val lastState: Coeval[Option[StateWriter.Status]] = lastObserved(state.status) + override lazy val route = pathPrefix("node") { stop ~ status ~ version } @@ -39,6 +48,14 @@ case class NodeApiRoute(settings: RestAPISettings, application: Shutdownable) @Path("/status") @ApiOperation(value = "Status", notes = "Get status of the running core", httpMethod = "GET") def status: Route = (get & path("status")) { - complete(Json.obj()) + val (bcHeight, bcTime) = lastHeight().map { case LastBlockInfo(_, h, _, _, t) => (h, t) }.getOrElse((0, 0L)) + val (stHeight, stTime) = lastState().map { case StateWriter.Status(h, t) => (h, t) }.getOrElse((0, 0L)) + val lastUpdated = bcTime max stTime + complete(Json.obj( + "blockchainHeight" -> bcHeight, + "stateHeight" -> stHeight, + "updatedTimestamp" -> lastUpdated, + "updatedDate" -> Instant.ofEpochMilli(lastUpdated).toString + )) } } diff --git a/src/main/scala/com/wavesplatform/state2/BlockchainUpdaterImpl.scala b/src/main/scala/com/wavesplatform/state2/BlockchainUpdaterImpl.scala index 7b11afc9676..7941fd4407d 100644 --- a/src/main/scala/com/wavesplatform/state2/BlockchainUpdaterImpl.scala +++ b/src/main/scala/com/wavesplatform/state2/BlockchainUpdaterImpl.scala @@ -66,10 +66,10 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with SnapshotStateRea // Store last block information in a cache historyReader.lastBlock.foreach { b => - internalLastBlockInfo.onNext(LastBlockInfo(b.uniqueId, historyReader.height(), historyReader.score(), blockchainReady)) + internalLastBlockInfo.onNext(LastBlockInfo(b.uniqueId, historyReader.height(), historyReader.score(), blockchainReady, System.currentTimeMillis)) } - private def syncPersistedAndInMemory(): Unit = write { implicit l => + private[wavesplatform] def syncPersistedAndInMemory(): Unit = write { implicit l => log.info(heights("State rebuild started")) val notPersisted = historyWriter.height() - persisted.height @@ -187,12 +187,13 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with SnapshotStateRea } } }).map { - _ map { case ((newBlockDiff, discacrded)) => + _ map { case ((newBlockDiff, discarded)) => val height = historyWriter.height() + 1 ngState.set(Some(new NgState(block, newBlockDiff, featuresApprovedWithBlock(block)))) - historyReader.lastBlockId().foreach(id => internalLastBlockInfo.onNext(LastBlockInfo(id, historyReader.height(), historyReader.score(), blockchainReady))) + historyReader.lastBlockId().foreach(id => + internalLastBlockInfo.onNext(LastBlockInfo(id, historyReader.height(), historyReader.score(), blockchainReady, System.currentTimeMillis))) log.info(s"$block appended. New height: $height)") - discacrded + discarded } }) } @@ -241,7 +242,8 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with SnapshotStateRea } val totalDiscardedBlocks: Seq[Block] = discardedHistoryBlocks ++ discardedNgBlock.toSeq - if (totalDiscardedBlocks.nonEmpty) internalLastBlockInfo.onNext(LastBlockInfo(blockId, historyReader.height(), historyReader.score(), blockchainReady)) + if (totalDiscardedBlocks.nonEmpty) internalLastBlockInfo.onNext( + LastBlockInfo(blockId, historyReader.height(), historyReader.score(), blockchainReady, System.currentTimeMillis)) TxsInBlockchainStats.record(-totalDiscardedBlocks.size) Right(totalDiscardedBlocks) } @@ -270,8 +272,9 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with SnapshotStateRea historyWriter.lastBlock.map(_.timestamp), microBlock, ng.base.timestamp) } yield { log.info(s"$microBlock appended") - ng.append(microBlock, diff, System.currentTimeMillis()) - internalLastBlockInfo.onNext(LastBlockInfo(microBlock.totalResBlockSig, historyReader.height(), historyReader.score(), ready = true)) + val now = System.currentTimeMillis + ng.append(microBlock, diff, now) + internalLastBlockInfo.onNext(LastBlockInfo(microBlock.totalResBlockSig, historyReader.height(), historyReader.score(), ready = true, now)) } } } @@ -300,12 +303,8 @@ object BlockchainUpdaterImpl extends ScorexLogging { history: HistoryWriterImpl, settings: WavesSettings, time: scorex.utils.Time, - synchronizationToken: ReentrantReadWriteLock): BlockchainUpdaterImpl = { - val blockchainUpdater = new BlockchainUpdaterImpl(persistedState, settings, time, history, history, synchronizationToken) - log.info(blockchainUpdater.heights("Constructing BlockchainUpdaterImpl")) - blockchainUpdater.syncPersistedAndInMemory() - blockchainUpdater - } + synchronizationToken: ReentrantReadWriteLock): BlockchainUpdaterImpl = + new BlockchainUpdaterImpl(persistedState, settings, time, history, history, synchronizationToken) def areVersionsOfSameBlock(b1: Block, b2: Block): Boolean = b1.signerData.generator == b2.signerData.generator && diff --git a/src/main/scala/com/wavesplatform/state2/StateWriter.scala b/src/main/scala/com/wavesplatform/state2/StateWriter.scala index 7fecad86e3d..ae31a747d75 100644 --- a/src/main/scala/com/wavesplatform/state2/StateWriter.scala +++ b/src/main/scala/com/wavesplatform/state2/StateWriter.scala @@ -5,16 +5,26 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import cats.Monoid import cats.implicits._ import com.wavesplatform.metrics.Instrumented +import com.wavesplatform.state2.StateWriter.Status import com.wavesplatform.state2.reader.StateReaderImpl +import monix.reactive.Observable +import monix.reactive.subjects.BehaviorSubject import scorex.transaction.PaymentTransaction import scorex.transaction.assets.TransferTransaction import scorex.transaction.assets.exchange.ExchangeTransaction import scorex.utils.ScorexLogging +import scorex.utils.Synchronized.ReadLock trait StateWriter { def applyBlockDiff(blockDiff: BlockDiff): Unit def clear(): Unit + + def status: Observable[Status] +} + +object StateWriter { + case class Status(height: Int, lastUpdated: Long) } class StateWriterImpl(p: StateStorage, storeTransactions: Boolean, synchronizationToken: ReentrantReadWriteLock) @@ -22,6 +32,10 @@ class StateWriterImpl(p: StateStorage, storeTransactions: Boolean, synchronizati import StateStorage._ + override val status = read { implicit l => + BehaviorSubject(Status(sp().getHeight, System.currentTimeMillis)) + } + override def close(): Unit = p.close() override def applyBlockDiff(blockDiff: BlockDiff): Unit = write { implicit l => @@ -106,7 +120,8 @@ class StateWriterImpl(p: StateStorage, storeTransactions: Boolean, synchronizati measureSizeLog("lease info")(blockDiff.txsDiff.leaseState)( _.foreach { case (id, isActive) => sp().leaseState.put(id, isActive) }) - sp().setHeight(newHeight) + setHeight(newHeight) + val nextChunkOfBlocks = !sameQuotient(newHeight, oldHeight, 1000) sp().commit(nextChunkOfBlocks) log.info(s"BlockDiff commit complete. Persisted height = $newHeight") @@ -125,7 +140,12 @@ class StateWriterImpl(p: StateStorage, storeTransactions: Boolean, synchronizati sp().aliasToAddress.clear() sp().leaseState.clear() sp().lastBalanceSnapshotHeight.clear() - sp().setHeight(0) + setHeight(0) sp().commit(compact = true) } + + private def setHeight(newHeight: Int)(implicit lock: ReadLock) = { + sp().setHeight(newHeight) + status.onNext(Status(newHeight, System.currentTimeMillis)) + } } diff --git a/src/main/scala/scorex/transaction/BlockchainUpdater.scala b/src/main/scala/scorex/transaction/BlockchainUpdater.scala index 4fef37b6d4c..b215dadf4bc 100644 --- a/src/main/scala/scorex/transaction/BlockchainUpdater.scala +++ b/src/main/scala/scorex/transaction/BlockchainUpdater.scala @@ -25,7 +25,7 @@ trait BlockchainDebugInfo { } -case class LastBlockInfo(id: BlockId, height: Int, score: BlockchainScore, ready: Boolean) +case class LastBlockInfo(id: BlockId, height: Int, score: BlockchainScore, ready: Boolean, lastUpdated: Long) case class HashInfo(height: Int, hash: Int) diff --git a/src/test/scala/com/wavesplatform/UtxPoolSpecification.scala b/src/test/scala/com/wavesplatform/UtxPoolSpecification.scala index a6692fb61d8..788e39f85e5 100755 --- a/src/test/scala/com/wavesplatform/UtxPoolSpecification.scala +++ b/src/test/scala/com/wavesplatform/UtxPoolSpecification.scala @@ -40,7 +40,7 @@ class UtxPoolSpecification extends FreeSpec val genesisSettings = TestHelpers.genesisSettings(Map(senderAccount -> senderBalance)) val settings = WavesSettings.fromConfig(config).copy(blockchainSettings = BlockchainSettings(None, None, false, None, 'T', 5, 5, FunctionalitySettings.TESTNET, genesisSettings)) - val (history, _, _, state, bcu, _) = StorageFactory(settings).get + val (history, _, _, state, bcu, _, _) = StorageFactory(settings).get bcu.processBlock(Block.genesis(genesisSettings).right.get) diff --git a/src/test/scala/com/wavesplatform/history/package.scala b/src/test/scala/com/wavesplatform/history/package.scala index 4c1e08267a3..6c64416219c 100644 --- a/src/test/scala/com/wavesplatform/history/package.scala +++ b/src/test/scala/com/wavesplatform/history/package.scala @@ -38,7 +38,7 @@ package object history { val DefaultWavesSettings: WavesSettings = settings.copy(blockchainSettings = DefaultBlockchainSettings) def domain(settings: WavesSettings): Domain = { - val (history, _, _, stateReader, blockchainUpdater, _) = StorageFactory(settings).get + val (history, _, _, stateReader, blockchainUpdater, _, _) = StorageFactory(settings).get Domain(history, stateReader, blockchainUpdater) } diff --git a/src/test/scala/scorex/transaction/BlockchainUpdaterTest.scala b/src/test/scala/scorex/transaction/BlockchainUpdaterTest.scala index 09d2881df1b..96ded94c038 100644 --- a/src/test/scala/scorex/transaction/BlockchainUpdaterTest.scala +++ b/src/test/scala/scorex/transaction/BlockchainUpdaterTest.scala @@ -31,7 +31,7 @@ class BlockchainUpdaterTest extends FunSuite with Matchers with HistoryTest with private def storageFactory() = StorageFactory(WavesSettings).get ignore ("concurrent access to lastBlock doesn't throw any exception") { - val (h, fp, _, _, bu, _) = storageFactory() + val (h, fp, _, _, bu, _, _) = storageFactory() bu.processBlock(genesisBlock) @@ -57,7 +57,7 @@ class BlockchainUpdaterTest extends FunSuite with Matchers with HistoryTest with test("features approved and accepted as height grows") { - val (h, fp, _, _, bu, _) = storageFactory() + val (h, fp, _, _, bu, _, _) = storageFactory() bu.processBlock(genesisBlock) @@ -94,7 +94,7 @@ class BlockchainUpdaterTest extends FunSuite with Matchers with HistoryTest with } test("features rollback with block rollback") { - val (h, fp, _, _, bu, _) = storageFactory() + val (h, fp, _, _, bu, _, _) = storageFactory() bu.processBlock(genesisBlock) @@ -149,7 +149,7 @@ class BlockchainUpdaterTest extends FunSuite with Matchers with HistoryTest with } test("feature activation height is not overrided with further periods") { - val (h, fp, _, _, bu, _) = storageFactory() + val (h, fp, _, _, bu, _, _) = storageFactory() bu.processBlock(genesisBlock) @@ -171,7 +171,7 @@ class BlockchainUpdaterTest extends FunSuite with Matchers with HistoryTest with } test("feature activated only by 90% of blocks") { - val (h, fp, _, _, bu, _) = storageFactory() + val (h, fp, _, _, bu, _, _) = storageFactory() bu.processBlock(genesisBlock) @@ -194,7 +194,7 @@ class BlockchainUpdaterTest extends FunSuite with Matchers with HistoryTest with } test("features votes resets when voting window changes") { - val (h, fp, _, _, bu, _) = storageFactory() + val (h, fp, _, _, bu, _, _) = storageFactory() bu.processBlock(genesisBlock) @@ -234,7 +234,7 @@ class BlockchainUpdaterTest extends FunSuite with Matchers with HistoryTest with }) - val (h, fp, _, _, bu, _) = StorageFactory(WavesSettings).get + val (h, fp, _, _, bu, _, _) = storageFactory() bu.processBlock(genesisBlock) (1 to ApprovalPeriod * 2).foreach { i => @@ -249,7 +249,7 @@ class BlockchainUpdaterTest extends FunSuite with Matchers with HistoryTest with } test("sunny day test when known feature activated") { - val (h, fp, _, _, bu, _) = StorageFactory(WavesSettings).get + val (h, fp, _, _, bu, _, _) = storageFactory() bu.processBlock(genesisBlock) (1 until ApprovalPeriod * 2 - 1).foreach { i =>