fix: add stream creation time in get stats api (#632)
Changes made in this PR:
1. Adds a first_event_at property to the get stats API response, derived from the minimum p_timestamp of the first parquet file listed in the first manifest referenced by the stream.json snapshot, and persists it to stream.json when stats are requested (see the response sketch below).
2. Updates first_event_at when retention deletes data.

Fixes: #587
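For illustration, a stats response after this change might look like the following sketch — abridged, with made-up values, and omitting the ingestion/storage fields not touched by this diff:

```json
{
  "stream": "mystream",
  "creation_time": "2024-02-20T10:00:00+05:30",
  "first_event_at": "2024-02-20T10:01:12+05:30",
  "time": "2024-02-26T09:00:00Z",
  "ingestion": {
    "count": 42
  }
}
```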
  • Loading branch information
gurjotkaur20 authored Feb 26, 2024
1 parent 6e8c7ef commit e7cd586
Showing 6 changed files with 208 additions and 27 deletions.
98 changes: 75 additions & 23 deletions server/src/catalog.rs
@@ -18,7 +18,7 @@

use std::sync::Arc;

use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, Utc};
use relative_path::RelativePathBuf;

use crate::{
@@ -69,33 +69,33 @@ impl ManifestFile for manifest::File {
}
}

fn get_file_bounds(file: &manifest::File) -> (DateTime<Utc>, DateTime<Utc>) {
match file
.columns()
.iter()
.find(|col| col.name == "p_timestamp")
.unwrap()
.stats
.clone()
.unwrap()
{
column::TypedStatistics::Int(stats) => (
NaiveDateTime::from_timestamp_millis(stats.min)
.unwrap()
.and_utc(),
NaiveDateTime::from_timestamp_millis(stats.max)
.unwrap()
.and_utc(),
),
_ => unreachable!(),
}
}

pub async fn update_snapshot(
storage: Arc<dyn ObjectStorage + Send>,
stream_name: &str,
change: manifest::File,
) -> Result<(), ObjectStorageError> {
fn get_file_bounds(file: &manifest::File) -> (DateTime<Utc>, DateTime<Utc>) {
match file
.columns()
.iter()
.find(|col| col.name == "p_timestamp")
.unwrap()
.stats
.clone()
.unwrap()
{
column::TypedStatistics::Int(stats) => (
NaiveDateTime::from_timestamp_millis(stats.min)
.unwrap()
.and_utc(),
NaiveDateTime::from_timestamp_millis(stats.max)
.unwrap()
.and_utc(),
),
_ => unreachable!(),
}
}

// get current snapshot
let mut meta = storage.get_snapshot(stream_name).await?;
let manifests = &mut meta.manifest_list;
@@ -154,6 +154,58 @@ pub async fn update_snapshot(
Ok(())
}

pub async fn remove_manifest_from_snapshot(
storage: Arc<dyn ObjectStorage + Send>,
stream_name: &str,
dates: Vec<String>,
) -> Result<(), ObjectStorageError> {
// get current snapshot
let mut meta = storage.get_snapshot(stream_name).await?;
let manifests = &mut meta.manifest_list;

// Filter out items whose manifest_path contains any of the dates_to_delete
manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));

storage.put_snapshot(stream_name, meta).await?;
Ok(())
}

pub async fn get_first_event(
storage: Arc<dyn ObjectStorage + Send>,
stream_name: &str,
) -> Result<Option<String>, ObjectStorageError> {
// get current snapshot
let mut meta = storage.get_snapshot(stream_name).await?;
let manifests = &mut meta.manifest_list;

if manifests.is_empty() {
log::info!("No manifest found for stream {stream_name}");
return Err(ObjectStorageError::Custom("No manifest found".to_string()));
}

let manifest = &manifests[0];

let path = partition_path(
stream_name,
manifest.time_lower_bound,
manifest.time_upper_bound,
);
let Some(manifest) = storage.get_manifest(&path).await? else {
return Err(ObjectStorageError::UnhandledError(
"Manifest found in snapshot but not in object-storage"
.to_string()
.into(),
));
};

if let Some(first_event) = manifest.files.first() {
let (lower_bound, _) = get_file_bounds(first_event);
let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
return Ok(Some(first_event_at));
}
Ok(None)
}

/// Partition the path to which this manifest belongs.
/// Useful when uploading the manifest file.
fn partition_path(
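To make the timestamp plumbing in get_first_event concrete: the lower bound comes out of the parquet column stats as epoch milliseconds, is interpreted as UTC, and is converted to local time before being rendered as RFC 3339. A minimal standalone sketch of that conversion (the input value is invented):

```rust
use chrono::{Local, NaiveDateTime};

fn main() {
    // Hypothetical minimum p_timestamp (epoch millis) from a manifest's column stats.
    let min_millis: i64 = 1_708_934_472_000;
    let lower_bound = NaiveDateTime::from_timestamp_millis(min_millis)
        .expect("in-range milliseconds")
        .and_utc();
    // Same final step as get_first_event: local time, RFC 3339.
    let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
    println!("{first_event_at}"); // e.g. "2024-02-26T13:31:12+05:30" in IST
}
```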
56 changes: 54 additions & 2 deletions server/src/handlers/http/logstream.rs
@@ -28,7 +28,7 @@ use crate::metadata::STREAM_INFO;
use crate::option::CONFIG;
use crate::storage::retention::Retention;
use crate::storage::{LogStream, StorageDir};
use crate::{event, stats};
use crate::{catalog, event, stats};
use crate::{metadata, validator};

use self::error::{CreateStreamError, StreamError};
@@ -263,13 +263,46 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
return Err(StreamError::StreamNotFound(stream_name));
}

if first_event_at_empty(&stream_name) {
let store = CONFIG.storage().get_object_store();
if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name).await {
if let Err(err) = CONFIG
.storage()
.get_object_store()
.put_first_event_at(&stream_name, &first_event_at)
.await
{
log::error!(
"Failed to update first_event_at in metadata for stream {:?} {err:?}",
stream_name
);
}

if let Err(err) =
metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at))
{
log::error!(
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
stream_name
);
}
}
}

let stats = stats::get_current_stats(&stream_name, "json")
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;

let hash_map = STREAM_INFO.read().unwrap();
let stream_meta = &hash_map
.get(&stream_name)
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;

let time = Utc::now();

let stats = serde_json::json!({
"stream": stream_name,
"creation_time": &stream_meta.created_at,
"first_event_at": Some(&stream_meta.first_event_at),
"time": time,
"ingestion": {
"count": stats.events,
@@ -285,6 +318,17 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
Ok((web::Json(stats), StatusCode::OK))
}

// Check if the first_event_at is empty
pub fn first_event_at_empty(stream_name: &str) -> bool {
let hash_map = STREAM_INFO.read().unwrap();
if let Some(stream_info) = hash_map.get(stream_name) {
if let Some(first_event_at) = &stream_info.first_event_at {
return first_event_at.is_empty();
}
}
true
}

fn remove_id_from_alerts(value: &mut Value) {
if let Some(Value::Array(alerts)) = value.get_mut("alerts") {
alerts
@@ -305,7 +349,15 @@ pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError>
if let Err(err) = storage.create_stream(&stream_name).await {
return Err(CreateStreamError::Storage { stream_name, err });
}
metadata::STREAM_INFO.add_stream(stream_name.to_string());

let stream_meta = CONFIG
.storage()
.get_object_store()
.get_stream_metadata(&stream_name)
.await;
let created_at = stream_meta.unwrap().created_at;

metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at);

Ok(())
}
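Note that first_event_at_empty treats three cases alike: the stream missing from STREAM_INFO, first_event_at still None, and first_event_at set to an empty string. A standalone sketch of that predicate's behaviour (illustrative helper, not the server's types):

```rust
fn is_unset(first_event_at: Option<&str>) -> bool {
    match first_event_at {
        // Mirrors the inner `first_event_at.is_empty()` check.
        Some(s) => s.is_empty(),
        // Mirrors the trailing `true` fall-through.
        None => true,
    }
}

fn main() {
    assert!(is_unset(None));
    assert!(is_unset(Some("")));
    assert!(!is_unset(Some("2024-02-20T10:01:12+05:30")));
}
```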
25 changes: 24 additions & 1 deletion server/src/metadata.rs
@@ -18,6 +18,7 @@

use arrow_array::RecordBatch;
use arrow_schema::{Field, Fields, Schema};
use chrono::Local;
use itertools::Itertools;
use once_cell::sync::Lazy;
use std::collections::HashMap;
@@ -43,6 +44,8 @@ pub struct LogStreamMetadata {
pub schema: HashMap<String, Arc<Field>>,
pub alerts: Alerts,
pub cache_enabled: bool,
pub created_at: String,
pub first_event_at: Option<String>,
}

// It is very unlikely that panic will occur when dealing with metadata.
@@ -126,9 +129,27 @@ impl StreamInfo {
})
}

pub fn add_stream(&self, stream_name: String) {
pub fn set_first_event_at(
&self,
stream_name: &str,
first_event_at: Option<String>,
) -> Result<(), MetadataError> {
let mut map = self.write().expect(LOCK_EXPECT);
map.get_mut(stream_name)
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
.map(|metadata| {
metadata.first_event_at = first_event_at;
})
}

pub fn add_stream(&self, stream_name: String, created_at: String) {
let mut map = self.write().expect(LOCK_EXPECT);
let metadata = LogStreamMetadata {
created_at: if created_at.is_empty() {
Local::now().to_rfc3339()
} else {
created_at.clone()
},
..Default::default()
};
map.insert(stream_name, metadata);
@@ -162,6 +183,8 @@ impl StreamInfo {
schema,
alerts,
cache_enabled: meta.cache_enabled,
created_at: meta.created_at,
first_event_at: meta.first_event_at,
};

let mut map = self.write().expect(LOCK_EXPECT);
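The created_at fallback in add_stream is worth spelling out: an empty string (presumably a stream created before the created-at field existed) is replaced with the current local time. A sketch of just that branch, assuming empty marks a legacy stream:

```rust
use chrono::Local;

fn resolve_created_at(created_at: &str) -> String {
    if created_at.is_empty() {
        // Legacy stream: fall back to "now", matching add_stream above.
        Local::now().to_rfc3339()
    } else {
        created_at.to_string()
    }
}

fn main() {
    assert!(!resolve_created_at("").is_empty());
    assert_eq!(
        resolve_created_at("2024-02-20T10:00:00+05:30"),
        "2024-02-20T10:00:00+05:30"
    );
}
```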
3 changes: 3 additions & 0 deletions server/src/storage.rs
@@ -69,6 +69,8 @@ pub struct ObjectStoreFormat {
pub objectstore_format: String,
#[serde(rename = "created-at")]
pub created_at: String,
#[serde(rename = "first-event-at")]
pub first_event_at: Option<String>,
pub owner: Owner,
pub permissions: Vec<Permisssion>,
pub stats: Stats,
@@ -113,6 +115,7 @@ impl Default for ObjectStoreFormat {
version: CURRENT_SCHEMA_VERSION.to_string(),
objectstore_format: CURRENT_OBJECT_STORE_VERSION.to_string(),
created_at: Local::now().to_rfc3339(),
first_event_at: None,
owner: Owner::new("".to_string(), "".to_string()),
permissions: vec![Permisssion::new("parseable".to_string())],
stats: Stats::default(),
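Given the serde renames above, the new field lands in stream.json under a kebab-case key. A trimmed-down sketch of the relevant portion of stream.json (values invented, other fields omitted):

```json
{
  "created-at": "2024-02-20T10:00:00+05:30",
  "first-event-at": "2024-02-20T10:01:12+05:30"
}
```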
17 changes: 17 additions & 0 deletions server/src/storage/object_storage.rs
@@ -161,6 +161,23 @@ pub trait ObjectStorage: Sync + 'static {
self.put_object(&path, to_bytes(&stream_metadata)).await
}

async fn put_first_event_at(
&self,
stream_name: &str,
first_event_at: &str,
) -> Result<(), ObjectStorageError> {
let path = stream_json_path(stream_name);
let stream_metadata = self.get_object(&path).await?;
let first_event_ts =
serde_json::to_value(first_event_at).expect("first_event_at is perfectly serializable");
let mut stream_metadata: serde_json::Value =
serde_json::from_slice(&stream_metadata).expect("parseable config is valid json");

stream_metadata["first-event-at"] = first_event_ts;

self.put_object(&path, to_bytes(&stream_metadata)).await
}

async fn put_metadata(
&self,
parseable_metadata: &StorageMetadata,
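put_first_event_at patches stream.json as an untyped serde_json::Value rather than round-tripping through ObjectStoreFormat. A standalone sketch of that mutation with the object-storage I/O elided (values invented):

```rust
use serde_json::json;

fn main() {
    // Stand-in for the bytes fetched via get_object(&stream_json_path(...)).
    let mut stream_metadata = json!({
        "created-at": "2024-02-20T10:00:00+05:30",
        "first-event-at": null
    });
    // Same key the trait method writes before putting the object back.
    stream_metadata["first-event-at"] =
        serde_json::to_value("2024-02-20T10:01:12+05:30").expect("strings serialize");
    println!("{stream_metadata}");
}
```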
36 changes: 35 additions & 1 deletion server/src/storage/retention.rs
@@ -193,7 +193,11 @@ mod action {
use itertools::Itertools;
use relative_path::RelativePathBuf;

use crate::option::CONFIG;
use crate::{
catalog::{self, remove_manifest_from_snapshot},
metadata,
option::CONFIG,
};

pub(super) async fn delete(stream_name: String, days: u32) {
log::info!("running retention task - delete for stream={stream_name}");
@@ -212,6 +216,7 @@
.into_iter()
.filter(|date| string_to_date(date) < retain_until)
.collect_vec();
let dates = dates_to_delete.clone();

let delete_tasks = FuturesUnordered::new();
for date in dates_to_delete {
@@ -232,6 +237,35 @@
log::error!("Failed to run delete task {err:?}")
}
}

let store = CONFIG.storage().get_object_store();
let res = remove_manifest_from_snapshot(store.clone(), &stream_name, dates).await;
if let Err(err) = res {
log::error!("Failed to update manifest list in the snapshot {err:?}")
}

if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name).await {
if let Err(err) = CONFIG
.storage()
.get_object_store()
.put_first_event_at(&stream_name, &first_event_at)
.await
{
log::error!(
"Failed to update first_event_at in metadata for stream {:?} {err:?}",
stream_name
);
}

if let Err(err) =
metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at))
{
log::error!(
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
stream_name
);
}
}
}

fn get_retain_until(current_date: NaiveDate, days: u64) -> NaiveDate {
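After the per-date delete tasks finish, remove_manifest_from_snapshot (catalog.rs above) drops manifests by substring match between the deleted dates and manifest_path. A minimal sketch of that retain filter — the date=YYYY-MM-DD path segment is an assumption for illustration:

```rust
fn main() {
    let dates = vec!["date=2024-02-10".to_string()];
    let mut manifest_paths = vec![
        "mystream/date=2024-02-10/manifest.json".to_string(),
        "mystream/date=2024-02-26/manifest.json".to_string(),
    ];
    // Keep only manifests whose path mentions none of the deleted dates.
    manifest_paths.retain(|p| !dates.iter().any(|d| p.contains(d)));
    assert_eq!(
        manifest_paths,
        vec!["mystream/date=2024-02-26/manifest.json".to_string()]
    );
}
```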
