Skip to content

Commit

Permalink
Merge pull request #636 from wavesplatform/node-309-parameterize-in-m…
Browse files Browse the repository at this point in the history
…em-diff-nel

Node 309 parameterize in mem diff
  • Loading branch information
alexeykiselev authored Oct 30, 2017
2 parents ff72ab3 + 89c20ee commit c067958
Show file tree
Hide file tree
Showing 65 changed files with 475 additions and 434 deletions.
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,6 @@ object Dependencies {
)

lazy val fp = Seq(
"org.typelevel" %% "cats-core" % "0.9.0"
"org.typelevel" %% "cats-core" % "1.0.0-MF"
)
}
4 changes: 2 additions & 2 deletions src/it/scala/com/wavesplatform/it/BlacklistTestSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.Random

class BlacklistTestSuite extends FreeSpec with Matchers with BeforeAndAfterAll with CancelAfterFailure {
class BlacklistTestSuite extends FreeSpec with Matchers with BeforeAndAfterAll with CancelAfterFailure with ReportingTestName{

import BlacklistTestSuite._

private val docker = Docker(getClass)
private val nodes = Configs.map(docker.startNode)
override val nodes = Configs.map(docker.startNode)
private val richestNode = nodes.head
private val otherNodes = nodes.tail

Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
package com.wavesplatform.it

import org.scalatest.{BeforeAndAfterAll, Suite}
import org.scalatest._
import scorex.utils.ScorexLogging

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global

trait IntegrationNodesInitializationAndStopping extends BeforeAndAfterAll { this: Suite =>
trait IntegrationNodesInitializationAndStopping extends BeforeAndAfterAll with ScorexLogging with ReportingTestName {
th: Suite =>
def docker: Docker

def nodes: Seq[Node]

override protected def beforeAll(): Unit = {
super.beforeAll()

log.debug("Waiting for nodes to start")
Await.result(Future.traverse(nodes)(_.status), 1.minute)

log.debug("Waiting for nodes to connect")
Await.result(
for {
count <- Future.traverse(nodes)(_.waitForPeers(nodes.size - 1))
Expand All @@ -31,4 +34,7 @@ trait IntegrationNodesInitializationAndStopping extends BeforeAndAfterAll { this
private def ensureNoDeadlock() = {
Await.result(Future.traverse(nodes)(_.height), 7.seconds)
}

}


4 changes: 2 additions & 2 deletions src/it/scala/com/wavesplatform/it/MatcherTestSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.Random

class MatcherTestSuite extends FreeSpec with Matchers with BeforeAndAfterAll {
class MatcherTestSuite extends FreeSpec with Matchers with BeforeAndAfterAll with ReportingTestName{

import MatcherTestSuite._

private val docker = Docker(getClass)

private val nodes = Configs.map(docker.startNode)
override val nodes = Configs.map(docker.startNode)

private val matcherNode = nodes.head
private val aliceNode = nodes(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import scala.concurrent._
import scala.concurrent.duration.DurationInt
import scala.util.Random

class NetworkUniqueConnectionsTestSuite extends FreeSpec with Matchers with BeforeAndAfterAll {
class NetworkUniqueConnectionsTestSuite extends FreeSpec with Matchers with BeforeAndAfterAll{

private val docker = Docker(getClass)

Expand Down
22 changes: 22 additions & 0 deletions src/it/scala/com/wavesplatform/it/ReportingTestName.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.wavesplatform.it

import org.scalatest.{Args, Status, Suite, SuiteMixin}
import scorex.waves.http.DebugMessage

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._


trait ReportingTestName extends SuiteMixin {
th: Suite =>
def nodes: Seq[Node]

abstract override def runTest(testName: String, args: Args): Status = {
import scala.concurrent.ExecutionContext.Implicits.global

Await.result(Future.traverse(nodes)(_.printDebugMessage(DebugMessage(s"---------- Test '$testName' started ----------"))), 10.seconds)
val r = super.runTest(testName, args)
Await.result(Future.traverse(nodes)(_.printDebugMessage(DebugMessage(s"---------- Test `$testName` finished ----------"))), 10.seconds)
r
}
}
16 changes: 5 additions & 11 deletions src/it/scala/com/wavesplatform/it/RollbackSpecSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class RollbackSpecSuite extends FreeSpec with ScalaFutures with IntegrationPatie
with Matchers with TransferSending with IntegrationNodesInitializationAndStopping {
override val docker = Docker(getClass)
// there are nodes with big and small balances to reduce the number of forks
override val nodes: Seq[Node] = Configs.map(docker.startNode)
override val nodes: Seq[Node] = configs.map(docker.startNode)

private val transactionsCount = 190

Expand Down Expand Up @@ -64,16 +64,10 @@ class RollbackSpecSuite extends FreeSpec with ScalaFutures with IntegrationPatie
all(infos) shouldEqual infos.head
infos.head
})

_ <- processRequests(requests)

_ <- traverse(nodes)(_.waitFor[Int](_.utxSize, _ == 0, 1.second))

_ <- nodes.head.waitFor[Int](_.utxSize, _ == 0, 1.second)
_ <- traverse(nodes)(_.rollback(startHeight, returnToUTX = false))

_ <- traverse(nodes)(_.utx).map(utxs => {
all(utxs) shouldBe 'empty
})
_ <- nodes.head.utx.map( _ shouldBe 'empty )

hashAfterApply <- nodes.head.waitForDebugInfoAt(startHeight + waitBlocks).map(_.stateHash)
} yield {
Expand Down Expand Up @@ -108,6 +102,6 @@ object RollbackSpecSuite {
""".stripMargin
)

val Configs: Seq[Config] = Seq(dockerConfigs.last) :+
nonGeneratingNodesConfig.withFallback(Random.shuffle(dockerConfigs.init).head)
val configs: Seq[Config] = Seq(dockerConfigs.last,
nonGeneratingNodesConfig.withFallback(Random.shuffle(dockerConfigs.init).head))
}
32 changes: 16 additions & 16 deletions src/it/scala/com/wavesplatform/it/TestFourNodesSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.wavesplatform.it
import com.typesafe.config.ConfigFactory
import com.wavesplatform.it.transactions._
import com.wavesplatform.it.transactions.debug._
import org.scalatest.{BeforeAndAfterAll, FreeSpec, Matchers, Suite}
import org.scalatest._
import scorex.utils.ScorexLogging

import scala.collection.JavaConverters._
Expand All @@ -13,7 +13,7 @@ import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.Random

class TestFourNodesSuite extends FreeSpec with BeforeAndAfterAll with ScorexLogging with Matchers {
class TestFourNodesSuite extends FreeSpec with BeforeAndAfterAll with ScorexLogging with Matchers with ReportingTestName{

private val nonGeneratingPeerConfig = ConfigFactory.parseString(
"""
Expand All @@ -27,17 +27,17 @@ class TestFourNodesSuite extends FreeSpec with BeforeAndAfterAll with ScorexLogg
private val nodeConfigs = Seq(nonGeneratingPeerConfig.withFallback(dockerConfigs.head)) ++ dockerConfigs.tail


private val allNodes = nodeConfigs.map(docker.startNode)
private val notMiner = allNodes.head
override val nodes: Seq[Node] = nodeConfigs.map(docker.startNode)
private val notMiner = nodes.head

override protected def beforeAll(): Unit = {
log.debug("Waiting for nodes to start")
Await.result(Future.traverse(allNodes)(_.status), 1.minute)
Await.result(Future.traverse(nodes)(_.status), 1.minute)

log.debug("Waiting for nodes to connect")
val peersCounts = Await.result(
for {
count <- Future.traverse(allNodes)(_.waitForPeers(nodesCount - 1))
count <- Future.traverse(nodes)(_.waitForPeers(nodesCount - 1))
} yield count, 1.minute
)

Expand All @@ -49,16 +49,16 @@ class TestFourNodesSuite extends FreeSpec with BeforeAndAfterAll with ScorexLogg
}

override def nestedSuites: IndexedSeq[Suite] = IndexedSeq(
new WideStateGenerationSpec(allNodes),
new ValidChainGenerationSpec(allNodes),
new BurnTransactionSpecification(allNodes, notMiner),
new IssueTransactionSpecification(allNodes, notMiner),
new LeasingTransactionsSpecification(allNodes, notMiner),
new PaymentTransactionSpecification(allNodes, notMiner),
new ReissueTransactionSpecification(allNodes, notMiner),
new TransferTransactionSpecification(allNodes, notMiner),
new AliasTransactionSpecification(allNodes, notMiner),
new DebugPortfoliosSpecification(allNodes, notMiner)
new WideStateGenerationSpec(nodes),
new ValidChainGenerationSpec(nodes),
new BurnTransactionSpecification(nodes, notMiner),
new IssueTransactionSpecification(nodes, notMiner),
new LeasingTransactionsSpecification(nodes, notMiner),
new PaymentTransactionSpecification(nodes, notMiner),
new ReissueTransactionSpecification(nodes, notMiner),
new TransferTransactionSpecification(nodes, notMiner),
new AliasTransactionSpecification(nodes, notMiner),
new DebugPortfoliosSpecification(nodes, notMiner)
)
override protected def afterAll(): Unit = docker.close()
}
15 changes: 12 additions & 3 deletions src/it/scala/com/wavesplatform/it/api/NodeApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import scorex.api.http.leasing.{LeaseCancelRequest, LeaseRequest}
import scorex.api.http.PeersApiRoute.{ConnectReq, connectFormat}
import scorex.transaction.assets.exchange.Order
import scorex.utils.{LoggerFacade, ScorexLogging}
import scorex.waves.http.DebugApiRoute.portfolioFormat
import scorex.waves.http.RollbackParams
import scorex.waves.http.DebugApiRoute._
import scorex.waves.http.{DebugMessage, RollbackParams}
import scorex.waves.http.DebugMessage._

import scala.compat.java8.FutureConverters._
import scala.concurrent.ExecutionContext.Implicits.global
Expand Down Expand Up @@ -68,11 +69,17 @@ trait NodeApi {
.build()
}

def postJsonWihApiKey[A: Writes](path: String, body: A): Future[Response] = retrying {
_post(s"http://$restAddress:$nodeRestPort$path")
.setHeader("api_key", "integration-test-rest-api")
.setHeader("Content-type", "application/json").setBody(stringify(toJson(body)))
.build()
}

def post(url: String, port: Int, path: String, f: RequestBuilder => RequestBuilder = identity): Future[Response] =
retrying(f(
_post(s"$url:$port$path").setHeader("api_key", "integration-test-rest-api")
).build())

def postJson[A: Writes](path: String, body: A): Future[Response] =
post(path, stringify(toJson(body)))

Expand All @@ -83,6 +90,8 @@ trait NodeApi {
def blacklist(networkIpAddress: String, hostNetworkPort: Int): Future[Unit] =
post("/debug/blacklist", s"$networkIpAddress:$hostNetworkPort").map(_ => ())

def printDebugMessage(db: DebugMessage): Future[Response] = postJsonWihApiKey("/debug/print", db)

def connectedPeers: Future[Seq[Peer]] = get("/peers/connected").map { r =>
(Json.parse(r.getResponseBody) \ "peers").as[Seq[Peer]]
}
Expand Down
7 changes: 5 additions & 2 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,11 @@ waves {
store-transactions-in-state = true
checkpoint-file = ${waves.directory}"/data/checkpoint.dat"

# Min buffer size. Fast rollback is possible up to this value.
minimum-in-memory-diff-blocks = 55
# Min block buffer size. Fast rollback is possible up to (in-memory-chunk-size * in-mem-chunks-amount).
in-memory-chunk-size = 5

# Max amount of block buffers in memory. Fast rollback is possible up to (in-memory-chunk-size * in-mem-chunks-amount).
in-mem-chunks-amount = 10

# Blockchain type. Could be TESTNET | MAINNET | CUSTOM. Default value is TESTNET.
type = TESTNET
Expand Down
3 changes: 3 additions & 0 deletions src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
<logger name="io.swagger" level="INFO"/>
<logger name="org.asynchttpclient" level="INFO"/>

<logger name="sun.rmi.loader" level="INFO"/>
<logger name="sun.rmi.transport.tcp" level="INFO"/>

<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/com/wavesplatform/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
NodeApiRoute(settings.restAPISettings, () => this.shutdown()),
ActivationApiRoute(settings.restAPISettings, settings.blockchainSettings.functionalitySettings, settings.featuresSettings, history, featureProvider),
AssetsBroadcastApiRoute(settings.restAPISettings, utxStorage, allChannels),
LeaseApiRoute(settings.restAPISettings, wallet, utxStorage, allChannels, stateReader, time),
LeaseApiRoute(settings.restAPISettings, wallet, utxStorage, allChannels, time),
LeaseBroadcastApiRoute(settings.restAPISettings, utxStorage, allChannels),
AliasApiRoute(settings.restAPISettings, wallet, utxStorage, allChannels, time, stateReader),
AliasBroadcastApiRoute(settings.restAPISettings, utxStorage, allChannels)
Expand Down
8 changes: 4 additions & 4 deletions src/main/scala/com/wavesplatform/Coordinator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.wavesplatform.metrics._
import com.wavesplatform.network.{BlockCheckpoint, Checkpoint}
import com.wavesplatform.settings.{BlockchainSettings, FunctionalitySettings, WavesSettings}
import com.wavesplatform.state2._
import com.wavesplatform.state2.reader.StateReader
import com.wavesplatform.state2.reader.SnapshotStateReader
import kamon.Kamon
import org.influxdb.dto.Point
import scorex.block.{Block, MicroBlock}
Expand Down Expand Up @@ -35,7 +35,7 @@ object Coordinator extends ScorexLogging with Instrumented {

lazy val forkApplicationResultEi: Either[ValidationError, BigInt] = {
val firstDeclined = extension.view.map { b =>
b -> appendBlock(checkpoint, history, blockchainUpdater, stateReader, utxStorage, time, settings.blockchainSettings, featureProvider)(b).right.map {
b -> appendBlock(checkpoint, history, blockchainUpdater, stateReader(), utxStorage, time, settings.blockchainSettings, featureProvider)(b).right.map {
_.foreach(bh => BlockStats.applied(b, BlockStats.Source.Ext, bh))
}
}
Expand Down Expand Up @@ -111,7 +111,7 @@ object Coordinator extends ScorexLogging with Instrumented {
if (history.contains(newBlock)) Right(None)
else for {
_ <- Either.cond(history.heightOf(newBlock.reference).exists(_ >= history.height() - 1), (), GenericError("Can process either new top block or current top block's competitor"))
maybeBaseHeight <- appendBlock(checkpoint, history, blockchainUpdater, stateReader, utxStorage, time, settings, featureProvider)(newBlock)
maybeBaseHeight <- appendBlock(checkpoint, history, blockchainUpdater, stateReader(), utxStorage, time, settings, featureProvider)(newBlock)
} yield maybeBaseHeight map (_ => history.score())
})

Expand All @@ -130,7 +130,7 @@ object Coordinator extends ScorexLogging with Instrumented {
s"generator's effective balance $effectiveBalance is less that required for generation")

private def appendBlock(checkpoint: CheckpointService, history: History, blockchainUpdater: BlockchainUpdater,
stateReader: StateReader, utxStorage: UtxPool, time: Time, settings: BlockchainSettings,
stateReader: SnapshotStateReader, utxStorage: UtxPool, time: Time, settings: BlockchainSettings,
featureProvider: FeatureProvider)(block: Block): Either[ValidationError, Option[Int]] = for {
_ <- Either.cond(checkpoint.isBlockValid(block.signerData.signature, history.height() + 1), (),
GenericError(s"Block $block at height ${history.height() + 1} is not valid w.r.t. checkpoint"))
Expand Down
15 changes: 8 additions & 7 deletions src/main/scala/com/wavesplatform/UtxPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import com.wavesplatform.UtxPool.PessimisticPortfolios
import com.wavesplatform.metrics.Instrumented
import com.wavesplatform.settings.{FunctionalitySettings, UtxSettings}
import com.wavesplatform.state2.diffs.TransactionDiffer
import com.wavesplatform.state2.reader.{CompositeStateReader, StateReader}
import com.wavesplatform.state2.{ByteStr, Diff, Portfolio}
import com.wavesplatform.state2.reader.CompositeStateReader.composite
import com.wavesplatform.state2.{ByteStr, Diff, Portfolio, StateReader}
import kamon.Kamon
import kamon.metric.instrument.{Time => KamonTime}
import scorex.account.Address
Expand Down Expand Up @@ -63,10 +63,11 @@ class UtxPool(time: Time,
case Some(Right(_)) => Right(false)
case Some(Left(er)) => Left(er)
case None =>
val s = stateReader()
val res = for {
_ <- Either.cond(transactions.size < utxSettings.maxSize, (), GenericError("Transaction pool size limit is reached"))
_ <- feeCalculator.enoughFee(tx)
diff <- TransactionDiffer(fs, history.lastBlockTimestamp(), time.correctedTime(), stateReader.height)(stateReader, tx)
diff <- TransactionDiffer(fs, history.lastBlockTimestamp(), time.correctedTime(), s.height)(s, tx)
} yield {
utxPoolSizeStats.increment()
pessimisticPortfolios.add(tx.id, diff)
Expand All @@ -90,7 +91,7 @@ class UtxPool(time: Time,
}

def portfolio(addr: Address): Portfolio = {
val base = stateReader.accountPortfolio(addr)
val base = stateReader().accountPortfolio(addr)
val foundInUtx = pessimisticPortfolios.getAggregated(addr)

Monoid.combine(base, foundInUtx)
Expand All @@ -104,17 +105,17 @@ class UtxPool(time: Time,

def transactionById(transactionId: ByteStr): Option[Transaction] = Option(transactions.get(transactionId))


def packUnconfirmed(max: Int, sortInBlock: Boolean): Seq[Transaction] = {
val currentTs = time.correctedTime()
removeExpired(currentTs)
val differ = TransactionDiffer(fs, history.lastBlockTimestamp(), currentTs, stateReader.height) _
val s = stateReader()
val differ = TransactionDiffer(fs, history.lastBlockTimestamp(), currentTs, s.height) _
val (invalidTxs, reversedValidTxs, _) = transactions
.values.asScala.toSeq
.sorted(TransactionsOrdering.InUTXPool)
.foldLeft((Seq.empty[ByteStr], Seq.empty[Transaction], Monoid[Diff].empty)) {
case ((invalid, valid, diff), tx) if valid.size <= max =>
differ(new CompositeStateReader(stateReader, diff.asBlockDiff), tx) match {
differ(composite(s, diff.asBlockDiff), tx) match {
case Right(newDiff) if valid.size < max =>
(invalid, tx +: valid, Monoid.combine(diff, newDiff))
case Right(_) =>
Expand Down
5 changes: 2 additions & 3 deletions src/main/scala/com/wavesplatform/history/StorageFactory.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import java.util.concurrent.locks.{ReentrantReadWriteLock => RWL}

import com.wavesplatform.features.FeatureProvider
import com.wavesplatform.settings.WavesSettings
import com.wavesplatform.state2.reader.StateReader
import com.wavesplatform.state2.{BlockchainUpdaterImpl, StateStorage, StateWriterImpl}
import com.wavesplatform.state2._
import scorex.transaction._

import scala.util.{Success, Try}
Expand All @@ -28,7 +27,7 @@ object StorageFactory {
ss <- createStateStorage(historyWriter, settings.blockchainSettings.stateFile, settings.mvstorePageSplitSize)
stateWriter = new StateWriterImpl(ss, settings.blockchainSettings.storeTransactionsInState, lock)
} yield {
val bcu = BlockchainUpdaterImpl(stateWriter, historyWriter, settings, settings.blockchainSettings.minimumInMemoryDiffSize, lock)
val bcu = BlockchainUpdaterImpl(stateWriter, historyWriter, settings, lock)
val history: NgHistory with DebugNgHistory with FeatureProvider = bcu.historyReader
(history, history, stateWriter, bcu.bestLiquidState, bcu, bcu)
}
Expand Down
Loading

0 comments on commit c067958

Please sign in to comment.