From c49ee4aed4c3ed2c07e59334bf57e0f2bd5e26d7 Mon Sep 17 00:00:00 2001 From: thinkAfCod Date: Thu, 25 Apr 2024 18:00:03 +0800 Subject: [PATCH] fix: start up in EL sync mode --- .../main/java/io/optimism/derive/State.java | 10 ++-- .../io/optimism/derive/stages/Batches.java | 3 +- .../main/java/io/optimism/driver/Driver.java | 60 ++++++++++++++----- .../java/io/optimism/driver/EngineDriver.java | 23 ++++--- .../optimism/utilities/rpc/Web3jProvider.java | 6 ++ 5 files changed, 73 insertions(+), 29 deletions(-) diff --git a/hildr-node/src/main/java/io/optimism/derive/State.java b/hildr-node/src/main/java/io/optimism/derive/State.java index 7116b4e4..33b58998 100644 --- a/hildr-node/src/main/java/io/optimism/derive/State.java +++ b/hildr-node/src/main/java/io/optimism/derive/State.java @@ -11,7 +11,7 @@ import java.util.TreeMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.StructuredTaskScope; -import java.util.function.Function; +import java.util.function.BiFunction; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +36,7 @@ public class State { private final TreeMap> l2Refs; - private final Function> l2Fetcher; + private final BiFunction> l2Fetcher; private BlockInfo safeHead; @@ -62,7 +62,7 @@ public State( TreeMap l1Info, TreeMap l1Hashes, TreeMap> l2Refs, - Function> l2Fetcher, + BiFunction> l2Fetcher, BlockInfo safeHead, Epoch safeEpoch, BigInteger currentEpochNum, @@ -89,7 +89,7 @@ public State( */ public static State create( TreeMap> l2Refs, - Function> l2Fetcher, + BiFunction> l2Fetcher, BlockInfo finalizedHead, Epoch finalizedEpoch, Config config) { @@ -144,7 +144,7 @@ public Tuple2 l2Info(BigInteger timestamp) { return cache; } LOGGER.warn("L2 refs cache not contains, will fetch from geth: blockNum = {}", blockNum); - var res = l2Fetcher.apply(blockNum); + var res = l2Fetcher.apply(DefaultBlockParameter.valueOf(blockNum), true); this.l2Refs.put(res.component1().number(), res); return res; } diff --git a/hildr-node/src/main/java/io/optimism/derive/stages/Batches.java b/hildr-node/src/main/java/io/optimism/derive/stages/Batches.java index f5f0b878..2a758fbc 100644 --- a/hildr-node/src/main/java/io/optimism/derive/stages/Batches.java +++ b/hildr-node/src/main/java/io/optimism/derive/stages/Batches.java @@ -336,7 +336,8 @@ private BatchStatus spanBatchStatus(final Batch batchWrapper) { // check batch timestamp if (spanEndTimestamp.compareTo(nextTimestamp) < 0) { - LOGGER.warn("past batch"); + LOGGER.warn("past batch: nextTimestamp = l2SafeHead({}) + blockTime({}), spanEndTimestamp({})", + l2SafeHead.timestamp(), this.config.chainConfig().blockTime(), spanEndTimestamp); return BatchStatus.Drop; } if (spanStartTimestamp.compareTo(nextTimestamp) > 0) { diff --git a/hildr-node/src/main/java/io/optimism/driver/Driver.java b/hildr-node/src/main/java/io/optimism/driver/Driver.java index 7d476798..ac80a10b 100644 --- a/hildr-node/src/main/java/io/optimism/driver/Driver.java +++ b/hildr-node/src/main/java/io/optimism/driver/Driver.java @@ -48,6 +48,7 @@ import java.util.concurrent.StructuredTaskScope; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.commons.collections4.CollectionUtils; @@ -84,6 +85,8 @@ public class Driver extends AbstractExecutionThreadService { private List futureUnsafeBlocks; + private final BiFunction> l2Fetcher; + private final AtomicReference state; private final ChainWatcher chainWatcher; @@ -106,6 +109,8 @@ public class Driver extends AbstractExecutionThreadService { private final AtomicBoolean isP2PNetworkStarted; + private final AtomicBoolean isElsyncFinished; + /** * Instantiates a new Driver. * @@ -123,6 +128,7 @@ public class Driver extends AbstractExecutionThreadService { public Driver( EngineDriver engineDriver, Pipeline pipeline, + BiFunction> l2Fetcher, AtomicReference state, ChainWatcher chainWatcher, MessagePassingQueue unsafeBlockQueue, @@ -133,6 +139,7 @@ public Driver( this.engineDriver = engineDriver; this.rpcServer = rpcServer; this.pipeline = pipeline; + this.l2Fetcher = l2Fetcher; this.state = state; this.chainWatcher = chainWatcher; this.unsafeBlockQueue = unsafeBlockQueue; @@ -148,6 +155,7 @@ public Driver( rpcHandler.put(RpcMethod.OP_ROLLUP_CONFIG.getRpcMethodName(), unused -> this.getRollupConfig()); this.rpcServer.register(rpcHandler); this.isP2PNetworkStarted = new AtomicBoolean(false); + this.isElsyncFinished = new AtomicBoolean(false); } /** @@ -218,9 +226,9 @@ public static Driver from(Config config, CountDownLatch latch) } else { l2Refs = io.optimism.derive.State.initL2Refs(finalizedHead.number(), config.chainConfig(), l2Provider); } - + var l2Fetcher = Driver.l2Fetcher(l2Provider); AtomicReference state = new AtomicReference<>(io.optimism.derive.State.create( - l2Refs, Driver.l2Fetcher(l2Provider), finalizedHead, finalizedEpoch, config)); + l2Refs, l2Fetcher, finalizedHead, finalizedEpoch, config)); EngineDriver engineDriver = new EngineDriver<>(finalizedHead, finalizedEpoch, l2Provider, config); @@ -234,7 +242,7 @@ public static Driver from(Config config, CountDownLatch latch) l2Provider.shutdown(); return new Driver<>( - engineDriver, pipeline, state, watcher, unsafeBlockQueue, rpcServer, latch, config, opStackNetwork); + engineDriver, pipeline, l2Fetcher, state, watcher, unsafeBlockQueue, rpcServer, latch, config, opStackNetwork); } /** @@ -494,10 +502,21 @@ private void advanceUnsafeHead() throws ExecutionException, InterruptedException } if (nextUnsafePayload.isPresent()) { - this.engineDriver.handleUnsafePayload(nextUnsafePayload.get()); + try { + this.engineDriver.handleUnsafePayload(nextUnsafePayload.get()); + } catch (ForkchoiceUpdateException e) { + if (!this.config.syncMode().isEl()) { + throw e; + } + // Ignore fork choice update exception during EL syncing + LOGGER.warn("Failed to insert unsafe payload for EL sync: ", e); + } if (this.config.syncMode().isEl() && !this.engineDriver.isEngineSyncing()) { - LOGGER.info("execution layer syncing is done, restarting chain watcher."); - this.restartChainWatcher(false); + if (!this.isElsyncFinished.compareAndExchange(false, true)) { + LOGGER.info("execution layer syncing is done, restarting chain watcher."); + this.fetchAndUpdateFinalizedHead(); + this.restartChainWatcher(); + } } } } @@ -533,14 +552,14 @@ private void handleNextBlockUpdate() { case BlockUpdate.Reorg ignored -> { LOGGER.warn("reorg detected, purging pipeline"); Driver.this.unfinalizedBlocks.clear(); - Driver.this.restartChainWatcher(true); + Driver.this.restartChainWatcher(); } case BlockUpdate.FinalityUpdate num -> Driver.this.finalizedL1BlockNumber = num.get(); default -> throw new IllegalArgumentException("unknown block update type"); } } - private void restartChainWatcher(boolean isReorg) { + private void restartChainWatcher() { Driver.this.chainWatcher.restart( Driver.this.engineDriver.getFinalizedEpoch().number().subtract(this.channelTimeout), Driver.this.engineDriver.getFinalizedHead().number()); @@ -550,9 +569,7 @@ private void restartChainWatcher(boolean isReorg) { return state; }); Driver.this.pipeline.purge(); - if (isReorg) { - Driver.this.engineDriver.reorg(); - } + Driver.this.engineDriver.reorg(); } private void updateFinalized() { @@ -646,11 +663,24 @@ public static L2BlockRef payloadToRef(ExecutionPayload payload, Config.ChainConf sequenceNumber); } - private static Function> l2Fetcher(final Web3j l2Provider) { - return blockNum -> { + private void fetchAndUpdateFinalizedHead() { + DefaultBlockParameter blockParameter; + if (this.engineDriver.getFinalizedHead().number().compareTo(BigInteger.ZERO) == 0) { + blockParameter = FINALIZED; + } else { + blockParameter = DefaultBlockParameter.valueOf(this.engineDriver.getFinalizedHead().number()); + } + Tuple2 finalizedHead = l2Fetcher.apply(blockParameter, true); + this.engineDriver.updateFinalized(finalizedHead.component1(), finalizedHead.component2()); + } + + + + private static BiFunction> l2Fetcher(final Web3j l2Provider) { + return (blockParameter, returnFull) -> { try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { StructuredTaskScope.Subtask blockTask = scope.fork(TracerTaskWrapper.wrap(() -> l2Provider - .ethGetBlockByNumber(DefaultBlockParameter.valueOf(blockNum), true) + .ethGetBlockByNumber(blockParameter, returnFull) .send())); scope.join(); scope.throwIfFailed(); @@ -668,6 +698,8 @@ private static Function> l2Fetcher(final We }; } + + /** * The type Unfinalized block. * diff --git a/hildr-node/src/main/java/io/optimism/driver/EngineDriver.java b/hildr-node/src/main/java/io/optimism/driver/EngineDriver.java index 61a403e6..4defc630 100644 --- a/hildr-node/src/main/java/io/optimism/driver/EngineDriver.java +++ b/hildr-node/src/main/java/io/optimism/driver/EngineDriver.java @@ -180,7 +180,6 @@ public void handleAttributes(PayloadAttributes attributes) throws ExecutionExcep */ public void handleUnsafePayload(ExecutionPayload payload) throws ExecutionException, InterruptedException { if (this.syncStatus == SyncStatus.WillStartEL) { - // todo query from l2Client var l2Finalized = l2Client.ethGetBlockByNumber(DefaultBlockParameterName.FINALIZED, true) .sendAsync() .get(); @@ -321,23 +320,29 @@ private void updateForkchoice() throws InterruptedException, ExecutionException scope.join(); scope.throwIfFailed(); - ForkChoiceUpdate forkChoiceUpdate = forkChoiceUpdateFuture.get().getForkChoiceUpdate(); - Status forkChoiceUpdateStatus = forkChoiceUpdate.payloadStatus().getStatus(); - if (this.syncModeEl) { - if (forkChoiceUpdateStatus == Status.VALID && this.syncStatus == SyncStatus.StartedEL) { + + var forkChoiceUpdate = forkChoiceUpdateFuture.get(); + if (forkChoiceUpdate.hasError()) { + throw new ForkchoiceUpdateException( + "could not accept new forkchoice: %s".formatted(forkChoiceUpdate.getError())); + } + var forkChoiceUpdateStatus = forkChoiceUpdate.getForkChoiceUpdate().payloadStatus(); + var updateStatus = forkChoiceUpdateStatus.getStatus(); + if (this.syncStatus.isEngineSyncing()) { + if (updateStatus == Status.VALID && this.syncStatus == SyncStatus.StartedEL) { this.syncStatus = SyncStatus.FinishedELNotFinalized; } // Allow SYNCING if engine P2P sync is enabled - if (forkChoiceUpdateStatus == Status.INVALID || forkChoiceUpdateStatus == Status.INVALID_BLOCK_HASH) { + if (updateStatus == Status.INVALID || forkChoiceUpdateStatus.getStatus() == Status.INVALID_BLOCK_HASH) { throw new ForkchoiceUpdateException(String.format( "could not accept new forkchoice: %s", - forkChoiceUpdate.payloadStatus().getValidationError())); + forkChoiceUpdateStatus.getValidationError())); } } else { - if (forkChoiceUpdate.payloadStatus().getStatus() != Status.VALID) { + if (updateStatus != Status.VALID) { throw new ForkchoiceUpdateException(String.format( "could not accept new forkchoice: %s", - forkChoiceUpdate.payloadStatus().getValidationError())); + forkChoiceUpdateStatus.getValidationError())); } } } diff --git a/hildr-utilities/src/main/java/io/optimism/utilities/rpc/Web3jProvider.java b/hildr-utilities/src/main/java/io/optimism/utilities/rpc/Web3jProvider.java index 34f54794..ece7d171 100644 --- a/hildr-utilities/src/main/java/io/optimism/utilities/rpc/Web3jProvider.java +++ b/hildr-utilities/src/main/java/io/optimism/utilities/rpc/Web3jProvider.java @@ -1,5 +1,6 @@ package io.optimism.utilities.rpc; +import ch.qos.logback.classic.Level; import com.google.common.util.concurrent.AbstractExecutionThreadService; import java.net.ConnectException; import java.util.ArrayList; @@ -72,6 +73,11 @@ public static Tuple2 create(String url, Function