Skip to content

Commit

Permalink
Merge pull request #48 from dfinity/requery-schedule
Browse files Browse the repository at this point in the history
Rework local cache to address "not yet computed a rollout plan" issue.
  • Loading branch information
DFINITYManu authored Sep 25, 2024
2 parents 57b0c2c + 4916ee5 commit 452aec7
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 94 deletions.
2 changes: 1 addition & 1 deletion rollout-dashboard/doc/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
To learn about the API, how to use it, and how to interpret the data
served by API calls, please consult the programming documentation
that accompanies the `rollout_dashboard` crate, available under folder
[`server/`](server/) by running the `cargo rustdoc` program within that
[`../server/`](../server/) by running the `cargo rustdoc` program within that
folder, and then launching the Web page it generates for you.

Please do not proceed with creating a client
Expand Down
57 changes: 57 additions & 0 deletions rollout-dashboard/server/src/airflow_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ fn add_updated_parameters(
)
}

fn add_executed_parameters(
url: String,
execution_date_lte: Option<DateTime<Utc>>,
execution_date_gte: Option<DateTime<Utc>>,
) -> String {
add_date_parm(
add_date_parm(url, "execution_date_lte", execution_date_lte),
"execution_date_gte",
execution_date_gte,
)
}

fn add_ended_parameters(
url: String,
end_date_lte: Option<DateTime<Utc>>,
Expand Down Expand Up @@ -233,6 +245,39 @@ pub struct TaskInstancesResponseItem {
pub note: Option<String>,
}

impl TaskInstancesResponseItem {
#[allow(dead_code)]
pub fn latest_date(&self) -> DateTime<Utc> {
let mut d = self.execution_date;
if let Some(dd) = self.start_date {
if dd > d {
d = dd
}
}
if let Some(dd) = self.end_date {
if dd > d {
d = dd
}
}
d
}
#[allow(dead_code)]
pub fn earliest_date(&self) -> DateTime<Utc> {
let mut d = self.execution_date;
if let Some(dd) = self.start_date {
if dd < d {
d = dd
}
}
if let Some(dd) = self.end_date {
if dd < d {
d = dd
}
}
d
}
}

#[derive(Debug, Deserialize, Default)]

pub struct TaskInstancesResponse {
Expand Down Expand Up @@ -275,13 +320,24 @@ impl Pageable for TaskInstancesResponse {

#[derive(Default)]
pub struct TaskInstanceRequestFilters {
executed_at_lte: Option<DateTime<Utc>>,
executed_at_gte: Option<DateTime<Utc>>,
updated_at_lte: Option<DateTime<Utc>>,
updated_at_gte: Option<DateTime<Utc>>,
ended_at_lte: Option<DateTime<Utc>>,
ended_at_gte: Option<DateTime<Utc>>,
}

impl TaskInstanceRequestFilters {
#[allow(dead_code)]
pub fn executed_on_or_before(mut self, date: Option<DateTime<Utc>>) -> Self {
self.executed_at_lte = date;
self
}
pub fn executed_on_or_after(mut self, date: Option<DateTime<Utc>>) -> Self {
self.executed_at_gte = date;
self
}
#[allow(dead_code)]
pub fn updated_on_or_before(mut self, date: Option<DateTime<Utc>>) -> Self {
self.updated_at_lte = date;
Expand Down Expand Up @@ -737,6 +793,7 @@ impl AirflowClient {
filters: TaskInstanceRequestFilters,
) -> Result<TaskInstancesResponse, AirflowError> {
let mut url = format!("dags/{}/dagRuns/{}/taskInstances", dag_id, dag_run_id);
url = add_executed_parameters(url, filters.executed_at_lte, filters.executed_at_gte);
url = add_updated_parameters(url, filters.updated_at_lte, filters.updated_at_gte);
url = add_ended_parameters(url, filters.ended_at_lte, filters.ended_at_gte);
_paged_get(
Expand Down
Loading

0 comments on commit 452aec7

Please sign in to comment.