Skip to content

Commit

Permalink
Merge pull request #624 from wavesplatform/node-301-dont-rebroadcast-…
Browse files Browse the repository at this point in the history
…if-not-applied-2

NODE-301: Don't rebroadcast block if it has not been applied + Simplify processSingleBlock
  • Loading branch information
alexeykiselev authored Oct 23, 2017
2 parents e35bf5a + 607def0 commit 8fd9cc5
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 45 deletions.
51 changes: 17 additions & 34 deletions src/main/scala/com/wavesplatform/Coordinator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,11 @@ object Coordinator extends ScorexLogging with Instrumented {
extension.zipWithIndex.forall(p => checkpoint.isBlockValid(p._1.signerData.signature, lastCommonHeight + 1 + p._2))

lazy val forkApplicationResultEi: Either[ValidationError, BigInt] = {
val firstDeclined = extension.view
.map { b =>
b -> appendBlock(
checkpoint, history, blockchainUpdater, stateReader, utxStorage, time, settings.blockchainSettings,
featureProvider
)(b, local = false).right.map { baseHeight =>
BlockStats.applied(b, BlockStats.Source.Ext, baseHeight)
}
val firstDeclined = extension.view.map { b =>
b -> appendBlock(checkpoint, history, blockchainUpdater, stateReader, utxStorage, time, settings.blockchainSettings, featureProvider)(b).right.map {
_.foreach(bh => BlockStats.applied(b, BlockStats.Source.Ext, bh))
}
}
.zipWithIndex
.collectFirst { case ((b, Left(e)), i) => (i, b, e) }

Expand Down Expand Up @@ -103,32 +99,20 @@ object Coordinator extends ScorexLogging with Instrumented {
}


private def updateBlockchainReadinessFlag(history: History, time: Time, blockchainReadiness: AtomicBoolean, maxBlockchainAge: FiniteDuration): Boolean = {
def updateBlockchainReadinessFlag(history: History, time: Time, blockchainReadiness: AtomicBoolean, maxBlockchainAge: FiniteDuration): Boolean = {
val expired = time.correctedTime() - history.lastBlockTimestamp().get < maxBlockchainAge.toMillis
blockchainReadiness.compareAndSet(expired, !expired)
}

def processSingleBlock(checkpoint: CheckpointService, history: History, blockchainUpdater: BlockchainUpdater, time: Time,
stateReader: StateReader, utxStorage: UtxPool, blockchainReadiness: AtomicBoolean,
settings: WavesSettings, featureProvider: FeatureProvider)
(newBlock: Block, local: Boolean): Either[ValidationError, Option[BigInt]] = measureSuccessful(blockProcessingTimeStats, history.write { implicit l =>
if (history.contains(newBlock))
Right(None)
else {
val newScore = for {
_ <- Either.cond(history.heightOf(newBlock.reference).exists(_ >= history.height() - 1), (), GenericError("Can process either new top block or current top block's competitor"))
baseHeight <- appendBlock(checkpoint, history, blockchainUpdater, stateReader, utxStorage, time, settings.blockchainSettings, featureProvider)(newBlock, local)
} yield {
if (local) BlockStats.mined(newBlock, baseHeight)
else BlockStats.applied(newBlock, BlockStats.Source.Broadcast, baseHeight)
history.score()
}

if (local || newScore.isRight) {
updateBlockchainReadinessFlag(history, time, blockchainReadiness, settings.minerSettings.intervalAfterLastBlockThenGenerationIsAllowed)
}
newScore.right.map(Some(_))
}
stateReader: StateReader, utxStorage: UtxPool,
settings: BlockchainSettings, featureProvider: FeatureProvider)
(newBlock: Block): Either[ValidationError, Option[BigInt]] = measureSuccessful(blockProcessingTimeStats, history.write { implicit l =>
if (history.contains(newBlock)) Right(None)
else for {
_ <- Either.cond(history.heightOf(newBlock.reference).exists(_ >= history.height() - 1), (), GenericError("Can process either new top block or current top block's competitor"))
maybeBaseHeight <- appendBlock(checkpoint, history, blockchainUpdater, stateReader, utxStorage, time, settings, featureProvider)(newBlock)
} yield maybeBaseHeight map (_ => history.score())
})

def processMicroBlock(checkpoint: CheckpointService, history: History, blockchainUpdater: BlockchainUpdater, utxStorage: UtxPool)
Expand All @@ -147,20 +131,19 @@ object Coordinator extends ScorexLogging with Instrumented {

private def appendBlock(checkpoint: CheckpointService, history: History, blockchainUpdater: BlockchainUpdater,
stateReader: StateReader, utxStorage: UtxPool, time: Time, settings: BlockchainSettings,
featureProvider: FeatureProvider)
(block: Block, local: Boolean): Either[ValidationError, Int] = for {
featureProvider: FeatureProvider)(block: Block): Either[ValidationError, Option[Int]] = for {
_ <- Either.cond(checkpoint.isBlockValid(block.signerData.signature, history.height() + 1), (),
GenericError(s"Block $block at height ${history.height() + 1} is not valid w.r.t. checkpoint"))
_ <- blockConsensusValidation(history, featureProvider, settings, time.correctedTime(), block) { height =>
PoSCalc.generatingBalance(stateReader, settings.functionalitySettings, block.signerData.generator, height).toEither.left.map(_.toString)
.flatMap(validateEffectiveBalance(featureProvider, settings.functionalitySettings, block, height))
}
baseHeight = history.height()
discardedTxs <- blockchainUpdater.processBlock(block)
maybeDiscardedTxs <- blockchainUpdater.processBlock(block)
} yield {
utxStorage.removeAll(block.transactionData)
discardedTxs.foreach(utxStorage.putIfNew)
baseHeight
maybeDiscardedTxs.toSeq.flatten.foreach(utxStorage.putIfNew)
maybeDiscardedTxs.map(_ => baseHeight)
}

def processCheckpoint(checkpoint: CheckpointService, history: History, blockchainUpdater: BlockchainUpdater)
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/com/wavesplatform/metrics/BlockStats.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ object BlockStats {
Seq.empty
)

def applied(b: Block, source: Source, baseHeight: Int): Unit = write(
def applied(b: Block, source: Source, newHeight: Int): Unit = write(
block(b, source)
.addField("txs", b.transactionData.size)
.addField("height", baseHeight),
.addField("height", newHeight),
Event.Applied,
Seq.empty
)
Expand Down
6 changes: 4 additions & 2 deletions src/main/scala/com/wavesplatform/mining/Miner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class MinerImpl(
private lazy val minerSettings = settings.minerSettings
private lazy val minMicroBlockDurationMills = minerSettings.minMicroBlockAge.toMillis
private lazy val blockchainSettings = settings.blockchainSettings
private lazy val processBlock = Coordinator.processSingleBlock(checkpoint, history, blockchainUpdater, timeService, stateReader, utx, blockchainReadiness, settings, featureProvider) _
private lazy val processBlock = Coordinator.processSingleBlock(checkpoint, history, blockchainUpdater, timeService, stateReader, utx, settings.blockchainSettings, featureProvider) _

private val scheduledAttempts = SerialCancelable()
private val microBlockAttempt = SerialCancelable()
Expand Down Expand Up @@ -183,9 +183,11 @@ class MinerImpl(
nextBlockGenerationTimes += account.toAddress -> (System.currentTimeMillis() + offset.toMillis)
generateOneBlockTask(account, balance)(offset).flatMap {
case Right(block) => Task.now {
processBlock(block, true) match {
processBlock(block) match {
case Left(err) => log.warn("Error mining Block: " + err.toString)
case Right(Some(score)) =>
BlockStats.mined(block, history.height())
Coordinator.updateBlockchainReadinessFlag(history, timeService, blockchainReadiness, settings.minerSettings.intervalAfterLastBlockThenGenerationIsAllowed)
allChannels.broadcast(BlockForged(block))
allChannels.broadcast(LocalScoreChanged(score))
scheduleMining()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class CoordinatorHandler(checkpointService: CheckpointService,

private val processCheckpoint = Coordinator.processCheckpoint(checkpointService, history, blockchainUpdater) _
private val processFork = Coordinator.processFork(checkpointService, history, blockchainUpdater, stateReader, utxStorage, time, settings, blockchainReadiness, featureProvider) _
private val processBlock = Coordinator.processSingleBlock(checkpointService, history, blockchainUpdater, time, stateReader, utxStorage, blockchainReadiness, settings, featureProvider) _
private val processBlock = Coordinator.processSingleBlock(checkpointService, history, blockchainUpdater, time, stateReader, utxStorage, settings.blockchainSettings, featureProvider) _
private val processMicroBlock = Coordinator.processMicroBlock(checkpointService, history, blockchainUpdater, utxStorage) _

private def scheduleMiningAndBroadcastScore(score: BigInt): Unit = {
Expand Down Expand Up @@ -81,11 +81,13 @@ class CoordinatorHandler(checkpointService: CheckpointService,
case b: Block => (Task {
BlockStats.received(b, BlockStats.Source.Broadcast, ctx)
CoordinatorHandler.blockReceivingLag.safeRecord(System.currentTimeMillis() - b.timestamp)
Signed.validateSignatures(b).flatMap(b => processBlock(b, false))
Signed.validateSignatures(b).flatMap(b => processBlock(b))
} map {
case Right(None) =>
log.trace(s"$b already appended")
case Right(Some(newScore)) =>
BlockStats.applied(b, BlockStats.Source.Broadcast, history.height())
Coordinator.updateBlockchainReadinessFlag(history, time, blockchainReadiness, settings.minerSettings.intervalAfterLastBlockThenGenerationIsAllowed)
log.debug(s"Appended $b")
if (b.transactionData.isEmpty)
allChannels.broadcast(BlockForged(b), Some(ctx.channel()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with StateReader,
else Set.empty
}

override def processBlock(block: Block): Either[ValidationError, DiscardedTransactions] = write { implicit l =>
override def processBlock(block: Block): Either[ValidationError, Option[DiscardedTransactions]] = write { implicit l =>
if (topMemoryDiff().heightDiff >= minimumInMemoryDiffSize) {
persisted.applyBlockDiff(bottomMemoryDiff())
bottomMemoryDiff.set(topMemoryDiff())
Expand Down Expand Up @@ -175,14 +175,13 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with StateReader,
log.error(errorText)
Left(BlockAppendError(errorText, block))
}
}).map {
case Some((newBlockDiff, discacrded)) =>
}).map { _ map { case ((newBlockDiff, discacrded)) =>
val height = historyWriter.height() + 1
ngState.set(Some(NgState(block, newBlockDiff, 0L, featuresApprovedWithBlock(block))))
historyReader.lastBlockId().foreach(lastBlockId.onNext)
log.info(s"$block appended. New height: $height)")
discacrded
case None => Seq.empty
}
})
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/scorex/transaction/BlockchainUpdater.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import scorex.utils.Synchronized

trait BlockchainUpdater extends Synchronized {

def processBlock(block: Block): Either[ValidationError, DiscardedTransactions]
def processBlock(block: Block): Either[ValidationError, Option[DiscardedTransactions]]

def processMicroBlock(microBlock: MicroBlock): Either[ValidationError, Unit]

Expand Down

0 comments on commit 8fd9cc5

Please sign in to comment.