Skip to content

Commit

Permalink
refactor(node): remove test code from production code
Browse files Browse the repository at this point in the history
Remove the test config and test setup from production code.
This causes us to create a new run_consensus binary for the test flow.
  • Loading branch information
matan-starkware committed Sep 22, 2024
1 parent d70e589 commit 470e815
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 218 deletions.
30 changes: 0 additions & 30 deletions config/papyrus/default_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -99,36 +99,6 @@
"privacy": "Public",
"value": 0
},
"consensus.test.#is_none": {
"description": "Flag for an optional field.",
"privacy": "TemporaryValue",
"value": true
},
"consensus.test.cache_size": {
"description": "The cache size for the test simulation.",
"privacy": "Public",
"value": 1000
},
"consensus.test.drop_probability": {
"description": "The probability of dropping a message.",
"privacy": "Public",
"value": 0.0
},
"consensus.test.invalid_probability": {
"description": "The probability of sending an invalid message.",
"privacy": "Public",
"value": 0.0
},
"consensus.test.random_seed": {
"description": "The random seed for the test simulation to ensure repeatable test results.",
"privacy": "Public",
"value": 0
},
"consensus.test.sync_topic": {
"description": "The network topic for sync messages.",
"privacy": "Public",
"value": "consensus_test_sync"
},
"consensus.timeouts.precommit_timeout": {
"description": "The timeout (seconds) for a precommit.",
"privacy": "Public",
Expand Down
6 changes: 6 additions & 0 deletions crates/papyrus_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,18 @@ normal = ["clap", "papyrus_base_layer", "reqwest", "tokio"]
[features]
default = ["rpc"]
rpc = ["papyrus_rpc"]
testing = []

[[bin]]
name = "central_source_integration_test"
path = "src/bin/central_source_integration_test.rs"
required-features = ["futures-util", "tokio-stream"]

[[bin]]
name = "run_consensus"
path = "src/bin/run_consensus.rs"
required-features = ["testing"]

[dependencies]
anyhow.workspace = true
clap = { workspace = true }
Expand Down
113 changes: 113 additions & 0 deletions crates/papyrus_node/src/bin/run_consensus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
//! Run a papyrus node with consensus enabled and the ability to simulate network issues for
//! consensus.
use clap::Parser;
use futures::stream::StreamExt;
use papyrus_consensus::config::ConsensusConfig;
use papyrus_consensus::simulation_network_receiver::NetworkReceiver;
use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
use papyrus_node::bin_utils::build_configs;
use papyrus_node::run::{run, PapyrusResources, PapyrusTaskHandles};
use papyrus_p2p_sync::BUFFER_SIZE;
use papyrus_storage::StorageReader;
use starknet_api::block::BlockNumber;
use tokio::task::JoinHandle;

/// Test configuration for consensus.
#[derive(Parser, Debug, Clone, PartialEq)]
pub struct TestConfig {
#[arg(long = "cache_size", help = "The cache size for the test network receiver.")]
pub cache_size: usize,
#[arg(
long = "random_seed",
help = "The random seed for the test simulation to ensure repeatable test results."
)]
pub random_seed: u64,
#[arg(long = "drop_probability", help = "The probability of dropping a message.")]
pub drop_probability: f64,
#[arg(long = "invalid_probability", help = "The probability of sending an invalid message.")]
pub invalid_probability: f64,
#[arg(long = "sync_topic", help = "The network topic for sync messages.")]
pub sync_topic: String,
}

impl Default for TestConfig {
fn default() -> Self {
Self {
cache_size: 1000,
random_seed: 0,
drop_probability: 0.0,
invalid_probability: 0.0,
sync_topic: "consensus_test_sync".to_string(),
}
}
}

