diff --git a/src/main/scala/com/wavesplatform/UtxPool.scala b/src/main/scala/com/wavesplatform/UtxPool.scala index b89079a6505..f0a881106b7 100644 --- a/src/main/scala/com/wavesplatform/UtxPool.scala +++ b/src/main/scala/com/wavesplatform/UtxPool.scala @@ -157,9 +157,9 @@ class UtxPoolImpl(time: Time, .values.asScala.toSeq .sorted(TransactionsOrdering.InUTXPool) .foldLeft((Seq.empty[ByteStr], Seq.empty[Transaction], Monoid[Diff].empty)) { - case ((invalid, valid, diff), tx) if valid.size <= max => + case ((invalid, valid, diff), tx) if valid.lengthCompare(max) <= 0 => differ(composite(diff.asBlockDiff, s), tx) match { - case Right(newDiff) if valid.size < max => + case Right(newDiff) if valid.lengthCompare(max) < 0 => (invalid, tx +: valid, Monoid.combine(diff, newDiff)) case Right(_) => (invalid, valid, diff) diff --git a/src/main/scala/com/wavesplatform/mining/Miner.scala b/src/main/scala/com/wavesplatform/mining/Miner.scala index 2892393f440..fa863a58a7a 100644 --- a/src/main/scala/com/wavesplatform/mining/Miner.scala +++ b/src/main/scala/com/wavesplatform/mining/Miner.scala @@ -124,25 +124,28 @@ class MinerImpl( }.delayExecution(delay) - private def generateOneMicroBlockTask(account: PrivateKeyAccount, accumulatedBlock: Block): Task[Either[ValidationError, Option[Block]]] = Task { - log.trace(s"Generating microblock for $account") + private def generateOneMicroBlockTask(account: PrivateKeyAccount, accumulatedBlock: Block): Task[MicroblockMiningResult] = { + log.trace(s"Generating microBlock for $account") val pc = allChannels.size() + val accumulatedCount = accumulatedBlock.transactionCount lazy val unconfirmed = measureLog("packing unconfirmed transactions for microblock") { - utx.packUnconfirmed(settings.minerSettings.maxTransactionsInMicroBlock, sortInBlock = false) + val maxTxsForMicroblock = Math.min(MaxTransactionsPerBlockVer3 - accumulatedCount, settings.minerSettings.maxTransactionsInMicroBlock) + utx.packUnconfirmed(maxTxsForMicroblock, sortInBlock = false) } - if (pc < minerSettings.quorum) { + if (accumulatedCount == MaxTransactionsPerBlockVer3) { + log.trace(s"Stopping forging microBlocks, block is already full") + Task.now(Stop) + } else if (pc < minerSettings.quorum) { log.trace(s"Quorum not available ($pc/${minerSettings.quorum}, not forging microblock with ${account.address}") - Task.now(Right(None)) - } - else if (unconfirmed.isEmpty) { - log.trace("skipping microBlock because no txs in utx pool") - Task.now(Right(None)) - } - else { + Task.now(Retry) + } else if (unconfirmed.isEmpty) { + log.trace(s"Skipping microBlock because utx is empty") + Task.now(Retry) + } else { log.trace(s"Accumulated ${unconfirmed.size} txs for microblock") val start = System.currentTimeMillis() (for { - signedBlock <- EitherT(Task.now(Block.buildAndSign( + signedBlock <- EitherT.fromEither[Task](Block.buildAndSign( version = 3, timestamp = accumulatedBlock.timestamp, reference = accumulatedBlock.reference, @@ -150,32 +153,35 @@ class MinerImpl( transactionData = accumulatedBlock.transactionData ++ unconfirmed, signer = account, featureVotes = accumulatedBlock.featureVotes - ))) - microBlock <- EitherT(Task.now(MicroBlock.buildAndSign(account, unconfirmed, accumulatedBlock.signerData.signature, signedBlock.signerData.signature))) + )) + microBlock <- EitherT.fromEither[Task](MicroBlock.buildAndSign(account, unconfirmed, accumulatedBlock.signerData.signature, signedBlock.signerData.signature)) _ = microBlockBuildTimeStats.safeRecord(System.currentTimeMillis() - start) _ <- EitherT(MicroblockAppender(checkpoint, history, blockchainUpdater, utx)(microBlock)) - } yield { - BlockStats.mined(microBlock) - log.trace(s"$microBlock has been mined for $account}") - allChannels.broadcast(MicroBlockInv(account, microBlock.totalResBlockSig, microBlock.prevResBlockSig)) - Some(signedBlock) - }).value.map { - _.left.map { err => - log.trace(s"MicroBlock has NOT been mined for $account} because $err") - err - } + } yield (microBlock, signedBlock)).value map { + case Left(err) => + Error(err) + case Right((microBlock, signedBlock)) => + BlockStats.mined(microBlock) + log.trace(s"$microBlock has been mined for $account}") + allChannels.broadcast(MicroBlockInv(account, microBlock.totalResBlockSig, microBlock.prevResBlockSig)) + Success(signedBlock) } } - }.flatten + } private def generateMicroBlockSequence(account: PrivateKeyAccount, accumulatedBlock: Block, delay: FiniteDuration): Task[Unit] = { debugState = MinerDebugInfo.MiningMicroblocks generateOneMicroBlockTask(account, accumulatedBlock).delayExecution(delay).flatMap { - case Left(err) => Task { + case Error(e) => Task { + debugState = MinerDebugInfo.Error(e.toString) + log.warn("Error mining MicroBlock: " + e.toString) + } + case Success(newTotal) => generateMicroBlockSequence(account, newTotal, minerSettings.microBlockInterval) + case Retry => generateMicroBlockSequence(account, accumulatedBlock, minerSettings.microBlockInterval) + case Stop => Task { debugState = MinerDebugInfo.MiningBlocks - log.warn("Error mining MicroBlock: " + err.toString) + log.debug("MicroBlock mining completed, block is full") } - case Right(maybeNewTotal) => generateMicroBlockSequence(account, maybeNewTotal.getOrElse(accumulatedBlock), minerSettings.microBlockInterval) } } @@ -254,4 +260,11 @@ object Miner { val calculatedOffset = calculatedGenerationTimestamp - timeService.correctedTime() Math.max(minimalBlockGenerationOffset.toMillis, calculatedOffset).millis } + + sealed trait MicroblockMiningResult + case object Stop extends MicroblockMiningResult + case object Retry extends MicroblockMiningResult + case class Error(e: ValidationError) extends MicroblockMiningResult + case class Success(b: Block) extends MicroblockMiningResult + }