From 06f58b639b614602b1b727375c07515e2b6b0dc1 Mon Sep 17 00:00:00 2001 From: Ilya Smagin Date: Wed, 10 Jan 2018 16:42:00 +0300 Subject: [PATCH 1/5] NODE-464: discard 'error executing task' message if it's element not found --- .../wavesplatform/network/HistoryReplier.scala | 8 +++++--- src/main/scala/scorex/utils/ScorexLogging.scala | 16 ++++++++++++++++ 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/src/main/scala/com/wavesplatform/network/HistoryReplier.scala b/src/main/scala/com/wavesplatform/network/HistoryReplier.scala index 802f3097004..6bd7f30e1ca 100644 --- a/src/main/scala/com/wavesplatform/network/HistoryReplier.scala +++ b/src/main/scala/com/wavesplatform/network/HistoryReplier.scala @@ -41,7 +41,7 @@ class HistoryReplier(history: NgHistory, settings: SynchronizationSettings) exte case Some((parent, extension)) => log.debug(s"${id(ctx)} Got GetSignatures with ${otherSigs.length}, found common parent $parent and sending ${extension.length} more signatures") ctx.writeAndFlush(Signatures(parent +: extension)) - case None if otherSigs.length == 1 && otherSigs.head == history.lastBlock.get.uniqueId => + case None if otherSigs.lengthCompare(1) == 0 && otherSigs.head == history.lastBlock.get.uniqueId => // this is the special case when both nodes only have genesis block log.debug(s"${id(ctx)} Both local and remote nodes only have genesis block") ctx.writeAndFlush(Signatures(otherSigs)) @@ -52,13 +52,15 @@ class HistoryReplier(history: NgHistory, settings: SynchronizationSettings) exte case GetBlock(sig) => Task(knownBlocks.get(sig)).map(bytes => ctx.writeAndFlush(RawBytes(BlockSpec.messageCode, bytes))) - .runAsyncLogErr + .logErrDiscardNoSuchElementException + .runAsync case mbr@MicroBlockRequest(totalResBlockSig) => Task(knownMicroBlocks.get(totalResBlockSig)).map { bytes => ctx.writeAndFlush(RawBytes(MicroBlockResponseSpec.messageCode, bytes)) log.trace(id(ctx) + s"Sent MicroBlockResponse(total=${totalResBlockSig.trim})") - }.runAsyncLogErr + }.logErrDiscardNoSuchElementException + .runAsync case _: Handshake => Task { ctx.writeAndFlush(LocalScoreChanged(history.score())) diff --git a/src/main/scala/scorex/utils/ScorexLogging.scala b/src/main/scala/scorex/utils/ScorexLogging.scala index 7aace9a3433..d927f5090ef 100755 --- a/src/main/scala/scorex/utils/ScorexLogging.scala +++ b/src/main/scala/scorex/utils/ScorexLogging.scala @@ -1,5 +1,6 @@ package scorex.utils +import com.google.common.util.concurrent.UncheckedExecutionException import monix.eval.Task import monix.execution.{CancelableFuture, Scheduler} import monix.reactive.Observable @@ -69,6 +70,21 @@ trait ScorexLogging { Task.raiseError[A](ex) }) } + + def logErrDiscardNoSuchElementException: Task[A] = { + t.onErrorHandleWith(ex => { + ex match { + case gex: UncheckedExecutionException => + Option(gex.getCause) match { + case Some(nseex: NoSuchElementException) => + case _ => log.error(s"Error executing task", ex) + } + case _ => log.error(s"Error executing task", ex) + } + Task.raiseError[A](ex) + } + ) + } } implicit class ObservableExt[A](o: Observable[A]) { From c09f0df7cc57f07253f097b238255b829f3c70fe Mon Sep 17 00:00:00 2001 From: Ilya Smagin Date: Thu, 11 Jan 2018 12:42:55 +0300 Subject: [PATCH 2/5] NODE-449: Node should not report a warning if mine microblocks when the transactions limit is met --- src/main/scala/com/wavesplatform/UtxPool.scala | 4 ++-- src/main/scala/com/wavesplatform/mining/Miner.scala | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/scala/com/wavesplatform/UtxPool.scala b/src/main/scala/com/wavesplatform/UtxPool.scala index b89079a6505..f0a881106b7 100644 --- a/src/main/scala/com/wavesplatform/UtxPool.scala +++ b/src/main/scala/com/wavesplatform/UtxPool.scala @@ -157,9 +157,9 @@ class UtxPoolImpl(time: Time, .values.asScala.toSeq .sorted(TransactionsOrdering.InUTXPool) .foldLeft((Seq.empty[ByteStr], Seq.empty[Transaction], Monoid[Diff].empty)) { - case ((invalid, valid, diff), tx) if valid.size <= max => + case ((invalid, valid, diff), tx) if valid.lengthCompare(max) <= 0 => differ(composite(diff.asBlockDiff, s), tx) match { - case Right(newDiff) if valid.size < max => + case Right(newDiff) if valid.lengthCompare(max) < 0 => (invalid, tx +: valid, Monoid.combine(diff, newDiff)) case Right(_) => (invalid, valid, diff) diff --git a/src/main/scala/com/wavesplatform/mining/Miner.scala b/src/main/scala/com/wavesplatform/mining/Miner.scala index 2892393f440..48716174ec2 100644 --- a/src/main/scala/com/wavesplatform/mining/Miner.scala +++ b/src/main/scala/com/wavesplatform/mining/Miner.scala @@ -127,15 +127,17 @@ class MinerImpl( private def generateOneMicroBlockTask(account: PrivateKeyAccount, accumulatedBlock: Block): Task[Either[ValidationError, Option[Block]]] = Task { log.trace(s"Generating microblock for $account") val pc = allChannels.size() + val accumulatedCount = accumulatedBlock.transactionCount lazy val unconfirmed = measureLog("packing unconfirmed transactions for microblock") { - utx.packUnconfirmed(settings.minerSettings.maxTransactionsInMicroBlock, sortInBlock = false) + val maxTxsForMicroblock = Math.min(MaxTransactionsPerBlockVer3 - accumulatedCount, settings.minerSettings.maxTransactionsInMicroBlock) + utx.packUnconfirmed(maxTxsForMicroblock, sortInBlock = false) } if (pc < minerSettings.quorum) { log.trace(s"Quorum not available ($pc/${minerSettings.quorum}, not forging microblock with ${account.address}") Task.now(Right(None)) } else if (unconfirmed.isEmpty) { - log.trace("skipping microBlock because no txs in utx pool") + log.trace(s"Skipping microBlock because no txs packed(already accumulated in block: $accumulatedCount)") Task.now(Right(None)) } else { From b72299d1102d3fd3ce22347a5dd18f2f56f5b5a0 Mon Sep 17 00:00:00 2001 From: Ilya Smagin Date: Thu, 11 Jan 2018 13:01:57 +0300 Subject: [PATCH 3/5] NODE-449: Node should not report a warning if mine microblocks when the transactions limit is met --- .../com/wavesplatform/mining/Miner.scala | 65 +++++++++++-------- 1 file changed, 38 insertions(+), 27 deletions(-) diff --git a/src/main/scala/com/wavesplatform/mining/Miner.scala b/src/main/scala/com/wavesplatform/mining/Miner.scala index 48716174ec2..fa863a58a7a 100644 --- a/src/main/scala/com/wavesplatform/mining/Miner.scala +++ b/src/main/scala/com/wavesplatform/mining/Miner.scala @@ -124,27 +124,28 @@ class MinerImpl( }.delayExecution(delay) - private def generateOneMicroBlockTask(account: PrivateKeyAccount, accumulatedBlock: Block): Task[Either[ValidationError, Option[Block]]] = Task { - log.trace(s"Generating microblock for $account") + private def generateOneMicroBlockTask(account: PrivateKeyAccount, accumulatedBlock: Block): Task[MicroblockMiningResult] = { + log.trace(s"Generating microBlock for $account") val pc = allChannels.size() val accumulatedCount = accumulatedBlock.transactionCount lazy val unconfirmed = measureLog("packing unconfirmed transactions for microblock") { val maxTxsForMicroblock = Math.min(MaxTransactionsPerBlockVer3 - accumulatedCount, settings.minerSettings.maxTransactionsInMicroBlock) utx.packUnconfirmed(maxTxsForMicroblock, sortInBlock = false) } - if (pc < minerSettings.quorum) { + if (accumulatedCount == MaxTransactionsPerBlockVer3) { + log.trace(s"Stopping forging microBlocks, block is already full") + Task.now(Stop) + } else if (pc < minerSettings.quorum) { log.trace(s"Quorum not available ($pc/${minerSettings.quorum}, not forging microblock with ${account.address}") - Task.now(Right(None)) - } - else if (unconfirmed.isEmpty) { - log.trace(s"Skipping microBlock because no txs packed(already accumulated in block: $accumulatedCount)") - Task.now(Right(None)) - } - else { + Task.now(Retry) + } else if (unconfirmed.isEmpty) { + log.trace(s"Skipping microBlock because utx is empty") + Task.now(Retry) + } else { log.trace(s"Accumulated ${unconfirmed.size} txs for microblock") val start = System.currentTimeMillis() (for { - signedBlock <- EitherT(Task.now(Block.buildAndSign( + signedBlock <- EitherT.fromEither[Task](Block.buildAndSign( version = 3, timestamp = accumulatedBlock.timestamp, reference = accumulatedBlock.reference, @@ -152,32 +153,35 @@ class MinerImpl( transactionData = accumulatedBlock.transactionData ++ unconfirmed, signer = account, featureVotes = accumulatedBlock.featureVotes - ))) - microBlock <- EitherT(Task.now(MicroBlock.buildAndSign(account, unconfirmed, accumulatedBlock.signerData.signature, signedBlock.signerData.signature))) + )) + microBlock <- EitherT.fromEither[Task](MicroBlock.buildAndSign(account, unconfirmed, accumulatedBlock.signerData.signature, signedBlock.signerData.signature)) _ = microBlockBuildTimeStats.safeRecord(System.currentTimeMillis() - start) _ <- EitherT(MicroblockAppender(checkpoint, history, blockchainUpdater, utx)(microBlock)) - } yield { - BlockStats.mined(microBlock) - log.trace(s"$microBlock has been mined for $account}") - allChannels.broadcast(MicroBlockInv(account, microBlock.totalResBlockSig, microBlock.prevResBlockSig)) - Some(signedBlock) - }).value.map { - _.left.map { err => - log.trace(s"MicroBlock has NOT been mined for $account} because $err") - err - } + } yield (microBlock, signedBlock)).value map { + case Left(err) => + Error(err) + case Right((microBlock, signedBlock)) => + BlockStats.mined(microBlock) + log.trace(s"$microBlock has been mined for $account}") + allChannels.broadcast(MicroBlockInv(account, microBlock.totalResBlockSig, microBlock.prevResBlockSig)) + Success(signedBlock) } } - }.flatten + } private def generateMicroBlockSequence(account: PrivateKeyAccount, accumulatedBlock: Block, delay: FiniteDuration): Task[Unit] = { debugState = MinerDebugInfo.MiningMicroblocks generateOneMicroBlockTask(account, accumulatedBlock).delayExecution(delay).flatMap { - case Left(err) => Task { + case Error(e) => Task { + debugState = MinerDebugInfo.Error(e.toString) + log.warn("Error mining MicroBlock: " + e.toString) + } + case Success(newTotal) => generateMicroBlockSequence(account, newTotal, minerSettings.microBlockInterval) + case Retry => generateMicroBlockSequence(account, accumulatedBlock, minerSettings.microBlockInterval) + case Stop => Task { debugState = MinerDebugInfo.MiningBlocks - log.warn("Error mining MicroBlock: " + err.toString) + log.debug("MicroBlock mining completed, block is full") } - case Right(maybeNewTotal) => generateMicroBlockSequence(account, maybeNewTotal.getOrElse(accumulatedBlock), minerSettings.microBlockInterval) } } @@ -256,4 +260,11 @@ object Miner { val calculatedOffset = calculatedGenerationTimestamp - timeService.correctedTime() Math.max(minimalBlockGenerationOffset.toMillis, calculatedOffset).millis } + + sealed trait MicroblockMiningResult + case object Stop extends MicroblockMiningResult + case object Retry extends MicroblockMiningResult + case class Error(e: ValidationError) extends MicroblockMiningResult + case class Success(b: Block) extends MicroblockMiningResult + } From d714e672284eee1af6b0605623373f61d48a26f8 Mon Sep 17 00:00:00 2001 From: peterz Date: Fri, 12 Jan 2018 12:54:18 +0300 Subject: [PATCH 4/5] NODE-467 Stop exposing internal entities to NodeApiRoute --- .../scala/com/wavesplatform/Application.scala | 24 +++++------ .../scala/com/wavesplatform/Importer.scala | 3 +- .../history/HistoryWriterImpl.scala | 24 ++++++++--- .../history/StorageFactory.scala | 40 ++++++++++--------- .../com/wavesplatform/http/NodeApiRoute.scala | 13 ++---- .../state2/BlockchainUpdaterImpl.scala | 20 +++++----- .../wavesplatform/state2/StateStorage.scala | 17 +++++--- .../wavesplatform/state2/StateWriter.scala | 23 +---------- .../com/wavesplatform/utils/package.scala | 2 + .../transaction/BlockchainUpdater.scala | 2 +- .../scala/scorex/transaction/History.scala | 3 ++ .../scorex/transaction/NgHistoryReader.scala | 3 ++ .../wavesplatform/UtxPoolSpecification.scala | 3 +- .../com/wavesplatform/history/package.scala | 3 +- .../wavesplatform/network/TestHistory.scala | 3 ++ .../transaction/BlockchainUpdaterTest.scala | 21 +++++----- 16 files changed, 108 insertions(+), 96 deletions(-) diff --git a/src/main/scala/com/wavesplatform/Application.scala b/src/main/scala/com/wavesplatform/Application.scala index c27f20f146d..ca20e2a8dc6 100644 --- a/src/main/scala/com/wavesplatform/Application.scala +++ b/src/main/scala/com/wavesplatform/Application.scala @@ -20,7 +20,6 @@ import com.wavesplatform.metrics.Metrics import com.wavesplatform.mining.{Miner, MinerImpl} import com.wavesplatform.network._ import com.wavesplatform.settings._ -import com.wavesplatform.state2.StateWriter import com.wavesplatform.state2.appender.{BlockAppender, CheckpointAppender, ExtensionAppender, MicroblockAppender} import com.wavesplatform.utils.{SystemInformationReporter, forceStopApplication} import io.netty.channel.Channel @@ -55,20 +54,19 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con // Start /node API right away private implicit val as: ActorSystem = actorSystem private implicit val materializer: ActorMaterializer = ActorMaterializer() - - private val nodeApiLauncher = (bcu: BlockchainUpdater, state: StateWriter) => - Option(settings.restAPISettings.enable).collect { case true => - val tags = Seq(typeOf[NodeApiRoute]) - val routes = Seq(NodeApiRoute(settings.restAPISettings, bcu, state, () => this.shutdown())) - val combinedRoute: Route = CompositeHttpService(actorSystem, tags, routes, settings.restAPISettings).compositeRoute - val httpFuture = Http().bindAndHandle(combinedRoute, settings.restAPISettings.bindAddress, settings.restAPISettings.port) - serverBinding = Await.result(httpFuture, 10.seconds) - log.info(s"Node REST API was bound on ${settings.restAPISettings.bindAddress}:${settings.restAPISettings.port}") - (tags, routes) - } + private val (storage, heights) = StorageFactory(settings).get + private val nodeApi = Option(settings.restAPISettings.enable).collect { case true => + val tags = Seq(typeOf[NodeApiRoute]) + val routes = Seq(NodeApiRoute(settings.restAPISettings, heights, () => this.shutdown())) + val combinedRoute: Route = CompositeHttpService(actorSystem, tags, routes, settings.restAPISettings).compositeRoute + val httpFuture = Http().bindAndHandle(combinedRoute, settings.restAPISettings.bindAddress, settings.restAPISettings.port) + serverBinding = Await.result(httpFuture, 10.seconds) + log.info(s"Node REST API was bound on ${settings.restAPISettings.bindAddress}:${settings.restAPISettings.port}") + (tags, routes) + } private val checkpointService = new CheckpointServiceImpl(settings.blockchainSettings.checkpointFile, settings.checkpointsSettings) - private val (history, featureProvider, stateWriter, stateReader, blockchainUpdater, blockchainDebugInfo, nodeApi) = StorageFactory(settings, nodeApiLauncher).get + private val (history, featureProvider, stateWriter, stateReader, blockchainUpdater, blockchainDebugInfo) = storage() private lazy val upnp = new UPnP(settings.networkSettings.uPnPSettings) // don't initialize unless enabled private val wallet: Wallet = Wallet(settings.walletSettings) private val peerDatabase = new PeerDatabaseImpl(settings.networkSettings) diff --git a/src/main/scala/com/wavesplatform/Importer.scala b/src/main/scala/com/wavesplatform/Importer.scala index 6cd7b0b1bea..2916c68939e 100644 --- a/src/main/scala/com/wavesplatform/Importer.scala +++ b/src/main/scala/com/wavesplatform/Importer.scala @@ -33,7 +33,8 @@ object Importer extends ScorexLogging { case Success(inputStream) => deleteFile(settings.blockchainSettings.blockchainFile) deleteFile(settings.blockchainSettings.stateFile) - val (history, _, stateWriter, _, blockchainUpdater, _, _) = StorageFactory(settings).get + val (storage, _) = StorageFactory(settings).get + val (history, _, stateWriter, _, blockchainUpdater, _) = storage() checkGenesis(history, settings, blockchainUpdater) val bis = new BufferedInputStream(inputStream) var quit = false diff --git a/src/main/scala/com/wavesplatform/history/HistoryWriterImpl.scala b/src/main/scala/com/wavesplatform/history/HistoryWriterImpl.scala index 1b34a470953..06ef3eaf9cf 100644 --- a/src/main/scala/com/wavesplatform/history/HistoryWriterImpl.scala +++ b/src/main/scala/com/wavesplatform/history/HistoryWriterImpl.scala @@ -8,17 +8,19 @@ import com.wavesplatform.settings.{FeaturesSettings, FunctionalitySettings} import com.wavesplatform.state2.{BlockDiff, ByteStr, DataTypes, VariablesStorage, VersionableStorage} import com.wavesplatform.utils._ import kamon.Kamon +import org.h2.mvstore.MVMap import scorex.block.{Block, BlockHeader} import scorex.transaction.History.BlockchainScore import scorex.transaction.ValidationError.GenericError import scorex.transaction._ -import scorex.utils.{LogMVMapBuilder, ScorexLogging} +import scorex.utils.Synchronized.WriteLock +import scorex.utils.{LogMVMapBuilder, NTP, ScorexLogging, Time} import scala.collection.JavaConverters._ import scala.util.Try class HistoryWriterImpl private(file: Option[File], val synchronizationToken: ReentrantReadWriteLock, - functionalitySettings: FunctionalitySettings, featuresSettings: FeaturesSettings) + functionalitySettings: FunctionalitySettings, featuresSettings: FeaturesSettings, time: Time) extends VariablesStorage(createMVStore(file)) with VersionableStorage with History with FeatureProvider with ScorexLogging { override protected val Version: Int = 1 @@ -67,7 +69,7 @@ class HistoryWriterImpl private(file: Option[File], val synchronizationToken: Re val score = (if (height() == 0) BigInt(0) else this.score()) + block.blockScore() blockBodyByHeight.mutate(_.put(h, block.bytes())) scoreByHeight.mutate(_.put(h, score)) - blockIdByHeight.mutate(_.put(h, block.uniqueId)) + mutateBlockIdByHeight(_.put(h, block.uniqueId)) heightByBlockId.mutate(_.put(block.uniqueId, h)) featuresState.mutate(_.putAll(acceptedFeatures.diff(featuresState().keySet.asScala).map(_ -> h).toMap.asJava)) alterVotes(h, block.featureVotes, 1) @@ -99,7 +101,7 @@ class HistoryWriterImpl private(file: Option[File], val synchronizationToken: Re featuresState.mutate(fs => featuresToRemove.foreach(fs.remove)) } - val vOpt = Option(blockIdByHeight.mutate(_.remove(h))) + val vOpt = Option(mutateBlockIdByHeight(_.remove(h))) vOpt.map(v => heightByBlockId.mutate(_.remove(v))) db.commit() @@ -135,6 +137,16 @@ class HistoryWriterImpl private(file: Option[File], val synchronizationToken: Re override def blockHeaderAndSizeAt(height: Int): Option[(BlockHeader, Int)] = blockBytes(height).map(bytes => (BlockHeader.parseBytes(bytes).get._1, bytes.length)) + + private var heightInfo: (Int, Long) = (height(), time.getTimestamp()) + + override def debugInfo: HeightInfo = heightInfo + + private def mutateBlockIdByHeight[R](f: MVMap[Int, ByteStr] => R)(implicit readWriteLock: WriteLock): R = { + val result = blockIdByHeight.mutate(f) + heightInfo = (blockIdByHeight().size(), time.getTimestamp()) + result + } } object HistoryWriterImpl extends ScorexLogging { @@ -142,8 +154,8 @@ object HistoryWriterImpl extends ScorexLogging { private val CompactMemorySize = 10 * 1024 * 1024 def apply(file: Option[File], synchronizationToken: ReentrantReadWriteLock, functionalitySettings: FunctionalitySettings, - featuresSettings: FeaturesSettings): Try[HistoryWriterImpl] = - createWithStore[HistoryWriterImpl](file, new HistoryWriterImpl(file, synchronizationToken, functionalitySettings, featuresSettings), h => h.isConsistent) + featuresSettings: FeaturesSettings, time: Time = NTP): Try[HistoryWriterImpl] = + createWithStore[HistoryWriterImpl](file, new HistoryWriterImpl(file, synchronizationToken, functionalitySettings, featuresSettings, time), h => h.isConsistent) private val blockHeightStats = Kamon.metrics.histogram("block-height") private val blockSizeStats = Kamon.metrics.histogram("block-size-bytes") diff --git a/src/main/scala/com/wavesplatform/history/StorageFactory.scala b/src/main/scala/com/wavesplatform/history/StorageFactory.scala index e9676da568a..223032cb7e2 100644 --- a/src/main/scala/com/wavesplatform/history/StorageFactory.scala +++ b/src/main/scala/com/wavesplatform/history/StorageFactory.scala @@ -6,37 +6,39 @@ import java.util.concurrent.locks.{ReentrantReadWriteLock => RWL} import com.wavesplatform.features.FeatureProvider import com.wavesplatform.settings.WavesSettings import com.wavesplatform.state2._ +import com.wavesplatform.utils.HeightInfo +import monix.eval.Coeval import scorex.transaction._ -import scorex.utils.NTP +import scorex.utils.{NTP, Time} import scala.util.{Success, Try} object StorageFactory { - private def createStateStorage(history: History with FeatureProvider, stateFile: Option[File], pageSplitSize: Int): Try[StateStorage] = - StateStorage(stateFile, dropExisting = false).flatMap { ss => + type Storage = Coeval[(NgHistory with DebugNgHistory with AutoCloseable, FeatureProvider, AutoCloseable, StateReader, BlockchainUpdater, BlockchainDebugInfo)] + type HeightInfos = Coeval[(HeightInfo, HeightInfo)] + + private def createStateStorage(history: History with FeatureProvider, stateFile: Option[File], pageSplitSize: Int, time: Time): Try[StateStorage] = + StateStorage(stateFile, dropExisting = false, time).flatMap { ss => if (ss.getHeight <= history.height()) Success(ss) else { ss.close() - StateStorage(stateFile, dropExisting = true) + StateStorage(stateFile, dropExisting = true, time) } } - def apply[T](settings: WavesSettings, - beforeStateUpdate: (BlockchainUpdater, StateWriter) => T = (_: BlockchainUpdater, _: StateWriter) => (), - time: scorex.utils.Time = NTP): - Try[(NgHistory with DebugNgHistory with AutoCloseable, FeatureProvider, AutoCloseable, StateReader, BlockchainUpdater, BlockchainDebugInfo, T)] = - { + def apply[T](settings: WavesSettings, time: Time = NTP): Try[(Storage, HeightInfos)] = { val lock = new RWL(true) for { - historyWriter <- HistoryWriterImpl(settings.blockchainSettings.blockchainFile, lock, settings.blockchainSettings.functionalitySettings, settings.featuresSettings) - ss <- createStateStorage(historyWriter, settings.blockchainSettings.stateFile, settings.mvstorePageSplitSize) - stateWriter = new StateWriterImpl(ss, settings.blockchainSettings.storeTransactionsInState, lock) - bcu = BlockchainUpdaterImpl(stateWriter, historyWriter, settings, time, lock) - callbackResult = beforeStateUpdate(bcu, stateWriter) - } yield { - bcu.syncPersistedAndInMemory() - val history: NgHistory with DebugNgHistory with FeatureProvider = bcu.historyReader - (history, history, stateWriter, bcu.bestLiquidState, bcu, bcu, callbackResult) - } + historyWriter <- HistoryWriterImpl(settings.blockchainSettings.blockchainFile, lock, settings.blockchainSettings.functionalitySettings, settings.featuresSettings, time) + ss <- createStateStorage(historyWriter, settings.blockchainSettings.stateFile, settings.mvstorePageSplitSize, time) + } yield ( + Coeval { + val stateWriter = new StateWriterImpl(ss, settings.blockchainSettings.storeTransactionsInState, lock) + val bcu = BlockchainUpdaterImpl(stateWriter, historyWriter, settings, time, lock) + val history: NgHistory with DebugNgHistory with FeatureProvider = bcu.historyReader + (history, history, stateWriter, bcu.bestLiquidState, bcu, bcu) + }, + Coeval { (historyWriter.debugInfo, ss.debugInfo) } + ) } } diff --git a/src/main/scala/com/wavesplatform/http/NodeApiRoute.scala b/src/main/scala/com/wavesplatform/http/NodeApiRoute.scala index 2ce8f138020..5b8f6f8782a 100644 --- a/src/main/scala/com/wavesplatform/http/NodeApiRoute.scala +++ b/src/main/scala/com/wavesplatform/http/NodeApiRoute.scala @@ -5,25 +5,19 @@ import javax.ws.rs.Path import akka.http.scaladsl.server.Route import com.wavesplatform.Shutdownable -import com.wavesplatform.network.lastObserved import com.wavesplatform.settings.{Constants, RestAPISettings} -import com.wavesplatform.state2.StateWriter +import com.wavesplatform.utils.HeightInfo import io.swagger.annotations._ import monix.eval.Coeval -import monix.execution.Scheduler.Implicits.global import play.api.libs.json.Json import scorex.api.http.{ApiRoute, CommonApiFunctions} -import scorex.transaction.{BlockchainUpdater, LastBlockInfo} import scorex.utils.ScorexLogging @Path("/node") @Api(value = "node") -case class NodeApiRoute(settings: RestAPISettings, blockchainUpdater: BlockchainUpdater, state: StateWriter, application: Shutdownable) +case class NodeApiRoute(settings: RestAPISettings, heights: Coeval[(HeightInfo, HeightInfo)], application: Shutdownable) extends ApiRoute with CommonApiFunctions with ScorexLogging { - private val lastHeight: Coeval[Option[LastBlockInfo]] = lastObserved(blockchainUpdater.lastBlockInfo) - private val lastState: Coeval[Option[StateWriter.Status]] = lastObserved(state.status) - override lazy val route = pathPrefix("node") { stop ~ status ~ version } @@ -48,8 +42,7 @@ case class NodeApiRoute(settings: RestAPISettings, blockchainUpdater: Blockchain @Path("/status") @ApiOperation(value = "Status", notes = "Get status of the running core", httpMethod = "GET") def status: Route = (get & path("status")) { - val (bcHeight, bcTime) = lastHeight().map { case LastBlockInfo(_, h, _, _, t) => (h, t) }.getOrElse((0, 0L)) - val (stHeight, stTime) = lastState().map { case StateWriter.Status(h, t) => (h, t) }.getOrElse((0, 0L)) + val ((bcHeight, bcTime), (stHeight, stTime)) = heights() val lastUpdated = bcTime max stTime complete(Json.obj( "blockchainHeight" -> bcHeight, diff --git a/src/main/scala/com/wavesplatform/state2/BlockchainUpdaterImpl.scala b/src/main/scala/com/wavesplatform/state2/BlockchainUpdaterImpl.scala index 7941fd4407d..482e6a685ab 100644 --- a/src/main/scala/com/wavesplatform/state2/BlockchainUpdaterImpl.scala +++ b/src/main/scala/com/wavesplatform/state2/BlockchainUpdaterImpl.scala @@ -66,10 +66,10 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with SnapshotStateRea // Store last block information in a cache historyReader.lastBlock.foreach { b => - internalLastBlockInfo.onNext(LastBlockInfo(b.uniqueId, historyReader.height(), historyReader.score(), blockchainReady, System.currentTimeMillis)) + internalLastBlockInfo.onNext(LastBlockInfo(b.uniqueId, historyReader.height(), historyReader.score(), blockchainReady)) } - private[wavesplatform] def syncPersistedAndInMemory(): Unit = write { implicit l => + private def syncPersistedAndInMemory(): Unit = write { implicit l => log.info(heights("State rebuild started")) val notPersisted = historyWriter.height() - persisted.height @@ -191,7 +191,7 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with SnapshotStateRea val height = historyWriter.height() + 1 ngState.set(Some(new NgState(block, newBlockDiff, featuresApprovedWithBlock(block)))) historyReader.lastBlockId().foreach(id => - internalLastBlockInfo.onNext(LastBlockInfo(id, historyReader.height(), historyReader.score(), blockchainReady, System.currentTimeMillis))) + internalLastBlockInfo.onNext(LastBlockInfo(id, historyReader.height(), historyReader.score(), blockchainReady))) log.info(s"$block appended. New height: $height)") discarded } @@ -243,7 +243,7 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with SnapshotStateRea val totalDiscardedBlocks: Seq[Block] = discardedHistoryBlocks ++ discardedNgBlock.toSeq if (totalDiscardedBlocks.nonEmpty) internalLastBlockInfo.onNext( - LastBlockInfo(blockId, historyReader.height(), historyReader.score(), blockchainReady, System.currentTimeMillis)) + LastBlockInfo(blockId, historyReader.height(), historyReader.score(), blockchainReady)) TxsInBlockchainStats.record(-totalDiscardedBlocks.size) Right(totalDiscardedBlocks) } @@ -272,9 +272,8 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with SnapshotStateRea historyWriter.lastBlock.map(_.timestamp), microBlock, ng.base.timestamp) } yield { log.info(s"$microBlock appended") - val now = System.currentTimeMillis - ng.append(microBlock, diff, now) - internalLastBlockInfo.onNext(LastBlockInfo(microBlock.totalResBlockSig, historyReader.height(), historyReader.score(), ready = true, now)) + ng.append(microBlock, diff, System.currentTimeMillis) + internalLastBlockInfo.onNext(LastBlockInfo(microBlock.totalResBlockSig, historyReader.height(), historyReader.score(), ready = true)) } } } @@ -303,8 +302,11 @@ object BlockchainUpdaterImpl extends ScorexLogging { history: HistoryWriterImpl, settings: WavesSettings, time: scorex.utils.Time, - synchronizationToken: ReentrantReadWriteLock): BlockchainUpdaterImpl = - new BlockchainUpdaterImpl(persistedState, settings, time, history, history, synchronizationToken) + synchronizationToken: ReentrantReadWriteLock): BlockchainUpdaterImpl = { + val blockchainUpdater = new BlockchainUpdaterImpl(persistedState, settings, time, history, history, synchronizationToken) + blockchainUpdater.syncPersistedAndInMemory() + blockchainUpdater + } def areVersionsOfSameBlock(b1: Block, b2: Block): Boolean = b1.signerData.generator == b2.signerData.generator && diff --git a/src/main/scala/com/wavesplatform/state2/StateStorage.scala b/src/main/scala/com/wavesplatform/state2/StateStorage.scala index 1d8bdc46804..af363b0df49 100644 --- a/src/main/scala/com/wavesplatform/state2/StateStorage.scala +++ b/src/main/scala/com/wavesplatform/state2/StateStorage.scala @@ -7,11 +7,11 @@ import com.wavesplatform.utils._ import org.h2.mvstore.`type`.ObjectDataType import org.h2.mvstore.MVMap import scorex.account.Address -import scorex.utils.LogMVMapBuilder +import scorex.utils.{LogMVMapBuilder, NTP, Time} import scala.util.Try -class StateStorage private(file: Option[File]) extends VariablesStorage(createMVStore(file)) with VersionableStorage with AutoCloseable { +class StateStorage private(file: Option[File], time: Time) extends VariablesStorage(createMVStore(file)) with VersionableStorage with AutoCloseable { import StateStorage._ @@ -19,7 +19,10 @@ class StateStorage private(file: Option[File]) extends VariablesStorage(createMV def getHeight: Int = getInt(heightKey).getOrElse(0) - def setHeight(i: Int): Unit = putInt(heightKey, i) + def setHeight(i: Int): Unit = { + putInt(heightKey, i) + heightTimestamp = time.getTimestamp() + } val transactions: MVMap[ByteStr, (Int, Array[Byte])] = db.openMap("txs", new LogMVMapBuilder[ByteStr, (Int, Array[Byte])] .keyType(DataTypes.byteStr).valueType(DataTypes.tupleIntByteArray)) @@ -66,6 +69,10 @@ class StateStorage private(file: Option[File]) extends VariablesStorage(createMV } override def close(): Unit = db.close() + + private var heightTimestamp: Long = time.getTimestamp() + + def debugInfo: HeightInfo = (getHeight, heightTimestamp) } object StateStorage { @@ -74,8 +81,8 @@ object StateStorage { private val heightKey = "height" - def apply(file: Option[File], dropExisting: Boolean): Try[StateStorage] = - createWithStore(file, new StateStorage(file), deleteExisting = dropExisting) + def apply(file: Option[File], dropExisting: Boolean, time: Time = NTP): Try[StateStorage] = + createWithStore(file, new StateStorage(file, time), deleteExisting = dropExisting) type AccountIdxKey = Array[Byte] diff --git a/src/main/scala/com/wavesplatform/state2/StateWriter.scala b/src/main/scala/com/wavesplatform/state2/StateWriter.scala index ae31a747d75..be737cbc10c 100644 --- a/src/main/scala/com/wavesplatform/state2/StateWriter.scala +++ b/src/main/scala/com/wavesplatform/state2/StateWriter.scala @@ -5,26 +5,16 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import cats.Monoid import cats.implicits._ import com.wavesplatform.metrics.Instrumented -import com.wavesplatform.state2.StateWriter.Status import com.wavesplatform.state2.reader.StateReaderImpl -import monix.reactive.Observable -import monix.reactive.subjects.BehaviorSubject import scorex.transaction.PaymentTransaction import scorex.transaction.assets.TransferTransaction import scorex.transaction.assets.exchange.ExchangeTransaction import scorex.utils.ScorexLogging -import scorex.utils.Synchronized.ReadLock trait StateWriter { def applyBlockDiff(blockDiff: BlockDiff): Unit def clear(): Unit - - def status: Observable[Status] -} - -object StateWriter { - case class Status(height: Int, lastUpdated: Long) } class StateWriterImpl(p: StateStorage, storeTransactions: Boolean, synchronizationToken: ReentrantReadWriteLock) @@ -32,10 +22,6 @@ class StateWriterImpl(p: StateStorage, storeTransactions: Boolean, synchronizati import StateStorage._ - override val status = read { implicit l => - BehaviorSubject(Status(sp().getHeight, System.currentTimeMillis)) - } - override def close(): Unit = p.close() override def applyBlockDiff(blockDiff: BlockDiff): Unit = write { implicit l => @@ -120,7 +106,7 @@ class StateWriterImpl(p: StateStorage, storeTransactions: Boolean, synchronizati measureSizeLog("lease info")(blockDiff.txsDiff.leaseState)( _.foreach { case (id, isActive) => sp().leaseState.put(id, isActive) }) - setHeight(newHeight) + sp().setHeight(newHeight) val nextChunkOfBlocks = !sameQuotient(newHeight, oldHeight, 1000) sp().commit(nextChunkOfBlocks) @@ -140,12 +126,7 @@ class StateWriterImpl(p: StateStorage, storeTransactions: Boolean, synchronizati sp().aliasToAddress.clear() sp().leaseState.clear() sp().lastBalanceSnapshotHeight.clear() - setHeight(0) + sp().setHeight(0) sp().commit(compact = true) } - - private def setHeight(newHeight: Int)(implicit lock: ReadLock) = { - sp().setHeight(newHeight) - status.onNext(Status(newHeight, System.currentTimeMillis)) - } } diff --git a/src/main/scala/com/wavesplatform/utils/package.scala b/src/main/scala/com/wavesplatform/utils/package.scala index bd0e13bf2d2..63d9037aba4 100644 --- a/src/main/scala/com/wavesplatform/utils/package.scala +++ b/src/main/scala/com/wavesplatform/utils/package.scala @@ -13,6 +13,8 @@ import scala.util.Try package object utils extends ScorexLogging { + type HeightInfo = (Int, Long) + private val DefaultPageSplitSize = 4 * 1024 def base58Length(byteArrayLength: Int): Int = math.ceil(math.log(256) / math.log(58) * byteArrayLength).toInt diff --git a/src/main/scala/scorex/transaction/BlockchainUpdater.scala b/src/main/scala/scorex/transaction/BlockchainUpdater.scala index b215dadf4bc..4fef37b6d4c 100644 --- a/src/main/scala/scorex/transaction/BlockchainUpdater.scala +++ b/src/main/scala/scorex/transaction/BlockchainUpdater.scala @@ -25,7 +25,7 @@ trait BlockchainDebugInfo { } -case class LastBlockInfo(id: BlockId, height: Int, score: BlockchainScore, ready: Boolean, lastUpdated: Long) +case class LastBlockInfo(id: BlockId, height: Int, score: BlockchainScore, ready: Boolean) case class HashInfo(height: Int, hash: Int) diff --git a/src/main/scala/scorex/transaction/History.scala b/src/main/scala/scorex/transaction/History.scala index b1c8d40dc16..04f85579caa 100755 --- a/src/main/scala/scorex/transaction/History.scala +++ b/src/main/scala/scorex/transaction/History.scala @@ -2,6 +2,7 @@ package scorex.transaction import com.wavesplatform.network.{BlockCheckpoint, Checkpoint} import com.wavesplatform.state2.ByteStr +import com.wavesplatform.utils.HeightInfo import scorex.block.Block.BlockId import scorex.block.{Block, BlockHeader, MicroBlock} import scorex.consensus.nxt.NxtLikeConsensusBlockData @@ -29,6 +30,8 @@ trait History extends Synchronized with AutoCloseable { def lastBlockTimestamp(): Option[Long] def lastBlockId(): Option[ByteStr] + + def debugInfo: HeightInfo } trait NgHistory extends History { diff --git a/src/main/scala/scorex/transaction/NgHistoryReader.scala b/src/main/scala/scorex/transaction/NgHistoryReader.scala index 53eefd60c7a..36d4f256e8d 100644 --- a/src/main/scala/scorex/transaction/NgHistoryReader.scala +++ b/src/main/scala/scorex/transaction/NgHistoryReader.scala @@ -9,6 +9,7 @@ import scorex.block.Block.BlockId import scorex.block.{Block, BlockHeader, MicroBlock} import scorex.transaction.History.{BlockMinerInfo, BlockchainScore} import cats.implicits._ +import com.wavesplatform.utils.HeightInfo class NgHistoryReader(ngState: () => Option[NgState], inner: History with FeatureProvider, settings: FunctionalitySettings) extends History with NgHistory with DebugNgHistory with FeatureProvider { @@ -102,4 +103,6 @@ class NgHistoryReader(ngState: () => Option[NgState], inner: History with Featur else inner.blockHeaderAndSizeAt(height) } + + override def debugInfo: HeightInfo = inner.debugInfo } diff --git a/src/test/scala/com/wavesplatform/UtxPoolSpecification.scala b/src/test/scala/com/wavesplatform/UtxPoolSpecification.scala index 788e39f85e5..d40f89c6e05 100755 --- a/src/test/scala/com/wavesplatform/UtxPoolSpecification.scala +++ b/src/test/scala/com/wavesplatform/UtxPoolSpecification.scala @@ -40,7 +40,8 @@ class UtxPoolSpecification extends FreeSpec val genesisSettings = TestHelpers.genesisSettings(Map(senderAccount -> senderBalance)) val settings = WavesSettings.fromConfig(config).copy(blockchainSettings = BlockchainSettings(None, None, false, None, 'T', 5, 5, FunctionalitySettings.TESTNET, genesisSettings)) - val (history, _, _, state, bcu, _, _) = StorageFactory(settings).get + val (storage, _) = StorageFactory(settings).get + val (history, _, _, state, bcu, _) = storage() bcu.processBlock(Block.genesis(genesisSettings).right.get) diff --git a/src/test/scala/com/wavesplatform/history/package.scala b/src/test/scala/com/wavesplatform/history/package.scala index 6c64416219c..0ab53f7be10 100644 --- a/src/test/scala/com/wavesplatform/history/package.scala +++ b/src/test/scala/com/wavesplatform/history/package.scala @@ -38,7 +38,8 @@ package object history { val DefaultWavesSettings: WavesSettings = settings.copy(blockchainSettings = DefaultBlockchainSettings) def domain(settings: WavesSettings): Domain = { - val (history, _, _, stateReader, blockchainUpdater, _, _) = StorageFactory(settings).get + val (storage, _) = StorageFactory(settings).get + val (history, _, _, stateReader, blockchainUpdater, _) = storage() Domain(history, stateReader, blockchainUpdater) } diff --git a/src/test/scala/com/wavesplatform/network/TestHistory.scala b/src/test/scala/com/wavesplatform/network/TestHistory.scala index 10947e1110f..57f7db903be 100644 --- a/src/test/scala/com/wavesplatform/network/TestHistory.scala +++ b/src/test/scala/com/wavesplatform/network/TestHistory.scala @@ -3,6 +3,7 @@ package com.wavesplatform.network import java.util.concurrent.locks.ReentrantReadWriteLock import com.wavesplatform.state2.ByteStr +import com.wavesplatform.utils.HeightInfo import scorex.block.Block.BlockId import scorex.block.{Block, BlockHeader, MicroBlock} import scorex.transaction.History.BlockchainScore @@ -40,4 +41,6 @@ class TestHistory extends NgHistory { override def synchronizationToken: ReentrantReadWriteLock = ??? override def close(): Unit = ??? + + override def debugInfo: HeightInfo = ??? } \ No newline at end of file diff --git a/src/test/scala/scorex/transaction/BlockchainUpdaterTest.scala b/src/test/scala/scorex/transaction/BlockchainUpdaterTest.scala index 96ded94c038..f698fcb5a88 100644 --- a/src/test/scala/scorex/transaction/BlockchainUpdaterTest.scala +++ b/src/test/scala/scorex/transaction/BlockchainUpdaterTest.scala @@ -28,10 +28,13 @@ class BlockchainUpdaterTest extends FunSuite with Matchers with HistoryTest with featuresSettings = DefaultWavesSettings.featuresSettings.copy(autoShutdownOnUnsupportedFeature = true) ) - private def storageFactory() = StorageFactory(WavesSettings).get + private def storageFactory() = { + val (storage, _) = StorageFactory(WavesSettings).get + storage() + } ignore ("concurrent access to lastBlock doesn't throw any exception") { - val (h, fp, _, _, bu, _, _) = storageFactory() + val (h, fp, _, _, bu, _) = storageFactory() bu.processBlock(genesisBlock) @@ -57,7 +60,7 @@ class BlockchainUpdaterTest extends FunSuite with Matchers with HistoryTest with test("features approved and accepted as height grows") { - val (h, fp, _, _, bu, _, _) = storageFactory() + val (h, fp, _, _, bu, _) = storageFactory() bu.processBlock(genesisBlock) @@ -94,7 +97,7 @@ class BlockchainUpdaterTest extends FunSuite with Matchers with HistoryTest with } test("features rollback with block rollback") { - val (h, fp, _, _, bu, _, _) = storageFactory() + val (h, fp, _, _, bu, _) = storageFactory() bu.processBlock(genesisBlock) @@ -149,7 +152,7 @@ class BlockchainUpdaterTest extends FunSuite with Matchers with HistoryTest with } test("feature activation height is not overrided with further periods") { - val (h, fp, _, _, bu, _, _) = storageFactory() + val (h, fp, _, _, bu, _) = storageFactory() bu.processBlock(genesisBlock) @@ -171,7 +174,7 @@ class BlockchainUpdaterTest extends FunSuite with Matchers with HistoryTest with } test("feature activated only by 90% of blocks") { - val (h, fp, _, _, bu, _, _) = storageFactory() + val (h, fp, _, _, bu, _) = storageFactory() bu.processBlock(genesisBlock) @@ -194,7 +197,7 @@ class BlockchainUpdaterTest extends FunSuite with Matchers with HistoryTest with } test("features votes resets when voting window changes") { - val (h, fp, _, _, bu, _, _) = storageFactory() + val (h, fp, _, _, bu, _) = storageFactory() bu.processBlock(genesisBlock) @@ -234,7 +237,7 @@ class BlockchainUpdaterTest extends FunSuite with Matchers with HistoryTest with }) - val (h, fp, _, _, bu, _, _) = storageFactory() + val (h, fp, _, _, bu, _) = storageFactory() bu.processBlock(genesisBlock) (1 to ApprovalPeriod * 2).foreach { i => @@ -249,7 +252,7 @@ class BlockchainUpdaterTest extends FunSuite with Matchers with HistoryTest with } test("sunny day test when known feature activated") { - val (h, fp, _, _, bu, _, _) = storageFactory() + val (h, fp, _, _, bu, _) = storageFactory() bu.processBlock(genesisBlock) (1 until ApprovalPeriod * 2 - 1).foreach { i => From 6544b1ec6f293dc10f04ba8225bad278fca22984 Mon Sep 17 00:00:00 2001 From: Alexey Kiselev Date: Fri, 12 Jan 2018 15:49:14 +0300 Subject: [PATCH 5/5] Exceptions for two valid blocks were added --- .../wavesplatform/state2/appender/package.scala | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/wavesplatform/state2/appender/package.scala b/src/main/scala/com/wavesplatform/state2/appender/package.scala index 3dd2dbd9187..f33dce51b5b 100644 --- a/src/main/scala/com/wavesplatform/state2/appender/package.scala +++ b/src/main/scala/com/wavesplatform/state2/appender/package.scala @@ -11,11 +11,10 @@ import io.netty.channel.group.ChannelGroup import monix.eval.Task import scorex.block.Block import scorex.consensus.TransactionsOrdering -import scorex.transaction.PoSCalc.{calcBaseTarget, calcGeneratorSignature, calcHit, calcTarget} -import scorex.transaction._ +import scorex.transaction.PoSCalc.{calcBaseTarget, calcGeneratorSignature, calcHit, calcTarget, _} import scorex.transaction.ValidationError.{BlockFromFuture, GenericError} +import scorex.transaction._ import scorex.utils.{ScorexLogging, Time} -import scorex.transaction.PoSCalc._ import scala.util.{Left, Right} @@ -23,11 +22,16 @@ package object appender extends ScorexLogging { private val MaxTimeDrift: Long = 100 // millis + private val correctBlockId1 = ByteStr.decodeBase58("2GNCYVy7k3kEPXzz12saMtRDeXFKr8cymVsG8Yxx3sZZ75eHj9csfXnGHuuJe7XawbcwjKdifUrV1uMq4ZNCWPf1").get + private val correctBlockId2 = ByteStr.decodeBase58("5uZoDnRKeWZV9Thu2nvJVZ5dBvPB7k2gvpzFD618FMXCbBVBMN2rRyvKBZBhAGnGdgeh2LXEeSr9bJqruJxngsE7").get + private val height1 = 812608 + private val height2 = 813207 + private[appender] val scheduler = monix.execution.Scheduler.singleThread("appender") private[appender] def processAndBlacklistOnFailure[A, B](ch: Channel, peerDatabase: PeerDatabase, miner: Miner, allChannels: ChannelGroup, start: => String, success: => String, errorPrefix: String)( - f: => Task[Either[B, Option[BigInt]]]): Task[Either[B, Option[BigInt]]] = { + f: => Task[Either[B, Option[BigInt]]]): Task[Either[B, Option[BigInt]]] = { log.debug(start) f map { @@ -96,7 +100,8 @@ package object appender extends ScorexLogging { effectiveBalance <- genBalance(height).left.map(GenericError(_)) hit = calcHit(prevBlockData, generator) target = calcTarget(parent.timestamp, parent.consensusData.baseTarget, blockTime, effectiveBalance) - _ <- Either.cond(hit < target, (), GenericError(s"calculated hit $hit >= calculated target $target")) + _ <- Either.cond(hit < target || (height == height1 && block.uniqueId == correctBlockId1) || (height == height2 && block.uniqueId == correctBlockId2), + (), GenericError(s"calculated hit $hit >= calculated target $target")) } yield () r.left.map {