Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Correlations #1115

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
257 changes: 151 additions & 106 deletions src/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,25 @@
*
*/

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

use actix_web::{http::header::ContentType, Error};
use chrono::Utc;
use datafusion::error::DataFusionError;
use http::StatusCode;
use itertools::Itertools;
use once_cell::sync::Lazy;
use relative_path::RelativePathBuf;
use serde::{Deserialize, Serialize};
use serde_json::Error as SerdeError;
use tokio::sync::RwLock;
use tracing::{error, trace, warn};
use tracing::error;

use crate::{
handlers::http::rbac::RBACError,
handlers::http::{
rbac::RBACError,
users::{CORRELATION_DIR, USERS_ROOT_DIR},
},
option::CONFIG,
query::QUERY_SESSION,
rbac::{map::SessionKey, Users},
Expand All @@ -39,167 +43,208 @@ use crate::{
utils::{get_hash, user_auth_for_query},
};

pub static CORRELATIONS: Lazy<Correlation> = Lazy::new(Correlation::default);
pub static CORRELATIONS: Lazy<Correlations> = Lazy::new(Correlations::default);

#[derive(Debug, Default)]
pub struct Correlation(RwLock<Vec<CorrelationConfig>>);
type CorrelationMap = HashMap<CorrelationId, CorrelationConfig>;

impl Correlation {
//load correlations from storage
#[derive(Debug, Default, derive_more::Deref)]
pub struct Correlations(RwLock<CorrelationMap>);

impl Correlations {
// Load correlations from storage
pub async fn load(&self) -> anyhow::Result<()> {
let store = CONFIG.storage().get_object_store();
let all_correlations = store.get_all_correlations().await.unwrap_or_default();

let correlations: Vec<CorrelationConfig> = all_correlations
.into_iter()
.flat_map(|(_, correlations_bytes)| correlations_bytes)
.filter_map(|correlation| {
serde_json::from_slice(&correlation)
.inspect_err(|e| {
error!("Unable to load correlation: {e}");
})
.ok()
})
.collect();
for correlations_bytes in all_correlations.values().flatten() {
let correlation = match serde_json::from_slice::<CorrelationConfig>(correlations_bytes)
{
Ok(c) => c,
Err(e) => {
error!("Unable to load correlation file : {e}");
continue;
}
};

self.write()
.await
.insert(correlation.id.to_owned(), correlation);
}

let mut s = self.0.write().await;
s.extend(correlations);
Ok(())
}

pub async fn list_correlations_for_user(
pub async fn list_correlations(
&self,
session_key: &SessionKey,
user_id: &str,
) -> Result<Vec<CorrelationConfig>, CorrelationError> {
let correlations = self.0.read().await.iter().cloned().collect_vec();

let mut user_correlations = vec![];
let permissions = Users.get_permissions(session_key);

for c in correlations {
let tables = &c
for correlation in self.read().await.values() {
let tables = &correlation
.table_configs
.iter()
.map(|t| t.table_name.clone())
.collect_vec();
if user_auth_for_query(&permissions, tables).is_ok() && c.user_id == user_id {
user_correlations.push(c);
if user_auth_for_query(&permissions, tables).is_ok() {
user_correlations.push(correlation.clone());
}
}

Ok(user_correlations)
}

pub async fn get_correlation(
&self,
correlation_id: &str,
user_id: &str,
) -> Result<CorrelationConfig, CorrelationError> {
let read = self.0.read().await;
let correlation = read
.iter()
.find(|c| c.id == correlation_id && c.user_id == user_id)
.cloned();

correlation.ok_or_else(|| {
CorrelationError::AnyhowError(anyhow::Error::msg(format!(
"Unable to find correlation with ID- {correlation_id}"
)))
})
self.read()
.await
.get(correlation_id)
.cloned()
.ok_or_else(|| {
CorrelationError::AnyhowError(anyhow::Error::msg(format!(
"Unable to find correlation with ID- {correlation_id}"
)))
})
}

pub async fn update(&self, correlation: &CorrelationConfig) -> Result<(), CorrelationError> {
// save to memory
let mut s = self.0.write().await;
s.retain(|c| c.id != correlation.id);
s.push(correlation.clone());
Ok(())
/// Create correlation associated with the user
pub async fn create(
&self,
mut correlation: CorrelationConfig,
session_key: &SessionKey,
) -> Result<CorrelationConfig, CorrelationError> {
correlation.id = get_hash(Utc::now().timestamp_micros().to_string().as_str());
correlation.validate(&session_key).await?;

// Update in storage
let correlation_bytes = serde_json::to_vec(&correlation)?.into();
let path = correlation.path();
CONFIG
.storage()
.get_object_store()
.put_object(&path, correlation_bytes)
.await?;

// Update in memory
self.write().await.insert(
correlation.id.to_owned(),
correlation.clone(),
);

Ok(correlation)
}

pub async fn delete(&self, correlation_id: &str) -> Result<(), CorrelationError> {
// now delete from memory
let read_access = self.0.read().await;
/// Update existing correlation for the user and with the same ID
pub async fn update(
&self,
mut updated_correlation: CorrelationConfig,
session_key: &SessionKey,
) -> Result<CorrelationConfig, CorrelationError> {
// validate whether user has access to this correlation object or not
let correlation = self.get_correlation(&updated_correlation.id).await?;
if correlation.user_id != updated_correlation.user_id {
return Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!(
r#"User "{}" isn't authorized to update correlation with ID - {}"#,
updated_correlation.user_id, correlation.id
))));
}

let index = read_access
.iter()
.enumerate()
.find(|(_, c)| c.id == correlation_id)
.to_owned();

if let Some((index, _)) = index {
// drop the read access in order to get exclusive write access
drop(read_access);
self.0.write().await.remove(index);
trace!("removed correlation from memory");
} else {
warn!("Correlation ID- {correlation_id} not found in memory!");
correlation.validate(&session_key).await?;
updated_correlation.update(correlation);

// Update in storage
let correlation_bytes = serde_json::to_vec(&updated_correlation)?.into();
let path = updated_correlation.path();
CONFIG
.storage()
.get_object_store()
.put_object(&path, correlation_bytes)
.await?;

// Update in memory
self.write().await.insert(
updated_correlation.id.to_owned(),
updated_correlation.clone(),
);

Ok(updated_correlation)
}

/// Delete correlation from memory and storage
pub async fn delete(
&self,
correlation_id: &str,
user_id: &str,
) -> Result<(), CorrelationError> {
let correlation = CORRELATIONS.get_correlation(&correlation_id).await?;
if correlation.user_id != user_id {
return Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!(
r#"User "{user_id}" isn't authorized to delete correlation with ID - {correlation_id}"#
))));
}

// Delete from memory
self.write().await.remove(&correlation.id);

// Delete from storage
let path = correlation.path();
CONFIG
.storage()
.get_object_store()
.delete_object(&path)
.await?;

Ok(())
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum CorrelationVersion {
#[default]
V1,
}

type CorrelationId = String;
type UserId = String;

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CorrelationConfig {
#[serde(skip_deserializing)]
pub version: CorrelationVersion,
pub title: String,
pub id: String,
pub user_id: String,
#[serde(skip_deserializing)]
pub id: CorrelationId,
#[serde(skip_deserializing)]
pub user_id: UserId,
pub table_configs: Vec<TableConfig>,
pub join_config: JoinConfig,
pub filter: Option<FilterQuery>,
pub start_time: Option<String>,
pub end_time: Option<String>,
}

impl CorrelationConfig {}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CorrelationRequest {
pub title: String,
pub table_configs: Vec<TableConfig>,
pub join_config: JoinConfig,
pub filter: Option<FilterQuery>,
pub start_time: Option<String>,
pub end_time: Option<String>,
}

impl From<CorrelationRequest> for CorrelationConfig {
fn from(val: CorrelationRequest) -> Self {
Self {
version: CorrelationVersion::V1,
title: val.title,
id: get_hash(Utc::now().timestamp_micros().to_string().as_str()),
user_id: String::default(),
table_configs: val.table_configs,
join_config: val.join_config,
filter: val.filter,
start_time: val.start_time,
end_time: val.end_time,
}
impl CorrelationConfig {
pub fn path(&self) -> RelativePathBuf {
RelativePathBuf::from_iter([
USERS_ROOT_DIR,
&self.user_id,
CORRELATION_DIR,
&format!("{}.json", self.id),
])
}
}

impl CorrelationRequest {
pub fn generate_correlation_config(self, id: String, user_id: String) -> CorrelationConfig {
CorrelationConfig {
version: CorrelationVersion::V1,
title: self.title,
id,
user_id,
table_configs: self.table_configs,
join_config: self.join_config,
filter: self.filter,
start_time: self.start_time,
end_time: self.end_time,
}
pub fn update(&mut self, update: Self) {
self.title = update.title;
self.table_configs = update.table_configs;
self.join_config = update.join_config;
self.filter = update.filter;
self.start_time = update.start_time;
self.end_time = update.end_time;
}

/// This function will validate the TableConfigs, JoinConfig, and user auth
Expand Down
Loading
Loading