From e7cd5868da2714ab401647933fdead1d2d2da7d5 Mon Sep 17 00:00:00 2001 From: Gurjot Kaur <88080236+gurjotkaur20@users.noreply.github.com> Date: Mon, 26 Feb 2024 16:45:29 +0530 Subject: [PATCH] fix: add stream creation time in get stats api (#632) Changes does in the PR - 1. adds the first_event_at property (from the min value of p_timestamp of the first parquet file listed in the first manifest file from the snapshot of the stream.json) to the stats api and writes it to the stream.json file at the request of get stats. 2. updates the first_event_at in case of retention Fixes : #587 --- server/src/catalog.rs | 98 ++++++++++++++++++++------- server/src/handlers/http/logstream.rs | 56 ++++++++++++++- server/src/metadata.rs | 25 ++++++- server/src/storage.rs | 3 + server/src/storage/object_storage.rs | 17 +++++ server/src/storage/retention.rs | 36 +++++++++- 6 files changed, 208 insertions(+), 27 deletions(-) diff --git a/server/src/catalog.rs b/server/src/catalog.rs index f8adad1ca..3ffdd21a1 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -18,7 +18,7 @@ use std::sync::Arc; -use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc}; +use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, Utc}; use relative_path::RelativePathBuf; use crate::{ @@ -69,33 +69,33 @@ impl ManifestFile for manifest::File { } } +fn get_file_bounds(file: &manifest::File) -> (DateTime, DateTime) { + match file + .columns() + .iter() + .find(|col| col.name == "p_timestamp") + .unwrap() + .stats + .clone() + .unwrap() + { + column::TypedStatistics::Int(stats) => ( + NaiveDateTime::from_timestamp_millis(stats.min) + .unwrap() + .and_utc(), + NaiveDateTime::from_timestamp_millis(stats.max) + .unwrap() + .and_utc(), + ), + _ => unreachable!(), + } +} + pub async fn update_snapshot( storage: Arc, stream_name: &str, change: manifest::File, ) -> Result<(), ObjectStorageError> { - fn get_file_bounds(file: &manifest::File) -> (DateTime, DateTime) { - match file - .columns() - .iter() - .find(|col| col.name == "p_timestamp") - .unwrap() - .stats - .clone() - .unwrap() - { - column::TypedStatistics::Int(stats) => ( - NaiveDateTime::from_timestamp_millis(stats.min) - .unwrap() - .and_utc(), - NaiveDateTime::from_timestamp_millis(stats.min) - .unwrap() - .and_utc(), - ), - _ => unreachable!(), - } - } - // get current snapshot let mut meta = storage.get_snapshot(stream_name).await?; let manifests = &mut meta.manifest_list; @@ -154,6 +154,58 @@ pub async fn update_snapshot( Ok(()) } +pub async fn remove_manifest_from_snapshot( + storage: Arc, + stream_name: &str, + dates: Vec, +) -> Result<(), ObjectStorageError> { + // get current snapshot + let mut meta = storage.get_snapshot(stream_name).await?; + let manifests = &mut meta.manifest_list; + + // Filter out items whose manifest_path contains any of the dates_to_delete + manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date))); + + storage.put_snapshot(stream_name, meta).await?; + Ok(()) +} + +pub async fn get_first_event( + storage: Arc, + stream_name: &str, +) -> Result, ObjectStorageError> { + // get current snapshot + let mut meta = storage.get_snapshot(stream_name).await?; + let manifests = &mut meta.manifest_list; + + if manifests.is_empty() { + log::info!("No manifest found for stream {stream_name}"); + return Err(ObjectStorageError::Custom("No manifest found".to_string())); + } + + let manifest = &manifests[0]; + + let path = partition_path( + stream_name, + manifest.time_lower_bound, + manifest.time_upper_bound, + ); + let Some(manifest) = storage.get_manifest(&path).await? else { + return Err(ObjectStorageError::UnhandledError( + "Manifest found in snapshot but not in object-storage" + .to_string() + .into(), + )); + }; + + if let Some(first_event) = manifest.files.first() { + let (lower_bound, _) = get_file_bounds(first_event); + let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339(); + return Ok(Some(first_event_at)); + } + Ok(None) +} + /// Partition the path to which this manifest belongs. /// Useful when uploading the manifest file. fn partition_path( diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index afd057eda..541b19ae4 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -28,7 +28,7 @@ use crate::metadata::STREAM_INFO; use crate::option::CONFIG; use crate::storage::retention::Retention; use crate::storage::{LogStream, StorageDir}; -use crate::{event, stats}; +use crate::{catalog, event, stats}; use crate::{metadata, validator}; use self::error::{CreateStreamError, StreamError}; @@ -263,13 +263,46 @@ pub async fn get_stats(req: HttpRequest) -> Result return Err(StreamError::StreamNotFound(stream_name)); } + if first_event_at_empty(&stream_name) { + let store = CONFIG.storage().get_object_store(); + if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name).await { + if let Err(err) = CONFIG + .storage() + .get_object_store() + .put_first_event_at(&stream_name, &first_event_at) + .await + { + log::error!( + "Failed to update first_event_at in metadata for stream {:?} {err:?}", + stream_name + ); + } + + if let Err(err) = + metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at)) + { + log::error!( + "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", + stream_name + ); + } + } + } + let stats = stats::get_current_stats(&stream_name, "json") .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; + let hash_map = STREAM_INFO.read().unwrap(); + let stream_meta = &hash_map + .get(&stream_name) + .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; + let time = Utc::now(); let stats = serde_json::json!({ "stream": stream_name, + "creation_time": &stream_meta.created_at, + "first_event_at": Some(&stream_meta.first_event_at), "time": time, "ingestion": { "count": stats.events, @@ -285,6 +318,17 @@ pub async fn get_stats(req: HttpRequest) -> Result Ok((web::Json(stats), StatusCode::OK)) } +// Check if the first_event_at is empty +pub fn first_event_at_empty(stream_name: &str) -> bool { + let hash_map = STREAM_INFO.read().unwrap(); + if let Some(stream_info) = hash_map.get(stream_name) { + if let Some(first_event_at) = &stream_info.first_event_at { + return first_event_at.is_empty(); + } + } + true +} + fn remove_id_from_alerts(value: &mut Value) { if let Some(Value::Array(alerts)) = value.get_mut("alerts") { alerts @@ -305,7 +349,15 @@ pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError> if let Err(err) = storage.create_stream(&stream_name).await { return Err(CreateStreamError::Storage { stream_name, err }); } - metadata::STREAM_INFO.add_stream(stream_name.to_string()); + + let stream_meta = CONFIG + .storage() + .get_object_store() + .get_stream_metadata(&stream_name) + .await; + let created_at = stream_meta.unwrap().created_at; + + metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at); Ok(()) } diff --git a/server/src/metadata.rs b/server/src/metadata.rs index b57cc710a..e8a250719 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -18,6 +18,7 @@ use arrow_array::RecordBatch; use arrow_schema::{Field, Fields, Schema}; +use chrono::Local; use itertools::Itertools; use once_cell::sync::Lazy; use std::collections::HashMap; @@ -43,6 +44,8 @@ pub struct LogStreamMetadata { pub schema: HashMap>, pub alerts: Alerts, pub cache_enabled: bool, + pub created_at: String, + pub first_event_at: Option, } // It is very unlikely that panic will occur when dealing with metadata. @@ -126,9 +129,27 @@ impl StreamInfo { }) } - pub fn add_stream(&self, stream_name: String) { + pub fn set_first_event_at( + &self, + stream_name: &str, + first_event_at: Option, + ) -> Result<(), MetadataError> { + let mut map = self.write().expect(LOCK_EXPECT); + map.get_mut(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) + .map(|metadata| { + metadata.first_event_at = first_event_at; + }) + } + + pub fn add_stream(&self, stream_name: String, created_at: String) { let mut map = self.write().expect(LOCK_EXPECT); let metadata = LogStreamMetadata { + created_at: if created_at.is_empty() { + Local::now().to_rfc3339() + } else { + created_at.clone() + }, ..Default::default() }; map.insert(stream_name, metadata); @@ -162,6 +183,8 @@ impl StreamInfo { schema, alerts, cache_enabled: meta.cache_enabled, + created_at: meta.created_at, + first_event_at: meta.first_event_at, }; let mut map = self.write().expect(LOCK_EXPECT); diff --git a/server/src/storage.rs b/server/src/storage.rs index 975fcf445..63e9577c1 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -69,6 +69,8 @@ pub struct ObjectStoreFormat { pub objectstore_format: String, #[serde(rename = "created-at")] pub created_at: String, + #[serde(rename = "first-event-at")] + pub first_event_at: Option, pub owner: Owner, pub permissions: Vec, pub stats: Stats, @@ -113,6 +115,7 @@ impl Default for ObjectStoreFormat { version: CURRENT_SCHEMA_VERSION.to_string(), objectstore_format: CURRENT_OBJECT_STORE_VERSION.to_string(), created_at: Local::now().to_rfc3339(), + first_event_at: None, owner: Owner::new("".to_string(), "".to_string()), permissions: vec![Permisssion::new("parseable".to_string())], stats: Stats::default(), diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 7494d16e1..0d867da94 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -161,6 +161,23 @@ pub trait ObjectStorage: Sync + 'static { self.put_object(&path, to_bytes(&stream_metadata)).await } + async fn put_first_event_at( + &self, + stream_name: &str, + first_event_at: &str, + ) -> Result<(), ObjectStorageError> { + let path = stream_json_path(stream_name); + let stream_metadata = self.get_object(&path).await?; + let first_event_ts = + serde_json::to_value(first_event_at).expect("first_event_at is perfectly serializable"); + let mut stream_metadata: serde_json::Value = + serde_json::from_slice(&stream_metadata).expect("parseable config is valid json"); + + stream_metadata["first-event-at"] = first_event_ts; + + self.put_object(&path, to_bytes(&stream_metadata)).await + } + async fn put_metadata( &self, parseable_metadata: &StorageMetadata, diff --git a/server/src/storage/retention.rs b/server/src/storage/retention.rs index beb8edd2e..7921ba4d1 100644 --- a/server/src/storage/retention.rs +++ b/server/src/storage/retention.rs @@ -193,7 +193,11 @@ mod action { use itertools::Itertools; use relative_path::RelativePathBuf; - use crate::option::CONFIG; + use crate::{ + catalog::{self, remove_manifest_from_snapshot}, + metadata, + option::CONFIG, + }; pub(super) async fn delete(stream_name: String, days: u32) { log::info!("running retention task - delete for stream={stream_name}"); @@ -212,6 +216,7 @@ mod action { .into_iter() .filter(|date| string_to_date(date) < retain_until) .collect_vec(); + let dates = dates_to_delete.clone(); let delete_tasks = FuturesUnordered::new(); for date in dates_to_delete { @@ -232,6 +237,35 @@ mod action { log::error!("Failed to run delete task {err:?}") } } + + let store = CONFIG.storage().get_object_store(); + let res = remove_manifest_from_snapshot(store.clone(), &stream_name, dates).await; + if let Err(err) = res { + log::error!("Failed to update manifest list in the snapshot {err:?}") + } + + if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name).await { + if let Err(err) = CONFIG + .storage() + .get_object_store() + .put_first_event_at(&stream_name, &first_event_at) + .await + { + log::error!( + "Failed to update first_event_at in metadata for stream {:?} {err:?}", + stream_name + ); + } + + if let Err(err) = + metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at)) + { + log::error!( + "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", + stream_name + ); + } + } } fn get_retain_until(current_date: NaiveDate, days: u64) -> NaiveDate {