Skip to content

Commit

Permalink
refactor(consensus): move logic for buildingconsensus for tests into …
Browse files Browse the repository at this point in the history
…its own main
  • Loading branch information
matan-starkware committed Sep 18, 2024
1 parent 318c4da commit d725dce
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 63 deletions.
4 changes: 4 additions & 0 deletions crates/papyrus_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ name = "central_source_integration_test"
path = "src/bin/central_source_integration_test.rs"
required-features = ["futures-util", "tokio-stream"]

[[bin]]
name = "run_consensus"
path = "src/bin/run_consensus.rs"

[dependencies]
anyhow.workspace = true
clap = { workspace = true }
Expand Down
75 changes: 75 additions & 0 deletions crates/papyrus_node/src/bin/run_consensus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use std::env::args;

use futures::stream::StreamExt;
use papyrus_config::ConfigError;
use papyrus_consensus::config::ConsensusConfig;
use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
use papyrus_node::config::NodeConfig;
use papyrus_node::run::{run, PapyrusTaskHandles, PapyrusUtilities};
use papyrus_p2p_sync::BUFFER_SIZE;
use papyrus_storage::StorageReader;
use starknet_api::block::BlockNumber;
use tokio::task::JoinHandle;
use tracing::info;

fn build_consensus(
config: &ConsensusConfig,
storage_reader: StorageReader,
network_manager: &mut NetworkManager,
) -> anyhow::Result<Option<JoinHandle<anyhow::Result<()>>>> {
let config = config.clone();
let Some(test_config) = config.test.as_ref() else {
info!("Using the default consensus implementation.");
return Ok(None);
};

let network_channels = network_manager
.register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?;
let BroadcastTopicChannels { messages_to_broadcast_sender, broadcast_client_channels } =
network_channels;
// TODO(matan): connect this to an actual channel.
let sync_channels = network_manager
.register_broadcast_topic(Topic::new(test_config.sync_topic.clone()), BUFFER_SIZE)?;
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
messages_to_broadcast_sender,
config.num_validators,
Some(sync_channels.messages_to_broadcast_sender),
);
let sync_receiver = sync_channels.broadcast_client_channels.map(|(vote, _report_sender)| {
BlockNumber(vote.expect("Sync channel should never have errors").height)
});
Ok(Some(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
config.start_height,
config.validator_id,
config.consensus_delay,
config.timeouts.clone(),
broadcast_client_channels,
sync_receiver,
)
.await?)
})))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let config = NodeConfig::load_and_process(args().collect());
if let Err(ConfigError::CommandInput(clap_err)) = config {
clap_err.exit();
}
let config = config?;

let mut utils = PapyrusUtilities::new(&config)?;
let mut tasks = PapyrusTaskHandles::default();

tasks.consensus_handle = build_consensus(
config.consensus.as_ref().unwrap(),
utils.storage_reader.clone(),
utils.maybe_network_manager.as_mut().unwrap(),
)?;
run(config, utils, tasks).await
}
92 changes: 34 additions & 58 deletions crates/papyrus_node/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::process::exit;
use std::sync::Arc;
use std::time::Duration;

use futures::stream::StreamExt;
use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig;
use papyrus_common::metrics::COLLECT_PROFILING_METRICS;
use papyrus_common::pending_classes::PendingClasses;
Expand All @@ -30,7 +29,7 @@ use papyrus_sync::sources::base_layer::{BaseLayerSourceError, EthereumBaseLayerS
use papyrus_sync::sources::central::{CentralError, CentralSource, CentralSourceConfig};
use papyrus_sync::sources::pending::PendingSource;
use papyrus_sync::{StateSync, SyncConfig};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::block::BlockHash;
use starknet_api::felt;
use starknet_client::reader::objects::pending_data::{PendingBlock, PendingBlockOrDeprecated};
use starknet_client::reader::PendingData;
Expand All @@ -57,10 +56,10 @@ const GENESIS_HASH: &str = "0x0";
const STORAGE_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);

pub struct PapyrusUtilities {
storage_reader: StorageReader,
storage_writer: StorageWriter,
maybe_network_manager: Option<NetworkManager>,
local_peer_id: String,
pub storage_reader: StorageReader,
pub storage_writer: StorageWriter,
pub maybe_network_manager: Option<NetworkManager>,
pub local_peer_id: String,
}

impl PapyrusUtilities {
Expand All @@ -71,14 +70,18 @@ impl PapyrusUtilities {
}
}

