Skip to content

Commit

Permalink
API Tweaks (#3942)
Browse files Browse the repository at this point in the history
  • Loading branch information
phearnot authored Mar 12, 2024
1 parent c374b48 commit 84226e2
Show file tree
Hide file tree
Showing 23 changed files with 94 additions and 77 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/check-pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
path: ~/.cache/coursier
key: coursier-cache
- name: Check PR
run: sbt --mem 6144 --batch ";checkPR;completeQaseRun"
run: sbt --mem 4096 --batch ";checkPR;completeQaseRun"
env:
QASE_ENABLE: true
QASE_RUN_NAME: checkPR
Expand Down
2 changes: 2 additions & 0 deletions grpc-server/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ waves {
grpc {
host = localhost
port = 6870
worker-threads = 4
}

blockchain-updates {
grpc-port = 6881
min-keep-alive = 5m
worker-threads = 4
}
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
package com.wavesplatform.api.grpc

import java.net.InetSocketAddress
import scala.concurrent.Future
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.wavesplatform.extensions.{Extension, Context as ExtensionContext}
import com.wavesplatform.settings.GRPCSettings
import com.wavesplatform.utils.ScorexLogging
import io.grpc.Server
import io.grpc.netty.NettyServerBuilder
import io.grpc.protobuf.services.ProtoReflectionService
import monix.execution.Scheduler
import net.ceedubs.ficus.readers.namemappers.implicits.hyphenCase
import net.ceedubs.ficus.Ficus.*
import net.ceedubs.ficus.readers.ArbitraryTypeReader.*

import java.net.InetSocketAddress
import java.util.concurrent.Executors
import scala.concurrent.Future

class GRPCServerExtension(context: ExtensionContext) extends Extension with ScorexLogging {
private implicit val apiScheduler: Scheduler = Scheduler(context.actorSystem.dispatcher)
private val settings = context.settings.config.as[GRPCSettings]("waves.grpc")
private val settings = context.settings.config.as[GRPCSettings]("waves.grpc")
private val executor = Executors.newFixedThreadPool(settings.workerThreads, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("grpc-server-worker-%d").build())
private implicit val apiScheduler: Scheduler = Scheduler(executor)
private val bindAddress = new InetSocketAddress(settings.host, settings.port)
private val server: Server = NettyServerBuilder
.forAddress(bindAddress)
.executor(executor)
.addService(TransactionsApiGrpc.bindService(new TransactionsApiGrpcImpl(context.blockchain, context.transactionsApi), apiScheduler))
.addService(BlocksApiGrpc.bindService(new BlocksApiGrpcImpl(context.blocksApi), apiScheduler))
.addService(AccountsApiGrpc.bindService(new AccountsApiGrpcImpl(context.accountsApi), apiScheduler))
Expand All @@ -34,6 +40,6 @@ class GRPCServerExtension(context: ExtensionContext) extends Extension with Scor
override def shutdown(): Future[Unit] = {
log.debug("Shutting down gRPC server")
server.shutdown()
Future(server.awaitTermination())(context.actorSystem.dispatcher)
Future(server.awaitTermination())(apiScheduler)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import io.grpc.protobuf.services.ProtoReflectionService
import io.grpc.{Metadata, Server, ServerStreamTracer, Status}
import monix.execution.schedulers.SchedulerService
import monix.execution.{ExecutionModel, Scheduler, UncaughtExceptionReporter}
import net.ceedubs.ficus.readers.namemappers.implicits.hyphenCase
import net.ceedubs.ficus.Ficus.*
import net.ceedubs.ficus.readers.ArbitraryTypeReader.*
import org.rocksdb.RocksDB

import java.net.InetSocketAddress
Expand All @@ -22,15 +24,14 @@ import scala.concurrent.duration.*
import scala.util.Try

class BlockchainUpdates(private val context: Context) extends Extension with ScorexLogging with BlockchainUpdateTriggers {
private[this] val settings = context.settings.config.as[BlockchainUpdatesSettings]("waves.blockchain-updates")
private[this] implicit val scheduler: SchedulerService = Schedulers.fixedPool(
sys.runtime.availableProcessors(),
settings.workerThreads,
"blockchain-updates",
UncaughtExceptionReporter(err => log.error("Uncaught exception in BlockchainUpdates scheduler", err)),
ExecutionModel.Default,
rejectedExecutionHandler = new akka.dispatch.SaneRejectedExecutionHandler
)

private[this] val settings = context.settings.config.as[BlockchainUpdatesSettings]("waves.blockchain-updates")
private[this] val rdb = RocksDB.open(context.settings.directory + "/blockchain-updates")
private[this] val repo = new Repo(rdb, context.blocksApi)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ class Repo(db: RocksDB, blocksApi: CommonBlocksApi)(implicit s: Scheduler)
}

override def onMicroBlockRollback(blockchainBefore: Blockchain, toBlockId: ByteStr): Unit = monitor.synchronized {
log.trace(s"Rolling back liquid microblock to $toBlockId")
liquidState match {
case Some(ls) =>
val discardedMicroBlocks = if (ls.keyBlock.id == toBlockId) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package com.wavesplatform.events.settings

import scala.concurrent.duration.FiniteDuration
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader.arbitraryTypeValueReader
import net.ceedubs.ficus.readers.{Generated, ValueReader}
import net.ceedubs.ficus.readers.namemappers.implicits.hyphenCase

case class BlockchainUpdatesSettings(grpcPort: Int, minKeepAlive: FiniteDuration)

object BlockchainUpdatesSettings {
implicit val valueReader: Generated[ValueReader[BlockchainUpdatesSettings]] = arbitraryTypeValueReader
}
case class BlockchainUpdatesSettings(
grpcPort: Int,
minKeepAlive: FiniteDuration,
workerThreads: Int
)
5 changes: 3 additions & 2 deletions node/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ waves {
api-cache-size=16M
write-buffer-size = 128M
enable-statistics = false
# When enabled, after writing every SST file of the default column family, reopen it and read all the keys.
paranoid-checks = off
allow-mmap-reads = off
parallelism = 2
max-open-files = 100
}
}

Expand Down
6 changes: 3 additions & 3 deletions node/src/main/scala/com/wavesplatform/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import kamon.Kamon
import kamon.instrumentation.executor.ExecutorInstrumentation
import monix.eval.{Coeval, Task}
import monix.execution.schedulers.{ExecutorScheduler, SchedulerService}
import monix.execution.{Scheduler, UncaughtExceptionReporter}
import monix.execution.{ExecutionModel, Scheduler, UncaughtExceptionReporter}
import monix.reactive.Observable
import monix.reactive.subjects.ConcurrentSubject
import org.influxdb.dto.Point
Expand Down Expand Up @@ -237,7 +237,6 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
override def utx: UtxPool = utxStorage
override def broadcastTransaction(tx: Transaction): TracedResult[ValidationError, Boolean] =
Await.result(transactionPublisher.validateAndBroadcast(tx, None), Duration.Inf) // TODO: Replace with async if possible
override def actorSystem: ActorSystem = app.actorSystem
override def utxEvents: Observable[UtxEvent] = app.utxEvents

override val transactionsApi: CommonTransactionsApi = CommonTransactionsApi(
Expand Down Expand Up @@ -371,7 +370,8 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
val heavyRequestScheduler = Scheduler(
if (settings.config.getBoolean("kamon.enable"))
ExecutorInstrumentation.instrument(heavyRequestExecutor, "heavy-request-executor")
else heavyRequestExecutor
else heavyRequestExecutor,
ExecutionModel.BatchedExecution(100)
)

val serverRequestTimeout = FiniteDuration(settings.config.getDuration("akka.http.server.request-timeout").getSeconds, TimeUnit.SECONDS)
Expand Down
4 changes: 2 additions & 2 deletions node/src/main/scala/com/wavesplatform/Explorer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ object Explorer extends ScorexLogging {
val result = new util.HashMap[Short, Stats]
Seq(rdb.db.getDefaultColumnFamily, rdb.txHandle.handle, rdb.txSnapshotHandle.handle, rdb.txMetaHandle.handle).foreach { cf =>
Using.Manager { use =>
val ro = use(new ReadOptions().setTotalOrderSeek(true))
val ro = use(new ReadOptions().setTotalOrderSeek(true).setVerifyChecksums(false))
val iterator = use(rdb.db.newIterator(cf, ro))
iterator.seekToFirst()

Expand Down Expand Up @@ -373,7 +373,7 @@ object Explorer extends ScorexLogging {
log.info("Counting transaction IDs")
var counter = 0
Using.Manager { use =>
val ro = use(new ReadOptions().setTotalOrderSeek(true))
val ro = use(new ReadOptions().setTotalOrderSeek(true).setVerifyChecksums(false))
val iter = use(rdb.db.newIterator(rdb.txMetaHandle.handle, ro))
iter.seekToFirst()
// iter.seek(KeyTags.TransactionMetaById.prefixBytes) // Doesn't work, because of CappedPrefixExtractor(10)
Expand Down
7 changes: 1 addition & 6 deletions node/src/main/scala/com/wavesplatform/Importer.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.wavesplatform

import akka.actor.ActorSystem
import cats.implicits.catsSyntaxOption
import cats.syntax.apply.*
import com.google.common.io.ByteStreams
Expand Down Expand Up @@ -123,7 +122,6 @@ object Importer extends ScorexLogging {
extensionTime: Time,
utxPool: UtxPool,
rdb: RDB,
extensionActorSystem: ActorSystem
): Seq[Extension] =
if (wavesSettings.extensions.isEmpty) Seq.empty
else {
Expand All @@ -139,7 +137,6 @@ object Importer extends ScorexLogging {

override def broadcastTransaction(tx: Transaction): TracedResult[ValidationError, Boolean] =
TracedResult.wrapE(Left(GenericError("Not implemented during import")))
override def actorSystem: ActorSystem = extensionActorSystem
override def utxEvents: Observable[UtxEvent] = Observable.empty
override def transactionsApi: CommonTransactionsApi =
CommonTransactionsApi(
Expand Down Expand Up @@ -348,7 +345,6 @@ object Importer extends ScorexLogging {
val scheduler = Schedulers.singleThread("appender")
val time = new NTP(settings.ntpServer)

val actorSystem = ActorSystem("wavesplatform-import")
val rdb = RDB.open(settings.dbSettings)
val (blockchainUpdater, rdbWriter) =
StorageFactory(settings, rdb, time, BlockchainUpdateTriggers.combined(triggers))
Expand All @@ -357,7 +353,7 @@ object Importer extends ScorexLogging {
val extAppender: (Block, Option[BlockSnapshotResponse]) => Task[Either[ValidationError, BlockApplyResult]] =
BlockAppender(blockchainUpdater, time, utxPool, pos, scheduler, importOptions.verify, txSignParCheck = false)

val extensions = initExtensions(settings, blockchainUpdater, scheduler, time, utxPool, rdb, actorSystem)
val extensions = initExtensions(settings, blockchainUpdater, scheduler, time, utxPool, rdb)
checkGenesis(settings, blockchainUpdater, Miner.Disabled)

val blocksFileOffset =
Expand Down Expand Up @@ -392,7 +388,6 @@ object Importer extends ScorexLogging {

sys.addShutdownHook {
quit = true
Await.result(actorSystem.terminate(), 10.second)
lock.synchronized {
if (blockchainUpdater.isFeatureActivated(BlockchainFeatures.NG) && blockchainUpdater.liquidBlockMeta.nonEmpty) {
// Force store liquid block in rocksdb
Expand Down
22 changes: 15 additions & 7 deletions node/src/main/scala/com/wavesplatform/api/http/RouteTimeout.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ package com.wavesplatform.api.http

import akka.NotUsed
import akka.http.scaladsl.marshalling.{ToResponseMarshallable, ToResponseMarshaller}
import akka.http.scaladsl.server.Directives.{complete, handleExceptions}
import akka.http.scaladsl.server.Directives.{complete, handleExceptions, withExecutionContext}
import akka.http.scaladsl.server.{ExceptionHandler, Route}
import akka.stream.scaladsl.Source
import com.typesafe.scalalogging.LazyLogging
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.Observable

import scala.concurrent.ExecutionContext.fromExecutor
import scala.concurrent.TimeoutException
import scala.concurrent.duration.FiniteDuration

class RouteTimeout(timeout: FiniteDuration)(implicit sc: Scheduler) extends ApiMarshallers {
class RouteTimeout(timeout: FiniteDuration)(implicit sc: Scheduler) extends ApiMarshallers with LazyLogging {
private val ece = fromExecutor(sc, t => logger.warn(s"Exception in RouteTimeout", t))
private val handler = ExceptionHandler { case _: TimeoutException =>
complete(ApiError.ServerRequestTimeout)
}
Expand All @@ -27,10 +30,15 @@ class RouteTimeout(timeout: FiniteDuration)(implicit sc: Scheduler) extends ApiM
.map(Source(_).map(f))(sc)
}

def executeFromObservable[T](observable: Observable[T])(implicit m: ToResponseMarshaller[Source[T, NotUsed]]): Route = {
handleExceptions(handler) & complete(Source.fromPublisher(observable.toReactivePublisher(sc)).initialTimeout(timeout))
}
def executeFromObservable[T](observable: Observable[T])(implicit m: ToResponseMarshaller[Source[T, NotUsed]]): Route =
withExecutionContext(ece) {
handleExceptions(handler) &
complete(Source.fromPublisher(observable.toReactivePublisher(sc)).initialTimeout(timeout))
}

def execute[T](task: Task[T])(f: (Task[T], Scheduler) => ToResponseMarshallable): Route =
handleExceptions(handler) & complete(f(task.timeout(timeout), sc))
def execute[T](task: Task[T])(f: (Task[T], Scheduler) => ToResponseMarshallable): Route = {
withExecutionContext(ece) {
handleExceptions(handler) & complete(f(task.timeout(timeout), sc))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ final case class TransactionJsonSerializer(blockchain: Blockchain) {
gen.writeStartObject()
gen.writeStringField("dApp", inv.dApp.toString)
gen.writeValueField("call")(callSerializer(numbersAsString).serialize(inv.call, _, serializers))
gen.writeArrayField("payments", inv.payments)(attachedPaymentSerializer(numbersAsString), serializers)
gen.writeArrayField("payment", inv.payments)(attachedPaymentSerializer(numbersAsString), serializers)
gen.writeValueField("stateChanges")(invokeScriptResultSerializer(numbersAsString).serialize(inv.stateChanges, _, serializers))
gen.writeEndObject()
}
Expand Down Expand Up @@ -258,7 +258,10 @@ final case class TransactionJsonSerializer(blockchain: Blockchain) {
gen.writeNumberField("type", tx.tpe.id, numbersAsString)
gen.writeStringField("id", tx.id().toString)
gen.writeNumberField("fee", tx.assetFee._2, numbersAsString)
tx.assetFee._1.maybeBase58Repr.foreach(gen.writeStringField("feeAssetId", _))
tx.feeAssetId match {
case IssuedAsset(id) => gen.writeStringField("feeAssetId", id.toString)
case Asset.Waves => gen.writeNullField("feeAssetId")
}
gen.writeNumberField("timestamp", tx.timestamp, numbersAsString)
gen.writeNumberField("version", tx.version, numbersAsString)
if (PBSince.affects(tx)) gen.writeNumberField("chainId", tx.chainId, numbersAsString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,7 @@ import com.wavesplatform.api.common.{CommonAccountsApi, CommonAssetsApi}
import com.wavesplatform.api.http.*
import com.wavesplatform.api.http.ApiError.*
import com.wavesplatform.api.http.StreamSerializerUtils.*
import com.wavesplatform.api.http.assets.AssetsApiRoute.{
AssetDetails,
AssetInfo,
DistributionParams,
assetDetailsSerializer,
assetDistributionSerializer
}
import com.wavesplatform.api.http.assets.AssetsApiRoute.*
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.lang.ValidationError
import com.wavesplatform.settings.RestAPISettings
Expand All @@ -38,6 +32,7 @@ import com.wavesplatform.transaction.{EthereumTransaction, TxTimestamp, TxVersio
import com.wavesplatform.utils.Time
import com.wavesplatform.wallet.Wallet
import io.netty.util.concurrent.DefaultThreadFactory
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.Observable
import play.api.libs.json.*
Expand Down Expand Up @@ -93,11 +88,10 @@ case class AssetsApiRoute(
}
} ~ pathPrefix("details") {
(anyParam("id", limit = settings.assetDetailsLimit) & parameter("full".as[Boolean] ? false)) { (ids, full) =>
val result = Either
.cond(ids.nonEmpty, (), AssetIdNotSpecified)
.map(_ => multipleDetails(ids.toList, full))

complete(result)
if (ids.isEmpty) complete(AssetIdNotSpecified)
else {
routeTimeout.executeToFuture(Task(multipleDetails(ids.toList, full)))
}
} ~ (get & path(AssetId) & parameter("full".as[Boolean] ? false)) { (assetId, full) =>
singleDetails(assetId, full)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ case class MicroBlock(
sender: PublicKey,
transactionData: Seq[Transaction],
reference: BlockId,
totalResBlockSig: BlockId,
totalResBlockSig: ByteStr,
signature: ByteStr,
stateHash: Option[ByteStr]
) extends Signed {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ trait DBResource extends AutoCloseable {
object DBResource {
def apply(db: RocksDB, iteratorCfHandle: Option[ColumnFamilyHandle] = None): DBResource = new DBResource {
private[this] val snapshot = db.getSnapshot
private[this] val readOptions = new ReadOptions().setSnapshot(snapshot)
// checksum verification may be **very** expensive, so it's explicitly disabled
private[this] val readOptions = new ReadOptions().setSnapshot(snapshot).setVerifyChecksums(false)

override def get[V](key: Key[V]): V = key.parse(db.get(key.columnFamilyHandle.getOrElse(db.getDefaultColumnFamily), readOptions, key.keyBytes))

Expand Down
6 changes: 3 additions & 3 deletions node/src/main/scala/com/wavesplatform/database/RDB.scala
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,13 @@ object RDB extends StrictLogging {
private def createDbOptions(settings: DBSettings): OptionsWithResources[DBOptions] = {
val dbOptions = new DBOptions()
.setCreateIfMissing(true)
.setParanoidChecks(settings.rocksdb.paranoidChecks)
.setIncreaseParallelism(6)
.setIncreaseParallelism(settings.rocksdb.parallelism)
.setBytesPerSync(2 << 20)
.setCreateMissingColumnFamilies(true)
.setMaxOpenFiles(100)
.setMaxOpenFiles(settings.rocksdb.maxOpenFiles)
.setMaxSubcompactions(2) // Write stalls expected without this option. Can lead to max_background_jobs * max_subcompactions background threads
.setMaxManifestFileSize(200 << 20)
.setAllowMmapReads(settings.rocksdb.allowMmapReads)

if (settings.rocksdb.enableStatistics) {
val statistics = new Statistics()
Expand Down
15 changes: 9 additions & 6 deletions node/src/main/scala/com/wavesplatform/database/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,8 @@ package object database {

def withReadOptions[A](f: ReadOptions => A): A = {
val snapshot = db.getSnapshot
val ro = new ReadOptions().setSnapshot(snapshot)
// checksum verification may be **very** expensive, so it's explicitly disabled
val ro = new ReadOptions().setSnapshot(snapshot).setVerifyChecksums(false)
try f(ro)
finally {
ro.close()
Expand Down Expand Up @@ -529,11 +530,13 @@ package object database {
} else ()
}

val iterator = db.newIterator(cfh.getOrElse(db.getDefaultColumnFamily), new ReadOptions().setTotalOrderSeek(true))
try {
iterator.seek(prefix)
loop(iterator)
} finally iterator.close()
withReadOptions { ro =>
val iterator = db.newIterator(cfh.getOrElse(db.getDefaultColumnFamily), ro.setTotalOrderSeek(true))
try {
iterator.seek(prefix)
loop(iterator)
} finally iterator.close()
}
}

def resourceObservable: Observable[DBResource] =
Expand Down
Loading

0 comments on commit 84226e2

Please sign in to comment.