diff --git a/src/it/scala/com/wavesplatform/it/IntegrationSuiteWithThreeAddresses.scala b/src/it/scala/com/wavesplatform/it/IntegrationSuiteWithThreeAddresses.scala index 9bb7f977452..8eec65fae7d 100644 --- a/src/it/scala/com/wavesplatform/it/IntegrationSuiteWithThreeAddresses.scala +++ b/src/it/scala/com/wavesplatform/it/IntegrationSuiteWithThreeAddresses.scala @@ -95,13 +95,13 @@ trait IntegrationSuiteWithThreeAddresses extends FunSuite with BeforeAndAfterAll txs <- makeTransfers height <- traverse(allNodes)(_.height).map(_.max) - _ <- traverse(allNodes)(_.waitForHeight(height + 1)) + _ <- traverse(allNodes)(_.waitForHeight(height + 2)) _ <- waitForTxsToReachAllNodes(txs) _ <- Future.sequence(Seq(firstAddress, secondAddress, thirdAddress).map(address => assertBalances(address, defaultBalance, defaultBalance))) } yield succeed - Await.result(correctStartBalancesFuture, 2.minutes) + Await.result(correctStartBalancesFuture, 5.minutes) } } diff --git a/src/it/scala/com/wavesplatform/it/MatcherTestSuite.scala b/src/it/scala/com/wavesplatform/it/MatcherTestSuite.scala index 6f6fa0c285a..c6bb0a0732d 100644 --- a/src/it/scala/com/wavesplatform/it/MatcherTestSuite.scala +++ b/src/it/scala/com/wavesplatform/it/MatcherTestSuite.scala @@ -273,7 +273,7 @@ class MatcherTestSuite extends FreeSpec with Matchers with BeforeAndAfterAll { private def getBalance(node: Node): (Long, Long) = { val initialHeight = Await.result(node.height, 1.minute) - Await.result(node.waitForHeight(initialHeight + 1), 1.minute) + Await.result(node.waitForHeight(initialHeight + 2), 2.minute) val balance = Await.result(node.balance(node.address), 1.minute).balance val height = Await.result(node.height, 1.minute) diff --git a/src/it/scala/com/wavesplatform/it/NetworkSeparationTestSuite.scala b/src/it/scala/com/wavesplatform/it/NetworkSeparationTestSuite.scala index b000a9a1d8f..01f582bdc4a 100644 --- a/src/it/scala/com/wavesplatform/it/NetworkSeparationTestSuite.scala +++ b/src/it/scala/com/wavesplatform/it/NetworkSeparationTestSuite.scala @@ -21,17 +21,16 @@ class NetworkSeparationTestSuite extends FreeSpec with Matchers with BeforeAndAf private def validateBlocks(nodes: Seq[Node]): Unit = { val targetBlocks1 = result(for { height <- traverse(nodes)(_.height).map(_.max) - _ <- traverse(nodes)(_.waitForHeight(height + 20)) - _ <- traverse(nodes)(_.waitForHeight(height + 15)) - blocks <- traverse(nodes)(_.blockAt(height + 15)) + _ <- traverse(nodes)(_.waitForHeight(height + 10)) + blocks <- traverse(nodes)(_.blockAt(height + 8)) } yield blocks.map(_.signature), 5.minutes) all(targetBlocks1) shouldEqual targetBlocks1.head } - "node should grow up to 30 blocks together" in { + "node should grow up to 10 blocks together" in { val richestNode = nodes.maxBy(n => Await.result(n.balance(n.address), 1.minute).balance) - Await.result(richestNode.waitForHeight(30), 5.minutes) - Await.result(richestNode.height, 1.minute) >= 30 shouldBe true + Await.result(richestNode.waitForHeight(10), 5.minutes) + Await.result(richestNode.height, 1.minute) >= 10 shouldBe true } "then we disconnect nodes from the network" in { @@ -40,8 +39,8 @@ class NetworkSeparationTestSuite extends FreeSpec with Matchers with BeforeAndAf "and wait for another 10 blocks on one node" in { val richestNode = nodes.maxBy(n => Await.result(n.balance(n.address), 1.minute).balance) - Await.result(richestNode.waitForHeight(40), 5.minutes) - Await.result(richestNode.height, 1.minute) >= 40 shouldBe true + Await.result(richestNode.waitForHeight(20), 5.minutes) + Await.result(richestNode.height, 1.minute) >= 20 shouldBe true } "after that we connect nodes back to the network" in { diff --git a/src/main/scala/com/wavesplatform/Coordinator.scala b/src/main/scala/com/wavesplatform/Coordinator.scala index 16ecfc86ea1..2ac60f9c367 100644 --- a/src/main/scala/com/wavesplatform/Coordinator.scala +++ b/src/main/scala/com/wavesplatform/Coordinator.scala @@ -6,7 +6,7 @@ import com.wavesplatform.features.{BlockchainFeatures, FeatureProvider} import com.wavesplatform.metrics._ import com.wavesplatform.network.{BlockCheckpoint, Checkpoint} import com.wavesplatform.settings.{BlockchainSettings, FunctionalitySettings, WavesSettings} -import com.wavesplatform.state2.ByteStr +import com.wavesplatform.state2._ import com.wavesplatform.state2.reader.StateReader import kamon.Kamon import org.influxdb.dto.Point @@ -18,6 +18,7 @@ import scorex.transaction._ import scorex.utils.{ScorexLogging, Time} import scala.concurrent.duration._ +import scala.util.{Left, Right} object Coordinator extends ScorexLogging with Instrumented { def processFork(checkpoint: CheckpointService, history: History, blockchainUpdater: BlockchainUpdater, @@ -66,24 +67,34 @@ object Coordinator extends ScorexLogging with Instrumented { } val initalHeight = history.height() - for { + + val droppedBlocksEi = (for { commonBlockHeight <- history.heightOf(lastCommonBlockId).toRight(GenericError("Fork contains no common parent")) _ <- Either.cond(isForkValidWithCheckpoint(commonBlockHeight), (), GenericError("Fork contains block that doesn't match checkpoint, declining fork")) - droppedTransactions <- blockchainUpdater.removeAfter(lastCommonBlockId) - score <- forkApplicationResultEi - } yield { + droppedBlocks <- blockchainUpdater.removeAfter(lastCommonBlockId) + } yield (commonBlockHeight, droppedBlocks)).left.map((_, Seq.empty[Block])) + + (for { + commonHeightAndDroppedBlocks <- droppedBlocksEi + (commonBlockHeight, droppedBlocks) = commonHeightAndDroppedBlocks + score <- forkApplicationResultEi.left.map((_, droppedBlocks)) + } yield (commonBlockHeight, droppedBlocks, score)) + .right.map { case ((commonBlockHeight, droppedBlocks, score)) => val depth = initalHeight - commonBlockHeight if (depth > 0) { Metrics.write( Point .measurement("rollback") .addField("depth", initalHeight - commonBlockHeight) - .addField("txs", droppedTransactions.size) + .addField("txs", droppedBlocks.size) ) } - droppedTransactions.foreach(utxStorage.putIfNew) + droppedBlocks.flatMap(_.transactionData).foreach(utxStorage.putIfNew) updateBlockchainReadinessFlag(history, time, blockchainReadiness, settings.minerSettings.intervalAfterLastBlockThenGenerationIsAllowed) Some(score) + }.left.map { case ((err, droppedBlocks)) => + droppedBlocks.foreach(blockchainUpdater.processBlock(_).explicitGet()) + err } case None => log.debug("No new blocks found in extension") diff --git a/src/main/scala/com/wavesplatform/history/HistoryWriterImpl.scala b/src/main/scala/com/wavesplatform/history/HistoryWriterImpl.scala index 69a7abadef8..d7fa9f57262 100644 --- a/src/main/scala/com/wavesplatform/history/HistoryWriterImpl.scala +++ b/src/main/scala/com/wavesplatform/history/HistoryWriterImpl.scala @@ -86,13 +86,13 @@ class HistoryWriterImpl private(file: Option[File], val synchronizationToken: Re } } - def discardBlock(): Seq[Transaction] = write { implicit lock => + def discardBlock(): Option[Block] = write { implicit lock => val h = height() alterVotes(h, blockAt(h).map(b => b.featureVotes).getOrElse(Set.empty), -1) - val transactions = - Block.parseBytes(blockBodyByHeight.mutate(_.remove(h))).fold(_ => Seq.empty[Transaction], _.transactionData) + val removedBlockBytes = blockBodyByHeight.mutate(_.remove(h)) + val maybeDiscardedBlock = Block.parseBytes(removedBlockBytes).toOption scoreByHeight.mutate(_.remove(h)) if (h % activationWindowSize == 0) { @@ -104,7 +104,7 @@ class HistoryWriterImpl private(file: Option[File], val synchronizationToken: Re vOpt.map(v => heightByBlockId.mutate(_.remove(v))) db.commit() - transactions + maybeDiscardedBlock } override def lastBlockIds(howMany: Int): Seq[ByteStr] = read { implicit lock => diff --git a/src/main/scala/com/wavesplatform/state2/BlockchainUpdaterImpl.scala b/src/main/scala/com/wavesplatform/state2/BlockchainUpdaterImpl.scala index 618c3caa37b..083fd7d8090 100644 --- a/src/main/scala/com/wavesplatform/state2/BlockchainUpdaterImpl.scala +++ b/src/main/scala/com/wavesplatform/state2/BlockchainUpdaterImpl.scala @@ -170,7 +170,7 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with StateReader, } } - override def removeAfter(blockId: ByteStr): Either[ValidationError, Seq[Transaction]] = write { implicit l => + override def removeAfter(blockId: ByteStr): Either[ValidationError, Seq[Block]] = write { implicit l => val ng = ngState() if (ng.exists(_.contains(blockId))) { log.trace("Resetting liquid block, no rollback is necessary") @@ -181,16 +181,18 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with StateReader, log.warn(s"removeAfter nonexistent block $blockId") Left(GenericError(s"Failed to rollback to nonexistent block $blockId")) case Some(height) => - val discardedTransactions = Seq.newBuilder[Transaction] - discardedTransactions ++= ng.toSeq.flatMap(_.transactions) - val ngRolledBack = ngState().nonEmpty + val discardedNgBlock = ng.map(_.bestLiquidBlock) ngState.set(None) val baseRolledBack = height < historyWriter.height() - if (baseRolledBack) { + val discardedHistoryBlocks = if (baseRolledBack) { logHeights(s"Rollback to h=$height started") - while (historyWriter.height > height) - discardedTransactions ++= historyWriter.discardBlock() + val discarded = { + var buf = Seq.empty[Block] + while (historyWriter.height > height) + buf = historyWriter.discardBlock().toSeq ++ buf + buf + } if (height < persisted.height) { log.info(s"Rollback to h=$height requested. Persisted height=${persisted.height}, will drop state and reapply blockchain now") persisted.clear() @@ -209,15 +211,16 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with StateReader, } } logHeights(s"Rollback to h=$height completed:") + discarded } else { log.debug(s"No rollback in history is necessary") + Seq.empty[Block] } - if (baseRolledBack || ngRolledBack) lastBlockId.onNext(blockId) - - val r = discardedTransactions.result() - TxsInBlockchainStats.record(-r.size) - Right(r) + val totalDiscardedBlocks: Seq[Block] = discardedHistoryBlocks ++ discardedNgBlock.toSeq + if (totalDiscardedBlocks.nonEmpty) lastBlockId.onNext(blockId) + TxsInBlockchainStats.record(-totalDiscardedBlocks.size) + Right(totalDiscardedBlocks) } } } diff --git a/src/main/scala/scorex/transaction/BlockchainUpdater.scala b/src/main/scala/scorex/transaction/BlockchainUpdater.scala index d86580cfd54..adb8454a27c 100644 --- a/src/main/scala/scorex/transaction/BlockchainUpdater.scala +++ b/src/main/scala/scorex/transaction/BlockchainUpdater.scala @@ -12,7 +12,7 @@ trait BlockchainUpdater extends Synchronized { def processMicroBlock(microBlock: MicroBlock): Either[ValidationError, Unit] - def removeAfter(blockId: ByteStr): Either[ValidationError, DiscardedTransactions] + def removeAfter(blockId: ByteStr): Either[ValidationError, DiscardedBlocks] def lastBlockId: Observable[ByteStr] } diff --git a/src/main/scala/scorex/transaction/package.scala b/src/main/scala/scorex/transaction/package.scala index a00fbdf45c1..748575a7dd7 100755 --- a/src/main/scala/scorex/transaction/package.scala +++ b/src/main/scala/scorex/transaction/package.scala @@ -1,7 +1,7 @@ package scorex import com.wavesplatform.utils.base58Length -import scorex.block.MicroBlock +import scorex.block.{Block, MicroBlock} import scorex.crypto.hash.FastCryptographicHash package object transaction { @@ -10,6 +10,7 @@ package object transaction { val AssetIdLength = FastCryptographicHash.DigestSize val AssetIdStringLength = base58Length(AssetIdLength) type DiscardedTransactions = Seq[Transaction] + type DiscardedBlocks = Seq[Block] type DiscardedMicroBlocks = Seq[MicroBlock] } diff --git a/src/main/scala/scorex/waves/http/DebugApiRoute.scala b/src/main/scala/scorex/waves/http/DebugApiRoute.scala index a68a1375f45..98c45e581a7 100644 --- a/src/main/scala/scorex/waves/http/DebugApiRoute.scala +++ b/src/main/scala/scorex/waves/http/DebugApiRoute.scala @@ -132,10 +132,10 @@ case class DebugApiRoute(settings: RestAPISettings, private def rollbackToBlock(blockId: ByteStr, returnTransactionsToUtx: Boolean): Future[ToResponseMarshallable] = Future { blockchainUpdater.removeAfter(blockId) match { - case Right(txs) => + case Right(blocks) => allChannels.broadcast(LocalScoreChanged(history.score())) if (returnTransactionsToUtx) { - txs.foreach(tx => utxStorage.putIfNew(tx)) + blocks.flatMap(_.transactionData).foreach(tx => utxStorage.putIfNew(tx)) } miner.scheduleMining() Json.obj("BlockId" -> blockId.toString): ToResponseMarshallable