Skip to content

Commit

Permalink
Ensure that changes in rollout notes or dispatch times are also inter…
Browse files Browse the repository at this point in the history
…preted as changed data.
  • Loading branch information
DFINITYManu committed Aug 6, 2024
1 parent 1ea3ded commit 50cbc03
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 6 deletions.
30 changes: 25 additions & 5 deletions rollout-dashboard/server/src/frontend_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,8 @@ enum ScheduleCache {
}
struct RolloutDataCache {
task_instances: HashMap<String, TaskInstancesResponseItem>,
dispatch_time: DateTime<Utc>,
note: Option<String>,
schedule: ScheduleCache,
}

Expand Down Expand Up @@ -447,7 +449,8 @@ impl RolloutApi {
///
/// Returns a tuple of the the rollout data and a flag
/// indicating if the rollout data was updated since
/// the last time.
/// the last time. The flag should be used by calling
/// code to decide whether to send data to clients or not.
pub async fn get_rollout_data(
&self,
max_rollouts: usize,
Expand Down Expand Up @@ -476,8 +479,13 @@ impl RolloutApi {
.entry(dag_run.dag_run_id.clone())
.or_insert(RolloutDataCache {
task_instances: HashMap::new(),
dispatch_time: dag_run.logical_date,
note: dag_run.note.clone(),
schedule: ScheduleCache::Empty,
});

// All new task instances that have not been seen before. This includes
// tasks of rollouts newly created since last time this loop checked for rollouts.
let updated_task_instances = self
.airflow_api
.task_instances(
Expand Down Expand Up @@ -557,8 +565,17 @@ impl RolloutApi {
}
}

let sorted_task_instances =
sorter.sort_instances(cache_entry.task_instances.clone().into_values());
// If the note of the rollout has changed,
// note that this has been updated.
if cache_entry.note != dag_run.note {
any_rollout_updated = true;
cache_entry.note = dag_run.note.clone();
}
// Same for the dispatch time.
if cache_entry.dispatch_time != dag_run.logical_date {
any_rollout_updated = true;
cache_entry.dispatch_time = dag_run.logical_date.clone();
}

let mut rollout = Rollout::new(
dag_run.dag_run_id.to_string(),
Expand All @@ -574,12 +591,15 @@ impl RolloutApi {
.append_pair("dag_run_id", dag_run.dag_run_id.as_str());
display_url.to_string()
},
dag_run.note.clone(),
dag_run.logical_date,
cache_entry.note.clone(),
cache_entry.dispatch_time,
dag_run.last_scheduling_decision,
dag_run.conf.clone(),
);

let sorted_task_instances =
sorter.sort_instances(cache_entry.task_instances.clone().into_values());

// Now update rollout and batch state based on the obtained data.
for task_instance in sorted_task_instances {
if task_instance.task_id == "schedule" {
Expand Down
1 change: 0 additions & 1 deletion rollout-dashboard/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ impl Server {
};

if changed {
println!("changed");
let _ = self.stream_tx.send(data.clone());

let mut container = self.last_rollout_data.lock().await;
Expand Down

0 comments on commit 50cbc03

Please sign in to comment.