Skip to content

Commit

Permalink
Merge pull request #266 from wavesplatform/T442-performance
Browse files Browse the repository at this point in the history
T442 performance
  • Loading branch information
alexeykiselev authored Apr 27, 2017
2 parents e577235 + 74424a9 commit b7e6026
Show file tree
Hide file tree
Showing 14 changed files with 89 additions and 109 deletions.
4 changes: 2 additions & 2 deletions lock.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dependencyOverrides in ThisBuild ++= Set(
"com.github.swagger-akka-http" % "swagger-akka-http_2.12" % "0.9.1",
"com.google.code.findbugs" % "annotations" % "2.0.1",
"com.google.guava" % "guava" % "21.0",
"com.h2database" % "h2-mvstore" % "1.4.194",
"com.h2database" % "h2-mvstore" % "1.4.195",
"com.iheart" % "ficus_2.12" % "1.4.0",
"com.ning" % "async-http-client" % "1.9.11",
"com.spotify" % "docker-client" % "8.1.2",
Expand Down Expand Up @@ -132,4 +132,4 @@ dependencyOverrides in ThisBuild ++= Set(
"org.whispersystems" % "curve25519-java" % "0.3.0",
"org.yaml" % "snakeyaml" % "1.17"
)
// LIBRARY_DEPENDENCIES_HASH 493197a166efa557a02754040535be47a126d1cd
// LIBRARY_DEPENDENCIES_HASH a8bc8a963b2267fa77f6f01218cb9904464481e3
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ object Dependencies {
)

lazy val db = Seq(
"com.h2database" % "h2-mvstore" % "1.4.194"
"com.h2database" % "h2-mvstore" % "1.4.195"
)

lazy val logging = Seq(
Expand Down
23 changes: 0 additions & 23 deletions src/main/scala/com/wavesplatform/history/HistoryStorage.scala

This file was deleted.

12 changes: 1 addition & 11 deletions src/main/scala/com/wavesplatform/history/HistoryWriterImpl.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.wavesplatform.history

import com.google.common.cache.{CacheBuilder, CacheLoader}
import org.h2.mvstore.MVStore
import scorex.account.Account
import scorex.block.Block
Expand All @@ -17,13 +16,6 @@ class HistoryWriterImpl(db: MVStore) extends History with HistoryWriter with Sco
private val heightByBlockId = db.openMap("signaturesReverse", new LogMVMapBuilder[BlockId, Int])
private val scoreByHeight = db.openMap("score", new LogMVMapBuilder[Int, BigInt])

private val BlocksCacheSizeLimit: Int = 1000
private val blocksCache = CacheBuilder.newBuilder()
.maximumSize(BlocksCacheSizeLimit)
.build(CacheLoader.from[Integer, Block] { height =>
Block.parseBytes(blockBodyByHeight.get(height)).get
})

override def appendBlock(block: Block): Either[ValidationError, Unit] = {
if ((height() == 0) || (this.lastBlock.uniqueId sameElements block.reference)) {
val h = height() + 1
Expand All @@ -39,16 +31,14 @@ class HistoryWriterImpl(db: MVStore) extends History with HistoryWriter with Sco
}

override def discardBlock(): Unit = {
require(height() > 1, "Chain is empty or contains genesis block only, can't make rollback")
val h = height()
blocksCache.invalidate(h)
blockBodyByHeight.remove(h)
val vOpt = Option(blockIdByHeight.remove(h))
vOpt.map(v => heightByBlockId.remove(v))
db.commit()
}

override def blockAt(height: Int): Option[Block] = scala.util.control.Exception.allCatch.opt(blocksCache.get(height))
override def blockAt(height: Int): Option[Block] = Option(blockBodyByHeight.get(height)).map(Block.parseBytes(_).get)

override def lastBlockIds(howMany: Int): Seq[BlockId] =
(Math.max(1, height() - howMany + 1) to height()).flatMap(i => Option(blockIdByHeight.get(i)))
Expand Down
42 changes: 25 additions & 17 deletions src/main/scala/com/wavesplatform/state2/BlockchainUpdaterImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import scorex.crypto.encode.Base58
import scorex.transaction._
import scorex.utils.ScorexLogging

import StateWriterImpl._
import BlockchainUpdaterImpl._

class BlockchainUpdaterImpl(persisted: StateWriter with StateReader, settings: FunctionalitySettings, bc: HistoryWriter with History)
extends BlockchainUpdater with ScorexLogging {

Expand All @@ -18,20 +21,14 @@ class BlockchainUpdaterImpl(persisted: StateWriter with StateReader, settings: F

private val unsafeDifferByRange: (StateReader, (Int, Int)) => BlockDiff = {
case (sr, (from, to)) =>
log.debug(s"Reading blocks from $from to $to")
val blocks = Range(from, to).foldLeft((List.empty[Block], 0)) { case ((acc, counter), i) =>
if (counter % 1000 == 0) {
log.debug(s"Read block $counter of Range($from, $to)")
}
(bc.blockAt(i).get +: acc, counter + 1)
}._1.reverse
log.debug(s"Blocks read from $from to $to")
val r = BlockDiffer.unsafeDiffMany(settings)(sr, blocks)
log.debug(s"Diff for Range($from, $to) rebuilt")
r
val blocks = measureLog(s"Reading blocks from $from up to $to") {
Range(from, to).map(bc.blockAt(_).get)
}
measureLog(s"Building diff from $from up to $to") {
BlockDiffer.unsafeDiffMany(settings)(sr, blocks)
}
}
private val unsafeDiffByRange: ((Int, Int)) => BlockDiff = unsafeDifferByRange(persisted, _)
private val unsafeDiffNotPersisted: () => BlockDiff = () => unsafeDiffByRange(persisted.height + 1, bc.height() + 1)
private val unsafeDiffAgainstPersistedByRange: ((Int, Int)) => BlockDiff = unsafeDifferByRange(persisted, _)

@volatile var inMemoryDiff: BlockDiff = Monoid[BlockDiff].empty

Expand All @@ -53,9 +50,12 @@ class BlockchainUpdaterImpl(persisted: StateWriter with StateReader, settings: F
logHeights("State rebuild started:")
val persistFrom = persisted.height + 1
val persistUpTo = bc.height - MinInMemDiff + 1
val diffToBePersisted = unsafeDiffByRange(persistFrom, persistUpTo)
persisted.applyBlockDiff(diffToBePersisted)
inMemoryDiff = unsafeDiffNotPersisted()

ranges(persistFrom, persistUpTo, 200).foreach { case (head, last) =>
val diffToBePersisted = unsafeDiffAgainstPersistedByRange(head, last)
persisted.applyBlockDiff(diffToBePersisted)
}
inMemoryDiff = unsafeDiffAgainstPersistedByRange(persisted.height + 1, bc.height() + 1)
logHeights("State rebuild finished:")
}

Expand Down Expand Up @@ -85,11 +85,19 @@ class BlockchainUpdaterImpl(persisted: StateWriter with StateReader, settings: F

} else {
if (currentState.height != height) {
inMemoryDiff = unsafeDiffByRange(persisted.height + 1, height + 1)
inMemoryDiff = unsafeDiffAgainstPersistedByRange(persisted.height + 1, height + 1)
}
}
case None =>
log.warn(s"removeAfter non-existing block ${Base58.encode(blockId)}")
}
}
}

object BlockchainUpdaterImpl {
def ranges(from: Int, to: Int, by: Int): List[(Int, Int)] =
if (from + by < to)
(from, from + by) +: ranges(from + by, to, by)
else List((from, to))

}
6 changes: 3 additions & 3 deletions src/main/scala/com/wavesplatform/state2/Portfolio.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ case class Portfolio(balance: Long, leaseInfo: LeaseInfo, assets: Map[ByteArray,
}

object Portfolio {
implicit val longSemigroup: Semigroup[Long] = (x: Long, y: Long) => safeSum(x, y)

implicit val portfolioMonoid = new Monoid[Portfolio] {
override def empty: Portfolio = Portfolio(0L, Monoid[LeaseInfo].empty, Map.empty)

override def combine(older: Portfolio, newer: Portfolio): Portfolio
= Portfolio(
balance = safeSum(older.balance, newer.balance),
leaseInfo = Monoid.combine(older.leaseInfo, newer.leaseInfo),
assets = (older.assets.keys ++ newer.assets.keys)
.map(ba => ba -> safeSum(older.assets.getOrElse(ba, 0), newer.assets.getOrElse(ba, 0)))
.toMap)
assets = older.assets.combine(newer.assets))
}
}
33 changes: 21 additions & 12 deletions src/main/scala/com/wavesplatform/state2/StateWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ class StateWriterImpl(p: StateStorage) extends StateReaderImpl(p) with StateWrit
override def applyBlockDiff(blockDiff: BlockDiff): Unit = {
val txsDiff = blockDiff.txsDiff

measurePersist("transactions")(txsDiff.transactions) {
log.debug(s"Starting persist from ${p.getHeight} to ${p.getHeight + blockDiff.heightDiff}")

measureSizeLog("transactions")(txsDiff.transactions) {
_.foreach { case (id, (h, tx, _)) =>
p.transactions.put(id.arr, (h, tx.bytes))
}
}

measurePersist("previousExchangeTxs")(blockDiff.txsDiff.previousExchangeTxs) {
measureSizeLog("previousExchangeTxs")(blockDiff.txsDiff.previousExchangeTxs) {
_.foreach { case (oid, txs) =>
Option(p.exchangeTransactionsByOrder.get(oid.arr)) match {
case Some(ll) =>
Expand All @@ -37,7 +39,7 @@ class StateWriterImpl(p: StateStorage) extends StateReaderImpl(p) with StateWrit
}
}

measurePersist("portfolios")(txsDiff.portfolios) {
measureSizeLog("portfolios")(txsDiff.portfolios) {
_.foreach { case (account, portfolioDiff) =>
val updatedPortfolio = accountPortfolio(account).combine(portfolioDiff)
p.portfolios.put(account.bytes,
Expand All @@ -48,7 +50,7 @@ class StateWriterImpl(p: StateStorage) extends StateReaderImpl(p) with StateWrit
}


measurePersist("assets")(txsDiff.issuedAssets) {
measureSizeLog("assets")(txsDiff.issuedAssets) {
_.foreach { case (id, assetInfo) =>
val updated = (Option(p.assets.get(id.arr)) match {
case None => Monoid[AssetInfo].empty
Expand All @@ -59,7 +61,7 @@ class StateWriterImpl(p: StateStorage) extends StateReaderImpl(p) with StateWrit
}
}

measurePersist("accountTransactionIds")(blockDiff.txsDiff.accountTransactionIds) {
measureSizeLog("accountTransactionIds")(blockDiff.txsDiff.accountTransactionIds) {
_.foreach { case (acc, txIds) =>
Option(p.accountTransactionIds.get(acc.bytes)) match {
case Some(ll) =>
Expand All @@ -72,27 +74,27 @@ class StateWriterImpl(p: StateStorage) extends StateReaderImpl(p) with StateWrit
}
}

measurePersist("paymentTransactionIdsByHashes")(blockDiff.txsDiff.paymentTransactionIdsByHashes) {
measureSizeLog("paymentTransactionIdsByHashes")(blockDiff.txsDiff.paymentTransactionIdsByHashes) {
_.foreach { case (EqByteArray(hash), EqByteArray(id)) =>
p.paymentTransactionHashes.put(hash, id)
}
}

measurePersist("effectiveBalanceSnapshots")(blockDiff.snapshots)(
measureSizeLog("effectiveBalanceSnapshots")(blockDiff.snapshots)(
_.foreach { case (acc, snapshotsByHeight) =>
snapshotsByHeight.foreach { case (h, snapshot) =>
p.balanceSnapshots.put(StateStorage.snapshotKey(acc, h), (snapshot.prevHeight, snapshot.balance, snapshot.effectiveBalance))
}
p.lastUpdateHeight.put(acc.bytes, snapshotsByHeight.keys.max)
})

measurePersist("aliases")(blockDiff.txsDiff.aliases) {
measureSizeLog("aliases")(blockDiff.txsDiff.aliases) {
_.foreach { case (alias, acc) =>
p.aliasToAddress.put(alias.name, acc.bytes)
}
}

measurePersist("lease info")(blockDiff.txsDiff.leaseState)(
measureSizeLog("lease info")(blockDiff.txsDiff.leaseState)(
_.foreach { case (id, isActive) => p.leaseState.put(id.arr, isActive) })

p.setHeight(p.getHeight + blockDiff.heightDiff)
Expand Down Expand Up @@ -126,9 +128,16 @@ object StateWriterImpl extends ScorexLogging {
(r, t1 - t0)
}

def measurePersist[F[_] <: TraversableOnce[_], A](s: String)(fa: => F[A])(f: F[A] => Unit): Unit = {
val (_, time) = withTime(f(fa))
log.debug(s"Persisting $s(size=${fa.size}) took ${time}ms")
def measureSizeLog[F[_] <: TraversableOnce[_], A, R](s: String)(fa: => F[A])(f: F[A] => R): R = {
val (r, time) = withTime(f(fa))
log.debug(s"processing of ${fa.size} $s took ${time}ms")
r
}

def measureLog[R](s: String)(f: => R): R = {
val (r, time) = withTime(f)
log.debug(s"$s took ${time}ms")
r
}
}

11 changes: 2 additions & 9 deletions src/main/scala/com/wavesplatform/state2/diffs/BlockDiffer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,9 @@ object BlockDiffer extends ScorexLogging {
}
}


def unsafeDiffMany(settings: FunctionalitySettings)(s: StateReader, blocks: Seq[Block]): BlockDiff = {
val r = blocks.foldLeft(Monoid[BlockDiff].empty) { case (diff, block) =>
def unsafeDiffMany(settings: FunctionalitySettings)(s: StateReader, blocks: Seq[Block]): BlockDiff =
blocks.foldLeft(Monoid[BlockDiff].empty) { case (diff, block) =>
val blockDiff = apply(settings)(new CompositeStateReader(s, diff), block).explicitGet()
if (diff.heightDiff % 1000 == 0) {
log.info(s"Rebuilt ${diff.heightDiff} blocks out of ${blocks.size}")
}
Monoid[BlockDiff].combine(diff, blockDiff)
}
log.info(s"Rebuild of ${blocks.size} blocks completed")
r
}
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
package com.wavesplatform.state2.diffs

import cats._
import com.wavesplatform.settings.FunctionalitySettings
import com.wavesplatform.state2.{EqByteArray, Portfolio}
import com.wavesplatform.state2.reader.StateReader
import scorex.account.{Account, Alias}
import com.wavesplatform.state2.{EqByteArray, Portfolio, _}
import scorex.account.Account
import scorex.transaction.ValidationError.TransactionValidationError
import scorex.transaction._
import scorex.transaction.assets.exchange.ExchangeTransaction
import scorex.transaction.assets.{BurnTransaction, IssueTransaction, ReissueTransaction, TransferTransaction}
import scorex.transaction.lease.{LeaseCancelTransaction, LeaseTransaction}
import com.wavesplatform.state2._

import cats._
import cats.implicits._
import cats.syntax.all._


import scala.util.{Left, Right}
import scala.concurrent.duration._
import scala.util.{Left, Right}

object CommonValidation {
def disallowSendingGreaterThanBalance[T <: Transaction](s: StateReader, settings: FunctionalitySettings, blockTime: Long, tx: T): Either[ValidationError, T] =
Expand All @@ -38,23 +33,26 @@ object CommonValidation {
case None => Portfolio(-ttx.fee, LeaseInfo.empty, Map.empty)
}

val temporaryStateWhileTransfer = Monoid[Portfolio].combineAll(Seq(s.accountPortfolio(sender), amountDiff, feeDiff))
val accountPortfolio = s.accountPortfolio(sender)
val spendings = Monoid.combine(amountDiff, feeDiff)
accountPortfolio.balance + spendings.balance


if (temporaryStateWhileTransfer.balance < 0 || temporaryStateWhileTransfer.assets.values.exists(_ < 0))
lazy val negativeAssets: Boolean = spendings.assets.exists { case (id, amt) => (accountPortfolio.assets.getOrElse(id, 0L) + amt) < 0L }
lazy val negativeWaves = accountPortfolio.balance + spendings.balance < 0
if (negativeWaves || negativeAssets)
Left(TransactionValidationError(ttx, s"Attempt to transfer unavailable funds:" +
s" Transaction application leads from ${s.accountPortfolio(sender)} to (at least) temporary negative state: $temporaryStateWhileTransfer"))
s" Transaction application leads from $accountPortfolio to (at least) temporary negative state"))
else Right(tx)
case _ => Right(tx)
} else Right(tx)

def disallowDuplicateIds[T <: Transaction](state: StateReader, settings: FunctionalitySettings, height: Int, tx: T): Either[ValidationError, T] = tx match {
case ptx: PaymentTransaction if ptx.timestamp < settings.requirePaymentUniqueId => Right(tx)
case _ =>
state.transactionInfo(EqByteArray(tx.id)) match {
case None => Right(tx)
case Some((oldH, oldTx)) => Left(TransactionValidationError(tx, s"Tx id(exc. for some PaymentTransactions) cannot be duplicated." +
s" Current height is: $height. Tx with such id aready present at H=$oldH: $oldTx"))
}
if (state.containsTransaction(EqByteArray(tx.id)))
Left(TransactionValidationError(tx, s"Tx id(exc. for some PaymentTransactions) cannot be duplicated. Current height is: $height. Tx with such id aready present"))
else Right(tx)
}

def disallowBeforeActivationTime[T <: Transaction](state: StateReader, settings: FunctionalitySettings, tx: T): Either[ValidationError, T] =
Expand Down
Loading

0 comments on commit b7e6026

Please sign in to comment.