diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs
index ebcd41189..afd057eda 100644
--- a/server/src/handlers/http/logstream.rs
+++ b/server/src/handlers/http/logstream.rs
@@ -26,7 +26,7 @@ use serde_json::Value;
 use crate::alerts::Alerts;
 use crate::metadata::STREAM_INFO;
 use crate::option::CONFIG;
-use crate::storage::retention::{self, Retention};
+use crate::storage::retention::Retention;
 use crate::storage::{LogStream, StorageDir};
 use crate::{event, stats};
 use crate::{metadata, validator};
@@ -219,8 +219,6 @@ pub async fn put_retention(
         .put_retention(&stream_name, &retention)
         .await?;
 
-    retention::init_scheduler(&stream_name, retention);
-
     Ok((
        format!("set retention configuration for log stream {stream_name}"),
        StatusCode::OK,
diff --git a/server/src/main.rs b/server/src/main.rs
index f0a1d087c..157c982f1 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -76,7 +76,7 @@ async fn main() -> anyhow::Result<()> {
     }
 
     // track all parquet files already in the data directory
-    storage::retention::load_retention_from_global().await;
+    storage::retention::load_retention_from_global();
 
     // load data from stats back to prometheus metrics
     metrics::load_from_stats_from_storage().await;
diff --git a/server/src/storage/retention.rs b/server/src/storage/retention.rs
index c395c7d84..82062575a 100644
--- a/server/src/storage/retention.rs
+++ b/server/src/storage/retention.rs
@@ -43,40 +43,47 @@ fn async_runtime() -> tokio::runtime::Runtime {
         .unwrap()
 }
 
-pub async fn load_retention_from_global() {
+pub fn load_retention_from_global() {
     log::info!("loading retention for all streams");
-    for stream in STREAM_INFO.list_streams() {
-        let res = CONFIG
-            .storage()
-            .get_object_store()
-            .get_retention(&stream)
-            .await;
-        match res {
-            Ok(config) => {
-                if config.tasks.is_empty() {
-                    log::info!("skipping loading retention for {stream}");
-                    continue;
-                }
-                init_scheduler(&stream, config)
-            }
-            Err(err) => log::warn!("failed to load retention config for {stream} due to {err:?}"),
-        }
-    }
+    init_scheduler();
 }
 
-pub fn init_scheduler(stream: &str, config: Retention) {
-    log::info!("Setting up schedular for {stream}");
+pub fn init_scheduler() {
+    log::info!("Setting up schedular");
     let mut scheduler = AsyncScheduler::new();
-    for Task { action, days, .. } in config.tasks.into_iter() {
-        let func = match action {
-            Action::Delete => {
-                let stream = stream.to_string();
-                move || action::delete(stream.clone(), u32::from(days))
-            }
-        };
+    let func = move || async {
+        for stream in STREAM_INFO.list_streams() {
+            let res = CONFIG
+                .storage()
+                .get_object_store()
+                .get_retention(&stream)
+                .await;
+
+            match res {
+                Ok(config) => {
+                    for Task { action, days, .. } in config.tasks.into_iter() {
+                        match action {
+                            Action::Delete => {
+                                let stream = stream.to_string();
+                                thread::spawn(move || {
+                                    let rt = tokio::runtime::Runtime::new().unwrap();
+                                    rt.block_on(async {
+                                        // Run the asynchronous delete action
+                                        action::delete(stream.clone(), u32::from(days)).await;
+                                    });
+                                });
+                            }
+                        };
+                    }
+                }
+                Err(err) => {
+                    log::warn!("failed to load retention config for {stream} due to {err:?}")
+                }
+            };
+        }
+    };
 
-        scheduler.every(1.day()).at("00:00").run(func);
-    }
+    scheduler.every(1.day()).at("00:00").run(func);
 
     let handler = thread::spawn(|| {
         let rt = async_runtime();
@@ -183,7 +190,7 @@ mod action {
     use crate::option::CONFIG;
 
     pub(super) async fn delete(stream_name: String, days: u32) {
-        log::info!("running retention task - delete");
+        log::info!("running retention task - delete for stream={stream_name}");
         let retain_until = get_retain_until(Utc::now().date_naive(), days as u64);
         let Ok(dates) = CONFIG