From 2cc0ce82ad532bdd4d9d57bd0de1de666f54482a Mon Sep 17 00:00:00 2001 From: Uriel Korach Date: Mon, 15 Apr 2024 14:56:14 +0300 Subject: [PATCH] feat: add a mempool proxy support for concurrent calls --- Cargo.lock | 32 +++++ Cargo.toml | 2 + crates/mempool_node/Cargo.toml | 16 ++- crates/mempool_node/src/config/config_test.rs | 67 +++++++++ crates/mempool_node/src/config/mod.rs | 128 ++++++++++++++++++ crates/mempool_node/src/lib.rs | 2 + crates/mempool_node/src/mempool.rs | 20 +-- crates/mempool_node/src/mempool_proxy.rs | 53 ++++---- crates/mempool_node/src/mempool_proxy_test.rs | 61 ++++++++- .../src/test_files/mempool_node_config.json | 18 +++ crates/mempool_node/src/version.rs | 55 ++++++++ crates/mempool_node/src/version_test.rs | 24 ++++ 12 files changed, 432 insertions(+), 46 deletions(-) create mode 100644 crates/mempool_node/src/config/config_test.rs create mode 100644 crates/mempool_node/src/config/mod.rs create mode 100644 crates/mempool_node/src/lib.rs create mode 100644 crates/mempool_node/src/test_files/mempool_node_config.json create mode 100644 crates/mempool_node/src/version.rs create mode 100644 crates/mempool_node/src/version_test.rs diff --git a/Cargo.lock b/Cargo.lock index a6681703..f121524a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -551,6 +551,26 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32b13ea120a812beba79e34316b3942a857c86ec1593cb34f27bb28272ce2cca" +[[package]] +name = "const_format" +version = "0.2.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3a214c7af3d04997541b18d432afaff4c455e79e2029079647e72fc2bd27673" +dependencies = [ + "const_format_proc_macros", +] + +[[package]] +name = "const_format_proc_macros" +version = "0.2.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7f6ff08fd20f4f299298a28e2dfa8a8ba1036e6cd2460ac1de7b425d76f2500" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + [[package]] name = "convert_case" version = "0.4.0" @@ -1283,8 +1303,20 @@ checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" name = "mempool_node" version = "0.0.0" dependencies = [ + "assert_matches", "async-trait", + "clap", + "const_format", + "hyper 1.2.0", + "papyrus_config", + "pretty_assertions", + "serde", + "serde_json", + "starknet_api", + "starknet_gateway", + "thiserror", "tokio", + "validator", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index b6f9420e..4703fab0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,9 +20,11 @@ as_conversions = "deny" [workspace.dependencies] assert_matches = "1.5.0" +async-trait = "0.1.79" axum = "0.6.12" clap = "4.3.10" derive_more = "0.99" +const_format = "0.2.30" hyper = "1.2.0" papyrus_config = "0.3.0" pretty_assertions = "1.4.0" diff --git a/crates/mempool_node/Cargo.toml b/crates/mempool_node/Cargo.toml index e8f4303e..ceb4a1aa 100644 --- a/crates/mempool_node/Cargo.toml +++ b/crates/mempool_node/Cargo.toml @@ -9,5 +9,19 @@ license.workspace = true workspace = true [dependencies] +async-trait.workspace = true +clap.workspace = true +const_format.workspace = true +starknet_gateway = { path = "../gateway", version = "0.0" } +hyper.workspace = true +serde.workspace = true +serde_json.workspace = true +starknet_api.workspace = true +thiserror.workspace = true +papyrus_config.workspace = true tokio.workspace = true -async-trait = "0.1.79" +validator.workspace = true + +[dev-dependencies] +assert_matches.workspace = true +pretty_assertions.workspace = true diff --git a/crates/mempool_node/src/config/config_test.rs b/crates/mempool_node/src/config/config_test.rs new file mode 100644 index 00000000..55e2b57c --- /dev/null +++ b/crates/mempool_node/src/config/config_test.rs @@ -0,0 +1,67 @@ +#![allow(unused_imports)] +use crate::config::node_command; +use crate::config::{ComponentConfig, ComponentExecutionConfig, MempoolNodeConfig}; +use assert_matches::assert_matches; +use papyrus_config::dumping::SerializeConfig; +use papyrus_config::loading::load_and_process_config; +use papyrus_config::presentation::get_config_presentation; +use papyrus_config::validators::ParsedValidationErrors; +use papyrus_config::{SerializationType, SerializedContent, SerializedParam}; +use starknet_gateway::GatewayConfig; +use std::env::{self, args}; +use std::fs::File; +use std::ops::IndexMut; +use std::path::{Path, PathBuf}; +use validator::Validate; + +const TEST_FILES_FOLDER: &str = "./src/test_files"; +const CONFIG_FILE: &str = "mempool_node_config.json"; + +fn get_config_file(file_name: &str) -> Result { + let config_file = File::open(Path::new(TEST_FILES_FOLDER).join(file_name)).unwrap(); + load_and_process_config::(config_file, node_command(), vec![]) +} + +#[test] +fn test_valid_config() { + // Read the valid config file and validate its content. + let expected_config = MempoolNodeConfig { + components: ComponentConfig { + gateway_component: ComponentExecutionConfig { execute: true }, + mempool_component: ComponentExecutionConfig { execute: false }, + }, + gateway_config: GatewayConfig { + bind_address: String::from("0.0.0.0:8080"), + }, + }; + let loaded_config = get_config_file(CONFIG_FILE).unwrap(); + + assert!(loaded_config.validate().is_ok()); + assert_eq!(loaded_config, expected_config); +} + +#[test] +fn test_components_config() { + // Read the valid config file and check that the validator finds no errors. + let mut config = get_config_file(CONFIG_FILE).unwrap(); + assert!(config.validate().is_ok()); + + // Invalidate the gateway component and check that the validator finds an error. + config.components.gateway_component.execute = false; + + assert_matches!(config.validate(), Err(e) => { + let parse_err = ParsedValidationErrors::from(e); + let mut error_msg = String::new(); + for error in parse_err.0 { + if error.param_path == "components.__all__" { + error_msg.push_str(&error.code); + break; + } + } + assert_eq!(error_msg, "Invalid components configuration."); + }); + + // Validate the mempool component and check that the validator finds no errors. + config.components.mempool_component.execute = true; + assert!(config.validate().is_ok()); +} diff --git a/crates/mempool_node/src/config/mod.rs b/crates/mempool_node/src/config/mod.rs new file mode 100644 index 00000000..b94d56cc --- /dev/null +++ b/crates/mempool_node/src/config/mod.rs @@ -0,0 +1,128 @@ +#[cfg(test)] +mod config_test; + +use std::collections::BTreeMap; +use std::fs::File; +use std::path::Path; + +use clap::Command; +use papyrus_config::dumping::{append_sub_config_name, ser_param, SerializeConfig}; +use papyrus_config::loading::load_and_process_config; +use papyrus_config::ParamPrivacyInput; +use papyrus_config::{ConfigError, ParamPath, SerializedParam}; +use serde::{Deserialize, Serialize}; +use starknet_gateway::GatewayConfig; +use validator::{Validate, ValidationError}; + +use crate::version::VERSION_FULL; + +// The path of the default configuration file, provided as part of the crate. +pub const DEFAULT_CONFIG_PATH: &str = "config/default_config.json"; + +/// The single crate configuration. +#[derive(Clone, Debug, Serialize, Deserialize, Validate, PartialEq)] +pub struct ComponentExecutionConfig { + pub execute: bool, +} + +impl SerializeConfig for ComponentExecutionConfig { + fn dump(&self) -> BTreeMap { + BTreeMap::from_iter([ser_param( + "execute", + &self.execute, + "The component execution flag.", + ParamPrivacyInput::Public, + )]) + } +} + +impl Default for ComponentExecutionConfig { + fn default() -> Self { + Self { execute: true } + } +} + +/// The components configuration. +#[derive(Clone, Debug, Serialize, Deserialize, Validate, PartialEq, Default)] +#[validate(schema(function = "validate_components_config"))] +pub struct ComponentConfig { + pub gateway_component: ComponentExecutionConfig, + pub mempool_component: ComponentExecutionConfig, +} + +impl SerializeConfig for ComponentConfig { + fn dump(&self) -> BTreeMap { + #[allow(unused_mut)] + let mut sub_configs = vec![ + append_sub_config_name(self.gateway_component.dump(), "gateway_component"), + append_sub_config_name(self.mempool_component.dump(), "mempool_component"), + ]; + + sub_configs.into_iter().flatten().collect() + } +} + +pub fn validate_components_config(components: &ComponentConfig) -> Result<(), ValidationError> { + if components.gateway_component.execute || components.mempool_component.execute { + return Ok(()); + } + + let mut error = ValidationError::new("Invalid components configuration."); + error.message = Some("At least one component should be allowed to execute.".into()); + Err(error) +} + +/// The configurations of the various components of the node. +#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Validate, Default)] +pub struct MempoolNodeConfig { + #[validate] + pub components: ComponentConfig, + #[validate] + pub gateway_config: GatewayConfig, +} + +impl SerializeConfig for MempoolNodeConfig { + fn dump(&self) -> BTreeMap { + #[allow(unused_mut)] + let mut sub_configs = vec![ + append_sub_config_name(self.components.dump(), "components"), + append_sub_config_name(self.gateway_config.dump(), "gateway_config"), + ]; + + sub_configs.into_iter().flatten().collect() + } +} + +impl MempoolNodeConfig { + /// Creates a config object. Selects the values from the default file and from resources with + /// higher priority. + fn load_and_process_config_file( + args: Vec, + config_file_name: Option<&str>, + ) -> Result { + let config_file_name = match config_file_name { + Some(file_name) => file_name, + None => DEFAULT_CONFIG_PATH, + }; + + let default_config_file = File::open(Path::new(config_file_name))?; + load_and_process_config(default_config_file, node_command(), args) + } + + pub fn load_and_process(args: Vec) -> Result { + Self::load_and_process_config_file(args, None) + } + pub fn load_and_process_file( + args: Vec, + config_file_name: &str, + ) -> Result { + Self::load_and_process_config_file(args, Some(config_file_name)) + } +} + +/// The command line interface of this node. +pub fn node_command() -> Command { + Command::new("Mempool") + .version(VERSION_FULL) + .about("Mempool is a StarkNet mempool node written in Rust.") +} diff --git a/crates/mempool_node/src/lib.rs b/crates/mempool_node/src/lib.rs new file mode 100644 index 00000000..ee657688 --- /dev/null +++ b/crates/mempool_node/src/lib.rs @@ -0,0 +1,2 @@ +pub mod config; +pub mod version; diff --git a/crates/mempool_node/src/mempool.rs b/crates/mempool_node/src/mempool.rs index 42f54b12..ddedb994 100644 --- a/crates/mempool_node/src/mempool.rs +++ b/crates/mempool_node/src/mempool.rs @@ -1,30 +1,24 @@ use async_trait::async_trait; +use tokio::sync::Mutex; pub type AddTransactionCallType = u32; pub type AddTransactionReturnType = usize; #[async_trait] pub trait MempoolTrait { - async fn add_transaction(&mut self, tx: AddTransactionCallType) -> AddTransactionReturnType; + async fn add_transaction(&self, tx: AddTransactionCallType) -> AddTransactionReturnType; } #[derive(Default)] pub struct Mempool { - transactions: Vec, -} - -impl Mempool { - pub fn new() -> Self { - Self { - transactions: vec![], - } - } + transactions: Mutex>, } #[async_trait] impl MempoolTrait for Mempool { - async fn add_transaction(&mut self, tx: AddTransactionCallType) -> AddTransactionReturnType { - self.transactions.push(tx); - self.transactions.len() + async fn add_transaction(&self, tx: AddTransactionCallType) -> AddTransactionReturnType { + let mut guarded_transactions = self.transactions.lock().await; + guarded_transactions.push(tx); + guarded_transactions.len() } } diff --git a/crates/mempool_node/src/mempool_proxy.rs b/crates/mempool_node/src/mempool_proxy.rs index 60666d3d..5e1dac79 100644 --- a/crates/mempool_node/src/mempool_proxy.rs +++ b/crates/mempool_node/src/mempool_proxy.rs @@ -1,9 +1,9 @@ +use std::sync::Arc; + use crate::mempool::{AddTransactionCallType, AddTransactionReturnType, Mempool, MempoolTrait}; use async_trait::async_trait; -use std::sync::Arc; -use tokio::sync::mpsc::{channel, Receiver, Sender}; -use tokio::sync::Mutex; +use tokio::sync::mpsc::{channel, Sender}; use tokio::task; enum ProxyFunc { @@ -14,48 +14,51 @@ enum ProxyRetValue { AddTransaction(AddTransactionReturnType), } +#[derive(Clone)] pub struct MempoolProxy { - tx_call: Sender, - rx_ret_value: Receiver, + tx_call: Sender<(ProxyFunc, Sender)>, } -impl MempoolProxy { - pub fn new(mempool: Arc>) -> Self { - let (tx_call, mut rx_call) = channel(32); - let (tx_ret_value, rx_ret_value) = channel(32); +impl Default for MempoolProxy { + fn default() -> Self { + let (tx_call, mut rx_call) = channel::<(ProxyFunc, Sender)>(32); task::spawn(async move { + let mempool = Arc::new(Mempool::default()); while let Some(call) = rx_call.recv().await { match call { - ProxyFunc::AddTransaction(tx) => { - let ret_value = mempool.lock().await.add_transaction(tx).await; - tx_ret_value - .send(ProxyRetValue::AddTransaction(ret_value)) - .await - .expect("Sender of the func call is expecting a return value"); + (ProxyFunc::AddTransaction(tx), tx_response) => { + let mempool = mempool.clone(); + task::spawn(async move { + let ret_value = mempool.add_transaction(tx).await; + tx_response + .send(ProxyRetValue::AddTransaction(ret_value)) + .await + .expect("Receiver should be listening."); + }); } } } }); - MempoolProxy { - tx_call, - rx_ret_value, - } + Self { tx_call } } } #[async_trait] impl MempoolTrait for MempoolProxy { - async fn add_transaction(&mut self, tx: AddTransactionCallType) -> AddTransactionReturnType { + async fn add_transaction(&self, tx: AddTransactionCallType) -> AddTransactionReturnType { + let (tx_response, mut rx_response) = channel(32); self.tx_call - .send(ProxyFunc::AddTransaction(tx)) + .send((ProxyFunc::AddTransaction(tx), tx_response)) .await - .expect("Receiver is always listening in a dedicated task"); + .expect("Receiver should be listening."); - match self.rx_ret_value.recv().await.expect( - "Receiver of the function call always returns a return value after sending a func call", - ) { + match rx_response + .recv() + .await + .expect("Sender should be responding.") + { ProxyRetValue::AddTransaction(ret_value) => ret_value, } } diff --git a/crates/mempool_node/src/mempool_proxy_test.rs b/crates/mempool_node/src/mempool_proxy_test.rs index 2441b842..1847410d 100644 --- a/crates/mempool_node/src/mempool_proxy_test.rs +++ b/crates/mempool_node/src/mempool_proxy_test.rs @@ -1,18 +1,65 @@ mod tests { + use std::sync::Arc; - use tokio::sync::Mutex; + use tokio::task::JoinSet; use crate::{ - mempool::{Mempool, MempoolTrait}, + mempool::{AddTransactionCallType, AddTransactionReturnType, Mempool, MempoolTrait}, mempool_proxy::MempoolProxy, }; + async fn test_mempool_single_thread_add_transaction(mempool: T) + where + T: MempoolTrait, + { + let tx: AddTransactionCallType = 1; + let expected_result: AddTransactionReturnType = 1; + assert_eq!(mempool.add_transaction(tx).await, expected_result); + } + + async fn test_mempool_concurrent_add_transaction(mempool: Arc) + where + T: MempoolTrait + std::marker::Send + std::marker::Sync + 'static, + { + let mut tasks: JoinSet<_> = (0..5) + .map(|_| { + let mempool = mempool.clone(); + async move { + let tx: AddTransactionCallType = 1; + mempool.add_transaction(tx).await + } + }) + .collect(); + + let mut results: Vec = vec![]; + while let Some(result) = tasks.join_next().await { + results.push(result.unwrap()); + } + + results.sort(); + + let expected_results: Vec = (1..=5).collect(); + assert_eq!(results, expected_results); + } + + #[tokio::test] + async fn test_direct_mempool_single_thread_add_transaction() { + test_mempool_single_thread_add_transaction(Mempool::default()).await; + } + + #[tokio::test] + async fn test_proxy_mempool_single_thread_add_transaction() { + test_mempool_single_thread_add_transaction(MempoolProxy::default()).await; + } + + #[tokio::test] + async fn test_direct_mempool_concurrent_add_transaction() { + test_mempool_concurrent_add_transaction(Arc::new(Mempool::default())).await; + } + #[tokio::test] - async fn test_proxy_add_transaction() { - let mempool = Arc::new(Mutex::new(Mempool::new())); - let mut proxy = MempoolProxy::new(mempool); - assert_eq!(proxy.add_transaction(1).await, 1); - assert_eq!(proxy.add_transaction(1).await, 2); + async fn test_proxy_mempool_concurrent_add_transaction() { + test_mempool_concurrent_add_transaction(Arc::new(MempoolProxy::default())).await; } } diff --git a/crates/mempool_node/src/test_files/mempool_node_config.json b/crates/mempool_node/src/test_files/mempool_node_config.json new file mode 100644 index 00000000..d7efcadc --- /dev/null +++ b/crates/mempool_node/src/test_files/mempool_node_config.json @@ -0,0 +1,18 @@ +{ + "components.gateway_component.execute": { + "description": "The component execution flag.", + "value": true, + "privacy": "Public" + }, + "components.mempool_component.execute": { + "description": "The component execution flag.", + "value": false, + "privacy": "Public" + }, + "gateway_config.bind_address": { + "description": "The server bind addres of a gateway.", + "value": "0.0.0.0:8080", + "privacy": "Public" + } +} + diff --git a/crates/mempool_node/src/version.rs b/crates/mempool_node/src/version.rs new file mode 100644 index 00000000..b5ba9580 --- /dev/null +++ b/crates/mempool_node/src/version.rs @@ -0,0 +1,55 @@ +#[cfg(test)] +#[path = "version_test.rs"] +mod version_test; + +/// Major version component of the current release. +const VERSION_MAJOR: u32 = 0; + +/// Minor version component of the current release. +const VERSION_MINOR: u32 = 0; + +/// Patch version component of the current release. +const VERSION_PATCH: u32 = 0; + +/// Version metadata to append to the version string. +/// Expected values are `dev` and `stable`. +#[allow(dead_code)] +const VERSION_META: Metadata = Metadata::Dev; + +/// Textual version string. +pub const VERSION: &str = version_str(); +/// Textual version string including the metadata. +pub const VERSION_FULL: &str = full_version_str(); + +#[allow(dead_code)] +const DEV_VERSION_META: &str = "dev"; +#[allow(dead_code)] +const STABLE_VERSION_META: &str = "stable"; + +#[allow(dead_code)] +#[derive(PartialEq)] +enum Metadata { + Dev, + Stable, +} + +#[cfg_attr(coverage_nightly, coverage_attribute)] +const fn version_str() -> &'static str { + const_format::concatcp!(VERSION_MAJOR, ".", VERSION_MINOR, ".", VERSION_PATCH) +} + +#[cfg_attr(coverage_nightly, coverage_attribute)] +const fn full_version_str() -> &'static str { + match VERSION_META { + Metadata::Dev => const_format::concatcp!(VERSION, "-", DEV_VERSION_META), + Metadata::Stable => VERSION, + } +} + +#[allow(dead_code)] +const fn metadata_str(metadata: Metadata) -> &'static str { + match metadata { + Metadata::Dev => DEV_VERSION_META, + Metadata::Stable => STABLE_VERSION_META, + } +} diff --git a/crates/mempool_node/src/version_test.rs b/crates/mempool_node/src/version_test.rs new file mode 100644 index 00000000..14fb95c6 --- /dev/null +++ b/crates/mempool_node/src/version_test.rs @@ -0,0 +1,24 @@ +use pretty_assertions::assert_eq; + +#[test] +fn version() { + let expected_version = format!( + "{}.{}.{}", + super::VERSION_MAJOR, + super::VERSION_MINOR, + super::VERSION_PATCH + ); + assert_eq!(super::VERSION, expected_version); + + let expected_version_with_meta = match super::VERSION_META { + crate::version::Metadata::Dev => { + format!( + "{}-{}", + expected_version, + super::metadata_str(super::VERSION_META) + ) + } + crate::version::Metadata::Stable => expected_version, + }; + assert_eq!(super::VERSION_FULL, expected_version_with_meta); +}