Skip to content

Commit

Permalink
fix: execution layer sync process
Browse files Browse the repository at this point in the history
  • Loading branch information
thinkAfCod committed Apr 24, 2024
1 parent 053eef6 commit 3ce9e47
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 90 deletions.
4 changes: 4 additions & 0 deletions hildr-node/src/main/java/io/optimism/common/BlockInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.math.BigInteger;
import java.util.Objects;
import org.web3j.protocol.core.methods.response.EthBlock.Block;
import org.web3j.utils.Numeric;

/**
* The type BlockInfo.
Expand All @@ -19,6 +20,9 @@
*/
public record BlockInfo(String hash, BigInteger number, String parentHash, BigInteger timestamp) {

public static final BlockInfo EMPTY = new BlockInfo(
Numeric.toHexString(new byte[32]), BigInteger.ZERO, Numeric.toHexString(new byte[32]), BigInteger.ZERO);

/**
* From block info.
*
Expand Down
111 changes: 70 additions & 41 deletions hildr-node/src/main/java/io/optimism/driver/Driver.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -210,29 +211,16 @@ public static Driver<EngineApi> from(Config config, CountDownLatch latch)
l1StartBlock.compareTo(BigInteger.ZERO) < 0 ? BigInteger.ZERO : l1StartBlock,
finalizedHead.number(),
config);
TreeMap<BigInteger, Tuple2<BlockInfo, Epoch>> l2Refs;
if (config.syncMode().isEl()) {
finalizedHead = BlockInfo.EMPTY;
l2Refs = new TreeMap<>();
} else {
l2Refs = io.optimism.derive.State.initL2Refs(finalizedHead.number(), config.chainConfig(), l2Provider);
}

var l2Refs = io.optimism.derive.State.initL2Refs(finalizedHead.number(), config.chainConfig(), l2Provider);
var l2Fetcher = (Function<BigInteger, Tuple2<BlockInfo, Epoch>>) blockNum -> {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
StructuredTaskScope.Subtask<EthBlock> blockTask = scope.fork(TracerTaskWrapper.wrap(() -> l2Provider
.ethGetBlockByNumber(DefaultBlockParameter.valueOf(blockNum), true)
.send()));
scope.join();
scope.throwIfFailed();

var block = blockTask.get();
if (block == null || block.getBlock() == null) {
return null;
}
final HeadInfo l2BlockInfo = HeadInfo.from(block.getBlock());
return new Tuple2<>(l2BlockInfo.l2BlockInfo(), l2BlockInfo.l1Epoch());
} catch (Exception e) {
LOGGER.error("failed to fetch L2 block", e);
return null;
}
};
AtomicReference<io.optimism.derive.State> state = new AtomicReference<>(
io.optimism.derive.State.create(l2Refs, l2Fetcher, finalizedHead, finalizedEpoch, config));
AtomicReference<io.optimism.derive.State> state = new AtomicReference<>(io.optimism.derive.State.create(
l2Refs, Driver.l2Fetcher(l2Provider), finalizedHead, finalizedEpoch, config));

EngineDriver<EngineApi> engineDriver = new EngineDriver<>(finalizedHead, finalizedEpoch, l2Provider, config);

Expand Down Expand Up @@ -302,6 +290,9 @@ public RollupConfigResult getRollupConfig() {
* @return result of sync status.
*/
public SyncStatusResult getSyncStatus() {
if (this.engineDriver.isEngineSyncing()) {
return null;
}
// CurrentL1
final var currentL1 = this.chainWatcher.getCurrentL1();
// CurrentL1Finalized
Expand Down Expand Up @@ -362,7 +353,9 @@ protected void startUp() {
LOGGER.error("driver start fatal error", e);
throw new HildrServiceExecutionException(e);
}
this.chainWatcher.start();
if (!Driver.this.config.syncMode().isEl()) {
this.chainWatcher.start();
}
}

@Override
Expand Down Expand Up @@ -482,19 +475,30 @@ private void advanceUnsafeHead() throws ExecutionException, InterruptedException
BigInteger unsafeBlockNum = payload.blockNumber();
BigInteger syncedBlockNum =
Driver.this.engineDriver.getUnsafeHead().number();
if (Driver.this.engineDriver.isEngineSyncing()) {
return unsafeBlockNum.compareTo(syncedBlockNum) > 0;
}
return unsafeBlockNum.compareTo(syncedBlockNum) > 0
&& unsafeBlockNum.subtract(syncedBlockNum).compareTo(BigInteger.valueOf(1024L)) < 0;
})
.collect(Collectors.toList());

