Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rollout dashboard fixes. #21

Merged
merged 1 commit into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions rollout-dashboard/frontend/src/lib/Batch.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,22 @@
{#each batch.subnets as subnet}
<ul>
<li class="subnet">
<div class="subnet_state_icon tooltip">
{batchStateIcon(subnet.state)}<span
class="subnet_state tooltiptext"
>{batchStateName(subnet.state)}{#if subnet.comment}<br
/>{subnet.comment}{/if}</span
>
</div>
<a
rel="external"
href={subnet.display_url || ""}
target="_blank"
data-sveltekit-preload-data="off"
>
<div class="subnet_state_icon tooltip">
{batchStateIcon(subnet.state)}<span
class="subnet_state tooltiptext"
>{batchStateName(
subnet.state,
)}{#if subnet.comment}<br
/>{subnet.comment}{/if}</span
>
</div></a
>
<div
class="subnet_id"
role="link"
Expand Down
40 changes: 20 additions & 20 deletions rollout-dashboard/frontend/src/lib/Rollout.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,26 @@
<section class="rollout">
<div class="general_info">
<div class="name">
{#if rollout.url}<a
rel="external"
href={rollout.url}
target="_blank"
data-sveltekit-preload-data="off"
>{rollout.name}
<svg
style="display: inline-block"
xmlns="http://www.w3.org/2000/svg"
width="1em"
height="1em"
viewBox="0 0 15 15"
><path
fill="currentColor"
fill-rule="evenodd"
d="M12 13a1 1 0 0 0 1-1V3a1 1 0 0 0-1-1H3a1 1 0 0 0-1 1v3.5a.5.5 0 0 0 1 0V3h9v9H8.5a.5.5 0 0 0 0 1zM9 6.5v3a.5.5 0 0 1-1 0V7.707l-5.146 5.147a.5.5 0 0 1-.708-.708L7.293 7H5.5a.5.5 0 0 1 0-1h3a.498.498 0 0 1 .5.497"
clip-rule="evenodd"
/></svg
></a
>{:else}{rollout.name}{/if}
<a
rel="external"
href={rollout.display_url}
target="_blank"
data-sveltekit-preload-data="off"
>{rollout.name}
<svg
style="display: inline-block"
xmlns="http://www.w3.org/2000/svg"
width="1em"
height="1em"
viewBox="0 0 15 15"
><path
fill="currentColor"
fill-rule="evenodd"
d="M12 13a1 1 0 0 0 1-1V3a1 1 0 0 0-1-1H3a1 1 0 0 0-1 1v3.5a.5.5 0 0 0 1 0V3h9v9H8.5a.5.5 0 0 0 0 1zM9 6.5v3a.5.5 0 0 1-1 0V7.707l-5.146 5.147a.5.5 0 0 1-.708-.708L7.293 7H5.5a.5.5 0 0 1 0-1h3a.498.498 0 0 1 .5.497"
clip-rule="evenodd"
/></svg
></a
>
{#if rollout.conf.simulate}<i class="simulated">(simulated)</i>{/if}
</div>
<div class="state_icon tooltip">
Expand Down
3 changes: 2 additions & 1 deletion rollout-dashboard/frontend/src/lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export type Subnet = {
git_revision: string;
state: keyof typeof subnet_rollout_states;
comment: String;
display_url: string;
};
export type Batch = {
subnets: Subnet[];
Expand Down Expand Up @@ -61,7 +62,7 @@ export type RolloutConfiguration = {
};
export type Rollout = {
name: String;
url?: string;
display_url: string;
note?: String;
conf: RolloutConfiguration;
state: keyof typeof rollout_states;
Expand Down
37 changes: 30 additions & 7 deletions rollout-dashboard/server/src/airflow_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use urlencoding::decode;

/// Default maximum batch size for paged requests in Airflow.
const MAX_BATCH_SIZE: usize = 100;
/// Exists to mitigate https://github.com/apache/airflow/issues/41283 .
const MAX_TASK_INSTANCE_BATCH_SIZE: usize = 1000;
/// Default timeout per request to Airflow.
const PER_REQUEST_TIMEOUT: u64 = 15;

Expand Down Expand Up @@ -408,6 +410,7 @@ async fn _paged_get<'a, T: Deserialize<'a> + Pageable + Default, G, Fut>(
url: String,
order_by: Option<String>,
paging_parameters: Option<PagingParameters>,
max_batch_size: usize,
mut getter: G,
) -> Result<T, AirflowError>
where
Expand All @@ -421,7 +424,7 @@ where
};
loop {
let batch_limit = match &paging_parameters {
Some(p) => min(p.limit, MAX_BATCH_SIZE),
Some(p) => min(p.limit, max_batch_size),
None => 0,
};
// Let's handle our parameters.
Expand Down Expand Up @@ -717,6 +720,7 @@ impl AirflowClient {
url,
Some("-execution_date".into()),
Some(PagingParameters { limit, offset }),
MAX_BATCH_SIZE,
|x| self._get_logged_in(x),
)
.await
Expand All @@ -735,18 +739,26 @@ impl AirflowClient {
let mut url = format!("dags/{}/dagRuns/{}/taskInstances", dag_id, dag_run_id);
url = add_updated_parameters(url, filters.updated_at_lte, filters.updated_at_gte);
url = add_ended_parameters(url, filters.ended_at_lte, filters.ended_at_gte);
_paged_get(url, None, Some(PagingParameters { limit, offset }), |x| {
self._get_logged_in(x)
})
_paged_get(
url,
None,
Some(PagingParameters { limit, offset }),
MAX_TASK_INSTANCE_BATCH_SIZE,
|x| self._get_logged_in(x),
)
.await
}

/// Return TaskInstances for a DAG run.
/// Mapped tasks are not returned here.
pub async fn tasks(&self, dag_id: &str) -> Result<TasksResponse, AirflowError> {
_paged_get(format!("dags/{}/tasks", dag_id), None, None, |x| {
self._get_logged_in(x)
})
_paged_get(
format!("dags/{}/tasks", dag_id),
None,
None,
MAX_BATCH_SIZE,
|x| self._get_logged_in(x),
)
.await
}

Expand All @@ -768,6 +780,7 @@ impl AirflowClient {
),
None,
Some(PagingParameters { limit, offset }),
MAX_BATCH_SIZE,
|x| self._get_logged_in(x),
)
.await
Expand Down Expand Up @@ -952,6 +965,7 @@ mod tests {
limit: 1,
offset: 0,
}),
MAX_BATCH_SIZE,
getter,
)
.await
Expand All @@ -969,6 +983,7 @@ mod tests {
limit: 1,
offset: 0,
}),
MAX_BATCH_SIZE,
getter,
)
.await
Expand All @@ -983,6 +998,7 @@ mod tests {
limit: 2,
offset: 0,
}),
MAX_BATCH_SIZE,
getter,
)
.await
Expand All @@ -996,6 +1012,7 @@ mod tests {
limit: 2,
offset: 3,
}),
MAX_BATCH_SIZE,
getter,
)
.await
Expand All @@ -1009,6 +1026,7 @@ mod tests {
limit: 2,
offset: 998,
}),
MAX_BATCH_SIZE,
getter,
)
.await
Expand All @@ -1022,6 +1040,7 @@ mod tests {
limit: 12,
offset: 998,
}),
MAX_BATCH_SIZE,
getter,
)
.await
Expand All @@ -1043,6 +1062,7 @@ mod tests {
limit: 1,
offset: 0,
}),
MAX_BATCH_SIZE,
c,
)
.await
Expand All @@ -1063,6 +1083,7 @@ mod tests {
limit: 4,
offset: 0,
}),
MAX_BATCH_SIZE,
c,
)
.await
Expand All @@ -1081,6 +1102,7 @@ mod tests {
limit: 20,
offset: 0,
}),
MAX_BATCH_SIZE,
c,
)
.await
Expand Down Expand Up @@ -1108,6 +1130,7 @@ mod tests {
limit: 1000,
offset: 0,
}),
MAX_BATCH_SIZE,
c,
)
.await
Expand Down
Loading
Loading