Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel Timeout Tasks and Fail tests if too many Timeouts #2204

Merged
merged 16 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/hotshot/src/tasks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Provides a number of tasks that run continuously on a [`HotShot`]

use crate::{async_spawn, types::SystemContextHandle, HotShotConsensusApi};
use crate::{types::SystemContextHandle, HotShotConsensusApi};
use async_compatibility_layer::art::async_sleep;
use futures::FutureExt;
use hotshot_task::{
Expand Down Expand Up @@ -227,7 +227,7 @@ pub async fn add_consensus_task<TYPES: NodeType, I: NodeImplementation<TYPES>>(
_pd: PhantomData,
vote_collector: None,
timeout_vote_collector: None,
timeout_task: async_spawn(async move {}),
timeout_task: None,
event_stream: event_stream.clone(),
output_event_stream: output_stream,
da_certs: HashMap::new(),
Expand Down
17 changes: 14 additions & 3 deletions crates/task-impls/src/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
events::HotShotEvent,
helpers::cancel_task,
vote::{spawn_vote_accumulator, AccumulatorInfo},
};
use async_compatibility_layer::art::{async_sleep, async_spawn};
Expand Down Expand Up @@ -105,7 +106,7 @@ pub struct ConsensusTaskState<
pub timeout_vote_collector: Option<(TYPES::Time, usize, usize)>,

/// timeout task handle
pub timeout_task: JoinHandle<()>,
pub timeout_task: Option<JoinHandle<()>>,

/// Global events stream to publish events
pub event_stream: ChannelStream<HotShotEvent<TYPES>>,
Expand Down Expand Up @@ -342,6 +343,10 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
"Updating view from {} to {} in consensus task",
*self.cur_view, *new_view
);
// cancel the old timeout task
if let Some(timeout_task) = self.timeout_task.take() {
cancel_task(timeout_task).await;
}

// Remove old certs, we won't vote on past views
for view in *self.cur_view..*new_view - 1 {
Expand Down Expand Up @@ -383,7 +388,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +

// Spawn a timeout task if we did actually update view
let timeout = self.timeout;
self.timeout_task = async_spawn({
self.timeout_task = Some(async_spawn({
let stream = self.event_stream.clone();
// Nuance: We timeout on the view + 1 here because that means that we have
// not seen evidence to transition to this new view
Expand All @@ -394,7 +399,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
.publish(HotShotEvent::Timeout(TYPES::Time::new(*view_number)))
.await;
}
});
}));
let consensus = self.consensus.read().await;
consensus
.metrics
Expand Down Expand Up @@ -1023,6 +1028,12 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
"We did not receive evidence for view {} in time, sending timeout vote for that view!",
*view
);
self.output_event_stream
.publish(Event {
view_number: view,
event: EventType::ReplicaViewTimeout { view_number: view },
})
.await;
let consensus = self.consensus.read().await;
consensus.metrics.number_of_timeouts.add(1);
}
Expand Down
12 changes: 12 additions & 0 deletions crates/task-impls/src/helpers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#[cfg(async_executor_impl = "async-std")]
use async_std::task::JoinHandle;
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;

/// Cancel a task
pub async fn cancel_task<T>(task: JoinHandle<T>) {
#[cfg(async_executor_impl = "async-std")]
task.cancel().await;
#[cfg(async_executor_impl = "tokio")]
task.abort();
}
3 changes: 3 additions & 0 deletions crates/task-impls/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,6 @@ pub mod vid;

/// Generic task for collecting votes
pub mod vote;

/// Helper functions used by any task
mod helpers;
39 changes: 31 additions & 8 deletions crates/task-impls/src/view_sync.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![allow(clippy::module_name_repetitions)]
use crate::{
events::HotShotEvent,
helpers::cancel_task,
vote::{spawn_vote_accumulator, AccumulatorInfo},
};
use async_compatibility_layer::art::{async_sleep, async_spawn};
Expand All @@ -21,6 +22,8 @@ use hotshot_types::{
vote::{Certificate, HasViewNumber, Vote, VoteAccumulator},
};

