diff --git a/.github/workflows/check-pr.yaml b/.github/workflows/check-pr.yaml index 0cb67e1254d..46003b84e25 100644 --- a/.github/workflows/check-pr.yaml +++ b/.github/workflows/check-pr.yaml @@ -27,7 +27,7 @@ jobs: path: ~/.cache/coursier key: coursier-cache - name: Check PR - run: sbt --mem 6144 --batch ";checkPR;completeQaseRun" + run: sbt --mem 4096 --batch ";checkPR;completeQaseRun" env: QASE_ENABLE: true QASE_RUN_NAME: checkPR diff --git a/grpc-server/src/main/resources/application.conf b/grpc-server/src/main/resources/application.conf index 29b5cd92629..d8f39a89228 100644 --- a/grpc-server/src/main/resources/application.conf +++ b/grpc-server/src/main/resources/application.conf @@ -2,10 +2,12 @@ waves { grpc { host = localhost port = 6870 + worker-threads = 4 } blockchain-updates { grpc-port = 6881 min-keep-alive = 5m + worker-threads = 4 } } diff --git a/grpc-server/src/main/scala/com/wavesplatform/api/grpc/GRPCServerExtension.scala b/grpc-server/src/main/scala/com/wavesplatform/api/grpc/GRPCServerExtension.scala index 5c8841eaa48..efd9aaa2300 100644 --- a/grpc-server/src/main/scala/com/wavesplatform/api/grpc/GRPCServerExtension.scala +++ b/grpc-server/src/main/scala/com/wavesplatform/api/grpc/GRPCServerExtension.scala @@ -1,7 +1,6 @@ package com.wavesplatform.api.grpc -import java.net.InetSocketAddress -import scala.concurrent.Future +import com.google.common.util.concurrent.ThreadFactoryBuilder import com.wavesplatform.extensions.{Extension, Context as ExtensionContext} import com.wavesplatform.settings.GRPCSettings import com.wavesplatform.utils.ScorexLogging @@ -9,15 +8,22 @@ import io.grpc.Server import io.grpc.netty.NettyServerBuilder import io.grpc.protobuf.services.ProtoReflectionService import monix.execution.Scheduler +import net.ceedubs.ficus.readers.namemappers.implicits.hyphenCase import net.ceedubs.ficus.Ficus.* import net.ceedubs.ficus.readers.ArbitraryTypeReader.* +import java.net.InetSocketAddress +import java.util.concurrent.Executors +import scala.concurrent.Future + class GRPCServerExtension(context: ExtensionContext) extends Extension with ScorexLogging { - private implicit val apiScheduler: Scheduler = Scheduler(context.actorSystem.dispatcher) - private val settings = context.settings.config.as[GRPCSettings]("waves.grpc") + private val settings = context.settings.config.as[GRPCSettings]("waves.grpc") + private val executor = Executors.newFixedThreadPool(settings.workerThreads, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("grpc-server-worker-%d").build()) + private implicit val apiScheduler: Scheduler = Scheduler(executor) private val bindAddress = new InetSocketAddress(settings.host, settings.port) private val server: Server = NettyServerBuilder .forAddress(bindAddress) + .executor(executor) .addService(TransactionsApiGrpc.bindService(new TransactionsApiGrpcImpl(context.blockchain, context.transactionsApi), apiScheduler)) .addService(BlocksApiGrpc.bindService(new BlocksApiGrpcImpl(context.blocksApi), apiScheduler)) .addService(AccountsApiGrpc.bindService(new AccountsApiGrpcImpl(context.accountsApi), apiScheduler)) @@ -34,6 +40,6 @@ class GRPCServerExtension(context: ExtensionContext) extends Extension with Scor override def shutdown(): Future[Unit] = { log.debug("Shutting down gRPC server") server.shutdown() - Future(server.awaitTermination())(context.actorSystem.dispatcher) + Future(server.awaitTermination())(apiScheduler) } } diff --git a/grpc-server/src/main/scala/com/wavesplatform/events/BlockchainUpdates.scala b/grpc-server/src/main/scala/com/wavesplatform/events/BlockchainUpdates.scala index 270a99986ff..7cf25acde8c 100644 --- a/grpc-server/src/main/scala/com/wavesplatform/events/BlockchainUpdates.scala +++ b/grpc-server/src/main/scala/com/wavesplatform/events/BlockchainUpdates.scala @@ -12,7 +12,9 @@ import io.grpc.protobuf.services.ProtoReflectionService import io.grpc.{Metadata, Server, ServerStreamTracer, Status} import monix.execution.schedulers.SchedulerService import monix.execution.{ExecutionModel, Scheduler, UncaughtExceptionReporter} +import net.ceedubs.ficus.readers.namemappers.implicits.hyphenCase import net.ceedubs.ficus.Ficus.* +import net.ceedubs.ficus.readers.ArbitraryTypeReader.* import org.rocksdb.RocksDB import java.net.InetSocketAddress @@ -22,15 +24,14 @@ import scala.concurrent.duration.* import scala.util.Try class BlockchainUpdates(private val context: Context) extends Extension with ScorexLogging with BlockchainUpdateTriggers { + private[this] val settings = context.settings.config.as[BlockchainUpdatesSettings]("waves.blockchain-updates") private[this] implicit val scheduler: SchedulerService = Schedulers.fixedPool( - sys.runtime.availableProcessors(), + settings.workerThreads, "blockchain-updates", UncaughtExceptionReporter(err => log.error("Uncaught exception in BlockchainUpdates scheduler", err)), ExecutionModel.Default, rejectedExecutionHandler = new akka.dispatch.SaneRejectedExecutionHandler ) - - private[this] val settings = context.settings.config.as[BlockchainUpdatesSettings]("waves.blockchain-updates") private[this] val rdb = RocksDB.open(context.settings.directory + "/blockchain-updates") private[this] val repo = new Repo(rdb, context.blocksApi) diff --git a/grpc-server/src/main/scala/com/wavesplatform/events/Repo.scala b/grpc-server/src/main/scala/com/wavesplatform/events/Repo.scala index e1435c0bd4c..2da480a6eb3 100644 --- a/grpc-server/src/main/scala/com/wavesplatform/events/Repo.scala +++ b/grpc-server/src/main/scala/com/wavesplatform/events/Repo.scala @@ -172,6 +172,7 @@ class Repo(db: RocksDB, blocksApi: CommonBlocksApi)(implicit s: Scheduler) } override def onMicroBlockRollback(blockchainBefore: Blockchain, toBlockId: ByteStr): Unit = monitor.synchronized { + log.trace(s"Rolling back liquid microblock to $toBlockId") liquidState match { case Some(ls) => val discardedMicroBlocks = if (ls.keyBlock.id == toBlockId) { diff --git a/grpc-server/src/main/scala/com/wavesplatform/events/settings/BlockchainUpdatesSettings.scala b/grpc-server/src/main/scala/com/wavesplatform/events/settings/BlockchainUpdatesSettings.scala index 0aa02475422..eae111d2e29 100644 --- a/grpc-server/src/main/scala/com/wavesplatform/events/settings/BlockchainUpdatesSettings.scala +++ b/grpc-server/src/main/scala/com/wavesplatform/events/settings/BlockchainUpdatesSettings.scala @@ -1,13 +1,9 @@ package com.wavesplatform.events.settings import scala.concurrent.duration.FiniteDuration -import net.ceedubs.ficus.Ficus._ -import net.ceedubs.ficus.readers.ArbitraryTypeReader.arbitraryTypeValueReader -import net.ceedubs.ficus.readers.{Generated, ValueReader} -import net.ceedubs.ficus.readers.namemappers.implicits.hyphenCase -case class BlockchainUpdatesSettings(grpcPort: Int, minKeepAlive: FiniteDuration) - -object BlockchainUpdatesSettings { - implicit val valueReader: Generated[ValueReader[BlockchainUpdatesSettings]] = arbitraryTypeValueReader -} +case class BlockchainUpdatesSettings( + grpcPort: Int, + minKeepAlive: FiniteDuration, + workerThreads: Int +) diff --git a/node/src/main/resources/application.conf b/node/src/main/resources/application.conf index 8501dbf4b1b..1664080de5e 100644 --- a/node/src/main/resources/application.conf +++ b/node/src/main/resources/application.conf @@ -47,8 +47,9 @@ waves { api-cache-size=16M write-buffer-size = 128M enable-statistics = false - # When enabled, after writing every SST file of the default column family, reopen it and read all the keys. - paranoid-checks = off + allow-mmap-reads = off + parallelism = 2 + max-open-files = 100 } } diff --git a/node/src/main/scala/com/wavesplatform/Application.scala b/node/src/main/scala/com/wavesplatform/Application.scala index 01d727aaa95..900e9e26134 100644 --- a/node/src/main/scala/com/wavesplatform/Application.scala +++ b/node/src/main/scala/com/wavesplatform/Application.scala @@ -47,7 +47,7 @@ import kamon.Kamon import kamon.instrumentation.executor.ExecutorInstrumentation import monix.eval.{Coeval, Task} import monix.execution.schedulers.{ExecutorScheduler, SchedulerService} -import monix.execution.{Scheduler, UncaughtExceptionReporter} +import monix.execution.{ExecutionModel, Scheduler, UncaughtExceptionReporter} import monix.reactive.Observable import monix.reactive.subjects.ConcurrentSubject import org.influxdb.dto.Point @@ -237,7 +237,6 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con override def utx: UtxPool = utxStorage override def broadcastTransaction(tx: Transaction): TracedResult[ValidationError, Boolean] = Await.result(transactionPublisher.validateAndBroadcast(tx, None), Duration.Inf) // TODO: Replace with async if possible - override def actorSystem: ActorSystem = app.actorSystem override def utxEvents: Observable[UtxEvent] = app.utxEvents override val transactionsApi: CommonTransactionsApi = CommonTransactionsApi( @@ -371,7 +370,8 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con val heavyRequestScheduler = Scheduler( if (settings.config.getBoolean("kamon.enable")) ExecutorInstrumentation.instrument(heavyRequestExecutor, "heavy-request-executor") - else heavyRequestExecutor + else heavyRequestExecutor, + ExecutionModel.BatchedExecution(100) ) val serverRequestTimeout = FiniteDuration(settings.config.getDuration("akka.http.server.request-timeout").getSeconds, TimeUnit.SECONDS) diff --git a/node/src/main/scala/com/wavesplatform/Explorer.scala b/node/src/main/scala/com/wavesplatform/Explorer.scala index b51f99e5e97..6d34a209601 100644 --- a/node/src/main/scala/com/wavesplatform/Explorer.scala +++ b/node/src/main/scala/com/wavesplatform/Explorer.scala @@ -231,7 +231,7 @@ object Explorer extends ScorexLogging { val result = new util.HashMap[Short, Stats] Seq(rdb.db.getDefaultColumnFamily, rdb.txHandle.handle, rdb.txSnapshotHandle.handle, rdb.txMetaHandle.handle).foreach { cf => Using.Manager { use => - val ro = use(new ReadOptions().setTotalOrderSeek(true)) + val ro = use(new ReadOptions().setTotalOrderSeek(true).setVerifyChecksums(false)) val iterator = use(rdb.db.newIterator(cf, ro)) iterator.seekToFirst() @@ -373,7 +373,7 @@ object Explorer extends ScorexLogging { log.info("Counting transaction IDs") var counter = 0 Using.Manager { use => - val ro = use(new ReadOptions().setTotalOrderSeek(true)) + val ro = use(new ReadOptions().setTotalOrderSeek(true).setVerifyChecksums(false)) val iter = use(rdb.db.newIterator(rdb.txMetaHandle.handle, ro)) iter.seekToFirst() // iter.seek(KeyTags.TransactionMetaById.prefixBytes) // Doesn't work, because of CappedPrefixExtractor(10) diff --git a/node/src/main/scala/com/wavesplatform/Importer.scala b/node/src/main/scala/com/wavesplatform/Importer.scala index 2b959f8030e..529287f9317 100644 --- a/node/src/main/scala/com/wavesplatform/Importer.scala +++ b/node/src/main/scala/com/wavesplatform/Importer.scala @@ -1,6 +1,5 @@ package com.wavesplatform -import akka.actor.ActorSystem import cats.implicits.catsSyntaxOption import cats.syntax.apply.* import com.google.common.io.ByteStreams @@ -123,7 +122,6 @@ object Importer extends ScorexLogging { extensionTime: Time, utxPool: UtxPool, rdb: RDB, - extensionActorSystem: ActorSystem ): Seq[Extension] = if (wavesSettings.extensions.isEmpty) Seq.empty else { @@ -139,7 +137,6 @@ object Importer extends ScorexLogging { override def broadcastTransaction(tx: Transaction): TracedResult[ValidationError, Boolean] = TracedResult.wrapE(Left(GenericError("Not implemented during import"))) - override def actorSystem: ActorSystem = extensionActorSystem override def utxEvents: Observable[UtxEvent] = Observable.empty override def transactionsApi: CommonTransactionsApi = CommonTransactionsApi( @@ -348,7 +345,6 @@ object Importer extends ScorexLogging { val scheduler = Schedulers.singleThread("appender") val time = new NTP(settings.ntpServer) - val actorSystem = ActorSystem("wavesplatform-import") val rdb = RDB.open(settings.dbSettings) val (blockchainUpdater, rdbWriter) = StorageFactory(settings, rdb, time, BlockchainUpdateTriggers.combined(triggers)) @@ -357,7 +353,7 @@ object Importer extends ScorexLogging { val extAppender: (Block, Option[BlockSnapshotResponse]) => Task[Either[ValidationError, BlockApplyResult]] = BlockAppender(blockchainUpdater, time, utxPool, pos, scheduler, importOptions.verify, txSignParCheck = false) - val extensions = initExtensions(settings, blockchainUpdater, scheduler, time, utxPool, rdb, actorSystem) + val extensions = initExtensions(settings, blockchainUpdater, scheduler, time, utxPool, rdb) checkGenesis(settings, blockchainUpdater, Miner.Disabled) val blocksFileOffset = @@ -392,7 +388,6 @@ object Importer extends ScorexLogging { sys.addShutdownHook { quit = true - Await.result(actorSystem.terminate(), 10.second) lock.synchronized { if (blockchainUpdater.isFeatureActivated(BlockchainFeatures.NG) && blockchainUpdater.liquidBlockMeta.nonEmpty) { // Force store liquid block in rocksdb diff --git a/node/src/main/scala/com/wavesplatform/api/http/RouteTimeout.scala b/node/src/main/scala/com/wavesplatform/api/http/RouteTimeout.scala index 8649789f35b..295d8b66b8a 100644 --- a/node/src/main/scala/com/wavesplatform/api/http/RouteTimeout.scala +++ b/node/src/main/scala/com/wavesplatform/api/http/RouteTimeout.scala @@ -2,17 +2,20 @@ package com.wavesplatform.api.http import akka.NotUsed import akka.http.scaladsl.marshalling.{ToResponseMarshallable, ToResponseMarshaller} -import akka.http.scaladsl.server.Directives.{complete, handleExceptions} +import akka.http.scaladsl.server.Directives.{complete, handleExceptions, withExecutionContext} import akka.http.scaladsl.server.{ExceptionHandler, Route} import akka.stream.scaladsl.Source +import com.typesafe.scalalogging.LazyLogging import monix.eval.Task import monix.execution.Scheduler import monix.reactive.Observable +import scala.concurrent.ExecutionContext.fromExecutor import scala.concurrent.TimeoutException import scala.concurrent.duration.FiniteDuration -class RouteTimeout(timeout: FiniteDuration)(implicit sc: Scheduler) extends ApiMarshallers { +class RouteTimeout(timeout: FiniteDuration)(implicit sc: Scheduler) extends ApiMarshallers with LazyLogging { + private val ece = fromExecutor(sc, t => logger.warn(s"Exception in RouteTimeout", t)) private val handler = ExceptionHandler { case _: TimeoutException => complete(ApiError.ServerRequestTimeout) } @@ -27,10 +30,15 @@ class RouteTimeout(timeout: FiniteDuration)(implicit sc: Scheduler) extends ApiM .map(Source(_).map(f))(sc) } - def executeFromObservable[T](observable: Observable[T])(implicit m: ToResponseMarshaller[Source[T, NotUsed]]): Route = { - handleExceptions(handler) & complete(Source.fromPublisher(observable.toReactivePublisher(sc)).initialTimeout(timeout)) - } + def executeFromObservable[T](observable: Observable[T])(implicit m: ToResponseMarshaller[Source[T, NotUsed]]): Route = + withExecutionContext(ece) { + handleExceptions(handler) & + complete(Source.fromPublisher(observable.toReactivePublisher(sc)).initialTimeout(timeout)) + } - def execute[T](task: Task[T])(f: (Task[T], Scheduler) => ToResponseMarshallable): Route = - handleExceptions(handler) & complete(f(task.timeout(timeout), sc)) + def execute[T](task: Task[T])(f: (Task[T], Scheduler) => ToResponseMarshallable): Route = { + withExecutionContext(ece) { + handleExceptions(handler) & complete(f(task.timeout(timeout), sc)) + } + } } diff --git a/node/src/main/scala/com/wavesplatform/api/http/TransactionJsonSerializer.scala b/node/src/main/scala/com/wavesplatform/api/http/TransactionJsonSerializer.scala index 11d8287c09d..95ac4f70118 100644 --- a/node/src/main/scala/com/wavesplatform/api/http/TransactionJsonSerializer.scala +++ b/node/src/main/scala/com/wavesplatform/api/http/TransactionJsonSerializer.scala @@ -221,7 +221,7 @@ final case class TransactionJsonSerializer(blockchain: Blockchain) { gen.writeStartObject() gen.writeStringField("dApp", inv.dApp.toString) gen.writeValueField("call")(callSerializer(numbersAsString).serialize(inv.call, _, serializers)) - gen.writeArrayField("payments", inv.payments)(attachedPaymentSerializer(numbersAsString), serializers) + gen.writeArrayField("payment", inv.payments)(attachedPaymentSerializer(numbersAsString), serializers) gen.writeValueField("stateChanges")(invokeScriptResultSerializer(numbersAsString).serialize(inv.stateChanges, _, serializers)) gen.writeEndObject() } @@ -258,7 +258,10 @@ final case class TransactionJsonSerializer(blockchain: Blockchain) { gen.writeNumberField("type", tx.tpe.id, numbersAsString) gen.writeStringField("id", tx.id().toString) gen.writeNumberField("fee", tx.assetFee._2, numbersAsString) - tx.assetFee._1.maybeBase58Repr.foreach(gen.writeStringField("feeAssetId", _)) + tx.feeAssetId match { + case IssuedAsset(id) => gen.writeStringField("feeAssetId", id.toString) + case Asset.Waves => gen.writeNullField("feeAssetId") + } gen.writeNumberField("timestamp", tx.timestamp, numbersAsString) gen.writeNumberField("version", tx.version, numbersAsString) if (PBSince.affects(tx)) gen.writeNumberField("chainId", tx.chainId, numbersAsString) diff --git a/node/src/main/scala/com/wavesplatform/api/http/assets/AssetsApiRoute.scala b/node/src/main/scala/com/wavesplatform/api/http/assets/AssetsApiRoute.scala index f7c447ad04a..b76fd77abd0 100644 --- a/node/src/main/scala/com/wavesplatform/api/http/assets/AssetsApiRoute.scala +++ b/node/src/main/scala/com/wavesplatform/api/http/assets/AssetsApiRoute.scala @@ -18,13 +18,7 @@ import com.wavesplatform.api.common.{CommonAccountsApi, CommonAssetsApi} import com.wavesplatform.api.http.* import com.wavesplatform.api.http.ApiError.* import com.wavesplatform.api.http.StreamSerializerUtils.* -import com.wavesplatform.api.http.assets.AssetsApiRoute.{ - AssetDetails, - AssetInfo, - DistributionParams, - assetDetailsSerializer, - assetDistributionSerializer -} +import com.wavesplatform.api.http.assets.AssetsApiRoute.* import com.wavesplatform.common.state.ByteStr import com.wavesplatform.lang.ValidationError import com.wavesplatform.settings.RestAPISettings @@ -38,6 +32,7 @@ import com.wavesplatform.transaction.{EthereumTransaction, TxTimestamp, TxVersio import com.wavesplatform.utils.Time import com.wavesplatform.wallet.Wallet import io.netty.util.concurrent.DefaultThreadFactory +import monix.eval.Task import monix.execution.Scheduler import monix.reactive.Observable import play.api.libs.json.* @@ -93,11 +88,10 @@ case class AssetsApiRoute( } } ~ pathPrefix("details") { (anyParam("id", limit = settings.assetDetailsLimit) & parameter("full".as[Boolean] ? false)) { (ids, full) => - val result = Either - .cond(ids.nonEmpty, (), AssetIdNotSpecified) - .map(_ => multipleDetails(ids.toList, full)) - - complete(result) + if (ids.isEmpty) complete(AssetIdNotSpecified) + else { + routeTimeout.executeToFuture(Task(multipleDetails(ids.toList, full))) + } } ~ (get & path(AssetId) & parameter("full".as[Boolean] ? false)) { (assetId, full) => singleDetails(assetId, full) } diff --git a/node/src/main/scala/com/wavesplatform/block/MicroBlock.scala b/node/src/main/scala/com/wavesplatform/block/MicroBlock.scala index faf6d9ae921..ff480fedf1d 100644 --- a/node/src/main/scala/com/wavesplatform/block/MicroBlock.scala +++ b/node/src/main/scala/com/wavesplatform/block/MicroBlock.scala @@ -17,7 +17,7 @@ case class MicroBlock( sender: PublicKey, transactionData: Seq[Transaction], reference: BlockId, - totalResBlockSig: BlockId, + totalResBlockSig: ByteStr, signature: ByteStr, stateHash: Option[ByteStr] ) extends Signed { diff --git a/node/src/main/scala/com/wavesplatform/database/DBResource.scala b/node/src/main/scala/com/wavesplatform/database/DBResource.scala index 7d3f17515b8..7577b8047cf 100644 --- a/node/src/main/scala/com/wavesplatform/database/DBResource.scala +++ b/node/src/main/scala/com/wavesplatform/database/DBResource.scala @@ -20,7 +20,8 @@ trait DBResource extends AutoCloseable { object DBResource { def apply(db: RocksDB, iteratorCfHandle: Option[ColumnFamilyHandle] = None): DBResource = new DBResource { private[this] val snapshot = db.getSnapshot - private[this] val readOptions = new ReadOptions().setSnapshot(snapshot) + // checksum may be verification is **very** expensive, so it's explicitly disabled + private[this] val readOptions = new ReadOptions().setSnapshot(snapshot).setVerifyChecksums(false) override def get[V](key: Key[V]): V = key.parse(db.get(key.columnFamilyHandle.getOrElse(db.getDefaultColumnFamily), readOptions, key.keyBytes)) diff --git a/node/src/main/scala/com/wavesplatform/database/RDB.scala b/node/src/main/scala/com/wavesplatform/database/RDB.scala index 7c9bb13a791..98c71688754 100644 --- a/node/src/main/scala/com/wavesplatform/database/RDB.scala +++ b/node/src/main/scala/com/wavesplatform/database/RDB.scala @@ -137,13 +137,13 @@ object RDB extends StrictLogging { private def createDbOptions(settings: DBSettings): OptionsWithResources[DBOptions] = { val dbOptions = new DBOptions() .setCreateIfMissing(true) - .setParanoidChecks(settings.rocksdb.paranoidChecks) - .setIncreaseParallelism(6) + .setIncreaseParallelism(settings.rocksdb.parallelism) .setBytesPerSync(2 << 20) .setCreateMissingColumnFamilies(true) - .setMaxOpenFiles(100) + .setMaxOpenFiles(settings.rocksdb.maxOpenFiles) .setMaxSubcompactions(2) // Write stalls expected without this option. Can lead to max_background_jobs * max_subcompactions background threads .setMaxManifestFileSize(200 << 20) + .setAllowMmapReads(settings.rocksdb.allowMmapReads) if (settings.rocksdb.enableStatistics) { val statistics = new Statistics() diff --git a/node/src/main/scala/com/wavesplatform/database/package.scala b/node/src/main/scala/com/wavesplatform/database/package.scala index b0fc6bc5400..36d6ce86319 100644 --- a/node/src/main/scala/com/wavesplatform/database/package.scala +++ b/node/src/main/scala/com/wavesplatform/database/package.scala @@ -423,7 +423,8 @@ package object database { def withReadOptions[A](f: ReadOptions => A): A = { val snapshot = db.getSnapshot - val ro = new ReadOptions().setSnapshot(snapshot) + // checksum may be verification is **very** expensive, so it's explicitly disabled + val ro = new ReadOptions().setSnapshot(snapshot).setVerifyChecksums(false) try f(ro) finally { ro.close() @@ -529,11 +530,13 @@ package object database { } else () } - val iterator = db.newIterator(cfh.getOrElse(db.getDefaultColumnFamily), new ReadOptions().setTotalOrderSeek(true)) - try { - iterator.seek(prefix) - loop(iterator) - } finally iterator.close() + withReadOptions { ro => + val iterator = db.newIterator(cfh.getOrElse(db.getDefaultColumnFamily), ro.setTotalOrderSeek(true)) + try { + iterator.seek(prefix) + loop(iterator) + } finally iterator.close() + } } def resourceObservable: Observable[DBResource] = diff --git a/node/src/main/scala/com/wavesplatform/extensions/Context.scala b/node/src/main/scala/com/wavesplatform/extensions/Context.scala index c0c2e9e900d..bfa690d02e4 100644 --- a/node/src/main/scala/com/wavesplatform/extensions/Context.scala +++ b/node/src/main/scala/com/wavesplatform/extensions/Context.scala @@ -1,6 +1,5 @@ package com.wavesplatform.extensions -import akka.actor.ActorSystem import com.wavesplatform.api.common.* import com.wavesplatform.common.state.ByteStr import com.wavesplatform.events.UtxEvent @@ -30,5 +29,4 @@ trait Context { def broadcastTransaction(tx: Transaction): TracedResult[ValidationError, Boolean] def utxEvents: Observable[UtxEvent] - def actorSystem: ActorSystem } diff --git a/node/src/main/scala/com/wavesplatform/metrics/HttpSpanLogger.scala b/node/src/main/scala/com/wavesplatform/metrics/HttpSpanLogger.scala index 44dbedac3fa..21b179d743d 100644 --- a/node/src/main/scala/com/wavesplatform/metrics/HttpSpanLogger.scala +++ b/node/src/main/scala/com/wavesplatform/metrics/HttpSpanLogger.scala @@ -57,11 +57,11 @@ class HttpSpanLogger extends CombinedReporter with LazyLogging { Json .obj( "@timestamp" -> LocalDateTime.ofInstant(snapshot.from, ZoneId.systemDefault()), - "executor_queue_duration_max" -> dist.max.toDouble / 10e6, - "executor_queue_duration_min" -> dist.min.toDouble / 10e6, - "executor_queue_duration_sum" -> dist.sum.toDouble / 10e6, + "executor_queue_duration_max" -> dist.max, + "executor_queue_duration_min" -> dist.min, + "executor_queue_duration_sum" -> dist.sum, "executor_queue_duration_count" -> dist.count, - "executor_queue_duration_avg" -> dist.sum.toDouble / dist.count / 10e6 + "executor_queue_duration_avg" -> dist.sum.toDouble / dist.count ) .toString ) @@ -81,9 +81,9 @@ object HttpSpanLogger { val TimeInQueueMetricKey = "executor.time-in-queue" - case class Mark(key: String, duration: Double) + case class Mark(key: String, durationMillis: Long) - def millisBetween(from: Instant, to: Instant): Double = Duration.between(from, to).toNanos * 1e-6 + def millisBetween(from: Instant, to: Instant): Long = Duration.between(from, to).toMillis implicit class FinishedSpanExt(val span: Span.Finished) extends AnyVal { def isAkkaHttpServer: Boolean = span.metricTags.get(Lookups.option("component")).contains("akka.http.server") def method: String = span.metricTags.get(Lookups.plain(TagKeys.HttpMethod)) diff --git a/node/src/main/scala/com/wavesplatform/settings/GRPCSettings.scala b/node/src/main/scala/com/wavesplatform/settings/GRPCSettings.scala index 254b21b1275..ea55ddfac37 100644 --- a/node/src/main/scala/com/wavesplatform/settings/GRPCSettings.scala +++ b/node/src/main/scala/com/wavesplatform/settings/GRPCSettings.scala @@ -1,3 +1,7 @@ package com.wavesplatform.settings -final case class GRPCSettings(host: String, port: Int) +final case class GRPCSettings( + host: String, + port: Int, + workerThreads: Int +) diff --git a/node/src/main/scala/com/wavesplatform/settings/RocksDBSettings.scala b/node/src/main/scala/com/wavesplatform/settings/RocksDBSettings.scala index 30ad8d172a7..3692e164f28 100644 --- a/node/src/main/scala/com/wavesplatform/settings/RocksDBSettings.scala +++ b/node/src/main/scala/com/wavesplatform/settings/RocksDBSettings.scala @@ -8,5 +8,7 @@ case class RocksDBSettings( apiCacheSize: SizeInBytes, writeBufferSize: SizeInBytes, enableStatistics: Boolean, - paranoidChecks: Boolean + allowMmapReads: Boolean, + parallelism: Int, + maxOpenFiles: Int ) diff --git a/node/src/main/scala/com/wavesplatform/state/BlockchainUpdaterImpl.scala b/node/src/main/scala/com/wavesplatform/state/BlockchainUpdaterImpl.scala index 26e694c7664..4852c021080 100644 --- a/node/src/main/scala/com/wavesplatform/state/BlockchainUpdaterImpl.scala +++ b/node/src/main/scala/com/wavesplatform/state/BlockchainUpdaterImpl.scala @@ -347,6 +347,8 @@ class BlockchainUpdaterImpl( ) miner.scheduleMining(Some(tempBlockchain)) + log.trace(s"Persisting block ${referencedForgedBlock.id()}, discarded microblock refs: ${discarded.map(_._1.reference).mkString("[", ",", "]")}") + if (discarded.nonEmpty) { blockchainUpdateTriggers.onMicroBlockRollback(this, block.header.reference) metrics.microBlockForkStats.increment() diff --git a/node/src/main/scala/com/wavesplatform/state/DataEntry.scala b/node/src/main/scala/com/wavesplatform/state/DataEntry.scala index d4c4558fd53..d827c4d176f 100644 --- a/node/src/main/scala/com/wavesplatform/state/DataEntry.scala +++ b/node/src/main/scala/com/wavesplatform/state/DataEntry.scala @@ -89,20 +89,20 @@ object DataEntry { gen.writeStartObject() value match { case BinaryDataEntry(key, value) => - gen.writeStringField("type", "binary") gen.writeStringField("key", key) + gen.writeStringField("type", "binary") gen.writeStringField("value", value.base64) case IntegerDataEntry(key, value) => - gen.writeStringField("type", "integer") gen.writeStringField("key", key) + gen.writeStringField("type", "integer") gen.writeNumberField("value", value, numberAsString) case BooleanDataEntry(key, value) => - gen.writeStringField("type", "boolean") gen.writeStringField("key", key) + gen.writeStringField("type", "boolean") gen.writeBooleanField("value", value) case StringDataEntry(key, value) => - gen.writeStringField("type", "string") gen.writeStringField("key", key) + gen.writeStringField("type", "string") gen.writeStringField("value", value) case EmptyDataEntry(key) => gen.writeStringField("key", key)