Skip to content

Commit

Permalink
Implement showing rollout state at the top of the rollout list.
Browse files Browse the repository at this point in the history
  • Loading branch information
DFINITYManu committed Oct 25, 2024
1 parent 2adf23b commit c72d831
Show file tree
Hide file tree
Showing 5 changed files with 294 additions and 21 deletions.
99 changes: 99 additions & 0 deletions rollout-dashboard/frontend/src/App.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,105 @@
</div>
{/if}

<!-- Warning banner rendered only while the rollout query reports engine_state "missing". -->
{#if $my_rollout_query.engine_state === "missing"}
<div
class="flex items-center p-4 mb-4 text-sm text-yellow-800 rounded-lg bg-yellow-50 dark:bg-gray-800 dark:text-yellow-300"
role="alert"
>
<!-- Icon is marked aria-hidden; the sr-only "Info" span below supplies the text label. -->
<svg
class="flex-shrink-0 inline w-4 h-4 me-3"
aria-hidden="true"
xmlns="http://www.w3.org/2000/svg"
fill="currentColor"
viewBox="0 0 20 20"
>
<path
d="M10 .5a9.5 9.5 0 1 0 9.5 9.5A9.51 9.51 0 0 0 10 .5ZM9.5 4a1.5 1.5 0 1 1 0 3 1.5 1.5 0 0 1 0-3ZM12 15H8a1 1 0 0 1 0-2h1v-3H8a1 1 0 0 1 0-2h2a1 1 0 0 1 1 1v4h1a1 1 0 0 1 0 2Z"
/>
</svg>
<span class="sr-only">Info</span>
<div>
<span class="font-medium">Rollout flow missing.</span> Airflow cannot find
the flow in charge of executing the rollouts. Use the <i>Help</i> link below
to contact DRE.
</div>
</div>
{/if}

<!-- Warning banner rendered only while the rollout query reports engine_state "inactive". -->
{#if $my_rollout_query.engine_state === "inactive"}
<div
class="flex items-center p-4 mb-4 text-sm text-yellow-800 rounded-lg bg-yellow-50 dark:bg-gray-800 dark:text-yellow-300"
role="alert"
>
<!-- Icon is marked aria-hidden; the sr-only "Info" span below supplies the text label. -->
<svg
class="flex-shrink-0 inline w-4 h-4 me-3"
aria-hidden="true"
xmlns="http://www.w3.org/2000/svg"
fill="currentColor"
viewBox="0 0 20 20"
>
<path
d="M10 .5a9.5 9.5 0 1 0 9.5 9.5A9.51 9.51 0 0 0 10 .5ZM9.5 4a1.5 1.5 0 1 1 0 3 1.5 1.5 0 0 1 0-3ZM12 15H8a1 1 0 0 1 0-2h1v-3H8a1 1 0 0 1 0-2h2a1 1 0 0 1 1 1v4h1a1 1 0 0 1 0 2Z"
/>
</svg>
<span class="sr-only">Info</span>
<div>
<span class="font-medium">Rollouts inactive.</span> The Airflow scheduler
cannot see the flow in charge of executing the rollouts. Use the
<i>Help</i> link below to contact DRE.
</div>
</div>
{/if}

<!-- Warning banner rendered only while the rollout query reports engine_state "broken". -->
{#if $my_rollout_query.engine_state === "broken"}
<div
class="flex items-center p-4 mb-4 text-sm text-yellow-800 rounded-lg bg-yellow-50 dark:bg-gray-800 dark:text-yellow-300"
role="alert"
>
<!-- Icon is marked aria-hidden; the sr-only "Info" span below supplies the text label. -->
<svg
class="flex-shrink-0 inline w-4 h-4 me-3"
aria-hidden="true"
xmlns="http://www.w3.org/2000/svg"
fill="currentColor"
viewBox="0 0 20 20"
>
<path
d="M10 .5a9.5 9.5 0 1 0 9.5 9.5A9.51 9.51 0 0 0 10 .5ZM9.5 4a1.5 1.5 0 1 1 0 3 1.5 1.5 0 0 1 0-3ZM12 15H8a1 1 0 0 1 0-2h1v-3H8a1 1 0 0 1 0-2h2a1 1 0 0 1 1 1v4h1a1 1 0 0 1 0 2Z"
/>
</svg>
<span class="sr-only">Info</span>
<div>
<span class="font-medium">Rollout flow broken.</span> The Airflow
scheduler cannot process the flow in charge of executing the rollouts. Use
the <i>Help</i> link below to contact DRE.
</div>
</div>
{/if}

<!-- Informational (blue rather than yellow) banner rendered only while the rollout query reports engine_state "paused". -->
{#if $my_rollout_query.engine_state === "paused"}
<div
class="flex items-center p-4 mb-4 text-sm text-blue-800 rounded-lg bg-blue-50 dark:bg-gray-800 dark:text-blue-400"
role="alert"
>
<!-- Icon is marked aria-hidden; the sr-only "Info" span below supplies the text label. -->
<svg
class="flex-shrink-0 inline w-4 h-4 me-3"
aria-hidden="true"
xmlns="http://www.w3.org/2000/svg"
fill="currentColor"
viewBox="0 0 20 20"
>
<path
d="M10 .5a9.5 9.5 0 1 0 9.5 9.5A9.51 9.51 0 0 0 10 .5ZM9.5 4a1.5 1.5 0 1 1 0 3 1.5 1.5 0 0 1 0-3ZM12 15H8a1 1 0 0 1 0-2h1v-3H8a1 1 0 0 1 0-2h2a1 1 0 0 1 1 1v4h1a1 1 0 0 1 0 2Z"
/>
</svg>
<span class="sr-only">Info</span>
<div>
<span class="font-medium">Rollout engine paused.</span> Rollouts have been
paused by DRE. Use the <i>Help</i> link below if you want to inquire why.
</div>
</div>
{/if}

{#each $my_rollout_query.rollouts as rollout}
{#if (($url.hash === "" || $url.hash === "#active") && rollout.state !== "complete" && rollout.state !== "failed") || ($url.hash === "#complete" && rollout.state === "complete") || ($url.hash === "#failed" && rollout.state === "failed") || $url.hash === "#all"}
<Rollout {rollout} />
Expand Down
17 changes: 11 additions & 6 deletions rollout-dashboard/frontend/src/lib/stores.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ import { type Rollout } from './types'

const BACKEND_TIMEOUT = 15000

type IncrementalRolloutResult = {
type SseMessage = {
error: [number, string] | null;
rollouts: Rollout[];
updated: Rollout[] | undefined;
deleted: String[] | undefined;
engine_state?: string;
}

export type RolloutResult = {
error: [number, string] | string | null;
rollouts: Rollout[];
engine_state?: string;
}


Expand All @@ -37,7 +39,7 @@ function resetupEventSource() {
// var evtSourceGenerated = new Date();
evtSource = new EventSource(url);
evtSource.onmessage = async function (event) {
var sse_message: IncrementalRolloutResult = JSON.parse(event.data);
var sse_message: SseMessage = JSON.parse(event.data);
if (sse_message.error !== null) {
let status = sse_message.error[0];
if (status == 204) {
Expand All @@ -52,21 +54,23 @@ function resetupEventSource() {
console.log('Request for rollout data failed: ' + errorText)
rollout_store.set({
rollouts: get(rollout_store).rollouts,
engine_state: get(rollout_store).engine_state,
error: errorText
})
}
} else if (sse_message.rollouts !== undefined) {
console.log("Full sync with " + sse_message.rollouts.length + " rollouts");
console.log("Full sync with " + sse_message.rollouts.length + " rollouts and engine state " + sse_message.engine_state);
rollout_store.set({
rollouts: sse_message.rollouts,
error: null
error: null,
engine_state: sse_message.engine_state,
})
} else {
var rollouts: Rollout[] = get(rollout_store).rollouts;
var updated: Rollout[] | undefined = sse_message["updated"];
var deleted: String[] | undefined = sse_message["deleted"];
if (updated !== undefined) {
console.log("Update of " + updated.length + " rollouts");
console.log("Update of " + updated.length + " rollouts and engine state " + sse_message.engine_state);
for (var i = updated.length - 1; i >= 0; i--) {
var found = false;
for (var j = rollouts.length - 1; j >= 0; j--) {
Expand Down Expand Up @@ -95,7 +99,8 @@ function resetupEventSource() {
}
rollout_store.set({
rollouts: rollouts,
error: null
error: null,
engine_state: sse_message.engine_state,
})
}
}
Expand Down
100 changes: 100 additions & 0 deletions rollout-dashboard/server/src/airflow_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,74 @@ trait Pageable {
fn truncate(&mut self, max_entries: usize);
}

/// One DAG entry as deserialized from the `dags` array of a DAG listing response.
///
/// None of the fields carries a serde default, so every field listed here must be
/// present in the payload for deserialization to succeed.
#[derive(Debug, Deserialize, Clone)]
pub struct DagsResponseItem {
/// dag_id is unique, enforced by Airflow.
pub dag_id: String,
/// Deserialized but exempted from the dead-code lint.
// NOTE(review): presumably the human-readable DAG name — confirm against the Airflow API.
#[allow(dead_code)]
pub dag_display_name: String,
/// NOTE(review): presumably mirrors Airflow's `is_paused` DAG attribute — confirm.
pub is_paused: bool,
/// NOTE(review): presumably mirrors Airflow's `is_active` DAG attribute — confirm.
pub is_active: bool,
/// NOTE(review): presumably mirrors Airflow's `has_import_errors` DAG attribute — confirm.
pub has_import_errors: bool,
}

/// A (possibly multi-page) DAG listing, accumulated through the `Pageable` impl below.
#[derive(Debug, Deserialize, Default)]
pub struct DagsResponse {
/// DAG entries collected so far.
pub dags: Vec<DagsResponseItem>,
/// Maps a `dag_id` to the index of its entry in `dags`; maintained by `merge`.
/// Excluded from (de)serialization, so it starts out empty on a freshly
/// deserialized response.
#[serde(skip_serializing, skip_deserializing)]
position_cache: HashMap<String, usize>,
/// Overwritten with the incoming page's value on every `merge`.
// NOTE(review): presumably the total DAG count reported by Airflow — confirm.
total_entries: usize,
}

impl Pageable for DagsResponse {
fn len(&self) -> usize {
self.dags.len()
}
fn merge(&mut self, other: Self) {
for v in other.dags.clone().into_iter() {
let id = v.dag_id.clone();
match self.position_cache.get(&id) {
Some(pos) => {
//debug!(target: "processing", "Replacing {} at position {}", id, pos);
self.dags[*pos] = v;
}
None => {
//debug!(target: "processing", "Consuming {}", id);
self.position_cache.insert(id, self.dags.len());
self.dags.push(v);
}
}
}
self.total_entries = other.total_entries;
}
fn truncate(&mut self, max_entries: usize) {
if self.dags.len() > max_entries {
self.dags.truncate(max_entries)
}
}
}

/// Optional filters applied to a DAG listing query.
///
/// The `Default` value carries no filters.
// NOTE(review): the dead-code exemption is kept from the original; it may no
// longer be needed now that the filter is constructed by callers — confirm.
#[allow(dead_code)]
#[derive(Default)]
pub struct DagsQueryFilter<'a> {
    /// When set, emitted as the `dag_id_pattern` query parameter.
    pub dag_id_pattern: Option<&'a String>,
}

impl<'a> DagsQueryFilter<'a> {
    /// Renders the populated filters as `(name, value)` query-parameter pairs.
    ///
    /// Filters left as `None` are omitted, so an empty filter yields an empty
    /// vector.
    fn as_queryparams(&self) -> Vec<(&str, String)> {
        self.dag_id_pattern
            .map(|pattern| ("dag_id_pattern", pattern.clone()))
            .into_iter()
            .collect()
    }
}

#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
pub enum DagRunState {
Expand Down Expand Up @@ -970,6 +1038,38 @@ impl AirflowClient {
Ok(())
}

/// Return DAG info, ordered by `dag_id` unless `order_by` says otherwise.
///
/// `filter` is rendered into the query string of the `dags` sub-URL (no query
/// string is appended when the filter is empty). `limit` and `offset` are
/// forwarded to the paged fetch as its paging parameters.
#[allow(dead_code)]
pub async fn dags(
    &self,
    limit: usize,
    offset: usize,
    filter: &DagsQueryFilter<'_>,
    order_by: Option<String>, // FIXME: use structural typing here.
) -> Result<DagsResponse, AirflowError> {
    let filter_pairs = filter.as_queryparams();
    let query: querystring::QueryParams = filter_pairs
        .iter()
        .map(|(name, value)| (*name, value.as_str()))
        .collect();
    let suburl = if query.is_empty() {
        String::from("dags")
    } else {
        format!("dags?{}", querystring::stringify(query))
    };
    // Fall back to ordering by DAG id when the caller expressed no preference.
    let ordering = order_by.or_else(|| Some("dag_id".into()));
    _paged_get(
        suburl,
        ordering,
        Some(PagingParameters { limit, offset }),
        MAX_BATCH_SIZE,
        |x| self._get_logged_in(x),
    )
    .await
}

/// Return DAG runs from newest to oldest.
/// Optionally only return DAG runs updated between a certain time frame.
pub async fn dag_runs(
Expand Down
60 changes: 55 additions & 5 deletions rollout-dashboard/server/src/frontend_api.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::airflow_client::{
AirflowClient, AirflowError, DagRunState, DagRunsResponseItem, EventLogsResponseFilters,
TaskInstanceRequestFilters, TaskInstanceState, TaskInstancesResponseItem, TasksResponse,
TasksResponseItem,
AirflowClient, AirflowError, DagRunState, DagRunsResponseItem, DagsQueryFilter, DagsResponse,
DagsResponseItem, EventLogsResponseFilters, TaskInstanceRequestFilters, TaskInstanceState,
TaskInstancesResponseItem, TasksResponse, TasksResponseItem,
};
use crate::python;
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -600,6 +600,42 @@ pub struct RolloutDataCacheResponse {
linearized_task_instances: Vec<TaskInstancesResponseItem>,
}

/// Health of the rollout engine, as derived from a `DagsResponse`.
///
/// Serialized in snake_case, i.e. as `"missing"`, `"broken"`, `"paused"`,
/// `"inactive"` or `"active"`.
#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "snake_case")]
pub enum RolloutEngineState {
/// The DAG listing came back empty.
Missing,
/// The DAG reports import errors.
Broken,
/// The DAG is paused.
Paused,
/// The DAG is not active.
Inactive,
/// The DAG is active, not paused, and free of import errors.
Active,
}

impl From<DagsResponse> for RolloutEngineState {
fn from(resp: DagsResponse) -> Self {
match resp.dags.len() {
0 => RolloutEngineState::Missing,
_ => match resp.dags[0] {
DagsResponseItem {
has_import_errors: true,
..
} => Self::Broken,
DagsResponseItem {
is_paused: true, ..
} => Self::Paused,
DagsResponseItem {
is_active: false, ..
} => Self::Inactive,
DagsResponseItem {
is_active: true,
is_paused: false,
has_import_errors: false,
..
} => Self::Active,
},
}
}
}

struct RolloutApiCache {
/// Map from DAG run ID to task instance ID (with / without index)
/// to task instance.
Expand Down Expand Up @@ -1296,7 +1332,7 @@ impl RolloutApi {
pub async fn get_rollout_data(
&self,
max_rollouts: usize,
) -> Result<Rollouts, RolloutDataGatherError> {
) -> Result<(RolloutEngineState, Rollouts), RolloutDataGatherError> {
let mut cache = self.cache.lock().await;
let dag_id = "rollout_ic_os_to_mainnet_subnets";

Expand All @@ -1307,6 +1343,20 @@ impl RolloutApi {
.incrementally_detect_dag_updates(&self.airflow_api, dag_id)
.await?;

// Derive the rollout engine state from the metadata Airflow reports for the rollout DAG.
let engine_state: RolloutEngineState = self
.airflow_api
.dags(
1000,
0,
&DagsQueryFilter {
dag_id_pattern: Some(&dag_id.to_string()),
},
None,
)
.await?
.into();

// Retrieve the latest X DAG runs.
let dag_runs = self
.airflow_api
Expand Down Expand Up @@ -1363,6 +1413,6 @@ impl RolloutApi {
// Save the state of the log inspector after everything was successful.
cache.log_inspector = updated_log_inspector;

Ok(res)
Ok((engine_state, res))
}
}
Loading

0 comments on commit c72d831

Please sign in to comment.