diff --git a/src/correlation.rs b/src/correlation.rs index 706a6734b..81001ee37 100644 --- a/src/correlation.rs +++ b/src/correlation.rs @@ -16,7 +16,7 @@ * */ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use actix_web::{http::header::ContentType, Error}; use chrono::Utc; @@ -24,13 +24,17 @@ use datafusion::error::DataFusionError; use http::StatusCode; use itertools::Itertools; use once_cell::sync::Lazy; +use relative_path::RelativePathBuf; use serde::{Deserialize, Serialize}; use serde_json::Error as SerdeError; use tokio::sync::RwLock; -use tracing::{error, trace, warn}; +use tracing::error; use crate::{ - handlers::http::rbac::RBACError, + handlers::http::{ + rbac::RBACError, + users::{CORRELATION_DIR, USERS_ROOT_DIR}, + }, option::CONFIG, query::QUERY_SESSION, rbac::{map::SessionKey, Users}, @@ -41,8 +45,10 @@ use crate::{ pub static CORRELATIONS: Lazy = Lazy::new(Correlation::default); +type CorrelationMap = HashMap; + #[derive(Debug, Default, derive_more::Deref)] -pub struct Correlation(RwLock>); +pub struct Correlation(RwLock>); impl Correlation { // Load correlations from storage @@ -50,19 +56,21 @@ impl Correlation { let store = CONFIG.storage().get_object_store(); let all_correlations = store.get_all_correlations().await.unwrap_or_default(); - let correlations: Vec = all_correlations - .into_iter() - .flat_map(|(_, correlations_bytes)| correlations_bytes) - .filter_map(|correlation| { - serde_json::from_slice(&correlation) - .inspect_err(|e| { - error!("Unable to load correlation: {e}"); - }) - .ok() - }) - .collect(); - - self.write().await.extend(correlations); + for correlations_bytes in all_correlations.values().flat_map(|c| c) { + let Ok(correlation) = serde_json::from_slice::(&correlations_bytes) + .inspect_err(|e| { + error!("Unable to load correlation file : {e}"); + }) + else { + continue; + }; + + self.write() + .await + .entry(correlation.user_id.to_owned()) + .or_insert_with(HashMap::new) + .insert(correlation.id.to_owned(), correlation); + } Ok(()) } @@ -72,21 +80,26 @@ impl Correlation { session_key: &SessionKey, user_id: &str, ) -> Result, CorrelationError> { - let correlations = self.read().await.iter().cloned().collect_vec(); + let Some(correlations) = self.read().await.get(user_id).cloned() else { + return Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!( + "Unable to find correlations for user - {user_id}" + )))); + }; let mut user_correlations = vec![]; let permissions = Users.get_permissions(session_key); - for c in correlations { - let tables = &c + for correlation in correlations.values() { + let tables = &correlation .table_configs .iter() .map(|t| t.table_name.clone()) .collect_vec(); - if user_auth_for_query(&permissions, tables).is_ok() && c.user_id == user_id { - user_correlations.push(c); + if user_auth_for_query(&permissions, tables).is_ok() && correlation.user_id == user_id { + user_correlations.push(correlation.clone()); } } + Ok(user_correlations) } @@ -95,45 +108,57 @@ impl Correlation { correlation_id: &str, user_id: &str, ) -> Result { - let read = self.read().await; - let correlation = read - .iter() - .find(|c| c.id == correlation_id && c.user_id == user_id) - .cloned(); - - correlation.ok_or_else(|| { - CorrelationError::AnyhowError(anyhow::Error::msg(format!( - "Unable to find correlation with ID- {correlation_id}" - ))) - }) + self.read() + .await + .get(user_id) + .and_then(|correlations| correlations.get(correlation_id)) + .cloned() + .ok_or_else(|| { + CorrelationError::AnyhowError(anyhow::Error::msg(format!( + "Unable to find correlation with ID- {correlation_id}" + ))) + }) } + /// Insert new or replace existing correlation for the user and with the same ID pub async fn update(&self, correlation: &CorrelationConfig) -> Result<(), CorrelationError> { - // save to memory - let mut s = self.write().await; - s.retain(|c| c.id != correlation.id); - s.push(correlation.clone()); + // Update in storage + let correlation_bytes = serde_json::to_vec(&correlation)?.into(); + let path = correlation.path(); + CONFIG + .storage() + .get_object_store() + .put_object(&path, correlation_bytes) + .await?; + + // Update in memory + self.write() + .await + .entry(correlation.user_id.to_owned()) + .or_insert_with(HashMap::new) + .insert(correlation.id.to_owned(), correlation.clone()); + Ok(()) } - pub async fn delete(&self, correlation_id: &str) -> Result<(), CorrelationError> { - // now delete from memory - let read_access = self.read().await; + /// Delete correlation from memory and storage + pub async fn delete(&self, correlation: &CorrelationConfig) -> Result<(), CorrelationError> { + // Delete from memory + self.write() + .await + .entry(correlation.user_id.to_owned()) + .and_modify(|correlations| { + correlations.remove(&correlation.id); + }); + + // Delete from storage + let path = correlation.path(); + CONFIG + .storage() + .get_object_store() + .delete_object(&path) + .await?; - let index = read_access - .iter() - .enumerate() - .find(|(_, c)| c.id == correlation_id) - .to_owned(); - - if let Some((index, _)) = index { - // drop the read access in order to get exclusive write access - drop(read_access); - self.0.write().await.remove(index); - trace!("removed correlation from memory"); - } else { - warn!("Correlation ID- {correlation_id} not found in memory!"); - } Ok(()) } } @@ -144,13 +169,16 @@ pub enum CorrelationVersion { V1, } +type CorrelationId = String; +type UserId = String; + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct CorrelationConfig { pub version: CorrelationVersion, pub title: String, - pub id: String, - pub user_id: String, + pub id: CorrelationId, + pub user_id: UserId, pub table_configs: Vec, pub join_config: JoinConfig, pub filter: Option, @@ -158,7 +186,16 @@ pub struct CorrelationConfig { pub end_time: Option, } -impl CorrelationConfig {} +impl CorrelationConfig { + pub fn path(&self) -> RelativePathBuf { + RelativePathBuf::from_iter([ + USERS_ROOT_DIR, + &self.user_id, + CORRELATION_DIR, + &format!("{}.json", self.id), + ]) + } +} #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] diff --git a/src/handlers/http/correlation.rs b/src/handlers/http/correlation.rs index 08a9b13d2..531890e01 100644 --- a/src/handlers/http/correlation.rs +++ b/src/handlers/http/correlation.rs @@ -16,13 +16,13 @@ * */ +use actix_web::web::Path; use actix_web::{web, HttpRequest, HttpResponse, Responder}; use anyhow::Error; use bytes::Bytes; use itertools::Itertools; use crate::rbac::Users; -use crate::storage::object_storage::correlation_path; use crate::utils::{get_hash, get_user_from_request, user_auth_for_query}; use crate::{option::CONFIG, utils::actix::extract_session_key_from_req}; @@ -76,24 +76,12 @@ pub async fn get(req: HttpRequest) -> Result { pub async fn post(req: HttpRequest, body: Bytes) -> Result { let session_key = extract_session_key_from_req(&req) .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; - let user_id = get_user_from_request(&req) - .map(|s| get_hash(&s.to_string())) - .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?; correlation_request.validate(&session_key).await?; - let mut correlation: CorrelationConfig = correlation_request.into(); - correlation.user_id.clone_from(&user_id); - let correlation_id = &correlation.id; - let path = correlation_path(&user_id, &format!("{}.json", correlation_id)); - - let store = CONFIG.storage().get_object_store(); - let correlation_bytes = serde_json::to_vec(&correlation)?; - store - .put_object(&path, Bytes::from(correlation_bytes)) - .await?; + let correlation: CorrelationConfig = correlation_request.into(); // Save to memory CORRELATIONS.update(&correlation).await?; @@ -132,8 +120,7 @@ pub async fn modify(req: HttpRequest, body: Bytes) -> Result Result Result { +pub async fn delete( + req: HttpRequest, + correlation_id: Path, +) -> Result { + let correlation_id = correlation_id.into_inner(); let session_key = extract_session_key_from_req(&req) .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; let user_id = get_user_from_request(&req) .map(|s| get_hash(&s.to_string())) .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; - let correlation_id = req - .match_info() - .get("correlation_id") - .ok_or(CorrelationError::Metadata("No correlation ID Provided"))?; - let correlation = CORRELATIONS - .get_correlation(correlation_id, &user_id) + .get_correlation(&correlation_id, &user_id) .await?; // validate user's query auth @@ -173,13 +159,7 @@ pub async fn delete(req: HttpRequest) -> Result ]) } -pub fn correlation_path(user_id: &str, correlation_file_name: &str) -> RelativePathBuf { - RelativePathBuf::from_iter([ - USERS_ROOT_DIR, - user_id, - CORRELATION_DIR, - correlation_file_name, - ]) -} - /// path will be ".parseable/.parsable.json" #[inline(always)] pub fn parseable_json_path() -> RelativePathBuf {