Skip to content

Commit

Permalink
Merge pull request #759 from wavesplatform/node-438-utx-sync
Browse files Browse the repository at this point in the history
NODE-438: utx cache to move to network message handler
  • Loading branch information
Tolsi authored Dec 25, 2017
2 parents 8057e1c + 1c36489 commit 564d9c1
Show file tree
Hide file tree
Showing 12 changed files with 91 additions and 82 deletions.
11 changes: 10 additions & 1 deletion src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -354,10 +354,19 @@ waves {
# Max microblocks to cache
max-micro-block-cache-size = 50

# Max blocks to cache
# Max blocks to cache
max-block-cache-size = 20
}

# Utx synchronizer caching settings
utx-synchronizer {
# Max microblocks to cache
network-tx-cache-size = 1000000

# Max blocks to cache
network-tx-cache-time = 10s
}

# MicroBlock synchronizer settings
micro-block-synchronizer {
# How much time to wait before a new request of a microblock will be done
Expand Down
7 changes: 4 additions & 3 deletions src/main/scala/com/wavesplatform/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,19 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
}(scheduler)

val network = NetworkServer(settings, lastBlockInfo, history, utxStorage, peerDatabase, allChannels, establishedConnections)
val (signatures, blocks, blockchainScores, checkpoints, microblockInvs, microblockResponses) = network.messages
val (signatures, blocks, blockchainScores, checkpoints, microblockInvs, microblockResponses, transactions) = network.messages

val syncWithChannelClosed = RxScoreObserver(settings.synchronizationSettings.scoreTTL, history.score(), lastScore, blockchainScores, network.closedChannels)
val microblockDatas = MicroBlockSynchronizer(settings.synchronizationSettings.microBlockSynchronizer, peerDatabase, lastBlockInfo.map(_.id), microblockInvs, microblockResponses)
val newBlocks = RxExtensionLoader(settings.synchronizationSettings.maxRollback, settings.synchronizationSettings.synchronizationTimeout,
history, peerDatabase, knownInvalidBlocks, blocks, signatures, syncWithChannelClosed) { case ((c, b)) => processFork(c, b.blocks) }

val microblockSink = microblockDatas.mapTask(scala.Function.tupled(processMicroBlock))
val utxSink = UtxPoolSynchronizer(utxStorage, settings.synchronizationSettings.utxSynchronizerSettings, allChannels, transactions)
val microBlockSink = microblockDatas.mapTask(scala.Function.tupled(processMicroBlock))
val blockSink = newBlocks.mapTask(scala.Function.tupled(processBlock))
val checkpointSink = checkpoints.mapTask { case ((s, c)) => processCheckpoint(Some(s), c) }

Observable.merge(microblockSink, blockSink, checkpointSink).subscribe()(monix.execution.Scheduler.Implicits.global)
Observable.merge(microBlockSink, blockSink, checkpointSink, utxSink).subscribe()(monix.execution.Scheduler.Implicits.global)
miner.scheduleMining()

