diff --git a/src/main/scala/units/ConsensusClient.scala b/src/main/scala/units/ConsensusClient.scala index d8b24dcb..9507a361 100644 --- a/src/main/scala/units/ConsensusClient.scala +++ b/src/main/scala/units/ConsensusClient.scala @@ -13,6 +13,7 @@ import units.ConsensusClient.ChainHandler import units.client.engine.EngineApiClient import units.network.* +import java.util.concurrent.ScheduledExecutorService import scala.concurrent.Future import scala.jdk.CollectionConverters.CollectionHasAsScala import scala.util.Try @@ -79,7 +80,7 @@ object ConsensusClient { blockObserver: BlocksObserver, allChannels: DefaultChannelGroup, globalScheduler: Scheduler, - eluScheduler: Scheduler, + eluScheduler: ScheduledExecutorService, ownedResources: AutoCloseable ) extends AutoCloseable { def this(context: ExtensionContext, deps: ConsensusClientDependencies) = diff --git a/src/main/scala/units/ConsensusClientDependencies.scala b/src/main/scala/units/ConsensusClientDependencies.scala index 292c2057..98cfd89c 100644 --- a/src/main/scala/units/ConsensusClientDependencies.scala +++ b/src/main/scala/units/ConsensusClientDependencies.scala @@ -4,7 +4,7 @@ import com.wavesplatform.network.{PeerDatabaseImpl, PeerInfo} import com.wavesplatform.utils.{LoggerFacade, Schedulers} import io.netty.channel.Channel import io.netty.channel.group.DefaultChannelGroup -import io.netty.util.concurrent.GlobalEventExecutor +import io.netty.util.concurrent.{DefaultThreadFactory, GlobalEventExecutor} import monix.execution.Scheduler import monix.execution.schedulers.SchedulerService import org.slf4j.LoggerFactory @@ -13,7 +13,7 @@ import units.client.JwtAuthenticationBackend import units.client.engine.{HttpEngineApiClient, LoggedEngineApiClient} import units.network.{BlocksObserverImpl, HistoryReplier, MessageObserver, NetworkServer} -import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.{ConcurrentHashMap, Executors, ScheduledExecutorService, ThreadFactory} import scala.io.Source // A helper to create ConsensusClient due to Scala secondary constructors limitations @@ -23,8 +23,15 @@ class ConsensusClientDependencies(val config: ClientConfig) extends AutoCloseabl private val blockObserverScheduler = Schedulers.singleThread(s"block-observer-${config.chainContract}", reporter = { e => log.warn("Error in BlockObserver", e) }) val globalScheduler: Scheduler = monix.execution.Scheduler.global - val eluScheduler: SchedulerService = - Scheduler.singleThread(s"el-updater-${config.chainContract}", reporter = { e => log.warn("Exception in ELUpdater", e) }) + val eluScheduler: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor({ (r: Runnable) => + val t = new Thread(r) + t.setDaemon(true) + t.setName(s"el-updater-${config.chainContract}") + t.setUncaughtExceptionHandler((t: Thread, e: Throwable) => { + log.warn("Exception in ELUpdater", e) + }) + t + }) private val httpClientBackend = HttpClientSyncBackend() private val maybeAuthenticatedBackend = config.jwtSecretFile match { diff --git a/src/main/scala/units/ELUpdater.scala b/src/main/scala/units/ELUpdater.scala index e829acab..85ccf705 100644 --- a/src/main/scala/units/ELUpdater.scala +++ b/src/main/scala/units/ELUpdater.scala @@ -22,7 +22,6 @@ import com.wavesplatform.utx.UtxPool import com.wavesplatform.wallet.Wallet import io.netty.channel.Channel import io.netty.channel.group.DefaultChannelGroup -import monix.execution.cancelables.SerialCancelable import monix.execution.{CancelableFuture, Scheduler} import play.api.libs.json.* import units.ELUpdater.State.* @@ -38,6 +37,7 @@ import units.network.BlocksObserverImpl.BlockWithChannel import units.util.HexBytesConverter import units.util.HexBytesConverter.toHexNoPrefix +import java.util.concurrent.{CompletableFuture, Future, ScheduledExecutorService, TimeUnit} import scala.annotation.tailrec import scala.concurrent.duration.* import scala.util.* @@ -52,19 +52,25 @@ class ELUpdater( wallet: Wallet, requestBlockFromPeers: BlockHash => CancelableFuture[BlockWithChannel], broadcastTx: Transaction => TracedResult[ValidationError, Boolean], - scheduler: Scheduler, + scheduler: ScheduledExecutorService, globalScheduler: Scheduler ) extends StrictLogging { import ELUpdater.* - private val handleNextUpdate = SerialCancelable() - private val contractAddress = config.chainContractAddress + private val contractAddress = config.chainContractAddress private val chainContractClient = new ChainContractStateClient(contractAddress, blockchain) private[units] var state: State = Starting - def consensusLayerChanged(): Unit = - handleNextUpdate := scheduler.scheduleOnce(ClChangedProcessingDelay)(handleConsensusLayerChanged()) + @volatile private var handleNextUpdate: Future[?] = CompletableFuture.completedFuture(()) + def consensusLayerChanged(): Unit = { + handleNextUpdate.cancel(false) + handleNextUpdate = scheduler.schedule( + handleConsensusLayerChanged(), + ClChangedProcessingDelay.length, + ClChangedProcessingDelay.unit + ) + } def executionBlockReceived(block: NetworkL2Block, ch: Channel): Unit = scheduler.execute { () => logger.debug(s"New block ${block.hash}->${block.parentHash} (timestamp=${block.timestamp}, height=${block.height}) appeared") @@ -207,7 +213,7 @@ class ELUpdater( timestamp: Long, contractFunction: ContractFunction, chainContractOptions: ChainContractOptions - ): Unit = { + ): Runnable = () => { def getWaitingTime: Option[FiniteDuration] = { val timestampAheadTime = (timestamp - time.correctedTime() / 1000).max(0) if (timestampAheadTime > 0) { @@ -221,8 +227,10 @@ class ELUpdater( case Working(epochInfo, _, _, _, _, m: Mining, _, _) if m.currentPayloadId == payloadId => getWaitingTime match { case Some(waitingTime) => - scheduler.scheduleOnce(waitingTime)( - prepareAndApplyPayload(payloadId, referenceHash, timestamp, contractFunction, chainContractOptions) + scheduler.schedule( + (() => prepareAndApplyPayload(payloadId, referenceHash, timestamp, contractFunction, chainContractOptions)): Runnable, + waitingTime.length, + waitingTime.unit ) case _ => (for { @@ -374,14 +382,16 @@ class ELUpdater( ) setState("12", newState) - scheduler.scheduleOnce((miningData.nextBlockUnixTs - time.correctedTime() / 1000).min(1).seconds)( + scheduler.schedule( prepareAndApplyPayload( miningData.payloadId, parentBlock.hash, miningData.nextBlockUnixTs, newState.options.startEpochChainFunction(parentBlock.hash, epochInfo.hitSource, nodeChainInfo.toOption), newState.options - ) + ), + (miningData.nextBlockUnixTs - time.correctedTime() / 1000).min(1), + TimeUnit.SECONDS ) }).fold( err => logger.error(s"Error starting payload build process: ${err.message}"), @@ -396,7 +406,7 @@ class ELUpdater( epochNumber: Int, parentBlock: EcBlock, chainContractOptions: ChainContractOptions - ): Unit = { + ): Runnable = () => { state match { case w @ Working(epochInfo, _, finalizedBlock, _, _, m: Mining, _, _) if epochInfo.number == epochNumber && blockchain.height == epochNumber => val nextBlockUnixTs = (parentBlock.timestamp + config.blockDelay.toSeconds).max(time.correctedTime() / 1000) @@ -413,9 +423,11 @@ class ELUpdater( ).fold[Unit]( err => { logger.error(s"Error starting payload build process: ${err.message}") - scheduler.scheduleOnce(MiningRetryInterval) { - tryToForgeNextBlock(epochNumber, parentBlock, chainContractOptions) - } + scheduler.schedule( + tryToForgeNextBlock(epochNumber, parentBlock, chainContractOptions), + MiningRetryInterval.length, + MiningRetryInterval.unit + ) }, miningData => { val newState = w.copy( @@ -427,14 +439,16 @@ class ELUpdater( ) ) setState("11", newState) - scheduler.scheduleOnce((miningData.nextBlockUnixTs - time.correctedTime() / 1000).min(1).seconds)( + scheduler.schedule( prepareAndApplyPayload( miningData.payloadId, parentBlock.hash, miningData.nextBlockUnixTs, chainContractOptions.appendFunction(parentBlock.hash), chainContractOptions - ) + ), + (miningData.nextBlockUnixTs - time.correctedTime() / 1000).min(1), + TimeUnit.SECONDS ) } ) @@ -484,7 +498,7 @@ class ELUpdater( } } - private def handleConsensusLayerChanged(): Unit = { + private def handleConsensusLayerChanged(): Runnable = () => { state match { case Starting => updateStartingState() case w: Working[ChainStatus] => updateWorkingState(w) @@ -566,11 +580,16 @@ class ELUpdater( w.chainStatus match { case FollowingChain(_, Some(nextExpectedBlock)) => logger.debug(s"Waiting for block $nextExpectedBlock from peers") - scheduler.scheduleOnce(WaitRequestedBlockTimeout) { - if (blockchain.height == prevState.epochInfo.number) { - check(missedBlock) - } - } + scheduler.schedule( + ( + () => + if (blockchain.height == prevState.epochInfo.number) { + check(missedBlock) + } + ): Runnable, + WaitRequestedBlockTimeout.length, + WaitRequestedBlockTimeout.unit + ) case FollowingChain(nodeChainInfo, _) => tryToStartMining(w, Right(nodeChainInfo)) case WaitForNewChain(chainSwitchInfo) => @@ -583,11 +602,16 @@ class ELUpdater( prevState.chainStatus.nextExpectedBlock match { case Some(missedBlock) => - scheduler.scheduleOnce(WaitRequestedBlockTimeout) { - if (blockchain.height == prevState.epochInfo.number) { - check(missedBlock) - } - } + scheduler.schedule( + ( + () => + if (blockchain.height == prevState.epochInfo.number) { + check(missedBlock) + } + ): Runnable, + WaitRequestedBlockTimeout.length, + WaitRequestedBlockTimeout.unit + ) case _ => tryToStartMining(prevState, Right(prevState.chainStatus.nodeChainInfo)) } @@ -888,50 +912,55 @@ class ELUpdater( chainInfo.lastBlock.height > lastEcBlock.height && !chainContractClient.blockExists(lastEcBlock.hash) || chainInfo.lastBlock.height < lastEcBlock.height - private def waitForSyncCompletion(target: ContractBlock): Unit = scheduler.scheduleOnce(5.seconds)(state match { - case SyncingToFinalizedBlock(finalizedBlockHash) if finalizedBlockHash == target.hash => - logger.debug(s"Checking if EL has synced to ${target.hash} on height ${target.height}") - engineApiClient.getLastExecutionBlock() match { - case Left(error) => - logger.error(s"Sync to ${target.hash} was not completed, error=${error.message}") - setState("23", Starting) - case Right(lastBlock) if lastBlock.hash == target.hash => - logger.debug(s"Finished synchronization to ${target.hash} successfully") - calculateEpochInfo match { - case Left(err) => - logger.error(s"Could not transition to following chain state: $err") - setState("24", Starting) - case Right(newEpochInfo) => - chainContractClient.getMainChainInfo match { - case Some(mainChainInfo) => - logger.trace(s"Following main chain ${mainChainInfo.id}") - val fullValidationStatus = - FullValidationStatus( - lastValidatedBlock = target, - lastElWithdrawalIndex = None - ) - followChainAndRequestNextBlock( - newEpochInfo, - mainChainInfo, - lastBlock, - mainChainInfo, - target, - fullValidationStatus, - chainContractClient.getOptions, - None - ) - case _ => - logger.error(s"Can't get main chain info") - setState("25", Starting) + private def waitForSyncCompletion(target: ContractBlock): Unit = scheduler.schedule( + () => + state match { + case SyncingToFinalizedBlock(finalizedBlockHash) if finalizedBlockHash == target.hash => + logger.debug(s"Checking if EL has synced to ${target.hash} on height ${target.height}") + engineApiClient.getLastExecutionBlock() match { + case Left(error) => + logger.error(s"Sync to ${target.hash} was not completed, error=${error.message}") + setState("23", Starting) + case Right(lastBlock) if lastBlock.hash == target.hash => + logger.debug(s"Finished synchronization to ${target.hash} successfully") + calculateEpochInfo match { + case Left(err) => + logger.error(s"Could not transition to following chain state: $err") + setState("24", Starting) + case Right(newEpochInfo) => + chainContractClient.getMainChainInfo match { + case Some(mainChainInfo) => + logger.trace(s"Following main chain ${mainChainInfo.id}") + val fullValidationStatus = + FullValidationStatus( + lastValidatedBlock = target, + lastElWithdrawalIndex = None + ) + followChainAndRequestNextBlock( + newEpochInfo, + mainChainInfo, + lastBlock, + mainChainInfo, + target, + fullValidationStatus, + chainContractClient.getOptions, + None + ) + case _ => + logger.error(s"Can't get main chain info") + setState("25", Starting) + } } + case Right(lastBlock) => + logger.debug(s"Sync to ${target.hash} is in progress: current last block is ${lastBlock.hash} at height ${lastBlock.height}") + waitForSyncCompletion(target) } - case Right(lastBlock) => - logger.debug(s"Sync to ${target.hash} is in progress: current last block is ${lastBlock.hash} at height ${lastBlock.height}") - waitForSyncCompletion(target) - } - case other => - logger.debug(s"Unexpected state on sync: $other") - }) + case other => + logger.debug(s"Unexpected state on sync: $other") + }, + 5, + TimeUnit.SECONDS + ) private def validateRandao(block: EcBlock, epochNumber: Int): JobResult[Unit] = blockchain.vrf(epochNumber) match { diff --git a/src/test/scala/units/ExtensionDomain.scala b/src/test/scala/units/ExtensionDomain.scala index 3c9eeeed..e8f222b9 100644 --- a/src/test/scala/units/ExtensionDomain.scala +++ b/src/test/scala/units/ExtensionDomain.scala @@ -49,6 +49,7 @@ import units.network.TestBlocksObserver import units.test.CustomMatchers import java.nio.charset.StandardCharsets +import java.util.concurrent.Executors import scala.annotation.tailrec import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.reflect.ClassTag @@ -87,7 +88,7 @@ class ExtensionDomain( val ecClients = new TestEcClients(ecGenesisBlock, blockchain) val globalScheduler = TestScheduler(ExecutionModel.AlwaysAsyncExecution) - val eluScheduler = TestScheduler(ExecutionModel.AlwaysAsyncExecution) + val eluScheduler = Executors.newSingleThreadScheduledExecutor() val elBlockStream = PublishSubject[(Channel, NetworkL2Block)]() val blockObserver = new TestBlocksObserver(elBlockStream)