diff --git a/src/main/scala/com/wavesplatform/Coordinator.scala b/src/main/scala/com/wavesplatform/Coordinator.scala index 002829e95df..b065dbfc15d 100644 --- a/src/main/scala/com/wavesplatform/Coordinator.scala +++ b/src/main/scala/com/wavesplatform/Coordinator.scala @@ -34,15 +34,11 @@ object Coordinator extends ScorexLogging with Instrumented { extension.zipWithIndex.forall(p => checkpoint.isBlockValid(p._1.signerData.signature, lastCommonHeight + 1 + p._2)) lazy val forkApplicationResultEi: Either[ValidationError, BigInt] = { - val firstDeclined = extension.view - .map { b => - b -> appendBlock( - checkpoint, history, blockchainUpdater, stateReader, utxStorage, time, settings.blockchainSettings, - featureProvider - )(b, local = false).right.map { baseHeight => - BlockStats.applied(b, BlockStats.Source.Ext, baseHeight) - } + val firstDeclined = extension.view.map { b => + b -> appendBlock(checkpoint, history, blockchainUpdater, stateReader, utxStorage, time, settings.blockchainSettings, featureProvider)(b).right.map { + _.foreach(bh => BlockStats.applied(b, BlockStats.Source.Ext, bh)) } + } .zipWithIndex .collectFirst { case ((b, Left(e)), i) => (i, b, e) } @@ -103,32 +99,20 @@ object Coordinator extends ScorexLogging with Instrumented { } - private def updateBlockchainReadinessFlag(history: History, time: Time, blockchainReadiness: AtomicBoolean, maxBlockchainAge: FiniteDuration): Boolean = { + def updateBlockchainReadinessFlag(history: History, time: Time, blockchainReadiness: AtomicBoolean, maxBlockchainAge: FiniteDuration): Boolean = { val expired = time.correctedTime() - history.lastBlockTimestamp().get < maxBlockchainAge.toMillis blockchainReadiness.compareAndSet(expired, !expired) } def processSingleBlock(checkpoint: CheckpointService, history: History, blockchainUpdater: BlockchainUpdater, time: Time, - stateReader: StateReader, utxStorage: UtxPool, blockchainReadiness: AtomicBoolean, - settings: WavesSettings, featureProvider: FeatureProvider) - (newBlock: Block, local: Boolean): Either[ValidationError, Option[BigInt]] = measureSuccessful(blockProcessingTimeStats, history.write { implicit l => - if (history.contains(newBlock)) - Right(None) - else { - val newScore = for { - _ <- Either.cond(history.heightOf(newBlock.reference).exists(_ >= history.height() - 1), (), GenericError("Can process either new top block or current top block's competitor")) - baseHeight <- appendBlock(checkpoint, history, blockchainUpdater, stateReader, utxStorage, time, settings.blockchainSettings, featureProvider)(newBlock, local) - } yield { - if (local) BlockStats.mined(newBlock, baseHeight) - else BlockStats.applied(newBlock, BlockStats.Source.Broadcast, baseHeight) - history.score() - } - - if (local || newScore.isRight) { - updateBlockchainReadinessFlag(history, time, blockchainReadiness, settings.minerSettings.intervalAfterLastBlockThenGenerationIsAllowed) - } - newScore.right.map(Some(_)) - } + stateReader: StateReader, utxStorage: UtxPool, + settings: BlockchainSettings, featureProvider: FeatureProvider) + (newBlock: Block): Either[ValidationError, Option[BigInt]] = measureSuccessful(blockProcessingTimeStats, history.write { implicit l => + if (history.contains(newBlock)) Right(None) + else for { + _ <- Either.cond(history.heightOf(newBlock.reference).exists(_ >= history.height() - 1), (), GenericError("Can process either new top block or current top block's competitor")) + maybeBaseHeight <- appendBlock(checkpoint, history, blockchainUpdater, stateReader, utxStorage, time, settings, featureProvider)(newBlock) + } yield maybeBaseHeight map (_ => history.score()) }) def processMicroBlock(checkpoint: CheckpointService, history: History, blockchainUpdater: BlockchainUpdater, utxStorage: UtxPool) @@ -147,8 +131,7 @@ object Coordinator extends ScorexLogging with Instrumented { private def appendBlock(checkpoint: CheckpointService, history: History, blockchainUpdater: BlockchainUpdater, stateReader: StateReader, utxStorage: UtxPool, time: Time, settings: BlockchainSettings, - featureProvider: FeatureProvider) - (block: Block, local: Boolean): Either[ValidationError, Int] = for { + featureProvider: FeatureProvider)(block: Block): Either[ValidationError, Option[Int]] = for { _ <- Either.cond(checkpoint.isBlockValid(block.signerData.signature, history.height() + 1), (), GenericError(s"Block $block at height ${history.height() + 1} is not valid w.r.t. checkpoint")) _ <- blockConsensusValidation(history, featureProvider, settings, time.correctedTime(), block) { height => @@ -156,11 +139,11 @@ object Coordinator extends ScorexLogging with Instrumented { .flatMap(validateEffectiveBalance(featureProvider, settings.functionalitySettings, block, height)) } baseHeight = history.height() - discardedTxs <- blockchainUpdater.processBlock(block) + maybeDiscardedTxs <- blockchainUpdater.processBlock(block) } yield { utxStorage.removeAll(block.transactionData) - discardedTxs.foreach(utxStorage.putIfNew) - baseHeight + maybeDiscardedTxs.toSeq.flatten.foreach(utxStorage.putIfNew) + maybeDiscardedTxs.map(_ => baseHeight) } def processCheckpoint(checkpoint: CheckpointService, history: History, blockchainUpdater: BlockchainUpdater) diff --git a/src/main/scala/com/wavesplatform/metrics/BlockStats.scala b/src/main/scala/com/wavesplatform/metrics/BlockStats.scala index 884ae9f8f23..225b1613391 100644 --- a/src/main/scala/com/wavesplatform/metrics/BlockStats.scala +++ b/src/main/scala/com/wavesplatform/metrics/BlockStats.scala @@ -47,10 +47,10 @@ object BlockStats { Seq.empty ) - def applied(b: Block, source: Source, baseHeight: Int): Unit = write( + def applied(b: Block, source: Source, newHeight: Int): Unit = write( block(b, source) .addField("txs", b.transactionData.size) - .addField("height", baseHeight), + .addField("height", newHeight), Event.Applied, Seq.empty ) diff --git a/src/main/scala/com/wavesplatform/mining/Miner.scala b/src/main/scala/com/wavesplatform/mining/Miner.scala index 93f1d89260c..fd636179b07 100644 --- a/src/main/scala/com/wavesplatform/mining/Miner.scala +++ b/src/main/scala/com/wavesplatform/mining/Miner.scala @@ -59,7 +59,7 @@ class MinerImpl( private lazy val minerSettings = settings.minerSettings private lazy val minMicroBlockDurationMills = minerSettings.minMicroBlockAge.toMillis private lazy val blockchainSettings = settings.blockchainSettings - private lazy val processBlock = Coordinator.processSingleBlock(checkpoint, history, blockchainUpdater, timeService, stateReader, utx, blockchainReadiness, settings, featureProvider) _ + private lazy val processBlock = Coordinator.processSingleBlock(checkpoint, history, blockchainUpdater, timeService, stateReader, utx, settings.blockchainSettings, featureProvider) _ private val scheduledAttempts = SerialCancelable() private val microBlockAttempt = SerialCancelable() @@ -183,9 +183,11 @@ class MinerImpl( nextBlockGenerationTimes += account.toAddress -> (System.currentTimeMillis() + offset.toMillis) generateOneBlockTask(account, balance)(offset).flatMap { case Right(block) => Task.now { - processBlock(block, true) match { + processBlock(block) match { case Left(err) => log.warn("Error mining Block: " + err.toString) case Right(Some(score)) => + BlockStats.mined(block, history.height()) + Coordinator.updateBlockchainReadinessFlag(history, timeService, blockchainReadiness, settings.minerSettings.intervalAfterLastBlockThenGenerationIsAllowed) allChannels.broadcast(BlockForged(block)) allChannels.broadcast(LocalScoreChanged(score)) scheduleMining() diff --git a/src/main/scala/com/wavesplatform/network/CoordinatorHandler.scala b/src/main/scala/com/wavesplatform/network/CoordinatorHandler.scala index c4cd90f7a65..5e444da7291 100644 --- a/src/main/scala/com/wavesplatform/network/CoordinatorHandler.scala +++ b/src/main/scala/com/wavesplatform/network/CoordinatorHandler.scala @@ -40,7 +40,7 @@ class CoordinatorHandler(checkpointService: CheckpointService, private val processCheckpoint = Coordinator.processCheckpoint(checkpointService, history, blockchainUpdater) _ private val processFork = Coordinator.processFork(checkpointService, history, blockchainUpdater, stateReader, utxStorage, time, settings, blockchainReadiness, featureProvider) _ - private val processBlock = Coordinator.processSingleBlock(checkpointService, history, blockchainUpdater, time, stateReader, utxStorage, blockchainReadiness, settings, featureProvider) _ + private val processBlock = Coordinator.processSingleBlock(checkpointService, history, blockchainUpdater, time, stateReader, utxStorage, settings.blockchainSettings, featureProvider) _ private val processMicroBlock = Coordinator.processMicroBlock(checkpointService, history, blockchainUpdater, utxStorage) _ private def scheduleMiningAndBroadcastScore(score: BigInt): Unit = { @@ -81,11 +81,13 @@ class CoordinatorHandler(checkpointService: CheckpointService, case b: Block => (Task { BlockStats.received(b, BlockStats.Source.Broadcast, ctx) CoordinatorHandler.blockReceivingLag.safeRecord(System.currentTimeMillis() - b.timestamp) - Signed.validateSignatures(b).flatMap(b => processBlock(b, false)) + Signed.validateSignatures(b).flatMap(b => processBlock(b)) } map { case Right(None) => log.trace(s"$b already appended") case Right(Some(newScore)) => + BlockStats.applied(b, BlockStats.Source.Broadcast, history.height()) + Coordinator.updateBlockchainReadinessFlag(history, time, blockchainReadiness, settings.minerSettings.intervalAfterLastBlockThenGenerationIsAllowed) log.debug(s"Appended $b") if (b.transactionData.isEmpty) allChannels.broadcast(BlockForged(b), Some(ctx.channel())) diff --git a/src/main/scala/com/wavesplatform/state2/BlockchainUpdaterImpl.scala b/src/main/scala/com/wavesplatform/state2/BlockchainUpdaterImpl.scala index bd535031d4e..522da4350b8 100644 --- a/src/main/scala/com/wavesplatform/state2/BlockchainUpdaterImpl.scala +++ b/src/main/scala/com/wavesplatform/state2/BlockchainUpdaterImpl.scala @@ -114,7 +114,7 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with StateReader, else Set.empty } - override def processBlock(block: Block): Either[ValidationError, DiscardedTransactions] = write { implicit l => + override def processBlock(block: Block): Either[ValidationError, Option[DiscardedTransactions]] = write { implicit l => if (topMemoryDiff().heightDiff >= minimumInMemoryDiffSize) { persisted.applyBlockDiff(bottomMemoryDiff()) bottomMemoryDiff.set(topMemoryDiff()) @@ -175,14 +175,13 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with StateReader, log.error(errorText) Left(BlockAppendError(errorText, block)) } - }).map { - case Some((newBlockDiff, discacrded)) => + }).map { _ map { case ((newBlockDiff, discacrded)) => val height = historyWriter.height() + 1 ngState.set(Some(NgState(block, newBlockDiff, 0L, featuresApprovedWithBlock(block)))) historyReader.lastBlockId().foreach(lastBlockId.onNext) log.info(s"$block appended. New height: $height)") discacrded - case None => Seq.empty + } }) } diff --git a/src/main/scala/scorex/transaction/BlockchainUpdater.scala b/src/main/scala/scorex/transaction/BlockchainUpdater.scala index adb8454a27c..6b052aa42fe 100644 --- a/src/main/scala/scorex/transaction/BlockchainUpdater.scala +++ b/src/main/scala/scorex/transaction/BlockchainUpdater.scala @@ -8,7 +8,7 @@ import scorex.utils.Synchronized trait BlockchainUpdater extends Synchronized { - def processBlock(block: Block): Either[ValidationError, DiscardedTransactions] + def processBlock(block: Block): Either[ValidationError, Option[DiscardedTransactions]] def processMicroBlock(microBlock: MicroBlock): Either[ValidationError, Unit]