Skip to content

Commit

Permalink
feat: introduce EngineApiRequest type (paradigmxyz#10158)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored Aug 7, 2024
1 parent 31b5548 commit 7ec3b92
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 52 deletions.
19 changes: 18 additions & 1 deletion crates/engine/tree/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use crate::{
download::{BlockDownloader, DownloadAction, DownloadOutcome},
};
use futures::{Stream, StreamExt};
use reth_beacon_consensus::BeaconConsensusEngineEvent;
use reth_beacon_consensus::{BeaconConsensusEngineEvent, BeaconEngineMessage};
use reth_chain_state::ExecutedBlock;
use reth_engine_primitives::EngineTypes;
use reth_primitives::{SealedBlockWithSenders, B256};
use std::{
collections::HashSet,
Expand Down Expand Up @@ -217,6 +219,21 @@ pub enum EngineApiKind {
OpStack,
}

/// The request variants that the engine API handler can receive.
#[derive(Debug)]
pub enum EngineApiRequest<T: EngineTypes> {
/// A request received from the consensus engine.
Beacon(BeaconEngineMessage<T>),
/// Request to insert an already executed block, e.g. via payload building.
InsertExecutedBlock(ExecutedBlock),
}

impl<T: EngineTypes> From<BeaconEngineMessage<T>> for EngineApiRequest<T> {
fn from(msg: BeaconEngineMessage<T>) -> Self {
Self::Beacon(msg)
}
}

/// Events emitted by the engine API handler.
#[derive(Debug)]
pub enum EngineApiEvent {
Expand Down
118 changes: 69 additions & 49 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use tracing::*;

mod config;
mod metrics;
use crate::tree::metrics::EngineApiMetrics;
use crate::{engine::EngineApiRequest, tree::metrics::EngineApiMetrics};
pub use config::TreeConfig;

/// Keeps track of the state of the tree.
Expand Down Expand Up @@ -373,9 +373,9 @@ pub struct EngineApiTreeHandler<P, E, T: EngineTypes> {
/// them one by one so that we can handle incoming engine API in between and don't become
/// unresponsive. This can happen during live sync transition where we're trying to close the
/// gap (up to 3 epochs of blocks in the worst case).
incoming_tx: Sender<FromEngine<BeaconEngineMessage<T>>>,
incoming_tx: Sender<FromEngine<EngineApiRequest<T>>>,
/// Incoming engine API requests.
incoming: Receiver<FromEngine<BeaconEngineMessage<T>>>,
incoming: Receiver<FromEngine<EngineApiRequest<T>>>,
/// Outgoing events that are emitted to the handler.
outgoing: UnboundedSender<EngineApiEvent>,
/// Channels to the persistence layer.
Expand Down Expand Up @@ -452,7 +452,7 @@ where
payload_builder: PayloadBuilderHandle<T>,
canonical_in_memory_state: CanonicalInMemoryState,
config: TreeConfig,
) -> (Sender<FromEngine<BeaconEngineMessage<T>>>, UnboundedReceiver<EngineApiEvent>) {
) -> (Sender<FromEngine<EngineApiRequest<T>>>, UnboundedReceiver<EngineApiEvent>) {
let best_block_number = provider.best_block_number().unwrap_or(0);
let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();

Expand Down Expand Up @@ -488,7 +488,7 @@ where
}

/// Returns a new [`Sender`] to send messages to this type.
pub fn sender(&self) -> Sender<FromEngine<BeaconEngineMessage<T>>> {
pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T>>> {
self.incoming_tx.clone()
}

Expand Down Expand Up @@ -804,7 +804,7 @@ where
/// Returns an error if the engine channel is disconnected.
fn try_recv_engine_message(
&self,
) -> Result<Option<FromEngine<BeaconEngineMessage<T>>>, RecvError> {
) -> Result<Option<FromEngine<EngineApiRequest<T>>>, RecvError> {
if self.persistence_state.in_progress() {
// try to receive the next request with a timeout to not block indefinitely
match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
Expand Down Expand Up @@ -868,7 +868,7 @@ where
}

/// Handles a message from the engine.
fn on_engine_message(&mut self, msg: FromEngine<BeaconEngineMessage<T>>) {
fn on_engine_message(&mut self, msg: FromEngine<EngineApiRequest<T>>) {
match msg {
FromEngine::Event(event) => match event {
FromOrchestrator::BackfillSyncStarted => {
Expand All @@ -879,43 +879,58 @@ where
self.on_backfill_sync_finished(ctrl);
}
},
FromEngine::Request(request) => match request {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
let mut output = self.on_forkchoice_updated(state, payload_attrs);

if let Ok(res) = &mut output {
// track last received forkchoice state
self.state
.forkchoice_state_tracker
.set_latest(state, res.outcome.forkchoice_status());

// emit an event about the handled FCU
self.emit_event(BeaconConsensusEngineEvent::ForkchoiceUpdated(
state,
res.outcome.forkchoice_status(),
));

// handle the event if any
self.on_maybe_tree_event(res.event.take());
FromEngine::Request(request) => {
match request {
EngineApiRequest::InsertExecutedBlock(block) => {
self.state.tree_state.insert_executed(block);
}

if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(Into::into)) {
error!("Failed to send event: {err:?}");
}
}
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => {
let output = self.on_new_payload(payload, cancun_fields);
if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(|e| {
reth_beacon_consensus::BeaconOnNewPayloadError::Internal(Box::new(e))
})) {
error!("Failed to send event: {err:?}");
EngineApiRequest::Beacon(request) => {
match request {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
let mut output = self.on_forkchoice_updated(state, payload_attrs);

if let Ok(res) = &mut output {
// track last received forkchoice state
self.state
.forkchoice_state_tracker
.set_latest(state, res.outcome.forkchoice_status());

// emit an event about the handled FCU
self.emit_event(BeaconConsensusEngineEvent::ForkchoiceUpdated(
state,
res.outcome.forkchoice_status(),
));

// handle the event if any
self.on_maybe_tree_event(res.event.take());
}

if let Err(err) =
tx.send(output.map(|o| o.outcome).map_err(Into::into))
{
error!("Failed to send event: {err:?}");
}
}
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => {
let output = self.on_new_payload(payload, cancun_fields);
if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(|e| {
reth_beacon_consensus::BeaconOnNewPayloadError::Internal(
Box::new(e),
)
})) {
error!("Failed to send event: {err:?}");
}
}
BeaconEngineMessage::TransitionConfigurationExchanged => {
// triggering this hook will record that we received a request from
// the CL
self.canonical_in_memory_state
.on_transition_configuration_exchanged();
}
}
}
}
BeaconEngineMessage::TransitionConfigurationExchanged => {
// triggering this hook will record that we received a request from the CL
self.canonical_in_memory_state.on_transition_configuration_exchanged();
}
},
}
FromEngine::DownloadedBlocks(blocks) => {
if let Some(event) = self.on_downloaded(blocks) {
self.on_tree_event(event);
Expand Down Expand Up @@ -1992,7 +2007,7 @@ mod tests {

struct TestHarness {
tree: EngineApiTreeHandler<MockEthProvider, MockExecutorProvider, EthEngineTypes>,
to_tree_tx: Sender<FromEngine<BeaconEngineMessage<EthEngineTypes>>>,
to_tree_tx: Sender<FromEngine<EngineApiRequest<EthEngineTypes>>>,
from_tree_rx: UnboundedReceiver<EngineApiEvent>,
blocks: Vec<ExecutedBlock>,
action_rx: Receiver<PersistenceAction>,
Expand Down Expand Up @@ -2136,7 +2151,8 @@ mod tests {
state: fcu_state,
payload_attrs: None,
tx,
},
}
.into(),
));

let response = rx.await.unwrap().unwrap().await.unwrap();
Expand Down Expand Up @@ -2367,7 +2383,8 @@ mod tests {
},
payload_attrs: None,
tx,
},
}
.into(),
));

let resp = rx.await.unwrap().unwrap().await.unwrap();
Expand Down Expand Up @@ -2424,11 +2441,14 @@ mod tests {
TestHarness::new(HOLESKY.clone()).with_backfill_state(BackfillSyncState::Active);

let (tx, rx) = oneshot::channel();
test_harness.tree.on_engine_message(FromEngine::Request(BeaconEngineMessage::NewPayload {
payload: payload.clone().into(),
cancun_fields: None,
tx,
}));
test_harness.tree.on_engine_message(FromEngine::Request(
BeaconEngineMessage::NewPayload {
payload: payload.clone().into(),
cancun_fields: None,
tx,
}
.into(),
));

let resp = rx.await.unwrap().unwrap();
assert!(resp.is_syncing());
Expand Down
4 changes: 2 additions & 2 deletions crates/ethereum/engine/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use reth_db_api::database::Database;
use reth_engine_tree::{
backfill::PipelineSync,
download::BasicBlockDownloader,
engine::{EngineApiRequestHandler, EngineHandler},
engine::{EngineApiRequest, EngineApiRequestHandler, EngineHandler},
persistence::PersistenceHandle,
tree::{EngineApiTreeHandler, TreeConfig},
};
Expand All @@ -33,7 +33,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
/// Alias for Ethereum chain orchestrator.
type EthServiceType<DB, Client> = ChainOrchestrator<
EngineHandler<
EngineApiRequestHandler<BeaconEngineMessage<EthEngineTypes>>,
EngineApiRequestHandler<EngineApiRequest<EthEngineTypes>>,
UnboundedReceiverStream<BeaconEngineMessage<EthEngineTypes>>,
BasicBlockDownloader<Client>,
>,
Expand Down

0 comments on commit 7ec3b92

Please sign in to comment.