diff --git a/rollout-dashboard/doc/api.md b/rollout-dashboard/doc/api.md index 54277fc..be4ead9 100644 --- a/rollout-dashboard/doc/api.md +++ b/rollout-dashboard/doc/api.md @@ -5,7 +5,7 @@ To learn about the API, how to use it, and how to interpret the data served by API calls, please consult the programming documentation that accompanies the `rollout_dashboard` crate, available under folder -[`server/`](server/) by running the `cargo rustdoc` program within that +[`../server/`](../server/) by running the `cargo rustdoc` program within that folder, and then launching the Web page it generates for you. Please do not proceed with creating a client diff --git a/rollout-dashboard/server/src/airflow_client.rs b/rollout-dashboard/server/src/airflow_client.rs index 9e6f260..9d6c798 100644 --- a/rollout-dashboard/server/src/airflow_client.rs +++ b/rollout-dashboard/server/src/airflow_client.rs @@ -52,6 +52,18 @@ fn add_updated_parameters( ) } +fn add_executed_parameters( + url: String, + execution_date_lte: Option>, + execution_date_gte: Option>, +) -> String { + add_date_parm( + add_date_parm(url, "execution_date_lte", execution_date_lte), + "execution_date_gte", + execution_date_gte, + ) +} + fn add_ended_parameters( url: String, end_date_lte: Option>, @@ -233,6 +245,39 @@ pub struct TaskInstancesResponseItem { pub note: Option, } +impl TaskInstancesResponseItem { + #[allow(dead_code)] + pub fn latest_date(&self) -> DateTime { + let mut d = self.execution_date; + if let Some(dd) = self.start_date { + if dd > d { + d = dd + } + } + if let Some(dd) = self.end_date { + if dd > d { + d = dd + } + } + d + } + #[allow(dead_code)] + pub fn earliest_date(&self) -> DateTime { + let mut d = self.execution_date; + if let Some(dd) = self.start_date { + if dd < d { + d = dd + } + } + if let Some(dd) = self.end_date { + if dd < d { + d = dd + } + } + d + } +} + #[derive(Debug, Deserialize, Default)] pub struct TaskInstancesResponse { @@ -275,6 +320,8 @@ impl Pageable for TaskInstancesResponse { #[derive(Default)] pub struct TaskInstanceRequestFilters { + executed_at_lte: Option>, + executed_at_gte: Option>, updated_at_lte: Option>, updated_at_gte: Option>, ended_at_lte: Option>, @@ -282,6 +329,15 @@ pub struct TaskInstanceRequestFilters { } impl TaskInstanceRequestFilters { + #[allow(dead_code)] + pub fn executed_on_or_before(mut self, date: Option>) -> Self { + self.executed_at_lte = date; + self + } + pub fn executed_on_or_after(mut self, date: Option>) -> Self { + self.executed_at_gte = date; + self + } #[allow(dead_code)] pub fn updated_on_or_before(mut self, date: Option>) -> Self { self.updated_at_lte = date; @@ -737,6 +793,7 @@ impl AirflowClient { filters: TaskInstanceRequestFilters, ) -> Result { let mut url = format!("dags/{}/dagRuns/{}/taskInstances", dag_id, dag_run_id); + url = add_executed_parameters(url, filters.executed_at_lte, filters.executed_at_gte); url = add_updated_parameters(url, filters.updated_at_lte, filters.updated_at_gte); url = add_ended_parameters(url, filters.ended_at_lte, filters.ended_at_gte); _paged_get( diff --git a/rollout-dashboard/server/src/frontend_api.rs b/rollout-dashboard/server/src/frontend_api.rs index 68ac4a5..1127eb7 100644 --- a/rollout-dashboard/server/src/frontend_api.rs +++ b/rollout-dashboard/server/src/frontend_api.rs @@ -1,14 +1,25 @@ +use crate::airflow_client::{ + AirflowClient, AirflowError, DagRunState, TaskInstanceRequestFilters, TaskInstanceState, + TaskInstancesResponseItem, TasksResponse, TasksResponseItem, +}; use crate::python; use chrono::{DateTime, Utc}; +use futures::future::join_all; use indexmap::IndexMap; use lazy_static::lazy_static; use log::{debug, trace, warn}; use regex::Regex; +use rollout_dashboard::types::{ + Batch, Rollout, RolloutState, Rollouts, Subnet, SubnetRolloutState, +}; use serde::Serialize; use std::cmp::min; +use std::collections::hash_map::Entry::{Occupied, Vacant}; use std::collections::HashMap; use std::fmt::{self, Display}; +use std::future::Future; use std::num::ParseIntError; +use std::pin::Pin; use std::rc::Rc; use std::str::FromStr; use std::sync::Arc; @@ -16,20 +27,14 @@ use std::{vec, vec::Vec}; use tokio::sync::Mutex; use topological_sort::TopologicalSort; -use crate::airflow_client::{ - AirflowClient, AirflowError, DagRunState, TaskInstanceRequestFilters, TaskInstanceState, - TaskInstancesResponseItem, TasksResponse, TasksResponseItem, -}; -use rollout_dashboard::types::{ - Batch, Rollout, RolloutState, Rollouts, Subnet, SubnetRolloutState, -}; - lazy_static! { // unwrap() is legitimate here because we know these cannot fail to compile. static ref SubnetGitRevisionRe: Regex = Regex::new("dfinity.ic_types.SubnetRolloutInstance.*@version=0[(]start_at=.*,subnet_id=([0-9-a-z-]+),git_revision=([0-9a-f]+)[)]").unwrap(); static ref BatchIdentificationRe: Regex = Regex::new("batch_([0-9]+)[.](.+)").unwrap(); } +const TASK_INSTANCE_LIST_LIMIT: usize = 500; + #[derive(Debug)] pub struct CyclicDependencyError { message: String, @@ -254,14 +259,14 @@ enum ScheduleCache { Valid(String), } struct RolloutDataCache { - task_instances: HashMap, + task_instances: HashMap, TaskInstancesResponseItem>>, dispatch_time: DateTime, note: Option, schedule: ScheduleCache, + last_update_time: Option>, } struct RolloutApiCache { - last_update_time: Option>, /// Map from DAG run ID to task instance ID (with / without index) /// to task instance. by_dag_run: HashMap, @@ -339,7 +344,6 @@ impl RolloutApi { Self { airflow_api: Arc::new(client), cache: Arc::new(Mutex::new(RolloutApiCache { - last_update_time: None, by_dag_run: HashMap::new(), })), } @@ -364,8 +368,6 @@ impl RolloutApi { max_rollouts: usize, ) -> Result<(Rollouts, bool), RolloutDataGatherError> { let mut cache = self.cache.lock().await; - let now = Utc::now(); - let last_update_time = cache.last_update_time; let dag_id = "rollout_ic_os_to_mainnet_subnets"; let dag_runs = self @@ -392,43 +394,99 @@ impl RolloutApi { dispatch_time: dag_run.logical_date, note: dag_run.note.clone(), schedule: ScheduleCache::Empty, + last_update_time: None, }); - // All new task instances that have not been seen before. This includes - // tasks of rollouts newly created since last time this loop checked for rollouts. - let updated_task_instances = self - .airflow_api - .task_instances( - dag_id, - dag_run.dag_run_id.as_str(), - 500, - 0, - TaskInstanceRequestFilters::default().updated_on_or_after(last_update_time), - ) - .await? - .task_instances; - - // Tasks that are ended are not marked as updated in Airflow. - let ended_task_instances = if last_update_time.is_some() { - self.airflow_api - .task_instances( - dag_id, - dag_run.dag_run_id.as_str(), - 500, - 0, - TaskInstanceRequestFilters::default().ended_on_or_after(last_update_time), - ) - .await? - .task_instances - } else { - vec![] - }; + type TaskInstanceResponse = Result, AirflowError>; + + let last_update_time = cache_entry.last_update_time; + let now = Utc::now(); + let requests: Vec + Send>>> = vec![ + Box::pin(async move { + match self + .airflow_api + .task_instances( + dag_id, + dag_run.dag_run_id.as_str(), + TASK_INSTANCE_LIST_LIMIT, + 0, + TaskInstanceRequestFilters::default() + .executed_on_or_after(last_update_time), + ) + .await + { + Ok(r) => Ok(r.task_instances), + Err(e) => Err(e), + } + }), + Box::pin(async move { + match last_update_time { + None => Ok(vec![]), + Some(_) => { + match self + .airflow_api + .task_instances( + dag_id, + dag_run.dag_run_id.as_str(), + TASK_INSTANCE_LIST_LIMIT, + 0, + TaskInstanceRequestFilters::default() + .updated_on_or_after(last_update_time), + ) + .await + { + Ok(r) => Ok(r.task_instances), + Err(e) => Err(e), + } + } + } + }), + Box::pin(async move { + match last_update_time { + None => Ok(vec![]), + Some(_) => { + match self + .airflow_api + .task_instances( + dag_id, + dag_run.dag_run_id.as_str(), + TASK_INSTANCE_LIST_LIMIT, + 0, + TaskInstanceRequestFilters::default() + .ended_on_or_after(last_update_time), + ) + .await + { + Ok(r) => Ok(r.task_instances), + Err(e) => Err(e), + } + } + } + }), + ]; + + let results = join_all(requests).await; + let mut all_task_instances: Vec = vec![]; + for r in results.into_iter() { + all_task_instances.append(&mut r?) + } debug!( - target: "frontend_api", "{}: Updated tasks {} Ended tasks {}", - dag_run.dag_run_id, updated_task_instances.len(), ended_task_instances.len(), + target: "frontend_api", "{}: Undeduplicated tasks: {}", + dag_run.dag_run_id, all_task_instances.len() ); + let rollout_had_changed_tasks = match all_task_instances.is_empty() { + true => false, + false => { + // At least one task has updated or finished. + // See function documentation about meaningful changes. + meaningful_updates_to_any_rollout = true; + // Now remember this rollout was updated. + true + } + }; + // If the note of the rollout has changed, // note that this has been updated. if cache_entry.note != dag_run.note { @@ -464,58 +522,58 @@ impl RolloutApi { ); // Let's update the cache to incorporate the most up-to-date task instances. - for task_instance in updated_task_instances - .into_iter() - .chain(ended_task_instances.into_iter()) - { - // At least one task has updated or finished. - // See function documentation about meaningful changes. - meaningful_updates_to_any_rollout = true; - + for task_instance in all_task_instances.into_iter() { let task_instance_id = task_instance.task_id.clone(); if task_instance_id == "schedule" { cache_entry.schedule = ScheduleCache::Invalid; } - match task_instance.map_index { - None => { - cache_entry - .task_instances - .insert(format!("{} None", task_instance_id), task_instance); + + let by_name = cache_entry + .task_instances + .entry(task_instance_id) + .or_insert(HashMap::new()); + + match by_name.entry(task_instance.map_index) { + Vacant(entry) => { + entry.insert(task_instance); } - Some(idx) => { - if cache_entry - .task_instances - .contains_key(&format!("{} None", task_instance_id)) - { - debug!( - target: "frontend_api", "Formerly unmapped task {} is now mapped to index {}", - task_instance_id, idx - ); + Occupied(mut entry) => { + if task_instance.latest_date() > entry.get().latest_date() { + entry.insert(task_instance.clone()); } - // Once a task has been mapped, clearing the task will not cause it - // to become unmapped anymore. This is behavior that the API has - // presented to me through observation. - // - // The number of map indexes for a task cannot be reduced once a - // flow has started executing. - cache_entry - .task_instances - .insert(format!("{} {}", task_instance_id, idx), task_instance); - // Thus, we must remove any cached entry that has map index None. - cache_entry - .task_instances - .remove(&format!("{} None", task_instance_id)); + } + }; + } + + for (task_instance_id, tasks) in cache_entry.task_instances.iter_mut() { + // Delete data on all unmapped tasks if a mapped task sibling is present. + if tasks.len() > 1 { + if let Occupied(_) = tasks.entry(None) { + debug!( + target: "frontend_api", "Formerly unmapped task {} is now mapped", + task_instance_id + ); + tasks.remove(&None); } } } + let linearized_tasks: Vec = cache_entry + .task_instances + .iter() + .flat_map(|(_, tasks)| tasks.iter().map(|(_, task)| task.clone())) + .collect(); + + debug!( + target: "frontend_api", "{}: Total disambiguated tasks including locally cached ones: {}", + dag_run.dag_run_id, linearized_tasks.len(), + ); + // Now update rollout and batch state based on the obtained data. // What this process does is fairly straightforward: // * for each and every known up-to-date Airflow task in the cache // (always processed in topological order), - for task_instance in - sorter.sort_instances(cache_entry.task_instances.clone().into_values()) - { + for task_instance in sorter.sort_instances(linearized_tasks.into_iter()) { // * deduce the rollout plan, if available, // * mark the rollout as having problems or errors depending on what // the task state is, or as one of the various running states, if @@ -542,7 +600,7 @@ impl RolloutApi { Some(TaskInstanceState::Success) => { let schedule_string = match &cache_entry.schedule { ScheduleCache::Valid(s) => s, - ScheduleCache::Invalid => { + ScheduleCache::Invalid | ScheduleCache::Empty => { let value = self .airflow_api .xcom_entry( @@ -563,6 +621,8 @@ impl RolloutApi { reqwest::StatusCode::NOT_FOUND, )) => { // There is no schedule to be found. + // Or there was no schedule to be found last time + // it was queried. cache_entry.schedule = ScheduleCache::Empty; continue; } @@ -572,11 +632,6 @@ impl RolloutApi { }; &schedule.clone() } - ScheduleCache::Empty => { - // There was no schedule to be found last time - // it was queried. - continue; - } }; let schedule = RolloutPlan::from_python_string(schedule_string.clone())?; @@ -834,14 +889,18 @@ impl RolloutApi { } } + if rollout_had_changed_tasks { + // We bump the cache entry's last update time, to only retrieve + // tasks from this point in time on during subsequent retrievals. + // We only do this at the end, in case any code above returns + // early, to force a full state recalculation if there was a + // failure or an early return. + cache_entry.last_update_time = Some(now); + } + res.push(rollout); } - if meaningful_updates_to_any_rollout { - // Preserve the value for next loop so that we have a baseline - // of date/time to query data incrementally. - cache.last_update_time = Some(now); - } Ok((res, meaningful_updates_to_any_rollout)) } }