Skip to content

Commit

Permalink
fix: update files in distributed mode to use hash (#761)
Browse files Browse the repository at this point in the history
This PR ensures all metadata and data files (json and parquet) use 
a simple sha256 based hash name mechanism. Each ingestor 
allocates itself a unique hash which is used in all file names
relevant to that ingestor. This hash is persisted in metadata file
content also and is supposed to be the same for the lifecycle 
of the ingestor.
---------

Co-authored-by: Nikhil Sinha <nikhil.sinha@cloudsurfex.com>
  • Loading branch information
Eshanatnight and nikhilsinhaparseable authored Apr 19, 2024
1 parent fb8a105 commit 8e710f2
Show file tree
Hide file tree
Showing 24 changed files with 378 additions and 171 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ hashlru = { version = "0.11.0", features = ["serde"] }
path-clean = "1.0.1"
prost = "0.12.3"
prometheus-parse = "0.2.5"
sha2 = "0.10.8"

[build-dependencies]
cargo_toml = "0.15"
Expand Down
33 changes: 22 additions & 11 deletions server/src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub struct Report {
}

impl Report {
pub async fn new() -> Self {
pub async fn new() -> anyhow::Result<Self> {
let mut upt: f64 = 0.0;
if let Ok(uptime) = uptime_lib::get() {
upt = uptime.as_secs_f64();
Expand All @@ -91,9 +91,9 @@ impl Report {
cpu_count = info.cpus().len();
mem_total = info.total_memory();
}
let ingestor_metrics = fetch_ingestors_metrics().await;
let ingestor_metrics = fetch_ingestors_metrics().await?;

Self {
Ok(Self {
deployment_id: storage::StorageMetadata::global().deployment_id,
uptime: upt,
report_created_at: Utc::now(),
Expand All @@ -113,7 +113,7 @@ impl Report {
total_json_bytes: ingestor_metrics.4,
total_parquet_bytes: ingestor_metrics.5,
metrics: build_metrics().await,
}
})
}

pub async fn send(&self) {
Expand Down Expand Up @@ -148,7 +148,7 @@ fn total_event_stats() -> (u64, u64, u64) {
(total_events, total_json_bytes, total_parquet_bytes)
}

async fn fetch_ingestors_metrics() -> (u64, u64, usize, u64, u64, u64) {
async fn fetch_ingestors_metrics() -> anyhow::Result<(u64, u64, usize, u64, u64, u64)> {
let event_stats = total_event_stats();
let mut node_metrics =
NodeMetrics::new(total_streams(), event_stats.0, event_stats.1, event_stats.2);
Expand Down Expand Up @@ -181,24 +181,24 @@ async fn fetch_ingestors_metrics() -> (u64, u64, usize, u64, u64, u64) {
.header(header::CONTENT_TYPE, "application/json")
.send()
.await
.unwrap(); // should respond
.expect("should respond");

let data = serde_json::from_slice::<NodeMetrics>(&resp.bytes().await.unwrap()).unwrap();
let data = serde_json::from_slice::<NodeMetrics>(&resp.bytes().await?)?;
vec.push(data);
active_ingestors += 1;
}

node_metrics.accumulate(&mut vec);
}

(
Ok((
active_ingestors,
offline_ingestors,
node_metrics.stream_count,
node_metrics.total_events_count,
node_metrics.total_json_bytes,
node_metrics.total_parquet_bytes,
)
))
}

async fn build_metrics() -> HashMap<String, Value> {
Expand All @@ -220,14 +220,23 @@ async fn build_metrics() -> HashMap<String, Value> {
metrics
}

pub fn init_analytics_scheduler() {
pub fn init_analytics_scheduler() -> anyhow::Result<()> {
log::info!("Setting up schedular for anonymous user analytics");

let mut scheduler = AsyncScheduler::new();
scheduler
.every(ANALYTICS_SEND_INTERVAL_SECONDS)
.run(move || async {
Report::new().await.send().await;
Report::new()
.await
.unwrap_or_else(|err| {
// panicing because seperate thread
// TODO: a better way to handle this
log::error!("Error while sending analytics: {}", err.to_string());
panic!("{}", err.to_string());
})
.send()
.await;
});

tokio::spawn(async move {
Expand All @@ -236,6 +245,8 @@ pub fn init_analytics_scheduler() {
tokio::time::sleep(Duration::from_secs(10)).await;
}
});

Ok(())
}

#[derive(Serialize, Deserialize, Default, Debug)]
Expand Down
28 changes: 14 additions & 14 deletions server/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,16 @@
*
*/

use std::sync::Arc;

use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, Utc};
use relative_path::RelativePathBuf;
use std::{io::ErrorKind, sync::Arc};

use crate::{
catalog::manifest::Manifest,
query::PartialTimeFilter,
storage::{ObjectStorage, ObjectStorageError, MANIFEST_FILE},
utils::get_address,
storage::{object_storage::manifest_path, ObjectStorage, ObjectStorageError},
};
use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, Utc};
use relative_path::RelativePathBuf;
use std::io::Error as IOError;

use self::{column::Column, snapshot::ManifestItem};

Expand Down Expand Up @@ -117,8 +116,7 @@ pub async fn update_snapshot(

let mut ch = false;
for m in manifests.iter() {
let s = get_address();
let p = format!("{}.{}.{}", s.ip(), s.port(), MANIFEST_FILE);
let p = manifest_path("").to_string();
if m.manifest_path.contains(&p) {
ch = true;
}
Expand All @@ -142,7 +140,11 @@ pub async fn update_snapshot(
23 * 3600 + 59 * 60 + 59,
999_999_999,
)
.unwrap(),
.ok_or(IOError::new(
ErrorKind::Other,
"Failed to create upper bound for manifest",
))
.map_err(ObjectStorageError::IoError)?,
)
.and_utc();

Expand All @@ -151,12 +153,11 @@ pub async fn update_snapshot(
..Manifest::default()
};

let addr = get_address();
let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE);
let mainfest_file_name = manifest_path("").to_string();
let path =
partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
storage
.put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
.put_object(&path, serde_json::to_vec(&manifest)?.into())
.await?;
let path = storage.absolute_url(&path);
let new_snapshot_entriy = snapshot::ManifestItem {
Expand Down Expand Up @@ -185,8 +186,7 @@ pub async fn update_snapshot(
..Manifest::default()
};

let addr = get_address();
let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE);
let mainfest_file_name = manifest_path("").to_string();
let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
storage
.put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
Expand Down
14 changes: 7 additions & 7 deletions server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub struct Cli {
pub mode: Mode,

/// public address for the parseable server ingestor
pub ingestor_url: String,
pub ingestor_endpoint: String,
}

impl Cli {
Expand All @@ -115,7 +115,7 @@ impl Cli {
pub const ROW_GROUP_SIZE: &'static str = "row-group-size";
pub const PARQUET_COMPRESSION_ALGO: &'static str = "compression-algo";
pub const MODE: &'static str = "mode";
pub const INGESTOR_URL: &'static str = "ingestor-url";
pub const INGESTOR_ENDPOINT: &'static str = "ingestor-endpoint";
pub const DEFAULT_USERNAME: &'static str = "admin";
pub const DEFAULT_PASSWORD: &'static str = "admin";

Expand Down Expand Up @@ -317,9 +317,9 @@ impl Cli {
.help("Mode of operation"),
)
.arg(
Arg::new(Self::INGESTOR_URL)
.long(Self::INGESTOR_URL)
.env("P_INGESTOR_URL")
Arg::new(Self::INGESTOR_ENDPOINT)
.long(Self::INGESTOR_ENDPOINT)
.env("P_INGESTOR_ENDPOINT")
.value_name("URL")
.required(false)
.help("URL to connect to this specific ingestor. Default is the address of the server.")
Expand Down Expand Up @@ -367,8 +367,8 @@ impl FromArgMatches for Cli {
.cloned()
.expect("default value for address");

self.ingestor_url = m
.get_one::<String>(Self::INGESTOR_URL)
self.ingestor_endpoint = m
.get_one::<String>(Self::INGESTOR_ENDPOINT)
.cloned()
.unwrap_or_else(String::default);

Expand Down
4 changes: 2 additions & 2 deletions server/src/handlers/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Sch
.await?
.iter()
// we should be able to unwrap as we know the data is valid schema
.map(|byte_obj| serde_json::from_slice(byte_obj).unwrap())
.map(|byte_obj| serde_json::from_slice(byte_obj).expect("data is valid json"))
.collect_vec();

let new_schema = Schema::try_merge(res)?;
Expand All @@ -97,7 +97,7 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Sch
pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec<Value>> {
// send the query request to the ingestor
let mut res = vec![];
let ima = get_ingestor_info().await.unwrap();
let ima = get_ingestor_info().await?;

for im in ima.iter() {
let uri = format!(
Expand Down
41 changes: 31 additions & 10 deletions server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ use chrono::Utc;
use http::StatusCode;
use itertools::Itertools;
use relative_path::RelativePathBuf;
use serde::de::Error;
use serde_json::error::Error as SerdeError;
use serde_json::Value as JsonValue;
use url::Url;

Expand Down Expand Up @@ -262,9 +264,13 @@ pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
StreamError::SerdeError(err)
})?
.get("staging")
.unwrap()
.ok_or(StreamError::SerdeError(SerdeError::missing_field(
"staging",
)))?
.as_str()
.unwrap()
.ok_or(StreamError::SerdeError(SerdeError::custom(
"staging path not a string/ not provided",
)))?
.to_string();

(true, sp, None, status)
Expand Down Expand Up @@ -304,7 +310,9 @@ pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
&ingestor.domain_name,
base_path_without_preceding_slash()
))
.unwrap();
.map_err(|err| {
PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
})?;

let res = reqwest::Client::new()
.get(uri)
Expand Down Expand Up @@ -362,14 +370,27 @@ pub async fn remove_ingestor(req: HttpRequest) -> Result<impl Responder, PostErr
if check_liveness(&domain_name).await {
return Err(PostError::Invalid(anyhow::anyhow!("Node Online")));
}

let url = Url::parse(&domain_name).unwrap();
let ingestor_meta_filename = ingestor_metadata_path(
url.host_str().unwrap().to_owned(),
url.port().unwrap().to_string(),
)
.to_string();
let object_store = CONFIG.storage().get_object_store();

let ingestor_metadatas = object_store
.get_objects(
Some(&RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY)),
Box::new(|file_name| file_name.starts_with("ingestor")),
)
.await?;

let ingestor_metadata = ingestor_metadatas
.iter()
.map(|elem| serde_json::from_slice::<IngestorMetadata>(elem).unwrap_or_default())
.collect_vec();

let ingestor_metadata = ingestor_metadata
.iter()
.filter(|elem| elem.domain_name == domain_name)
.collect_vec();

let ingestor_meta_filename =
ingestor_metadata_path(Some(&ingestor_metadata[0].ingestor_id)).to_string();
let msg = match object_store
.try_delete_ingestor_meta(ingestor_meta_filename)
.await
Expand Down
6 changes: 3 additions & 3 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
}

if !body.is_empty() && static_schema_flag == "true" {
let static_schema: StaticSchema = serde_json::from_slice(&body).unwrap();
let static_schema: StaticSchema = serde_json::from_slice(&body)?;
let parsed_schema = convert_static_schema_to_arrow_schema(static_schema);
if let Ok(parsed_schema) = parsed_schema {
schema = parsed_schema;
Expand Down Expand Up @@ -357,7 +357,7 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
None
};

let hash_map = STREAM_INFO.read().unwrap();
let hash_map = STREAM_INFO.read().expect("Readable");
let stream_meta = &hash_map
.get(&stream_name)
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
Expand Down Expand Up @@ -396,7 +396,7 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
stats
};

let stats = serde_json::to_value(stats).unwrap();
let stats = serde_json::to_value(stats)?;

Ok((web::Json(stats), StatusCode::OK))
}
Expand Down
Loading

0 comments on commit 8e710f2

Please sign in to comment.