fn build_consensus(
consensus_config: ConsensusConfig,
test_config: TestConfig,
storage_reader: StorageReader,
network_manager: &mut NetworkManager,
) -> anyhow::Result<Option<JoinHandle<anyhow::Result<()>>>> {
let network_channels = network_manager.register_broadcast_topic(
Topic::new(consensus_config.network_topic.clone()),
BUFFER_SIZE,
)?;
// TODO(matan): connect this to an actual channel.
let sync_channels = network_manager
.register_broadcast_topic(Topic::new(test_config.sync_topic.clone()), BUFFER_SIZE)?;
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.messages_to_broadcast_sender.clone(),
consensus_config.num_validators,
Some(sync_channels.messages_to_broadcast_sender),
);
let sync_receiver =
sync_channels.broadcasted_messages_receiver.map(|(vote, _report_sender)| {
BlockNumber(vote.expect("Sync channel should never have errors").height)
});
let network_receiver = NetworkReceiver::new(
network_channels.broadcasted_messages_receiver,
test_config.cache_size,
test_config.random_seed,
test_config.drop_probability,
test_config.invalid_probability,
);
let broadcast_channels = BroadcastTopicChannels {
messages_to_broadcast_sender: network_channels.messages_to_broadcast_sender,
broadcasted_messages_receiver: Box::new(network_receiver),
reported_messages_sender: network_channels.reported_messages_sender,
continue_propagation_sender: network_channels.continue_propagation_sender,
};

Ok(Some(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
consensus_config.start_height,
consensus_config.validator_id,
consensus_config.consensus_delay,
consensus_config.timeouts.clone(),
broadcast_channels,
sync_receiver,
)
.await?)
})))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let (test_config, node_config) = build_configs::<TestConfig>()?;

let mut resources = PapyrusResources::new(&node_config)?;

let consensus_handle = build_consensus(
node_config.consensus.clone().unwrap(),
test_config,
resources.storage_reader.clone(),
resources.maybe_network_manager.as_mut().unwrap(),
)?;
let tasks = PapyrusTaskHandles { consensus_handle, ..Default::default() };

run(node_config, resources, tasks).await
}
48 changes: 48 additions & 0 deletions crates/papyrus_node/src/bin_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use std::env::args;

use clap::Parser;
use papyrus_config::ConfigError;

use crate::config::NodeConfig;

// Test arguments passed on the command line are prefixed with `test.<ARG_NAME>`.
const TEST_ARG_PREFIX: &str = "--test.";

/// Split the elements of `input_args` into 2 groups:
/// 1. Those prefixed with "--test."
/// 2. Other.
///
/// Presumes input is: program_name (--flag_name value)*
pub fn split_args(input_args: Vec<String>) -> (Vec<String>, Vec<String>) {
input_args[1..].chunks(2).fold(
(vec![input_args[0].clone()], vec![input_args[0].clone()]),
|(mut matching_args, mut mismatched_args), input_arg| {
let (name, value) = (&input_arg[0], &input_arg[1]);
// String leading `--` for comparison.
if &name[..TEST_ARG_PREFIX.len()] == TEST_ARG_PREFIX {
matching_args.push(format!("--{}", &name[TEST_ARG_PREFIX.len()..]));
matching_args.push(value.clone());
} else {
mismatched_args.push(name.clone());
mismatched_args.push(value.clone());
}
(matching_args, mismatched_args)
},
)
}

