Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Stability] Fix GC for tasks #2235

Merged
merged 6 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/hotshot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ surf-disco = { workspace = true }
time = { workspace = true }
local-ip-address = "0.5.6"
dyn-clone = { git = "https://github.com/dtolnay/dyn-clone", tag = "1.0.16" }
derive_more = "0.99.17"
portpicker = "0.1.1"

tracing = { workspace = true }
Expand Down
194 changes: 115 additions & 79 deletions crates/hotshot/src/traits/networking/web_server_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use async_compatibility_layer::{
};
use async_lock::RwLock;
use async_trait::async_trait;
use derive_more::{Deref, DerefMut};
use hotshot_task::{boxed_sync, BoxSyncFuture};
use hotshot_types::{
message::{Message, MessagePurpose},
Expand Down Expand Up @@ -94,6 +95,79 @@ impl<TYPES: NodeType> WebServerNetwork<TYPES> {
}
}

/// `TaskChannel` is a type alias for an unbounded sender channel that sends `ConsensusIntentEvent`s.
///
/// This channel is used to send events to a task. The `K` type parameter is the type of the key used in the `ConsensusIntentEvent`.
///
/// # Examples
///
/// ```
/// let (tx, _rx): (TaskChannel<MyKey>, _) = tokio::sync::mpsc::unbounded_channel();
/// ```
///
/// # Note
///
/// This type alias is used in the context of a `TaskMap`, where each task is represented by a `TaskChannel`.
type TaskChannel<K> = UnboundedSender<ConsensusIntentEvent<K>>;

/// `TaskMap` is a wrapper around a `BTreeMap` that maps view numbers to tasks.
///
/// Each task is represented by a `TaskChannel` that can be used to send events to the task.
/// The key `K` is a type that implements the `SignatureKey` trait.
///
/// # Examples
///
/// ```
/// use your_crate::TaskMap;
/// let mut map: TaskMap<MyKey> = TaskMap::default();
/// ```
///
/// # Note
///
/// This struct is `Clone`, `Deref`, and `DerefMut`, so it can be used just like a `BTreeMap`.
#[derive(Debug, Clone, Deref, DerefMut)]
struct TaskMap<K: SignatureKey>(BTreeMap<u64, TaskChannel<K>>);

impl<K: SignatureKey> Default for TaskMap<K> {
fn default() -> Self {
Self(BTreeMap::default())
}
}

impl<K: SignatureKey> TaskMap<K> {
/// Prunes tasks that are polling for a view less than or equal to `current_view - 2`.
///
/// This method cancels and removes all entries in the task map that are polling for a view less than or equal to `current_view - 2`.
/// The cancellation is performed by sending a `cancel_event` to the task.
///
/// # Arguments
///
/// * `current_view` - The current view number. Tasks polling for a view less than or equal to `current_view - 2` will be pruned.
/// * `cancel_event_fn` - A function that takes a view number and returns a `ConsensusIntentEvent` to be sent to the task for cancellation.
///
/// # Examples
///
/// ```
/// let mut map: TaskMap<MyKey> = TaskMap::default();
/// map.prune_tasks(10, ConsensusIntentEvent::CancelPollForProposal).await;
/// ```
async fn prune_tasks(
&mut self,
current_view: u64,
cancel_event_fn: fn(u64) -> ConsensusIntentEvent<K>,
) {
let cutoff_view = current_view.saturating_sub(2);
let views_to_remove: Vec<_> = self.range(..cutoff_view).map(|(key, _)| *key).collect();

for view in views_to_remove {
let task = self.remove(&view);
if let Some(task) = task {
let _ = task.send(cancel_event_fn(view)).await;
}
}
}
}

