diff --git a/Cargo.lock b/Cargo.lock index 2f4ea9450..fa390c587 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4256,7 +4256,7 @@ dependencies = [ [[package]] name = "server" -version = "0.4.3" +version = "0.4.4" dependencies = [ "anyhow", "async-stream", diff --git a/server/Cargo.toml b/server/Cargo.toml index e97de6199..e755fc925 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.4.3" +version = "0.4.4" edition = "2021" build = "src/build.rs" diff --git a/server/src/configs/config_provider.rs b/server/src/configs/config_provider.rs index 1411cb587..a2a95c608 100644 --- a/server/src/configs/config_provider.rs +++ b/server/src/configs/config_provider.rs @@ -1,5 +1,6 @@ use crate::configs::server::ServerConfig; use crate::server_error::ServerError; +use crate::IGGY_ROOT_PASSWORD_ENV; use async_trait::async_trait; use figment::{ providers::{Format, Json, Toml}, @@ -12,6 +13,14 @@ use tracing::{debug, info}; const DEFAULT_CONFIG_PROVIDER: &str = "file"; const DEFAULT_CONFIG_PATH: &str = "configs/server.toml"; +const SECRET_KEYS: [&str; 6] = [ + IGGY_ROOT_PASSWORD_ENV, + "IGGY_DATA_MAINTENANCE_ARCHIVER_S3_KEY_SECRET", + "IGGY_HTTP_JWT_ENCODING_SECRET", + "IGGY_HTTP_JWT_DECODING_SECRET", + "IGGY_TCP_TLS_PASSWORD", + "IGGY_SYSTEM_ENCRYPTION_KEY", +]; #[async_trait] pub trait ConfigProvider { @@ -174,7 +183,7 @@ impl Provider for CustomEnvProvider { } let mut new_dict = Dict::new(); - for (key, value) in env::vars() { + for (key, mut value) in env::vars() { let env_key = key.to_uppercase(); if !env_key.starts_with(self.prefix.as_str()) { continue; @@ -184,10 +193,11 @@ impl Provider for CustomEnvProvider { .map(|k| k.to_lowercase()) .collect(); let env_var_value = Self::try_parse_value(&value); - info!( - "{} value changed to: {:?} from environment variable", - env_key, value - ); + if SECRET_KEYS.contains(&env_key.as_str()) { + value = "******".to_string(); + } + + info!("{env_key} value changed to: {value} from environment variable"); Self::insert_overridden_values_from_env( &source_dict, &mut new_dict, @@ -271,7 +281,7 @@ impl ConfigProvider for FileConfigProvider { match config_result { Ok(config) => { info!("Config loaded from path: '{}'", self.path); - info!("Using Config: {}", config); + info!("Using Config: {config}"); Ok(config) } Err(figment_error) => Err(ServerError::CannotLoadConfiguration(format!( diff --git a/server/src/configs/displays.rs b/server/src/configs/displays.rs index 680a2f1c9..7b3907f8e 100644 --- a/server/src/configs/displays.rs +++ b/server/src/configs/displays.rs @@ -126,10 +126,18 @@ impl Display for DataMaintenanceConfig { impl Display for ArchiverConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let disk = self + .disk + .as_ref() + .map_or("none".to_string(), |disk| disk.to_string()); + let s3 = self + .s3 + .as_ref() + .map_or("none".to_string(), |s3| s3.to_string()); write!( f, - "{{ enabled: {}, kind: {:?}, disk: {:?}, s3: {:?} }}", - self.enabled, self.kind, self.disk, self.s3 + "{{ enabled: {}, kind: {}, disk: {disk}, s3: {s3} }}", + self.enabled, self.kind, ) } } @@ -144,7 +152,7 @@ impl Display for S3ArchiverConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{{ key_id: {}, key_secret: ******, bucket: {}, endpoint: {}. region: {} }}", + "{{ key_id: {}, bucket: {}, endpoint: {}. region: {} }}", self.key_id, self.bucket, self.endpoint.as_deref().unwrap_or_default(), diff --git a/server/src/configs/validators.rs b/server/src/configs/validators.rs index 44072a308..306171c49 100644 --- a/server/src/configs/validators.rs +++ b/server/src/configs/validators.rs @@ -103,10 +103,14 @@ impl Validatable for CacheConfig { ); } - info!( + if self.enabled { + info!( "Cache configuration -> cache size set to {} ({:.2}% of total memory: {}, free memory: {}).", pretty_cache_limit, cache_percentage, pretty_total_memory, pretty_free_memory ); + } else { + info!("Cache configuration -> cache is disabled."); + } Ok(()) } diff --git a/server/src/lib.rs b/server/src/lib.rs index 1cfdfce7b..1ce39fe64 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -24,6 +24,9 @@ pub mod streaming; pub mod tcp; pub mod versioning; +const IGGY_ROOT_USERNAME_ENV: &str = "IGGY_ROOT_USERNAME"; +const IGGY_ROOT_PASSWORD_ENV: &str = "IGGY_ROOT_PASSWORD"; + pub(crate) fn map_toggle_str<'a>(enabled: bool) -> &'a str { match enabled { true => "enabled", diff --git a/server/src/streaming/segments/messages.rs b/server/src/streaming/segments/messages.rs index fcbd93b09..93153c002 100644 --- a/server/src/streaming/segments/messages.rs +++ b/server/src/streaming/segments/messages.rs @@ -245,12 +245,14 @@ impl Segment { Ok(()) } + fn store_offset_and_timestamp_index_for_batch( &mut self, batch_last_offset: u64, batch_max_timestamp: u64, ) { let relative_offset = (batch_last_offset - self.start_offset) as u32; + trace!("Storing index for relative_offset: {relative_offset}"); match (&mut self.indexes, &mut self.time_indexes) { (Some(indexes), Some(time_indexes)) => { indexes.push(Index { diff --git a/server/src/streaming/segments/storage.rs b/server/src/streaming/segments/storage.rs index 6c7419850..215add8e8 100644 --- a/server/src/streaming/segments/storage.rs +++ b/server/src/streaming/segments/storage.rs @@ -576,6 +576,14 @@ async fn load_batches_by_range( return Ok(()); } + trace!( + "Loading message batches by index range: {} [{}] - {} [{}], file size: {file_size}", + index_range.start.position, + index_range.start.relative_offset, + index_range.end.position, + index_range.end.relative_offset + ); + let mut reader = BufReader::with_capacity(BUF_READER_CAPACITY_BYTES, file); reader .seek(SeekFrom::Start(index_range.start.position as u64)) @@ -584,10 +592,9 @@ async fn load_batches_by_range( let mut read_bytes = index_range.start.position as u64; let mut last_batch_to_read = false; while !last_batch_to_read { - let batch_base_offset = reader - .read_u64_le() - .await - .map_err(|_| IggyError::CannotReadBatchBaseOffset)?; + let Ok(batch_base_offset) = reader.read_u64_le().await else { + break; + }; let batch_length = reader .read_u32_le() .await diff --git a/server/src/streaming/systems/users.rs b/server/src/streaming/systems/users.rs index d823d5c6d..09f45cc91 100644 --- a/server/src/streaming/systems/users.rs +++ b/server/src/streaming/systems/users.rs @@ -5,6 +5,7 @@ use crate::streaming::session::Session; use crate::streaming::systems::system::System; use crate::streaming::users::user::User; use crate::streaming::utils::crypto; +use crate::{IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV}; use iggy::error::IggyError; use iggy::identifier::{IdKind, Identifier}; use iggy::locking::IggySharedMutFn; @@ -78,8 +79,8 @@ impl System { } fn create_root_user() -> User { - let username = env::var("IGGY_ROOT_USERNAME"); - let password = env::var("IGGY_ROOT_PASSWORD"); + let username = env::var(IGGY_ROOT_USERNAME_ENV); + let password = env::var(IGGY_ROOT_PASSWORD_ENV); if (username.is_ok() && password.is_err()) || (username.is_err() && password.is_ok()) { panic!("When providing the custom root user credentials, both username and password must be set."); }