From 87003891a07da50fcc9cd58136f302c7d4da8028 Mon Sep 17 00:00:00 2001 From: Ilya Smagin Date: Mon, 25 Dec 2017 16:37:07 +0300 Subject: [PATCH 1/3] NODE-438: utx cache to move to network message handler --- src/main/resources/application.conf | 11 +++- .../scala/com/wavesplatform/Application.scala | 7 +-- .../scala/com/wavesplatform/UtxPool.scala | 53 ++++++------------- .../network/MessageObserver.scala | 8 +-- .../network/MicroBlockSynchronizer.scala | 2 +- .../wavesplatform/network/NetworkServer.scala | 3 -- .../network/UtxPoolSynchronizer.scala | 42 ++++++++------- .../settings/SynchronizationSettings.scala | 9 +++- .../wavesplatform/settings/UtxSettings.scala | 6 ++- src/main/scala/scorex/BroadcastRoute.scala | 14 ++--- .../wavesplatform/http/PaymentRouteSpec.scala | 2 +- .../market/OrderBookActorSpecification.scala | 4 +- ...SynchronizationSettingsSpecification.scala | 14 ++++- 13 files changed, 92 insertions(+), 83 deletions(-) diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 94bac509256..b44e97a19a6 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -354,10 +354,19 @@ waves { # Max microblocks to cache max-micro-block-cache-size = 50 - # Max blocks to cache + # Max blocks to cache max-block-cache-size = 20 } + # History replier caching settings + utx-synchronizer { + # Max microblocks to cache + network-tx-cache-size = 1000000 + + # Max blocks to cache + network-tx-cache-time = 10s + } + # MicroBlock synchronizer settings micro-block-synchronizer { # How much time to wait before a new request of a microblock will be done diff --git a/src/main/scala/com/wavesplatform/Application.scala b/src/main/scala/com/wavesplatform/Application.scala index 468e9b852f3..2c56c3934b1 100644 --- a/src/main/scala/com/wavesplatform/Application.scala +++ b/src/main/scala/com/wavesplatform/Application.scala @@ -107,18 +107,19 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con }(scheduler) val network = NetworkServer(settings, lastBlockInfo, history, utxStorage, peerDatabase, allChannels, establishedConnections) - val (signatures, blocks, blockchainScores, checkpoints, microblockInvs, microblockResponses) = network.messages + val (signatures, blocks, blockchainScores, checkpoints, microblockInvs, microblockResponses, transactions) = network.messages val syncWithChannelClosed = RxScoreObserver(settings.synchronizationSettings.scoreTTL, history.score(), lastScore, blockchainScores, network.closedChannels) val microblockDatas = MicroBlockSynchronizer(settings.synchronizationSettings.microBlockSynchronizer, peerDatabase, lastBlockInfo.map(_.id), microblockInvs, microblockResponses) val newBlocks = RxExtensionLoader(settings.synchronizationSettings.maxRollback, settings.synchronizationSettings.synchronizationTimeout, history, peerDatabase, knownInvalidBlocks, blocks, signatures, syncWithChannelClosed) { case ((c, b)) => processFork(c, b.blocks) } - val microblockSink = microblockDatas.mapTask(scala.Function.tupled(processMicroBlock)) + val utxSink = UtxPoolSynchronizer(utxStorage, settings.synchronizationSettings.utxSynchronizerSettings, allChannels, transactions) + val microBlockSink = microblockDatas.mapTask(scala.Function.tupled(processMicroBlock)) val blockSink = newBlocks.mapTask(scala.Function.tupled(processBlock)) val checkpointSink = checkpoints.mapTask { case ((s, c)) => processCheckpoint(Some(s), c) } - Observable.merge(microblockSink, blockSink, checkpointSink).subscribe()(monix.execution.Scheduler.Implicits.global) + Observable.merge(microBlockSink, blockSink, checkpointSink, utxSink).subscribe()(monix.execution.Scheduler.Implicits.global) miner.scheduleMining() for (addr <- settings.networkSettings.declaredAddress if settings.networkSettings.uPnPSettings.enable) { diff --git a/src/main/scala/com/wavesplatform/UtxPool.scala b/src/main/scala/com/wavesplatform/UtxPool.scala index 0a28defed9e..5314b3eb2c9 100644 --- a/src/main/scala/com/wavesplatform/UtxPool.scala +++ b/src/main/scala/com/wavesplatform/UtxPool.scala @@ -3,7 +3,6 @@ package com.wavesplatform import java.util.concurrent.ConcurrentHashMap import cats._ -import com.google.common.cache.CacheBuilder import com.wavesplatform.UtxPoolImpl.PessimisticPortfolios import com.wavesplatform.metrics.Instrumented import com.wavesplatform.settings.{FunctionalitySettings, UtxSettings} @@ -28,7 +27,7 @@ import scala.util.{Left, Right} trait UtxPool { - def putIfNew(tx: Transaction): Either[ValidationError, Boolean] + def putIfNew(tx: Transaction): Either[ValidationError, Transaction] def removeAll(txs: Traversable[Transaction]): Unit @@ -50,27 +49,21 @@ class UtxPoolImpl(time: Time, fs: FunctionalitySettings, utxSettings: UtxSettings) extends ScorexLogging with Instrumented with AutoCloseable with UtxPool { - private val transactions = new ConcurrentHashMap[ByteStr, Transaction]() - private implicit val scheduler: Scheduler = Scheduler.singleThread("utx-pool-cleanup") + private val transactions = new ConcurrentHashMap[ByteStr, Transaction]() + private val pessimisticPortfolios = new PessimisticPortfolios + private val removeInvalid = Task { val state = stateReader() val transactionsToRemove = transactions.values.asScala.filter(t => state.containsTransaction(t.id())) removeAll(transactionsToRemove) }.delayExecution(utxSettings.cleanupInterval) - private val cleanup = removeInvalid.flatMap(_ => removeInvalid).runAsync + private val cleanup = removeInvalid.flatMap(_ => removeInvalid).runAsyncLogErr override def close(): Unit = cleanup.cancel() - private lazy val knownTransactions = CacheBuilder - .newBuilder() - .maximumSize(utxSettings.maxSize * 2) - .build[ByteStr, Either[ValidationError, Boolean]]() - - private val pessimisticPortfolios = new PessimisticPortfolios - private val utxPoolSizeStats = Kamon.metrics.minMaxCounter("utx-pool-size", 500.millis) private val processingTimeStats = Kamon.metrics.histogram("utx-transaction-processing-time", KamonTime.Milliseconds) private val putRequestStats = Kamon.metrics.counter("utx-pool-put-if-new") @@ -89,31 +82,20 @@ class UtxPoolImpl(time: Time, } } - override def putIfNew(tx: Transaction): Either[ValidationError, Boolean] = { + override def putIfNew(tx: Transaction): Either[ValidationError, Transaction] = { putRequestStats.increment() measureSuccessful(processingTimeStats, { - Option(knownTransactions.getIfPresent(tx.id())) match { - case Some(Right(_)) => Right(false) - case Some(Left(er)) => Left(er) - case None => - val added = for { - _ <- Either.cond(transactions.size < utxSettings.maxSize, (), GenericError("Transaction pool size limit is reached")) - _ <- checkNotBlacklisted(tx) - _ <- feeCalculator.enoughFee(tx) - diff <- { - val s = stateReader() - TransactionDiffer(fs, history.lastBlockTimestamp(), time.correctedTime(), s.height)(s, tx) - } - } yield Option(transactions.putIfAbsent(tx.id(), tx)) match { - case Some(_) => false - case None => - utxPoolSizeStats.increment() - pessimisticPortfolios.add(tx.id(), diff) - true - } - - knownTransactions.put(tx.id(), added) - added + val s = stateReader() + for { + _ <- Either.cond(transactions.size < utxSettings.maxSize, (), GenericError("Transaction pool size limit is reached")) + _ <- checkNotBlacklisted(tx) + _ <- feeCalculator.enoughFee(tx) + diff <- TransactionDiffer(fs, history.lastBlockTimestamp(), time.correctedTime(), s.height)(s, tx) + } yield { + utxPoolSizeStats.increment() + pessimisticPortfolios.add(tx.id(), diff) + transactions.put(tx.id(), tx) + tx } }) } @@ -145,7 +127,6 @@ class UtxPoolImpl(time: Time, override def removeAll(txs: Traversable[Transaction]): Unit = { txs.view.map(_.id()).foreach { id => - knownTransactions.invalidate(id) Option(transactions.remove(id)).foreach(_ => utxPoolSizeStats.decrement()) pessimisticPortfolios.remove(id) } diff --git a/src/main/scala/com/wavesplatform/network/MessageObserver.scala b/src/main/scala/com/wavesplatform/network/MessageObserver.scala index d6df301e811..16a78916431 100644 --- a/src/main/scala/com/wavesplatform/network/MessageObserver.scala +++ b/src/main/scala/com/wavesplatform/network/MessageObserver.scala @@ -4,7 +4,7 @@ import io.netty.channel.ChannelHandler.Sharable import io.netty.channel.{Channel, ChannelHandlerContext, ChannelInboundHandlerAdapter} import monix.reactive.subjects.ConcurrentSubject import scorex.block.Block -import scorex.transaction.History +import scorex.transaction.{History, Transaction} import scorex.transaction.History.BlockchainScore import scorex.utils.ScorexLogging @@ -19,6 +19,7 @@ class MessageObserver extends ChannelInboundHandlerAdapter with ScorexLogging { private val blockchainScores = ConcurrentSubject.publish[(Channel, BlockchainScore)] private val microblockInvs = ConcurrentSubject.publish[(Channel, MicroBlockInv)] private val microblockResponses = ConcurrentSubject.publish[(Channel, MicroBlockResponse)] + private val transactions = ConcurrentSubject.publish[(Channel, Transaction)] override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = msg match { case b: Block => blocks.onNext((ctx.channel(), b)) @@ -27,16 +28,17 @@ class MessageObserver extends ChannelInboundHandlerAdapter with ScorexLogging { case c: Checkpoint => checkpoints.onNext((ctx.channel(), c)) case mbInv: MicroBlockInv => microblockInvs.onNext((ctx.channel(), mbInv)) case mb: MicroBlockResponse => microblockResponses.onNext((ctx.channel(), mb)) + case tx: Transaction => transactions.onNext((ctx.channel(), tx)) case _ => super.channelRead(ctx, msg) } } object MessageObserver { - type Messages = (ChannelObservable[Signatures], ChannelObservable[Block], ChannelObservable[BlockchainScore], ChannelObservable[Checkpoint], ChannelObservable[MicroBlockInv], ChannelObservable[MicroBlockResponse]) + type Messages = (ChannelObservable[Signatures], ChannelObservable[Block], ChannelObservable[BlockchainScore], ChannelObservable[Checkpoint], ChannelObservable[MicroBlockInv], ChannelObservable[MicroBlockResponse], ChannelObservable[Transaction]) def apply(): (MessageObserver, Messages) = { val mo = new MessageObserver() - (mo, (mo.signatures, mo.blocks, mo.blockchainScores, mo.checkpoints, mo.microblockInvs, mo.microblockResponses)) + (mo, (mo.signatures, mo.blocks, mo.blockchainScores, mo.checkpoints, mo.microblockInvs, mo.microblockResponses, mo.transactions)) } } diff --git a/src/main/scala/com/wavesplatform/network/MicroBlockSynchronizer.scala b/src/main/scala/com/wavesplatform/network/MicroBlockSynchronizer.scala index 8d47ac9e13f..85561bf2de6 100644 --- a/src/main/scala/com/wavesplatform/network/MicroBlockSynchronizer.scala +++ b/src/main/scala/com/wavesplatform/network/MicroBlockSynchronizer.scala @@ -70,7 +70,7 @@ object MicroBlockSynchronizer { peerDatabase.blacklistAndClose(ch, err.toString) case Right(_) => microBlockOwners.get(totalSig, () => MSet.empty) += ch - nextInvs.get(prevSig, { () => + nextInvs. get(prevSig, { () => BlockStats.inv(mbInv, ch) mbInv }) diff --git a/src/main/scala/com/wavesplatform/network/NetworkServer.scala b/src/main/scala/com/wavesplatform/network/NetworkServer.scala index a3be6d34f90..729ef0722a7 100644 --- a/src/main/scala/com/wavesplatform/network/NetworkServer.scala +++ b/src/main/scala/com/wavesplatform/network/NetworkServer.scala @@ -89,7 +89,6 @@ object NetworkServer extends ScorexLogging { val discardingHandler = new DiscardingHandler(lastBlockInfos.map(_.ready)) val peerConnections = new ConcurrentHashMap[PeerKey, Channel](10, 0.9f, 10) val serverHandshakeHandler = new HandshakeHandler.Server(handshake, peerInfo, peerConnections, peerDatabase, allChannels) - val utxPoolSynchronizer = new UtxPoolSynchronizer(utxPool, allChannels) def peerSynchronizer: ChannelHandlerAdapter = { if (settings.networkSettings.enablePeersExchange) { @@ -117,7 +116,6 @@ object NetworkServer extends ScorexLogging { writeErrorHandler, peerSynchronizer, historyReplier, - utxPoolSynchronizer, mesageObserver, fatalErrorHandler))) .bind(settings.networkSettings.bindAddress) @@ -147,7 +145,6 @@ object NetworkServer extends ScorexLogging { writeErrorHandler, peerSynchronizer, historyReplier, - utxPoolSynchronizer, mesageObserver, fatalErrorHandler))) diff --git a/src/main/scala/com/wavesplatform/network/UtxPoolSynchronizer.scala b/src/main/scala/com/wavesplatform/network/UtxPoolSynchronizer.scala index ddd1ce28c52..4e1d8af40cc 100644 --- a/src/main/scala/com/wavesplatform/network/UtxPoolSynchronizer.scala +++ b/src/main/scala/com/wavesplatform/network/UtxPoolSynchronizer.scala @@ -1,31 +1,33 @@ package com.wavesplatform.network +import java.util.concurrent.TimeUnit + +import com.google.common.cache.CacheBuilder import com.wavesplatform.UtxPool -import com.wavesplatform.state2.diffs.TransactionDiffer.TransactionValidationError -import io.netty.channel.ChannelHandler.Sharable +import com.wavesplatform.settings.SynchronizationSettings.UtxSynchronizerSettings +import com.wavesplatform.state2.ByteStr import io.netty.channel.group.ChannelGroup -import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter} -import monix.eval.Task import monix.execution.Scheduler -import monix.execution.schedulers.SchedulerService +import monix.reactive.Observable import scorex.transaction.Transaction -import scorex.utils.ScorexLogging +object UtxPoolSynchronizer { + def apply(utx: UtxPool, utxSynchronizerSettings: UtxSynchronizerSettings, allChannels: ChannelGroup, txs: ChannelObservable[Transaction]): Observable[Unit] = { + + implicit val scheduler: Scheduler = Scheduler.singleThread("utx-pool-sync") -@Sharable -class UtxPoolSynchronizer(utx: UtxPool, allChannels: ChannelGroup) - extends ChannelInboundHandlerAdapter with ScorexLogging { + val dummy = new Object() - private implicit val scheduler: SchedulerService = Scheduler.singleThread("utx-pool-synchronizer") + val knownTransactions = CacheBuilder + .newBuilder() + .maximumSize(utxSynchronizerSettings.networkTxCacheSize) + .expireAfterWrite(utxSynchronizerSettings.networkTxCacheTime.toMillis, TimeUnit.MILLISECONDS) + .build[ByteStr, Object] - override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = msg match { - case t: Transaction => Task(utx.putIfNew(t) match { - case Right(true) => // log.trace(s"${id(ctx)} Added transaction ${t.id} to UTX pool") - allChannels.broadcastTx(t, Some(ctx.channel())) - case Left(TransactionValidationError(e, _)) => // log.trace(s"${id(ctx)} Error processing transaction ${t.id}: $e") - case Left(e) => // log.trace(s"${id(ctx)} Error processing transaction ${t.id}: $e") - case Right(false) => // log.trace(s"${id(ctx)} TX ${t.id} already known") - }).runAsyncLogErr - case _ => super.channelRead(ctx, msg) + txs.observeOn(scheduler).map { case (channel, tx) => knownTransactions.get(tx.id(), () => { + utx.putIfNew(tx).map(_ => allChannels.broadcast(tx, Some(channel))) + dummy + }) + } } -} +} \ No newline at end of file diff --git a/src/main/scala/com/wavesplatform/settings/SynchronizationSettings.scala b/src/main/scala/com/wavesplatform/settings/SynchronizationSettings.scala index c64243cf42b..b2251666b1a 100644 --- a/src/main/scala/com/wavesplatform/settings/SynchronizationSettings.scala +++ b/src/main/scala/com/wavesplatform/settings/SynchronizationSettings.scala @@ -14,7 +14,8 @@ case class SynchronizationSettings(maxRollback: Int, scoreTTL: FiniteDuration, invalidBlocksStorage: InvalidBlockStorageSettings, microBlockSynchronizer: MicroblockSynchronizerSettings, - historyReplierSettings: HistoryReplierSettings) + historyReplierSettings: HistoryReplierSettings, + utxSynchronizerSettings: UtxSynchronizerSettings) object SynchronizationSettings { @@ -25,6 +26,9 @@ object SynchronizationSettings { case class HistoryReplierSettings(maxMicroBlockCacheSize: Int, maxBlockCacheSize: Int) + case class UtxSynchronizerSettings(networkTxCacheSize: Int, + networkTxCacheTime: FiniteDuration) + val configPath: String = "waves.synchronization" def fromConfig(config: Config): SynchronizationSettings = { @@ -35,8 +39,9 @@ object SynchronizationSettings { val invalidBlocksStorage = config.as[InvalidBlockStorageSettings](s"$configPath.invalid-blocks-storage") val microBlockSynchronizer = config.as[MicroblockSynchronizerSettings](s"$configPath.micro-block-synchronizer") val historyReplierSettings = config.as[HistoryReplierSettings](s"$configPath.history-replier") + val utxSynchronizerSettings = config.as[UtxSynchronizerSettings](s"$configPath.utx-synchronizer") SynchronizationSettings(maxRollback, maxChainLength, synchronizationTimeout, scoreTTL, invalidBlocksStorage, - microBlockSynchronizer, historyReplierSettings) + microBlockSynchronizer, historyReplierSettings, utxSynchronizerSettings) } } \ No newline at end of file diff --git a/src/main/scala/com/wavesplatform/settings/UtxSettings.scala b/src/main/scala/com/wavesplatform/settings/UtxSettings.scala index 3169a4b8494..f04f3fb6f40 100644 --- a/src/main/scala/com/wavesplatform/settings/UtxSettings.scala +++ b/src/main/scala/com/wavesplatform/settings/UtxSettings.scala @@ -2,4 +2,8 @@ package com.wavesplatform.settings import scala.concurrent.duration.FiniteDuration -case class UtxSettings(maxSize: Int, maxTransactionAge: FiniteDuration, blacklistSenderAddresses: Set[String], allowBlacklistedTransferTo: Set[String], cleanupInterval: FiniteDuration) +case class UtxSettings(maxSize: Int, + maxTransactionAge: FiniteDuration, + blacklistSenderAddresses: Set[String], + allowBlacklistedTransferTo: Set[String], + cleanupInterval: FiniteDuration) diff --git a/src/main/scala/scorex/BroadcastRoute.scala b/src/main/scala/scorex/BroadcastRoute.scala index 5f37380b901..ecce68215be 100644 --- a/src/main/scala/scorex/BroadcastRoute.scala +++ b/src/main/scala/scorex/BroadcastRoute.scala @@ -16,15 +16,11 @@ trait BroadcastRoute { import scala.concurrent.ExecutionContext.Implicits.global protected def doBroadcast(v: Either[ValidationError, Transaction]): Future[Either[ApiError, Transaction]] = Future { - (for { - tx <- v - utxResult <- utx.putIfNew(tx) - } yield { - if (utxResult) { - allChannels.broadcastTx(tx, None) - } + v.flatMap(utx.putIfNew) + .left.map(ApiError.fromValidationError) + .right.map { tx => + allChannels.broadcastTx(tx, None) tx - }).left.map(ApiError.fromValidationError) - + } } } diff --git a/src/test/scala/com/wavesplatform/http/PaymentRouteSpec.scala b/src/test/scala/com/wavesplatform/http/PaymentRouteSpec.scala index 8de88221607..f7c1ed29db6 100644 --- a/src/test/scala/com/wavesplatform/http/PaymentRouteSpec.scala +++ b/src/test/scala/com/wavesplatform/http/PaymentRouteSpec.scala @@ -21,7 +21,7 @@ class PaymentRouteSpec with NoShrink { private val utx = stub[UtxPool] - (utx.putIfNew _).when(*).onCall((t: Transaction) => Right(true)).anyNumberOfTimes() + (utx.putIfNew _).when(*).onCall((t: Transaction) => Right(t)).anyNumberOfTimes() private val allChannels = stub[ChannelGroup] "accepts payments" in { diff --git a/src/test/scala/com/wavesplatform/matcher/market/OrderBookActorSpecification.scala b/src/test/scala/com/wavesplatform/matcher/market/OrderBookActorSpecification.scala index aac2d659da7..7ef2c2a1736 100755 --- a/src/test/scala/com/wavesplatform/matcher/market/OrderBookActorSpecification.scala +++ b/src/test/scala/com/wavesplatform/matcher/market/OrderBookActorSpecification.scala @@ -97,7 +97,7 @@ class OrderBookActorSpecification extends TestKit(ActorSystem("MatcherTest")) val functionalitySettings = TestFunctionalitySettings.Stub val utx = stub[UtxPool] - (utx.putIfNew _).when(*).onCall((tx: Transaction) => Right(true)) + (utx.putIfNew _).when(*).onCall((tx: Transaction) => Right(tx)) val allChannels = stub[ChannelGroup] actor = system.actorOf(Props(new OrderBookActor(pair, orderHistoryRef, Coeval.now(storedState), wallet, utx, allChannels, settings, history, functionalitySettings) with RestartableActor)) @@ -281,7 +281,7 @@ class OrderBookActorSpecification extends TestKit(ActorSystem("MatcherTest")) (pool.putIfNew _).when(*).onCall { (tx: Transaction) => tx match { case om: ExchangeTransaction if om.buyOrder == ord2 => Left(ValidationError.GenericError("test")) - case _: Transaction => Right(true) + case _: Transaction => Right(tx) } } val allChannels = stub[ChannelGroup] diff --git a/src/test/scala/com/wavesplatform/settings/SynchronizationSettingsSpecification.scala b/src/test/scala/com/wavesplatform/settings/SynchronizationSettingsSpecification.scala index 1b851936fed..452f0af16cb 100644 --- a/src/test/scala/com/wavesplatform/settings/SynchronizationSettingsSpecification.scala +++ b/src/test/scala/com/wavesplatform/settings/SynchronizationSettingsSpecification.scala @@ -2,7 +2,7 @@ package com.wavesplatform.settings import com.typesafe.config.ConfigFactory import com.wavesplatform.network.InvalidBlockStorageImpl.InvalidBlockStorageSettings -import com.wavesplatform.settings.SynchronizationSettings.{HistoryReplierSettings, MicroblockSynchronizerSettings} +import com.wavesplatform.settings.SynchronizationSettings.{HistoryReplierSettings, MicroblockSynchronizerSettings, UtxSynchronizerSettings} import org.scalatest.{FlatSpec, Matchers} import scala.concurrent.duration._ @@ -28,6 +28,15 @@ class SynchronizationSettingsSpecification extends FlatSpec with Matchers { | max-block-cache-size = 2 | } | + | # History replier caching settings + | utx-synchronizer { + | # Max microblocks to cache + | network-tx-cache-size = 7000000 + | + | # Max blocks to cache + | network-tx-cache-time = 70s + | } + | | micro-block-synchronizer { | wait-response-timeout: 5s | processed-micro-blocks-cache-timeout: 2s @@ -55,5 +64,8 @@ class SynchronizationSettingsSpecification extends FlatSpec with Matchers { maxMicroBlockCacheSize = 5, maxBlockCacheSize = 2 ) + + settings.utxSynchronizerSettings shouldBe UtxSynchronizerSettings(7000000, 70.seconds) + } } From 315bea3fe7bb72d26930810c9561cd44746a8ffd Mon Sep 17 00:00:00 2001 From: Ilya Smagin Date: Mon, 25 Dec 2017 16:48:08 +0300 Subject: [PATCH 2/3] NODE-438: utx cache to move to network message handler --- src/main/resources/application.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index b44e97a19a6..e03be964dd9 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -358,7 +358,7 @@ waves { max-block-cache-size = 20 } - # History replier caching settings + # Utx synchronizer caching settings utx-synchronizer { # Max microblocks to cache network-tx-cache-size = 1000000 From 1c364899ae6680f65b97119ea123728beece55dd Mon Sep 17 00:00:00 2001 From: Sergey Tolmachev Date: Mon, 25 Dec 2017 17:01:13 +0300 Subject: [PATCH 3/3] space --- .../com/wavesplatform/network/MicroBlockSynchronizer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/wavesplatform/network/MicroBlockSynchronizer.scala b/src/main/scala/com/wavesplatform/network/MicroBlockSynchronizer.scala index 85561bf2de6..8d47ac9e13f 100644 --- a/src/main/scala/com/wavesplatform/network/MicroBlockSynchronizer.scala +++ b/src/main/scala/com/wavesplatform/network/MicroBlockSynchronizer.scala @@ -70,7 +70,7 @@ object MicroBlockSynchronizer { peerDatabase.blacklistAndClose(ch, err.toString) case Right(_) => microBlockOwners.get(totalSig, () => MSet.empty) += ch - nextInvs. get(prevSig, { () => + nextInvs.get(prevSig, { () => BlockStats.inv(mbInv, ch) mbInv })