diff --git a/src/handlers/http/alerts/alerts_utils.rs b/src/handlers/http/alerts/alerts_utils.rs
index 245da6329..b4241b67b 100644
--- a/src/handlers/http/alerts/alerts_utils.rs
+++ b/src/handlers/http/alerts/alerts_utils.rs
@@ -18,6 +18,12 @@
 use datafusion::{
     common::tree_node::TreeNode,
+    functions_aggregate::{
+        count::count,
+        expr_fn::avg,
+        min_max::{max, min},
+        sum::sum,
+    },
     prelude::{col, lit, Expr},
 };
 use tracing::trace;
@@ -77,8 +83,8 @@ pub async fn user_auth_for_query(session_key: &SessionKey, query: &str) -> Resul
 }
 
 /// This function contains the logic to run the alert evaluation task
-pub async fn evaluate_alert(alert: AlertConfig) -> Result<(), AlertError> {
-    println!("RUNNING EVAL TASK FOR- {alert:?}");
+pub async fn evaluate_alert(alert: &AlertConfig) -> Result<(), AlertError> {
+    trace!("RUNNING EVAL TASK FOR- {alert:?}");
 
     let (start_time, end_time) = match &alert.eval_type {
         super::EvalConfig::RollingWindow(rolling_window) => {
@@ -87,13 +93,11 @@ pub async fn evaluate_alert(alert: AlertConfig) -> Result<(), AlertError> {
     };
 
     let session_state = QUERY_SESSION.state();
-    let raw_logical_plan = session_state
-        .create_logical_plan(&alert.query)
-        .await
-        .unwrap();
+    let raw_logical_plan = session_state.create_logical_plan(&alert.query).await?;
 
     // TODO: Filter tags should be taken care of!!!
-    let time_range = TimeRange::parse_human_time(start_time, end_time).unwrap();
+    let time_range = TimeRange::parse_human_time(start_time, end_time)
+        .map_err(|err| AlertError::CustomError(err.to_string()))?;
     let query = crate::query::Query {
         raw_logical_plan,
         time_range,
@@ -102,12 +106,28 @@ pub async fn evaluate_alert(alert: AlertConfig) -> Result<(), AlertError> {
     // for now proceed in a similar fashion as we do in query
     // TODO: in case of multiple table query does the selection of time partition make a difference? (especially when the tables don't have overlapping data)
-    let stream_name = query.first_table_name().unwrap();
+    let stream_name = if let Some(stream_name) = query.first_table_name() {
+        stream_name
+    } else {
+        return Err(AlertError::CustomError(format!(
+            "Table name not found in query- {}",
+            alert.query
+        )));
+    };
 
-    let df = query.get_dataframe(stream_name).await.unwrap();
+    let df = query
+        .get_dataframe(stream_name)
+        .await
+        .map_err(|err| AlertError::CustomError(err.to_string()))?;
 
     // let df = DataFrame::new(session_state, raw_logical_plan);
 
+    // for now group by is empty, we can include this later
+    let group_expr = vec![];
+
+    // agg expression
+    let mut aggr_expr = vec![];
+
     let mut expr = Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(true)));
     for threshold in &alert.thresholds {
         let res = match threshold.operator {
@@ -137,10 +157,29 @@ pub async fn evaluate_alert(alert: AlertConfig) -> Result<(), AlertError> {
             }
         };
 
+        aggr_expr.push(match threshold.agg {
+            crate::handlers::http::alerts::Aggregate::Avg => {
+                avg(col(&threshold.column)).alias(&threshold.column)
+            }
+            crate::handlers::http::alerts::Aggregate::Count => {
+                count(col(&threshold.column)).alias(&threshold.column)
+            }
+            crate::handlers::http::alerts::Aggregate::Min => {
+                min(col(&threshold.column)).alias(&threshold.column)
+            }
+            crate::handlers::http::alerts::Aggregate::Max => {
+                max(col(&threshold.column)).alias(&threshold.column)
+            }
+            crate::handlers::http::alerts::Aggregate::Sum => {
+                sum(col(&threshold.column)).alias(&threshold.column)
+            }
+        });
         expr = expr.and(res);
     }
 
-    let nrows = df.clone().filter(expr).unwrap().count().await.unwrap();
+    let df = df.aggregate(group_expr, aggr_expr)?;
+
+    let nrows = df.clone().filter(expr)?.count().await?;
     trace!("dataframe-\n{:?}", df.collect().await);
 
     if nrows > 0 {
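The new evaluation path builds one aggregate expression per threshold, aggregates with an empty GROUP BY, and then counts the rows that survive the combined boolean filter. A condensed sketch of that shape, with a simplified stand-in threshold type and a fixed greater-than comparison in place of the crate's own threshold and operator types:

```rust
// Condensed sketch of the evaluation shape used above; the threshold struct and the
// fixed "greater than" comparison are illustrative stand-ins, not the crate's types.
use datafusion::error::Result;
use datafusion::functions_aggregate::expr_fn::avg; // same module the patch imports from
use datafusion::prelude::{col, lit, DataFrame, Expr};

struct SimpleThreshold {
    column: String,
    fire_if_gt: f64,
}

async fn should_fire(df: DataFrame, thresholds: &[SimpleThreshold]) -> Result<bool> {
    let group_expr: Vec<Expr> = vec![]; // no GROUP BY: aggregate over the whole window
    let mut aggr_expr = Vec::new();
    let mut cond = lit(true);

    for t in thresholds {
        // one aggregate per threshold, aliased back to the column name (as the patch does)
        aggr_expr.push(avg(col(t.column.as_str())).alias(t.column.as_str()));
        // AND together the per-threshold comparisons
        cond = cond.and(col(t.column.as_str()).gt(lit(t.fire_if_gt)));
    }

    // the aggregation yields a single row; the filter keeps it only if every
    // comparison holds, so a non-zero count means the alert should fire
    let aggregated = df.aggregate(group_expr, aggr_expr)?;
    Ok(aggregated.filter(cond)?.count().await? > 0)
}
```

Since there is no grouping, the aggregated frame has exactly one row, so the final count is either 0 or 1.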
diff --git a/src/handlers/http/alerts/http_handlers.rs b/src/handlers/http/alerts/http_handlers.rs
index d40b95075..b9ab9a7b2 100644
--- a/src/handlers/http/alerts/http_handlers.rs
+++ b/src/handlers/http/alerts/http_handlers.rs
@@ -18,15 +18,17 @@
 use crate::{
     option::CONFIG,
-    storage::object_storage::alert_json_path,
+    storage::{object_storage::alert_json_path, ALERTS_ROOT_DIRECTORY, PARSEABLE_ROOT_DIRECTORY},
     sync::schedule_alert_task,
-    utils::{actix::extract_session_key_from_req, uid::Uid},
+    utils::actix::extract_session_key_from_req,
 };
 use actix_web::{web, HttpRequest, Responder};
 use bytes::Bytes;
-use tracing::warn;
+use relative_path::RelativePathBuf;
 
-use super::{alerts_utils::user_auth_for_query, AlertConfig, AlertError, AlertState, ALERTS};
+use super::{
+    alerts_utils::user_auth_for_query, AlertConfig, AlertError, AlertRequest, AlertState, ALERTS,
+};
 
 // GET /alerts
 /// User needs at least a read access to the stream(s) that is being referenced in an alert
@@ -39,10 +41,11 @@ pub async fn list(req: HttpRequest) -> Result<impl Responder, AlertError> {
 }
 
 // POST /alerts
-pub async fn post(req: HttpRequest, alert: AlertConfig) -> Result<impl Responder, AlertError> {
+pub async fn post(req: HttpRequest, alert: AlertRequest) -> Result<impl Responder, AlertError> {
+    let alert: AlertConfig = alert.into();
     // validate the incoming alert query
     // does the user have access to these tables or not?
-    let session_key = extract_session_key_from_req(&req).unwrap();
+    let session_key = extract_session_key_from_req(&req)?;
     user_auth_for_query(&session_key, &alert.query).await?;
 
     // now that we've validated that the user can run this query
@@ -71,18 +74,27 @@ pub async fn get(req: HttpRequest) -> Result<impl Responder, AlertError> {
         .get("alert_id")
         .ok_or(AlertError::Metadata("No alert ID Provided"))?;
 
-    let alert = ALERTS.get_alert_by_id(session_key, id).await?;
+    let alert = ALERTS.get_alert_by_id(id).await?;
+
+    // validate that the user has access to the tables mentioned
+    user_auth_for_query(&session_key, &alert.query).await?;
+
     Ok(web::Json(alert))
 }
 
 // DELETE /alerts/{alert_id}
 /// Deletion should happen from disk, scheduled tasks, then memory
 pub async fn delete(req: HttpRequest) -> Result<impl Responder, AlertError> {
+    let session_key = extract_session_key_from_req(&req)?;
     let alert_id = req
         .match_info()
         .get("alert_id")
         .ok_or(AlertError::Metadata("No alert ID Provided"))?;
 
+    let alert = ALERTS.get_alert_by_id(alert_id).await?;
+
+    // validate that the user has access to the tables mentioned
+    user_auth_for_query(&session_key, &alert.query).await?;
+
     // delete from disk and memory
     ALERTS.delete(alert_id).await?;
 
@@ -95,31 +107,34 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, AlertError> {
 // PUT /alerts/{alert_id}
 /// first save on disk, then in memory
 /// then modify scheduled task
-pub async fn modify(
-    req: HttpRequest,
-    mut alert: AlertConfig,
-) -> Result<impl Responder, AlertError> {
+pub async fn modify(req: HttpRequest, alert: AlertRequest) -> Result<impl Responder, AlertError> {
     let session_key = extract_session_key_from_req(&req)?;
     let alert_id = req
         .match_info()
         .get("alert_id")
         .ok_or(AlertError::Metadata("No alert ID Provided"))?;
 
-    // ensure that the user doesn't unknowingly change the ID
-    if alert_id != alert.id.to_string() {
-        warn!("Alert modify request is trying to change Alert ID, reverting ID");
-        alert.id = Uid::from_string(alert_id)
-            .map_err(|_| AlertError::CustomError("Unable to get Uid from String".to_owned()))?;
-    }
+    // check if alert id exists in map
+    ALERTS.get_alert_by_id(alert_id).await?;
 
     // validate that the user has access to the tables mentioned
     user_auth_for_query(&session_key, &alert.query).await?;
 
-    // // fetch the alert from this ID to get AlertState
-    // let state = ALERTS.get_alert_by_id(session_key, alert_id).await?.state;
-
     let store = CONFIG.storage().get_object_store();
 
+    // fetch the alert object for the relevant ID
+    let old_alert_config: AlertConfig = serde_json::from_slice(
+        &store
+            .get_object(&RelativePathBuf::from_iter([
+                PARSEABLE_ROOT_DIRECTORY,
+                ALERTS_ROOT_DIRECTORY,
+                &format!("{alert_id}.json"),
+            ]))
+            .await?,
+    )?;
+
+    let alert = alert.modify(old_alert_config);
+
     // modify on disk
     store.put_alert(&alert.id.to_string(), &alert).await?;
 
@@ -136,11 +151,18 @@ pub async fn modify(
 // PUT /alerts/{alert_id}/update_state
 pub async fn update_state(req: HttpRequest, state: String) -> Result<impl Responder, AlertError> {
+    let session_key = extract_session_key_from_req(&req)?;
     let alert_id = req
         .match_info()
         .get("alert_id")
         .ok_or(AlertError::Metadata("No alert ID Provided"))?;
 
+    // check if alert id exists in map
+    let alert = ALERTS.get_alert_by_id(alert_id).await?;
+
+    // validate that the user has access to the tables mentioned
+    user_auth_for_query(&session_key, &alert.query).await?;
+
     // get current state
     let current_state = ALERTS.get_state(alert_id).await?;
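Every handler above now follows the same sequence before acting: extract the session key, resolve the alert by id, and verify the caller can run the query the alert wraps. A hypothetical helper, not introduced by this patch, showing that sequence in one place (imports assume it would live next to the handlers):

```rust
// Hypothetical helper capturing the per-handler access check used above.
use actix_web::HttpRequest;

use crate::utils::actix::extract_session_key_from_req;

use super::{alerts_utils::user_auth_for_query, AlertConfig, AlertError, ALERTS};

async fn authorized_alert(req: &HttpRequest, alert_id: &str) -> Result<AlertConfig, AlertError> {
    let session_key = extract_session_key_from_req(req)?;

    // resolve the alert first, then check that the caller may run the query it wraps
    let alert = ALERTS.get_alert_by_id(alert_id).await?;
    user_auth_for_query(&session_key, &alert.query).await?;

    Ok(alert)
}
```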
diff --git a/src/handlers/http/alerts/mod.rs b/src/handlers/http/alerts/mod.rs
index 2a01d2bd9..3df06aa76 100644
--- a/src/handlers/http/alerts/mod.rs
+++ b/src/handlers/http/alerts/mod.rs
@@ -19,7 +19,7 @@
 use actix_web::http::header::ContentType;
 use actix_web::web::Json;
 use actix_web::{FromRequest, HttpRequest};
-use alerts_utils::user_auth_for_query;
+use alerts_utils::{evaluate_alert, user_auth_for_query};
 use async_trait::async_trait;
 use http::StatusCode;
 use once_cell::sync::Lazy;
@@ -227,27 +227,22 @@ impl Display for AlertState {
 
 #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
 #[serde(rename_all = "camelCase")]
-pub struct AlertConfig {
+pub struct AlertRequest {
     pub version: AlertVerison,
-    #[serde(default = "crate::utils::uid::gen")]
-    pub id: uid::Uid,
     pub title: String,
     pub query: String,
     pub alert_type: AlertType,
     pub thresholds: Vec<Thresholds>,
     pub eval_type: EvalConfig,
     pub targets: Vec<Target>,
-    // for new alerts, state should be resolved
-    #[serde(default = "AlertState::default")]
-    pub state: AlertState,
 }
 
-impl FromRequest for AlertConfig {
+impl FromRequest for AlertRequest {
     type Error = actix_web::Error;
     type Future = Pin<Box<dyn Future<Output = Result<Self, Self::Error>>>>;
 
     fn from_request(req: &HttpRequest, payload: &mut actix_web::dev::Payload) -> Self::Future {
-        let body = Json::<AlertConfig>::from_request(req, payload);
+        let body = Json::<AlertRequest>::from_request(req, payload);
         let fut = async move {
             let body = body.await?.into_inner();
             Ok(body)
@@ -257,6 +252,55 @@ impl FromRequest for AlertConfig {
     }
 }
 
+impl AlertRequest {
+    pub fn modify(self, alert: AlertConfig) -> AlertConfig {
+        AlertConfig {
+            version: self.version,
+            id: alert.id,
+            title: self.title,
+            query: self.query,
+            alert_type: self.alert_type,
+            thresholds: self.thresholds,
+            eval_type: self.eval_type,
+            targets: self.targets,
+            state: alert.state,
+        }
+    }
+}
+
+impl From<AlertRequest> for AlertConfig {
+    fn from(val: AlertRequest) -> AlertConfig {
+        AlertConfig {
+            version: val.version,
+            id: crate::utils::uid::gen(),
+            title: val.title,
+            query: val.query,
+            alert_type: val.alert_type,
+            thresholds: val.thresholds,
+            eval_type: val.eval_type,
+            targets: val.targets,
+            state: AlertState::default(),
+        }
+    }
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct AlertConfig {
+    pub version: AlertVerison,
+    #[serde(default = "crate::utils::uid::gen")]
+    pub id: uid::Uid,
+    pub title: String,
+    pub query: String,
+    pub alert_type: AlertType,
+    pub thresholds: Vec<Thresholds>,
+    pub eval_type: EvalConfig,
+    pub targets: Vec<Target>,
+    // for new alerts, state should be resolved
+    #[serde(default = "AlertState::default")]
+    pub state: AlertState,
+}
+
 impl AlertConfig {
     pub fn get_eval_frequency(&self) -> u32 {
         match &self.eval_type {
@@ -338,6 +382,7 @@ impl actix_web::ResponseError for AlertError {
 impl Alerts {
     /// Loads alerts from disk
     /// spawn scheduled tasks
+    /// Evaluate
     pub async fn load(&self) -> Result<(), AlertError> {
         let mut this = vec![];
         let store = CONFIG.storage().get_object_store();
@@ -359,7 +404,13 @@ impl Alerts {
         }
 
         let mut s = self.alerts.write().await;
-        s.append(&mut this);
+        s.append(&mut this.clone());
+        drop(s);
+
+        // run eval task once for each alert
+        for alert in this.iter() {
+            evaluate_alert(alert).await?;
+        }
 
         Ok(())
     }
@@ -382,24 +433,9 @@ impl Alerts {
     }
 
     /// Returns a single alert that the user has access to (based on query auth)
-    pub async fn get_alert_by_id(
-        &self,
-        session: SessionKey,
-        id: &str,
-    ) -> Result<AlertConfig, AlertError> {
-        let mut alert = None;
-        for a in self.alerts.read().await.iter() {
-            if a.id.to_string() == id {
-                let query = &a.query;
-                match user_auth_for_query(&session, query).await {
-                    Ok(_) => {
-                        alert = Some(a.clone());
-                        break;
-                    }
-                    Err(err) => return Err(err),
-                }
-            }
-        }
+    pub async fn get_alert_by_id(&self, id: &str) -> Result<AlertConfig, AlertError> {
+        let read_access = self.alerts.read().await;
+        let alert = read_access.iter().find(|a| a.id.to_string() == id);
 
         if let Some(alert) = alert {
             Ok(alert.clone())
@@ -425,21 +461,32 @@ impl Alerts {
         trigger_notif: bool,
     ) -> Result<(), AlertError> {
         let store = CONFIG.storage().get_object_store();
-        let alert_path = alert_json_path(alert_id);
+        // let alert_path = alert_json_path(alert_id);
 
-        // read and modify alert
-        let mut alert: AlertConfig = serde_json::from_slice(&store.get_object(&alert_path).await?)?;
-        alert.state = new_state;
+        // // read and modify alert
+        // let mut alert: AlertConfig = serde_json::from_slice(&store.get_object(&alert_path).await?)?;
+        // alert.state = new_state;
+        let mut alert = self.get_alert_by_id(alert_id).await?;
+        alert.state = new_state;
 
         // save to disk
         store.put_alert(alert_id, &alert).await?;
 
         // modify in memory
-        self.alerts.write().await.iter_mut().for_each(|alert| {
-            if alert.id.to_string() == alert_id {
-                alert.state = new_state;
-            }
-        });
+        let mut writer = self.alerts.write().await;
+        let alert_to_update = writer
+            .iter_mut()
+            .find(|alert| alert.id.to_string() == alert_id);
+        if let Some(alert) = alert_to_update {
+            alert.state = new_state;
+        };
+        drop(writer);
+
+        // self.alerts.write().await.iter_mut().for_each(|alert| {
+        //     if alert.id.to_string() == alert_id {
+        //         alert.state = new_state;
+        //     }
+        // });
 
         if trigger_notif {
             alert.trigger_notifications().await?;
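The `AlertRequest`/`AlertConfig` split gives two conversion paths: `From<AlertRequest>` mints a fresh id and starts in the default resolved state (used by POST), while `AlertRequest::modify` keeps the stored id and state (used by PUT). A small illustrative helper, not part of the patch, showing how the two fit together:

```rust
// Illustrative only: how the two conversion paths defined above are meant to be used.
use crate::handlers::http::alerts::{AlertConfig, AlertRequest};

fn into_config(request: AlertRequest, existing: Option<AlertConfig>) -> AlertConfig {
    match existing {
        // PUT /alerts/{alert_id}: keep the stored `id` and `state`
        Some(old) => request.modify(old),
        // POST /alerts: fresh id, default (resolved) state
        None => request.into(),
    }
}
```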
diff --git a/src/handlers/http/alerts/target.rs b/src/handlers/http/alerts/target.rs
index 3ab7014c7..1f3e36d86 100644
--- a/src/handlers/http/alerts/target.rs
+++ b/src/handlers/http/alerts/target.rs
@@ -28,7 +28,7 @@
 use chrono::Utc;
 use http::{header::AUTHORIZATION, HeaderMap, HeaderValue};
 use humantime_serde::re::humantime;
 use reqwest::ClientBuilder;
-use tracing::{error, trace};
+use tracing::{error, trace, warn};
 
 use crate::handlers::http::alerts::ALERTS;
@@ -60,6 +60,7 @@ pub struct Target {
 
 impl Target {
     pub fn call(&self, context: Context) {
+        trace!("context- {context:?}");
         let timeout = &self.timeout;
         let resolves = context.alert_info.alert_state;
         let mut state = timeout.state.lock().unwrap();
@@ -124,7 +125,14 @@ impl Target {
         tokio::spawn(async move {
             match retry {
                 Retry::Infinite => loop {
-                    let current_state = ALERTS.get_state(&alert_id).await.unwrap();
+                    let current_state = if let Ok(state) = ALERTS.get_state(&alert_id).await {
+                        state
+                    } else {
+                        state.lock().unwrap().timed_out = true;
+                        warn!("Unable to fetch state for given alert_id- {alert_id}, stopping target notifs");
+                        return;
+                    };
+
                     let should_call =
                         sleep_and_check_if_call(Arc::clone(&state), current_state).await;
                     if should_call {
@@ -133,22 +141,30 @@ impl Target {
                 },
                 Retry::Finite(times) => {
                     for _ in 0..times {
-                        let current_state = ALERTS.get_state(&alert_id).await.unwrap();
+                        let current_state = if let Ok(state) = ALERTS.get_state(&alert_id).await {
+                            state
+                        } else {
+                            state.lock().unwrap().timed_out = true;
+                            warn!("Unable to fetch state for given alert_id- {alert_id}, stopping target notifs");
+                            return;
+                        };
+                        println!("current_state= {state:?}");
+
                         let should_call =
                             sleep_and_check_if_call(Arc::clone(&state), current_state).await;
                         if should_call {
                             call_target(target.clone(), alert_context.clone())
                         }
                     }
-                    // fallback for if this task only observed FIRING on all RETRIES
-                    // Stream might be dead and sending too many alerts is not great
-                    // Send and alert stating that this alert will only work once it has seen a RESOLVE
-                    state.lock().unwrap().timed_out = false;
-                    let context = alert_context;
-                    // context.alert_info.message = format!(
-                    //     "Triggering alert did not resolve itself after {times} retries, This alert is paused until it resolves");
-                    // Send and exit this task.
-                    call_target(target, context);
+                    // // fallback for if this task only observed FIRING on all RETRIES
+                    // // Stream might be dead and sending too many alerts is not great
+                    // // Send and alert stating that this alert will only work once it has seen a RESOLVE
+                    // state.lock().unwrap().timed_out = false;
+                    // let context = alert_context;
+                    // // context.alert_info.message = format!(
+                    // //     "Triggering alert did not resolve itself after {times} retries, This alert is paused until it resolves");
+                    // // Send and exit this task.
+                    // call_target(target, context);
                 }
             }
         });
@@ -156,6 +172,7 @@ impl Target {
 }
 
 fn call_target(target: TargetType, context: Context) {
+    trace!("Calling target with context- {context:?}");
     tokio::spawn(async move { target.call(&context).await });
 }
diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs
index 6d3b28d10..9d3d6c5bb 100644
--- a/src/handlers/http/modal/query_server.rs
+++ b/src/handlers/http/modal/query_server.rs
@@ -68,6 +68,7 @@ impl ParseableServer for QueryServer {
                     .service(Server::get_oauth_webscope(oidc_client))
                     .service(Self::get_user_role_webscope())
                     .service(Server::get_metrics_webscope())
+                    .service(Server::get_alerts_webscope())
                     .service(Self::get_cluster_web_scope()),
             )
             .service(Server::get_generated());
@@ -297,21 +298,6 @@ impl QueryServer {
                         .authorize_for_stream(Action::GetStreamInfo),
                 ),
         )
-        // .service(
-        //     web::resource("/alert")
-        //         // PUT "/logstream/{logstream}/alert" ==> Set alert for given log stream
-        //         .route(
-        //             web::put()
-        //                 .to(logstream::put_alert)
-        //                 .authorize_for_stream(Action::PutAlert),
-        //         )
-        //         // GET "/logstream/{logstream}/alert" ==> Get alert for given log stream
-        //         .route(
-        //             web::get()
-        //                 .to(logstream::get_alert)
-        //                 .authorize_for_stream(Action::GetAlert),
-        //         ),
-        // )
         .service(
             // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
             web::resource("/schema").route(
diff --git a/src/sync.rs b/src/sync.rs
index 83ae9c38a..e5b5a8edb 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -158,7 +158,7 @@ pub async fn schedule_alert_task(
     scheduler.every((eval_frequency).minutes()).run(move || {
         let alert_val = alert.clone();
         async move {
-            match alerts_utils::evaluate_alert(alert_val).await {
+            match alerts_utils::evaluate_alert(&alert_val).await {
                Ok(_) => {}
                Err(err) => error!("Error while evaluation- {err}"),
            }
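Because `evaluate_alert` now takes `&AlertConfig`, the same loaded configs can be evaluated once at startup and then handed on to the in-memory map and the scheduled tasks without being moved. A minimal sketch of that startup pass, assuming the module paths shown in the diff; the free function itself is illustrative:

```rust
// Minimal sketch of the one-off evaluation pass `Alerts::load` now performs at startup;
// the free function and the exact import path are assumptions for illustration.
use crate::handlers::http::alerts::{alerts_utils::evaluate_alert, AlertConfig, AlertError};

async fn evaluate_all_once(alerts: &[AlertConfig]) -> Result<(), AlertError> {
    for alert in alerts {
        // `evaluate_alert` now borrows the config, so the caller keeps ownership
        // and can still hand the same Vec to the in-memory map and the scheduler
        evaluate_alert(alert).await?;
    }
    Ok(())
}
```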