Skip to content

Commit

Permalink
[Stability] Garbage collect polls for all views < latest-2 (#2226)
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron authored Dec 14, 2023
1 parent 5554b70 commit 96d27f7
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 50 deletions.
119 changes: 69 additions & 50 deletions crates/hotshot/src/traits/networking/web_server_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ use serde::{Deserialize, Serialize};
use surf_disco::Url;

use hotshot_types::traits::network::ViewMessage;
use std::collections::BTreeMap;
use std::{
collections::{hash_map::Entry, BTreeSet, HashMap},
collections::{btree_map::Entry, BTreeSet},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
Expand Down Expand Up @@ -119,25 +120,25 @@ struct Inner<TYPES: NodeType> {

/// Task map for quorum proposals.
proposal_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task map for quorum votes.
vote_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task map for VID disperse data
vid_disperse_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task map for DACs.
dac_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task map for view sync certificates.
view_sync_cert_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task map for view sync votes.
view_sync_vote_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task map for transactions
txn_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
}

impl<TYPES: NodeType> Inner<TYPES> {
Expand Down Expand Up @@ -330,6 +331,7 @@ impl<TYPES: NodeType> Inner<TYPES> {
ConsensusIntentEvent::CancelPollForVotes(event_view)
| ConsensusIntentEvent::CancelPollForProposal(event_view)
| ConsensusIntentEvent::CancelPollForDAC(event_view)
| ConsensusIntentEvent::CancelPollForViewSyncCertificate(event_view)
| ConsensusIntentEvent::CancelPollForVIDDisperse(event_view)
| ConsensusIntentEvent::CancelPollForViewSyncVotes(event_view) => {
if view_number == event_view {
Expand Down Expand Up @@ -768,8 +770,6 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
);

// TODO ED Need to handle canceling tasks that don't receive their expected output (such a proposal that never comes)
// TODO ED Need to GC all old views, not just singular views, could lead to a network leak

match event {
ConsensusIntentEvent::PollForProposal(view_number) => {
// Check if we already have a task for this (we shouldn't)
Expand Down Expand Up @@ -799,15 +799,13 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// GC proposal collection if we are two views in the future
if let Some((_, sender)) = task_map.remove_entry(&view_number.wrapping_sub(2)) {
// Send task cancel message to old task

// If task already exited we expect an error
let _res = sender
.send(ConsensusIntentEvent::CancelPollForProposal(
view_number.wrapping_sub(2),
))
// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForProposal(*view))
.await;
}
}
Expand Down Expand Up @@ -839,15 +837,13 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// GC proposal collection if we are two views in the future
if let Some((_, sender)) = task_map.remove_entry(&view_number.wrapping_sub(2)) {
// Send task cancel message to old task

// If task already exited we expect an error
let _res = sender
.send(ConsensusIntentEvent::CancelPollForVIDDisperse(
view_number.wrapping_sub(2),
))
// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForVIDDisperse(*view))
.await;
}
}
Expand Down Expand Up @@ -894,16 +890,13 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// GC proposal collection if we are two views in the future
// TODO ED This won't work for vote collection, last task is more than 2 view ago depending on size of network, will need to rely on cancel task from consensus
if let Some((_, sender)) = task_map.remove_entry(&(view_number.wrapping_sub(2))) {
// Send task cancel message to old task

// If task already exited we expect an error
let _res = sender
.send(ConsensusIntentEvent::CancelPollForVotes(
view_number.wrapping_sub(2),
))
// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForVotes(*view))
.await;
}
}
Expand Down Expand Up @@ -932,15 +925,13 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// GC proposal collection if we are two views in the future
if let Some((_, sender)) = task_map.remove_entry(&(view_number.wrapping_sub(2))) {
// Send task cancel message to old task

// If task already exited we expect an error
let _res = sender
.send(ConsensusIntentEvent::CancelPollForDAC(
view_number.wrapping_sub(2),
))
// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForDAC(*view))
.await;
}
}
Expand Down Expand Up @@ -986,7 +977,17 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// TODO ED Do we need to GC before returning? Or will view sync task handle that?
// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForViewSyncCertificate(
*view,
))
.await;
}
}
ConsensusIntentEvent::PollForViewSyncVotes(view_number) => {
let mut task_map = self.inner.view_sync_vote_task_map.write().await;
Expand Down Expand Up @@ -1015,6 +1016,16 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
} else {
debug!("Somehow task already existed!");
}

// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForViewSyncVotes(*view))
.await;
}
}

ConsensusIntentEvent::CancelPollForViewSyncCertificate(view_number) => {
Expand Down Expand Up @@ -1047,7 +1058,7 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
}
ConsensusIntentEvent::PollForTransactions(view_number) => {
let mut task_map = self.inner.txn_task_map.write().await;
if let std::collections::hash_map::Entry::Vacant(e) = task_map.entry(view_number) {
if let Entry::Vacant(e) = task_map.entry(view_number) {
// create new task
let (sender, receiver) = unbounded();
e.insert(sender);
Expand All @@ -1069,7 +1080,15 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// TODO ED Do we need to GC before returning? Or will view sync task handle that?
// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForTransactions(*view))
.await;
}
}
ConsensusIntentEvent::CancelPollForTransactions(view_number) => {
let mut task_map = self.inner.txn_task_map.write().await;
Expand Down
5 changes: 5 additions & 0 deletions crates/task-impls/src/view_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,11 @@ impl<
subscribe_view,
))
.await;
// Also subscribe to the latest view for the same reason. The GC will remove the above poll
// in the case that one doesn't resolve but this one does.
self.network
.inject_consensus_info(ConsensusIntentEvent::PollForCurrentProposal)
.await;

self.network
.inject_consensus_info(ConsensusIntentEvent::PollForDAC(subscribe_view))
Expand Down

0 comments on commit 96d27f7

Please sign in to comment.