Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various improvements to the dashboard. #58

Merged
merged 5 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rollout-dashboard/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ set to the correct value (though sometimes the defaults are OK):
6. `REFRESH_INTERVAL` optionally set to a nonzero positive integer
as the number of seconds to wait between queries to Airflow.

## Development
## Developer information

* [How to develop the dashboard further](doc/dev.md)
* [How to use the dashboard API as a developer](doc/api.md)
* [How to develop the dashboard further](doc/dev.md)
4 changes: 4 additions & 0 deletions rollout-dashboard/frontend/src/App.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@
href="https://github.com/dfinity/dre-airflow/tree/main/rollout-dashboard"
target="_blank">Documentation</FooterLink
>
<FooterLink
href="https://dfinity.enterprise.slack.com/archives/C01DB8MQ5M1"
target="_blank">Help</FooterLink
>
</FooterLinkGroup>
</div>
</Footer>
188 changes: 123 additions & 65 deletions rollout-dashboard/server/src/frontend_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,11 @@ impl Display for RolloutPlanParseError {
}

impl RolloutPlan {
fn from_python_string(value: String) -> Result<Self, RolloutPlanParseError> {
fn from_python_string(value: &str) -> Result<Self, RolloutPlanParseError> {
let mut res = RolloutPlan {
batches: IndexMap::new(),
};
let python_string_plan: PythonFormattedRolloutPlan = match python::from_str(value.as_str())
{
let python_string_plan: PythonFormattedRolloutPlan = match python::from_str(value) {
Ok(s) => s,
Err(e) => return Err(RolloutPlanParseError::UndecipherablePython(e)),
};
Expand Down Expand Up @@ -268,19 +267,93 @@ impl From<CyclicDependencyError> for RolloutDataGatherError {
}

#[derive(Clone)]
enum ScheduleCache {
enum ScheduleCacheState {
Missing,
Invalid {
try_number: usize,
latest_date: DateTime<Utc>,
cached_schedule: String,
},
Valid {
cached_schedule: IndexMap<usize, Batch>,
},
}

/// Result of checking a `ScheduleCache` entry against a task instance's
/// cache keys (see `ScheduleCache::up_to_date`).
enum ScheduleCacheValidity {
    /// The cache is current and holds a parsed schedule; carries a clone
    /// of the cached batches.
    UpToDate(IndexMap<usize, Batch>),
    /// The cache must be refreshed: either nothing was retrieved yet, or
    /// the schedule task has a newer try number / latest date.
    Stale,
    /// The cache is current for this task instance, but its contents are
    /// not usable (schedule missing or unparseable).
    Invalid,
}

/// Per-rollout cache of the schedule produced by the schedule task,
/// keyed by that task instance's try number and latest date.
///
/// Reconstructed from the diff: the scrape left a deleted old-enum line
/// (`cached_schedule: String,`) inside `ForTask`; the exhaustive pattern
/// in `up_to_date` and both constructors show the variant has exactly the
/// three fields below.
#[derive(Clone, Default)]
enum ScheduleCache {
    /// No attempt has been made yet to retrieve a schedule.
    #[default]
    Unretrieved,
    /// Schedule data cached for a specific run of the schedule task.
    ForTask {
        /// Try number of the task instance the cache was built from.
        try_number: usize,
        /// Latest date of the task instance the cache was built from.
        latest_date: DateTime<Utc>,
        /// Whether the cached data is missing, invalid, or parsed.
        kind: ScheduleCacheState,
    },
}

impl ScheduleCache {
    /// Check this cache entry against the given task instance's cache keys
    /// (try number and latest date). Returns the cached batches when the
    /// entry is current and holds a valid schedule, `Stale` when the cache
    /// needs refreshing, and `Invalid` when the entry is current but its
    /// contents are not usable.
    fn up_to_date(&self, task_instance: &TaskInstancesResponseItem) -> ScheduleCacheValidity {
        match self {
            ScheduleCache::Unretrieved => ScheduleCacheValidity::Stale,
            ScheduleCache::ForTask {
                try_number,
                latest_date,
                kind,
            } if *try_number == task_instance.try_number
                && *latest_date == task_instance.latest_date() =>
            {
                // Keys match: report the usability of the cached contents.
                match kind {
                    ScheduleCacheState::Valid { cached_schedule } => {
                        ScheduleCacheValidity::UpToDate(cached_schedule.clone())
                    }
                    ScheduleCacheState::Missing | ScheduleCacheState::Invalid { .. } => {
                        ScheduleCacheValidity::Invalid
                    }
                }
            }
            // Same schedule task has been updated. Data may not be missing anymore.
            ScheduleCache::ForTask { .. } => ScheduleCacheValidity::Stale,
        }
    }

    /// Record a freshly parsed schedule for the given task instance,
    /// replacing whatever was cached before.
    fn update(
        &mut self,
        task_instance: &TaskInstancesResponseItem,
        batches: &IndexMap<usize, Batch>,
    ) {
        let kind = ScheduleCacheState::Valid {
            cached_schedule: batches.clone(),
        };
        *self = Self::ForTask {
            try_number: task_instance.try_number,
            latest_date: task_instance.latest_date(),
            kind,
        }
    }

    /// Record that the schedule for the given task instance is unusable:
    /// missing entirely (`None`) or retrieved but unparseable
    /// (`Some(raw_text)`).
    fn invalidate(&mut self, task_instance: &TaskInstancesResponseItem, schedule: Option<String>) {
        let kind = if let Some(raw) = schedule {
            ScheduleCacheState::Invalid {
                cached_schedule: raw,
            }
        } else {
            ScheduleCacheState::Missing
        };
        *self = Self::ForTask {
            try_number: task_instance.try_number,
            latest_date: task_instance.latest_date(),
            kind,
        }
    }
}

#[derive(Debug, Clone)]
/// DAG run update type.
enum DagRunUpdateType {
Expand Down Expand Up @@ -503,11 +576,16 @@ where
S: Serializer,
{
match cache {
ScheduleCache::Invalid { .. } => serializer.serialize_str("invalid"),
ScheduleCache::Missing { .. } => serializer.serialize_str("missing"),
ScheduleCache::Valid {
cached_schedule, ..
} => serializer.serialize_str(cached_schedule),
ScheduleCache::Unretrieved { .. } => serializer.serialize_str("not retrieved yet"),
ScheduleCache::ForTask { kind, .. } => match kind {
ScheduleCacheState::Missing { .. } => serializer.serialize_str("missing"),
ScheduleCacheState::Invalid {
cached_schedule, ..
} => serializer.serialize_str(format!("<INVALID!>{}", cached_schedule).as_str()),
ScheduleCacheState::Valid {
cached_schedule, ..
} => serializer.serialize_str(format!("<valid>{:?}", cached_schedule).as_str()),
},
}
}

Expand Down Expand Up @@ -614,7 +692,7 @@ impl<'a> RolloutUpdater<'a> {
// We must evict the task cache.
if cache_entry.dispatch_time != dag_run.logical_date {
cache_entry.dispatch_time = dag_run.logical_date;
cache_entry.schedule = ScheduleCache::Missing;
cache_entry.schedule = ScheduleCache::default();
cache_entry.task_instances = HashMap::new();
cache_entry.update_count += 1;
}
Expand Down Expand Up @@ -848,58 +926,43 @@ impl<'a> RolloutUpdater<'a> {
| Some(TaskInstanceState::Scheduled)
| None => rollout.state = min(rollout.state, RolloutState::Preparing),
Some(TaskInstanceState::Success) => {
if let ScheduleCache::Valid {
try_number,
latest_date,
..
} = &cache_entry.schedule
{
if *try_number != task_instance.try_number
|| *latest_date != task_instance.latest_date()
{
info!(target: "frontend_api::get_rollout_data", "{}: resetting schedule cache due to changes to the schedule task", dag_run.dag_run_id);
// Another task run of the same task has executed. We must clear the cache entry.
cache_entry.schedule = ScheduleCache::Missing;
}
}
if let ScheduleCache::Invalid {
try_number,
latest_date,
..
} = &cache_entry.schedule
{
if *try_number != task_instance.try_number
|| *latest_date != task_instance.latest_date()
{
// Same schedule task has been updated.
info!(target: "frontend_api::get_rollout_data", "{}: requerying schedule cache due to forward progress of the schedule task", dag_run.dag_run_id);
cache_entry.schedule = ScheduleCache::Missing;
rollout.batches = match cache_entry.schedule.up_to_date(&task_instance) {
ScheduleCacheValidity::UpToDate(cache) => cache,
ScheduleCacheValidity::Invalid => {
// Nothing has changed. Stop processing this task.
continue;
}
}
let schedule_string = match &cache_entry.schedule {
ScheduleCache::Valid {
cached_schedule, ..
} => cached_schedule,
ScheduleCache::Invalid { .. } => continue,
ScheduleCache::Missing => {
let value = airflow_api
ScheduleCacheValidity::Stale => {
// Same schedule task has been updated. Data may not be missing anymore.
info!(target: "frontend_api::get_rollout_data", "{}: schedule task is outdated; requerying", dag_run.dag_run_id);
match airflow_api
.xcom_entry(
dag_id,
dag_run.dag_run_id.as_str(),
task_instance.task_id.as_str(),
task_instance.map_index,
"return_value",
)
.await;
let schedule = match value {
.await
{
Ok(schedule) => {
cache_entry.schedule = ScheduleCache::Valid {
try_number: task_instance.try_number,
latest_date: task_instance.latest_date(),
cached_schedule: schedule.value.clone(),
};
info!(target: "frontend_api::get_rollout_data", "{}: saving schedule cache", dag_run.dag_run_id);
schedule.value
match &RolloutPlan::from_python_string(&schedule.value) {
Ok(schedule) => {
info!(target: "frontend_api::get_rollout_data", "{}: saving schedule cache", dag_run.dag_run_id);
cache_entry
.schedule
.update(&task_instance, &schedule.batches);
schedule.batches.clone()
}
Err(e) => {
warn!(target: "frontend_api::get_rollout_data", "{}: could not parse schedule data: {}", dag_run.dag_run_id, e);
cache_entry.schedule.invalidate(
&task_instance,
Some(schedule.value),
);
continue;
}
}
}
Err(AirflowError::StatusCode(
reqwest::StatusCode::NOT_FOUND,
Expand All @@ -908,21 +971,16 @@ impl<'a> RolloutUpdater<'a> {
// Or there was no schedule to be found last time
// it was queried.
warn!(target: "frontend_api::get_rollout_data", "{}: no schedule despite schedule task finished", dag_run.dag_run_id);
cache_entry.schedule = ScheduleCache::Invalid {
try_number: task_instance.try_number,
latest_date: task_instance.latest_date(),
};
cache_entry.schedule.invalidate(&task_instance, None);
continue;
}
Err(e) => {
// In this case the dashboard will try to requery in the future again.
return Err(RolloutDataGatherError::AirflowError(e));
}
};
&schedule.clone()
}
}
};
let schedule = RolloutPlan::from_python_string(schedule_string.clone())?;
rollout.batches = schedule.batches;
}
}
} else if task_instance.task_id == "wait_for_other_rollouts"
Expand Down Expand Up @@ -1276,7 +1334,7 @@ impl RolloutApi {
task_instances: HashMap::new(),
dispatch_time: dag_run.logical_date,
note: dag_run.note.clone(),
schedule: ScheduleCache::Missing,
schedule: ScheduleCache::default(),
last_update_time: None,
update_count: 0,
}),
Expand Down
12 changes: 7 additions & 5 deletions rollout-dashboard/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,11 +353,13 @@ async fn main() -> ExitCode {
.produce_rollouts_sse_stream(options.incremental.unwrap_or_default())
}
};
let mut tree = Router::new();
tree = tree
.route("/api/v1/rollouts", get(rollouts_handler))
.route("/api/v1/cache", get(cached_data_handler))
.route("/api/v1/rollouts/sse", get(rollouts_sse_handler));
let stable_api = Router::new()
.route("/rollouts", get(rollouts_handler))
.route("/rollouts/sse", get(rollouts_sse_handler));
let unstable_api = Router::new().route("/cache", get(cached_data_handler));
let mut tree = Router::new()
.nest("/api/v1", stable_api)
.nest("/api/unstable", unstable_api);
tree = tree.nest_service("/", ServeDir::new(frontend_static_dir));

let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
Expand Down
Loading