#[cfg(async_executor_impl = "async-std")]
use async_std::task::JoinHandle;
use hotshot_task::global_registry::GlobalRegistry;
use hotshot_types::{
message::GeneralConsensusMessage,
Expand All @@ -34,6 +37,8 @@ use hotshot_types::{
};
use snafu::Snafu;
use std::{collections::HashMap, sync::Arc, time::Duration};
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;
use tracing::{debug, error, info, instrument, warn};
#[derive(PartialEq, PartialOrd, Clone, Debug, Eq, Hash)]
/// Phases of view sync
Expand Down Expand Up @@ -138,6 +143,8 @@ pub struct ViewSyncReplicaTaskState<
pub finalized: bool,
/// Whether we have already sent a view change event for `next_view`
pub sent_view_change_event: bool,
/// Timeout task handle, when it expires we try the next relay
pub timeout_task: Option<JoinHandle<()>>,
/// Our node id; for logging
pub id: u64,

Expand Down Expand Up @@ -254,6 +261,7 @@ impl<
finalized: false,
sent_view_change_event: false,
phase: ViewSyncPhase::None,
timeout_task: None,
membership: self.membership.clone(),
network: self.network.clone(),
public_key: self.public_key.clone(),
Expand Down Expand Up @@ -527,6 +535,7 @@ impl<
finalized: false,
sent_view_change_event: false,
phase: ViewSyncPhase::None,
timeout_task: None,
membership: self.membership.clone(),
network: self.network.clone(),
public_key: self.public_key.clone(),
Expand Down Expand Up @@ -677,7 +686,11 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
.await;
}

async_spawn({
if let Some(timeout_task) = self.timeout_task.take() {
cancel_task(timeout_task).await;
}

self.timeout_task = Some(async_spawn({
let stream = self.event_stream.clone();
let phase = self.phase.clone();
async move {
Expand All @@ -691,7 +704,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
))
.await;
}
});
}));
}

HotShotEvent::ViewSyncCommitCertificate2Recv(certificate) => {
Expand Down Expand Up @@ -754,7 +767,10 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
.publish(HotShotEvent::ViewChange(self.next_view))
.await;

async_spawn({
if let Some(timeout_task) = self.timeout_task.take() {
cancel_task(timeout_task).await;
}
self.timeout_task = Some(async_spawn({
let stream = self.event_stream.clone();
let phase = self.phase.clone();
async move {
Expand All @@ -768,7 +784,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
))
.await;
}
});
}));
}

HotShotEvent::ViewSyncFinalizeCertificate2Recv(certificate) => {
Expand Down Expand Up @@ -819,6 +835,10 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
self.relay = certificate.get_data().relay;
}

if let Some(timeout_task) = self.timeout_task.take() {
cancel_task(timeout_task).await;
}

self.event_stream
.publish(HotShotEvent::ViewChange(self.next_view))
.await;
Expand Down Expand Up @@ -848,7 +868,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
.await;
}

async_spawn({
self.timeout_task = Some(async_spawn({
let stream = self.event_stream.clone();
async move {
async_sleep(self.view_sync_timeout).await;
Expand All @@ -861,7 +881,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
))
.await;
}
});
}));

return (None, self);
}
Expand All @@ -872,6 +892,9 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
&& relay == self.relay
&& last_seen_certificate == self.phase
{
if let Some(timeout_task) = self.timeout_task.take() {
cancel_task(timeout_task).await;
}
// Keep tyring to get a more recent proposal to catch up to
self.network
.inject_consensus_info(ConsensusIntentEvent::PollForCurrentProposal)
Expand Down Expand Up @@ -941,7 +964,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
}
}

async_spawn({
self.timeout_task = Some(async_spawn({
let stream = self.event_stream.clone();
async move {
async_sleep(self.view_sync_timeout).await;
Expand All @@ -954,7 +977,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
))
.await;
}
});
}));

return (None, self);
}
Expand Down
Loading
Loading