Optional<ExecutionPayload> nextUnsafePayload = Iterables.tryFind(
this.futureUnsafeBlocks, input -> input.parentHash()
.equalsIgnoreCase(
Driver.this.engineDriver.getUnsafeHead().hash()))
.toJavaUtil();
Optional<ExecutionPayload> nextUnsafePayload;
if (Driver.this.engineDriver.isEngineSyncing() && !this.futureUnsafeBlocks.isEmpty()) {
nextUnsafePayload = Optional.of(this.futureUnsafeBlocks.getLast());
} else {
nextUnsafePayload = Iterables.tryFind(this.futureUnsafeBlocks, input -> input.parentHash()
.equalsIgnoreCase(
Driver.this.engineDriver.getUnsafeHead().hash()))
.toJavaUtil();
}

if (nextUnsafePayload.isPresent()) {
this.engineDriver.handleUnsafePayload(nextUnsafePayload.get());
if (this.config.syncMode().isEl() && !this.engineDriver.isEngineSyncing()) {
LOGGER.info("execution layer syncing is done, restarting chain watcher.");
this.restartChainWatcher(false);
}
}
}

Expand Down Expand Up @@ -529,25 +533,28 @@ private void handleNextBlockUpdate() {
case BlockUpdate.Reorg ignored -> {
LOGGER.warn("reorg detected, purging pipeline");
Driver.this.unfinalizedBlocks.clear();

Driver.this.chainWatcher.restart(
Driver.this.engineDriver.getFinalizedEpoch().number().subtract(this.channelTimeout),
Driver.this.engineDriver.getFinalizedHead().number());

Driver.this.state.getAndUpdate(state -> {
state.purge(
Driver.this.engineDriver.getFinalizedHead(), Driver.this.engineDriver.getFinalizedEpoch());
return state;
});

Driver.this.pipeline.purge();
Driver.this.engineDriver.reorg();
Driver.this.restartChainWatcher(true);
}
case BlockUpdate.FinalityUpdate num -> Driver.this.finalizedL1BlockNumber = num.get();
default -> throw new IllegalArgumentException("unknown block update type");
}
}

private void restartChainWatcher(boolean isReorg) {
Driver.this.chainWatcher.restart(
Driver.this.engineDriver.getFinalizedEpoch().number().subtract(this.channelTimeout),
Driver.this.engineDriver.getFinalizedHead().number());

Driver.this.state.getAndUpdate(state -> {
state.purge(Driver.this.engineDriver.getFinalizedHead(), Driver.this.engineDriver.getFinalizedEpoch());
return state;
});
Driver.this.pipeline.purge();
if (isReorg) {
Driver.this.engineDriver.reorg();
}
}

private void updateFinalized() {
UnfinalizedBlock newFinalized = Iterables.getLast(
this.unfinalizedBlocks.stream()
Expand Down Expand Up @@ -639,6 +646,28 @@ public static L2BlockRef payloadToRef(ExecutionPayload payload, Config.ChainConf
sequenceNumber);
}

private static Function<BigInteger, Tuple2<BlockInfo, Epoch>> l2Fetcher(final Web3j l2Provider) {
return blockNum -> {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
StructuredTaskScope.Subtask<EthBlock> blockTask = scope.fork(TracerTaskWrapper.wrap(() -> l2Provider
.ethGetBlockByNumber(DefaultBlockParameter.valueOf(blockNum), true)
.send()));
scope.join();
scope.throwIfFailed();

var block = blockTask.get();
if (block == null || block.getBlock() == null) {
return null;
}
final HeadInfo l2BlockInfo = HeadInfo.from(block.getBlock());
return new Tuple2<>(l2BlockInfo.l2BlockInfo(), l2BlockInfo.l1Epoch());
} catch (Exception e) {
LOGGER.error("failed to fetch L2 block", e);
return null;
}
};
}

