diff --git a/Cargo.lock b/Cargo.lock index 042da42a7..043028ac2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1328,11 +1328,12 @@ dependencies = [ [[package]] name = "dashmap" -version = "5.5.3" +version = "6.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28" dependencies = [ "cfg-if", + "crossbeam-utils", "hashbrown 0.14.3", "lock_api", "once_cell", @@ -4052,7 +4053,7 @@ dependencies = [ [[package]] name = "server" -version = "0.3.1" +version = "0.3.2" dependencies = [ "anyhow", "async-stream", @@ -4067,6 +4068,7 @@ dependencies = [ "clap", "console-subscriber", "dashmap", + "derive_more", "figlet-rs", "figment", "flume", diff --git a/configs/server.json b/configs/server.json index 39250d89b..af24b8f7d 100644 --- a/configs/server.json +++ b/configs/server.json @@ -1,4 +1,29 @@ { + "data_maintenance": { + "archiver": { + "enabled": false, + "kind": "disk", + "disk": { + "path": "local_data/archive" + }, + "s3": { + "key_id": "123", + "access_key": "secret", + "bucket": "iggy", + "region": "eu-west-1" + } + }, + "messages": { + "archiver_enabled": false, + "cleaner_enabled": false, + "interval": "1 m" + }, + "state": { + "archiver_enabled": false, + "overwrite": true, + "interval": "1 m" + } + }, "http": { "enabled": true, "address": "0.0.0.0:3000", @@ -113,10 +138,6 @@ "enabled": true, "size": "4 GB" }, - "retention_policy": { - "message_expiry": "none", - "max_topic_size": "10 GB" - }, "encryption": { "enabled": false, "key": "" @@ -129,7 +150,9 @@ "path": "streams" }, "topic": { - "path": "topics" + "path": "topics", + "max_size": "10 GB", + "delete_oldest_segments": false }, "partition": { "path": "partitions", @@ -140,7 +163,9 @@ "segment": { "size": "1 GB", "cache_indexes": true, - "cache_time_indexes": true + "cache_time_indexes": true, + "message_expiry": "none", + "archive_expired": 
false }, "message_deduplication": { "enabled": false, diff --git a/configs/server.toml b/configs/server.toml index fdce13340..ce02beba6 100644 --- a/configs/server.toml +++ b/configs/server.toml @@ -1,3 +1,44 @@ +[data_maintenance.archiver] +# Enables or disables the archiver process. +enabled = false + +# Kind of archiver to use. Available options: "disk". +kind = "disk" + +[data_maintenance.archiver.disk] +# Path for storing the archived data on disk. +path = "local_data/archive" + +[data_maintenance.archiver.s3] +# Access key ID for the S3 bucket. +key_id = "123" +# Secret access key for the S3 bucket. +access_key = "secret" +# Region of the S3 bucket. +region = "eu-west-1" +# Name of the S3 bucket. +bucket = "iggy" + +[data_maintenance.messages] +# Enables or disables the archiver process for closed segments containing messages. +archiver_enabled = false + +# Enables or disables the expired message cleaner process. +cleaner_enabled = false + +# Interval for running the message archiver and cleaner. +interval = "1 m" + +[data_maintenance.state] +# Enables or disables the archiver process for the state log. +archiver_enabled = false + +# Sets whether the state archiver should overwrite the existing log archive or always create a new one. +overwrite = true + +# Interval for running the state archiver. +interval = "1 m" + # HTTP server configuration [http] # Determines if the HTTP server is active. @@ -269,24 +310,6 @@ enabled = true # Maximum size of the cache, e.g. "4GB". size = "4 GB" -# Data retention policy configuration. -[system.retention_policy] -# Configures the message time-based expiry setting. -# "none" means messages are kept indefinitely. -# A time value in human-readable format determines the lifespan of messages. -# Example: `message_expiry = "2 days 4 hours 15 minutes"` means messages will expire after that duration. -message_expiry = "none" - -# Configures the topic size-based expiry setting. -# "unlimited" or "0" means topics are kept indefinitely. 
-# A size value in human-readable format determines the maximum size of a topic. -# When a topic reaches this size, the oldest messages are deleted to make room for new ones. -# Messages are removed in full segments, so if segment size is 1 GB and the topic size is 10 GB, -# the oldest segment will be deleted upon reaching 10 GB. -# Example: `max_topic_size = "10 GB"` means oldest messages in topics will be deleted when they reach 10 GB. -# Note: this setting can be overwritten with CreateTopic and UpdateTopic requests. -max_topic_size = "10 GB" - # Encryption configuration [system.encryption] # Determines whether server-side data encryption is enabled (boolean). @@ -322,6 +345,19 @@ path = "streams" # Specifies the directory where topic data is stored, relative to `stream.path`. path = "topics" +# Configures the topic size-based expiry setting. +# "unlimited" or "0" means topics are kept indefinitely. +# A size value in human-readable format determines the maximum size of a topic. +# When a topic reaches this size and `delete_oldest_segments` is enabled, the oldest messages are deleted to make room for new ones. +# Messages are removed in full segments, so if segment size is 1 GB and the topic size is 10 GB, +# the oldest segment will be deleted upon reaching 10 GB. +# Example: `max_size = "10 GB"` means oldest messages in topics will be deleted when they reach 10 GB. +# Note: this setting can be overwritten with CreateTopic and UpdateTopic requests. +max_size = "10 GB" + +# Configures whether the oldest segments are deleted when a topic reaches its maximum size (boolean). +delete_oldest_segments = false + # Partition configuration [system.partition] # Path for storing partition-related data (string). @@ -349,6 +385,14 @@ messages_required_to_save = 10_000 # When a segment reaches this size, a new segment is created for subsequent data. # Example: if `size` is set "1GB", the actual segment size may be 1GB + the size of remaining messages in received batch. 
size = "1 GB" +# Configures the message time-based expiry setting. +# "none" means messages are kept indefinitely. +# A time value in human-readable format determines the lifespan of messages. +# Example: `message_expiry = "2 days 4 hours 15 minutes"` means messages will expire after that duration. +message_expiry = "none" + +# Configures whether expired segments are archived (boolean) or just deleted without archiving. +archive_expired = false # Controls whether to cache indexes for segment access (boolean). # `true` keeps indexes in memory, speeding up data retrieval. diff --git a/integration/tests/archiver/disk.rs b/integration/tests/archiver/disk.rs new file mode 100644 index 000000000..abc045e68 --- /dev/null +++ b/integration/tests/archiver/disk.rs @@ -0,0 +1,98 @@ +use crate::archiver::ArchiverSetup; +use server::archiver::Archiver; +use server::streaming::utils::file; +use std::path::Path; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +#[tokio::test] +async fn should_init_base_archiver_directory() { + let setup = ArchiverSetup::init().await; + let archiver = setup.archiver(); + let result = archiver.init().await; + assert!(result.is_ok()); + let path = Path::new(&setup.archive_path); + assert!(path.exists()); +} + +#[tokio::test] +async fn should_archive_file_on_disk_by_making_a_copy_of_original_file() { + let setup = ArchiverSetup::init().await; + let archiver = setup.archiver(); + let content = "hello world"; + let file_to_archive_path = format!("{}/file_to_archive", setup.base_path); + create_file(&file_to_archive_path, content).await; + let files_to_archive = vec![file_to_archive_path.as_ref()]; + + let result = archiver.archive(&files_to_archive, None).await; + assert!(result.is_ok()); + let archived_file_path = format!("{}/{}", setup.archive_path, file_to_archive_path); + assert_archived_file(&file_to_archive_path, &archived_file_path, content).await; +} + +#[tokio::test] +async fn should_archive_file_on_disk_within_additional_base_directory() { + 
let setup = ArchiverSetup::init().await; + let archiver = setup.archiver(); + let base_directory = "base"; + let content = "hello world"; + let file_to_archive_path = format!("{}/file_to_archive", setup.base_path); + create_file(&file_to_archive_path, content).await; + let files_to_archive = vec![file_to_archive_path.as_ref()]; + + let result = archiver + .archive(&files_to_archive, Some(base_directory.to_string())) + .await; + assert!(result.is_ok()); + let archived_file_path = format!( + "{}/{base_directory}/{}", + setup.archive_path, file_to_archive_path + ); + assert_archived_file(&file_to_archive_path, &archived_file_path, content).await; +} + +#[tokio::test] +async fn should_return_true_when_file_is_archived() { + let setup = ArchiverSetup::init().await; + let archiver = setup.archiver(); + let content = "hello world"; + let file_to_archive_path = format!("{}/file_to_archive", setup.base_path); + create_file(&file_to_archive_path, content).await; + let files_to_archive = vec![file_to_archive_path.as_ref()]; + archiver.archive(&files_to_archive, None).await.unwrap(); + + let is_archived = archiver.is_archived(&file_to_archive_path, None).await; + assert!(is_archived.is_ok()); + assert!(is_archived.unwrap()); +} + +#[tokio::test] +async fn should_return_false_when_file_is_not_archived() { + let setup = ArchiverSetup::init().await; + let archiver = setup.archiver(); + let content = "hello world"; + let file_to_archive_path = format!("{}/file_to_archive", setup.base_path); + create_file(&file_to_archive_path, content).await; + + let is_archived = archiver.is_archived(&file_to_archive_path, None).await; + assert!(is_archived.is_ok()); + assert!(!is_archived.unwrap()); +} + +async fn create_file(path: &str, content: &str) { + let mut file = file::overwrite(path).await.unwrap(); + file.write_all(content.as_bytes()).await.unwrap(); +} + +async fn assert_archived_file(file_to_archive_path: &str, archived_file_path: &str, content: &str) { + 
assert!(Path::new(&file_to_archive_path).exists()); + assert!(Path::new(&archived_file_path).exists()); + let archived_file = file::open(archived_file_path).await; + assert!(archived_file.is_ok()); + let mut archived_file = archived_file.unwrap(); + let mut archived_file_content = String::new(); + archived_file + .read_to_string(&mut archived_file_content) + .await + .unwrap(); + assert_eq!(content, archived_file_content); +} diff --git a/integration/tests/archiver/mod.rs b/integration/tests/archiver/mod.rs new file mode 100644 index 000000000..f8172d614 --- /dev/null +++ b/integration/tests/archiver/mod.rs @@ -0,0 +1,40 @@ +use server::archiver::disk::DiskArchiver; +use server::configs::server::DiskArchiverConfig; +use tokio::fs::create_dir; +use uuid::Uuid; + +mod disk; + +pub struct ArchiverSetup { + base_path: String, + archive_path: String, + archiver: DiskArchiver, +} + +impl ArchiverSetup { + pub async fn init() -> ArchiverSetup { + let base_path = format!("test_local_data_{}", Uuid::new_v4().to_u128_le()); + let archive_path = format!("{}/archive", base_path); + let config = DiskArchiverConfig { + path: archive_path.clone(), + }; + let archiver = DiskArchiver::new(config); + create_dir(&base_path).await.unwrap(); + + Self { + base_path, + archive_path, + archiver, + } + } + + pub fn archiver(&self) -> &DiskArchiver { + &self.archiver + } +} + +impl Drop for ArchiverSetup { + fn drop(&mut self) { + std::fs::remove_dir_all(&self.base_path).unwrap(); + } +} diff --git a/integration/tests/config_provider/mod.rs b/integration/tests/config_provider/mod.rs index 63d107e5e..4dba2953b 100644 --- a/integration/tests/config_provider/mod.rs +++ b/integration/tests/config_provider/mod.rs @@ -56,7 +56,7 @@ async fn validate_custom_env_provider() { "IGGY_MESSAGE_SAVER_ENABLED", expected_message_saver_enabled.to_string(), ); - env::set_var("IGGY_SYSTEM_RETENTION_POLICY_MESSAGE_EXPIRY", "10s"); + env::set_var("IGGY_SYSTEM_SEGMENT_MESSAGE_EXPIRY", "10s"); let config_path = 
get_root_path().join("../configs/server.toml"); let file_config_provider = FileConfigProvider::new(config_path.as_path().display().to_string()); @@ -77,7 +77,7 @@ async fn validate_custom_env_provider() { assert_eq!(config.tcp.enabled.to_string(), expected_tcp_enabled); assert_eq!(config.message_saver.enabled, expected_message_saver_enabled); assert_eq!( - config.system.retention_policy.message_expiry.to_string(), + config.system.segment.message_expiry.to_string(), expected_message_expiry ); diff --git a/integration/tests/mod.rs b/integration/tests/mod.rs index 43df859f5..3736d7183 100644 --- a/integration/tests/mod.rs +++ b/integration/tests/mod.rs @@ -1,3 +1,4 @@ +mod archiver; mod bench; mod cli; mod config_provider; diff --git a/integration/tests/streaming/segment.rs b/integration/tests/streaming/segment.rs index 87e290041..cbc379479 100644 --- a/integration/tests/streaming/segment.rs +++ b/integration/tests/streaming/segment.rs @@ -283,6 +283,7 @@ async fn given_all_expired_messages_segment_should_be_expired() { segment.persist_messages().await.unwrap(); + segment.is_closed = true; let is_expired = segment.is_expired(now).await; assert!(is_expired); } diff --git a/integration/tests/streaming/system.rs b/integration/tests/streaming/system.rs index dc2f6a187..c9c53cf5d 100644 --- a/integration/tests/streaming/system.rs +++ b/integration/tests/streaming/system.rs @@ -1,6 +1,6 @@ use crate::streaming::common::test_setup::TestSetup; use iggy::identifier::Identifier; -use server::configs::server::PersonalAccessTokenConfig; +use server::configs::server::{DataMaintenanceConfig, PersonalAccessTokenConfig}; use server::streaming::session::Session; use server::streaming::systems::system::System; use std::net::{Ipv4Addr, SocketAddr}; @@ -9,7 +9,11 @@ use tokio::fs; #[tokio::test] async fn should_initialize_system_and_base_directories() { let setup = TestSetup::init().await; - let mut system = System::new(setup.config.clone(), PersonalAccessTokenConfig::default()); + let 
mut system = System::new( + setup.config.clone(), + DataMaintenanceConfig::default(), + PersonalAccessTokenConfig::default(), + ); system.init().await.unwrap(); @@ -28,7 +32,11 @@ async fn should_initialize_system_and_base_directories() { #[tokio::test] async fn should_create_and_persist_stream() { let setup = TestSetup::init().await; - let mut system = System::new(setup.config.clone(), PersonalAccessTokenConfig::default()); + let mut system = System::new( + setup.config.clone(), + DataMaintenanceConfig::default(), + PersonalAccessTokenConfig::default(), + ); let stream_id = 1; let stream_name = "test"; let session = Session::new(1, 1, SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234)); @@ -45,7 +53,11 @@ async fn should_create_and_persist_stream() { #[tokio::test] async fn should_create_and_persist_stream_with_automatically_generated_id() { let setup = TestSetup::init().await; - let mut system = System::new(setup.config.clone(), PersonalAccessTokenConfig::default()); + let mut system = System::new( + setup.config.clone(), + DataMaintenanceConfig::default(), + PersonalAccessTokenConfig::default(), + ); let stream_id = 1; let stream_name = "test"; let session = Session::new(1, 1, SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234)); @@ -62,7 +74,11 @@ async fn should_create_and_persist_stream_with_automatically_generated_id() { #[tokio::test] async fn should_delete_persisted_stream() { let setup = TestSetup::init().await; - let mut system = System::new(setup.config.clone(), PersonalAccessTokenConfig::default()); + let mut system = System::new( + setup.config.clone(), + DataMaintenanceConfig::default(), + PersonalAccessTokenConfig::default(), + ); let stream_id = 1; let stream_name = "test"; let session = Session::new(1, 1, SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234)); diff --git a/sdk/src/error.rs b/sdk/src/error.rs index 38438e61a..11e8b296f 100644 --- a/sdk/src/error.rs +++ b/sdk/src/error.rs @@ -254,6 +254,8 @@ pub enum IggyError { PartitionNotFound(u32, u32, 
u32) = 3007, #[error("Topic with ID: {0} for stream with ID: {1} has no partitions.")] NoPartitions(u32, u32) = 3008, + #[error("Cannot read partitions for topic with ID: {0} for stream with ID: {1}")] + TopicFull(u32, u32) = 3009, #[error("Failed to delete consumer offsets directory for path: {0}")] CannotDeleteConsumerOffsetsDirectory(String) = 3010, #[error("Failed to delete consumer offset file for path: {0}")] diff --git a/server/Cargo.toml b/server/Cargo.toml index 00d946be4..0485c34c7 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.3.1" +version = "0.3.2" edition = "2021" build = "src/build.rs" @@ -22,7 +22,8 @@ blake3 = "1.5.1" bytes = "1.6.0" clap = { version = "4.5.4", features = ["derive"] } console-subscriber = { version = "0.2.0", optional = true } -dashmap = "5.5.3" +dashmap = "6.0.1" +derive_more = "0.99.18" figlet-rs = "0.1.5" figment = { version = "0.10.18", features = ["json", "toml", "env"] } flume = "0.11.0" diff --git a/server/server.http b/server/server.http index 368422dd1..a9f8bf1cb 100644 --- a/server/server.http +++ b/server/server.http @@ -227,6 +227,7 @@ Content-Type: application/json "name": "topic1", "compression_algorithm": "none", "partitions_count": 3, + "max_topic_size": 0, "message_expiry": 0 } @@ -238,7 +239,8 @@ Content-Type: application/json { "name": "topic1", "compression_algorithm": "none", - "message_expiry": 1000 + "max_topic_size": 0, + "message_expiry": 0 } ### diff --git a/server/src/archiver/disk.rs b/server/src/archiver/disk.rs new file mode 100644 index 000000000..fc0998702 --- /dev/null +++ b/server/src/archiver/disk.rs @@ -0,0 +1,62 @@ +use crate::archiver::Archiver; +use crate::configs::server::DiskArchiverConfig; +use crate::server_error::ServerError; +use async_trait::async_trait; +use std::path::Path; +use tokio::fs; +use tracing::{debug, info}; + +#[derive(Debug)] +pub struct DiskArchiver { + config: DiskArchiverConfig, +} + +impl DiskArchiver { + 
pub fn new(config: DiskArchiverConfig) -> Self { + DiskArchiver { config } + } +} + +#[async_trait] +impl Archiver for DiskArchiver { + async fn init(&self) -> Result<(), ServerError> { + if !Path::new(&self.config.path).exists() { + info!("Creating disk archiver directory: {}", self.config.path); + fs::create_dir_all(&self.config.path).await?; + } + Ok(()) + } + + async fn is_archived( + &self, + file: &str, + base_directory: Option, + ) -> Result { + debug!("Checking if file: {file} is archived on disk."); + let base_directory = base_directory.as_deref().unwrap_or_default(); + let path = Path::new(&self.config.path).join(base_directory).join(file); + let is_archived = path.exists(); + debug!("File: {file} is archived: {is_archived}"); + Ok(is_archived) + } + + async fn archive( + &self, + files: &[&str], + base_directory: Option, + ) -> Result<(), ServerError> { + debug!("Archiving files on disk: {:?}", files); + for file in files { + debug!("Archiving file: {file}"); + let source = Path::new(file); + let base_directory = base_directory.as_deref().unwrap_or_default(); + let destination = Path::new(&self.config.path).join(base_directory).join(file); + let destination_path = destination.to_str().unwrap_or_default().to_owned(); + fs::create_dir_all(destination.parent().unwrap()).await?; + fs::copy(source, destination).await?; + debug!("Archived file: {file} at: {destination_path}"); + } + + Ok(()) + } +} diff --git a/server/src/archiver/mod.rs b/server/src/archiver/mod.rs new file mode 100644 index 000000000..f48710ef9 --- /dev/null +++ b/server/src/archiver/mod.rs @@ -0,0 +1,51 @@ +pub mod disk; +pub mod s3; + +use crate::server_error::ServerError; +use async_trait::async_trait; +use derive_more::Display; +use serde::{Deserialize, Serialize}; +use std::fmt::{Debug, Formatter}; +use std::str::FromStr; + +#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Display, Copy, Clone)] +#[serde(rename_all = "lowercase")] +pub enum ArchiverKind { + #[default] + 
#[display(fmt = "disk")] + Disk, + #[display(fmt = "s3")] + S3, +} + +impl FromStr for ArchiverKind { + type Err = String; + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "disk" => Ok(ArchiverKind::Disk), + "s3" => Err("S3 archiver is not implemented yet.".to_string()), + _ => Err(format!("Unknown archiver kind: {}", s)), + } + } +} + +#[async_trait] +pub trait Archiver: Sync + Send { + async fn init(&self) -> Result<(), ServerError>; + async fn is_archived( + &self, + file: &str, + base_directory: Option, + ) -> Result; + async fn archive( + &self, + files: &[&str], + base_directory: Option, + ) -> Result<(), ServerError>; +} + +impl Debug for dyn Archiver { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Archiver") + } +} diff --git a/server/src/archiver/s3.rs b/server/src/archiver/s3.rs new file mode 100644 index 000000000..1db48eb57 --- /dev/null +++ b/server/src/archiver/s3.rs @@ -0,0 +1,38 @@ +use crate::archiver::Archiver; +use crate::configs::server::S3ArchiverConfig; +use crate::server_error::ServerError; +use async_trait::async_trait; + +#[derive(Debug)] +pub struct S3Archiver { + _config: S3ArchiverConfig, +} + +impl S3Archiver { + pub fn new(config: S3ArchiverConfig) -> Self { + S3Archiver { _config: config } + } +} + +#[async_trait] +impl Archiver for S3Archiver { + async fn init(&self) -> Result<(), ServerError> { + Ok(()) + } + + async fn is_archived( + &self, + _file: &str, + _base_directory: Option, + ) -> Result { + todo!("Checking if file is archived on S3") + } + + async fn archive( + &self, + _files: &[&str], + _base_directory: Option, + ) -> Result<(), ServerError> { + todo!("Archiving files on S3") + } +} diff --git a/server/src/channels/commands/archive_state.rs b/server/src/channels/commands/archive_state.rs new file mode 100644 index 000000000..bfb015489 --- /dev/null +++ b/server/src/channels/commands/archive_state.rs @@ -0,0 +1,122 @@ +use crate::channels::server_command::ServerCommand; 
+use crate::configs::server::StateMaintenanceConfig; +use crate::streaming::systems::system::SharedSystem; +use async_trait::async_trait; +use flume::Sender; +use iggy::utils::duration::IggyDuration; +use iggy::utils::timestamp::IggyTimestamp; +use tokio::time; +use tracing::{error, info, warn}; + +pub struct StateArchiver { + enabled: bool, + overwrite: bool, + interval: IggyDuration, + sender: Sender, +} + +#[derive(Debug, Default, Clone)] +pub struct ArchiveStateCommand { + overwrite: bool, +} + +#[derive(Debug, Default, Clone)] +pub struct ArchiveStateExecutor; + +impl StateArchiver { + pub fn new(config: &StateMaintenanceConfig, sender: Sender) -> Self { + Self { + enabled: config.archiver_enabled, + overwrite: config.overwrite, + interval: config.interval, + sender, + } + } + + pub fn start(&self) { + if !self.enabled { + info!("State archiver is disabled."); + return; + } + + let overwrite = self.overwrite; + let interval = self.interval; + let sender = self.sender.clone(); + info!("State archiver is enabled, state will be archived every: {interval}."); + tokio::spawn(async move { + let mut interval_timer = time::interval(interval.get_duration()); + loop { + interval_timer.tick().await; + sender + .send(ArchiveStateCommand { overwrite }) + .unwrap_or_else(|err| { + error!("Failed to send ArchiveStateCommand. 
Error: {}", err); + }); + } + }); + } +} + +#[async_trait] +impl ServerCommand for ArchiveStateExecutor { + async fn execute(&mut self, system: &SharedSystem, command: ArchiveStateCommand) { + let system = system.read(); + if system.archiver.is_none() { + warn!("Archiver is disabled, state will not be archived."); + return; + } + + let base_directory = if command.overwrite { + None + } else { + Some(format!("{}_state", IggyTimestamp::now().as_micros())) + }; + let state_log_path = system.config.get_state_log_path(); + let state_info_path = system.config.get_state_info_path(); + info!("Archiving state..."); + let archiver = system.archiver.as_ref().unwrap(); + let files = [state_log_path.as_ref(), state_info_path.as_ref()]; + if let Err(error) = archiver.archive(&files, base_directory).await { + error!("Failed to archive state. Error: {}", error); + return; + } + info!("State archived successfully."); + } + + fn start_command_sender( + &mut self, + _system: SharedSystem, + config: &crate::configs::server::ServerConfig, + sender: Sender, + ) { + if !config.data_maintenance.archiver.enabled + || !config.data_maintenance.state.archiver_enabled + { + return; + } + + let state_archiver = StateArchiver::new(&config.data_maintenance.state, sender); + state_archiver.start(); + } + + fn start_command_consumer( + mut self, + system: SharedSystem, + config: &crate::configs::server::ServerConfig, + receiver: flume::Receiver, + ) { + if !config.data_maintenance.archiver.enabled + || !config.data_maintenance.state.archiver_enabled + { + return; + } + + tokio::spawn(async move { + let system = system.clone(); + while let Ok(command) = receiver.recv_async().await { + self.execute(&system, command).await; + } + info!("State archiver receiver stopped."); + }); + } +} diff --git a/server/src/channels/commands/clean_messages.rs b/server/src/channels/commands/clean_messages.rs deleted file mode 100644 index c3eb45e4a..000000000 --- a/server/src/channels/commands/clean_messages.rs +++ 
/dev/null @@ -1,171 +0,0 @@ -use crate::streaming::systems::system::SharedSystem; -use crate::streaming::topics::topic::Topic; -use crate::{channels::server_command::ServerCommand, configs::server::MessageCleanerConfig}; -use async_trait::async_trait; -use flume::Sender; -use iggy::error::IggyError; -use iggy::locking::IggySharedMutFn; -use iggy::utils::duration::IggyDuration; -use iggy::utils::timestamp::IggyTimestamp; -use tokio::time; -use tracing::{error, info}; - -struct DeletedSegments { - pub segments_count: u32, - pub messages_count: u64, -} - -pub struct MessagesCleaner { - enabled: bool, - interval: IggyDuration, - sender: Sender, -} - -#[derive(Debug, Default, Clone)] -pub struct CleanMessagesCommand; - -#[derive(Debug, Default, Clone)] -pub struct CleanMessagesExecutor; - -impl MessagesCleaner { - pub fn new(config: &MessageCleanerConfig, sender: Sender) -> Self { - Self { - enabled: config.enabled, - interval: config.interval, - sender, - } - } - - pub fn start(&self) { - if !self.enabled { - info!("Message cleaner is disabled."); - return; - } - - let interval = self.interval; - let sender = self.sender.clone(); - info!( - "Message cleaner is enabled, expired messages will be deleted every: {:?}.", - interval - ); - - tokio::spawn(async move { - let mut interval_timer = time::interval(interval.get_duration()); - loop { - interval_timer.tick().await; - sender.send(CleanMessagesCommand).unwrap_or_else(|err| { - error!("Failed to send CleanMessagesCommand. 
Error: {}", err); - }); - } - }); - } -} - -#[async_trait] -impl ServerCommand for CleanMessagesExecutor { - async fn execute(&mut self, system: &SharedSystem, _command: CleanMessagesCommand) { - let now = IggyTimestamp::now(); - let system = system.read(); - let streams = system.get_streams(); - for stream in streams { - let topics = stream.get_topics(); - for topic in topics { - let deleted_segments = delete_expired_segments(topic, now).await; - if let Ok(Some(deleted_segments)) = deleted_segments { - info!( - "Deleted {} segments and {} messages for stream ID: {}, topic ID: {}", - deleted_segments.segments_count, - deleted_segments.messages_count, - topic.stream_id, - topic.topic_id - ); - - system - .metrics - .decrement_segments(deleted_segments.segments_count); - system - .metrics - .decrement_messages(deleted_segments.messages_count); - } - } - } - } - - fn start_command_sender( - &mut self, - _system: SharedSystem, - config: &crate::configs::server::ServerConfig, - sender: Sender, - ) { - let messages_cleaner = MessagesCleaner::new(&config.message_cleaner, sender); - messages_cleaner.start(); - } - - fn start_command_consumer( - mut self, - system: SharedSystem, - _config: &crate::configs::server::ServerConfig, - receiver: flume::Receiver, - ) { - tokio::spawn(async move { - let system = system.clone(); - while let Ok(command) = receiver.recv_async().await { - self.execute(&system, command).await; - } - info!("Messages cleaner receiver stopped."); - }); - } -} - -async fn delete_expired_segments( - topic: &Topic, - now: IggyTimestamp, -) -> Result, IggyError> { - let expired_segments = topic - .get_expired_segments_start_offsets_per_partition(now) - .await; - if expired_segments.is_empty() { - return Ok(None); - } - - info!( - "Found {} expired segments for stream ID: {}, topic ID: {}, deleting...", - expired_segments.len(), - topic.stream_id, - topic.topic_id - ); - - let mut segments_count = 0; - let mut messages_count = 0; - for (partition_id, 
start_offsets) in &expired_segments { - match topic.get_partition(*partition_id) { - Ok(partition) => { - let mut partition = partition.write().await; - let mut last_end_offset = 0; - for start_offset in start_offsets { - let deleted_segment = partition.delete_segment(*start_offset).await?; - last_end_offset = deleted_segment.end_offset; - segments_count += 1; - messages_count += deleted_segment.messages_count; - } - - if partition.get_segments().is_empty() { - let start_offset = last_end_offset + 1; - partition.add_persisted_segment(start_offset).await?; - } - } - Err(error) => { - error!( - "Partition with ID: {} not found for stream ID: {}, topic ID: {}. Error: {}", - partition_id, topic.stream_id, topic.topic_id, error - ); - continue; - } - } - } - - Ok(Some(DeletedSegments { - segments_count, - messages_count, - })) -} diff --git a/server/src/channels/commands/clean_personal_access_tokens.rs b/server/src/channels/commands/clean_personal_access_tokens.rs index cb18f3cd1..5ab5c77a2 100644 --- a/server/src/channels/commands/clean_personal_access_tokens.rs +++ b/server/src/channels/commands/clean_personal_access_tokens.rs @@ -40,11 +40,7 @@ impl PersonalAccessTokenCleaner { let interval = self.interval; let sender = self.sender.clone(); - info!( - "Personal access token cleaner is enabled, expired tokens will be deleted every: {:?}.", - interval - ); - + info!("Personal access token cleaner is enabled, expired tokens will be deleted every: {interval}."); tokio::spawn(async move { let mut interval_timer = time::interval(interval.get_duration()); loop { diff --git a/server/src/channels/commands/maintain_messages.rs b/server/src/channels/commands/maintain_messages.rs new file mode 100644 index 000000000..f3a62262f --- /dev/null +++ b/server/src/channels/commands/maintain_messages.rs @@ -0,0 +1,517 @@ +use crate::archiver::Archiver; +use crate::channels::server_command::ServerCommand; +use crate::configs::server::MessagesMaintenanceConfig; +use crate::map_toggle_str; 
+use crate::streaming::systems::system::SharedSystem; +use crate::streaming::topics::topic::Topic; +use async_trait::async_trait; +use flume::Sender; +use iggy::error::IggyError; +use iggy::locking::IggySharedMutFn; +use iggy::utils::duration::IggyDuration; +use iggy::utils::timestamp::IggyTimestamp; +use std::sync::Arc; +use tokio::time; +use tracing::{debug, error, info}; + +pub struct MessagesMaintainer { + cleaner_enabled: bool, + archiver_enabled: bool, + interval: IggyDuration, + sender: Sender, +} + +#[derive(Debug, Default, Clone)] +pub struct MaintainMessagesCommand { + clean_messages: bool, + archive_messages: bool, +} + +#[derive(Debug, Default, Clone)] +pub struct MaintainMessagesExecutor; + +impl MessagesMaintainer { + pub fn new( + config: &MessagesMaintenanceConfig, + sender: Sender, + ) -> Self { + Self { + cleaner_enabled: config.cleaner_enabled, + archiver_enabled: config.archiver_enabled, + interval: config.interval, + sender, + } + } + + pub fn start(&self) { + if !self.cleaner_enabled && !self.archiver_enabled { + info!("Messages maintainer is disabled."); + return; + } + + let interval = self.interval; + let sender = self.sender.clone(); + info!( + "Message maintainer, cleaner is {}, archiver is {}, interval: {interval}", + map_toggle_str(self.cleaner_enabled), + map_toggle_str(self.archiver_enabled) + ); + let clean_messages = self.cleaner_enabled; + let archive_messages = self.archiver_enabled; + tokio::spawn(async move { + let mut interval_timer = time::interval(interval.get_duration()); + loop { + interval_timer.tick().await; + sender + .send(MaintainMessagesCommand { + clean_messages, + archive_messages, + }) + .unwrap_or_else(|err| { + error!("Failed to send MaintainMessagesCommand. 
Error: {}", err); + }); + } + }); + } +} + +#[async_trait] +impl ServerCommand for MaintainMessagesExecutor { + async fn execute(&mut self, system: &SharedSystem, command: MaintainMessagesCommand) { + let system = system.read(); + let streams = system.get_streams(); + for stream in streams { + let topics = stream.get_topics(); + for topic in topics { + let archiver = if command.archive_messages { + system.archiver.clone() + } else { + None + }; + let expired_segments = handle_expired_segments( + topic, + archiver.clone(), + system.config.segment.archive_expired, + command.clean_messages, + ) + .await; + if expired_segments.is_err() { + error!( + "Failed to get expired segments for stream ID: {}, topic ID: {}", + topic.stream_id, topic.topic_id + ); + continue; + } + + let oldest_segments = handle_oldest_segments( + topic, + archiver.clone(), + system.config.topic.delete_oldest_segments, + ) + .await; + if oldest_segments.is_err() { + error!( + "Failed to get oldest segments for stream ID: {}, topic ID: {}", + topic.stream_id, topic.topic_id + ); + continue; + } + + let deleted_expired_segments = expired_segments.unwrap(); + let deleted_oldest_segments = oldest_segments.unwrap(); + let deleted_segments = HandledSegments { + segments_count: deleted_expired_segments.segments_count + + deleted_oldest_segments.segments_count, + messages_count: deleted_expired_segments.messages_count + + deleted_oldest_segments.messages_count, + }; + + if deleted_segments.segments_count == 0 { + info!( + "No segments were deleted for stream ID: {}, topic ID: {}", + topic.stream_id, topic.topic_id + ); + continue; + } + + info!( + "Deleted {} segments and {} messages for stream ID: {}, topic ID: {}", + deleted_segments.segments_count, + deleted_segments.messages_count, + topic.stream_id, + topic.topic_id + ); + + system + .metrics + .decrement_segments(deleted_segments.segments_count); + system + .metrics + .decrement_messages(deleted_segments.messages_count); + } + } + } + + fn 
start_command_sender( + &mut self, + _system: SharedSystem, + config: &crate::configs::server::ServerConfig, + sender: Sender, + ) { + if (!config.data_maintenance.archiver.enabled + || !config.data_maintenance.messages.archiver_enabled) + && !config.data_maintenance.messages.cleaner_enabled + { + return; + } + + let messages_maintainer = + MessagesMaintainer::new(&config.data_maintenance.messages, sender); + messages_maintainer.start(); + } + + fn start_command_consumer( + mut self, + system: SharedSystem, + config: &crate::configs::server::ServerConfig, + receiver: flume::Receiver, + ) { + if (!config.data_maintenance.archiver.enabled + || !config.data_maintenance.messages.archiver_enabled) + && !config.data_maintenance.messages.cleaner_enabled + { + return; + } + + tokio::spawn(async move { + let system = system.clone(); + while let Ok(command) = receiver.recv_async().await { + self.execute(&system, command).await; + } + info!("Messages maintainer receiver stopped."); + }); + } +} + +async fn handle_expired_segments( + topic: &Topic, + archiver: Option>, + archive: bool, + clean: bool, +) -> Result { + let expired_segments = get_expired_segments(topic, IggyTimestamp::now()).await; + if expired_segments.is_empty() { + return Ok(HandledSegments::none()); + } + + if archive { + if let Some(archiver) = archiver { + info!( + "Archiving expired segments for stream ID: {}, topic ID: {}", + topic.stream_id, topic.topic_id + ); + archive_segments(topic, &expired_segments, archiver.clone()).await?; + } else { + error!( + "Archiver is not enabled, yet archive_expired is set to true. 
Cannot archive expired segments for stream ID: {}, topic ID: {}", + topic.stream_id, topic.topic_id + ); + return Ok(HandledSegments::none()); + } + } + + if clean { + info!( + "Deleting expired segments for stream ID: {}, topic ID: {}", + topic.stream_id, topic.topic_id + ); + delete_segments(topic, &expired_segments).await + } else { + info!( + "Deleting expired segments is disabled for stream ID: {}, topic ID: {}", + topic.stream_id, topic.topic_id + ); + Ok(HandledSegments::none()) + } +} + +async fn get_expired_segments(topic: &Topic, now: IggyTimestamp) -> Vec { + let expired_segments = topic + .get_expired_segments_start_offsets_per_partition(now) + .await; + if expired_segments.is_empty() { + debug!( + "No expired segments found for stream ID: {}, topic ID: {}", + topic.stream_id, topic.topic_id + ); + return Vec::new(); + } + + debug!( + "Found {} expired segments for stream ID: {}, topic ID: {}", + expired_segments.len(), + topic.stream_id, + topic.topic_id + ); + + expired_segments + .into_iter() + .map(|(partition_id, start_offsets)| SegmentsToHandle { + partition_id, + start_offsets, + }) + .collect() +} + +async fn handle_oldest_segments( + topic: &Topic, + archiver: Option>, + delete_oldest_segments: bool, +) -> Result { + if let Some(archiver) = archiver { + let mut segments_to_archive = Vec::new(); + for partition in topic.partitions.values() { + let mut start_offsets = Vec::new(); + let partition = partition.read().await; + for segment in partition.get_segments() { + if !segment.is_closed { + continue; + } + + let is_archived = archiver.is_archived(&segment.index_path, None).await; + if is_archived.is_err() { + error!( + "Failed to check if segment with start offset: {} is archived for stream ID: {}, topic ID: {}, partition ID: {}. 
Error: {}", + segment.start_offset, topic.stream_id, topic.topic_id, partition.partition_id, is_archived.err().unwrap() + ); + continue; + } + + if !is_archived.unwrap() { + debug!( + "Segment with start offset: {} is not archived for stream ID: {}, topic ID: {}, partition ID: {}", + segment.start_offset, topic.stream_id, topic.topic_id, partition.partition_id + ); + start_offsets.push(segment.start_offset); + } + } + if !start_offsets.is_empty() { + info!( + "Found {} segments to archive for stream ID: {}, topic ID: {}, partition ID: {}", + start_offsets.len(), + topic.stream_id, + topic.topic_id, + partition.partition_id + ); + segments_to_archive.push(SegmentsToHandle { + partition_id: partition.partition_id, + start_offsets, + }); + } + } + + info!( + "Archiving {} oldest segments for stream ID: {}, topic ID: {}...", + segments_to_archive.len(), + topic.stream_id, + topic.topic_id, + ); + archive_segments(topic, &segments_to_archive, archiver.clone()).await?; + } + + if topic.is_unlimited() { + debug!( + "Topic is unlimited, oldest segments will not be deleted for stream ID: {}, topic ID: {}", + topic.stream_id, topic.topic_id + ); + return Ok(HandledSegments::none()); + } + + if !delete_oldest_segments { + debug!( + "Delete oldest segments is disabled, oldest segments will not be deleted for stream ID: {}, topic ID: {}", + topic.stream_id, topic.topic_id + ); + return Ok(HandledSegments::none()); + } + + if !topic.is_almost_full() { + debug!( + "Topic is not almost full, oldest segments will not be deleted for stream ID: {}, topic ID: {}", + topic.stream_id, topic.topic_id + ); + return Ok(HandledSegments::none()); + } + + let oldest_segments = get_oldest_segments(topic).await; + if oldest_segments.is_empty() { + return Ok(HandledSegments::none()); + } + + delete_segments(topic, &oldest_segments).await +} + +async fn get_oldest_segments(topic: &Topic) -> Vec { + let mut oldest_segments = Vec::new(); + for partition in topic.partitions.values() { + let 
partition = partition.read().await; + if let Some(segment) = partition.get_segments().first() { + if !segment.is_closed { + continue; + } + + oldest_segments.push(SegmentsToHandle { + partition_id: partition.partition_id, + start_offsets: vec![segment.start_offset], + }); + } + } + + if oldest_segments.is_empty() { + debug!( + "No oldest segments found for stream ID: {}, topic ID: {}", + topic.stream_id, topic.topic_id + ); + return oldest_segments; + } + + info!( + "Found {} oldest segments for stream ID: {}, topic ID: {}.", + oldest_segments.len(), + topic.stream_id, + topic.topic_id + ); + + oldest_segments +} + +#[derive()] +struct SegmentsToHandle { + partition_id: u32, + start_offsets: Vec, +} + +#[derive(Debug)] +struct HandledSegments { + pub segments_count: u32, + pub messages_count: u64, +} + +impl HandledSegments { + pub fn none() -> Self { + Self { + segments_count: 0, + messages_count: 0, + } + } +} + +async fn archive_segments( + topic: &Topic, + segments_to_archive: &[SegmentsToHandle], + archiver: Arc, +) -> Result { + if segments_to_archive.is_empty() { + return Ok(0); + } + + info!( + "Found {} segments to archive for stream ID: {}, topic ID: {}, archiving...", + segments_to_archive.len(), + topic.stream_id, + topic.topic_id + ); + + let mut archived_segments = 0; + for segment_to_archive in segments_to_archive { + match topic.get_partition(segment_to_archive.partition_id) { + Ok(partition) => { + let partition = partition.read().await; + for start_offset in &segment_to_archive.start_offsets { + let segment = partition.get_segment(*start_offset); + if segment.is_none() { + error!( + "Segment with start offset: {} not found for stream ID: {}, topic ID: {}, partition ID: {}", + start_offset, topic.stream_id, topic.topic_id, partition.partition_id + ); + continue; + } + + let segment = segment.unwrap(); + let files = [ + segment.index_path.as_ref(), + segment.time_index_path.as_ref(), + segment.log_path.as_ref(), + ]; + if let Err(error) = 
archiver.archive(&files, None).await { + error!( + "Failed to archive segment with start offset: {} for stream ID: {}, topic ID: {}, partition ID: {}. Error: {}", + start_offset, topic.stream_id, topic.topic_id, partition.partition_id, error + ); + continue; + } + info!( + "Archived Segment with start offset: {}, for stream ID: {}, topic ID: {}, partition ID: {}", + start_offset, topic.stream_id, topic.topic_id, partition.partition_id + ); + archived_segments += 1; + } + } + Err(error) => { + error!( + "Partition with ID: {} was not found for stream ID: {}, topic ID: {}. Error: {}", + segment_to_archive.partition_id, topic.stream_id, topic.topic_id, error + ); + continue; + } + } + } + + Ok(archived_segments) +} + +async fn delete_segments( + topic: &Topic, + segments_to_delete: &[SegmentsToHandle], +) -> Result { + info!( + "Deleting {} segments for stream ID: {}, topic ID: {}...", + segments_to_delete.len(), + topic.stream_id, + topic.topic_id + ); + + let mut segments_count = 0; + let mut messages_count = 0; + for segment_to_delete in segments_to_delete { + match topic.get_partition(segment_to_delete.partition_id) { + Ok(partition) => { + let mut partition = partition.write().await; + let mut last_end_offset = 0; + for start_offset in &segment_to_delete.start_offsets { + let deleted_segment = partition.delete_segment(*start_offset).await?; + last_end_offset = deleted_segment.end_offset; + segments_count += 1; + messages_count += deleted_segment.messages_count; + } + + if partition.get_segments().is_empty() { + let start_offset = last_end_offset + 1; + partition.add_persisted_segment(start_offset).await?; + } + } + Err(error) => { + error!( + "Partition with ID: {} not found for stream ID: {}, topic ID: {}. 
Error: {}", + segment_to_delete.partition_id, topic.stream_id, topic.topic_id, error + ); + continue; + } + } + } + + Ok(HandledSegments { + segments_count, + messages_count, + }) +} diff --git a/server/src/channels/commands/mod.rs b/server/src/channels/commands/mod.rs index da2f5d823..5fc4b1a07 100644 --- a/server/src/channels/commands/mod.rs +++ b/server/src/channels/commands/mod.rs @@ -1,4 +1,5 @@ -pub mod clean_messages; +pub mod archive_state; pub mod clean_personal_access_tokens; +pub mod maintain_messages; pub mod print_sysinfo; pub mod save_messages; diff --git a/server/src/channels/commands/print_sysinfo.rs b/server/src/channels/commands/print_sysinfo.rs index 16d5700b1..77f5a93fe 100644 --- a/server/src/channels/commands/print_sysinfo.rs +++ b/server/src/channels/commands/print_sysinfo.rs @@ -31,11 +31,7 @@ impl SysInfoPrinter { return; } - info!( - "SysInfoPrinter is enabled, system information will be printed every {:?} seconds.", - interval.as_secs() - ); - + info!("SysInfoPrinter is enabled, system information will be printed every {interval}."); tokio::spawn(async move { let mut interval_timer = time::interval(interval.get_duration()); loop { diff --git a/server/src/channels/commands/save_messages.rs b/server/src/channels/commands/save_messages.rs index c7cfcc1dc..06cdb3c8b 100644 --- a/server/src/channels/commands/save_messages.rs +++ b/server/src/channels/commands/save_messages.rs @@ -42,11 +42,7 @@ impl MessagesSaver { let enforce_fsync = self.enforce_fsync; let interval = self.interval; let sender = self.sender.clone(); - info!( - "Message saver is enabled, buffered messages will be automatically saved every: {:?}, enforce fsync: {:?}.", - interval, enforce_fsync - ); - + info!("Message saver is enabled, buffered messages will be automatically saved every: {interval}, enforce fsync: {enforce_fsync}."); tokio::spawn(async move { let mut interval_timer = time::interval(interval.get_duration()); loop { diff --git a/server/src/configs/defaults.rs 
b/server/src/configs/defaults.rs index f64cba4a6..71fea1c39 100644 --- a/server/src/configs/defaults.rs +++ b/server/src/configs/defaults.rs @@ -3,13 +3,14 @@ use crate::configs::http::{ }; use crate::configs::quic::{QuicCertificateConfig, QuicConfig}; use crate::configs::server::{ - MessageCleanerConfig, MessageSaverConfig, PersonalAccessTokenCleanerConfig, - PersonalAccessTokenConfig, ServerConfig, + ArchiverConfig, DataMaintenanceConfig, MessageSaverConfig, MessagesMaintenanceConfig, + PersonalAccessTokenCleanerConfig, PersonalAccessTokenConfig, ServerConfig, + StateMaintenanceConfig, }; use crate::configs::system::{ BackupConfig, CacheConfig, CompatibilityConfig, CompressionConfig, EncryptionConfig, - LoggingConfig, MessageDeduplicationConfig, PartitionConfig, RetentionPolicyConfig, - RuntimeConfig, SegmentConfig, StreamConfig, SystemConfig, TopicConfig, + LoggingConfig, MessageDeduplicationConfig, PartitionConfig, RuntimeConfig, SegmentConfig, + StreamConfig, SystemConfig, TopicConfig, }; use crate::configs::tcp::{TcpConfig, TcpTlsConfig}; use std::sync::Arc; @@ -22,7 +23,7 @@ static_toml::static_toml! 
{ impl Default for ServerConfig { fn default() -> ServerConfig { ServerConfig { - message_cleaner: MessageCleanerConfig::default(), + data_maintenance: DataMaintenanceConfig::default(), message_saver: MessageSaverConfig::default(), personal_access_token: PersonalAccessTokenConfig::default(), system: Arc::new(SystemConfig::default()), @@ -33,10 +34,56 @@ impl Default for ServerConfig { } } +impl Default for ArchiverConfig { + fn default() -> ArchiverConfig { + ArchiverConfig { + enabled: false, + kind: SERVER_CONFIG + .data_maintenance + .archiver + .kind + .parse() + .unwrap(), + disk: None, + s3: None, + } + } +} + +impl Default for MessagesMaintenanceConfig { + fn default() -> MessagesMaintenanceConfig { + MessagesMaintenanceConfig { + archiver_enabled: SERVER_CONFIG.data_maintenance.messages.archiver_enabled, + cleaner_enabled: SERVER_CONFIG.data_maintenance.messages.cleaner_enabled, + interval: SERVER_CONFIG + .data_maintenance + .messages + .interval + .parse() + .unwrap(), + } + } +} + +impl Default for StateMaintenanceConfig { + fn default() -> StateMaintenanceConfig { + StateMaintenanceConfig { + archiver_enabled: SERVER_CONFIG.data_maintenance.state.archiver_enabled, + overwrite: SERVER_CONFIG.data_maintenance.state.overwrite, + interval: SERVER_CONFIG + .data_maintenance + .state + .interval + .parse() + .unwrap(), + } + } +} + impl Default for QuicConfig { fn default() -> QuicConfig { QuicConfig { - enabled: true, + enabled: SERVER_CONFIG.quic.enabled, address: SERVER_CONFIG.quic.address.parse().unwrap(), max_concurrent_bidi_streams: SERVER_CONFIG.quic.max_concurrent_bidi_streams as u64, datagram_send_buffer_size: SERVER_CONFIG @@ -87,7 +134,7 @@ impl Default for TcpTlsConfig { impl Default for HttpConfig { fn default() -> HttpConfig { HttpConfig { - enabled: true, + enabled: SERVER_CONFIG.http.enabled, address: SERVER_CONFIG.http.address.parse().unwrap(), max_request_size: SERVER_CONFIG.http.max_request_size.parse().unwrap(), cors: 
HttpCorsConfig::default(), @@ -185,15 +232,6 @@ impl Default for HttpTlsConfig { } } -impl Default for MessageCleanerConfig { - fn default() -> MessageCleanerConfig { - MessageCleanerConfig { - enabled: SERVER_CONFIG.message_cleaner.enabled, - interval: SERVER_CONFIG.message_cleaner.interval.parse().unwrap(), - } - } -} - impl Default for MessageSaverConfig { fn default() -> MessageSaverConfig { MessageSaverConfig { @@ -236,7 +274,6 @@ impl Default for SystemConfig { runtime: RuntimeConfig::default(), logging: LoggingConfig::default(), cache: CacheConfig::default(), - retention_policy: RetentionPolicyConfig::default(), stream: StreamConfig::default(), encryption: EncryptionConfig::default(), topic: TopicConfig::default(), @@ -319,25 +356,6 @@ impl Default for CacheConfig { } } -impl Default for RetentionPolicyConfig { - fn default() -> RetentionPolicyConfig { - RetentionPolicyConfig { - message_expiry: SERVER_CONFIG - .system - .retention_policy - .message_expiry - .parse() - .unwrap(), - max_topic_size: SERVER_CONFIG - .system - .retention_policy - .max_topic_size - .parse() - .unwrap(), - } - } -} - impl Default for EncryptionConfig { fn default() -> EncryptionConfig { EncryptionConfig { @@ -359,6 +377,8 @@ impl Default for TopicConfig { fn default() -> TopicConfig { TopicConfig { path: SERVER_CONFIG.system.topic.path.parse().unwrap(), + max_size: SERVER_CONFIG.system.topic.max_size.parse().unwrap(), + delete_oldest_segments: SERVER_CONFIG.system.topic.delete_oldest_segments, } } } @@ -381,6 +401,8 @@ impl Default for SegmentConfig { size: SERVER_CONFIG.system.segment.size.parse().unwrap(), cache_indexes: SERVER_CONFIG.system.segment.cache_indexes, cache_time_indexes: SERVER_CONFIG.system.segment.cache_time_indexes, + message_expiry: SERVER_CONFIG.system.segment.message_expiry.parse().unwrap(), + archive_expired: SERVER_CONFIG.system.segment.archive_expired, } } } diff --git a/server/src/configs/displays.rs b/server/src/configs/displays.rs index 
59837aff8..d6a99036c 100644 --- a/server/src/configs/displays.rs +++ b/server/src/configs/displays.rs @@ -1,12 +1,16 @@ use crate::configs::quic::{QuicCertificateConfig, QuicConfig}; +use crate::configs::server::{ + ArchiverConfig, DataMaintenanceConfig, DiskArchiverConfig, MessagesMaintenanceConfig, + S3ArchiverConfig, StateMaintenanceConfig, +}; use crate::configs::system::MessageDeduplicationConfig; use crate::configs::{ http::{HttpConfig, HttpCorsConfig, HttpJwtConfig, HttpMetricsConfig, HttpTlsConfig}, resource_quota::MemoryResourceQuota, - server::{MessageCleanerConfig, MessageSaverConfig, ServerConfig}, + server::{MessageSaverConfig, ServerConfig}, system::{ CacheConfig, CompressionConfig, EncryptionConfig, LoggingConfig, PartitionConfig, - RetentionPolicyConfig, SegmentConfig, StreamConfig, SystemConfig, TopicConfig, + SegmentConfig, StreamConfig, SystemConfig, TopicConfig, }, tcp::{TcpConfig, TcpTlsConfig}, }; @@ -110,52 +114,88 @@ impl Display for CompressionConfig { } } -impl Display for ServerConfig { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl Display for DataMaintenanceConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{{ message_cleaner: {}, message_saver: {}, system: {}, quic: {}, tcp: {}, http: {} }}", - self.message_cleaner, self.message_saver, self.system, self.quic, self.tcp, self.http + "{{ archiver: {}, messages: {}, state: {} }}", + self.archiver, self.messages, self.state ) } } -impl Display for MessageCleanerConfig { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl Display for ArchiverConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{{ enabled: {}, interval: {} }}", - self.enabled, self.interval + "{{ enabled: {}, kind: {:?}, disk: {:?}, s3: {:?} }}", + self.enabled, self.kind, self.disk, self.s3 ) } } -impl Display for MessageSaverConfig { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 
+impl Display for DiskArchiverConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{{ path: {} }}", self.path) + } +} + +impl Display for S3ArchiverConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{{ enabled: {}, enforce_fsync: {}, interval: {} }}", - self.enabled, self.enforce_fsync, self.interval + "{{ key_id: {}, access_key: ******, region: {}, bucket: {} }}", + self.key_id, self.region, self.bucket ) } } -impl Display for CacheConfig { +impl Display for MessagesMaintenanceConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{{ enabled: {}, size: {} }}", self.enabled, self.size) + write!( + f, + "{{ archiver_enabled: {}, cleaner_enabled: {}, interval: {} }}", + self.archiver_enabled, self.cleaner_enabled, self.interval + ) } } -impl Display for RetentionPolicyConfig { +impl Display for StateMaintenanceConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{{ message_expiry {}, max_topic_size: {} }}", - self.message_expiry, self.max_topic_size + "{{ archiver_enabled: {}, overwrite: {}, interval: {} }}", + self.archiver_enabled, self.overwrite, self.interval ) } } +impl Display for ServerConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{{ data_maintenance: {}, message_saver: {}, system: {}, quic: {}, tcp: {}, http: {} }}", + self.data_maintenance, self.message_saver, self.system, self.quic, self.tcp, self.http + ) + } +} + +impl Display for MessageSaverConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{{ enabled: {}, enforce_fsync: {}, interval: {} }}", + self.enabled, self.enforce_fsync, self.interval + ) + } +} + +impl Display for CacheConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{{ enabled: {}, size: {} }}", self.enabled, self.size) + } +} + impl Display for EncryptionConfig { fn fmt(&self, f: &mut Formatter<'_>) -> 
std::fmt::Result { write!(f, "{{ enabled: {} }}", self.enabled) @@ -170,7 +210,11 @@ impl Display for StreamConfig { impl Display for TopicConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{{ path: {} }}", self.path) + write!( + f, + "{{ path: {}, max_size: {}, delete_oldest_segments: {} }}", + self.path, self.max_size, self.delete_oldest_segments + ) } } @@ -201,8 +245,8 @@ impl Display for SegmentConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{{ size_bytes: {}, cache_indexes: {}, cache_time_indexes: {} }}", - self.size, self.cache_indexes, self.cache_time_indexes + "{{ size_bytes: {}, cache_indexes: {}, cache_time_indexes: {}, message_expiry: {}, archive_expired: {} }}", + self.size, self.cache_indexes, self.cache_time_indexes, self.message_expiry, self.archive_expired ) } } diff --git a/server/src/configs/server.rs b/server/src/configs/server.rs index 937de99a6..f09f89c12 100644 --- a/server/src/configs/server.rs +++ b/server/src/configs/server.rs @@ -1,3 +1,4 @@ +use crate::archiver::ArchiverKind; use crate::configs::config_provider::ConfigProvider; use crate::configs::http::HttpConfig; use crate::configs::quic::QuicConfig; @@ -13,7 +14,7 @@ use std::sync::Arc; #[derive(Debug, Deserialize, Serialize, Clone)] pub struct ServerConfig { - pub message_cleaner: MessageCleanerConfig, + pub data_maintenance: DataMaintenanceConfig, pub message_saver: MessageSaverConfig, pub personal_access_token: PersonalAccessTokenConfig, pub system: Arc, @@ -23,13 +24,52 @@ pub struct ServerConfig { } #[serde_as] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] +pub struct DataMaintenanceConfig { + pub archiver: ArchiverConfig, + pub messages: MessagesMaintenanceConfig, + pub state: StateMaintenanceConfig, +} + #[derive(Debug, Deserialize, Serialize, Clone)] -pub struct MessageCleanerConfig { +pub struct ArchiverConfig { pub enabled: bool, + pub kind: ArchiverKind, + pub disk: Option, + pub s3: Option, +} + 
+#[serde_as] +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct MessagesMaintenanceConfig { + pub archiver_enabled: bool, + pub cleaner_enabled: bool, + #[serde_as(as = "DisplayFromStr")] + pub interval: IggyDuration, +} + +#[serde_as] +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct StateMaintenanceConfig { + pub archiver_enabled: bool, + pub overwrite: bool, #[serde_as(as = "DisplayFromStr")] pub interval: IggyDuration, } +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct DiskArchiverConfig { + pub path: String, +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct S3ArchiverConfig { + pub key_id: String, + pub access_key: String, + pub region: String, + pub bucket: String, +} + #[serde_as] #[derive(Debug, Deserialize, Serialize, Clone)] pub struct MessageSaverConfig { @@ -39,14 +79,14 @@ pub struct MessageSaverConfig { pub interval: IggyDuration, } -#[derive(Debug, Deserialize, Serialize, Copy, Clone)] +#[derive(Debug, Deserialize, Serialize, Clone)] pub struct PersonalAccessTokenConfig { pub max_tokens_per_user: u32, pub cleaner: PersonalAccessTokenCleanerConfig, } #[serde_as] -#[derive(Debug, Deserialize, Serialize, Copy, Clone)] +#[derive(Debug, Deserialize, Serialize, Clone)] pub struct PersonalAccessTokenCleanerConfig { pub enabled: bool, #[serde_as(as = "DisplayFromStr")] diff --git a/server/src/configs/system.rs b/server/src/configs/system.rs index 60bb8ce9e..c02c76a55 100644 --- a/server/src/configs/system.rs +++ b/server/src/configs/system.rs @@ -17,7 +17,6 @@ pub struct SystemConfig { pub runtime: RuntimeConfig, pub logging: LoggingConfig, pub cache: CacheConfig, - pub retention_policy: RetentionPolicyConfig, pub stream: StreamConfig, pub topic: TopicConfig, pub partition: PartitionConfig, @@ -72,15 +71,6 @@ pub struct CacheConfig { pub size: MemoryResourceQuota, } -#[serde_as] -#[derive(Debug, Deserialize, Serialize, Copy, Clone)] -pub struct RetentionPolicyConfig { - #[serde_as(as = "DisplayFromStr")] - pub 
message_expiry: IggyExpiry, - #[serde_as(as = "DisplayFromStr")] - pub max_topic_size: MaxTopicSize, -} - #[derive(Debug, Deserialize, Serialize)] pub struct EncryptionConfig { pub enabled: bool, @@ -92,9 +82,13 @@ pub struct StreamConfig { pub path: String, } +#[serde_as] #[derive(Debug, Deserialize, Serialize)] pub struct TopicConfig { pub path: String, + #[serde_as(as = "DisplayFromStr")] + pub max_size: MaxTopicSize, + pub delete_oldest_segments: bool, } #[derive(Debug, Deserialize, Serialize)] @@ -114,11 +108,15 @@ pub struct MessageDeduplicationConfig { pub expiry: IggyDuration, } +#[serde_as] #[derive(Debug, Deserialize, Serialize)] pub struct SegmentConfig { pub size: IggyByteSize, pub cache_indexes: bool, pub cache_time_indexes: bool, + #[serde_as(as = "DisplayFromStr")] + pub message_expiry: IggyExpiry, + pub archive_expired: bool, } impl SystemConfig { diff --git a/server/src/configs/validators.rs b/server/src/configs/validators.rs index c9afa4709..d50997d42 100644 --- a/server/src/configs/validators.rs +++ b/server/src/configs/validators.rs @@ -1,9 +1,13 @@ extern crate sysinfo; -use super::server::{MessageCleanerConfig, MessageSaverConfig}; +use super::server::{ + ArchiverConfig, DataMaintenanceConfig, MessageSaverConfig, MessagesMaintenanceConfig, + StateMaintenanceConfig, +}; use super::system::CompressionConfig; +use crate::archiver::ArchiverKind; use crate::configs::server::{PersonalAccessTokenConfig, ServerConfig}; -use crate::configs::system::{CacheConfig, RetentionPolicyConfig, SegmentConfig}; +use crate::configs::system::{CacheConfig, SegmentConfig}; use crate::server_error::ServerError; use crate::streaming::segments::segment; use iggy::compression::compression_algorithm::CompressionAlgorithm; @@ -12,40 +16,44 @@ use iggy::utils::expiry::IggyExpiry; use iggy::utils::topic_size::MaxTopicSize; use iggy::validatable::Validatable; use sysinfo::System; -use tracing::{error, info, warn}; +use tracing::{info, warn}; impl Validatable for ServerConfig 
{ fn validate(&self) -> Result<(), ServerError> { + self.data_maintenance.validate()?; + self.personal_access_token.validate()?; self.system.segment.validate()?; self.system.cache.validate()?; - self.system.retention_policy.validate()?; self.system.compression.validate()?; - self.personal_access_token.validate()?; - let topic_size = match self.system.retention_policy.max_topic_size { + let topic_size = match self.system.topic.max_size { MaxTopicSize::Custom(size) => Ok(size.as_bytes_u64()), MaxTopicSize::Unlimited => Ok(u64::MAX), - MaxTopicSize::ServerDefault => { - error!("Max topic size cannot be set to server default."); - Err(ServerError::InvalidConfiguration) - } + MaxTopicSize::ServerDefault => Err(ServerError::InvalidConfiguration( + "Max topic size cannot be set to server default.".into(), + )), }?; - if let IggyExpiry::ServerDefault = self.system.retention_policy.message_expiry { - error!("Message expiry cannot be set to server default."); - return Err(ServerError::InvalidConfiguration); + if let IggyExpiry::ServerDefault = self.system.segment.message_expiry { + return Err(ServerError::InvalidConfiguration( + "Message expiry cannot be set to server default.".into(), + )); } if self.http.enabled { if let IggyExpiry::ServerDefault = self.http.jwt.access_token_expiry { - error!("Access token expiry cannot be set to server default."); - return Err(ServerError::InvalidConfiguration); + return Err(ServerError::InvalidConfiguration( + "Access token expiry cannot be set to server default.".into(), + )); } } if topic_size < self.system.segment.size.as_bytes_u64() { - error!("Max topic size cannot be lower than segment size. Max topic size: {}, segment size: {}.",topic_size, self.system.segment.size); - return Err(ServerError::InvalidConfiguration); + return Err(ServerError::InvalidConfiguration(format!( + "Max topic size cannot be lower than segment size. 
Max topic size: {}, segment size: {}.", + topic_size, + self.system.segment.size + ))); } Ok(()) @@ -104,20 +112,13 @@ impl Validatable for CacheConfig { } } -impl Validatable for RetentionPolicyConfig { - fn validate(&self) -> Result<(), ServerError> { - Ok(()) - } -} - impl Validatable for SegmentConfig { fn validate(&self) -> Result<(), ServerError> { if self.size.as_bytes_u64() as u32 > segment::MAX_SIZE_BYTES { - error!( - "Segment configuration -> size cannot be greater than: {} bytes.", + return Err(ServerError::InvalidConfiguration(format!( + "Segment size cannot be greater than: {} bytes.", segment::MAX_SIZE_BYTES - ); - return Err(ServerError::InvalidConfiguration); + ))); } Ok(()) @@ -127,19 +128,102 @@ impl Validatable for SegmentConfig { impl Validatable for MessageSaverConfig { fn validate(&self) -> Result<(), ServerError> { if self.enabled && self.interval.is_zero() { - error!("Message saver interval size cannot be zero, it must be greater than 0."); - return Err(ServerError::InvalidConfiguration); + return Err(ServerError::InvalidConfiguration( + "Message saver interval size cannot be zero, it must be greater than 0.".into(), + )); } Ok(()) } } -impl Validatable for MessageCleanerConfig { +impl Validatable for DataMaintenanceConfig { fn validate(&self) -> Result<(), ServerError> { - if self.enabled && self.interval.is_zero() { - error!("Message cleaner interval size cannot be zero, it must be greater than 0."); - return Err(ServerError::InvalidConfiguration); + self.archiver.validate()?; + self.messages.validate()?; + self.state.validate()?; + Ok(()) + } +} + +impl Validatable for ArchiverConfig { + fn validate(&self) -> Result<(), ServerError> { + if !self.enabled { + return Ok(()); + } + + return match self.kind { + ArchiverKind::Disk => { + if self.disk.is_none() { + return Err(ServerError::InvalidConfiguration( + "Disk archiver configuration is missing.".into(), + )); + } + + let disk = self.disk.as_ref().unwrap(); + if disk.path.is_empty() { + 
return Err(ServerError::InvalidConfiguration( + "Disk archiver path cannot be empty.".into(), + )); + } + Ok(()) + } + ArchiverKind::S3 => { + if self.s3.is_none() { + return Err(ServerError::InvalidConfiguration( + "S3 archiver configuration is missing.".into(), + )); + } + + let s3 = self.s3.as_ref().unwrap(); + if s3.key_id.is_empty() { + return Err(ServerError::InvalidConfiguration( + "S3 archiver key id cannot be empty.".into(), + )); + } + + if s3.access_key.is_empty() { + return Err(ServerError::InvalidConfiguration( + "S3 archiver access key cannot be empty.".into(), + )); + } + + if s3.region.is_empty() { + return Err(ServerError::InvalidConfiguration( + "S3 archiver region cannot be empty.".into(), + )); + } + + if s3.bucket.is_empty() { + return Err(ServerError::InvalidConfiguration( + "S3 archiver bucket cannot be empty.".into(), + )); + } + Ok(()) + } + }; + } +} + +impl Validatable for MessagesMaintenanceConfig { + fn validate(&self) -> Result<(), ServerError> { + if self.archiver_enabled && self.interval.is_zero() { + return Err(ServerError::InvalidConfiguration( + "Message maintenance interval size cannot be zero, it must be greater than 0." 
+ .into(), + )); + } + + Ok(()) + } +} + +impl Validatable for StateMaintenanceConfig { + fn validate(&self) -> Result<(), ServerError> { + if self.archiver_enabled && self.interval.is_zero() { + return Err(ServerError::InvalidConfiguration( + "State maintenance interval size cannot be zero, it must be greater than 0.".into(), + )); } Ok(()) @@ -149,15 +233,16 @@ impl Validatable for MessageCleanerConfig { impl Validatable for PersonalAccessTokenConfig { fn validate(&self) -> Result<(), ServerError> { if self.max_tokens_per_user == 0 { - error!("Max tokens per user cannot be zero, it must be greater than 0."); - return Err(ServerError::InvalidConfiguration); + return Err(ServerError::InvalidConfiguration( + "Max tokens per user cannot be zero, it must be greater than 0.".into(), + )); } if self.cleaner.enabled && self.cleaner.interval.is_zero() { - error!( + return Err(ServerError::InvalidConfiguration( "Personal access token cleaner interval cannot be zero, it must be greater than 0." - ); - return Err(ServerError::InvalidConfiguration); + .into(), + )); } Ok(()) diff --git a/server/src/lib.rs b/server/src/lib.rs index b468ce4d7..1cfdfce7b 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -8,6 +8,7 @@ use tikv_jemallocator::Jemalloc; #[global_allocator] static GLOBAL: Jemalloc = Jemalloc; +pub mod archiver; pub mod args; pub mod binary; pub mod channels; @@ -22,3 +23,10 @@ pub mod state; pub mod streaming; pub mod tcp; pub mod versioning; + +pub(crate) fn map_toggle_str<'a>(enabled: bool) -> &'a str { + match enabled { + true => "enabled", + false => "disabled", + } +} diff --git a/server/src/main.rs b/server/src/main.rs index 2dacb689f..3f0b57603 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -2,8 +2,9 @@ use anyhow::Result; use clap::Parser; use figlet_rs::FIGfont; use server::args::Args; -use server::channels::commands::clean_messages::CleanMessagesExecutor; +use server::channels::commands::archive_state::ArchiveStateExecutor; use 
server::channels::commands::clean_personal_access_tokens::CleanPersonalAccessTokensExecutor; +use server::channels::commands::maintain_messages::MaintainMessagesExecutor; use server::channels::commands::print_sysinfo::SysInfoPrintExecutor; use server::channels::commands::save_messages::SaveMessagesExecutor; use server::channels::handler::ServerCommandHandler; @@ -42,21 +43,23 @@ async fn main() -> Result<(), ServerError> { let system = SharedSystem::new(System::new( config.system.clone(), - config.personal_access_token, + config.data_maintenance.clone(), + config.personal_access_token.clone(), )); - let _command_handler = ServerCommandHandler::new(system.clone(), &config) - .install_handler(SaveMessagesExecutor) - .install_handler(CleanMessagesExecutor) - .install_handler(CleanPersonalAccessTokensExecutor) - .install_handler(SysInfoPrintExecutor); - // Workaround to ensure that the statistics are initialized before the server // loads streams and starts accepting connections. This is necessary to // have the correct statistics when the server starts. 
system.write().get_stats_bypass_auth().await?; system.write().init().await?; + let _command_handler = ServerCommandHandler::new(system.clone(), &config) + .install_handler(SaveMessagesExecutor) + .install_handler(MaintainMessagesExecutor) + .install_handler(ArchiveStateExecutor) + .install_handler(CleanPersonalAccessTokensExecutor) + .install_handler(SysInfoPrintExecutor); + #[cfg(unix)] let (mut ctrl_c, mut sigterm) = { use tokio::signal::unix::{signal, SignalKind}; diff --git a/server/src/server_error.rs b/server/src/server_error.rs index fecfe7c49..e3e2ed252 100644 --- a/server/src/server_error.rs +++ b/server/src/server_error.rs @@ -13,8 +13,8 @@ pub enum ServerError { InvalidConfigurationProvider(String), #[error("Cannot load configuration: {0}")] CannotLoadConfiguration(String), - #[error("Invalid configuration")] - InvalidConfiguration, + #[error("Invalid configuration: {0}")] + InvalidConfiguration(String), #[error("SDK error")] SdkError(#[from] iggy::error::IggyError), #[error("Write error")] @@ -45,4 +45,6 @@ pub enum ServerError { CannotRemoveOldSegmentFiles, #[error("Cannot persist new segment files")] CannotPersistNewSegmentFiles, + #[error("Cannot archive segment: {0}")] + CannotArchiveSegment(String), } diff --git a/server/src/streaming/partitions/segments.rs b/server/src/streaming/partitions/segments.rs index c536243b8..eccf2a805 100644 --- a/server/src/streaming/partitions/segments.rs +++ b/server/src/streaming/partitions/segments.rs @@ -20,6 +20,12 @@ impl Partition { &self.segments } + pub fn get_segment(&self, start_offset: u64) -> Option<&Segment> { + self.segments + .iter() + .find(|s| s.start_offset == start_offset) + } + pub fn get_segments_mut(&mut self) -> &mut Vec { &mut self.segments } @@ -27,7 +33,7 @@ impl Partition { pub async fn get_expired_segments_start_offsets(&self, now: IggyTimestamp) -> Vec { let mut expired_segments = Vec::new(); for segment in &self.segments { - if segment.is_closed && segment.is_expired(now).await { + if 
segment.is_expired(now).await { expired_segments.push(segment.start_offset); } } @@ -60,16 +66,15 @@ impl Partition { self.segments.push(new_segment); self.segments_count_of_parent_stream .fetch_add(1, Ordering::SeqCst); + self.segments + .sort_by(|a, b| a.start_offset.cmp(&b.start_offset)); Ok(()) } pub async fn delete_segment(&mut self, start_offset: u64) -> Result { let deleted_segment; { - let segment = self - .segments - .iter() - .find(|s| s.start_offset == start_offset); + let segment = self.get_segment(start_offset); if segment.is_none() { return Err(IggyError::SegmentNotFound); } @@ -86,6 +91,12 @@ impl Partition { } self.segments.retain(|s| s.start_offset != start_offset); + self.segments + .sort_by(|a, b| a.start_offset.cmp(&b.start_offset)); + info!( + "Segment with start offset: {} has been deleted from partition with ID: {}, stream with ID: {}, topic with ID: {}", + start_offset, self.partition_id, self.stream_id, self.topic_id + ); Ok(deleted_segment) } } diff --git a/server/src/streaming/segments/messages.rs b/server/src/streaming/segments/messages.rs index ca3347c44..fcbd93b09 100644 --- a/server/src/streaming/segments/messages.rs +++ b/server/src/streaming/segments/messages.rs @@ -9,7 +9,7 @@ use bytes::BufMut; use iggy::error::IggyError; use std::sync::atomic::Ordering; use std::sync::Arc; -use tracing::{trace, warn}; +use tracing::{info, trace, warn}; const EMPTY_MESSAGES: Vec = vec![]; @@ -324,6 +324,10 @@ impl Segment { self.end_offset = self.current_offset; self.is_closed = true; self.unsaved_batches = None; + info!( + "Closed segment with start offset: {} for partition with ID: {}.", + self.start_offset, self.partition_id + ); } else { self.unsaved_batches.as_mut().unwrap().clear(); } diff --git a/server/src/streaming/segments/segment.rs b/server/src/streaming/segments/segment.rs index dd6059573..c8a927f5b 100644 --- a/server/src/streaming/segments/segment.rs +++ b/server/src/streaming/segments/segment.rs @@ -89,7 +89,7 @@ impl Segment { 
size_bytes: 0, max_size_bytes: config.segment.size.as_bytes_u64() as u32, message_expiry: match message_expiry { - IggyExpiry::ServerDefault => config.retention_policy.message_expiry, + IggyExpiry::ServerDefault => config.segment.message_expiry, _ => message_expiry, }, indexes: match config.segment.cache_indexes { @@ -124,6 +124,10 @@ impl Segment { } pub async fn is_expired(&self, now: IggyTimestamp) -> bool { + if !self.is_closed { + return false; + } + match self.message_expiry { IggyExpiry::NeverExpire => false, IggyExpiry::ServerDefault => false, diff --git a/server/src/streaming/streams/topics.rs b/server/src/streaming/streams/topics.rs index b7fc4713f..a9464327a 100644 --- a/server/src/streaming/streams/topics.rs +++ b/server/src/streaming/streams/topics.rs @@ -114,7 +114,7 @@ impl Stream { let topic = self.get_topic_mut(id)?; let max_topic_size = match max_topic_size { - MaxTopicSize::ServerDefault => topic.config.retention_policy.max_topic_size, + MaxTopicSize::ServerDefault => topic.config.topic.max_size, _ => max_topic_size, }; diff --git a/server/src/streaming/systems/streams.rs b/server/src/streaming/systems/streams.rs index 0955c54a7..1b919ddc8 100644 --- a/server/src/streaming/systems/streams.rs +++ b/server/src/streaming/systems/streams.rs @@ -338,7 +338,7 @@ impl System { #[cfg(test)] mod tests { use super::*; - use crate::configs::server::PersonalAccessTokenConfig; + use crate::configs::server::{DataMaintenanceConfig, PersonalAccessTokenConfig}; use crate::configs::system::SystemConfig; use crate::state::command::EntryCommand; use crate::state::entry::StateEntry; @@ -362,6 +362,7 @@ mod tests { config, storage, Arc::new(TestState::default()), + DataMaintenanceConfig::default(), PersonalAccessTokenConfig::default(), ); let root = User::root(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD); diff --git a/server/src/streaming/systems/system.rs b/server/src/streaming/systems/system.rs index fbafce079..cbe4f2055 100644 --- 
a/server/src/streaming/systems/system.rs +++ b/server/src/streaming/systems/system.rs @@ -1,4 +1,4 @@ -use crate::configs::server::PersonalAccessTokenConfig; +use crate::configs::server::{DataMaintenanceConfig, PersonalAccessTokenConfig}; use crate::configs::system::SystemConfig; use crate::streaming::cache::memory_tracker::CacheMemoryTracker; use crate::streaming::clients::client_manager::ClientManager; @@ -17,12 +17,15 @@ use tokio::fs::{create_dir, remove_dir_all}; use tokio::time::Instant; use tracing::{info, trace}; -use crate::compat; +use crate::archiver::disk::DiskArchiver; +use crate::archiver::s3::S3Archiver; +use crate::archiver::{Archiver, ArchiverKind}; use crate::state::file::FileState; use crate::state::system::SystemState; use crate::state::State; use crate::streaming::users::user::User; use crate::versioning::SemanticVersion; +use crate::{compat, map_toggle_str}; use iggy::locking::IggySharedMut; use iggy::locking::IggySharedMutFn; use iggy::models::user_info::UserId; @@ -69,6 +72,7 @@ pub struct System { pub(crate) encryptor: Option>, pub(crate) metrics: Metrics, pub(crate) state: Arc, + pub(crate) archiver: Option>, pub personal_access_token: PersonalAccessTokenConfig, } @@ -77,7 +81,11 @@ pub struct System { const CACHE_OVER_EVICTION_FACTOR: u64 = 5; impl System { - pub fn new(config: Arc, pat_config: PersonalAccessTokenConfig) -> System { + pub fn new( + config: Arc, + data_maintenance_config: DataMaintenanceConfig, + pat_config: PersonalAccessTokenConfig, + ) -> System { let version = SemanticVersion::current().expect("Invalid version"); let persister: Arc = match config.partition.enforce_fsync { true => Arc::new(FileWithSyncPersister {}), @@ -92,28 +100,53 @@ impl System { config.clone(), SystemStorage::new(config, persister), state, + data_maintenance_config, pat_config, ) } pub fn create( - config: Arc, + system_config: Arc, storage: SystemStorage, state: Arc, + data_maintenance_config: DataMaintenanceConfig, pat_config: 
PersonalAccessTokenConfig, ) -> System { info!( "Server-side encryption is {}.", - Self::map_toggle_str(config.encryption.enabled) + map_toggle_str(system_config.encryption.enabled) ); + + let archiver_config = data_maintenance_config.archiver; + let archiver: Option> = if archiver_config.enabled { + info!("Archiving is enabled, kind: {}", archiver_config.kind); + match archiver_config.kind { + ArchiverKind::Disk => Some(Arc::new(DiskArchiver::new( + archiver_config + .disk + .clone() + .expect("Disk archiver config is missing"), + ))), + ArchiverKind::S3 => Some(Arc::new(S3Archiver::new( + archiver_config + .s3 + .clone() + .expect("S3 archiver config is missing"), + ))), + } + } else { + info!("Archiving is disabled."); + None + }; + System { - encryptor: match config.encryption.enabled { + encryptor: match system_config.encryption.enabled { true => Some(Box::new( - Aes256GcmEncryptor::from_base64_key(&config.encryption.key).unwrap(), + Aes256GcmEncryptor::from_base64_key(&system_config.encryption.key).unwrap(), )), false => None, }, - config, + config: system_config, streams: HashMap::new(), streams_ids: HashMap::new(), storage: Arc::new(storage), @@ -123,6 +156,7 @@ impl System { users: HashMap::new(), state, personal_access_token: pat_config, + archiver, } } @@ -173,6 +207,12 @@ impl System { .await?; self.load_streams(system_state.streams.into_values().collect()) .await?; + if let Some(archiver) = self.archiver.as_ref() { + archiver + .init() + .await + .expect("Failed to initialize archiver"); + } info!("Initialized system in {} ms.", now.elapsed().as_millis()); Ok(()) } @@ -199,13 +239,6 @@ impl System { } } - fn map_toggle_str<'a>(enabled: bool) -> &'a str { - match enabled { - true => "enabled", - false => "disabled", - } - } - pub async fn clean_cache(&self, size_to_clean: u64) { for stream in self.streams.values() { for topic in stream.get_topics() { diff --git a/server/src/streaming/topics/messages.rs b/server/src/streaming/topics/messages.rs index 
736faad2b..3fbf0700e 100644 --- a/server/src/streaming/topics/messages.rs +++ b/server/src/streaming/topics/messages.rs @@ -78,6 +78,10 @@ impl Topic { return Err(IggyError::NoPartitions(self.topic_id, self.stream_id)); } + if self.is_full() { + return Err(IggyError::TopicFull(self.topic_id, self.stream_id)); + } + if messages.is_empty() { return Ok(()); } diff --git a/server/src/streaming/topics/topic.rs b/server/src/streaming/topics/topic.rs index 83b5c57e6..0ccaf3afc 100644 --- a/server/src/streaming/topics/topic.rs +++ b/server/src/streaming/topics/topic.rs @@ -15,6 +15,8 @@ use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::Arc; use tokio::sync::RwLock; +const ALMOST_FULL_THRESHOLD: f64 = 0.9; + #[derive(Debug)] pub struct Topic { pub stream_id: u32, @@ -106,13 +108,13 @@ impl Topic { consumer_groups_ids: HashMap::new(), current_consumer_group_id: AtomicU32::new(1), current_partition_id: AtomicU32::new(1), - message_expiry: match config.retention_policy.message_expiry { + message_expiry: match config.segment.message_expiry { IggyExpiry::NeverExpire => message_expiry, value => value, }, compression_algorithm, max_topic_size: match max_topic_size { - MaxTopicSize::ServerDefault => config.retention_policy.max_topic_size, + MaxTopicSize::ServerDefault => config.topic.max_size, _ => max_topic_size, }, replication_factor, @@ -124,6 +126,31 @@ impl Topic { Ok(topic) } + pub fn is_full(&self) -> bool { + match self.max_topic_size { + MaxTopicSize::Unlimited => false, + MaxTopicSize::ServerDefault => false, + MaxTopicSize::Custom(size) => { + self.size_bytes.load(Ordering::SeqCst) >= size.as_bytes_u64() + } + } + } + + pub fn is_almost_full(&self) -> bool { + match self.max_topic_size { + MaxTopicSize::Unlimited => false, + MaxTopicSize::ServerDefault => false, + MaxTopicSize::Custom(size) => { + self.size_bytes.load(Ordering::SeqCst) + >= (size.as_bytes_u64() as f64 * ALMOST_FULL_THRESHOLD) as u64 + } + } + } + + pub fn is_unlimited(&self) -> bool 
{ + matches!(self.max_topic_size, MaxTopicSize::Unlimited) + } + pub fn get_size(&self) -> IggyByteSize { IggyByteSize::from(self.size_bytes.load(Ordering::SeqCst)) }