Skip to content

Commit

Permalink
feat(ethexe): Initial on-chain db-sync request validation (#4243)
Browse files Browse the repository at this point in the history
  • Loading branch information
ark0f authored Oct 24, 2024
1 parent 40ae438 commit 291bf27
Show file tree
Hide file tree
Showing 8 changed files with 462 additions and 81 deletions.
87 changes: 65 additions & 22 deletions ethexe/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ use crate::{
config::{Config, ConfigPublicKey, PrometheusConfig},
metrics::MetricsService,
};
use anyhow::{anyhow, Ok, Result};
use anyhow::{anyhow, Result};
use ethexe_common::{
router::{
BlockCommitment, CodeCommitment, RequestEvent as RouterRequestEvent, StateTransition,
},
BlockRequestEvent,
};
use ethexe_db::{BlockMetaStorage, CodesStorage, Database};
use ethexe_ethereum::router::RouterQuery;
use ethexe_network::NetworkReceiverEvent;
use ethexe_ethereum::{primitives::U256, router::RouterQuery};
use ethexe_network::{db_sync, NetworkReceiverEvent};
use ethexe_observer::{RequestBlockData, RequestEvent};
use ethexe_processor::LocalOutcome;
use ethexe_sequencer::agro::AggregatedCommitments;
Expand All @@ -53,6 +53,7 @@ pub struct Service {
db: Database,
observer: ethexe_observer::Observer,
query: ethexe_observer::Query,
router_query: RouterQuery,
processor: ethexe_processor::Processor,
signer: ethexe_signer::Signer,
block_time: Duration,
Expand Down Expand Up @@ -186,6 +187,7 @@ impl Service {
network,
observer,
query,
router_query,
processor,
sequencer,
signer,
Expand All @@ -210,6 +212,7 @@ impl Service {
db: Database,
observer: ethexe_observer::Observer,
query: ethexe_observer::Query,
router_query: RouterQuery,
processor: ethexe_processor::Processor,
signer: ethexe_signer::Signer,
block_time: Duration,
Expand All @@ -223,6 +226,7 @@ impl Service {
db,
observer,
query,
router_query,
processor,
signer,
block_time,
Expand Down Expand Up @@ -404,6 +408,7 @@ impl Service {
network,
mut observer,
mut query,
mut router_query,
mut processor,
mut sequencer,
signer: _signer,
Expand Down Expand Up @@ -505,25 +510,39 @@ impl Service {

validation_round_timer.stop();
}
event = maybe_await(network_receiver.as_mut().map(|rx| rx.recv())) => {
let Some(NetworkReceiverEvent::Message { source, data }) = event else {
continue;
};

log::debug!("Received a network message from peer {source:?}");

let result = Self::process_network_message(
data.as_slice(),
&db,
validator.as_mut(),
sequencer.as_mut(),
network_sender.as_mut(),
);

if let Err(err) = result {
// TODO: slash peer/validator in case of error #4175
// TODO: consider error log as temporary solution #4175
log::warn!("Failed to process network message: {err}");
Some(event) = maybe_await(network_receiver.as_mut().map(|rx| rx.recv())) => {
match event {
NetworkReceiverEvent::Message { source, data } => {
log::debug!("Received a network message from peer {source:?}");

let result = Self::process_network_message(
data.as_slice(),
&db,
validator.as_mut(),
sequencer.as_mut(),
network_sender.as_mut(),
);

if let Err(err) = result {
// TODO: slash peer/validator in case of error #4175
// TODO: consider error log as temporary solution #4175
log::warn!("Failed to process network message: {err}");
}
}
NetworkReceiverEvent::ExternalValidation(validating_response) => {
let validated = Self::process_response_validation(&validating_response, &mut router_query).await?;
let res = if validated {
Ok(validating_response)
} else {
Err(validating_response)
};

network_sender
.as_mut()
.expect("if network receiver is `Some()` so does sender")
.request_validated(res);
}
_ => {}
}
}
_ = maybe_await(network_handle.as_mut()) => {
Expand Down Expand Up @@ -781,6 +800,30 @@ impl Service {
}
}
}

async fn process_response_validation(
validating_response: &db_sync::ValidatingResponse,
router_query: &mut RouterQuery,
) -> Result<bool> {
let response = validating_response.response();

if let db_sync::Response::ProgramIds(ids) = response {
let ethereum_programs = router_query.programs_count().await?;
if ethereum_programs != U256::from(ids.len()) {
return Ok(false);
}

// TODO: #4309
for &id in ids {
let code_id = router_query.program_code_id(id).await?;
if code_id.is_none() {
return Ok(false);
}
}
}

Ok(true)
}
}

mod utils {
Expand Down
5 changes: 5 additions & 0 deletions ethexe/cli/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1274,6 +1274,10 @@ mod utils {
.await
.unwrap();

let router_query = RouterQuery::new(&self.rpc_url, self.router_address)
.await
.unwrap();

let network = self.network_address.as_ref().map(|addr| {
let config_path = tempfile::tempdir().unwrap().into_path();
let mut config =
Expand Down Expand Up @@ -1322,6 +1326,7 @@ mod utils {
self.db.clone(),
self.observer.clone(),
query,
router_query,
processor,
self.signer.clone(),
self.block_time,
Expand Down
4 changes: 4 additions & 0 deletions ethexe/ethereum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ pub mod mirror;
pub mod router;
pub mod wvara;

pub mod primitives {
pub use alloy::primitives::*;
}

pub(crate) type AlloyTransport = BoxTransport;
type AlloyProvider =
FillProvider<ExeFiller, RootProvider<AlloyTransport>, AlloyTransport, AlloyEthereum>;
Expand Down
17 changes: 15 additions & 2 deletions ethexe/ethereum/src/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use crate::{abi::IRouter, wvara::WVara, AlloyProvider, AlloyTransport, TryGetReceipt};
use alloy::{
consensus::{SidecarBuilder, SimpleCoder},
primitives::{Address, Bytes, B256},
primitives::{Address, Bytes, B256, U256},
providers::{Provider, ProviderBuilder, RootProvider},
rpc::types::Filter,
transports::BoxTransport,
Expand All @@ -29,7 +29,7 @@ use ethexe_common::router::{BlockCommitment, CodeCommitment};
use ethexe_signer::{Address as LocalAddress, Signature as LocalSignature};
use events::signatures;
use futures::StreamExt;
use gear_core::ids::prelude::CodeIdExt as _;
use gear_core::ids::{prelude::CodeIdExt as _, ProgramId};
use gprimitives::{ActorId, CodeId, H160, H256};
use std::sync::Arc;

Expand Down Expand Up @@ -275,4 +275,17 @@ impl RouterQuery {
.map(|res| res._0.to())
.map_err(Into::into)
}

pub async fn programs_count(&self) -> Result<U256> {
let count = self.instance.programsCount().call().await?;
Ok(count._0)
}

pub async fn program_code_id(&self, program_id: ProgramId) -> Result<Option<CodeId>> {
let program_id = LocalAddress::try_from(program_id).expect("infallible");
let program_id = Address::new(program_id.0);
let code_id = self.instance.programCodeId(program_id).call().await?;
let code_id = Some(CodeId::new(code_id._0.0)).filter(|&code_id| code_id != CodeId::zero());
Ok(code_id)
}
}
Loading

0 comments on commit 291bf27

Please sign in to comment.