/**
* The type Unfinalized block.
*
Expand Down
8 changes: 6 additions & 2 deletions hildr-node/src/main/java/io/optimism/driver/EngineDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public void handleUnsafePayload(ExecutionPayload payload) throws ExecutionExcept
var l2Finalized = l2Client.ethGetBlockByNumber(DefaultBlockParameterName.FINALIZED, true)
.sendAsync()
.get();
if (l2Finalized != null && l2Finalized.getBlock() != null) {
if (l2Finalized.hasError() && l2Finalized.getError().getMessage().contains("block not found")) {
this.syncStatus = SyncStatus.StartedEL;
LOGGER.info("Starting EL sync");
} else if (this.chainConfig.l2Genesis().number().compareTo(BigInteger.ZERO) != 0
Expand Down Expand Up @@ -241,6 +241,10 @@ public void reorg() {
* @throws InterruptedException the interrupted exception
*/
public boolean engineReady() throws InterruptedException {
if (this.syncModeEl) {
// Skip check if EL sync is enabled
return true;
}
ForkchoiceState forkchoiceState = createForkchoiceState();

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Expand Down Expand Up @@ -324,7 +328,7 @@ private void updateForkchoice() throws InterruptedException, ExecutionException
this.syncStatus = SyncStatus.FinishedELNotFinalized;
}
// Allow SYNCING if engine P2P sync is enabled
if (forkChoiceUpdateStatus != Status.VALID && forkChoiceUpdateStatus != Status.SYNCING) {
if (forkChoiceUpdateStatus == Status.INVALID || forkChoiceUpdateStatus == Status.INVALID_BLOCK_HASH) {
throw new ForkchoiceUpdateException(String.format(
"could not accept new forkchoice: %s",
forkChoiceUpdate.payloadStatus().getValidationError()));
Expand Down
32 changes: 16 additions & 16 deletions hildr-node/src/main/java/io/optimism/engine/EngineApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,15 @@ public EngineApi fromEnv(Config config) {
String baseUrlParm = System.getenv("ENGINE_API_URL");
if (StringUtils.isBlank(baseUrlParm)) {
throw new RuntimeException(
"""
"""
ENGINE_API_URL environment variable not set.
Please set this to the base url of the engine api
""");
}
String secretKey = System.getenv("JWT_SECRET");
if (StringUtils.isBlank(secretKey)) {
throw new RuntimeException(
"""
"""
JWT_SECRET environment variable not set.
Please set this to the 256 bit hex-encoded secret key
used to authenticate with the engine api.
Expand Down Expand Up @@ -146,26 +146,26 @@ protected static String generateJws(Key key) {
Date nowDate = Date.from(now);
Date expirationDate = Date.from(now.plusSeconds(60));
return Jwts.builder()
.setIssuedAt(nowDate)
.setExpiration(expirationDate)
.signWith(key, SignatureAlgorithm.HS256)
.compact();
.setIssuedAt(nowDate)
.setExpiration(expirationDate)
.signWith(key, SignatureAlgorithm.HS256)
.compact();
}

@Override
public OpEthForkChoiceUpdate forkchoiceUpdated(ForkchoiceState forkchoiceState, PayloadAttributes payloadAttributes)
throws IOException {
throws IOException {
var method = ENGINE_FORKCHOICE_UPDATED_V2;
var ecotoneTime = this.config.chainConfig().ecotoneTime();
if (payloadAttributes == null || payloadAttributes.timestamp().compareTo(ecotoneTime) >= 0) {
method = ENGINE_FORKCHOICE_UPDATED_V3;
}
web3jService.addHeader("authorization", String.format("Bearer %1$s", generateJws(key)));
Request<?, OpEthForkChoiceUpdate> r = new Request<>(
method,
Arrays.asList(forkchoiceState, payloadAttributes != null ? payloadAttributes.toReq() : null),
web3jService,
OpEthForkChoiceUpdate.class);
method,
Arrays.asList(forkchoiceState, payloadAttributes != null ? payloadAttributes.toReq() : null),
web3jService,
OpEthForkChoiceUpdate.class);
return r.send();
}

Expand Down Expand Up @@ -197,11 +197,11 @@ public OpEthExecutionPayload getPayload(BigInteger timestamp, BigInteger payload
}
web3jService.addHeader("authorization", String.format("Bearer %1$s", generateJws(key)));
Request<?, OpEthExecutionPayload> r = new Request<>(
method,
Collections.singletonList(
payloadId != null ? Numeric.toHexStringWithPrefixZeroPadded(payloadId, 16) : null),
web3jService,
OpEthExecutionPayload.class);
method,
Collections.singletonList(
payloadId != null ? Numeric.toHexStringWithPrefixZeroPadded(payloadId, 16) : null),
web3jService,
OpEthExecutionPayload.class);
return r.send();
}

Expand Down
Loading

0 comments on commit 3ce9e47

Please sign in to comment.