From 5b4399e5061143dc0740c5cee08eab975d246831 Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Mon, 22 Jul 2024 14:01:26 -0400 Subject: [PATCH] chore(bottlecap): switch flushing strategy to race (#318) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: race flush * refactor: periodic only when configured * fmt * when flushing strategy is default, set periodic flush tick to `1s` * on `End`, never flush until the end of the invocation * remove `tokio_unstable` feature for building * remove debug comment * remove `invocation_times` mod * update `flush_control.rs` * use `flush_control` in main * allow the `end,<ms>` strategy, which flushes periodically at the given interval (in milliseconds) and also at the end of the invocation * update `debug` comment for flushing * simplify logic for flush strategy parsing * remove log that could spam debug * refactor code and add unit test --------- Co-authored-by: jordan gonzález <30836115+duncanista@users.noreply.github.com> Co-authored-by: alexgallotta <5581237+alexgallotta@users.noreply.github.com> --- bottlecap/src/bin/bottlecap/main.rs | 57 +++++---- bottlecap/src/config/flush_strategy.rs | 83 +++++++++++--- bottlecap/src/lifecycle/flush_control.rs | 121 +++++++++----------- bottlecap/src/lifecycle/invocation_times.rs | 88 -------------- bottlecap/src/lifecycle/mod.rs | 1 - 5 files changed, 153 insertions(+), 197 deletions(-) delete mode 100644 bottlecap/src/lifecycle/invocation_times.rs diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 3d5435ea..ca942156 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -19,21 +19,23 @@ use bottlecap::{ invocation_context::{InvocationContext, InvocationContextBuffer}, }, logger, - logs::{agent::LogsAgent, flusher::Flusher as LogsFlusher}, + logs::{ + agent::LogsAgent, + flusher::{build_fqdn_logs, Flusher as LogsFlusher}, + }, metrics::{ aggregator::Aggregator as MetricsAggregator, 
constants::CONTEXTS, dogstatsd::{DogStatsD, DogStatsDConfig}, enhanced::lambda::Lambda as enhanced_metrics, - flusher::Flusher as MetricsFlusher, + flusher::{build_fqdn_metrics, Flusher as MetricsFlusher}, }, secrets::decrypt, tags::{lambda, provider::Provider as TagProvider}, telemetry::{ self, client::TelemetryApiClient, - events::TelemetryEvent, - events::{Status, TelemetryRecord}, + events::{Status, TelemetryEvent, TelemetryRecord}, listener::TelemetryListener, }, traces::{ @@ -49,6 +51,8 @@ use bottlecap::{ }; use datadog_trace_obfuscation::obfuscation_config; use decrypt::resolve_secrets; +use reqwest::Client; +use serde::Deserialize; use std::{ collections::hash_map, collections::HashMap, @@ -61,17 +65,12 @@ use std::{ sync::{Arc, Mutex}, }; use telemetry::listener::TelemetryListenerConfig; +use tokio::sync::mpsc::Sender; use tokio::sync::Mutex as TokioMutex; +use tokio_util::sync::CancellationToken; use tracing::{debug, error}; use tracing_subscriber::EnvFilter; -use bottlecap::logs::flusher::build_fqdn_logs; -use bottlecap::metrics::flusher::build_fqdn_metrics; -use reqwest::Client; -use serde::Deserialize; -use tokio::sync::mpsc::Sender; -use tokio_util::sync::CancellationToken; - #[derive(Clone, Deserialize)] #[serde(rename_all = "camelCase")] struct RegisterResponse { @@ -327,10 +326,13 @@ async fn extension_loop_active( let telemetry_listener_cancel_token = setup_telemetry_client(&r.extension_id, logs_agent_channel).await?; - let mut flush_control = FlushControl::new(config.serverless_flush_strategy); + let flush_control = FlushControl::new(config.serverless_flush_strategy); let mut invocation_context_buffer = InvocationContextBuffer::default(); let mut shutdown = false; + let mut flush_interval = flush_control.get_flush_interval(); + flush_interval.tick().await; // discard first tick, which is instantaneous + loop { let evt = next_event(client, &r.extension_id).await; match evt { @@ -360,10 +362,17 @@ async fn extension_loop_active( } // Block until 
we get something from the telemetry API // Check if flush logic says we should block and flush or not - if flush_control.should_flush() || shutdown { - loop { - let received = event_bus.rx.recv().await; - if let Some(event) = received { + loop { + tokio::select! { + _ = flush_interval.tick() => { + tokio::join!( + logs_flusher.flush(), + metrics_flusher.flush(), + trace_flusher.manual_flush(), + stats_flusher.manual_flush() + ); + } + Some(event) = event_bus.rx.recv() => { match event { Event::Metric(event) => { debug!("Metric event: {:?}", event); @@ -411,12 +420,14 @@ async fn extension_loop_active( // pass the invocation deadline to // flush tasks here, so they can // retry if we have more time - tokio::join!( - logs_flusher.flush(), - metrics_flusher.flush(), - trace_flusher.manual_flush(), - stats_flusher.manual_flush() - ); + if flush_control.should_flush_end() { + tokio::join!( + logs_flusher.flush(), + metrics_flusher.flush(), + trace_flusher.manual_flush(), + stats_flusher.manual_flush() + ); + } break; } TelemetryRecord::PlatformReport { @@ -453,8 +464,6 @@ async fn extension_loop_active( } }, } - } else { - error!("could not get the event"); } } } diff --git a/bottlecap/src/config/flush_strategy.rs b/bottlecap/src/config/flush_strategy.rs index 1dae5bf8..525ec422 100644 --- a/bottlecap/src/config/flush_strategy.rs +++ b/bottlecap/src/config/flush_strategy.rs @@ -10,11 +10,12 @@ pub struct PeriodicStrategy { pub enum FlushStrategy { Default, End, + EndPeriodically(PeriodicStrategy), Periodically(PeriodicStrategy), } // Deserialize for FlushStrategy -// Flush Strategy can be either "end" or "periodically," +// Flush Strategy can be either "end", "end,", or "periodically," impl<'de> Deserialize<'de> for FlushStrategy { fn deserialize(deserializer: D) -> Result where @@ -26,22 +27,22 @@ impl<'de> Deserialize<'de> for FlushStrategy { } else { let mut split_value = value.as_str().split(','); // "periodically,60000" - match split_value.next() { - 
Some(first_value) if first_value.starts_with("periodically") => { - let interval = split_value.next(); - // "60000" - if let Some(interval) = interval { - if let Ok(parsed_interval) = interval.parse() { - return Ok(FlushStrategy::Periodically(PeriodicStrategy { - interval: parsed_interval, - })); - } - debug!("Invalid flush interval: {}, using default", interval); - Ok(FlushStrategy::Default) - } else { - debug!("Invalid flush strategy: {}, using default", value); - Ok(FlushStrategy::Default) - } + // "end,1000" + let strategy = split_value.next(); + let interval: Option = split_value.next().and_then(|v| v.parse().ok()); + + match (strategy, interval) { + (Some("periodically"), Some(interval)) => { + Ok(FlushStrategy::Periodically(PeriodicStrategy { interval })) + } + (Some("end"), Some(interval)) => { + Ok(FlushStrategy::EndPeriodically(PeriodicStrategy { + interval, + })) + } + (Some(strategy), _) => { + debug!("Invalid flush interval: {}, using default", strategy); + Ok(FlushStrategy::Default) } _ => { debug!("Invalid flush strategy: {}, using default", value); @@ -51,3 +52,51 @@ impl<'de> Deserialize<'de> for FlushStrategy { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn deserialize_end() { + let flush_strategy: FlushStrategy = serde_json::from_str("\"end\"").unwrap(); + assert_eq!(flush_strategy, FlushStrategy::End); + } + + #[test] + fn deserialize_periodically() { + let flush_strategy: FlushStrategy = serde_json::from_str("\"periodically,60000\"").unwrap(); + assert_eq!( + flush_strategy, + FlushStrategy::Periodically(PeriodicStrategy { interval: 60000 }) + ); + } + + #[test] + fn deserialize_end_periodically() { + let flush_strategy: FlushStrategy = serde_json::from_str("\"end,1000\"").unwrap(); + assert_eq!( + flush_strategy, + FlushStrategy::EndPeriodically(PeriodicStrategy { interval: 1000 }) + ); + } + + #[test] + fn deserialize_invalid() { + let flush_strategy: FlushStrategy = serde_json::from_str("\"invalid\"").unwrap(); + 
assert_eq!(flush_strategy, FlushStrategy::Default); + } + + #[test] + fn deserialize_invalid_interval() { + let flush_strategy: FlushStrategy = + serde_json::from_str("\"periodically,invalid\"").unwrap(); + assert_eq!(flush_strategy, FlushStrategy::Default); + } + + #[test] + fn deserialize_invalid_end_interval() { + let flush_strategy: FlushStrategy = serde_json::from_str("\"end,invalid\"").unwrap(); + assert_eq!(flush_strategy, FlushStrategy::Default); + } +} diff --git a/bottlecap/src/lifecycle/flush_control.rs b/bottlecap/src/lifecycle/flush_control.rs index 6b9e968b..4502b51a 100644 --- a/bottlecap/src/lifecycle/flush_control.rs +++ b/bottlecap/src/lifecycle/flush_control.rs @@ -1,14 +1,11 @@ use crate::config::flush_strategy::FlushStrategy; -use crate::lifecycle::invocation_times::InvocationTimes; -use ::std::time; -use tracing::debug; +use tokio::time::Interval; -const TWENTY_SECONDS: u64 = 20 * 1000; +const DEFAULT_FLUSH_INTERVAL: u64 = 1000; // 1s +#[derive(Clone, Copy, Debug, PartialEq)] pub struct FlushControl { - pub last_flush: u64, flush_strategy: FlushStrategy, - invocation_times: InvocationTimes, } // FlushControl is called at the end of every invocation and decides whether or not we should flush @@ -26,49 +23,24 @@ pub struct FlushControl { impl FlushControl { #[must_use] pub fn new(flush_strategy: FlushStrategy) -> FlushControl { - FlushControl { - flush_strategy, - last_flush: 0, - invocation_times: InvocationTimes::new(), - } + FlushControl { flush_strategy } } - pub fn should_flush(&mut self) -> bool { - let now = match time::SystemTime::now().duration_since(time::UNIX_EPOCH) { - Ok(now) => now.as_secs(), - Err(e) => { - debug!("Failed to get current time: {:?}", e); - return false; - } - }; - self.invocation_times.add(now); + #[must_use] + pub fn should_flush_end(&self) -> bool { + !matches!(&self.flush_strategy, FlushStrategy::Periodically(_)) + } + + #[must_use] + pub fn get_flush_interval(&self) -> Interval { match &self.flush_strategy { 
FlushStrategy::Default => { - if self.invocation_times.should_adapt_to_periodic(now) { - let should_periodic_flush = self.should_periodic_flush(now, TWENTY_SECONDS); - debug!( - "Adapting over to periodic flush strategy. should_periodic_flush: {}", - should_periodic_flush - ); - return should_periodic_flush; - } - debug!("Not enough invocations to adapt to periodic flush, flushing at the end of the invocation"); - self.last_flush = now; - true + tokio::time::interval(tokio::time::Duration::from_millis(DEFAULT_FLUSH_INTERVAL)) } - FlushStrategy::Periodically(periodic) => { - self.should_periodic_flush(now, periodic.interval) + FlushStrategy::Periodically(p) | FlushStrategy::EndPeriodically(p) => { + tokio::time::interval(tokio::time::Duration::from_millis(p.interval)) } - FlushStrategy::End => true, - } - } - - fn should_periodic_flush(&mut self, now: u64, interval: u64) -> bool { - if now - self.last_flush > (interval / 1000) { - self.last_flush = now; - true - } else { - false + FlushStrategy::End => tokio::time::interval(tokio::time::Duration::MAX), } } } @@ -78,32 +50,47 @@ mod tests { use super::*; use crate::config::flush_strategy::PeriodicStrategy; - #[test] - fn should_flush_default_end() { - let mut flush_control = super::FlushControl::new(FlushStrategy::Default); - assert!(flush_control.should_flush()); - } - #[test] - fn should_flush_default_periodic() { - const LOOKBACK_COUNT: usize = 20; - let mut flush_control = super::FlushControl::new(FlushStrategy::Default); - for _ in 0..LOOKBACK_COUNT - 1 { - assert!(flush_control.should_flush()); - } - assert!(!flush_control.should_flush()); - } #[test] fn should_flush_end() { - let mut flush_control = super::FlushControl::new(FlushStrategy::End); - assert!(flush_control.should_flush()); + let flush_control = FlushControl::new(FlushStrategy::Default); + assert!(flush_control.should_flush_end()); + + let flush_control = FlushControl::new(FlushStrategy::EndPeriodically(PeriodicStrategy { + interval: 1, + })); + 
assert!(flush_control.should_flush_end()); + + let flush_control = FlushControl::new(FlushStrategy::End); + assert!(flush_control.should_flush_end()); + + let flush_control = FlushControl::new(FlushStrategy::Periodically(PeriodicStrategy { + interval: 1, + })); + assert!(!flush_control.should_flush_end()); } - #[test] - fn should_flush_periodically() { - let mut flush_control = - super::FlushControl::new(FlushStrategy::Periodically(PeriodicStrategy { - interval: 1, - })); - assert!(flush_control.should_flush()); - assert!(!flush_control.should_flush()); + + #[tokio::test] + async fn get_flush_interval() { + let flush_control = FlushControl::new(FlushStrategy::Default); + assert_eq!( + flush_control.get_flush_interval().period().as_millis(), + DEFAULT_FLUSH_INTERVAL as u128 + ); + + let flush_control = FlushControl::new(FlushStrategy::Periodically(PeriodicStrategy { + interval: 1, + })); + assert_eq!(flush_control.get_flush_interval().period().as_millis(), 1); + + let flush_control = FlushControl::new(FlushStrategy::EndPeriodically(PeriodicStrategy { + interval: 1, + })); + assert_eq!(flush_control.get_flush_interval().period().as_millis(), 1); + + let flush_control = FlushControl::new(FlushStrategy::End); + assert_eq!( + flush_control.get_flush_interval().period().as_millis(), + tokio::time::Duration::MAX.as_millis() + ); } } diff --git a/bottlecap/src/lifecycle/invocation_times.rs b/bottlecap/src/lifecycle/invocation_times.rs deleted file mode 100644 index f12499d5..00000000 --- a/bottlecap/src/lifecycle/invocation_times.rs +++ /dev/null @@ -1,88 +0,0 @@ -const LOOKBACK_COUNT: usize = 20; -const ONE_TWENTY_SECONDS: f64 = 120.0; - -pub(crate) struct InvocationTimes { - times: [u64; LOOKBACK_COUNT], - head: usize, -} - -impl InvocationTimes { - pub(crate) fn new() -> InvocationTimes { - InvocationTimes { - times: [0; LOOKBACK_COUNT], - head: 0, - } - } - - pub(crate) fn add(&mut self, timestamp: u64) { - self.times[self.head] = timestamp; - self.head = (self.head + 
1) % LOOKBACK_COUNT; - } - - pub(crate) fn should_adapt_to_periodic(&self, now: u64) -> bool { - let mut count = 0; - let mut last = 0; - for time in &self.times { - if *time != 0 { - count += 1; - last = *time; - } - } - // If we haven't seen enough invocations, we should flush - if count < LOOKBACK_COUNT { - return false; - } - let elapsed = now - last; - (elapsed as f64 / (count - 1) as f64) < ONE_TWENTY_SECONDS - } -} - -#[cfg(test)] -mod tests { - use crate::lifecycle::invocation_times; - - #[test] - fn new() { - let invocation_times = invocation_times::InvocationTimes::new(); - assert_eq!( - invocation_times.times, - [0; invocation_times::LOOKBACK_COUNT] - ); - assert_eq!(invocation_times.head, 0); - } - - #[test] - fn insertion() { - let mut invocation_times = invocation_times::InvocationTimes::new(); - let timestamp = 1; - invocation_times.add(timestamp); - assert_eq!(invocation_times.times[0], timestamp); - assert_eq!(invocation_times.head, 1); - assert!(!invocation_times.should_adapt_to_periodic(1)); - } - - #[test] - fn insertion_with_full_buffer_fast_invokes() { - let mut invocation_times = invocation_times::InvocationTimes::new(); - for i in 0..=invocation_times::LOOKBACK_COUNT { - invocation_times.add(i as u64); - } - // should wrap around - assert_eq!(invocation_times.times[0], 20); - assert_eq!(invocation_times.head, 1); - assert!(invocation_times.should_adapt_to_periodic(21)); - } - - #[test] - fn insertion_with_full_buffer_slow_invokes() { - let mut invocation_times = invocation_times::InvocationTimes::new(); - invocation_times.add(1_u64); - for i in 0..invocation_times::LOOKBACK_COUNT { - invocation_times.add((i + 5000) as u64); - } - // should wrap around - assert_eq!(invocation_times.times[0], 5019); - assert_eq!(invocation_times.head, 1); - assert!(!invocation_times.should_adapt_to_periodic(10000)); - } -} diff --git a/bottlecap/src/lifecycle/mod.rs b/bottlecap/src/lifecycle/mod.rs index 1f00e517..1c0924d8 100644 --- 
a/bottlecap/src/lifecycle/mod.rs +++ b/bottlecap/src/lifecycle/mod.rs @@ -1,3 +1,2 @@ pub mod flush_control; pub mod invocation_context; -mod invocation_times;