/// Represents the core of web server networking
#[derive(Debug)]
struct Inner<TYPES: NodeType> {
Expand All @@ -118,26 +192,19 @@ struct Inner<TYPES: NodeType> {
tx_index: Arc<RwLock<u64>>,

/// Task map for quorum proposals.
proposal_task_map:
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
proposal_task_map: Arc<RwLock<TaskMap<TYPES::SignatureKey>>>,
/// Task map for quorum votes.
vote_task_map:
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
vote_task_map: Arc<RwLock<TaskMap<TYPES::SignatureKey>>>,
/// Task map for VID disperse data
vid_disperse_task_map:
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
vid_disperse_task_map: Arc<RwLock<TaskMap<TYPES::SignatureKey>>>,
/// Task map for DACs.
dac_task_map:
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
dac_task_map: Arc<RwLock<TaskMap<TYPES::SignatureKey>>>,
/// Task map for view sync certificates.
view_sync_cert_task_map:
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
view_sync_cert_task_map: Arc<RwLock<TaskMap<TYPES::SignatureKey>>>,
/// Task map for view sync votes.
view_sync_vote_task_map:
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
view_sync_vote_task_map: Arc<RwLock<TaskMap<TYPES::SignatureKey>>>,
/// Task map for transactions
txn_task_map:
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
txn_task_map: Arc<RwLock<TaskMap<TYPES::SignatureKey>>>,
/// Task polling for current propsal
current_proposal_task:
Arc<RwLock<Option<UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
Expand Down Expand Up @@ -802,15 +869,10 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForProposal(*view))
.await;
}
// Cancel old, stale tasks
task_map
.prune_tasks(view_number, ConsensusIntentEvent::CancelPollForProposal)
.await;
}
ConsensusIntentEvent::PollForVIDDisperse(view_number) => {
// Check if we already have a task for this (we shouldn't)
Expand Down Expand Up @@ -840,15 +902,10 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForVIDDisperse(*view))
.await;
}
// Cancel old, stale tasks
task_map
.prune_tasks(view_number, ConsensusIntentEvent::CancelPollForVIDDisperse)
.await;
}
ConsensusIntentEvent::PollForCurrentProposal => {
let mut proposal_task = self.inner.current_proposal_task.write().await;
Expand Down Expand Up @@ -899,15 +956,10 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForVotes(*view))
.await;
}
// Cancel old, stale tasks
task_map
.prune_tasks(view_number, ConsensusIntentEvent::CancelPollForVotes)
.await;
}

ConsensusIntentEvent::PollForDAC(view_number) => {
Expand All @@ -934,15 +986,10 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForDAC(*view))
.await;
}
// Cancel old, stale tasks
task_map
.prune_tasks(view_number, ConsensusIntentEvent::CancelPollForDAC)
.await;
}

ConsensusIntentEvent::CancelPollForVotes(view_number) => {
Expand Down Expand Up @@ -986,17 +1033,13 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForViewSyncCertificate(
*view,
))
.await;
}
// Cancel old, stale tasks
task_map
.prune_tasks(
view_number,
ConsensusIntentEvent::CancelPollForViewSyncCertificate,
)
.await;
}
ConsensusIntentEvent::PollForViewSyncVotes(view_number) => {
let mut task_map = self.inner.view_sync_vote_task_map.write().await;
Expand Down Expand Up @@ -1026,15 +1069,13 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForViewSyncVotes(*view))
.await;
}
// Cancel old, stale tasks
task_map
.prune_tasks(
view_number,
ConsensusIntentEvent::CancelPollForViewSyncVotes,
)
.await;
}

ConsensusIntentEvent::CancelPollForViewSyncCertificate(view_number) => {
Expand Down Expand Up @@ -1089,15 +1130,10 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForTransactions(*view))
.await;
}
// Cancel old, stale tasks
task_map
.prune_tasks(view_number, ConsensusIntentEvent::CancelPollForTransactions)
.await;
}
ConsensusIntentEvent::CancelPollForTransactions(view_number) => {
let mut task_map = self.inner.txn_task_map.write().await;
Expand Down