/// Struct which allows configuring how the node will run.
/// - If left `None`, the task will be spawn with its default (prod) configuration.
/// - If set to Some, that variant of the task will be run, and the default ignored.
/// - If you want to disable a task set it to `Some(tokio::spawn(pending()))`.
pub struct PapyrusTaskHandles {
storage_metrics_handle: Option<JoinHandle<anyhow::Result<()>>>,
sync_client_and_rpc_server_handles:
pub storage_metrics_handle: Option<JoinHandle<anyhow::Result<()>>>,
pub sync_client_and_rpc_server_handles:
Option<(JoinHandle<anyhow::Result<()>>, JoinHandle<anyhow::Result<()>>)>,
monitoring_server_handle: Option<JoinHandle<anyhow::Result<()>>>,
p2p_sync_server_handle: Option<JoinHandle<anyhow::Result<()>>>,
consensus_handle: Option<JoinHandle<anyhow::Result<()>>>,
network_handle: Option<JoinHandle<anyhow::Result<()>>>,
pub monitoring_server_handle: Option<JoinHandle<anyhow::Result<()>>>,
pub p2p_sync_server_handle: Option<JoinHandle<anyhow::Result<()>>>,
pub consensus_handle: Option<JoinHandle<anyhow::Result<()>>>,
pub network_handle: Option<JoinHandle<anyhow::Result<()>>>,
}

impl Default for PapyrusTaskHandles {
Expand Down Expand Up @@ -141,53 +144,26 @@ fn build_consensus(
.register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?;
let BroadcastTopicChannels { messages_to_broadcast_sender, broadcast_client_channels } =
network_channels;
// TODO(matan): connect this to an actual channel.
if let Some(test_config) = config.test.as_ref() {
let sync_channels = network_manager
.register_broadcast_topic(Topic::new(test_config.sync_topic.clone()), BUFFER_SIZE)?;
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
messages_to_broadcast_sender,
config.num_validators,
Some(sync_channels.messages_to_broadcast_sender),
);
let sync_receiver =
sync_channels.broadcast_client_channels.map(|(vote, _report_sender)| {
BlockNumber(vote.expect("Sync channel should never have errors").height)
});
Ok(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
config.start_height,
config.validator_id,
config.consensus_delay,
config.timeouts.clone(),
broadcast_client_channels,
sync_receiver,
)
.await?)
}))
} else {
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
messages_to_broadcast_sender,
config.num_validators,
None,
);
Ok(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
config.start_height,
config.validator_id,
config.consensus_delay,
config.timeouts.clone(),
broadcast_client_channels,
futures::stream::pending(),
)
.await?)
}))
}
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
messages_to_broadcast_sender,
config.num_validators,
None,
);
Ok(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
config.start_height,
config.validator_id,
config.consensus_delay,
config.timeouts.clone(),
broadcast_client_channels,
futures::stream::pending(),
)
.await?)
}))
}

fn spawn_storage_metrics_collector(
storage_reader: StorageReader,
update_interval: Duration,
Expand Down
12 changes: 7 additions & 5 deletions crates/sequencing/papyrus_consensus/src/bin/run_simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,11 @@ struct RunConsensusArgs {
default_value = "60", value_parser = parse_duration
)]
stagnation_threshold: Duration,
#[arg(long = "duration", help = "Maximum test duration in seconds.",
default_value = "123456789123456789",
value_parser = parse_duration)]
#[arg(
long = "duration", help = "Maximum test duration in seconds.",
default_value = "123456789123456789",
value_parser = parse_duration
)]
max_test_duration: Duration,
}

Expand Down Expand Up @@ -261,7 +263,7 @@ async fn build_node(data_dir: &str, logs_dir: &str, i: usize, papyrus_args: &Pap
let data_dir = format!("{}/data{}", data_dir, i);

let mut cmd = format!(
"RUST_LOG=papyrus_consensus=debug,papyrus=info target/release/papyrus_node \
"RUST_LOG=papyrus_consensus=debug,papyrus=info target/release/run_consensus \
--network.#is_none false --base_layer.node_url {} --storage.db_config.path_prefix {} \
--consensus.#is_none false --consensus.validator_id 0x{} --consensus.num_validators {} \
--network.tcp_port {} --rpc.server_address 127.0.0.1:{} \
Expand Down Expand Up @@ -367,7 +369,7 @@ async fn main() {

println!("Running cargo build...");
Command::new("cargo")
.args(["build", "--release", "--package", "papyrus_node"])
.args(["build", "--release", "--package", "papyrus_node", "--bin", "run_consensus"])
.status()
.unwrap();

Expand Down

0 comments on commit d725dce

Please sign in to comment.