From 2f0c23c812f9a2f2bcf7df5346d0f567a6f9ac90 Mon Sep 17 00:00:00 2001 From: Noam Spiegelstein Date: Wed, 1 Jan 2025 12:31:52 +0200 Subject: [PATCH] feat(papyrus_p2p_sync): add to server class manager client --- Cargo.lock | 1 + crates/papyrus_node/src/run.rs | 5 +- crates/papyrus_p2p_sync/Cargo.toml | 1 + crates/papyrus_p2p_sync/src/server/mod.rs | 108 ++++++++++++------- crates/papyrus_p2p_sync/src/server/test.rs | 21 +++- crates/starknet_state_sync/src/runner/mod.rs | 8 +- 6 files changed, 99 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2180e86cb2..9d2e970dbe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7926,6 +7926,7 @@ version = "0.0.0" dependencies = [ "assert_matches", "async-stream", + "async-trait", "chrono", "enum-iterator", "futures", diff --git a/crates/papyrus_node/src/run.rs b/crates/papyrus_node/src/run.rs index f24e84cde2..fb7e39886a 100644 --- a/crates/papyrus_node/src/run.rs +++ b/crates/papyrus_node/src/run.rs @@ -323,6 +323,7 @@ async fn spawn_sync_client( fn spawn_p2p_sync_server( network_manager: Option<&mut NetworkManager>, storage_reader: StorageReader, + class_manager_client: SharedClassManagerClient, ) -> JoinHandle> { let Some(network_manager) = network_manager else { info!("P2p Sync is disabled."); @@ -348,7 +349,8 @@ fn spawn_p2p_sync_server( event_server_receiver, ); - let p2p_sync_server = P2pSyncServer::new(storage_reader.clone(), p2p_sync_server_channels); + let p2p_sync_server = + P2pSyncServer::new(storage_reader.clone(), p2p_sync_server_channels, class_manager_client); tokio::spawn(async move { p2p_sync_server.run().await; Ok(()) @@ -411,6 +413,7 @@ async fn run_threads( spawn_p2p_sync_server( resources.maybe_network_manager.as_mut(), resources.storage_reader.clone(), + resources.class_manager_client.clone(), ) }; diff --git a/crates/papyrus_p2p_sync/Cargo.toml b/crates/papyrus_p2p_sync/Cargo.toml index 98bc4296bd..dc1ded1f09 100644 --- a/crates/papyrus_p2p_sync/Cargo.toml +++ b/crates/papyrus_p2p_sync/Cargo.toml @@ -28,6 +28,7 @@ starknet_state_sync_types.workspace = true starknet-types-core.workspace = true thiserror.workspace = true starknet_class_manager_types.workspace = true +async-trait.workspace = true tokio.workspace = true tokio-stream.workspace = true tracing.workspace = true diff --git a/crates/papyrus_p2p_sync/src/server/mod.rs b/crates/papyrus_p2p_sync/src/server/mod.rs index c98c1c93bb..b7c9ea75f9 100644 --- a/crates/papyrus_p2p_sync/src/server/mod.rs +++ b/crates/papyrus_p2p_sync/src/server/mod.rs @@ -1,5 +1,6 @@ use std::fmt::Debug; +use async_trait::async_trait; use futures::never::Never; use futures::StreamExt; use papyrus_common::pending_classes::ApiContractClass; @@ -21,14 +22,16 @@ use papyrus_protobuf::sync::{ TransactionQuery, }; use papyrus_storage::body::BodyStorageReader; -use papyrus_storage::class::ClassStorageReader; +use papyrus_storage::class_manager::ClassManagerStorageReader; use papyrus_storage::header::HeaderStorageReader; use papyrus_storage::state::StateStorageReader; use papyrus_storage::{db, StorageReader, StorageTxn}; use starknet_api::block::BlockNumber; +use starknet_api::contract_class::ContractClass; use starknet_api::core::ClassHash; use starknet_api::state::ThinStateDiff; use starknet_api::transaction::{Event, FullTransaction, TransactionHash}; +use starknet_class_manager_types::{ClassManagerClientError, SharedClassManagerClient}; use tracing::{debug, error, info}; #[cfg(test)] @@ -56,12 +59,14 @@ pub enum P2pSyncServerError { SignatureNotFound { block_number: BlockNumber }, #[error(transparent)] SendError(#[from] futures::channel::mpsc::SendError), + #[error(transparent)] + ClassManagerClientError(#[from] ClassManagerClientError), } impl P2pSyncServerError { pub fn should_log_in_error_level(&self) -> bool { match self { - Self::JoinError(_) | Self::SignatureNotFound { .. } | Self::SendError { .. } + Self::JoinError(_) | Self::SignatureNotFound { .. } | Self::SendError { .. } | Self::ClassManagerClientError { .. } // TODO(shahak): Consider returning false for some of the StorageError variants. | Self::DBInternalError { .. } => true, Self::BlockNumberOutOfRange { .. } | Self::BlockNotFound { .. } | Self::ClassNotFound { .. } => false, @@ -105,6 +110,7 @@ impl P2pSyncServerChannels { pub struct P2pSyncServer { storage_reader: StorageReader, p2p_sync_channels: P2pSyncServerChannels, + class_manager_client: SharedClassManagerClient, } impl P2pSyncServer { @@ -122,46 +128,51 @@ impl P2pSyncServer { let server_query_manager = maybe_server_query_manager.expect( "Header queries sender was unexpectedly dropped." ); - register_query(self.storage_reader.clone(), server_query_manager, "header"); + register_query(self.storage_reader.clone(), server_query_manager, self.class_manager_client.clone(), "header"); } maybe_server_query_manager = state_diff_receiver.next() => { let server_query_manager = maybe_server_query_manager.expect( "State diff queries sender was unexpectedly dropped." ); - register_query(self.storage_reader.clone(), server_query_manager, "state diff"); + register_query(self.storage_reader.clone(), server_query_manager, self.class_manager_client.clone(), "state diff"); } maybe_server_query_manager = transaction_receiver.next() => { let server_query_manager = maybe_server_query_manager.expect( "Transaction queries sender was unexpectedly dropped." ); - register_query(self.storage_reader.clone(), server_query_manager, "transaction"); + register_query(self.storage_reader.clone(), server_query_manager, self.class_manager_client.clone(), "transaction"); } maybe_server_query_manager = class_receiver.next() => { let server_query_manager = maybe_server_query_manager.expect( "Class queries sender was unexpectedly dropped." ); - register_query(self.storage_reader.clone(), server_query_manager, "class"); + register_query(self.storage_reader.clone(), server_query_manager, self.class_manager_client.clone(), "class"); } maybe_server_query_manager = event_receiver.next() => { let server_query_manager = maybe_server_query_manager.expect( "Event queries sender was unexpectedly dropped." ); - register_query(self.storage_reader.clone(), server_query_manager, "event"); + register_query(self.storage_reader.clone(), server_query_manager, self.class_manager_client.clone(), "event"); } }; } } - pub fn new(storage_reader: StorageReader, p2p_sync_channels: P2pSyncServerChannels) -> Self { - Self { storage_reader, p2p_sync_channels } + pub fn new( + storage_reader: StorageReader, + p2p_sync_channels: P2pSyncServerChannels, + class_manager_client: SharedClassManagerClient, + ) -> Self { + Self { storage_reader, p2p_sync_channels, class_manager_client } } } fn register_query( storage_reader: StorageReader, server_query_manager: ServerQueryManager>, + class_manager_client: SharedClassManagerClient, protocol_decription: &str, ) where - Data: FetchBlockDataFromDb + Send + 'static, + Data: FetchBlockData + Send + 'static, TQuery: TryFrom, Error = ProtobufConversionError> + Send + Clone + Debug + 'static, Query: From, { @@ -174,6 +185,7 @@ fn register_query( let result = send_data_for_query( storage_reader, server_query_manager, + class_manager_client, protocol_decription.as_str(), ) .await; @@ -194,17 +206,21 @@ fn register_query( } } -pub trait FetchBlockDataFromDb: Sized { - fn fetch_block_data_from_db( +#[async_trait] +pub trait FetchBlockData: Sized { + async fn fetch_block_data( block_number: BlockNumber, txn: &StorageTxn<'_, db::RO>, + class_manager_client: &mut SharedClassManagerClient, ) -> Result, P2pSyncServerError>; } -impl FetchBlockDataFromDb for SignedBlockHeader { - fn fetch_block_data_from_db( +#[async_trait] +impl FetchBlockData for SignedBlockHeader { + async fn fetch_block_data( block_number: BlockNumber, txn: &StorageTxn<'_, db::RO>, + _class_manager_client: &mut SharedClassManagerClient, ) -> Result, P2pSyncServerError> { let mut header = txn.get_block_header(block_number)?.ok_or(P2pSyncServerError::BlockNotFound { @@ -227,10 +243,12 @@ impl FetchBlockDataFromDb for SignedBlockHeader { } } -impl FetchBlockDataFromDb for StateDiffChunk { - fn fetch_block_data_from_db( +#[async_trait] +impl FetchBlockData for StateDiffChunk { + async fn fetch_block_data( block_number: BlockNumber, txn: &StorageTxn<'_, db::RO>, + _class_manager_client: &mut SharedClassManagerClient, ) -> Result, P2pSyncServerError> { let thin_state_diff = txn.get_state_diff(block_number)?.ok_or(P2pSyncServerError::BlockNotFound { @@ -240,10 +258,12 @@ impl FetchBlockDataFromDb for StateDiffChunk { } } -impl FetchBlockDataFromDb for FullTransaction { - fn fetch_block_data_from_db( +#[async_trait] +impl FetchBlockData for FullTransaction { + async fn fetch_block_data( block_number: BlockNumber, txn: &StorageTxn<'_, db::RO>, + _class_manager_client: &mut SharedClassManagerClient, ) -> Result, P2pSyncServerError> { let transactions = txn.get_block_transactions(block_number)?.ok_or(P2pSyncServerError::BlockNotFound { @@ -272,44 +292,55 @@ impl FetchBlockDataFromDb for FullTransaction { } } -impl FetchBlockDataFromDb for (ApiContractClass, ClassHash) { - fn fetch_block_data_from_db( +#[async_trait] +impl FetchBlockData for (ApiContractClass, ClassHash) { + async fn fetch_block_data( block_number: BlockNumber, txn: &StorageTxn<'_, db::RO>, + class_manager_client: &mut SharedClassManagerClient, ) -> Result, P2pSyncServerError> { let thin_state_diff = txn.get_state_diff(block_number)?.ok_or(P2pSyncServerError::BlockNotFound { block_hash_or_number: BlockHashOrNumber::Number(block_number), })?; + + if block_number >= txn.get_class_manager_block_marker()? { + return Err(P2pSyncServerError::BlockNotFound { + block_hash_or_number: BlockHashOrNumber::Number(block_number), + }); + } + let declared_classes = thin_state_diff.declared_classes; let deprecated_declared_classes = thin_state_diff.deprecated_declared_classes; let mut result = Vec::new(); - for class_hash in &deprecated_declared_classes { + for class_hash in deprecated_declared_classes { + let ContractClass::V0(deprecated_contract_class) = + class_manager_client.get_executable(class_hash).await? + else { + panic!("Received a cairo1 contract, expected cairo0"); + }; result.push(( - ApiContractClass::DeprecatedContractClass( - txn.get_deprecated_class(class_hash)? - .ok_or(P2pSyncServerError::ClassNotFound { class_hash: *class_hash })?, - ), - *class_hash, + ApiContractClass::DeprecatedContractClass(deprecated_contract_class), + class_hash, )); } - for (class_hash, _) in &declared_classes { + + for (class_hash, _) in declared_classes { result.push(( - ApiContractClass::ContractClass( - txn.get_class(class_hash)? - .ok_or(P2pSyncServerError::ClassNotFound { class_hash: *class_hash })?, - ), - *class_hash, + ApiContractClass::ContractClass(class_manager_client.get_sierra(class_hash).await?), + class_hash, )); } Ok(result) } } -impl FetchBlockDataFromDb for (Event, TransactionHash) { - fn fetch_block_data_from_db( +#[async_trait] +impl FetchBlockData for (Event, TransactionHash) { + async fn fetch_block_data( block_number: BlockNumber, txn: &StorageTxn<'_, db::RO>, + _class_manager_client: &mut SharedClassManagerClient, ) -> Result, P2pSyncServerError> { let transaction_outputs = txn.get_block_transaction_outputs(block_number)?.ok_or( P2pSyncServerError::BlockNotFound { @@ -381,10 +412,11 @@ pub fn split_thin_state_diff(thin_state_diff: ThinStateDiff) -> Vec( storage_reader: StorageReader, mut server_query_manager: ServerQueryManager>, + mut class_manager_client: SharedClassManagerClient, protocol_decription: &str, ) -> Result<(), P2pSyncServerError> where - Data: FetchBlockDataFromDb + Send + 'static, + Data: FetchBlockData + Send + 'static, TQuery: TryFrom, Error = ProtobufConversionError> + Clone, Query: From, { @@ -392,6 +424,7 @@ where let result = send_data_without_fin_for_query( &storage_reader, &mut server_query_manager, + &mut class_manager_client, protocol_decription, ) .await; @@ -403,10 +436,11 @@ where async fn send_data_without_fin_for_query( storage_reader: &StorageReader, server_query_manager: &mut ServerQueryManager>, + class_manager_client: &mut SharedClassManagerClient, protocol_decription: &str, ) -> Result<(), P2pSyncServerError> where - Data: FetchBlockDataFromDb + Send + 'static, + Data: FetchBlockData + Send + 'static, TQuery: TryFrom, Error = ProtobufConversionError> + Clone, Query: From, { @@ -428,7 +462,7 @@ where for block_counter in 0..query.limit { let block_number = BlockNumber(utils::calculate_block_number(&query, start_block_number, block_counter)?); - let data_vec = Data::fetch_block_data_from_db(block_number, &txn)?; + let data_vec = Data::fetch_block_data(block_number, &txn, class_manager_client).await?; for data in data_vec { // TODO: consider implement retry mechanism. info!( diff --git a/crates/papyrus_p2p_sync/src/server/test.rs b/crates/papyrus_p2p_sync/src/server/test.rs index d450dfa903..075fd630a1 100644 --- a/crates/papyrus_p2p_sync/src/server/test.rs +++ b/crates/papyrus_p2p_sync/src/server/test.rs @@ -1,4 +1,5 @@ use std::fmt::Debug; +use std::sync::Arc; use futures::channel::mpsc::Sender; use futures::StreamExt; @@ -49,8 +50,9 @@ use starknet_api::transaction::{ TransactionHash, TransactionOutput, }; +use starknet_class_manager_types::EmptyClassManagerClient; -use super::{split_thin_state_diff, FetchBlockDataFromDb, P2pSyncServer, P2pSyncServerChannels}; +use super::{split_thin_state_diff, FetchBlockData, P2pSyncServer, P2pSyncServerChannels}; use crate::server::register_query; const BUFFER_SIZE: usize = 10; const NUM_OF_BLOCKS: usize = 10; @@ -312,7 +314,7 @@ async fn run_test( start_block_number: usize, start_block_type: StartBlockType, ) where - T: FetchBlockDataFromDb + std::fmt::Debug + PartialEq + Send + Sync + 'static, + T: FetchBlockData + std::fmt::Debug + PartialEq + Send + Sync + 'static, F: FnOnce(Vec), TQuery: From + TryFrom, Error = ProtobufConversionError> @@ -361,7 +363,10 @@ async fn run_test( let query = TQuery::from(query); let (server_query_manager, _report_sender, response_reciever) = create_test_server_query_manager(query); - register_query::(storage_reader, server_query_manager, "test"); + + // TODO(noamsp): use MockClassManagerClient instead + let class_manager_client = Arc::new(EmptyClassManagerClient); + register_query::(storage_reader, server_query_manager, class_manager_client, "test"); // run p2p_sync_server and collect query results. tokio::select! { @@ -409,8 +414,14 @@ fn setup() -> TestArgs { event_receiver, }; - let p2p_sync_server = - super::P2pSyncServer::new(storage_reader.clone(), p2p_sync_server_channels); + // TODO(noamsp): use MockClassManagerClient instead + let class_manager_client = Arc::new(EmptyClassManagerClient); + + let p2p_sync_server = super::P2pSyncServer::new( + storage_reader.clone(), + p2p_sync_server_channels, + class_manager_client, + ); TestArgs { p2p_sync_server, storage_reader, diff --git a/crates/starknet_state_sync/src/runner/mod.rs b/crates/starknet_state_sync/src/runner/mod.rs index 94260a14fd..b6a268de20 100644 --- a/crates/starknet_state_sync/src/runner/mod.rs +++ b/crates/starknet_state_sync/src/runner/mod.rs @@ -77,7 +77,7 @@ impl StateSyncRunner { storage_writer, p2p_sync_client_channels, new_block_receiver.boxed(), - class_manager_client, + class_manager_client.clone(), ); let header_server_receiver = network_manager @@ -97,7 +97,11 @@ impl StateSyncRunner { class_server_receiver, event_server_receiver, ); - let p2p_sync_server = P2pSyncServer::new(storage_reader.clone(), p2p_sync_server_channels); + let p2p_sync_server = P2pSyncServer::new( + storage_reader.clone(), + p2p_sync_server_channels, + class_manager_client, + ); let network_future = network_manager.run().boxed(); let p2p_sync_client_future = p2p_sync_client.run().boxed();