Skip to content

Commit

Permalink
Merge pull request #251 from wavesplatform/T442-restore-balance-at-he…
Browse files Browse the repository at this point in the history
…ight

T442: restore 'balance with confirmations' and 'balance at height' functionality
  • Loading branch information
alexeykiselev authored Apr 26, 2017
2 parents 96ff2b7 + d3d06c4 commit e577235
Show file tree
Hide file tree
Showing 21 changed files with 287 additions and 324 deletions.
15 changes: 12 additions & 3 deletions src/main/scala/com/wavesplatform/state2/BlockDiff.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,27 @@ package com.wavesplatform.state2
import cats._
import cats.implicits._
import cats.Monoid
import scorex.account.Account

import scala.collection.SortedMap

case class BlockDiff(txsDiff: Diff,
heightDiff: Int,
effectiveBalanceSnapshots: Set[EffectiveBalanceSnapshot])
snapshots: Map[Account, SortedMap[Int, Snapshot]])

object BlockDiff {

implicit def sortedMapForSnapshotsMonoid[A: Ordering, Snapshot]: Monoid[SortedMap[A, Snapshot]] = new Monoid[SortedMap[A, Snapshot]] {
def empty: SortedMap[A, Snapshot] = SortedMap.empty[A, Snapshot]
def combine(f1: SortedMap[A, Snapshot], f2: SortedMap[A, Snapshot]): SortedMap[A, Snapshot] = f1 ++ f2
}

implicit val blockDiffMonoid = new Monoid[BlockDiff] {
override def empty: BlockDiff = BlockDiff(Monoid[Diff].empty, 0, Set.empty)
override def empty: BlockDiff = BlockDiff(Monoid[Diff].empty, 0, Map.empty)

override def combine(older: BlockDiff, newer: BlockDiff): BlockDiff = BlockDiff(
txsDiff = older.txsDiff.combine(newer.txsDiff),
heightDiff = older.heightDiff + newer.heightDiff,
effectiveBalanceSnapshots = older.effectiveBalanceSnapshots ++ newer.effectiveBalanceSnapshots)
snapshots = older.snapshots.combine(newer.snapshots))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,20 @@ import scorex.utils.ScorexLogging
class BlockchainUpdaterImpl(persisted: StateWriter with StateReader, settings: FunctionalitySettings, bc: HistoryWriter with History)
extends BlockchainUpdater with ScorexLogging {

private val MinInMemDiff = 100
private val MaxInMemDiff = 200
private val MinInMemDiff = 200
private val MaxInMemDiff = MinInMemDiff * 2

private val unsafeDifferByRange: (StateReader, (Int, Int)) => BlockDiff = {
case (sr, (from, to)) =>
log.debug(s"Reading blocks from $from to $to")
val blocks = Range(from, to).map(bc.blockAt(_).get)
val blocks = Range(from, to).foldLeft((List.empty[Block], 0)) { case ((acc, counter), i) =>
if (counter % 1000 == 0) {
log.debug(s"Read block $counter of Range($from, $to)")
}
(bc.blockAt(i).get +: acc, counter + 1)
}._1.reverse
log.debug(s"Blocks read from $from to $to")
val r = BlockDiffer.unsafeDiffMany(settings, m => log.info(m))(sr, blocks)
val r = BlockDiffer.unsafeDiffMany(settings)(sr, blocks)
log.debug(s"Diff for Range($from, $to) rebuilt")
r
}
Expand All @@ -42,7 +47,7 @@ class BlockchainUpdaterImpl(persisted: StateWriter with StateReader, settings: F
}
}

def currentState: StateReader = CompositeStateReader.proxy(persisted, () => inMemoryDiff)
def currentState: StateReader = new CompositeStateReader.Proxy(persisted, () => inMemoryDiff)

private def updatePersistedAndInMemory(): Unit = {
logHeights("State rebuild started:")
Expand Down
46 changes: 13 additions & 33 deletions src/main/scala/com/wavesplatform/state2/Diff.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ import cats.implicits._
import scorex.account.{Account, Alias}
import scorex.transaction.Transaction
import scorex.transaction.assets.exchange.ExchangeTransaction
import scorex.transaction.lease.{LeaseCancelTransaction, LeaseTransaction}

case class EffectiveBalanceSnapshot(acc: Account, height: Int, prevEffectiveBalance: Long, effectiveBalance: Long)
case class Snapshot(prevHeight: Int, balance: Long, effectiveBalance: Long)

case class LeaseInfo(leaseIn: Long, leaseOut: Long)

object LeaseInfo {
val empty = LeaseInfo(0, 0)

implicit val leaseInfoMonoid = new Monoid[LeaseInfo] {
override def empty: LeaseInfo = LeaseInfo.empty

Expand All @@ -38,35 +36,18 @@ case class Diff(transactions: Map[ByteArray, (Int, Transaction, Set[Account])],
aliases: Map[Alias, Account],
paymentTransactionIdsByHashes: Map[ByteArray, ByteArray],
previousExchangeTxs: Map[ByteArray, Set[ExchangeTransaction]],
patchExtraLeaseIdsToCancel: Seq[ByteArray]) {
leaseState: Map[ByteArray, Boolean]) {

lazy val accountTransactionIds: Map[Account, List[ByteArray]] = {
val map: List[(Account, List[(Int, Long, ByteArray)])] = transactions.toList
.flatMap { case (id, (h, tx, accs)) => accs.map(acc => acc -> List((h, tx.timestamp, id))) }
val groupedByAcc = map.foldLeft(Map.empty[Account, List[(Int, Long, ByteArray)]]) { case (m, (acc, list)) =>
m.combine(Map(acc -> list))
val map: List[(Account, Set[(Int, Long, ByteArray)])] = transactions.toList
.flatMap { case (id, (h, tx, accs)) => accs.map(acc => acc -> Set((h, tx.timestamp, id))) }
val groupedByAcc = map.foldLeft(Map.empty[Account, Set[(Int, Long, ByteArray)]]) { case (m, (acc, set)) =>
m.combine(Map(acc -> set))
}
groupedByAcc
.mapValues(l => l.sortBy { case ((h, t, id)) => (-h, -t) }) // fresh head ([h=2, h=1, h=0])
.mapValues(l => l.toList.sortBy { case ((h, t, id)) => (-h, -t) }) // fresh head ([h=2, h=1, h=0])
.mapValues(_.map(_._3))
}

lazy val effectiveLeaseTxUpdates: (Set[EqByteArray], Set[EqByteArray]) = {
val txs = transactions.values.map(_._2)

val canceledLeaseIds: Set[EqByteArray] = txs
.collect { case (lctx: LeaseCancelTransaction) => EqByteArray(lctx.leaseId) }
.toSet

val newLeaseIds = txs
.collect { case (ltx: LeaseTransaction) => EqByteArray(ltx.id) }
.toSet

val effectiveNewCancels = (canceledLeaseIds ++ patchExtraLeaseIdsToCancel).diff(newLeaseIds)
val effectiveNewLeases = newLeaseIds.diff(canceledLeaseIds ++ patchExtraLeaseIdsToCancel)
(effectiveNewLeases, effectiveNewCancels)
}

}

object Diff {
Expand All @@ -75,22 +56,23 @@ object Diff {
assetInfos: Map[ByteArray, AssetInfo] = Map.empty,
aliases: Map[Alias, Account] = Map.empty,
previousExchangeTxs: Map[ByteArray, Set[ExchangeTransaction]] = Map.empty,
paymentTransactionIdsByHashes: Map[ByteArray, ByteArray] = Map.empty
paymentTransactionIdsByHashes: Map[ByteArray, ByteArray] = Map.empty,
leaseState: Map[ByteArray, Boolean] = Map.empty
): Diff = Diff(
transactions = Map(EqByteArray(tx.id) -> (height, tx, portfolios.keys.toSet)),
portfolios = portfolios,
issuedAssets = assetInfos,
aliases = aliases,
paymentTransactionIdsByHashes = paymentTransactionIdsByHashes,
previousExchangeTxs = previousExchangeTxs,
patchExtraLeaseIdsToCancel = Seq.empty)
leaseState = leaseState)

implicit class DiffExt(d: Diff) {
def asBlockDiff: BlockDiff = BlockDiff(d, 0, Set.empty)
def asBlockDiff: BlockDiff = BlockDiff(d, 0, Map.empty)
}

implicit val diffMonoid = new Monoid[Diff] {
override def empty: Diff = Diff(transactions = Map.empty, portfolios = Map.empty, issuedAssets = Map.empty, Map.empty, Map.empty, Map.empty, Seq.empty)
override def empty: Diff = Diff(transactions = Map.empty, portfolios = Map.empty, issuedAssets = Map.empty, Map.empty, Map.empty, Map.empty, Map.empty)

override def combine(older: Diff, newer: Diff): Diff = Diff(
transactions = older.transactions ++ newer.transactions,
Expand All @@ -99,8 +81,6 @@ object Diff {
aliases = older.aliases ++ newer.aliases,
paymentTransactionIdsByHashes = older.paymentTransactionIdsByHashes ++ newer.paymentTransactionIdsByHashes,
previousExchangeTxs = older.previousExchangeTxs ++ newer.previousExchangeTxs,
patchExtraLeaseIdsToCancel = newer.patchExtraLeaseIdsToCancel ++ older.patchExtraLeaseIdsToCancel
)
leaseState = older.leaseState ++ newer.leaseState)
}

}
16 changes: 14 additions & 2 deletions src/main/scala/com/wavesplatform/state2/StateStorage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package com.wavesplatform.state2

import java.util

import com.google.common.primitives.Ints
import com.wavesplatform.state2.StateStorage.SnapshotKey
import org.h2.mvstore.{MVMap, MVStore}
import scorex.account.Account

class StateStorage(db: MVStore) {

Expand All @@ -21,15 +24,24 @@ class StateStorage(db: MVStore) {

val accountTransactionIds: util.Map[Array[Byte], List[Array[Byte]]] = db.openMap("accountTransactionIds")

val effectiveBalanceSnapshots: util.Map[(Array[Byte], Int), (Long, Long)] = db.openMap("effectiveBalanceUpdates")
val balanceSnapshots: util.Map[SnapshotKey, (Int, Long, Long)] = db.openMap("balanceSnapshots")

val paymentTransactionHashes: util.Map[Array[Byte], Array[Byte]] = db.openMap("paymentTransactionHashes")

val aliasToAddress: util.Map[String, Array[Byte]] = db.openMap("aliasToAddress")

val exchangeTransactionsByOrder: util.Map[Array[Byte], Set[Array[Byte]]] = db.openMap("exchangeTransactionsByOrder")

val leaseState: util.Map[Array[Byte], Boolean] = db.openMap("leaseState")
val leaseState: util.Map[Array[Byte], Boolean] = db.openMap("leaseState")

val lastUpdateHeight: MVMap[Array[Byte], Int] = db.openMap("lastUpdateHeight")

def commit(): Unit = db.commit()

}

object StateStorage {
type SnapshotKey = Array[Byte]

def snapshotKey(acc: Account, height: Int): SnapshotKey = acc.bytes ++ Ints.toByteArray(height)
}
23 changes: 13 additions & 10 deletions src/main/scala/com/wavesplatform/state2/StateWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,22 @@ class StateWriterImpl(p: StateStorage) extends StateReaderImpl(p) with StateWrit
}
}

measurePersist("effectiveBalanceSnapshots")(blockDiff.effectiveBalanceSnapshots) {
_.foreach { ebs =>
p.effectiveBalanceSnapshots.put((ebs.acc.bytes, ebs.height), (ebs.prevEffectiveBalance, ebs.effectiveBalance))
}
}
measurePersist("effectiveBalanceSnapshots")(blockDiff.snapshots)(
_.foreach { case (acc, snapshotsByHeight) =>
snapshotsByHeight.foreach { case (h, snapshot) =>
p.balanceSnapshots.put(StateStorage.snapshotKey(acc, h), (snapshot.prevHeight, snapshot.balance, snapshot.effectiveBalance))
}
p.lastUpdateHeight.put(acc.bytes, snapshotsByHeight.keys.max)
})

measurePersist("aliases")(blockDiff.txsDiff.aliases) {
_.foreach { case (alias, acc) =>
p.aliasToAddress.put(alias.name, acc.bytes)
}
}

val (effectiveNewLeases, effectiveNewCancels) = blockDiff.txsDiff.effectiveLeaseTxUpdates
measurePersist("effectiveNewLeases")(effectiveNewLeases)(_.foreach(id => p.leaseState.put(id.arr, true)))
measurePersist("effectiveNewCancels")(effectiveNewCancels)(_.foreach(id => p.leaseState.put(id.arr, false)))
measurePersist("lease info")(blockDiff.txsDiff.leaseState)(
_.foreach { case (id, isActive) => p.leaseState.put(id.arr, isActive) })

p.setHeight(p.getHeight + blockDiff.heightDiff)
p.commit()
Expand All @@ -103,11 +105,12 @@ class StateWriterImpl(p: StateStorage) extends StateReaderImpl(p) with StateWrit
p.portfolios.clear()
p.assets.clear()
p.accountTransactionIds.clear()
p.effectiveBalanceSnapshots.clear()
p.balanceSnapshots.clear()
p.paymentTransactionHashes.clear()
p.exchangeTransactionsByOrder.clear()
p.aliasToAddress.clear()
p.leaseState.clear()
p.lastUpdateHeight.clear()

p.setHeight(0)
p.commit()
Expand All @@ -123,7 +126,7 @@ object StateWriterImpl extends ScorexLogging {
(r, t1 - t0)
}

def measurePersist[F[_] <: TraversableOnce[_], A](s: String)(fa: F[A])(f: F[A] => Unit): Unit = {
def measurePersist[F[_] <: TraversableOnce[_], A](s: String)(fa: => F[A])(f: F[A] => Unit): Unit = {
val (_, time) = withTime(f(fa))
log.debug(s"Persisting $s(size=${fa.size}) took ${time}ms")
}
Expand Down
45 changes: 22 additions & 23 deletions src/main/scala/com/wavesplatform/state2/diffs/BlockDiffer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@ import scorex.block.Block
import scorex.transaction.{AssetAcc, ValidationError}
import scorex.utils.ScorexLogging

import scala.collection.SortedMap


object BlockDiffer extends ScorexLogging {

def right(diff: Diff): Either[ValidationError, Diff] = Right(diff)

def apply(settings: FunctionalitySettings)(s: StateReader, block: Block): Either[ValidationError, BlockDiff] = {

val txDiffer = TransactionDiffer(settings, block.timestamp, s.height + 1) _
val currentBlockHeight = s.height + 1

val txDiffer = TransactionDiffer(settings, block.timestamp, currentBlockHeight) _

val accountPortfolioFeesMap: List[(Account, Portfolio)] = block.feesDistribution.toList.map {
case (AssetAcc(account, maybeAssetId), feeVolume) =>
Expand All @@ -29,45 +33,40 @@ object BlockDiffer extends ScorexLogging {
})
}
val feeDiff = Monoid[Diff].combineAll(accountPortfolioFeesMap.map { case (acc, p) =>
new Diff(Map.empty, Map(acc -> p), Map.empty, Map.empty, Map.empty, Map.empty, Seq.empty)
new Diff(Map.empty, Map(acc -> p), Map.empty, Map.empty, Map.empty, Map.empty, Map.empty)
})

val txsDiffEi = block.transactionData.foldLeft(right(feeDiff)) { case (ei, tx) => ei.flatMap(diff =>
txDiffer(new CompositeStateReader(s, diff.asBlockDiff), tx)
.map(newDiff => diff.combine(newDiff)))
}

txsDiffEi
.map(d => if (s.height + 1 == settings.resetEffectiveBalancesAtHeight)
txsDiffEi.map { d =>
val diff = if (currentBlockHeight == settings.resetEffectiveBalancesAtHeight)
Monoid.combine(d, LeasePatch(new CompositeStateReader(s, d.asBlockDiff)))
else d)
.map(diff => {
val effectiveBalanceSnapshots = diff.portfolios
.filter { case (acc, portfolio) => portfolio.effectiveBalance != 0 }
.map { case (acc, portfolio) => (acc, s.accountPortfolio(acc).effectiveBalance, portfolio.effectiveBalance) }
.map { case (acc, oldEffBalance, effBalanceDiff) =>
EffectiveBalanceSnapshot(acc = acc,
height = s.height + 1,
prevEffectiveBalance = if (s.height == 0) (oldEffBalance + effBalanceDiff) else oldEffBalance,
effectiveBalance = oldEffBalance + effBalanceDiff)
}
.toSet

BlockDiff(diff, 1, effectiveBalanceSnapshots)
}
)
else d
val newSnapshots = diff.portfolios
.collect { case (acc, portfolioDiff) if (portfolioDiff.balance != 0 || portfolioDiff.effectiveBalance != 0) =>
val oldPortfolio = s.accountPortfolio(acc)
acc -> SortedMap(currentBlockHeight -> Snapshot(
prevHeight = s.lastUpdateHeight(acc).getOrElse(0),
balance = oldPortfolio.balance + portfolioDiff.balance,
effectiveBalance = oldPortfolio.effectiveBalance + portfolioDiff.effectiveBalance))
}
BlockDiff(diff, 1, newSnapshots)
}
}


def unsafeDiffMany(settings: FunctionalitySettings, log: (String) => Unit = _ => ())(s: StateReader, blocks: Seq[Block]): BlockDiff = {
def unsafeDiffMany(settings: FunctionalitySettings)(s: StateReader, blocks: Seq[Block]): BlockDiff = {
val r = blocks.foldLeft(Monoid[BlockDiff].empty) { case (diff, block) =>
val blockDiff = apply(settings)(new CompositeStateReader(s, diff), block).explicitGet()
if (diff.heightDiff % 1000 == 0) {
log(s"Rebuilt ${diff.heightDiff} blocks out of ${blocks.size}")
log.info(s"Rebuilt ${diff.heightDiff} blocks out of ${blocks.size}")
}
Monoid[BlockDiff].combine(diff, blockDiff)
}
log(s"Rebuild of ${blocks.size} completed")
log.info(s"Rebuild of ${blocks.size} blocks completed")
r
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ object LeaseTransactionsDiff {
sender -> Portfolio(-tx.fee, LeaseInfo(0, tx.amount), Map.empty),
recipient -> Portfolio(0, LeaseInfo(tx.amount, 0), Map.empty)
)
Right(Diff(height = height, tx = tx, portfolios = portfolioDiff))
Right(Diff(height = height, tx = tx, portfolios = portfolioDiff, leaseState = Map(EqByteArray(tx.id) -> true)))
}
}
}
Expand All @@ -53,14 +53,14 @@ object LeaseTransactionsDiff {
Right(Monoid.combine(
Map(canceller -> Portfolio(-tx.fee, LeaseInfo(0, -lease.amount), Map.empty)),
Map(recipient -> Portfolio(0, LeaseInfo(-lease.amount, 0), Map.empty))))
} else if (time < settings.allowMultipleLeaseCancelTransactionUntilTimestamp) { // cancel of another acc
} else if (time < settings.allowMultipleLeaseCancelTransactionUntilTimestamp) { // cancel of another acc
Right(Monoid.combine(
Map(canceller -> Portfolio(-tx.fee, LeaseInfo(0, -lease.amount), Map.empty)),
Map(recipient -> Portfolio(0, LeaseInfo(-lease.amount, 0), Map.empty))))
} else Left(TransactionValidationError(tx, s"LeaseTransaction was leased by other sender " +
s"and time=$time > allowMultipleLeaseCancelTransactionUntilTimestamp=${settings.allowMultipleLeaseCancelTransactionUntilTimestamp}"))

} yield Diff(height = height, tx = tx, portfolios = portfolioDiff)
} yield Diff(height = height, tx = tx, portfolios = portfolioDiff, leaseState = Map(EqByteArray(lease.id) -> false))
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ import com.wavesplatform.state2.{Diff, LeaseInfo, Portfolio}
object LeasePatch {
def apply(s: StateReader): Diff = {

def invertLeaseInfo(l: LeaseInfo): LeaseInfo = LeaseInfo(-l.leaseIn, -l.leaseOut )
def invertLeaseInfo(l: LeaseInfo): LeaseInfo = LeaseInfo(-l.leaseIn, -l.leaseOut)

val portfolioUpd = s.accountPortfolios
.filter { case (_, pf) => pf.leaseInfo != LeaseInfo.empty }
.map { case (acc, pf) => acc -> Portfolio(0, invertLeaseInfo(pf.leaseInfo), Map.empty) }
.collect { case (acc, pf) if pf.leaseInfo != LeaseInfo.empty =>
acc -> Portfolio(0, invertLeaseInfo(pf.leaseInfo), Map.empty) }

Diff(transactions = Map.empty,
portfolios = portfolioUpd,
issuedAssets = Map.empty,
aliases = Map.empty,
paymentTransactionIdsByHashes = Map.empty,
previousExchangeTxs = Map.empty,
patchExtraLeaseIdsToCancel = s.activeLeases())
leaseState = s.activeLeases().map(_ -> false).toMap)
}

}
Loading

0 comments on commit e577235

Please sign in to comment.