Skip to content

Commit

Permalink
refactor: store correlations as a mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh committed Jan 17, 2025
1 parent 6f237ce commit 66a9b3a
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 97 deletions.
151 changes: 94 additions & 57 deletions src/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,25 @@
*
*/

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

use actix_web::{http::header::ContentType, Error};
use chrono::Utc;
use datafusion::error::DataFusionError;
use http::StatusCode;
use itertools::Itertools;
use once_cell::sync::Lazy;
use relative_path::RelativePathBuf;
use serde::{Deserialize, Serialize};
use serde_json::Error as SerdeError;
use tokio::sync::RwLock;
use tracing::{error, trace, warn};
use tracing::error;

use crate::{
handlers::http::rbac::RBACError,
handlers::http::{
rbac::RBACError,
users::{CORRELATION_DIR, USERS_ROOT_DIR},
},
option::CONFIG,
query::QUERY_SESSION,
rbac::{map::SessionKey, Users},
Expand All @@ -41,28 +45,32 @@ use crate::{

pub static CORRELATIONS: Lazy<Correlation> = Lazy::new(Correlation::default);

type CorrelationMap = HashMap<CorrelationId, CorrelationConfig>;

#[derive(Debug, Default, derive_more::Deref)]
pub struct Correlation(RwLock<Vec<CorrelationConfig>>);
pub struct Correlation(RwLock<HashMap<UserId, CorrelationMap>>);

impl Correlation {
// Load correlations from storage
pub async fn load(&self) -> anyhow::Result<()> {
let store = CONFIG.storage().get_object_store();
let all_correlations = store.get_all_correlations().await.unwrap_or_default();

let correlations: Vec<CorrelationConfig> = all_correlations
.into_iter()
.flat_map(|(_, correlations_bytes)| correlations_bytes)
.filter_map(|correlation| {
serde_json::from_slice(&correlation)
.inspect_err(|e| {
error!("Unable to load correlation: {e}");
})
.ok()
})
.collect();

self.write().await.extend(correlations);
for correlations_bytes in all_correlations.values().flat_map(|c| c) {
let Ok(correlation) = serde_json::from_slice::<CorrelationConfig>(&correlations_bytes)
.inspect_err(|e| {
error!("Unable to load correlation file : {e}");
})
else {
continue;
};

self.write()
.await
.entry(correlation.user_id.to_owned())
.or_insert_with(HashMap::new)
.insert(correlation.id.to_owned(), correlation);
}

Ok(())
}
Expand All @@ -72,21 +80,26 @@ impl Correlation {
session_key: &SessionKey,
user_id: &str,
) -> Result<Vec<CorrelationConfig>, CorrelationError> {
let correlations = self.read().await.iter().cloned().collect_vec();
let Some(correlations) = self.read().await.get(user_id).cloned() else {
return Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!(
"Unable to find correlations for user - {user_id}"
))));
};

let mut user_correlations = vec![];
let permissions = Users.get_permissions(session_key);

for c in correlations {
let tables = &c
for correlation in correlations.values() {
let tables = &correlation
.table_configs
.iter()
.map(|t| t.table_name.clone())
.collect_vec();
if user_auth_for_query(&permissions, tables).is_ok() && c.user_id == user_id {
user_correlations.push(c);
if user_auth_for_query(&permissions, tables).is_ok() && correlation.user_id == user_id {
user_correlations.push(correlation.clone());
}
}

Ok(user_correlations)
}

Expand All @@ -95,45 +108,57 @@ impl Correlation {
correlation_id: &str,
user_id: &str,
) -> Result<CorrelationConfig, CorrelationError> {
let read = self.read().await;
let correlation = read
.iter()
.find(|c| c.id == correlation_id && c.user_id == user_id)
.cloned();

correlation.ok_or_else(|| {
CorrelationError::AnyhowError(anyhow::Error::msg(format!(
"Unable to find correlation with ID- {correlation_id}"
)))
})
self.read()
.await
.get(user_id)
.and_then(|correlations| correlations.get(correlation_id))
.cloned()
.ok_or_else(|| {
CorrelationError::AnyhowError(anyhow::Error::msg(format!(
"Unable to find correlation with ID- {correlation_id}"
)))
})
}

/// Insert new or replace existing correlation for the user and with the same ID
pub async fn update(&self, correlation: &CorrelationConfig) -> Result<(), CorrelationError> {
// save to memory
let mut s = self.write().await;
s.retain(|c| c.id != correlation.id);
s.push(correlation.clone());
// Update in storage
let correlation_bytes = serde_json::to_vec(&correlation)?.into();
let path = correlation.path();
CONFIG
.storage()
.get_object_store()
.put_object(&path, correlation_bytes)
.await?;

// Update in memory
self.write()
.await
.entry(correlation.user_id.to_owned())
.or_insert_with(HashMap::new)
.insert(correlation.id.to_owned(), correlation.clone());

Ok(())
}

pub async fn delete(&self, correlation_id: &str) -> Result<(), CorrelationError> {
// now delete from memory
let read_access = self.read().await;
/// Delete correlation from memory and storage
pub async fn delete(&self, correlation: &CorrelationConfig) -> Result<(), CorrelationError> {
// Delete from memory
self.write()
.await
.entry(correlation.user_id.to_owned())
.and_modify(|correlations| {
correlations.remove(&correlation.id);
});

// Delete from storage
let path = correlation.path();
CONFIG
.storage()
.get_object_store()
.delete_object(&path)
.await?;

let index = read_access
.iter()
.enumerate()
.find(|(_, c)| c.id == correlation_id)
.to_owned();

if let Some((index, _)) = index {
// drop the read access in order to get exclusive write access
drop(read_access);
self.0.write().await.remove(index);
trace!("removed correlation from memory");
} else {
warn!("Correlation ID- {correlation_id} not found in memory!");
}
Ok(())
}
}
Expand All @@ -144,21 +169,33 @@ pub enum CorrelationVersion {
V1,
}

type CorrelationId = String;
type UserId = String;

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CorrelationConfig {
pub version: CorrelationVersion,
pub title: String,
pub id: String,
pub user_id: String,
pub id: CorrelationId,
pub user_id: UserId,
pub table_configs: Vec<TableConfig>,
pub join_config: JoinConfig,
pub filter: Option<FilterQuery>,
pub start_time: Option<String>,
pub end_time: Option<String>,
}

impl CorrelationConfig {}
impl CorrelationConfig {
pub fn path(&self) -> RelativePathBuf {
RelativePathBuf::from_iter([
USERS_ROOT_DIR,
&self.user_id,
CORRELATION_DIR,
&format!("{}.json", self.id),
])
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
Expand Down
40 changes: 10 additions & 30 deletions src/handlers/http/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
*
*/

use actix_web::web::Path;
use actix_web::{web, HttpRequest, HttpResponse, Responder};
use anyhow::Error;
use bytes::Bytes;
use itertools::Itertools;

use crate::rbac::Users;
use crate::storage::object_storage::correlation_path;
use crate::utils::{get_hash, get_user_from_request, user_auth_for_query};
use crate::{option::CONFIG, utils::actix::extract_session_key_from_req};

Expand Down Expand Up @@ -76,24 +76,12 @@ pub async fn get(req: HttpRequest) -> Result<impl Responder, CorrelationError> {
pub async fn post(req: HttpRequest, body: Bytes) -> Result<impl Responder, CorrelationError> {
let session_key = extract_session_key_from_req(&req)
.map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
let user_id = get_user_from_request(&req)
.map(|s| get_hash(&s.to_string()))
.map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;

let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?;

correlation_request.validate(&session_key).await?;

let mut correlation: CorrelationConfig = correlation_request.into();
correlation.user_id.clone_from(&user_id);
let correlation_id = &correlation.id;
let path = correlation_path(&user_id, &format!("{}.json", correlation_id));

let store = CONFIG.storage().get_object_store();
let correlation_bytes = serde_json::to_vec(&correlation)?;
store
.put_object(&path, Bytes::from(correlation_bytes))
.await?;
let correlation: CorrelationConfig = correlation_request.into();

// Save to memory
CORRELATIONS.update(&correlation).await?;
Expand Down Expand Up @@ -132,8 +120,7 @@ pub async fn modify(req: HttpRequest, body: Bytes) -> Result<impl Responder, Cor
let correlation =
correlation_request.generate_correlation_config(correlation_id.to_owned(), user_id.clone());

let correlation_id = &correlation.id;
let path = correlation_path(&user_id, &format!("{}.json", correlation_id));
let path = correlation.path();

let store = CONFIG.storage().get_object_store();
let correlation_bytes = serde_json::to_vec(&correlation)?;
Expand All @@ -147,20 +134,19 @@ pub async fn modify(req: HttpRequest, body: Bytes) -> Result<impl Responder, Cor
Ok(web::Json(correlation))
}

pub async fn delete(req: HttpRequest) -> Result<impl Responder, CorrelationError> {
pub async fn delete(
req: HttpRequest,
correlation_id: Path<String>,
) -> Result<impl Responder, CorrelationError> {
let correlation_id = correlation_id.into_inner();
let session_key = extract_session_key_from_req(&req)
.map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
let user_id = get_user_from_request(&req)
.map(|s| get_hash(&s.to_string()))
.map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;

let correlation_id = req
.match_info()
.get("correlation_id")
.ok_or(CorrelationError::Metadata("No correlation ID Provided"))?;

let correlation = CORRELATIONS
.get_correlation(correlation_id, &user_id)
.get_correlation(&correlation_id, &user_id)
.await?;

// validate user's query auth
Expand All @@ -173,13 +159,7 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, CorrelationError

user_auth_for_query(&permissions, tables)?;

let correlation_id = &correlation.id;
let path = correlation_path(&user_id, &format!("{}.json", correlation_id));

let store = CONFIG.storage().get_object_store();
store.delete_object(&path).await?;
CORRELATIONS.delete(&correlation).await?;

// Delete from memory
CORRELATIONS.delete(correlation_id).await?;
Ok(HttpResponse::Ok().finish())
}
11 changes: 1 addition & 10 deletions src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use super::{

use crate::event::format::LogSource;
use crate::handlers::http::modal::ingest_server::INGESTOR_META;
use crate::handlers::http::users::{CORRELATION_DIR, DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
use crate::metadata::SchemaVersion;
use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE};
use crate::option::Mode;
Expand Down Expand Up @@ -698,15 +698,6 @@ pub fn filter_path(user_id: &str, stream_name: &str, filter_file_name: &str) ->
])
}

pub fn correlation_path(user_id: &str, correlation_file_name: &str) -> RelativePathBuf {
RelativePathBuf::from_iter([
USERS_ROOT_DIR,
user_id,
CORRELATION_DIR,
correlation_file_name,
])
}

/// path will be ".parseable/.parsable.json"
#[inline(always)]
pub fn parseable_json_path() -> RelativePathBuf {
Expand Down

0 comments on commit 66a9b3a

Please sign in to comment.