Skip to content

Commit

Permalink
feat(papyrus_p2p_sync): add to server class manager client
Browse files Browse the repository at this point in the history
  • Loading branch information
noamsp-starkware committed Jan 16, 2025
1 parent 5af1494 commit 3fc2de3
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 45 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion crates/papyrus_node/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ async fn spawn_sync_client(
fn spawn_p2p_sync_server(
network_manager: Option<&mut NetworkManager>,
storage_reader: StorageReader,
class_manager_client: SharedClassManagerClient,
) -> JoinHandle<anyhow::Result<()>> {
let Some(network_manager) = network_manager else {
info!("P2p Sync is disabled.");
Expand All @@ -348,7 +349,8 @@ fn spawn_p2p_sync_server(
event_server_receiver,
);

let p2p_sync_server = P2pSyncServer::new(storage_reader.clone(), p2p_sync_server_channels);
let p2p_sync_server =
P2pSyncServer::new(storage_reader.clone(), p2p_sync_server_channels, class_manager_client);
tokio::spawn(async move {
p2p_sync_server.run().await;
Ok(())
Expand Down Expand Up @@ -411,6 +413,7 @@ async fn run_threads(
spawn_p2p_sync_server(
resources.maybe_network_manager.as_mut(),
resources.storage_reader.clone(),
resources.class_manager_client.clone(),
)
};

Expand Down
1 change: 1 addition & 0 deletions crates/papyrus_p2p_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ starknet_state_sync_types.workspace = true
starknet-types-core.workspace = true
thiserror.workspace = true
starknet_class_manager_types.workspace = true
async-trait.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
tracing.workspace = true
Expand Down
108 changes: 71 additions & 37 deletions crates/papyrus_p2p_sync/src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::fmt::Debug;

use async_trait::async_trait;
use futures::never::Never;
use futures::StreamExt;
use papyrus_common::pending_classes::ApiContractClass;
Expand All @@ -21,14 +22,16 @@ use papyrus_protobuf::sync::{
TransactionQuery,
};
use papyrus_storage::body::BodyStorageReader;
use papyrus_storage::class::ClassStorageReader;
use papyrus_storage::class_manager::ClassManagerStorageReader;
use papyrus_storage::header::HeaderStorageReader;
use papyrus_storage::state::StateStorageReader;
use papyrus_storage::{db, StorageReader, StorageTxn};
use starknet_api::block::BlockNumber;
use starknet_api::contract_class::ContractClass;
use starknet_api::core::ClassHash;
use starknet_api::state::ThinStateDiff;
use starknet_api::transaction::{Event, FullTransaction, TransactionHash};
use starknet_class_manager_types::{ClassManagerClientError, SharedClassManagerClient};
use tracing::{debug, error, info};

#[cfg(test)]
Expand Down Expand Up @@ -56,12 +59,14 @@ pub enum P2pSyncServerError {
SignatureNotFound { block_number: BlockNumber },
#[error(transparent)]
SendError(#[from] futures::channel::mpsc::SendError),
#[error(transparent)]
ClassManagerClientError(#[from] ClassManagerClientError),
}

impl P2pSyncServerError {
pub fn should_log_in_error_level(&self) -> bool {
match self {
Self::JoinError(_) | Self::SignatureNotFound { .. } | Self::SendError { .. }
Self::JoinError(_) | Self::SignatureNotFound { .. } | Self::SendError { .. } | Self::ClassManagerClientError { .. }
// TODO(shahak): Consider returning false for some of the StorageError variants.
| Self::DBInternalError { .. } => true,
Self::BlockNumberOutOfRange { .. } | Self::BlockNotFound { .. } | Self::ClassNotFound { .. } => false,
Expand Down Expand Up @@ -105,6 +110,7 @@ impl P2pSyncServerChannels {
pub struct P2pSyncServer {
storage_reader: StorageReader,
p2p_sync_channels: P2pSyncServerChannels,
class_manager_client: SharedClassManagerClient,
}

impl P2pSyncServer {
Expand All @@ -122,46 +128,51 @@ impl P2pSyncServer {
let server_query_manager = maybe_server_query_manager.expect(
"Header queries sender was unexpectedly dropped."
);
register_query(self.storage_reader.clone(), server_query_manager, "header");
register_query(self.storage_reader.clone(), server_query_manager, self.class_manager_client.clone(), "header");
}
maybe_server_query_manager = state_diff_receiver.next() => {
let server_query_manager = maybe_server_query_manager.expect(
"State diff queries sender was unexpectedly dropped."
);
register_query(self.storage_reader.clone(), server_query_manager, "state diff");
register_query(self.storage_reader.clone(), server_query_manager, self.class_manager_client.clone(), "state diff");
}
maybe_server_query_manager = transaction_receiver.next() => {
let server_query_manager = maybe_server_query_manager.expect(
"Transaction queries sender was unexpectedly dropped."
);
register_query(self.storage_reader.clone(), server_query_manager, "transaction");
register_query(self.storage_reader.clone(), server_query_manager, self.class_manager_client.clone(), "transaction");
}
maybe_server_query_manager = class_receiver.next() => {
let server_query_manager = maybe_server_query_manager.expect(
"Class queries sender was unexpectedly dropped."
);
register_query(self.storage_reader.clone(), server_query_manager, "class");
register_query(self.storage_reader.clone(), server_query_manager, self.class_manager_client.clone(), "class");
}
maybe_server_query_manager = event_receiver.next() => {
let server_query_manager = maybe_server_query_manager.expect(
"Event queries sender was unexpectedly dropped."
);
register_query(self.storage_reader.clone(), server_query_manager, "event");
register_query(self.storage_reader.clone(), server_query_manager, self.class_manager_client.clone(), "event");
}
};
}
}

pub fn new(storage_reader: StorageReader, p2p_sync_channels: P2pSyncServerChannels) -> Self {
Self { storage_reader, p2p_sync_channels }
pub fn new(
storage_reader: StorageReader,
p2p_sync_channels: P2pSyncServerChannels,
class_manager_client: SharedClassManagerClient,
) -> Self {
Self { storage_reader, p2p_sync_channels, class_manager_client }
}
}
fn register_query<Data, TQuery>(
storage_reader: StorageReader,
server_query_manager: ServerQueryManager<TQuery, DataOrFin<Data>>,
class_manager_client: SharedClassManagerClient,
protocol_decription: &str,
) where
Data: FetchBlockDataFromDb + Send + 'static,
Data: FetchBlockData + Send + 'static,
TQuery: TryFrom<Vec<u8>, Error = ProtobufConversionError> + Send + Clone + Debug + 'static,
Query: From<TQuery>,
{
Expand All @@ -174,6 +185,7 @@ fn register_query<Data, TQuery>(
let result = send_data_for_query(
storage_reader,
server_query_manager,
class_manager_client,
protocol_decription.as_str(),
)
.await;
Expand All @@ -194,17 +206,21 @@ fn register_query<Data, TQuery>(
}
}

pub trait FetchBlockDataFromDb: Sized {
fn fetch_block_data_from_db(
#[async_trait]
pub trait FetchBlockData: Sized {
async fn fetch_block_data(
block_number: BlockNumber,
txn: &StorageTxn<'_, db::RO>,
class_manager_client: &mut SharedClassManagerClient,
) -> Result<Vec<Self>, P2pSyncServerError>;
}

impl FetchBlockDataFromDb for SignedBlockHeader {
fn fetch_block_data_from_db(
#[async_trait]
impl FetchBlockData for SignedBlockHeader {
async fn fetch_block_data(
block_number: BlockNumber,
txn: &StorageTxn<'_, db::RO>,
_class_manager_client: &mut SharedClassManagerClient,
) -> Result<Vec<Self>, P2pSyncServerError> {
let mut header =
txn.get_block_header(block_number)?.ok_or(P2pSyncServerError::BlockNotFound {
Expand All @@ -227,10 +243,12 @@ impl FetchBlockDataFromDb for SignedBlockHeader {
}
}

impl FetchBlockDataFromDb for StateDiffChunk {
fn fetch_block_data_from_db(
#[async_trait]
impl FetchBlockData for StateDiffChunk {
async fn fetch_block_data(
block_number: BlockNumber,
txn: &StorageTxn<'_, db::RO>,
_class_manager_client: &mut SharedClassManagerClient,
) -> Result<Vec<Self>, P2pSyncServerError> {
let thin_state_diff =
txn.get_state_diff(block_number)?.ok_or(P2pSyncServerError::BlockNotFound {
Expand All @@ -240,10 +258,12 @@ impl FetchBlockDataFromDb for StateDiffChunk {
}
}

impl FetchBlockDataFromDb for FullTransaction {
fn fetch_block_data_from_db(
#[async_trait]
impl FetchBlockData for FullTransaction {
async fn fetch_block_data(
block_number: BlockNumber,
txn: &StorageTxn<'_, db::RO>,
_class_manager_client: &mut SharedClassManagerClient,
) -> Result<Vec<Self>, P2pSyncServerError> {
let transactions =
txn.get_block_transactions(block_number)?.ok_or(P2pSyncServerError::BlockNotFound {
Expand Down Expand Up @@ -272,44 +292,55 @@ impl FetchBlockDataFromDb for FullTransaction {
}
}

impl FetchBlockDataFromDb for (ApiContractClass, ClassHash) {
fn fetch_block_data_from_db(
#[async_trait]
impl FetchBlockData for (ApiContractClass, ClassHash) {
async fn fetch_block_data(
block_number: BlockNumber,
txn: &StorageTxn<'_, db::RO>,
class_manager_client: &mut SharedClassManagerClient,
) -> Result<Vec<Self>, P2pSyncServerError> {
let thin_state_diff =
txn.get_state_diff(block_number)?.ok_or(P2pSyncServerError::BlockNotFound {
block_hash_or_number: BlockHashOrNumber::Number(block_number),
})?;

if block_number >= txn.get_class_manager_block_marker()? {
return Err(P2pSyncServerError::BlockNotFound {
block_hash_or_number: BlockHashOrNumber::Number(block_number),
});
}

let declared_classes = thin_state_diff.declared_classes;
let deprecated_declared_classes = thin_state_diff.deprecated_declared_classes;
let mut result = Vec::new();
for class_hash in &deprecated_declared_classes {
for class_hash in deprecated_declared_classes {
let ContractClass::V0(deprecated_contract_class) =
class_manager_client.get_executable(class_hash).await?
else {
panic!("Received a cairo1 contract, expected cairo0");
};
result.push((
ApiContractClass::DeprecatedContractClass(
txn.get_deprecated_class(class_hash)?
.ok_or(P2pSyncServerError::ClassNotFound { class_hash: *class_hash })?,
),
*class_hash,
ApiContractClass::DeprecatedContractClass(deprecated_contract_class),
class_hash,
));
}
for (class_hash, _) in &declared_classes {

for (class_hash, _) in declared_classes {
result.push((
ApiContractClass::ContractClass(
txn.get_class(class_hash)?
.ok_or(P2pSyncServerError::ClassNotFound { class_hash: *class_hash })?,
),
*class_hash,
ApiContractClass::ContractClass(class_manager_client.get_sierra(class_hash).await?),
class_hash,
));
}
Ok(result)
}
}

impl FetchBlockDataFromDb for (Event, TransactionHash) {
fn fetch_block_data_from_db(
#[async_trait]
impl FetchBlockData for (Event, TransactionHash) {
async fn fetch_block_data(
block_number: BlockNumber,
txn: &StorageTxn<'_, db::RO>,
_class_manager_client: &mut SharedClassManagerClient,
) -> Result<Vec<Self>, P2pSyncServerError> {
let transaction_outputs = txn.get_block_transaction_outputs(block_number)?.ok_or(
P2pSyncServerError::BlockNotFound {
Expand Down Expand Up @@ -381,17 +412,19 @@ pub fn split_thin_state_diff(thin_state_diff: ThinStateDiff) -> Vec<StateDiffChu
async fn send_data_for_query<Data, TQuery>(
storage_reader: StorageReader,
mut server_query_manager: ServerQueryManager<TQuery, DataOrFin<Data>>,
mut class_manager_client: SharedClassManagerClient,
protocol_decription: &str,
) -> Result<(), P2pSyncServerError>
where
Data: FetchBlockDataFromDb + Send + 'static,
Data: FetchBlockData + Send + 'static,
TQuery: TryFrom<Vec<u8>, Error = ProtobufConversionError> + Clone,
Query: From<TQuery>,
{
// If this function fails, we still want to send fin before failing.
let result = send_data_without_fin_for_query(
&storage_reader,
&mut server_query_manager,
&mut class_manager_client,
protocol_decription,
)
.await;
Expand All @@ -403,10 +436,11 @@ where
async fn send_data_without_fin_for_query<Data, TQuery>(
storage_reader: &StorageReader,
server_query_manager: &mut ServerQueryManager<TQuery, DataOrFin<Data>>,
class_manager_client: &mut SharedClassManagerClient,
protocol_decription: &str,
) -> Result<(), P2pSyncServerError>
where
Data: FetchBlockDataFromDb + Send + 'static,
Data: FetchBlockData + Send + 'static,
TQuery: TryFrom<Vec<u8>, Error = ProtobufConversionError> + Clone,
Query: From<TQuery>,
{
Expand All @@ -428,7 +462,7 @@ where
for block_counter in 0..query.limit {
let block_number =
BlockNumber(utils::calculate_block_number(&query, start_block_number, block_counter)?);
let data_vec = Data::fetch_block_data_from_db(block_number, &txn)?;
let data_vec = Data::fetch_block_data(block_number, &txn, class_manager_client).await?;
for data in data_vec {
// TODO: consider implement retry mechanism.
info!(
Expand Down
21 changes: 16 additions & 5 deletions crates/papyrus_p2p_sync/src/server/test.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::fmt::Debug;
use std::sync::Arc;

use futures::channel::mpsc::Sender;
use futures::StreamExt;
Expand Down Expand Up @@ -49,8 +50,9 @@ use starknet_api::transaction::{
TransactionHash,
TransactionOutput,
};
use starknet_class_manager_types::EmptyClassManagerClient;

use super::{split_thin_state_diff, FetchBlockDataFromDb, P2pSyncServer, P2pSyncServerChannels};
use super::{split_thin_state_diff, FetchBlockData, P2pSyncServer, P2pSyncServerChannels};
use crate::server::register_query;
const BUFFER_SIZE: usize = 10;
const NUM_OF_BLOCKS: usize = 10;
Expand Down Expand Up @@ -312,7 +314,7 @@ async fn run_test<T, F, TQuery>(
start_block_number: usize,
start_block_type: StartBlockType,
) where
T: FetchBlockDataFromDb + std::fmt::Debug + PartialEq + Send + Sync + 'static,
T: FetchBlockData + std::fmt::Debug + PartialEq + Send + Sync + 'static,
F: FnOnce(Vec<T>),
TQuery: From<Query>
+ TryFrom<Vec<u8>, Error = ProtobufConversionError>
Expand Down Expand Up @@ -361,7 +363,10 @@ async fn run_test<T, F, TQuery>(
let query = TQuery::from(query);
let (server_query_manager, _report_sender, response_reciever) =
create_test_server_query_manager(query);
register_query::<T, TQuery>(storage_reader, server_query_manager, "test");

// TODO(noamsp): use MockClassManagerClient instead
let class_manager_client = Arc::new(EmptyClassManagerClient);
register_query::<T, TQuery>(storage_reader, server_query_manager, class_manager_client, "test");

// run p2p_sync_server and collect query results.
tokio::select! {
Expand Down Expand Up @@ -409,8 +414,14 @@ fn setup() -> TestArgs {
event_receiver,
};

let p2p_sync_server =
super::P2pSyncServer::new(storage_reader.clone(), p2p_sync_server_channels);
// TODO(noamsp): use MockClassManagerClient instead
let class_manager_client = Arc::new(EmptyClassManagerClient);

let p2p_sync_server = super::P2pSyncServer::new(
storage_reader.clone(),
p2p_sync_server_channels,
class_manager_client,
);
TestArgs {
p2p_sync_server,
storage_reader,
Expand Down
Loading

0 comments on commit 3fc2de3

Please sign in to comment.