fix: init_scheduler (#649)
Earlier, a separate scheduler was initialized for each stream
at load time or whenever a retention period was set.
Now, a single scheduler is initialized; it checks the retention
config of all streams and performs the retention cleanup.

Fixes #636
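
For context, here is a minimal, hypothetical sketch of the pattern this commit moves to: a single clokwerk AsyncScheduler job that walks every stream and applies its retention cleanup, instead of one scheduler per stream. It assumes the clokwerk and tokio crates that retention.rs already uses; list_streams and cleanup_stream are illustrative stand-ins for STREAM_INFO.list_streams() and the action::delete task in the real code, not the project's actual API.

// Sketch only: assumes the clokwerk and tokio crates are available.
use std::time::Duration;

use clokwerk::{AsyncScheduler, Job, TimeUnits};

// Hypothetical stand-in for STREAM_INFO.list_streams().
fn list_streams() -> Vec<String> {
    vec!["app-logs".to_string(), "access-logs".to_string()]
}

// Hypothetical stand-in for the per-stream retention task (action::delete).
async fn cleanup_stream(stream: &str) {
    println!("running retention cleanup for {stream}");
}

#[tokio::main]
async fn main() {
    let mut scheduler = AsyncScheduler::new();

    // One daily job covers every stream, so no per-stream scheduler is needed.
    scheduler.every(1.day()).at("00:00").run(|| async {
        for stream in list_streams() {
            cleanup_stream(&stream).await;
        }
    });

    // Drive pending jobs; the server does this on a dedicated thread with its own runtime.
    loop {
        scheduler.run_pending().await;
        tokio::time::sleep(Duration::from_secs(60)).await;
    }
}

In the committed retention.rs below, the scheduled closure goes one step further and spawns a thread with its own Tokio runtime to block on each delete task.
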
gurjotkaur20 authored Feb 6, 2024
1 parent 3e39328 commit d795e8e
Showing 3 changed files with 39 additions and 34 deletions.
4 changes: 1 addition & 3 deletions server/src/handlers/http/logstream.rs
@@ -26,7 +26,7 @@ use serde_json::Value;
 use crate::alerts::Alerts;
 use crate::metadata::STREAM_INFO;
 use crate::option::CONFIG;
-use crate::storage::retention::{self, Retention};
+use crate::storage::retention::Retention;
 use crate::storage::{LogStream, StorageDir};
 use crate::{event, stats};
 use crate::{metadata, validator};
@@ -219,8 +219,6 @@ pub async fn put_retention(
         .put_retention(&stream_name, &retention)
         .await?;
 
-    retention::init_scheduler(&stream_name, retention);
-
     Ok((
         format!("set retention configuration for log stream {stream_name}"),
         StatusCode::OK,
2 changes: 1 addition & 1 deletion server/src/main.rs
@@ -76,7 +76,7 @@ async fn main() -> anyhow::Result<()> {
     }
 
     // track all parquet files already in the data directory
-    storage::retention::load_retention_from_global().await;
+    storage::retention::load_retention_from_global();
     // load data from stats back to prometheus metrics
     metrics::load_from_stats_from_storage().await;
 
67 changes: 37 additions & 30 deletions server/src/storage/retention.rs
@@ -43,40 +43,47 @@ fn async_runtime() -> tokio::runtime::Runtime {
         .unwrap()
 }
 
-pub async fn load_retention_from_global() {
+pub fn load_retention_from_global() {
     log::info!("loading retention for all streams");
-    for stream in STREAM_INFO.list_streams() {
-        let res = CONFIG
-            .storage()
-            .get_object_store()
-            .get_retention(&stream)
-            .await;
-        match res {
-            Ok(config) => {
-                if config.tasks.is_empty() {
-                    log::info!("skipping loading retention for {stream}");
-                    continue;
-                }
-                init_scheduler(&stream, config)
-            }
-            Err(err) => log::warn!("failed to load retention config for {stream} due to {err:?}"),
-        }
-    }
+    init_scheduler();
 }
 
-pub fn init_scheduler(stream: &str, config: Retention) {
-    log::info!("Setting up schedular for {stream}");
+pub fn init_scheduler() {
+    log::info!("Setting up schedular");
     let mut scheduler = AsyncScheduler::new();
-    for Task { action, days, .. } in config.tasks.into_iter() {
-        let func = match action {
-            Action::Delete => {
-                let stream = stream.to_string();
-                move || action::delete(stream.clone(), u32::from(days))
-            }
-        };
+    let func = move || async {
+        for stream in STREAM_INFO.list_streams() {
+            let res = CONFIG
+                .storage()
+                .get_object_store()
+                .get_retention(&stream)
+                .await;
+
+            match res {
+                Ok(config) => {
+                    for Task { action, days, .. } in config.tasks.into_iter() {
+                        match action {
+                            Action::Delete => {
+                                let stream = stream.to_string();
+                                thread::spawn(move || {
+                                    let rt = tokio::runtime::Runtime::new().unwrap();
+                                    rt.block_on(async {
+                                        // Run the asynchronous delete action
+                                        action::delete(stream.clone(), u32::from(days)).await;
+                                    });
+                                });
+                            }
+                        };
+                    }
+                }
+                Err(err) => {
+                    log::warn!("failed to load retention config for {stream} due to {err:?}")
+                }
+            };
+        }
+    };
 
-        scheduler.every(1.day()).at("00:00").run(func);
-    }
+    scheduler.every(1.day()).at("00:00").run(func);
 
     let handler = thread::spawn(|| {
         let rt = async_runtime();
@@ -183,7 +190,7 @@ mod action {
     use crate::option::CONFIG;
 
     pub(super) async fn delete(stream_name: String, days: u32) {
-        log::info!("running retention task - delete");
+        log::info!("running retention task - delete for stream={stream_name}");
         let retain_until = get_retain_until(Utc::now().date_naive(), days as u64);
 
         let Ok(dates) = CONFIG
