Skip to content

Commit

Permalink
Merge pull request #779 from wavesplatform/NODE-449-miner-logs
Browse files Browse the repository at this point in the history
NODE-449: Node should not report a warning if mine microblocks when t…
  • Loading branch information
alexeykiselev authored Jan 12, 2018
2 parents d95dac4 + b72299d commit dccf0db
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 30 deletions.
4 changes: 2 additions & 2 deletions src/main/scala/com/wavesplatform/UtxPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ class UtxPoolImpl(time: Time,
.values.asScala.toSeq
.sorted(TransactionsOrdering.InUTXPool)
.foldLeft((Seq.empty[ByteStr], Seq.empty[Transaction], Monoid[Diff].empty)) {
case ((invalid, valid, diff), tx) if valid.size <= max =>
case ((invalid, valid, diff), tx) if valid.lengthCompare(max) <= 0 =>
differ(composite(diff.asBlockDiff, s), tx) match {
case Right(newDiff) if valid.size < max =>
case Right(newDiff) if valid.lengthCompare(max) < 0 =>
(invalid, tx +: valid, Monoid.combine(diff, newDiff))
case Right(_) =>
(invalid, valid, diff)
Expand Down
69 changes: 41 additions & 28 deletions src/main/scala/com/wavesplatform/mining/Miner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -124,58 +124,64 @@ class MinerImpl(
}.delayExecution(delay)


private def generateOneMicroBlockTask(account: PrivateKeyAccount, accumulatedBlock: Block): Task[Either[ValidationError, Option[Block]]] = Task {
log.trace(s"Generating microblock for $account")
private def generateOneMicroBlockTask(account: PrivateKeyAccount, accumulatedBlock: Block): Task[MicroblockMiningResult] = {
log.trace(s"Generating microBlock for $account")
val pc = allChannels.size()
val accumulatedCount = accumulatedBlock.transactionCount
lazy val unconfirmed = measureLog("packing unconfirmed transactions for microblock") {
utx.packUnconfirmed(settings.minerSettings.maxTransactionsInMicroBlock, sortInBlock = false)
val maxTxsForMicroblock = Math.min(MaxTransactionsPerBlockVer3 - accumulatedCount, settings.minerSettings.maxTransactionsInMicroBlock)
utx.packUnconfirmed(maxTxsForMicroblock, sortInBlock = false)
}
if (pc < minerSettings.quorum) {
if (accumulatedCount == MaxTransactionsPerBlockVer3) {
log.trace(s"Stopping forging microBlocks, block is already full")
Task.now(Stop)
} else if (pc < minerSettings.quorum) {
log.trace(s"Quorum not available ($pc/${minerSettings.quorum}, not forging microblock with ${account.address}")
Task.now(Right(None))
}
else if (unconfirmed.isEmpty) {
log.trace("skipping microBlock because no txs in utx pool")
Task.now(Right(None))
}
else {
Task.now(Retry)
} else if (unconfirmed.isEmpty) {
log.trace(s"Skipping microBlock because utx is empty")
Task.now(Retry)
} else {
log.trace(s"Accumulated ${unconfirmed.size} txs for microblock")
val start = System.currentTimeMillis()
(for {
signedBlock <- EitherT(Task.now(Block.buildAndSign(
signedBlock <- EitherT.fromEither[Task](Block.buildAndSign(
version = 3,
timestamp = accumulatedBlock.timestamp,
reference = accumulatedBlock.reference,
consensusData = accumulatedBlock.consensusData,
transactionData = accumulatedBlock.transactionData ++ unconfirmed,
signer = account,
featureVotes = accumulatedBlock.featureVotes
)))
microBlock <- EitherT(Task.now(MicroBlock.buildAndSign(account, unconfirmed, accumulatedBlock.signerData.signature, signedBlock.signerData.signature)))
))
microBlock <- EitherT.fromEither[Task](MicroBlock.buildAndSign(account, unconfirmed, accumulatedBlock.signerData.signature, signedBlock.signerData.signature))
_ = microBlockBuildTimeStats.safeRecord(System.currentTimeMillis() - start)
_ <- EitherT(MicroblockAppender(checkpoint, history, blockchainUpdater, utx)(microBlock))
} yield {
BlockStats.mined(microBlock)
log.trace(s"$microBlock has been mined for $account}")
allChannels.broadcast(MicroBlockInv(account, microBlock.totalResBlockSig, microBlock.prevResBlockSig))
Some(signedBlock)
}).value.map {
_.left.map { err =>
log.trace(s"MicroBlock has NOT been mined for $account} because $err")
err
}
} yield (microBlock, signedBlock)).value map {
case Left(err) =>
Error(err)
case Right((microBlock, signedBlock)) =>
BlockStats.mined(microBlock)
log.trace(s"$microBlock has been mined for $account}")
allChannels.broadcast(MicroBlockInv(account, microBlock.totalResBlockSig, microBlock.prevResBlockSig))
Success(signedBlock)
}
}
}.flatten
}

private def generateMicroBlockSequence(account: PrivateKeyAccount, accumulatedBlock: Block, delay: FiniteDuration): Task[Unit] = {
debugState = MinerDebugInfo.MiningMicroblocks
generateOneMicroBlockTask(account, accumulatedBlock).delayExecution(delay).flatMap {
case Left(err) => Task {
case Error(e) => Task {
debugState = MinerDebugInfo.Error(e.toString)
log.warn("Error mining MicroBlock: " + e.toString)
}
case Success(newTotal) => generateMicroBlockSequence(account, newTotal, minerSettings.microBlockInterval)
case Retry => generateMicroBlockSequence(account, accumulatedBlock, minerSettings.microBlockInterval)
case Stop => Task {
debugState = MinerDebugInfo.MiningBlocks
log.warn("Error mining MicroBlock: " + err.toString)
log.debug("MicroBlock mining completed, block is full")
}
case Right(maybeNewTotal) => generateMicroBlockSequence(account, maybeNewTotal.getOrElse(accumulatedBlock), minerSettings.microBlockInterval)
}
}

Expand Down Expand Up @@ -254,4 +260,11 @@ object Miner {
val calculatedOffset = calculatedGenerationTimestamp - timeService.correctedTime()
Math.max(minimalBlockGenerationOffset.toMillis, calculatedOffset).millis
}

sealed trait MicroblockMiningResult
case object Stop extends MicroblockMiningResult
case object Retry extends MicroblockMiningResult
case class Error(e: ValidationError) extends MicroblockMiningResult
case class Success(b: Block) extends MicroblockMiningResult

}

0 comments on commit dccf0db

Please sign in to comment.