diff --git a/Cargo.lock b/Cargo.lock index 043028ac2..306469e32 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -438,12 +438,43 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cab2a5582fe222b2b298035ecfd4264835cbe82bd02201d64a8801eb5ceae5e6" +[[package]] +name = "attohttpc" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f77d243921b0979fbbd728dd2d5162e68ac8252976797c24eb5b3a6af9090dc" +dependencies = [ + "http 0.2.11", + "log", + "native-tls", + "serde", + "serde_json", + "url", +] + [[package]] name = "autocfg" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "aws-creds" +version = "0.36.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "390ad3b77f3e21e01a4a0355865853b681daf1988510b0b15e31c0c4ae7eb0f6" +dependencies = [ + "attohttpc", + "home", + "log", + "quick-xml", + "rust-ini", + "serde", + "thiserror", + "time", + "url", +] + [[package]] name = "aws-lc-rs" version = "1.7.1" @@ -471,6 +502,15 @@ dependencies = [ "paste", ] +[[package]] +name = "aws-region" +version = "0.25.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42fed2b9fca70f2908268d057a607f2a906f47edbf856ea8587de9038d264e22" +dependencies = [ + "thiserror", +] + [[package]] name = "axum" version = "0.6.20" @@ -1160,6 +1200,26 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom", + "once_cell", + "tiny-keccak", +] + [[package]] name = "constant_time_eq" version = "0.3.0" @@ -1271,6 +1331,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "crunchy" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" + [[package]] name = "crypto-common" version = "0.1.6" @@ -1412,6 +1478,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "dlv-list" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" +dependencies = [ + "const-random", +] + [[package]] name = "doc-comment" version = "0.3.3" @@ -1872,6 +1947,12 @@ dependencies = [ "ahash", ] +[[package]] +name = "hashbrown" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" + [[package]] name = "hashbrown" version = "0.14.3" @@ -2078,6 +2159,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper 0.14.28", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "hyper-tls" version = "0.6.0" @@ -2486,6 +2580,23 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "maybe-async" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cf92c10c7e361d6b99666ec1c6f9805b0bea2c3bd8c78dc6fe98ac5bd78db11" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + +[[package]] +name = "md5" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" + [[package]] name = "memchr" version = "2.7.1" @@ -2516,6 +2627,15 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "minidom" +version = "0.15.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f45614075738ce1b77a1768912a60c0227525971b03e09122a05b8a34a2a6278" +dependencies = [ + "rxml", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -2828,6 +2948,16 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "ordered-multimap" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ed8acf08e98e744e5384c8bc63ceb0364e68a6854187221c18df61c4797690e" +dependencies = [ + "dlv-list", + "hashbrown 0.13.2", +] + [[package]] name = "ordered-stream" version = "0.2.0" @@ -3261,6 +3391,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "quick-xml" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eff6510e86862b57b210fd8cbe8ed3f0d7d600b9c2863cd4549a2e033c66e956" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quinn" version = "0.11.1" @@ -3494,7 +3634,7 @@ dependencies = [ "http-body 1.0.0", "http-body-util", "hyper 1.1.0", - "hyper-tls", + "hyper-tls 0.6.0", "hyper-util", "ipnet", "js-sys", @@ -3634,6 +3774,53 @@ dependencies = [ "serde", ] +[[package]] +name = "rust-ini" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e2a3bcec1f113553ef1c88aae6c020a369d03d55b58de9869a0908930385091" +dependencies = [ + "cfg-if", + "ordered-multimap", +] + +[[package]] +name = "rust-s3" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6679da8efaf4c6f0c161de0961dfe95fb6e9049c398d6fbdada2639f053aedb" +dependencies = [ + "async-trait", + "aws-creds", + "aws-region", + "base64 0.21.7", + "bytes", + "cfg-if", + "futures", + "hex", + "hmac", + "http 0.2.11", + "hyper 0.14.28", + "hyper-tls 0.5.0", + "log", + "maybe-async", + "md5", + "minidom", + "native-tls", + "percent-encoding", + "quick-xml", + "serde", + "serde_derive", + "serde_json", + "sha2", + "thiserror", + "time", + "tokio", + "tokio-native-tls", + "tokio-stream", + "url", +] + [[package]] name = "rust_decimal" version = "1.33.1" @@ -3810,6 +3997,23 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" +[[package]] +name = "rxml" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a98f186c7a2f3abbffb802984b7f1dfd65dac8be1aafdaabbca4137f53f0dff7" +dependencies = [ + "bytes", + "rxml_validation", + "smartstring", +] + +[[package]] +name = "rxml_validation" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22a197350ece202f19a166d1ad6d9d6de145e1d2a8ef47db299abe164dbd7530" + [[package]] name = "ryu" version = "1.0.16" @@ -4053,7 +4257,7 @@ dependencies = [ [[package]] name = "server" -version = "0.3.2" +version = "0.3.3" dependencies = [ "anyhow", "async-stream", @@ -4083,6 +4287,7 @@ dependencies = [ "rcgen", "ring", "rmp-serde", + "rust-s3", "rustls 0.23.10", "rustls-pemfile", "serde", @@ -4218,6 +4423,17 @@ version = "1.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" +[[package]] +name = "smartstring" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fb72c633efbaa2dd666986505016c32c3044395ceaf881518399d2f4127ee29" +dependencies = [ + "autocfg", + "static_assertions", + "version_check", +] + [[package]] name = "socket2" version = "0.4.10" @@ -4509,6 +4725,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinyvec" version = "1.6.0" diff --git a/configs/server.json b/configs/server.json index af24b8f7d..c07a5bb6d 100644 --- a/configs/server.json +++ b/configs/server.json @@ -8,9 +8,11 @@ }, "s3": { "key_id": "123", - "access_key": "secret", + "key_secret": "secret", "bucket": "iggy", - "region": "eu-west-1" + "endpoint": "http://localhost:9000", + "region": "eu-west-1", + "tmp_upload_dir": "local_data/s3_tmp" } }, "messages": { diff --git a/configs/server.toml b/configs/server.toml index ce02beba6..c0ffbd93a 100644 --- a/configs/server.toml +++ b/configs/server.toml @@ -12,12 +12,21 @@ path = "local_data/archive" [data_maintenance.archiver.s3] # Access key ID for the S3 bucket. key_id = "123" + # Secret access key for the S3 bucket -access_key = "secret" +key_secret = "secret" + +# Name of the S3 bucket. +bucket = "iggy" + +# Endpoint of the S3 region. +endpoint = "http://localhost:9000" + # Region of the S3 bucket. region = "eu-west-1" -# Endpoint of the S3 bucket. -bucket = "iggy" + +# Temporary directory for storing the data before uploading to S3. +tmp_upload_dir = "local_data/s3_tmp" [data_maintenance.messages] # Enables or disables the archiver process for closed segments containing messages. diff --git a/integration/tests/archiver/disk.rs b/integration/tests/archiver/disk.rs index abc045e68..2b9412854 100644 --- a/integration/tests/archiver/disk.rs +++ b/integration/tests/archiver/disk.rs @@ -1,12 +1,13 @@ -use crate::archiver::ArchiverSetup; +use crate::archiver::DiskArchiverSetup; use server::archiver::Archiver; +use server::server_error::ServerError; use server::streaming::utils::file; use std::path::Path; use tokio::io::{AsyncReadExt, AsyncWriteExt}; #[tokio::test] async fn should_init_base_archiver_directory() { - let setup = ArchiverSetup::init().await; + let setup = DiskArchiverSetup::init().await; let archiver = setup.archiver(); let result = archiver.init().await; assert!(result.is_ok()); @@ -16,7 +17,7 @@ async fn should_init_base_archiver_directory() { #[tokio::test] async fn should_archive_file_on_disk_by_making_a_copy_of_original_file() { - let setup = ArchiverSetup::init().await; + let setup = DiskArchiverSetup::init().await; let archiver = setup.archiver(); let content = "hello world"; let file_to_archive_path = format!("{}/file_to_archive", setup.base_path); @@ -31,7 +32,7 @@ async fn should_archive_file_on_disk_by_making_a_copy_of_original_file() { #[tokio::test] async fn should_archive_file_on_disk_within_additional_base_directory() { - let setup = ArchiverSetup::init().await; + let setup = DiskArchiverSetup::init().await; let archiver = setup.archiver(); let base_directory = "base"; let content = "hello world"; @@ -52,7 +53,7 @@ async fn should_archive_file_on_disk_within_additional_base_directory() { #[tokio::test] async fn should_return_true_when_file_is_archived() { - let setup = ArchiverSetup::init().await; + let setup = DiskArchiverSetup::init().await; let archiver = setup.archiver(); let content = "hello world"; let file_to_archive_path = format!("{}/file_to_archive", setup.base_path); @@ -67,7 +68,7 @@ async fn should_return_true_when_file_is_archived() { #[tokio::test] async fn should_return_false_when_file_is_not_archived() { - let setup = ArchiverSetup::init().await; + let setup = DiskArchiverSetup::init().await; let archiver = setup.archiver(); let content = "hello world"; let file_to_archive_path = format!("{}/file_to_archive", setup.base_path); @@ -78,6 +79,19 @@ async fn should_return_false_when_file_is_not_archived() { assert!(!is_archived.unwrap()); } +#[tokio::test] +async fn should_fail_when_file_to_archive_does_not_exist() { + let setup = DiskArchiverSetup::init().await; + let archiver = setup.archiver(); + let file_to_archive_path = "invalid_file_to_archive"; + let files_to_archive = vec![file_to_archive_path]; + let result = archiver.archive(&files_to_archive, None).await; + + assert!(result.is_err()); + let error = result.err().unwrap(); + assert!(matches!(error, ServerError::FileToArchiveNotFound(_))); +} + async fn create_file(path: &str, content: &str) { let mut file = file::overwrite(path).await.unwrap(); file.write_all(content.as_bytes()).await.unwrap(); diff --git a/integration/tests/archiver/mod.rs b/integration/tests/archiver/mod.rs index f8172d614..1d1452c18 100644 --- a/integration/tests/archiver/mod.rs +++ b/integration/tests/archiver/mod.rs @@ -4,15 +4,16 @@ use tokio::fs::create_dir; use uuid::Uuid; mod disk; +mod s3; -pub struct ArchiverSetup { +pub struct DiskArchiverSetup { base_path: String, archive_path: String, archiver: DiskArchiver, } -impl ArchiverSetup { - pub async fn init() -> ArchiverSetup { +impl DiskArchiverSetup { + pub async fn init() -> DiskArchiverSetup { let base_path = format!("test_local_data_{}", Uuid::new_v4().to_u128_le()); let archive_path = format!("{}/archive", base_path); let config = DiskArchiverConfig { @@ -33,7 +34,7 @@ impl ArchiverSetup { } } -impl Drop for ArchiverSetup { +impl Drop for DiskArchiverSetup { fn drop(&mut self) { std::fs::remove_dir_all(&self.base_path).unwrap(); } diff --git a/integration/tests/archiver/s3.rs b/integration/tests/archiver/s3.rs new file mode 100644 index 000000000..aebbce23c --- /dev/null +++ b/integration/tests/archiver/s3.rs @@ -0,0 +1,20 @@ +use server::archiver::s3::S3Archiver; +use server::archiver::Archiver; +use server::configs::server::S3ArchiverConfig; + +#[tokio::test] +async fn should_not_be_initialized_given_invalid_configuration() { + let config = S3ArchiverConfig { + key_id: "test".to_owned(), + key_secret: "secret".to_owned(), + bucket: "iggy".to_owned(), + endpoint: Some("https://iggy.s3.com".to_owned()), + region: None, + tmp_upload_dir: "tmp".to_owned(), + }; + let archiver = S3Archiver::new(config); + assert!(archiver.is_ok()); + let archiver = archiver.unwrap(); + let init = archiver.init().await; + assert!(init.is_err()); +} diff --git a/server/Cargo.toml b/server/Cargo.toml index 0485c34c7..27d4bc8d8 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.3.2" +version = "0.3.3" edition = "2021" build = "src/build.rs" @@ -38,6 +38,7 @@ quinn = { version = "0.11.1" } rcgen = "0.13.1" ring = "0.17.8" rmp-serde = "1.3.0" +rust-s3 = { version = "0.34.0", features = ["default"] } rustls = { version = "0.23.10" } rustls-pemfile = "2.1.2" serde = { version = "1.0.203", features = ["derive", "rc"] } @@ -86,3 +87,6 @@ path = "src/main.rs" # in case if feature 'tokio-console' is enabled. [package.metadata.cargo-udeps.ignore] normal = ["tracing-appender", "strip-ansi-escapes"] + +[package.metadata.cargo-machete] +ignored = ["rust-s3"] diff --git a/server/src/archiver/disk.rs b/server/src/archiver/disk.rs index fc0998702..e9e2e9333 100644 --- a/server/src/archiver/disk.rs +++ b/server/src/archiver/disk.rs @@ -49,6 +49,10 @@ impl Archiver for DiskArchiver { for file in files { debug!("Archiving file: {file}"); let source = Path::new(file); + if !source.exists() { + return Err(ServerError::FileToArchiveNotFound(file.to_string())); + } + let base_directory = base_directory.as_deref().unwrap_or_default(); let destination = Path::new(&self.config.path).join(base_directory).join(file); let destination_path = destination.to_str().unwrap_or_default().to_owned(); diff --git a/server/src/archiver/mod.rs b/server/src/archiver/mod.rs index f48710ef9..d301b451e 100644 --- a/server/src/archiver/mod.rs +++ b/server/src/archiver/mod.rs @@ -23,7 +23,7 @@ impl FromStr for ArchiverKind { fn from_str(s: &str) -> Result { match s.to_lowercase().as_str() { "disk" => Ok(ArchiverKind::Disk), - "s3" => Err("S3 archiver is not implemented yet.".to_string()), + "s3" => Ok(ArchiverKind::S3), _ => Err(format!("Unknown archiver kind: {}", s)), } } diff --git a/server/src/archiver/s3.rs b/server/src/archiver/s3.rs index 1db48eb57..8c5c6f569 100644 --- a/server/src/archiver/s3.rs +++ b/server/src/archiver/s3.rs @@ -1,38 +1,158 @@ use crate::archiver::Archiver; use crate::configs::server::S3ArchiverConfig; use crate::server_error::ServerError; +use crate::streaming::utils::file; use async_trait::async_trait; +use s3::creds::Credentials; +use s3::{Bucket, Region}; +use std::path::Path; +use tokio::fs; +use tracing::{debug, error, info}; #[derive(Debug)] pub struct S3Archiver { - _config: S3ArchiverConfig, + bucket: Bucket, + tmp_upload_dir: String, } impl S3Archiver { - pub fn new(config: S3ArchiverConfig) -> Self { - S3Archiver { _config: config } + pub fn new(config: S3ArchiverConfig) -> Result { + let credentials = Credentials::new( + Some(&config.key_id), + Some(&config.key_secret), + None, + None, + None, + ) + .map_err(|_| ServerError::InvalidS3Credentials)?; + + let bucket = Bucket::new( + &config.bucket, + Region::Custom { + endpoint: config + .endpoint + .map(|e| e.to_owned()) + .unwrap_or("".to_owned()) + .to_owned(), + region: config + .region + .map(|r| r.to_owned()) + .unwrap_or("".to_owned()) + .to_owned(), + }, + credentials, + ) + .map_err(|_| ServerError::CannotInitializeS3Archiver)?; + Ok(Self { + bucket, + tmp_upload_dir: config.tmp_upload_dir, + }) + } + + async fn copy_file_to_tmp(&self, path: &str) -> Result { + debug!( + "Copying file: {path} to temporary S3 upload directory: {}", + self.tmp_upload_dir + ); + let source = Path::new(path); + let destination = Path::new(&self.tmp_upload_dir).join(path); + let destination_path = destination.to_str().unwrap_or_default().to_owned(); + debug!("Creating temporary S3 upload directory: {destination_path}"); + fs::create_dir_all(destination.parent().unwrap()).await?; + debug!("Copying file: {path} to temporary S3 upload path: {destination_path}"); + fs::copy(source, destination).await?; + debug!("File: {path} copied to temporary S3 upload path: {destination_path}"); + Ok(destination_path) } } #[async_trait] impl Archiver for S3Archiver { async fn init(&self) -> Result<(), ServerError> { + let response = self.bucket.list("/".to_string(), None).await; + if let Err(error) = response { + error!("Cannot initialize S3 archiver: {error}"); + return Err(ServerError::CannotInitializeS3Archiver); + } + + if Path::new(&self.tmp_upload_dir).exists() { + info!( + "Removing existing S3 archiver temporary upload directory: {}", + self.tmp_upload_dir + ); + fs::remove_dir_all(&self.tmp_upload_dir).await?; + } + info!( + "Creating S3 archiver temporary upload directory: {}", + self.tmp_upload_dir + ); + fs::create_dir_all(&self.tmp_upload_dir).await?; Ok(()) } async fn is_archived( &self, - _file: &str, - _base_directory: Option, + file: &str, + base_directory: Option, ) -> Result { - todo!("Checking if file is archived on S3") + debug!("Checking if file: {file} is archived on S3."); + let base_directory = base_directory.as_deref().unwrap_or_default(); + let destination = Path::new(&base_directory).join(file); + let destination_path = destination.to_str().unwrap_or_default().to_owned(); + let response = self.bucket.get_object_tagging(destination_path).await; + if response.is_err() { + debug!("File: {file} is not archived on S3."); + return Ok(false); + } + + let (_, status) = response.unwrap(); + if status == 200 { + debug!("File: {file} is archived on S3."); + return Ok(true); + } + + debug!("File: {file} is not archived on S3."); + Ok(false) } async fn archive( &self, - _files: &[&str], - _base_directory: Option, + files: &[&str], + base_directory: Option, ) -> Result<(), ServerError> { - todo!("Archiving files on S3") + for path in files { + if !Path::new(path).exists() { + return Err(ServerError::FileToArchiveNotFound(path.to_string())); + } + + let source = self.copy_file_to_tmp(path).await?; + debug!("Archiving file: {source} on S3."); + let mut file = file::open(&source).await?; + let base_directory = base_directory.as_deref().unwrap_or_default(); + let destination = Path::new(&base_directory).join(path); + let destination_path = destination.to_str().unwrap_or_default().to_owned(); + let response = self + .bucket + .put_object_stream(&mut file, destination_path) + .await; + if let Err(error) = response { + error!("Cannot archive file: {path} on S3: {}", error); + fs::remove_file(&source).await?; + return Err(ServerError::CannotArchiveFile(path.to_string())); + } + + let response = response.unwrap(); + let status = response.status_code(); + if status == 200 { + debug!("Archived file: {path} on S3."); + fs::remove_file(&source).await?; + continue; + } + + error!("Cannot archive file: {path} on S3, received an invalid status code: {status}."); + fs::remove_file(&source).await?; + return Err(ServerError::CannotArchiveFile(path.to_string())); + } + Ok(()) } } diff --git a/server/src/channels/commands/archive_state.rs b/server/src/channels/commands/archive_state.rs index bfb015489..b61fecf8b 100644 --- a/server/src/channels/commands/archive_state.rs +++ b/server/src/channels/commands/archive_state.rs @@ -75,7 +75,7 @@ impl ServerCommand for ArchiveStateExecutor { let state_info_path = system.config.get_state_info_path(); info!("Archiving state..."); let archiver = system.archiver.as_ref().unwrap(); - let files = [state_log_path.as_ref(), state_info_path.as_ref()]; + let files = [state_info_path.as_ref(), state_log_path.as_ref()]; if let Err(error) = archiver.archive(&files, base_directory).await { error!("Failed to archive state. Error: {}", error); return; diff --git a/server/src/configs/displays.rs b/server/src/configs/displays.rs index d6a99036c..680a2f1c9 100644 --- a/server/src/configs/displays.rs +++ b/server/src/configs/displays.rs @@ -144,8 +144,11 @@ impl Display for S3ArchiverConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{{ key_id: {}, access_key: ******, region: {}, bucket: {} }}", - self.key_id, self.region, self.bucket + "{{ key_id: {}, key_secret: ******, bucket: {}, endpoint: {}. region: {} }}", + self.key_id, + self.bucket, + self.endpoint.as_deref().unwrap_or_default(), + self.region.as_deref().unwrap_or_default() ) } } diff --git a/server/src/configs/server.rs b/server/src/configs/server.rs index f09f89c12..efa8fe412 100644 --- a/server/src/configs/server.rs +++ b/server/src/configs/server.rs @@ -65,9 +65,11 @@ pub struct DiskArchiverConfig { #[derive(Debug, Deserialize, Serialize, Clone)] pub struct S3ArchiverConfig { pub key_id: String, - pub access_key: String, - pub region: String, + pub key_secret: String, pub bucket: String, + pub endpoint: Option, + pub region: Option, + pub tmp_upload_dir: String, } #[serde_as] diff --git a/server/src/configs/validators.rs b/server/src/configs/validators.rs index d50997d42..44072a308 100644 --- a/server/src/configs/validators.rs +++ b/server/src/configs/validators.rs @@ -182,15 +182,23 @@ impl Validatable for ArchiverConfig { )); } - if s3.access_key.is_empty() { + if s3.key_secret.is_empty() { return Err(ServerError::InvalidConfiguration( - "S3 archiver access key cannot be empty.".into(), + "S3 archiver key secret cannot be empty.".into(), )); } - if s3.region.is_empty() { + if s3.endpoint.is_none() && s3.region.is_none() { return Err(ServerError::InvalidConfiguration( - "S3 archiver region cannot be empty.".into(), + "S3 archiver endpoint or region must be set.".into(), + )); + } + + if s3.endpoint.as_deref().unwrap_or_default().is_empty() + && s3.region.as_deref().unwrap_or_default().is_empty() + { + return Err(ServerError::InvalidConfiguration( + "S3 archiver region or endpoint cannot be empty.".into(), )); } diff --git a/server/src/server_error.rs b/server/src/server_error.rs index e3e2ed252..e1336eeec 100644 --- a/server/src/server_error.rs +++ b/server/src/server_error.rs @@ -45,6 +45,12 @@ pub enum ServerError { CannotRemoveOldSegmentFiles, #[error("Cannot persist new segment files")] CannotPersistNewSegmentFiles, - #[error("Cannot archive segment: {0}")] - CannotArchiveSegment(String), + #[error("Cannot archive file: {0}")] + CannotArchiveFile(String), + #[error("Cannot initialize S3 archiver")] + CannotInitializeS3Archiver, + #[error("Invalid S3 credentials")] + InvalidS3Credentials, + #[error("File to archive not found: {0}")] + FileToArchiveNotFound(String), } diff --git a/server/src/streaming/systems/system.rs b/server/src/streaming/systems/system.rs index cbe4f2055..422b0090f 100644 --- a/server/src/streaming/systems/system.rs +++ b/server/src/streaming/systems/system.rs @@ -127,12 +127,15 @@ impl System { .clone() .expect("Disk archiver config is missing"), ))), - ArchiverKind::S3 => Some(Arc::new(S3Archiver::new( - archiver_config - .s3 - .clone() - .expect("S3 archiver config is missing"), - ))), + ArchiverKind::S3 => Some(Arc::new( + S3Archiver::new( + archiver_config + .s3 + .clone() + .expect("S3 archiver config is missing"), + ) + .expect("Failed to create S3 archiver"), + )), } } else { info!("Archiving is disabled.");