/// Build both the node and test configs from the command line arguments.
pub fn build_configs<T: Parser + Default>() -> Result<(T, NodeConfig), ConfigError> {
let input_args = args().collect::<Vec<_>>();
let (test_input_args, node_input_args) = split_args(input_args);

let mut test_config = T::default();
test_config.update_from(test_input_args.iter());

let node_config = NodeConfig::load_and_process(node_input_args);
if let Err(ConfigError::CommandInput(clap_err)) = node_config {
clap_err.exit();
}
let node_config = node_config?;
Ok((test_config, node_config))
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,44 +115,6 @@ expression: dumped_default_config
},
"privacy": "Public"
},
"consensus.test.#is_none": {
"description": "Flag for an optional field.",
"value": true,
"privacy": "TemporaryValue"
},
"consensus.test.cache_size": {
"description": "The cache size for the test simulation.",
"value": {
"$serde_json::private::Number": "1000"
},
"privacy": "Public"
},
"consensus.test.drop_probability": {
"description": "The probability of dropping a message.",
"value": {
"$serde_json::private::Number": "0.0"
},
"privacy": "Public"
},
"consensus.test.invalid_probability": {
"description": "The probability of sending an invalid message.",
"value": {
"$serde_json::private::Number": "0.0"
},
"privacy": "Public"
},
"consensus.test.random_seed": {
"description": "The random seed for the test simulation to ensure repeatable test results.",
"value": {
"$serde_json::private::Number": "0"
},
"privacy": "Public"
},
"consensus.test.sync_topic": {
"description": "The network topic for sync messages.",
"value": "consensus_test_sync",
"privacy": "Public"
},
"consensus.timeouts.precommit_timeout": {
"description": "The timeout (seconds) for a precommit.",
"value": {
Expand Down
2 changes: 2 additions & 0 deletions crates/papyrus_node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// within this crate
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]

#[cfg(any(test, feature = "testing"))]
pub mod bin_utils;
#[allow(unused_imports)]
pub mod config;
#[cfg(test)]
Expand Down
83 changes: 20 additions & 63 deletions crates/papyrus_node/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,17 @@ use std::process::exit;
use std::sync::Arc;
use std::time::Duration;

use futures::stream::StreamExt;
use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig;
use papyrus_common::metrics::COLLECT_PROFILING_METRICS;
use papyrus_common::pending_classes::PendingClasses;
use papyrus_common::BlockHashAndNumber;
use papyrus_config::presentation::get_config_presentation;
use papyrus_config::validators::config_validate;
use papyrus_consensus::config::ConsensusConfig;
use papyrus_consensus::simulation_network_receiver::NetworkReceiver;
use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext;
use papyrus_monitoring_gateway::MonitoringServer;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
use papyrus_network::network_manager::NetworkManager;
use papyrus_network::{network_manager, NetworkConfig};
use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels};
use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels};
Expand All @@ -31,7 +29,7 @@ use papyrus_sync::sources::base_layer::{BaseLayerSourceError, EthereumBaseLayerS
use papyrus_sync::sources::central::{CentralError, CentralSource, CentralSourceConfig};
use papyrus_sync::sources::pending::PendingSource;
use papyrus_sync::{StateSync, SyncConfig};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::block::BlockHash;
use starknet_api::felt;
use starknet_client::reader::objects::pending_data::{PendingBlock, PendingBlockOrDeprecated};
use starknet_client::reader::PendingData;
Expand Down Expand Up @@ -188,65 +186,24 @@ fn spawn_consensus(

let network_channels = network_manager
.register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?;
// TODO(matan): connect this to an actual channel.
if let Some(test_config) = config.test.as_ref() {
let sync_channels = network_manager
.register_broadcast_topic(Topic::new(test_config.sync_topic.clone()), BUFFER_SIZE)?;
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.messages_to_broadcast_sender.clone(),
config.num_validators,
Some(sync_channels.messages_to_broadcast_sender),
);
let network_receiver = NetworkReceiver::new(
network_channels.broadcasted_messages_receiver,
test_config.cache_size,
test_config.random_seed,
test_config.drop_probability,
test_config.invalid_probability,
);
let broadcast_channels = BroadcastTopicChannels {
messages_to_broadcast_sender: network_channels.messages_to_broadcast_sender,
broadcasted_messages_receiver: Box::new(network_receiver),
reported_messages_sender: network_channels.reported_messages_sender,
continue_propagation_sender: network_channels.continue_propagation_sender,
};
let sync_receiver =
sync_channels.broadcasted_messages_receiver.map(|(vote, _report_sender)| {
BlockNumber(vote.expect("Sync channel should never have errors").height)
});
Ok(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
config.start_height,
config.validator_id,
config.consensus_delay,
config.timeouts.clone(),
broadcast_channels,
sync_receiver,
)
.await?)
}))
} else {
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.messages_to_broadcast_sender.clone(),
config.num_validators,
None,
);
Ok(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
config.start_height,
config.validator_id,
config.consensus_delay,
config.timeouts.clone(),
network_channels,
futures::stream::pending(),
)
.await?)
}))
}
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.messages_to_broadcast_sender.clone(),
config.num_validators,
None,
);
Ok(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
config.start_height,
config.validator_id,
config.consensus_delay,
config.timeouts.clone(),
network_channels,
futures::stream::pending(),
)
.await?)
}))
}

async fn run_sync(
Expand Down
Loading

0 comments on commit 470e815

Please sign in to comment.