Skip to content

Commit

Permalink
Merge pull request #595 from wavesplatform/node-267-improve-resolutio…
Browse files Browse the repository at this point in the history
…n-of-forks

Node 267 improve resolution of forks
  • Loading branch information
alexeykiselev authored Oct 13, 2017
2 parents f2b18ce + ee9510a commit 00c59dd
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ trait IntegrationSuiteWithThreeAddresses extends FunSuite with BeforeAndAfterAll
txs <- makeTransfers

height <- traverse(allNodes)(_.height).map(_.max)
_ <- traverse(allNodes)(_.waitForHeight(height + 1))
_ <- traverse(allNodes)(_.waitForHeight(height + 2))

_ <- waitForTxsToReachAllNodes(txs)

_ <- Future.sequence(Seq(firstAddress, secondAddress, thirdAddress).map(address => assertBalances(address, defaultBalance, defaultBalance)))
} yield succeed

Await.result(correctStartBalancesFuture, 2.minutes)
Await.result(correctStartBalancesFuture, 5.minutes)
}
}
2 changes: 1 addition & 1 deletion src/it/scala/com/wavesplatform/it/MatcherTestSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ class MatcherTestSuite extends FreeSpec with Matchers with BeforeAndAfterAll {

private def getBalance(node: Node): (Long, Long) = {
val initialHeight = Await.result(node.height, 1.minute)
Await.result(node.waitForHeight(initialHeight + 1), 1.minute)
Await.result(node.waitForHeight(initialHeight + 2), 2.minute)

val balance = Await.result(node.balance(node.address), 1.minute).balance
val height = Await.result(node.height, 1.minute)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,16 @@ class NetworkSeparationTestSuite extends FreeSpec with Matchers with BeforeAndAf
private def validateBlocks(nodes: Seq[Node]): Unit = {
val targetBlocks1 = result(for {
height <- traverse(nodes)(_.height).map(_.max)
_ <- traverse(nodes)(_.waitForHeight(height + 20))
_ <- traverse(nodes)(_.waitForHeight(height + 15))
blocks <- traverse(nodes)(_.blockAt(height + 15))
_ <- traverse(nodes)(_.waitForHeight(height + 10))
blocks <- traverse(nodes)(_.blockAt(height + 8))
} yield blocks.map(_.signature), 5.minutes)
all(targetBlocks1) shouldEqual targetBlocks1.head
}

"node should grow up to 30 blocks together" in {
"node should grow up to 10 blocks together" in {
val richestNode = nodes.maxBy(n => Await.result(n.balance(n.address), 1.minute).balance)
Await.result(richestNode.waitForHeight(30), 5.minutes)
Await.result(richestNode.height, 1.minute) >= 30 shouldBe true
Await.result(richestNode.waitForHeight(10), 5.minutes)
Await.result(richestNode.height, 1.minute) >= 10 shouldBe true
}

"then we disconnect nodes from the network" in {
Expand All @@ -40,8 +39,8 @@ class NetworkSeparationTestSuite extends FreeSpec with Matchers with BeforeAndAf

"and wait for another 10 blocks on one node" in {
val richestNode = nodes.maxBy(n => Await.result(n.balance(n.address), 1.minute).balance)
Await.result(richestNode.waitForHeight(40), 5.minutes)
Await.result(richestNode.height, 1.minute) >= 40 shouldBe true
Await.result(richestNode.waitForHeight(20), 5.minutes)
Await.result(richestNode.height, 1.minute) >= 20 shouldBe true
}

"after that we connect nodes back to the network" in {
Expand Down
25 changes: 18 additions & 7 deletions src/main/scala/com/wavesplatform/Coordinator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import com.wavesplatform.features.{BlockchainFeatures, FeatureProvider}
import com.wavesplatform.metrics._
import com.wavesplatform.network.{BlockCheckpoint, Checkpoint}
import com.wavesplatform.settings.{BlockchainSettings, FunctionalitySettings, WavesSettings}
import com.wavesplatform.state2.ByteStr
import com.wavesplatform.state2._
import com.wavesplatform.state2.reader.StateReader
import kamon.Kamon
import org.influxdb.dto.Point
Expand All @@ -18,6 +18,7 @@ import scorex.transaction._
import scorex.utils.{ScorexLogging, Time}

import scala.concurrent.duration._
import scala.util.{Left, Right}

object Coordinator extends ScorexLogging with Instrumented {
def processFork(checkpoint: CheckpointService, history: History, blockchainUpdater: BlockchainUpdater,
Expand Down Expand Up @@ -66,24 +67,34 @@ object Coordinator extends ScorexLogging with Instrumented {
}

val initalHeight = history.height()
for {

val droppedBlocksEi = (for {
commonBlockHeight <- history.heightOf(lastCommonBlockId).toRight(GenericError("Fork contains no common parent"))
_ <- Either.cond(isForkValidWithCheckpoint(commonBlockHeight), (), GenericError("Fork contains block that doesn't match checkpoint, declining fork"))
droppedTransactions <- blockchainUpdater.removeAfter(lastCommonBlockId)
score <- forkApplicationResultEi
} yield {
droppedBlocks <- blockchainUpdater.removeAfter(lastCommonBlockId)
} yield (commonBlockHeight, droppedBlocks)).left.map((_, Seq.empty[Block]))

(for {
commonHeightAndDroppedBlocks <- droppedBlocksEi
(commonBlockHeight, droppedBlocks) = commonHeightAndDroppedBlocks
score <- forkApplicationResultEi.left.map((_, droppedBlocks))
} yield (commonBlockHeight, droppedBlocks, score))
.right.map { case ((commonBlockHeight, droppedBlocks, score)) =>
val depth = initalHeight - commonBlockHeight
if (depth > 0) {
Metrics.write(
Point
.measurement("rollback")
.addField("depth", initalHeight - commonBlockHeight)
.addField("txs", droppedTransactions.size)
.addField("txs", droppedBlocks.size)
)
}
droppedTransactions.foreach(utxStorage.putIfNew)
droppedBlocks.flatMap(_.transactionData).foreach(utxStorage.putIfNew)
updateBlockchainReadinessFlag(history, time, blockchainReadiness, settings.minerSettings.intervalAfterLastBlockThenGenerationIsAllowed)
Some(score)
}.left.map { case ((err, droppedBlocks)) =>
droppedBlocks.foreach(blockchainUpdater.processBlock(_).explicitGet())
err
}
case None =>
log.debug("No new blocks found in extension")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,13 @@ class HistoryWriterImpl private(file: Option[File], val synchronizationToken: Re
}
}

def discardBlock(): Seq[Transaction] = write { implicit lock =>
def discardBlock(): Option[Block] = write { implicit lock =>
val h = height()

alterVotes(h, blockAt(h).map(b => b.featureVotes).getOrElse(Set.empty), -1)

val transactions =
Block.parseBytes(blockBodyByHeight.mutate(_.remove(h))).fold(_ => Seq.empty[Transaction], _.transactionData)
val removedBlockBytes = blockBodyByHeight.mutate(_.remove(h))
val maybeDiscardedBlock = Block.parseBytes(removedBlockBytes).toOption
scoreByHeight.mutate(_.remove(h))

if (h % activationWindowSize == 0) {
Expand All @@ -104,7 +104,7 @@ class HistoryWriterImpl private(file: Option[File], val synchronizationToken: Re
vOpt.map(v => heightByBlockId.mutate(_.remove(v)))
db.commit()

transactions
maybeDiscardedBlock
}

override def lastBlockIds(howMany: Int): Seq[ByteStr] = read { implicit lock =>
Expand Down
27 changes: 15 additions & 12 deletions src/main/scala/com/wavesplatform/state2/BlockchainUpdaterImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with StateReader,
}
}

override def removeAfter(blockId: ByteStr): Either[ValidationError, Seq[Transaction]] = write { implicit l =>
override def removeAfter(blockId: ByteStr): Either[ValidationError, Seq[Block]] = write { implicit l =>
val ng = ngState()
if (ng.exists(_.contains(blockId))) {
log.trace("Resetting liquid block, no rollback is necessary")
Expand All @@ -181,16 +181,18 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with StateReader,
log.warn(s"removeAfter nonexistent block $blockId")
Left(GenericError(s"Failed to rollback to nonexistent block $blockId"))
case Some(height) =>
val discardedTransactions = Seq.newBuilder[Transaction]
discardedTransactions ++= ng.toSeq.flatMap(_.transactions)
val ngRolledBack = ngState().nonEmpty
val discardedNgBlock = ng.map(_.bestLiquidBlock)
ngState.set(None)

val baseRolledBack = height < historyWriter.height()
if (baseRolledBack) {
val discardedHistoryBlocks = if (baseRolledBack) {
logHeights(s"Rollback to h=$height started")
while (historyWriter.height > height)
discardedTransactions ++= historyWriter.discardBlock()
val discarded = {
var buf = Seq.empty[Block]
while (historyWriter.height > height)
buf = historyWriter.discardBlock().toSeq ++ buf
buf
}
if (height < persisted.height) {
log.info(s"Rollback to h=$height requested. Persisted height=${persisted.height}, will drop state and reapply blockchain now")
persisted.clear()
Expand All @@ -209,15 +211,16 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with StateReader,
}
}
logHeights(s"Rollback to h=$height completed:")
discarded
} else {
log.debug(s"No rollback in history is necessary")
Seq.empty[Block]
}

if (baseRolledBack || ngRolledBack) lastBlockId.onNext(blockId)

val r = discardedTransactions.result()
TxsInBlockchainStats.record(-r.size)
Right(r)
val totalDiscardedBlocks: Seq[Block] = discardedHistoryBlocks ++ discardedNgBlock.toSeq
if (totalDiscardedBlocks.nonEmpty) lastBlockId.onNext(blockId)
TxsInBlockchainStats.record(-totalDiscardedBlocks.size)
Right(totalDiscardedBlocks)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/scorex/transaction/BlockchainUpdater.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ trait BlockchainUpdater extends Synchronized {

def processMicroBlock(microBlock: MicroBlock): Either[ValidationError, Unit]

def removeAfter(blockId: ByteStr): Either[ValidationError, DiscardedTransactions]
def removeAfter(blockId: ByteStr): Either[ValidationError, DiscardedBlocks]

def lastBlockId: Observable[ByteStr]
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/scorex/transaction/package.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package scorex

import com.wavesplatform.utils.base58Length
import scorex.block.MicroBlock
import scorex.block.{Block, MicroBlock}
import scorex.crypto.hash.FastCryptographicHash

package object transaction {
Expand All @@ -10,6 +10,7 @@ package object transaction {
val AssetIdLength = FastCryptographicHash.DigestSize
val AssetIdStringLength = base58Length(AssetIdLength)
type DiscardedTransactions = Seq[Transaction]
type DiscardedBlocks = Seq[Block]
type DiscardedMicroBlocks = Seq[MicroBlock]

}
4 changes: 2 additions & 2 deletions src/main/scala/scorex/waves/http/DebugApiRoute.scala
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ case class DebugApiRoute(settings: RestAPISettings,

private def rollbackToBlock(blockId: ByteStr, returnTransactionsToUtx: Boolean): Future[ToResponseMarshallable] = Future {
blockchainUpdater.removeAfter(blockId) match {
case Right(txs) =>
case Right(blocks) =>
allChannels.broadcast(LocalScoreChanged(history.score()))
if (returnTransactionsToUtx) {
txs.foreach(tx => utxStorage.putIfNew(tx))
blocks.flatMap(_.transactionData).foreach(tx => utxStorage.putIfNew(tx))
}
miner.scheduleMining()
Json.obj("BlockId" -> blockId.toString): ToResponseMarshallable
Expand Down

0 comments on commit 00c59dd

Please sign in to comment.