From 291bf27fa7fb759edef2f8ce21b9662c017cdd26 Mon Sep 17 00:00:00 2001 From: Arsenii Lyashenko Date: Thu, 24 Oct 2024 18:28:31 +0300 Subject: [PATCH] feat(ethexe): Initial on-chain `db-sync` request validation (#4243) --- ethexe/cli/src/service.rs | 87 +++++++++--- ethexe/cli/src/tests.rs | 5 + ethexe/ethereum/src/lib.rs | 4 + ethexe/ethereum/src/router/mod.rs | 17 ++- ethexe/network/src/db_sync/mod.rs | 188 ++++++++++++++++++++++---- ethexe/network/src/db_sync/ongoing.rs | 161 ++++++++++++++++++---- ethexe/network/src/lib.rs | 73 +++++++++- ethexe/network/src/peer_score.rs | 8 ++ 8 files changed, 462 insertions(+), 81 deletions(-) diff --git a/ethexe/cli/src/service.rs b/ethexe/cli/src/service.rs index a418816637b..b9b17dde143 100644 --- a/ethexe/cli/src/service.rs +++ b/ethexe/cli/src/service.rs @@ -22,7 +22,7 @@ use crate::{ config::{Config, ConfigPublicKey, PrometheusConfig}, metrics::MetricsService, }; -use anyhow::{anyhow, Ok, Result}; +use anyhow::{anyhow, Result}; use ethexe_common::{ router::{ BlockCommitment, CodeCommitment, RequestEvent as RouterRequestEvent, StateTransition, @@ -30,8 +30,8 @@ use ethexe_common::{ BlockRequestEvent, }; use ethexe_db::{BlockMetaStorage, CodesStorage, Database}; -use ethexe_ethereum::router::RouterQuery; -use ethexe_network::NetworkReceiverEvent; +use ethexe_ethereum::{primitives::U256, router::RouterQuery}; +use ethexe_network::{db_sync, NetworkReceiverEvent}; use ethexe_observer::{RequestBlockData, RequestEvent}; use ethexe_processor::LocalOutcome; use ethexe_sequencer::agro::AggregatedCommitments; @@ -53,6 +53,7 @@ pub struct Service { db: Database, observer: ethexe_observer::Observer, query: ethexe_observer::Query, + router_query: RouterQuery, processor: ethexe_processor::Processor, signer: ethexe_signer::Signer, block_time: Duration, @@ -186,6 +187,7 @@ impl Service { network, observer, query, + router_query, processor, sequencer, signer, @@ -210,6 +212,7 @@ impl Service { db: Database, observer: ethexe_observer::Observer, query: ethexe_observer::Query, + router_query: RouterQuery, processor: ethexe_processor::Processor, signer: ethexe_signer::Signer, block_time: Duration, @@ -223,6 +226,7 @@ impl Service { db, observer, query, + router_query, processor, signer, block_time, @@ -404,6 +408,7 @@ impl Service { network, mut observer, mut query, + mut router_query, mut processor, mut sequencer, signer: _signer, @@ -505,25 +510,39 @@ impl Service { validation_round_timer.stop(); } - event = maybe_await(network_receiver.as_mut().map(|rx| rx.recv())) => { - let Some(NetworkReceiverEvent::Message { source, data }) = event else { - continue; - }; - - log::debug!("Received a network message from peer {source:?}"); - - let result = Self::process_network_message( - data.as_slice(), - &db, - validator.as_mut(), - sequencer.as_mut(), - network_sender.as_mut(), - ); - - if let Err(err) = result { - // TODO: slash peer/validator in case of error #4175 - // TODO: consider error log as temporary solution #4175 - log::warn!("Failed to process network message: {err}"); + Some(event) = maybe_await(network_receiver.as_mut().map(|rx| rx.recv())) => { + match event { + NetworkReceiverEvent::Message { source, data } => { + log::debug!("Received a network message from peer {source:?}"); + + let result = Self::process_network_message( + data.as_slice(), + &db, + validator.as_mut(), + sequencer.as_mut(), + network_sender.as_mut(), + ); + + if let Err(err) = result { + // TODO: slash peer/validator in case of error #4175 + // TODO: consider error log as temporary solution #4175 + log::warn!("Failed to process network message: {err}"); + } + } + NetworkReceiverEvent::ExternalValidation(validating_response) => { + let validated = Self::process_response_validation(&validating_response, &mut router_query).await?; + let res = if validated { + Ok(validating_response) + } else { + Err(validating_response) + }; + + network_sender + .as_mut() + .expect("if network receiver is `Some()` so does sender") + .request_validated(res); + } + _ => {} } } _ = maybe_await(network_handle.as_mut()) => { @@ -781,6 +800,30 @@ impl Service { } } } + + async fn process_response_validation( + validating_response: &db_sync::ValidatingResponse, + router_query: &mut RouterQuery, + ) -> Result { + let response = validating_response.response(); + + if let db_sync::Response::ProgramIds(ids) = response { + let ethereum_programs = router_query.programs_count().await?; + if ethereum_programs != U256::from(ids.len()) { + return Ok(false); + } + + // TODO: #4309 + for &id in ids { + let code_id = router_query.program_code_id(id).await?; + if code_id.is_none() { + return Ok(false); + } + } + } + + Ok(true) + } } mod utils { diff --git a/ethexe/cli/src/tests.rs b/ethexe/cli/src/tests.rs index a15a9f1fda9..9146bd20a3f 100644 --- a/ethexe/cli/src/tests.rs +++ b/ethexe/cli/src/tests.rs @@ -1274,6 +1274,10 @@ mod utils { .await .unwrap(); + let router_query = RouterQuery::new(&self.rpc_url, self.router_address) + .await + .unwrap(); + let network = self.network_address.as_ref().map(|addr| { let config_path = tempfile::tempdir().unwrap().into_path(); let mut config = @@ -1322,6 +1326,7 @@ mod utils { self.db.clone(), self.observer.clone(), query, + router_query, processor, self.signer.clone(), self.block_time, diff --git a/ethexe/ethereum/src/lib.rs b/ethexe/ethereum/src/lib.rs index ac5164a5b00..190fb01934e 100644 --- a/ethexe/ethereum/src/lib.rs +++ b/ethexe/ethereum/src/lib.rs @@ -57,6 +57,10 @@ pub mod mirror; pub mod router; pub mod wvara; +pub mod primitives { + pub use alloy::primitives::*; +} + pub(crate) type AlloyTransport = BoxTransport; type AlloyProvider = FillProvider, AlloyTransport, AlloyEthereum>; diff --git a/ethexe/ethereum/src/router/mod.rs b/ethexe/ethereum/src/router/mod.rs index e096b1b4d32..f80e189bc92 100644 --- a/ethexe/ethereum/src/router/mod.rs +++ b/ethexe/ethereum/src/router/mod.rs @@ -19,7 +19,7 @@ use crate::{abi::IRouter, wvara::WVara, AlloyProvider, AlloyTransport, TryGetReceipt}; use alloy::{ consensus::{SidecarBuilder, SimpleCoder}, - primitives::{Address, Bytes, B256}, + primitives::{Address, Bytes, B256, U256}, providers::{Provider, ProviderBuilder, RootProvider}, rpc::types::Filter, transports::BoxTransport, @@ -29,7 +29,7 @@ use ethexe_common::router::{BlockCommitment, CodeCommitment}; use ethexe_signer::{Address as LocalAddress, Signature as LocalSignature}; use events::signatures; use futures::StreamExt; -use gear_core::ids::prelude::CodeIdExt as _; +use gear_core::ids::{prelude::CodeIdExt as _, ProgramId}; use gprimitives::{ActorId, CodeId, H160, H256}; use std::sync::Arc; @@ -275,4 +275,17 @@ impl RouterQuery { .map(|res| res._0.to()) .map_err(Into::into) } + + pub async fn programs_count(&self) -> Result { + let count = self.instance.programsCount().call().await?; + Ok(count._0) + } + + pub async fn program_code_id(&self, program_id: ProgramId) -> Result> { + let program_id = LocalAddress::try_from(program_id).expect("infallible"); + let program_id = Address::new(program_id.0); + let code_id = self.instance.programCodeId(program_id).call().await?; + let code_id = Some(CodeId::new(code_id._0.0)).filter(|&code_id| code_id != CodeId::zero()); + Ok(code_id) + } } diff --git a/ethexe/network/src/db_sync/mod.rs b/ethexe/network/src/db_sync/mod.rs index 66ae4900d60..967ed3b41ff 100644 --- a/ethexe/network/src/db_sync/mod.rs +++ b/ethexe/network/src/db_sync/mod.rs @@ -18,9 +18,11 @@ mod ongoing; +pub use ongoing::ValidatingResponse; + use crate::{ db_sync::ongoing::{ - OngoingRequests, OngoingResponses, PeerFailed, PeerResponse, SendRequestError, + ExternalValidation, OngoingRequests, OngoingResponses, PeerResponse, SendRequestError, SendRequestErrorKind, }, export::{Multiaddr, PeerId}, @@ -42,7 +44,7 @@ use libp2p::{ }; use parity_scale_codec::{Decode, Encode}; use std::{ - collections::{BTreeMap, BTreeSet}, + collections::{BTreeMap, BTreeSet, VecDeque}, task::{Context, Poll}, time::Duration, }; @@ -77,7 +79,7 @@ impl Request { } } (Request::ProgramIds, Response::ProgramIds(_ids)) => None, - _ => unreachable!("should be checked in `validate_response`"), + _ => unreachable!("should be checked in `Response::validate()`"), } } } @@ -107,11 +109,14 @@ impl Response { (Response::ProgramIds(ids), Response::ProgramIds(new_ids)) => { ids.extend(new_ids); } - _ => unreachable!("should be checked in `validate_response`"), + _ => unreachable!("should be checked in `Response::validate()`"), } } - fn validate(&self, request: &Request) -> Result<(), ResponseValidationError> { + /// Validates response against request. + /// + /// Returns `false` if external validation is required. + fn validate(&self, request: &Request) -> Result { match (request, self) { (Request::DataForHashes(_requested_hashes), Response::DataForHashes(hashes)) => { for (hash, data) in hashes { @@ -120,9 +125,9 @@ impl Response { } } - Ok(()) + Ok(true) } - (Request::ProgramIds, Response::ProgramIds(_ids)) => Ok(()), + (Request::ProgramIds, Response::ProgramIds(_ids)) => Ok(false), (_, _) => Err(ResponseValidationError::TypeMismatch), } } @@ -145,7 +150,7 @@ impl Response { true } (Request::ProgramIds, Response::ProgramIds(_ids)) => false, - _ => unreachable!("should be checked in `validate_response`"), + _ => unreachable!("should be checked in `Response::validate()`"), } } } @@ -184,6 +189,8 @@ pub enum Event { //// The ID of request request_id: RequestId, }, + /// External validation is mandatory for response + ExternalValidation(ValidatingResponse), /// Request completion done RequestSucceed { /// The ID of request @@ -261,6 +268,7 @@ type InnerBehaviour = request_response::Behaviour, peer_score_handle: peer_score::Handle, ongoing_requests: OngoingRequests, ongoing_responses: OngoingResponses, @@ -274,6 +282,7 @@ impl Behaviour { [(STREAM_PROTOCOL, ProtocolSupport::Full)], request_response::Config::default(), ), + pending_events: VecDeque::new(), peer_score_handle: peer_score_handle.clone(), ongoing_requests: OngoingRequests::new(&config, peer_score_handle), ongoing_responses: OngoingResponses::new(db, &config), @@ -284,6 +293,44 @@ impl Behaviour { self.ongoing_requests.push_pending_request(request) } + pub(crate) fn request_validated( + &mut self, + res: Result, + ) { + let res = self + .ongoing_requests + .on_external_validation(res, &mut self.inner); + let event = match res { + Ok(ExternalValidation::Success { + request_id, + response, + }) => Event::RequestSucceed { + request_id, + response, + }, + Ok(ExternalValidation::NewRound { + peer_id, + request_id, + }) => Event::NewRequestRound { + request_id, + peer_id, + reason: NewRequestRoundReason::PartialData, + }, + Err(SendRequestError { + request_id, + kind: SendRequestErrorKind::OutOfRounds, + }) => Event::RequestFailed { + request_id, + error: RequestFailure::OutOfRounds, + }, + Err(SendRequestError { + request_id, + kind: SendRequestErrorKind::NoPeers, + }) => Event::PendingStateRequest { request_id }, + }; + self.pending_events.push_back(event); + } + fn handle_inner_event( &mut self, event: request_response::Event, @@ -327,11 +374,14 @@ impl Behaviour { request_id, response, ) { - Ok((request_id, response)) => Event::RequestSucceed { + Ok(PeerResponse::Success { + request_id, + response, + }) => Event::RequestSucceed { request_id, response, }, - Err(PeerResponse::NewRound { + Ok(PeerResponse::NewRound { peer_id, request_id, }) => Event::NewRequestRound { @@ -339,17 +389,20 @@ impl Behaviour { peer_id, reason: NewRequestRoundReason::PartialData, }, - Err(PeerResponse::SendRequest(SendRequestError { + Ok(PeerResponse::ExternalValidation(validating_response)) => { + Event::ExternalValidation(validating_response) + } + Err(SendRequestError { request_id, kind: SendRequestErrorKind::OutOfRounds, - })) => Event::RequestFailed { + }) => Event::RequestFailed { request_id, error: RequestFailure::OutOfRounds, }, - Err(PeerResponse::SendRequest(SendRequestError { + Err(SendRequestError { request_id, - kind: SendRequestErrorKind::Pending, - })) => Event::PendingStateRequest { request_id }, + kind: SendRequestErrorKind::NoPeers, + }) => Event::PendingStateRequest { request_id }, }; return Poll::Ready(ToSwarm::GenerateEvent(event)); @@ -376,17 +429,17 @@ impl Behaviour { peer_id, reason: NewRequestRoundReason::PeerFailed, }, - Err(PeerFailed::SendRequest(SendRequestError { + Err(SendRequestError { request_id, kind: SendRequestErrorKind::OutOfRounds, - })) => Event::RequestFailed { + }) => Event::RequestFailed { request_id, error: RequestFailure::OutOfRounds, }, - Err(PeerFailed::SendRequest(SendRequestError { + Err(SendRequestError { request_id, - kind: SendRequestErrorKind::Pending, - })) => Event::PendingStateRequest { request_id }, + kind: SendRequestErrorKind::NoPeers, + }) => Event::PendingStateRequest { request_id }, }; return Poll::Ready(ToSwarm::GenerateEvent(event)); @@ -494,6 +547,10 @@ impl NetworkBehaviour for Behaviour { })); } + if let Some(event) = self.pending_events.pop_front() { + return Poll::Ready(ToSwarm::GenerateEvent(event)); + } + let event = match self.ongoing_requests.send_pending_request(&mut self.inner) { Ok(Some((peer_id, request_id))) => Some(Event::NewRequestRound { request_id, @@ -510,7 +567,7 @@ impl NetworkBehaviour for Behaviour { }), Err(SendRequestError { request_id: _, - kind: SendRequestErrorKind::Pending, + kind: SendRequestErrorKind::NoPeers, }) => None, }; if let Some(event) = event { @@ -598,17 +655,14 @@ mod tests { #[tokio::test] async fn smoke() { - const PID1: ProgramId = ProgramId::new([1; 32]); - const PID2: ProgramId = ProgramId::new([2; 32]); - init_logger(); let (mut alice, _alice_db) = new_swarm().await; let (mut bob, bob_db) = new_swarm().await; let bob_peer_id = *bob.local_peer_id(); - bob_db.set_program_code_id(PID1, CodeId::zero()); - bob_db.set_program_code_id(PID2, CodeId::zero()); + let hello_hash = bob_db.write(b"hello"); + let world_hash = bob_db.write(b"world"); alice.connect(&mut bob).await; tokio::spawn(async move { @@ -640,7 +694,9 @@ mod tests { } }); - let request_id = alice.behaviour_mut().request(Request::ProgramIds); + let request_id = alice + .behaviour_mut() + .request(Request::DataForHashes([hello_hash, world_hash].into())); let event = alice.next_behaviour_event().await; assert_eq!( @@ -657,7 +713,13 @@ mod tests { event, Event::RequestSucceed { request_id, - response: Response::ProgramIds([PID1, PID2].into()) + response: Response::DataForHashes( + [ + (hello_hash, b"hello".to_vec()), + (world_hash, b"world".to_vec()) + ] + .into() + ) } ) } @@ -1035,4 +1097,74 @@ mod tests { let event = alice.next_behaviour_event().await; assert!(matches!(event, Event::ResponseSent { peer_id, .. } if peer_id == bob_peer_id)); } + + #[tokio::test] + async fn external_validation() { + const PID1: ProgramId = ProgramId::new([1; 32]); + const PID2: ProgramId = ProgramId::new([2; 32]); + + init_logger(); + + let (mut alice, _alice_db) = new_swarm().await; + let (mut bob, _bob_db) = new_swarm().await; + let (mut charlie, charlie_db) = new_swarm().await; + let bob_peer_id = *bob.local_peer_id(); + let charlie_peer_id = *charlie.local_peer_id(); + + alice.connect(&mut bob).await; + tokio::spawn(bob.loop_on_next()); + + charlie_db.set_program_code_id(PID1, CodeId::zero()); + charlie_db.set_program_code_id(PID2, CodeId::zero()); + + let request_id = alice.behaviour_mut().request(Request::ProgramIds); + + let event = alice.next_behaviour_event().await; + assert_eq!( + event, + Event::NewRequestRound { + request_id, + peer_id: bob_peer_id, + reason: NewRequestRoundReason::FromQueue, + } + ); + + let event = alice.next_behaviour_event().await; + if let Event::ExternalValidation(validating_response) = event { + assert_eq!(validating_response.peer_id(), bob_peer_id); + let response = validating_response.response(); + assert_eq!(*response, Response::ProgramIds([].into())); + alice + .behaviour_mut() + .request_validated(Err(validating_response)); + } else { + unreachable!(); + } + + alice.connect(&mut charlie).await; + tokio::spawn(charlie.loop_on_next()); + + // `Event::NewRequestRound` skipped by `connect()` above + + let event = alice.next_behaviour_event().await; + if let Event::ExternalValidation(validating_response) = event { + assert_eq!(validating_response.peer_id(), charlie_peer_id); + let response = validating_response.response(); + assert_eq!(*response, Response::ProgramIds([PID1, PID2].into())); + alice + .behaviour_mut() + .request_validated(Ok(validating_response)); + } else { + unreachable!(); + } + + let event = alice.next_behaviour_event().await; + assert_eq!( + event, + Event::RequestSucceed { + request_id, + response: Response::ProgramIds([PID1, PID2].into()), + } + ); + } } diff --git a/ethexe/network/src/db_sync/ongoing.rs b/ethexe/network/src/db_sync/ongoing.rs index cd1e481c784..6374312c696 100644 --- a/ethexe/network/src/db_sync/ongoing.rs +++ b/ethexe/network/src/db_sync/ongoing.rs @@ -46,23 +46,54 @@ pub(crate) struct SendRequestError { #[derive(Debug)] pub(crate) enum SendRequestErrorKind { OutOfRounds, - Pending, + NoPeers, } -#[derive(Debug, derive_more::From)] +#[derive(Debug)] pub(crate) enum PeerResponse { + Success { + request_id: RequestId, + response: Response, + }, NewRound { peer_id: PeerId, request_id: RequestId, }, - #[from] - SendRequest(SendRequestError), + ExternalValidation(ValidatingResponse), } -#[derive(Debug, derive_more::From)] -pub(crate) enum PeerFailed { - #[from] - SendRequest(SendRequestError), +#[derive(Debug)] +pub(crate) enum ExternalValidation { + Success { + request_id: RequestId, + response: Response, + }, + NewRound { + peer_id: PeerId, + request_id: RequestId, + }, +} + +#[derive(Debug, Eq, PartialEq)] +pub struct ValidatingResponse { + ongoing_request: OngoingRequest, + peer_id: PeerId, + response: Response, +} + +impl ValidatingResponse { + pub fn request(&self) -> &Request { + &self.ongoing_request.request + } + + pub fn response(&self) -> &Response { + &self.response + } + + #[cfg(test)] + pub(crate) fn peer_id(&self) -> PeerId { + self.peer_id + } } #[derive(Debug)] @@ -76,6 +107,14 @@ pub(crate) struct OngoingRequest { peer_score_handle: Handle, } +impl PartialEq for OngoingRequest { + fn eq(&self, other: &Self) -> bool { + self.request_id == other.request_id + } +} + +impl Eq for OngoingRequest {} + impl OngoingRequest { pub(crate) fn new( request_id: RequestId, @@ -94,7 +133,7 @@ impl OngoingRequest { } } - fn inner_complete(&mut self, peer: PeerId, new_response: Response) -> Response { + fn merge_and_strip(&mut self, peer: PeerId, new_response: Response) -> Response { let mut response = if let Some(mut response) = self.response.take() { response.merge(new_response); response @@ -113,25 +152,53 @@ impl OngoingRequest { response } + fn inner_complete( + mut self, + peer: PeerId, + response: Response, + ) -> Result<(RequestId, Response), Self> { + if let Some(new_request) = self.request.difference(&response) { + self.request = new_request; + self.response = Some(self.merge_and_strip(peer, response)); + Err(self) + } else { + let request_id = self.request_id; + let response = self.merge_and_strip(peer, response); + Ok((request_id, response)) + } + } + /// Try to bring the request to the complete state. /// /// Returns `Err(self)` if response validation is failed or response is incomplete. - fn try_complete(mut self, peer: PeerId, response: Response) -> Result { + fn try_complete(mut self, peer: PeerId, response: Response) -> Result { self.tried_peers.insert(peer); - if let Err(error) = response.validate(&self.request) { - let request_id = self.request_id; - log::trace!( - "response validation failed for request {request_id:?} from {peer}: {error:?}", - ); - Err(self) - } else if let Some(new_request) = self.request.difference(&response) { - self.request = new_request; - self.response = Some(self.inner_complete(peer, response)); - Err(self) - } else { - let response = self.inner_complete(peer, response); - Ok(response) + let request_id = self.request_id; + + match response.validate(&self.request) { + Ok(true) => self + .inner_complete(peer, response) + .map(|(request_id, response)| PeerResponse::Success { + request_id, + response, + }), + Ok(false) => { + let validating_response = ValidatingResponse { + ongoing_request: self, + peer_id: peer, + response, + }; + Ok(PeerResponse::ExternalValidation(validating_response)) + } + Err(error) => { + log::trace!( + "response validation failed for request {request_id:?} from {peer}: {error:?}", + ); + self.peer_score_handle.invalid_data(peer); + + Err(self) + } } } @@ -269,7 +336,7 @@ impl OngoingRequests { self.pending_requests.push_back(ongoing_request); Err(SendRequestError { request_id, - kind: SendRequestErrorKind::Pending, + kind: SendRequestErrorKind::NoPeers, }) } } @@ -293,7 +360,7 @@ impl OngoingRequests { peer: PeerId, request_id: OutboundRequestId, response: Response, - ) -> Result<(RequestId, Response), PeerResponse> { + ) -> Result { let ongoing_request = self .active_requests .remove(&request_id) @@ -301,12 +368,50 @@ impl OngoingRequests { let request_id = ongoing_request.request_id; let new_ongoing_request = match ongoing_request.try_complete(peer, response) { - Ok(response) => return Ok((request_id, response)), + Ok(peer_response) => return Ok(peer_response), Err(new_ongoing_request) => new_ongoing_request, }; let peer_id = self.send_request(behaviour, new_ongoing_request)?; - Err(PeerResponse::NewRound { + Ok(PeerResponse::NewRound { + peer_id, + request_id, + }) + } + + pub(crate) fn on_external_validation( + &mut self, + res: Result, + behaviour: &mut InnerBehaviour, + ) -> Result { + let new_ongoing_request = match res { + Ok(validating_response) => { + let ValidatingResponse { + ongoing_request, + peer_id, + response, + } = validating_response; + + match ongoing_request.inner_complete(peer_id, response) { + Ok((request_id, response)) => { + return Ok(ExternalValidation::Success { + request_id, + response, + }) + } + Err(new_ongoing_request) => new_ongoing_request, + } + } + Err(validating_response) => { + self.peer_score_handle + .invalid_data(validating_response.peer_id); + validating_response.ongoing_request + } + }; + + let request_id = new_ongoing_request.request_id; + let peer_id = self.send_request(behaviour, new_ongoing_request)?; + Ok(ExternalValidation::NewRound { peer_id, request_id, }) @@ -317,7 +422,7 @@ impl OngoingRequests { behaviour: &mut InnerBehaviour, peer: PeerId, request_id: OutboundRequestId, - ) -> Result<(PeerId, RequestId), PeerFailed> { + ) -> Result<(PeerId, RequestId), SendRequestError> { let ongoing_request = self .active_requests .remove(&request_id) diff --git a/ethexe/network/src/lib.rs b/ethexe/network/src/lib.rs index 53fbb3950f9..a7d7c39116f 100644 --- a/ethexe/network/src/lib.rs +++ b/ethexe/network/src/lib.rs @@ -122,6 +122,7 @@ impl NetworkService { enum NetworkSenderEvent { PublishMessage { data: Vec }, RequestDbData(db_sync::Request), + RequestValidated(Result), } #[derive(Debug, Clone)] @@ -147,6 +148,13 @@ impl NetworkSender { pub fn request_db_data(&self, request: db_sync::Request) { let _res = self.tx.send(NetworkSenderEvent::RequestDbData(request)); } + + pub fn request_validated( + &self, + res: Result, + ) { + let _res = self.tx.send(NetworkSenderEvent::RequestValidated(res)); + } } #[derive(Debug, Eq, PartialEq)] @@ -157,6 +165,7 @@ pub enum NetworkReceiverEvent { }, DbResponse(Result), PeerBlocked(PeerId), + ExternalValidation(db_sync::ValidatingResponse), } pub struct NetworkReceiver { @@ -437,6 +446,13 @@ impl NetworkEventLoop { } BehaviourEvent::Gossipsub(_) => {} // + BehaviourEvent::DbSync(db_sync::Event::ExternalValidation(validating_response)) => { + let _res = self + .external_tx + .send(NetworkReceiverEvent::ExternalValidation( + validating_response, + )); + } BehaviourEvent::DbSync(db_sync::Event::RequestSucceed { request_id: _, response, @@ -472,6 +488,9 @@ impl NetworkEventLoop { NetworkSenderEvent::RequestDbData(request) => { self.swarm.behaviour_mut().db_sync.request(request); } + NetworkSenderEvent::RequestValidated(res) => { + self.swarm.behaviour_mut().db_sync.request_validated(res); + } } } } @@ -703,7 +722,7 @@ mod tests { let mut service1 = NetworkService::new(config.clone(), &signer1, db).unwrap(); let peer_id = service1.event_loop.local_peer_id(); - let multiaddr: Multiaddr = format!("/memory/3/p2p/{peer_id}").parse().unwrap(); + let multiaddr: Multiaddr = format!("/memory/5/p2p/{peer_id}").parse().unwrap(); let peer_score_handle = service1.event_loop.score_handle(); @@ -733,4 +752,56 @@ mod tests { Some(NetworkReceiverEvent::PeerBlocked(service2_peer_id)) ); } + + #[tokio::test] + async fn external_validation() { + init_logger(); + + let tmp_dir1 = tempfile::tempdir().unwrap(); + let config = NetworkEventLoopConfig::new_memory(tmp_dir1.path().to_path_buf(), "/memory/7"); + let signer1 = ethexe_signer::Signer::new(tmp_dir1.path().join("key")).unwrap(); + let db = Database::from_one(&MemDb::default(), [0; 20]); + let mut service1 = NetworkService::new(config.clone(), &signer1, db).unwrap(); + + let peer_id = service1.event_loop.local_peer_id(); + let multiaddr: Multiaddr = format!("/memory/7/p2p/{peer_id}").parse().unwrap(); + + tokio::spawn(service1.event_loop.run()); + + // second service + let tmp_dir2 = tempfile::tempdir().unwrap(); + let signer2 = ethexe_signer::Signer::new(tmp_dir2.path().join("key")).unwrap(); + let mut config2 = + NetworkEventLoopConfig::new_memory(tmp_dir2.path().to_path_buf(), "/memory/8"); + config2.bootstrap_addresses = [multiaddr].into(); + let db = Database::from_one(&MemDb::default(), [0; 20]); + let service2 = NetworkService::new(config2.clone(), &signer2, db).unwrap(); + tokio::spawn(service2.event_loop.run()); + + // Wait for the connection to be established + tokio::time::sleep(Duration::from_secs(1)).await; + + service1 + .sender + .request_db_data(db_sync::Request::ProgramIds); + + let event = timeout(Duration::from_secs(5), service1.receiver.recv()) + .await + .expect("time has elapsed") + .unwrap(); + if let NetworkReceiverEvent::ExternalValidation(validating_response) = event { + service1.sender.request_validated(Ok(validating_response)); + } else { + unreachable!(); + } + + let event = timeout(Duration::from_secs(5), service1.receiver.recv()) + .await + .expect("time has elapsed") + .unwrap(); + assert_eq!( + event, + NetworkReceiverEvent::DbResponse(Ok(db_sync::Response::ProgramIds([].into()))) + ); + } } diff --git a/ethexe/network/src/peer_score.rs b/ethexe/network/src/peer_score.rs index 14bea14d156..1a0caec83c5 100644 --- a/ethexe/network/src/peer_score.rs +++ b/ethexe/network/src/peer_score.rs @@ -35,6 +35,7 @@ use tokio::sync::mpsc; pub(crate) enum ScoreChangedReason { UnsupportedProtocol, ExcessiveData, + InvalidData, } impl ScoreChangedReason { @@ -42,6 +43,7 @@ impl ScoreChangedReason { match self { ScoreChangedReason::UnsupportedProtocol => config.unsupported_protocol, ScoreChangedReason::ExcessiveData => config.excessive_data, + ScoreChangedReason::InvalidData => config.invalid_data, } } } @@ -67,6 +69,10 @@ impl Handle { pub fn excessive_data(&self, peer_id: PeerId) { let _res = self.0.send((peer_id, ScoreChangedReason::ExcessiveData)); } + + pub fn invalid_data(&self, peer_id: PeerId) { + let _res = self.0.send((peer_id, ScoreChangedReason::InvalidData)); + } } #[derive(Debug, Eq, PartialEq)] @@ -95,6 +101,7 @@ pub(crate) enum Event { pub(crate) struct Config { unsupported_protocol: u8, excessive_data: u8, + invalid_data: u8, } impl Default for Config { @@ -102,6 +109,7 @@ impl Default for Config { Self { unsupported_protocol: u8::MAX, excessive_data: u8::MAX, + invalid_data: u8::MAX, } } }