From 1c2621dd16d732804e9242a53e0ff375773bf621 Mon Sep 17 00:00:00 2001 From: Manuel Amador Date: Fri, 25 Oct 2024 18:18:13 +0200 Subject: [PATCH 1/5] Various improvements to the dashboard: * Make the schedule cache structures robust against various corner cases. * Make the code processing schedule caches much cleaner. * Mark the cache API endpoint as unstable. * Add help link to the bottom of the dashboard. * Minor documentation changes. --- rollout-dashboard/README.md | 4 +- rollout-dashboard/frontend/src/App.svelte | 4 + rollout-dashboard/server/src/frontend_api.rs | 194 +++++++++++++------ rollout-dashboard/server/src/main.rs | 12 +- 4 files changed, 143 insertions(+), 71 deletions(-) diff --git a/rollout-dashboard/README.md b/rollout-dashboard/README.md index 3fc2ae7..386743f 100644 --- a/rollout-dashboard/README.md +++ b/rollout-dashboard/README.md @@ -39,7 +39,7 @@ set to the correct value (though sometimes the defaults are OK): 6. `REFRESH_INTERVAL` optionally set to a nonzero positive integer as the number of seconds to wait between queries to Airflow. -## Development +## Developer information -* [How to develop the dashboard further](doc/dev.md) * [How to use the dashboard API as a developer](doc/api.md) +* [How to develop the dashboard further](doc/dev.md) diff --git a/rollout-dashboard/frontend/src/App.svelte b/rollout-dashboard/frontend/src/App.svelte index 9d6d056..9b736fd 100644 --- a/rollout-dashboard/frontend/src/App.svelte +++ b/rollout-dashboard/frontend/src/App.svelte @@ -105,6 +105,10 @@ href="https://github.com/dfinity/dre-airflow/tree/main/rollout-dashboard" target="_blank">Documentation + Help diff --git a/rollout-dashboard/server/src/frontend_api.rs b/rollout-dashboard/server/src/frontend_api.rs index 4ed710c..67779b4 100644 --- a/rollout-dashboard/server/src/frontend_api.rs +++ b/rollout-dashboard/server/src/frontend_api.rs @@ -190,12 +190,11 @@ impl Display for RolloutPlanParseError { } impl RolloutPlan { - fn from_python_string(value: String) -> Result { + fn from_python_string(value: &str) -> Result { let mut res = RolloutPlan { batches: IndexMap::new(), }; - let python_string_plan: PythonFormattedRolloutPlan = match python::from_str(value.as_str()) - { + let python_string_plan: PythonFormattedRolloutPlan = match python::from_str(value) { Ok(s) => s, Err(e) => return Err(RolloutPlanParseError::UndecipherablePython(e)), }; @@ -268,19 +267,91 @@ impl From for RolloutDataGatherError { } #[derive(Clone)] -enum ScheduleCache { +enum ScheduleCacheKind { Missing, Invalid { - try_number: usize, - latest_date: DateTime, + cached_schedule: String, }, Valid { + cached_schedule: IndexMap, + }, +} +#[derive(Clone)] +enum ScheduleCache { + Unretrieved, + ForTask { try_number: usize, latest_date: DateTime, - cached_schedule: String, + kind: ScheduleCacheKind, }, } +enum ScheduleCacheValidity { + UpToDate(IndexMap), + Stale, + Invalid, +} + +impl ScheduleCache { + fn up_to_date(&self, try_number: usize, latest_date: DateTime) -> ScheduleCacheValidity { + match self { + ScheduleCache::Unretrieved => ScheduleCacheValidity::Stale, + ScheduleCache::ForTask { + try_number: t, + latest_date: l, + kind, + } => { + if *t == try_number && *l == latest_date { + match &kind { + ScheduleCacheKind::Valid { cached_schedule } => { + ScheduleCacheValidity::UpToDate(cached_schedule.clone()) + } + ScheduleCacheKind::Missing | ScheduleCacheKind::Invalid { .. } => { + ScheduleCacheValidity::Invalid + } + } + } else { + // Same schedule task has been updated. Data may not be missing anymore. + ScheduleCacheValidity::Stale + } + } + } + } + + fn save( + &mut self, + try_number: usize, + latest_date: DateTime, + batches: &IndexMap, + ) { + *self = Self::ForTask { + try_number, + latest_date, + kind: ScheduleCacheKind::Valid { + cached_schedule: batches.clone(), + }, + } + } + + fn invalidate( + &mut self, + try_number: usize, + latest_date: DateTime, + schedule: Option, + ) { + *self = Self::ForTask { + try_number, + latest_date, + kind: match schedule { + None => ScheduleCacheKind::Missing, + Some(schedule) => ScheduleCacheKind::Invalid { + cached_schedule: schedule, + }, + }, + } + } +} + #[derive(Debug, Clone)] /// DAG run update type. enum DagRunUpdateType { @@ -503,11 +574,16 @@ where S: Serializer, { match cache { - ScheduleCache::Invalid { .. } => serializer.serialize_str("invalid"), - ScheduleCache::Missing { .. } => serializer.serialize_str("missing"), - ScheduleCache::Valid { - cached_schedule, .. - } => serializer.serialize_str(cached_schedule), + ScheduleCache::Unretrieved { .. } => serializer.serialize_str("not retrieved yet"), + ScheduleCache::ForTask { kind, .. } => match kind { + ScheduleCacheKind::Missing { .. } => serializer.serialize_str("missing"), + ScheduleCacheKind::Invalid { + cached_schedule, .. + } => serializer.serialize_str(format!("{}", cached_schedule).as_str()), + ScheduleCacheKind::Valid { + cached_schedule, .. + } => serializer.serialize_str(format!("{:?}", cached_schedule).as_str()), + }, } } @@ -614,7 +690,7 @@ impl<'a> RolloutUpdater<'a> { // We must evict the task cache. if cache_entry.dispatch_time != dag_run.logical_date { cache_entry.dispatch_time = dag_run.logical_date; - cache_entry.schedule = ScheduleCache::Missing; + cache_entry.schedule = ScheduleCache::Unretrieved; cache_entry.task_instances = HashMap::new(); cache_entry.update_count += 1; } @@ -848,41 +924,19 @@ impl<'a> RolloutUpdater<'a> { | Some(TaskInstanceState::Scheduled) | None => rollout.state = min(rollout.state, RolloutState::Preparing), Some(TaskInstanceState::Success) => { - if let ScheduleCache::Valid { - try_number, - latest_date, - .. - } = &cache_entry.schedule - { - if *try_number != task_instance.try_number - || *latest_date != task_instance.latest_date() - { - info!(target: "frontend_api::get_rollout_data", "{}: resetting schedule cache due to changes to the schedule task", dag_run.dag_run_id); - // Another task run of the same task has executed. We must clear the cache entry. - cache_entry.schedule = ScheduleCache::Missing; - } - } - if let ScheduleCache::Invalid { - try_number, - latest_date, - .. - } = &cache_entry.schedule + rollout.batches = match cache_entry + .schedule + .up_to_date(task_instance.try_number, task_instance.latest_date()) { - if *try_number != task_instance.try_number - || *latest_date != task_instance.latest_date() - { - // Same schedule task has been updated. - info!(target: "frontend_api::get_rollout_data", "{}: requerying schedule cache due to forward progress of the schedule task", dag_run.dag_run_id); - cache_entry.schedule = ScheduleCache::Missing; + ScheduleCacheValidity::UpToDate(cache) => cache, + ScheduleCacheValidity::Invalid => { + // Nothing has changed. Stop processing this task. + continue; } - } - let schedule_string = match &cache_entry.schedule { - ScheduleCache::Valid { - cached_schedule, .. - } => cached_schedule, - ScheduleCache::Invalid { .. } => continue, - ScheduleCache::Missing => { - let value = airflow_api + ScheduleCacheValidity::Stale => { + // Same schedule task has been updated. Data may not be missing anymore. + info!(target: "frontend_api::get_rollout_data", "{}: schedule task is outdated; requerying", dag_run.dag_run_id); + match airflow_api .xcom_entry( dag_id, dag_run.dag_run_id.as_str(), @@ -890,16 +944,29 @@ impl<'a> RolloutUpdater<'a> { task_instance.map_index, "return_value", ) - .await; - let schedule = match value { + .await + { Ok(schedule) => { - cache_entry.schedule = ScheduleCache::Valid { - try_number: task_instance.try_number, - latest_date: task_instance.latest_date(), - cached_schedule: schedule.value.clone(), - }; - info!(target: "frontend_api::get_rollout_data", "{}: saving schedule cache", dag_run.dag_run_id); - schedule.value + match &RolloutPlan::from_python_string(&schedule.value) { + Ok(schedule) => { + info!(target: "frontend_api::get_rollout_data", "{}: saving schedule cache", dag_run.dag_run_id); + cache_entry.schedule.save( + task_instance.try_number, + task_instance.latest_date(), + &schedule.batches, + ); + schedule.batches.clone() + } + Err(e) => { + warn!(target: "frontend_api::get_rollout_data", "{}: could not parse schedule data: {}", dag_run.dag_run_id, e); + cache_entry.schedule.invalidate( + task_instance.try_number, + task_instance.latest_date(), + Some(schedule.value), + ); + continue; + } + } } Err(AirflowError::StatusCode( reqwest::StatusCode::NOT_FOUND, @@ -908,21 +975,20 @@ impl<'a> RolloutUpdater<'a> { // Or there was no schedule to be found last time // it was queried. warn!(target: "frontend_api::get_rollout_data", "{}: no schedule despite schedule task finished", dag_run.dag_run_id); - cache_entry.schedule = ScheduleCache::Invalid { - try_number: task_instance.try_number, - latest_date: task_instance.latest_date(), - }; + cache_entry.schedule.invalidate( + task_instance.try_number, + task_instance.latest_date(), + None, + ); continue; } Err(e) => { + // In this case the dashboard will try to requery in the future again. return Err(RolloutDataGatherError::AirflowError(e)); } - }; - &schedule.clone() + } } }; - let schedule = RolloutPlan::from_python_string(schedule_string.clone())?; - rollout.batches = schedule.batches; } } } else if task_instance.task_id == "wait_for_other_rollouts" @@ -1276,7 +1342,7 @@ impl RolloutApi { task_instances: HashMap::new(), dispatch_time: dag_run.logical_date, note: dag_run.note.clone(), - schedule: ScheduleCache::Missing, + schedule: ScheduleCache::Unretrieved, last_update_time: None, update_count: 0, }), diff --git a/rollout-dashboard/server/src/main.rs b/rollout-dashboard/server/src/main.rs index 77b3fd3..eee5167 100644 --- a/rollout-dashboard/server/src/main.rs +++ b/rollout-dashboard/server/src/main.rs @@ -353,11 +353,13 @@ async fn main() -> ExitCode { .produce_rollouts_sse_stream(options.incremental.unwrap_or_default()) } }; - let mut tree = Router::new(); - tree = tree - .route("/api/v1/rollouts", get(rollouts_handler)) - .route("/api/v1/cache", get(cached_data_handler)) - .route("/api/v1/rollouts/sse", get(rollouts_sse_handler)); + let stable_api = Router::new() + .route("/rollouts", get(rollouts_handler)) + .route("/rollouts/sse", get(rollouts_sse_handler)); + let unstable_api = Router::new().route("/cache", get(cached_data_handler)); + let mut tree = Router::new() + .nest("/api/v1", stable_api) + .nest("/api/unstable", unstable_api); tree = tree.nest_service("/", ServeDir::new(frontend_static_dir)); let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); From ab18ce8bea6f1b434e5973aed46362884713fbcc Mon Sep 17 00:00:00 2001 From: Manuel Amador Date: Fri, 25 Oct 2024 18:21:22 +0200 Subject: [PATCH 2/5] Few nits. --- rollout-dashboard/server/src/frontend_api.rs | 37 ++++++++++---------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/rollout-dashboard/server/src/frontend_api.rs b/rollout-dashboard/server/src/frontend_api.rs index 67779b4..71e0842 100644 --- a/rollout-dashboard/server/src/frontend_api.rs +++ b/rollout-dashboard/server/src/frontend_api.rs @@ -267,7 +267,7 @@ impl From for RolloutDataGatherError { } #[derive(Clone)] -enum ScheduleCacheKind { +enum ScheduleCacheState { Missing, Invalid { cached_schedule: String, @@ -276,22 +276,23 @@ enum ScheduleCacheKind { cached_schedule: IndexMap, }, } + +enum ScheduleCacheValidity { + UpToDate(IndexMap), + Stale, + Invalid, +} + #[derive(Clone)] enum ScheduleCache { Unretrieved, ForTask { try_number: usize, latest_date: DateTime, - kind: ScheduleCacheKind, + kind: ScheduleCacheState, }, } -enum ScheduleCacheValidity { - UpToDate(IndexMap), - Stale, - Invalid, -} - impl ScheduleCache { fn up_to_date(&self, try_number: usize, latest_date: DateTime) -> ScheduleCacheValidity { match self { @@ -303,10 +304,10 @@ impl ScheduleCache { } => { if *t == try_number && *l == latest_date { match &kind { - ScheduleCacheKind::Valid { cached_schedule } => { + ScheduleCacheState::Valid { cached_schedule } => { ScheduleCacheValidity::UpToDate(cached_schedule.clone()) } - ScheduleCacheKind::Missing | ScheduleCacheKind::Invalid { .. } => { + ScheduleCacheState::Missing | ScheduleCacheState::Invalid { .. } => { ScheduleCacheValidity::Invalid } } @@ -318,7 +319,7 @@ impl ScheduleCache { } } - fn save( + fn update( &mut self, try_number: usize, latest_date: DateTime, @@ -327,7 +328,7 @@ impl ScheduleCache { *self = Self::ForTask { try_number, latest_date, - kind: ScheduleCacheKind::Valid { + kind: ScheduleCacheState::Valid { cached_schedule: batches.clone(), }, } @@ -343,8 +344,8 @@ impl ScheduleCache { try_number, latest_date, kind: match schedule { - None => ScheduleCacheKind::Missing, - Some(schedule) => ScheduleCacheKind::Invalid { + None => ScheduleCacheState::Missing, + Some(schedule) => ScheduleCacheState::Invalid { cached_schedule: schedule, }, }, @@ -576,11 +577,11 @@ where match cache { ScheduleCache::Unretrieved { .. } => serializer.serialize_str("not retrieved yet"), ScheduleCache::ForTask { kind, .. } => match kind { - ScheduleCacheKind::Missing { .. } => serializer.serialize_str("missing"), - ScheduleCacheKind::Invalid { + ScheduleCacheState::Missing { .. } => serializer.serialize_str("missing"), + ScheduleCacheState::Invalid { cached_schedule, .. } => serializer.serialize_str(format!("{}", cached_schedule).as_str()), - ScheduleCacheKind::Valid { + ScheduleCacheState::Valid { cached_schedule, .. } => serializer.serialize_str(format!("{:?}", cached_schedule).as_str()), }, @@ -950,7 +951,7 @@ impl<'a> RolloutUpdater<'a> { match &RolloutPlan::from_python_string(&schedule.value) { Ok(schedule) => { info!(target: "frontend_api::get_rollout_data", "{}: saving schedule cache", dag_run.dag_run_id); - cache_entry.schedule.save( + cache_entry.schedule.update( task_instance.try_number, task_instance.latest_date(), &schedule.batches, From b73bc47cc9f0da2d914d6cfcff9e8672711903b9 Mon Sep 17 00:00:00 2001 From: Manuel Amador Date: Fri, 25 Oct 2024 18:24:20 +0200 Subject: [PATCH 3/5] Rustify moar. --- rollout-dashboard/server/src/frontend_api.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rollout-dashboard/server/src/frontend_api.rs b/rollout-dashboard/server/src/frontend_api.rs index 71e0842..4b9823b 100644 --- a/rollout-dashboard/server/src/frontend_api.rs +++ b/rollout-dashboard/server/src/frontend_api.rs @@ -283,7 +283,7 @@ enum ScheduleCacheValidity { Invalid, } -#[derive(Clone)] +#[derive(Clone, Default)] enum ScheduleCache { Unretrieved, ForTask { @@ -691,7 +691,7 @@ impl<'a> RolloutUpdater<'a> { // We must evict the task cache. if cache_entry.dispatch_time != dag_run.logical_date { cache_entry.dispatch_time = dag_run.logical_date; - cache_entry.schedule = ScheduleCache::Unretrieved; + cache_entry.schedule = ScheduleCache::default(); cache_entry.task_instances = HashMap::new(); cache_entry.update_count += 1; } @@ -1343,7 +1343,7 @@ impl RolloutApi { task_instances: HashMap::new(), dispatch_time: dag_run.logical_date, note: dag_run.note.clone(), - schedule: ScheduleCache::Unretrieved, + schedule: ScheduleCache::default(), last_update_time: None, update_count: 0, }), From 154768998bbe7ea5218fa9d5c18fa10ba167d2fe Mon Sep 17 00:00:00 2001 From: Manuel Amador Date: Fri, 25 Oct 2024 18:25:58 +0200 Subject: [PATCH 4/5] Other improvements. --- rollout-dashboard/server/src/frontend_api.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/rollout-dashboard/server/src/frontend_api.rs b/rollout-dashboard/server/src/frontend_api.rs index 4b9823b..bb96c4b 100644 --- a/rollout-dashboard/server/src/frontend_api.rs +++ b/rollout-dashboard/server/src/frontend_api.rs @@ -285,6 +285,7 @@ enum ScheduleCacheValidity { #[derive(Clone, Default)] enum ScheduleCache { + #[default] Unretrieved, ForTask { try_number: usize, @@ -294,6 +295,10 @@ enum ScheduleCache { } impl ScheduleCache { + /// Retrieve whether the cache is updated based on the cache keys passed, + /// and if up to date, return the cache contents. Return Stale if the + /// cache needs updating, and Invalid if the cache is up to date but + /// the contents are not valid for use. fn up_to_date(&self, try_number: usize, latest_date: DateTime) -> ScheduleCacheValidity { match self { ScheduleCache::Unretrieved => ScheduleCacheValidity::Stale, @@ -319,6 +324,7 @@ impl ScheduleCache { } } + /// Update the cache entry. fn update( &mut self, try_number: usize, @@ -334,6 +340,7 @@ impl ScheduleCache { } } + /// Update the cache entry with (possibly) an invalid value. fn invalidate( &mut self, try_number: usize, From 39b3a4032533591ee66b45f0bc72f57bbb463911 Mon Sep 17 00:00:00 2001 From: Manuel Amador Date: Fri, 25 Oct 2024 18:35:13 +0200 Subject: [PATCH 5/5] Commit to using the task instance as the cache key. --- rollout-dashboard/server/src/frontend_api.rs | 44 +++++++------------- 1 file changed, 14 insertions(+), 30 deletions(-) diff --git a/rollout-dashboard/server/src/frontend_api.rs b/rollout-dashboard/server/src/frontend_api.rs index bb96c4b..847d2a2 100644 --- a/rollout-dashboard/server/src/frontend_api.rs +++ b/rollout-dashboard/server/src/frontend_api.rs @@ -299,7 +299,7 @@ impl ScheduleCache { /// and if up to date, return the cache contents. Return Stale if the /// cache needs updating, and Invalid if the cache is up to date but /// the contents are not valid for use. - fn up_to_date(&self, try_number: usize, latest_date: DateTime) -> ScheduleCacheValidity { + fn up_to_date(&self, task_instance: &TaskInstancesResponseItem) -> ScheduleCacheValidity { match self { ScheduleCache::Unretrieved => ScheduleCacheValidity::Stale, ScheduleCache::ForTask { @@ -307,7 +307,7 @@ impl ScheduleCache { latest_date: l, kind, } => { - if *t == try_number && *l == latest_date { + if *t == task_instance.try_number && *l == task_instance.latest_date() { match &kind { ScheduleCacheState::Valid { cached_schedule } => { ScheduleCacheValidity::UpToDate(cached_schedule.clone()) @@ -327,13 +327,12 @@ impl ScheduleCache { /// Update the cache entry. fn update( &mut self, - try_number: usize, - latest_date: DateTime, + task_instance: &TaskInstancesResponseItem, batches: &IndexMap, ) { *self = Self::ForTask { - try_number, - latest_date, + try_number: task_instance.try_number, + latest_date: task_instance.latest_date(), kind: ScheduleCacheState::Valid { cached_schedule: batches.clone(), }, @@ -341,15 +340,10 @@ impl ScheduleCache { } /// Update the cache entry with (possibly) an invalid value. - fn invalidate( - &mut self, - try_number: usize, - latest_date: DateTime, - schedule: Option, - ) { + fn invalidate(&mut self, task_instance: &TaskInstancesResponseItem, schedule: Option) { *self = Self::ForTask { - try_number, - latest_date, + try_number: task_instance.try_number, + latest_date: task_instance.latest_date(), kind: match schedule { None => ScheduleCacheState::Missing, Some(schedule) => ScheduleCacheState::Invalid { @@ -932,10 +926,7 @@ impl<'a> RolloutUpdater<'a> { | Some(TaskInstanceState::Scheduled) | None => rollout.state = min(rollout.state, RolloutState::Preparing), Some(TaskInstanceState::Success) => { - rollout.batches = match cache_entry - .schedule - .up_to_date(task_instance.try_number, task_instance.latest_date()) - { + rollout.batches = match cache_entry.schedule.up_to_date(&task_instance) { ScheduleCacheValidity::UpToDate(cache) => cache, ScheduleCacheValidity::Invalid => { // Nothing has changed. Stop processing this task. @@ -958,18 +949,15 @@ impl<'a> RolloutUpdater<'a> { match &RolloutPlan::from_python_string(&schedule.value) { Ok(schedule) => { info!(target: "frontend_api::get_rollout_data", "{}: saving schedule cache", dag_run.dag_run_id); - cache_entry.schedule.update( - task_instance.try_number, - task_instance.latest_date(), - &schedule.batches, - ); + cache_entry + .schedule + .update(&task_instance, &schedule.batches); schedule.batches.clone() } Err(e) => { warn!(target: "frontend_api::get_rollout_data", "{}: could not parse schedule data: {}", dag_run.dag_run_id, e); cache_entry.schedule.invalidate( - task_instance.try_number, - task_instance.latest_date(), + &task_instance, Some(schedule.value), ); continue; @@ -983,11 +971,7 @@ impl<'a> RolloutUpdater<'a> { // Or there was no schedule to be found last time // it was queried. warn!(target: "frontend_api::get_rollout_data", "{}: no schedule despite schedule task finished", dag_run.dag_run_id); - cache_entry.schedule.invalidate( - task_instance.try_number, - task_instance.latest_date(), - None, - ); + cache_entry.schedule.invalidate(&task_instance, None); continue; } Err(e) => {