diff --git a/crates/engine/tree/src/engine.rs b/crates/engine/tree/src/engine.rs index c7cbbb9f5a67..d7dfba4b0825 100644 --- a/crates/engine/tree/src/engine.rs +++ b/crates/engine/tree/src/engine.rs @@ -6,7 +6,9 @@ use crate::{ download::{BlockDownloader, DownloadAction, DownloadOutcome}, }; use futures::{Stream, StreamExt}; -use reth_beacon_consensus::BeaconConsensusEngineEvent; +use reth_beacon_consensus::{BeaconConsensusEngineEvent, BeaconEngineMessage}; +use reth_chain_state::ExecutedBlock; +use reth_engine_primitives::EngineTypes; use reth_primitives::{SealedBlockWithSenders, B256}; use std::{ collections::HashSet, @@ -217,6 +219,21 @@ pub enum EngineApiKind { OpStack, } +/// The request variants that the engine API handler can receive. +#[derive(Debug)] +pub enum EngineApiRequest { + /// A request received from the consensus engine. + Beacon(BeaconEngineMessage), + /// Request to insert an already executed block, e.g. via payload building. + InsertExecutedBlock(ExecutedBlock), +} + +impl From> for EngineApiRequest { + fn from(msg: BeaconEngineMessage) -> Self { + Self::Beacon(msg) + } +} + /// Events emitted by the engine API handler. #[derive(Debug)] pub enum EngineApiEvent { diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 710d0c4de21a..1d8da2c8f82c 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -59,7 +59,7 @@ use tracing::*; mod config; mod metrics; -use crate::tree::metrics::EngineApiMetrics; +use crate::{engine::EngineApiRequest, tree::metrics::EngineApiMetrics}; pub use config::TreeConfig; /// Keeps track of the state of the tree. @@ -373,9 +373,9 @@ pub struct EngineApiTreeHandler { /// them one by one so that we can handle incoming engine API in between and don't become /// unresponsive. This can happen during live sync transition where we're trying to close the /// gap (up to 3 epochs of blocks in the worst case). - incoming_tx: Sender>>, + incoming_tx: Sender>>, /// Incoming engine API requests. - incoming: Receiver>>, + incoming: Receiver>>, /// Outgoing events that are emitted to the handler. outgoing: UnboundedSender, /// Channels to the persistence layer. @@ -452,7 +452,7 @@ where payload_builder: PayloadBuilderHandle, canonical_in_memory_state: CanonicalInMemoryState, config: TreeConfig, - ) -> (Sender>>, UnboundedReceiver) { + ) -> (Sender>>, UnboundedReceiver) { let best_block_number = provider.best_block_number().unwrap_or(0); let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default(); @@ -488,7 +488,7 @@ where } /// Returns a new [`Sender`] to send messages to this type. - pub fn sender(&self) -> Sender>> { + pub fn sender(&self) -> Sender>> { self.incoming_tx.clone() } @@ -804,7 +804,7 @@ where /// Returns an error if the engine channel is disconnected. fn try_recv_engine_message( &self, - ) -> Result>>, RecvError> { + ) -> Result>>, RecvError> { if self.persistence_state.in_progress() { // try to receive the next request with a timeout to not block indefinitely match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) { @@ -868,7 +868,7 @@ where } /// Handles a message from the engine. - fn on_engine_message(&mut self, msg: FromEngine>) { + fn on_engine_message(&mut self, msg: FromEngine>) { match msg { FromEngine::Event(event) => match event { FromOrchestrator::BackfillSyncStarted => { @@ -879,43 +879,58 @@ where self.on_backfill_sync_finished(ctrl); } }, - FromEngine::Request(request) => match request { - BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { - let mut output = self.on_forkchoice_updated(state, payload_attrs); - - if let Ok(res) = &mut output { - // track last received forkchoice state - self.state - .forkchoice_state_tracker - .set_latest(state, res.outcome.forkchoice_status()); - - // emit an event about the handled FCU - self.emit_event(BeaconConsensusEngineEvent::ForkchoiceUpdated( - state, - res.outcome.forkchoice_status(), - )); - - // handle the event if any - self.on_maybe_tree_event(res.event.take()); + FromEngine::Request(request) => { + match request { + EngineApiRequest::InsertExecutedBlock(block) => { + self.state.tree_state.insert_executed(block); } - - if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(Into::into)) { - error!("Failed to send event: {err:?}"); - } - } - BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => { - let output = self.on_new_payload(payload, cancun_fields); - if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(|e| { - reth_beacon_consensus::BeaconOnNewPayloadError::Internal(Box::new(e)) - })) { - error!("Failed to send event: {err:?}"); + EngineApiRequest::Beacon(request) => { + match request { + BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { + let mut output = self.on_forkchoice_updated(state, payload_attrs); + + if let Ok(res) = &mut output { + // track last received forkchoice state + self.state + .forkchoice_state_tracker + .set_latest(state, res.outcome.forkchoice_status()); + + // emit an event about the handled FCU + self.emit_event(BeaconConsensusEngineEvent::ForkchoiceUpdated( + state, + res.outcome.forkchoice_status(), + )); + + // handle the event if any + self.on_maybe_tree_event(res.event.take()); + } + + if let Err(err) = + tx.send(output.map(|o| o.outcome).map_err(Into::into)) + { + error!("Failed to send event: {err:?}"); + } + } + BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => { + let output = self.on_new_payload(payload, cancun_fields); + if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(|e| { + reth_beacon_consensus::BeaconOnNewPayloadError::Internal( + Box::new(e), + ) + })) { + error!("Failed to send event: {err:?}"); + } + } + BeaconEngineMessage::TransitionConfigurationExchanged => { + // triggering this hook will record that we received a request from + // the CL + self.canonical_in_memory_state + .on_transition_configuration_exchanged(); + } + } } } - BeaconEngineMessage::TransitionConfigurationExchanged => { - // triggering this hook will record that we received a request from the CL - self.canonical_in_memory_state.on_transition_configuration_exchanged(); - } - }, + } FromEngine::DownloadedBlocks(blocks) => { if let Some(event) = self.on_downloaded(blocks) { self.on_tree_event(event); @@ -1992,7 +2007,7 @@ mod tests { struct TestHarness { tree: EngineApiTreeHandler, - to_tree_tx: Sender>>, + to_tree_tx: Sender>>, from_tree_rx: UnboundedReceiver, blocks: Vec, action_rx: Receiver, @@ -2136,7 +2151,8 @@ mod tests { state: fcu_state, payload_attrs: None, tx, - }, + } + .into(), )); let response = rx.await.unwrap().unwrap().await.unwrap(); @@ -2367,7 +2383,8 @@ mod tests { }, payload_attrs: None, tx, - }, + } + .into(), )); let resp = rx.await.unwrap().unwrap().await.unwrap(); @@ -2424,11 +2441,14 @@ mod tests { TestHarness::new(HOLESKY.clone()).with_backfill_state(BackfillSyncState::Active); let (tx, rx) = oneshot::channel(); - test_harness.tree.on_engine_message(FromEngine::Request(BeaconEngineMessage::NewPayload { - payload: payload.clone().into(), - cancun_fields: None, - tx, - })); + test_harness.tree.on_engine_message(FromEngine::Request( + BeaconEngineMessage::NewPayload { + payload: payload.clone().into(), + cancun_fields: None, + tx, + } + .into(), + )); let resp = rx.await.unwrap().unwrap(); assert!(resp.is_syncing()); diff --git a/crates/ethereum/engine/src/service.rs b/crates/ethereum/engine/src/service.rs index 5a1c409fd1ce..bb5b582cd8ac 100644 --- a/crates/ethereum/engine/src/service.rs +++ b/crates/ethereum/engine/src/service.rs @@ -6,7 +6,7 @@ use reth_db_api::database::Database; use reth_engine_tree::{ backfill::PipelineSync, download::BasicBlockDownloader, - engine::{EngineApiRequestHandler, EngineHandler}, + engine::{EngineApiRequest, EngineApiRequestHandler, EngineHandler}, persistence::PersistenceHandle, tree::{EngineApiTreeHandler, TreeConfig}, }; @@ -33,7 +33,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream; /// Alias for Ethereum chain orchestrator. type EthServiceType = ChainOrchestrator< EngineHandler< - EngineApiRequestHandler>, + EngineApiRequestHandler>, UnboundedReceiverStream>, BasicBlockDownloader, >,