From bfbe3e4fc841a13efdfbbe0a454487c0f97b104d Mon Sep 17 00:00:00 2001 From: Rob Date: Thu, 14 Dec 2023 13:13:21 -0500 Subject: [PATCH 1/3] remove cancelled polls from the taskmap --- Cargo.lock | 1 + crates/hotshot/Cargo.toml | 1 + .../traits/networking/web_server_network.rs | 206 +++++++++++------- 3 files changed, 129 insertions(+), 79 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4527932d96..fb4033f5cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2734,6 +2734,7 @@ dependencies = [ "custom_debug", "dashmap", "derivative", + "derive_more", "dyn-clone 1.0.16 (git+https://github.com/dtolnay/dyn-clone?tag=1.0.16)", "either", "embed-doc-image", diff --git a/crates/hotshot/Cargo.toml b/crates/hotshot/Cargo.toml index 74bd1eab9d..a7a34134b5 100644 --- a/crates/hotshot/Cargo.toml +++ b/crates/hotshot/Cargo.toml @@ -114,6 +114,7 @@ surf-disco = { workspace = true } time = { workspace = true } local-ip-address = "0.5.6" dyn-clone = { git = "https://github.com/dtolnay/dyn-clone", tag = "1.0.16" } +derive_more = "0.99.17" tracing = { workspace = true } typenum = { workspace = true } diff --git a/crates/hotshot/src/traits/networking/web_server_network.rs b/crates/hotshot/src/traits/networking/web_server_network.rs index ad0b2dd078..79107fb43c 100644 --- a/crates/hotshot/src/traits/networking/web_server_network.rs +++ b/crates/hotshot/src/traits/networking/web_server_network.rs @@ -11,6 +11,7 @@ use async_compatibility_layer::{ }; use async_lock::RwLock; use async_trait::async_trait; +use derive_more::{Deref, DerefMut}; use hotshot_task::{boxed_sync, BoxSyncFuture}; use hotshot_types::{ message::{Message, MessagePurpose}, @@ -95,6 +96,80 @@ impl WebServerNetwork { } } +/// `TaskChannel` is a type alias for an unbounded sender channel that sends `ConsensusIntentEvent`s. +/// +/// This channel is used to send events to a task. The `K` type parameter is the type of the key used in the `ConsensusIntentEvent`. +/// +/// # Examples +/// +/// ``` +/// let (tx, _rx): (TaskChannel, _) = tokio::sync::mpsc::unbounded_channel(); +/// ``` +/// +/// # Note +/// +/// This type alias is used in the context of a `TaskMap`, where each task is represented by a `TaskChannel`. +type TaskChannel = UnboundedSender>; + +/// `TaskMap` is a wrapper around a `BTreeMap` that maps view numbers to tasks. +/// +/// Each task is represented by a `TaskChannel` that can be used to send events to the task. +/// The key `K` is a type that implements the `SignatureKey` trait. +/// +/// # Examples +/// +/// ``` +/// use your_crate::TaskMap; +/// let mut map: TaskMap = TaskMap::default(); +/// ``` +/// +/// # Note +/// +/// This struct is `Clone`, `Deref`, and `DerefMut`, so it can be used just like a `BTreeMap`. +#[derive(Debug, Clone, Deref, DerefMut)] +struct TaskMap(BTreeMap>); + +impl Default for TaskMap { + fn default() -> Self { + Self(BTreeMap::default()) + } +} + +impl TaskMap { + /// Prunes tasks that are polling for a view less than or equal to `current_view - 2`. + /// + /// This method cancels and removes all entries in the task map that are polling for a view less than or equal to `current_view - 2`. + /// The cancellation is performed by sending a `cancel_event` to the task. + /// + /// # Arguments + /// + /// * `current_view` - The current view number. Tasks polling for a view less than or equal to `current_view - 2` will be pruned. + /// * `cancel_event_fn` - A function that takes a view number and returns a `ConsensusIntentEvent` to be sent to the task for cancellation. + /// + /// # Examples + /// + /// ``` + /// let mut map: TaskMap = TaskMap::default(); + /// map.prune_tasks(10, ConsensusIntentEvent::CancelPollForProposal).await; + /// ``` + async fn prune_tasks( + &mut self, + current_view: u64, + cancel_event_fn: fn(u64) -> ConsensusIntentEvent, + ) { + println!("len {:?}: {}", cancel_event_fn(0), self.len()); + let cutoff_view = current_view.saturating_sub(2); + let views_to_remove: Vec<_> = self.range(..cutoff_view).map(|(key, _)| *key).collect(); + + for view in views_to_remove { + let task = self.remove(&view); + if let Some(task) = task { + let _ = task.send(cancel_event_fn(view)).await; + } + } + } +} + /// Represents the core of web server networking #[derive(Debug)] struct Inner { @@ -119,26 +194,19 @@ struct Inner { tx_index: Arc>, /// Task map for quorum proposals. - proposal_task_map: - Arc>>>>, + proposal_task_map: Arc>>, /// Task map for quorum votes. - vote_task_map: - Arc>>>>, + vote_task_map: Arc>>, /// Task map for VID disperse data - vid_disperse_task_map: - Arc>>>>, + vid_disperse_task_map: Arc>>, /// Task map for DACs. - dac_task_map: - Arc>>>>, + dac_task_map: Arc>>, /// Task map for view sync certificates. - view_sync_cert_task_map: - Arc>>>>, + view_sync_cert_task_map: Arc>>, /// Task map for view sync votes. - view_sync_vote_task_map: - Arc>>>>, + view_sync_vote_task_map: Arc>>, /// Task map for transactions - txn_task_map: - Arc>>>>, + txn_task_map: Arc>>, } impl Inner { @@ -769,6 +837,17 @@ impl ConnectedNetwork, TYPES::Signatur self.inner.is_da ); + println!( + "{},{},{},{},{},{},{}", + self.inner.proposal_task_map.read().await.len(), + self.inner.vote_task_map.read().await.len(), + self.inner.vid_disperse_task_map.read().await.len(), + self.inner.dac_task_map.read().await.len(), + self.inner.view_sync_cert_task_map.read().await.len(), + self.inner.view_sync_vote_task_map.read().await.len(), + self.inner.txn_task_map.read().await.len() + ); + // TODO ED Need to handle canceling tasks that don't receive their expected output (such a proposal that never comes) match event { ConsensusIntentEvent::PollForProposal(view_number) => { @@ -799,15 +878,10 @@ impl ConnectedNetwork, TYPES::Signatur debug!("Somehow task already existed!"); } - // Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`. - let view_minus_2 = view_number.saturating_sub(2); - let range = task_map.range(..view_minus_2); - for (view, task) in range { - // Cancel the old task by sending a message to it. If the task already exited we expect an error - let _res = task - .send(ConsensusIntentEvent::CancelPollForProposal(*view)) - .await; - } + // Cancel old, stale tasks + task_map + .prune_tasks(view_number, ConsensusIntentEvent::CancelPollForProposal) + .await; } ConsensusIntentEvent::PollForVIDDisperse(view_number) => { // Check if we already have a task for this (we shouldn't) @@ -837,15 +911,10 @@ impl ConnectedNetwork, TYPES::Signatur debug!("Somehow task already existed!"); } - // Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`. - let view_minus_2 = view_number.saturating_sub(2); - let range = task_map.range(..view_minus_2); - for (view, task) in range { - // Cancel the old task by sending a message to it. If the task already exited we expect an error - let _res = task - .send(ConsensusIntentEvent::CancelPollForVIDDisperse(*view)) - .await; - } + // Cancel old, stale tasks + task_map + .prune_tasks(view_number, ConsensusIntentEvent::CancelPollForVIDDisperse) + .await; } ConsensusIntentEvent::PollForCurrentProposal => { // create new task @@ -890,15 +959,10 @@ impl ConnectedNetwork, TYPES::Signatur debug!("Somehow task already existed!"); } - // Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`. - let view_minus_2 = view_number.saturating_sub(2); - let range = task_map.range(..view_minus_2); - for (view, task) in range { - // Cancel the old task by sending a message to it. If the task already exited we expect an error - let _res = task - .send(ConsensusIntentEvent::CancelPollForVotes(*view)) - .await; - } + // Cancel old, stale tasks + task_map + .prune_tasks(view_number, ConsensusIntentEvent::CancelPollForVotes) + .await; } ConsensusIntentEvent::PollForDAC(view_number) => { @@ -925,15 +989,10 @@ impl ConnectedNetwork, TYPES::Signatur debug!("Somehow task already existed!"); } - // Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`. - let view_minus_2 = view_number.saturating_sub(2); - let range = task_map.range(..view_minus_2); - for (view, task) in range { - // Cancel the old task by sending a message to it. If the task already exited we expect an error - let _res = task - .send(ConsensusIntentEvent::CancelPollForDAC(*view)) - .await; - } + // Cancel old, stale tasks + task_map + .prune_tasks(view_number, ConsensusIntentEvent::CancelPollForDAC) + .await; } ConsensusIntentEvent::CancelPollForVotes(view_number) => { @@ -977,17 +1036,13 @@ impl ConnectedNetwork, TYPES::Signatur debug!("Somehow task already existed!"); } - // Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`. - let view_minus_2 = view_number.saturating_sub(2); - let range = task_map.range(..view_minus_2); - for (view, task) in range { - // Cancel the old task by sending a message to it. If the task already exited we expect an error - let _res = task - .send(ConsensusIntentEvent::CancelPollForViewSyncCertificate( - *view, - )) - .await; - } + // Cancel old, stale tasks + task_map + .prune_tasks( + view_number, + ConsensusIntentEvent::CancelPollForViewSyncCertificate, + ) + .await; } ConsensusIntentEvent::PollForViewSyncVotes(view_number) => { let mut task_map = self.inner.view_sync_vote_task_map.write().await; @@ -1017,15 +1072,13 @@ impl ConnectedNetwork, TYPES::Signatur debug!("Somehow task already existed!"); } - // Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`. - let view_minus_2 = view_number.saturating_sub(2); - let range = task_map.range(..view_minus_2); - for (view, task) in range { - // Cancel the old task by sending a message to it. If the task already exited we expect an error - let _res = task - .send(ConsensusIntentEvent::CancelPollForViewSyncVotes(*view)) - .await; - } + // Cancel old, stale tasks + task_map + .prune_tasks( + view_number, + ConsensusIntentEvent::CancelPollForViewSyncVotes, + ) + .await; } ConsensusIntentEvent::CancelPollForViewSyncCertificate(view_number) => { @@ -1080,15 +1133,10 @@ impl ConnectedNetwork, TYPES::Signatur debug!("Somehow task already existed!"); } - // Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`. - let view_minus_2 = view_number.saturating_sub(2); - let range = task_map.range(..view_minus_2); - for (view, task) in range { - // Cancel the old task by sending a message to it. If the task already exited we expect an error - let _res = task - .send(ConsensusIntentEvent::CancelPollForTransactions(*view)) - .await; - } + // Cancel old, stale tasks + task_map + .prune_tasks(view_number, ConsensusIntentEvent::CancelPollForTransactions) + .await; } ConsensusIntentEvent::CancelPollForTransactions(view_number) => { let mut task_map = self.inner.txn_task_map.write().await; From 19d8cedd3e619f12fe4f128644274d61b4e3c136 Mon Sep 17 00:00:00 2001 From: Rob Date: Thu, 14 Dec 2023 13:14:14 -0500 Subject: [PATCH 2/3] remove debug prints --- .../src/traits/networking/web_server_network.rs | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/crates/hotshot/src/traits/networking/web_server_network.rs b/crates/hotshot/src/traits/networking/web_server_network.rs index 79107fb43c..61fb275bdf 100644 --- a/crates/hotshot/src/traits/networking/web_server_network.rs +++ b/crates/hotshot/src/traits/networking/web_server_network.rs @@ -157,7 +157,6 @@ impl TaskMap { current_view: u64, cancel_event_fn: fn(u64) -> ConsensusIntentEvent, ) { - println!("len {:?}: {}", cancel_event_fn(0), self.len()); let cutoff_view = current_view.saturating_sub(2); let views_to_remove: Vec<_> = self.range(..cutoff_view).map(|(key, _)| *key).collect(); @@ -837,17 +836,6 @@ impl ConnectedNetwork, TYPES::Signatur self.inner.is_da ); - println!( - "{},{},{},{},{},{},{}", - self.inner.proposal_task_map.read().await.len(), - self.inner.vote_task_map.read().await.len(), - self.inner.vid_disperse_task_map.read().await.len(), - self.inner.dac_task_map.read().await.len(), - self.inner.view_sync_cert_task_map.read().await.len(), - self.inner.view_sync_vote_task_map.read().await.len(), - self.inner.txn_task_map.read().await.len() - ); - // TODO ED Need to handle canceling tasks that don't receive their expected output (such a proposal that never comes) match event { ConsensusIntentEvent::PollForProposal(view_number) => { From bd192d028745b6b8ba2aafd3da61ce70344a528a Mon Sep 17 00:00:00 2001 From: Rob Date: Thu, 14 Dec 2023 14:32:14 -0500 Subject: [PATCH 3/3] merge current proposal changes --- crates/hotshot/src/traits/networking/web_server_network.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/hotshot/src/traits/networking/web_server_network.rs b/crates/hotshot/src/traits/networking/web_server_network.rs index b14d02bd4d..9a48b6fffc 100644 --- a/crates/hotshot/src/traits/networking/web_server_network.rs +++ b/crates/hotshot/src/traits/networking/web_server_network.rs @@ -207,7 +207,8 @@ struct Inner { /// Task map for transactions txn_task_map: Arc>>, /// Task polling for current propsal - current_proposal_task: Arc>>, + current_proposal_task: + Arc>>>>, } impl Inner {