Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support SSE in dashboard server and frontend. #18

Merged
merged 1 commit into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 37 additions & 41 deletions rollout-dashboard/frontend/src/lib/stores.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { get, writable } from 'svelte/store'
import { get, writable, type Writable } from 'svelte/store'
import { type Rollout } from './types'

const BACKEND_TIMEOUT = 15000
Expand All @@ -8,58 +8,54 @@ export type RolloutResult = {
rollouts: Rollout[];
}

export const rollout_query = (() => {
const store = writable<RolloutResult>({ rollouts: [], error: "loading" })

let updater = async () => {
const API_URL = import.meta.env.BACKEND_API_PATH || "/api/v1";
const url = API_URL + "/rollouts"
try {
const res = await fetch(url, { signal: AbortSignal.timeout(BACKEND_TIMEOUT) });
if (res.ok) {
if (res.status == 204) {
console.log('Data is not yet available. Retrying soon.')
store.set({ rollouts: [], error: "loading" })
} else {
let json = await res.json()
store.set({
rollouts: json,
error: null
})
}
setTimeout(updater, 5000)
const API_URL = import.meta.env.BACKEND_API_PATH || "/api/v1";
const url = API_URL + "/rollouts/sse"
var evtSource: EventSource;

const rollout_store = writable<RolloutResult>({ rollouts: [], error: "loading" })

function resetupEventSource() {
evtSource = new EventSource(url);
evtSource.onmessage = async function (event) {
var current_rollout_result = JSON.parse(event.data);
if (current_rollout_result.error !== null) {
let status = current_rollout_result.error[0];
if (status == 204) {
rollout_store.set({ rollouts: [], error: "loading" })
} else {
// Sometimes the API will fail!
// FIXME: we should handle this with an error shown to the user.
let responseText = await res.text()
let errorText = res.status + " " + res.statusText
let responseText = current_rollout_result.error[1];
let errorText = status + " " + responseText;
if (responseText) {
responseText = responseText.split("\n")[0]
errorText = errorText + ": " + responseText
} else if (res.status == 500) {
// An HTTP 500 error without status text from fetch() indicates
// that the fetch() call could not contact the backend server,
// rather than an HTTP error proper coming from the backend.
errorText = "not possible to contact rollout backend (network issues / backend down)"
}
console.log('Request for rollout data failed: ' + errorText)
store.set({
rollouts: get(store).rollouts,
rollout_store.set({
rollouts: get(rollout_store).rollouts,
error: errorText
})
setTimeout(updater, 15000)
}
} catch (e) {
let errorText = "the request to the backend has timed out (network issues / backend under high load)"
console.log('Request for rollout data failed: ' + errorText)
store.set({
rollouts: get(store).rollouts,
error: errorText
} else {
rollout_store.set({
rollouts: current_rollout_result.rollouts,
error: null
})
setTimeout(updater, 15000)
}
}
setTimeout(updater, 1)
evtSource.onerror = function (e) {
console.log("Disconnected from event source. Reconnecting in 5 seconds.")
evtSource.close();
var errorText = 'Rollout dashboard is down — reconnecting in 5 seconds'
rollout_store.set({
rollouts: get(rollout_store).rollouts,
error: errorText
})
setTimeout(resetupEventSource, 5000)
}
}

return store
export const rollout_query = ((): Writable<RolloutResult> => {
resetupEventSource()
return rollout_store
});
23 changes: 23 additions & 0 deletions rollout-dashboard/server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rollout-dashboard/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = "2021"

[dependencies]
async-recursion = "1.1.1"
async-stream = "0.3.5"
axum = { version = "0.7.5", features = ["macros"] }
axum-server = "0.6.0"
chrono = { version = "0.4.38", features = ["serde"] }
Expand Down
70 changes: 66 additions & 4 deletions rollout-dashboard/server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
use async_stream::try_stream;
use axum::http::StatusCode;
use axum::response::sse::Event;
use axum::response::Sse;
use axum::Json;
use axum::{routing::get, Router};
use chrono::{DateTime, Utc};
use log::{error, info};
use futures::stream::Stream;
use log::{debug, error, info};
use reqwest::Url;
use serde::Serialize;
use serde_json::from_str;
use std::collections::VecDeque;
use std::convert::Infallible;
use std::env;
use std::error::Error;
use std::future::Future;
use std::net::SocketAddr;
use std::process::ExitCode;
use std::sync::Arc;

use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::watch;
use tokio::sync::watch::{self, Sender};
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration};
use tokio::{select, spawn};
Expand All @@ -36,13 +43,17 @@ type CurrentRolloutStatus = Result<VecDeque<Rollout>, (StatusCode, String)>;
struct Server {
rollout_api: Arc<RolloutApi>,
last_rollout_data: Arc<Mutex<CurrentRolloutStatus>>,
stream_tx: Sender<CurrentRolloutStatus>,
}

impl Server {
fn new(rollout_api: Arc<RolloutApi>) -> Self {
let init = Err((StatusCode::NO_CONTENT, "".to_string()));
let (stream_tx, _stream_rx) = watch::channel::<CurrentRolloutStatus>(init.clone());
Self {
rollout_api,
last_rollout_data: Arc::new(Mutex::new(Err((StatusCode::NO_CONTENT, "".to_string())))),
last_rollout_data: Arc::new(Mutex::new(init)),
stream_tx,
}
}
async fn fetch_rollout_data(
Expand Down Expand Up @@ -122,6 +133,8 @@ impl Server {
_ignored = &mut cancel => break,
};

let _ = self.stream_tx.send(data.clone());

let mut container = self.last_rollout_data.lock().await;
*container = data;
drop(container);
Expand All @@ -141,6 +154,49 @@ impl Server {
Err(e) => Err(e),
}
}
fn produce_rollouts_sse_stream(&self) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
debug!(target: "sse", "New client connected.");

struct DisconnectionGuard {}

impl Drop for DisconnectionGuard {
fn drop(&mut self) {
debug!(target: "sse", "Client disconnected.");
}
}

#[derive(Serialize)]
struct SseResponse {
rollouts: VecDeque<Rollout>,
error: Option<(u16, String)>,
}

let mut stream_rx = self.stream_tx.subscribe();
let stream = try_stream! {
let guard = DisconnectionGuard{};
loop {
let current_rollout_status = &stream_rx.borrow_and_update().clone();
let mfk = current_rollout_status.clone();
let data_to_serialize = match mfk {
Ok(rollouts) => serde_json::to_string(&SseResponse{rollouts, error: None}),
Err(e) => serde_json::to_string(&SseResponse{rollouts: VecDeque::new(), error: Some((e.0.as_u16(), e.1))}),
}.unwrap();
let event = Event::default().data(data_to_serialize);
yield event;
if stream_rx.changed().await.is_err() {
debug!(target: "sse", "No more transmissions. Stopping client SSE streaming.");
break;
}
}
drop(guard);
};

Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(5))
.text("keepalive"),
)
}
}

#[tokio::main]
Expand Down Expand Up @@ -173,9 +229,15 @@ async fn main() -> ExitCode {
let (stop_serve_tx, mut stop_serve_rx) = watch::channel(());
let (finish_loop_tx, mut finish_loop_rx) = watch::channel(());

let rollouts_handler = move || async move { server.get_rollout_data().await };
let server_for_rollouts_handler = server.clone();
let server_for_sse_handler = server.clone();
let rollouts_handler =
move || async move { server_for_rollouts_handler.get_rollout_data().await };
let rollouts_sse_handler =
move || async move { server_for_sse_handler.produce_rollouts_sse_stream() };
let mut tree = Router::new();
tree = tree.route("/api/v1/rollouts", get(rollouts_handler));
tree = tree.route("/api/v1/rollouts/sse", get(rollouts_sse_handler));
tree = tree.nest_service("/", ServeDir::new(frontend_static_dir));

let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
Expand Down
Loading