for (addr <- settings.networkSettings.declaredAddress if settings.networkSettings.uPnPSettings.enable) {
Expand Down
53 changes: 17 additions & 36 deletions src/main/scala/com/wavesplatform/UtxPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package com.wavesplatform
import java.util.concurrent.ConcurrentHashMap

import cats._
import com.google.common.cache.CacheBuilder
import com.wavesplatform.UtxPoolImpl.PessimisticPortfolios
import com.wavesplatform.metrics.Instrumented
import com.wavesplatform.settings.{FunctionalitySettings, UtxSettings}
Expand All @@ -28,7 +27,7 @@ import scala.util.{Left, Right}

trait UtxPool {

def putIfNew(tx: Transaction): Either[ValidationError, Boolean]
def putIfNew(tx: Transaction): Either[ValidationError, Transaction]

def removeAll(txs: Traversable[Transaction]): Unit

Expand All @@ -50,27 +49,21 @@ class UtxPoolImpl(time: Time,
fs: FunctionalitySettings,
utxSettings: UtxSettings) extends ScorexLogging with Instrumented with AutoCloseable with UtxPool {

private val transactions = new ConcurrentHashMap[ByteStr, Transaction]()

private implicit val scheduler: Scheduler = Scheduler.singleThread("utx-pool-cleanup")

private val transactions = new ConcurrentHashMap[ByteStr, Transaction]()
private val pessimisticPortfolios = new PessimisticPortfolios

private val removeInvalid = Task {
val state = stateReader()
val transactionsToRemove = transactions.values.asScala.filter(t => state.containsTransaction(t.id()))
removeAll(transactionsToRemove)
}.delayExecution(utxSettings.cleanupInterval)

private val cleanup = removeInvalid.flatMap(_ => removeInvalid).runAsync
private val cleanup = removeInvalid.flatMap(_ => removeInvalid).runAsyncLogErr

override def close(): Unit = cleanup.cancel()

private lazy val knownTransactions = CacheBuilder
.newBuilder()
.maximumSize(utxSettings.maxSize * 2)
.build[ByteStr, Either[ValidationError, Boolean]]()

private val pessimisticPortfolios = new PessimisticPortfolios

private val utxPoolSizeStats = Kamon.metrics.minMaxCounter("utx-pool-size", 500.millis)
private val processingTimeStats = Kamon.metrics.histogram("utx-transaction-processing-time", KamonTime.Milliseconds)
private val putRequestStats = Kamon.metrics.counter("utx-pool-put-if-new")
Expand All @@ -89,31 +82,20 @@ class UtxPoolImpl(time: Time,
}
}

override def putIfNew(tx: Transaction): Either[ValidationError, Boolean] = {
override def putIfNew(tx: Transaction): Either[ValidationError, Transaction] = {
putRequestStats.increment()
measureSuccessful(processingTimeStats, {
Option(knownTransactions.getIfPresent(tx.id())) match {
case Some(Right(_)) => Right(false)
case Some(Left(er)) => Left(er)
case None =>
val added = for {
_ <- Either.cond(transactions.size < utxSettings.maxSize, (), GenericError("Transaction pool size limit is reached"))
_ <- checkNotBlacklisted(tx)
_ <- feeCalculator.enoughFee(tx)
diff <- {
val s = stateReader()
TransactionDiffer(fs, history.lastBlockTimestamp(), time.correctedTime(), s.height)(s, tx)
}
} yield Option(transactions.putIfAbsent(tx.id(), tx)) match {
case Some(_) => false
case None =>
utxPoolSizeStats.increment()
pessimisticPortfolios.add(tx.id(), diff)
true
}

knownTransactions.put(tx.id(), added)
added
val s = stateReader()
for {
_ <- Either.cond(transactions.size < utxSettings.maxSize, (), GenericError("Transaction pool size limit is reached"))
_ <- checkNotBlacklisted(tx)
_ <- feeCalculator.enoughFee(tx)
diff <- TransactionDiffer(fs, history.lastBlockTimestamp(), time.correctedTime(), s.height)(s, tx)
} yield {
utxPoolSizeStats.increment()
pessimisticPortfolios.add(tx.id(), diff)
transactions.put(tx.id(), tx)
tx
}
})
}
Expand Down Expand Up @@ -145,7 +127,6 @@ class UtxPoolImpl(time: Time,

override def removeAll(txs: Traversable[Transaction]): Unit = {
txs.view.map(_.id()).foreach { id =>
knownTransactions.invalidate(id)
Option(transactions.remove(id)).foreach(_ => utxPoolSizeStats.decrement())
pessimisticPortfolios.remove(id)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.{Channel, ChannelHandlerContext, ChannelInboundHandlerAdapter}
import monix.reactive.subjects.ConcurrentSubject
import scorex.block.Block
import scorex.transaction.History
import scorex.transaction.{History, Transaction}
import scorex.transaction.History.BlockchainScore
import scorex.utils.ScorexLogging

Expand All @@ -19,6 +19,7 @@ class MessageObserver extends ChannelInboundHandlerAdapter with ScorexLogging {
private val blockchainScores = ConcurrentSubject.publish[(Channel, BlockchainScore)]
private val microblockInvs = ConcurrentSubject.publish[(Channel, MicroBlockInv)]
private val microblockResponses = ConcurrentSubject.publish[(Channel, MicroBlockResponse)]
private val transactions = ConcurrentSubject.publish[(Channel, Transaction)]

override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = msg match {
case b: Block => blocks.onNext((ctx.channel(), b))
Expand All @@ -27,16 +28,17 @@ class MessageObserver extends ChannelInboundHandlerAdapter with ScorexLogging {
case c: Checkpoint => checkpoints.onNext((ctx.channel(), c))
case mbInv: MicroBlockInv => microblockInvs.onNext((ctx.channel(), mbInv))
case mb: MicroBlockResponse => microblockResponses.onNext((ctx.channel(), mb))
case tx: Transaction => transactions.onNext((ctx.channel(), tx))
case _ => super.channelRead(ctx, msg)

}
}

object MessageObserver {
type Messages = (ChannelObservable[Signatures], ChannelObservable[Block], ChannelObservable[BlockchainScore], ChannelObservable[Checkpoint], ChannelObservable[MicroBlockInv], ChannelObservable[MicroBlockResponse])
type Messages = (ChannelObservable[Signatures], ChannelObservable[Block], ChannelObservable[BlockchainScore], ChannelObservable[Checkpoint], ChannelObservable[MicroBlockInv], ChannelObservable[MicroBlockResponse], ChannelObservable[Transaction])

def apply(): (MessageObserver, Messages) = {
val mo = new MessageObserver()
(mo, (mo.signatures, mo.blocks, mo.blockchainScores, mo.checkpoints, mo.microblockInvs, mo.microblockResponses))
(mo, (mo.signatures, mo.blocks, mo.blockchainScores, mo.checkpoints, mo.microblockInvs, mo.microblockResponses, mo.transactions))
}
}
3 changes: 0 additions & 3 deletions src/main/scala/com/wavesplatform/network/NetworkServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ object NetworkServer extends ScorexLogging {
val discardingHandler = new DiscardingHandler(lastBlockInfos.map(_.ready))
val peerConnections = new ConcurrentHashMap[PeerKey, Channel](10, 0.9f, 10)
val serverHandshakeHandler = new HandshakeHandler.Server(handshake, peerInfo, peerConnections, peerDatabase, allChannels)
val utxPoolSynchronizer = new UtxPoolSynchronizer(utxPool, allChannels)

def peerSynchronizer: ChannelHandlerAdapter = {
if (settings.networkSettings.enablePeersExchange) {
Expand Down Expand Up @@ -117,7 +116,6 @@ object NetworkServer extends ScorexLogging {
writeErrorHandler,
peerSynchronizer,
historyReplier,
utxPoolSynchronizer,
mesageObserver,
fatalErrorHandler)))
.bind(settings.networkSettings.bindAddress)
Expand Down Expand Up @@ -147,7 +145,6 @@ object NetworkServer extends ScorexLogging {
writeErrorHandler,
peerSynchronizer,
historyReplier,
utxPoolSynchronizer,
mesageObserver,
fatalErrorHandler)))

Expand Down
42 changes: 22 additions & 20 deletions src/main/scala/com/wavesplatform/network/UtxPoolSynchronizer.scala
Original file line number Diff line number Diff line change
@@ -1,31 +1,33 @@
package com.wavesplatform.network

import java.util.concurrent.TimeUnit

import com.google.common.cache.CacheBuilder
import com.wavesplatform.UtxPool
import com.wavesplatform.state2.diffs.TransactionDiffer.TransactionValidationError
import io.netty.channel.ChannelHandler.Sharable
import com.wavesplatform.settings.SynchronizationSettings.UtxSynchronizerSettings
import com.wavesplatform.state2.ByteStr
import io.netty.channel.group.ChannelGroup
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.schedulers.SchedulerService
import monix.reactive.Observable
import scorex.transaction.Transaction
import scorex.utils.ScorexLogging

object UtxPoolSynchronizer {
def apply(utx: UtxPool, utxSynchronizerSettings: UtxSynchronizerSettings, allChannels: ChannelGroup, txs: ChannelObservable[Transaction]): Observable[Unit] = {

implicit val scheduler: Scheduler = Scheduler.singleThread("utx-pool-sync")

@Sharable
class UtxPoolSynchronizer(utx: UtxPool, allChannels: ChannelGroup)
extends ChannelInboundHandlerAdapter with ScorexLogging {
val dummy = new Object()

private implicit val scheduler: SchedulerService = Scheduler.singleThread("utx-pool-synchronizer")
val knownTransactions = CacheBuilder
.newBuilder()
.maximumSize(utxSynchronizerSettings.networkTxCacheSize)
.expireAfterWrite(utxSynchronizerSettings.networkTxCacheTime.toMillis, TimeUnit.MILLISECONDS)
.build[ByteStr, Object]

override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = msg match {
case t: Transaction => Task(utx.putIfNew(t) match {
case Right(true) => // log.trace(s"${id(ctx)} Added transaction ${t.id} to UTX pool")
allChannels.broadcastTx(t, Some(ctx.channel()))
case Left(TransactionValidationError(e, _)) => // log.trace(s"${id(ctx)} Error processing transaction ${t.id}: $e")
case Left(e) => // log.trace(s"${id(ctx)} Error processing transaction ${t.id}: $e")
case Right(false) => // log.trace(s"${id(ctx)} TX ${t.id} already known")
}).runAsyncLogErr
case _ => super.channelRead(ctx, msg)
txs.observeOn(scheduler).map { case (channel, tx) => knownTransactions.get(tx.id(), () => {
utx.putIfNew(tx).map(_ => allChannels.broadcast(tx, Some(channel)))
dummy
})
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ case class SynchronizationSettings(maxRollback: Int,
scoreTTL: FiniteDuration,
invalidBlocksStorage: InvalidBlockStorageSettings,
microBlockSynchronizer: MicroblockSynchronizerSettings,
historyReplierSettings: HistoryReplierSettings)
historyReplierSettings: HistoryReplierSettings,
utxSynchronizerSettings: UtxSynchronizerSettings)

object SynchronizationSettings {

Expand All @@ -25,6 +26,9 @@ object SynchronizationSettings {
case class HistoryReplierSettings(maxMicroBlockCacheSize: Int,
maxBlockCacheSize: Int)

case class UtxSynchronizerSettings(networkTxCacheSize: Int,
networkTxCacheTime: FiniteDuration)

val configPath: String = "waves.synchronization"

def fromConfig(config: Config): SynchronizationSettings = {
Expand All @@ -35,8 +39,9 @@ object SynchronizationSettings {
val invalidBlocksStorage = config.as[InvalidBlockStorageSettings](s"$configPath.invalid-blocks-storage")
val microBlockSynchronizer = config.as[MicroblockSynchronizerSettings](s"$configPath.micro-block-synchronizer")
val historyReplierSettings = config.as[HistoryReplierSettings](s"$configPath.history-replier")
val utxSynchronizerSettings = config.as[UtxSynchronizerSettings](s"$configPath.utx-synchronizer")

SynchronizationSettings(maxRollback, maxChainLength, synchronizationTimeout, scoreTTL, invalidBlocksStorage,
microBlockSynchronizer, historyReplierSettings)
microBlockSynchronizer, historyReplierSettings, utxSynchronizerSettings)
}
}
6 changes: 5 additions & 1 deletion src/main/scala/com/wavesplatform/settings/UtxSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,8 @@ package com.wavesplatform.settings

import scala.concurrent.duration.FiniteDuration

case class UtxSettings(maxSize: Int, maxTransactionAge: FiniteDuration, blacklistSenderAddresses: Set[String], allowBlacklistedTransferTo: Set[String], cleanupInterval: FiniteDuration)
case class UtxSettings(maxSize: Int,
maxTransactionAge: FiniteDuration,
blacklistSenderAddresses: Set[String],
allowBlacklistedTransferTo: Set[String],
cleanupInterval: FiniteDuration)
14 changes: 5 additions & 9 deletions src/main/scala/scorex/BroadcastRoute.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,11 @@ trait BroadcastRoute {
import scala.concurrent.ExecutionContext.Implicits.global

protected def doBroadcast(v: Either[ValidationError, Transaction]): Future[Either[ApiError, Transaction]] = Future {
(for {
tx <- v
utxResult <- utx.putIfNew(tx)
} yield {
if (utxResult) {
allChannels.broadcastTx(tx, None)
}
v.flatMap(utx.putIfNew)
.left.map(ApiError.fromValidationError)
.right.map { tx =>
allChannels.broadcastTx(tx, None)
tx
}).left.map(ApiError.fromValidationError)

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class PaymentRouteSpec
with NoShrink {

private val utx = stub[UtxPool]
(utx.putIfNew _).when(*).onCall((t: Transaction) => Right(true)).anyNumberOfTimes()
(utx.putIfNew _).when(*).onCall((t: Transaction) => Right(t)).anyNumberOfTimes()
private val allChannels = stub[ChannelGroup]

"accepts payments" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class OrderBookActorSpecification extends TestKit(ActorSystem("MatcherTest"))
val functionalitySettings = TestFunctionalitySettings.Stub

val utx = stub[UtxPool]
(utx.putIfNew _).when(*).onCall((tx: Transaction) => Right(true))
(utx.putIfNew _).when(*).onCall((tx: Transaction) => Right(tx))
val allChannels = stub[ChannelGroup]
actor = system.actorOf(Props(new OrderBookActor(pair, orderHistoryRef, Coeval.now(storedState),
wallet, utx, allChannels, settings, history, functionalitySettings) with RestartableActor))
Expand Down Expand Up @@ -281,7 +281,7 @@ class OrderBookActorSpecification extends TestKit(ActorSystem("MatcherTest"))
(pool.putIfNew _).when(*).onCall { (tx: Transaction) =>
tx match {
case om: ExchangeTransaction if om.buyOrder == ord2 => Left(ValidationError.GenericError("test"))
case _: Transaction => Right(true)
case _: Transaction => Right(tx)
}
}
val allChannels = stub[ChannelGroup]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.wavesplatform.settings

import com.typesafe.config.ConfigFactory
import com.wavesplatform.network.InvalidBlockStorageImpl.InvalidBlockStorageSettings
import com.wavesplatform.settings.SynchronizationSettings.{HistoryReplierSettings, MicroblockSynchronizerSettings}
import com.wavesplatform.settings.SynchronizationSettings.{HistoryReplierSettings, MicroblockSynchronizerSettings, UtxSynchronizerSettings}
import org.scalatest.{FlatSpec, Matchers}

import scala.concurrent.duration._
Expand All @@ -28,6 +28,15 @@ class SynchronizationSettingsSpecification extends FlatSpec with Matchers {
| max-block-cache-size = 2
| }
|
| # History replier caching settings
| utx-synchronizer {
| # Max microblocks to cache
| network-tx-cache-size = 7000000
|
| # Max blocks to cache
| network-tx-cache-time = 70s
| }
|
| micro-block-synchronizer {
| wait-response-timeout: 5s
| processed-micro-blocks-cache-timeout: 2s
Expand Down Expand Up @@ -55,5 +64,8 @@ class SynchronizationSettingsSpecification extends FlatSpec with Matchers {
maxMicroBlockCacheSize = 5,
maxBlockCacheSize = 2
)

settings.utxSynchronizerSettings shouldBe UtxSynchronizerSettings(7000000, 70.seconds)

}
}

0 comments on commit 564d9c1

Please sign in to comment.