Skip to content

Commit

Permalink
fix: start up in EL sync mode
Browse files Browse the repository at this point in the history
  • Loading branch information
thinkAfCod committed Apr 25, 2024
1 parent aba2d3c commit c49ee4a
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 29 deletions.
10 changes: 5 additions & 5 deletions hildr-node/src/main/java/io/optimism/derive/State.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
import java.util.function.Function;
import java.util.function.BiFunction;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -36,7 +36,7 @@ public class State {

private final TreeMap<BigInteger, Tuple2<BlockInfo, Epoch>> l2Refs;

private final Function<BigInteger, Tuple2<BlockInfo, Epoch>> l2Fetcher;
private final BiFunction<DefaultBlockParameter, Boolean, Tuple2<BlockInfo, Epoch>> l2Fetcher;

private BlockInfo safeHead;

Expand All @@ -62,7 +62,7 @@ public State(
TreeMap<String, L1Info> l1Info,
TreeMap<BigInteger, String> l1Hashes,
TreeMap<BigInteger, Tuple2<BlockInfo, Epoch>> l2Refs,
Function<BigInteger, Tuple2<BlockInfo, Epoch>> l2Fetcher,
BiFunction<DefaultBlockParameter, Boolean, Tuple2<BlockInfo, Epoch>> l2Fetcher,
BlockInfo safeHead,
Epoch safeEpoch,
BigInteger currentEpochNum,
Expand All @@ -89,7 +89,7 @@ public State(
*/
public static State create(
TreeMap<BigInteger, Tuple2<BlockInfo, Epoch>> l2Refs,
Function<BigInteger, Tuple2<BlockInfo, Epoch>> l2Fetcher,
BiFunction<DefaultBlockParameter, Boolean, Tuple2<BlockInfo, Epoch>> l2Fetcher,
BlockInfo finalizedHead,
Epoch finalizedEpoch,
Config config) {
Expand Down Expand Up @@ -144,7 +144,7 @@ public Tuple2<BlockInfo, Epoch> l2Info(BigInteger timestamp) {
return cache;
}
LOGGER.warn("L2 refs cache not contains, will fetch from geth: blockNum = {}", blockNum);
var res = l2Fetcher.apply(blockNum);
var res = l2Fetcher.apply(DefaultBlockParameter.valueOf(blockNum), true);
this.l2Refs.put(res.component1().number(), res);
return res;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,8 @@ private BatchStatus spanBatchStatus(final Batch batchWrapper) {

// check batch timestamp
if (spanEndTimestamp.compareTo(nextTimestamp) < 0) {
LOGGER.warn("past batch");
LOGGER.warn("past batch: nextTimestamp = l2SafeHead({}) + blockTime({}), spanEndTimestamp({})",
l2SafeHead.timestamp(), this.config.chainConfig().blockTime(), spanEndTimestamp);
return BatchStatus.Drop;
}
if (spanStartTimestamp.compareTo(nextTimestamp) > 0) {
Expand Down
60 changes: 46 additions & 14 deletions hildr-node/src/main/java/io/optimism/driver/Driver.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
Expand Down Expand Up @@ -84,6 +85,8 @@ public class Driver<E extends Engine> extends AbstractExecutionThreadService {

private List<ExecutionPayload> futureUnsafeBlocks;

private final BiFunction<DefaultBlockParameter, Boolean, Tuple2<BlockInfo, Epoch>> l2Fetcher;

private final AtomicReference<io.optimism.derive.State> state;

private final ChainWatcher chainWatcher;
Expand All @@ -106,6 +109,8 @@ public class Driver<E extends Engine> extends AbstractExecutionThreadService {

private final AtomicBoolean isP2PNetworkStarted;

private final AtomicBoolean isElsyncFinished;

/**
* Instantiates a new Driver.
*
Expand All @@ -123,6 +128,7 @@ public class Driver<E extends Engine> extends AbstractExecutionThreadService {
public Driver(
EngineDriver<E> engineDriver,
Pipeline pipeline,
BiFunction<DefaultBlockParameter, Boolean, Tuple2<BlockInfo, Epoch>> l2Fetcher,
AtomicReference<io.optimism.derive.State> state,
ChainWatcher chainWatcher,
MessagePassingQueue<ExecutionPayload> unsafeBlockQueue,
Expand All @@ -133,6 +139,7 @@ public Driver(
this.engineDriver = engineDriver;
this.rpcServer = rpcServer;
this.pipeline = pipeline;
this.l2Fetcher = l2Fetcher;
this.state = state;
this.chainWatcher = chainWatcher;
this.unsafeBlockQueue = unsafeBlockQueue;
Expand All @@ -148,6 +155,7 @@ public Driver(
rpcHandler.put(RpcMethod.OP_ROLLUP_CONFIG.getRpcMethodName(), unused -> this.getRollupConfig());
this.rpcServer.register(rpcHandler);
this.isP2PNetworkStarted = new AtomicBoolean(false);
this.isElsyncFinished = new AtomicBoolean(false);
}

/**
Expand Down Expand Up @@ -218,9 +226,9 @@ public static Driver<EngineApi> from(Config config, CountDownLatch latch)
} else {
l2Refs = io.optimism.derive.State.initL2Refs(finalizedHead.number(), config.chainConfig(), l2Provider);
}

var l2Fetcher = Driver.l2Fetcher(l2Provider);
AtomicReference<io.optimism.derive.State> state = new AtomicReference<>(io.optimism.derive.State.create(
l2Refs, Driver.l2Fetcher(l2Provider), finalizedHead, finalizedEpoch, config));
l2Refs, l2Fetcher, finalizedHead, finalizedEpoch, config));

EngineDriver<EngineApi> engineDriver = new EngineDriver<>(finalizedHead, finalizedEpoch, l2Provider, config);

Expand All @@ -234,7 +242,7 @@ public static Driver<EngineApi> from(Config config, CountDownLatch latch)

l2Provider.shutdown();
return new Driver<>(
engineDriver, pipeline, state, watcher, unsafeBlockQueue, rpcServer, latch, config, opStackNetwork);
engineDriver, pipeline, l2Fetcher, state, watcher, unsafeBlockQueue, rpcServer, latch, config, opStackNetwork);
}

/**
Expand Down Expand Up @@ -494,10 +502,21 @@ private void advanceUnsafeHead() throws ExecutionException, InterruptedException
}

if (nextUnsafePayload.isPresent()) {
this.engineDriver.handleUnsafePayload(nextUnsafePayload.get());
try {
this.engineDriver.handleUnsafePayload(nextUnsafePayload.get());
} catch (ForkchoiceUpdateException e) {
if (!this.config.syncMode().isEl()) {
throw e;
}
// Ignore fork choice update exception during EL syncing
LOGGER.warn("Failed to insert unsafe payload for EL sync: ", e);
}
if (this.config.syncMode().isEl() && !this.engineDriver.isEngineSyncing()) {
LOGGER.info("execution layer syncing is done, restarting chain watcher.");
this.restartChainWatcher(false);
if (!this.isElsyncFinished.compareAndExchange(false, true)) {
LOGGER.info("execution layer syncing is done, restarting chain watcher.");
this.fetchAndUpdateFinalizedHead();
this.restartChainWatcher();
}
}
}
}
Expand Down Expand Up @@ -533,14 +552,14 @@ private void handleNextBlockUpdate() {
case BlockUpdate.Reorg ignored -> {
LOGGER.warn("reorg detected, purging pipeline");
Driver.this.unfinalizedBlocks.clear();
Driver.this.restartChainWatcher(true);
Driver.this.restartChainWatcher();
}
case BlockUpdate.FinalityUpdate num -> Driver.this.finalizedL1BlockNumber = num.get();
default -> throw new IllegalArgumentException("unknown block update type");
}
}

private void restartChainWatcher(boolean isReorg) {
private void restartChainWatcher() {
Driver.this.chainWatcher.restart(
Driver.this.engineDriver.getFinalizedEpoch().number().subtract(this.channelTimeout),
Driver.this.engineDriver.getFinalizedHead().number());
Expand All @@ -550,9 +569,7 @@ private void restartChainWatcher(boolean isReorg) {
return state;
});
Driver.this.pipeline.purge();
if (isReorg) {
Driver.this.engineDriver.reorg();
}
Driver.this.engineDriver.reorg();
}

private void updateFinalized() {
Expand Down Expand Up @@ -646,11 +663,24 @@ public static L2BlockRef payloadToRef(ExecutionPayload payload, Config.ChainConf
sequenceNumber);
}

private static Function<BigInteger, Tuple2<BlockInfo, Epoch>> l2Fetcher(final Web3j l2Provider) {
return blockNum -> {
private void fetchAndUpdateFinalizedHead() {
DefaultBlockParameter blockParameter;
if (this.engineDriver.getFinalizedHead().number().compareTo(BigInteger.ZERO) == 0) {
blockParameter = FINALIZED;
} else {
blockParameter = DefaultBlockParameter.valueOf(this.engineDriver.getFinalizedHead().number());
}
Tuple2<BlockInfo, Epoch> finalizedHead = l2Fetcher.apply(blockParameter, true);
this.engineDriver.updateFinalized(finalizedHead.component1(), finalizedHead.component2());
}



private static BiFunction<DefaultBlockParameter, Boolean, Tuple2<BlockInfo, Epoch>> l2Fetcher(final Web3j l2Provider) {
return (blockParameter, returnFull) -> {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
StructuredTaskScope.Subtask<EthBlock> blockTask = scope.fork(TracerTaskWrapper.wrap(() -> l2Provider
.ethGetBlockByNumber(DefaultBlockParameter.valueOf(blockNum), true)
.ethGetBlockByNumber(blockParameter, returnFull)
.send()));
scope.join();
scope.throwIfFailed();
Expand All @@ -668,6 +698,8 @@ private static Function<BigInteger, Tuple2<BlockInfo, Epoch>> l2Fetcher(final We
};
}



/**
* The type Unfinalized block.
*
Expand Down
23 changes: 14 additions & 9 deletions hildr-node/src/main/java/io/optimism/driver/EngineDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ public void handleAttributes(PayloadAttributes attributes) throws ExecutionExcep
*/
public void handleUnsafePayload(ExecutionPayload payload) throws ExecutionException, InterruptedException {
if (this.syncStatus == SyncStatus.WillStartEL) {
// todo query from l2Client
var l2Finalized = l2Client.ethGetBlockByNumber(DefaultBlockParameterName.FINALIZED, true)
.sendAsync()
.get();
Expand Down Expand Up @@ -321,23 +320,29 @@ private void updateForkchoice() throws InterruptedException, ExecutionException

scope.join();
scope.throwIfFailed();
ForkChoiceUpdate forkChoiceUpdate = forkChoiceUpdateFuture.get().getForkChoiceUpdate();
Status forkChoiceUpdateStatus = forkChoiceUpdate.payloadStatus().getStatus();
if (this.syncModeEl) {
if (forkChoiceUpdateStatus == Status.VALID && this.syncStatus == SyncStatus.StartedEL) {

var forkChoiceUpdate = forkChoiceUpdateFuture.get();
if (forkChoiceUpdate.hasError()) {
throw new ForkchoiceUpdateException(
"could not accept new forkchoice: %s".formatted(forkChoiceUpdate.getError()));
}
var forkChoiceUpdateStatus = forkChoiceUpdate.getForkChoiceUpdate().payloadStatus();
var updateStatus = forkChoiceUpdateStatus.getStatus();
if (this.syncStatus.isEngineSyncing()) {
if (updateStatus == Status.VALID && this.syncStatus == SyncStatus.StartedEL) {
this.syncStatus = SyncStatus.FinishedELNotFinalized;
}
// Allow SYNCING if engine P2P sync is enabled
if (forkChoiceUpdateStatus == Status.INVALID || forkChoiceUpdateStatus == Status.INVALID_BLOCK_HASH) {
if (updateStatus == Status.INVALID || forkChoiceUpdateStatus.getStatus() == Status.INVALID_BLOCK_HASH) {
throw new ForkchoiceUpdateException(String.format(
"could not accept new forkchoice: %s",
forkChoiceUpdate.payloadStatus().getValidationError()));
forkChoiceUpdateStatus.getValidationError()));
}
} else {
if (forkChoiceUpdate.payloadStatus().getStatus() != Status.VALID) {
if (updateStatus != Status.VALID) {
throw new ForkchoiceUpdateException(String.format(
"could not accept new forkchoice: %s",
forkChoiceUpdate.payloadStatus().getValidationError()));
forkChoiceUpdateStatus.getValidationError()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.optimism.utilities.rpc;

import ch.qos.logback.classic.Level;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import java.net.ConnectException;
import java.util.ArrayList;
Expand Down Expand Up @@ -72,6 +73,11 @@ public static Tuple2<Web3j, Web3jService> create(String url, Function<String, Bo
web3Srv = new HttpService(url, okHttpClient);

} else if (Web3jProvider.isWs(url)) {
var logger = LoggerFactory.getLogger("org.web3j.protocol.websocket");
if (logger instanceof ch.qos.logback.classic.Logger) {
var level = !LOGGER.isTraceEnabled() ? Level.INFO : Level.DEBUG;
((ch.qos.logback.classic.Logger) logger).setLevel(level);
}
final var web3finalSrv = new WebSocketService(url, true);
wsConnect(web3finalSrv);
web3Srv = web3finalSrv;
Expand Down

0 comments on commit c49ee4a

Please sign in to comment.