Skip to content

Commit

Permalink
chore(bottlecap): switch flushing strategy to race (#318)
Browse files Browse the repository at this point in the history
* feat: race flush

* refactor: periodic only when configured

* fmt

* when flushing strategy is default, set periodic flush tick to `1s`

* on `End`, never flush until the end of the invocation

* remove `tokio_unstable` feature for building

* remove debug comment

* remove `invocation_times` mod

* update `flush_control.rs`

* use `flush_control` in main

* allow `end,<ms>` strategy

allows to flush periodically over a given amount of seconds and at the end

* update `debug` comment for flushing

* simplify logic for flush strategy parsing

* remove log that could spam debug

* refactor code and add unit test

---------

Co-authored-by: jordan gonzález <30836115+duncanista@users.noreply.github.com>
Co-authored-by: alexgallotta <5581237+alexgallotta@users.noreply.github.com>
  • Loading branch information
3 people authored Jul 22, 2024
1 parent e1a622f commit 5b4399e
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 197 deletions.
57 changes: 33 additions & 24 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,23 @@ use bottlecap::{
invocation_context::{InvocationContext, InvocationContextBuffer},
},
logger,
logs::{agent::LogsAgent, flusher::Flusher as LogsFlusher},
logs::{
agent::LogsAgent,
flusher::{build_fqdn_logs, Flusher as LogsFlusher},
},
metrics::{
aggregator::Aggregator as MetricsAggregator,
constants::CONTEXTS,
dogstatsd::{DogStatsD, DogStatsDConfig},
enhanced::lambda::Lambda as enhanced_metrics,
flusher::Flusher as MetricsFlusher,
flusher::{build_fqdn_metrics, Flusher as MetricsFlusher},
},
secrets::decrypt,
tags::{lambda, provider::Provider as TagProvider},
telemetry::{
self,
client::TelemetryApiClient,
events::TelemetryEvent,
events::{Status, TelemetryRecord},
events::{Status, TelemetryEvent, TelemetryRecord},
listener::TelemetryListener,
},
traces::{
Expand All @@ -49,6 +51,8 @@ use bottlecap::{
};
use datadog_trace_obfuscation::obfuscation_config;
use decrypt::resolve_secrets;
use reqwest::Client;
use serde::Deserialize;
use std::{
collections::hash_map,
collections::HashMap,
Expand All @@ -61,17 +65,12 @@ use std::{
sync::{Arc, Mutex},
};
use telemetry::listener::TelemetryListenerConfig;
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex as TokioMutex;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
use tracing_subscriber::EnvFilter;

use bottlecap::logs::flusher::build_fqdn_logs;
use bottlecap::metrics::flusher::build_fqdn_metrics;
use reqwest::Client;
use serde::Deserialize;
use tokio::sync::mpsc::Sender;
use tokio_util::sync::CancellationToken;

#[derive(Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct RegisterResponse {
Expand Down Expand Up @@ -327,10 +326,13 @@ async fn extension_loop_active(
let telemetry_listener_cancel_token =
setup_telemetry_client(&r.extension_id, logs_agent_channel).await?;

let mut flush_control = FlushControl::new(config.serverless_flush_strategy);
let flush_control = FlushControl::new(config.serverless_flush_strategy);
let mut invocation_context_buffer = InvocationContextBuffer::default();
let mut shutdown = false;

let mut flush_interval = flush_control.get_flush_interval();
flush_interval.tick().await; // discard first tick, which is instantaneous

loop {
let evt = next_event(client, &r.extension_id).await;
match evt {
Expand Down Expand Up @@ -360,10 +362,17 @@ async fn extension_loop_active(
}
// Block until we get something from the telemetry API
// Check if flush logic says we should block and flush or not
if flush_control.should_flush() || shutdown {
loop {
let received = event_bus.rx.recv().await;
if let Some(event) = received {
loop {
tokio::select! {
_ = flush_interval.tick() => {
tokio::join!(
logs_flusher.flush(),
metrics_flusher.flush(),
trace_flusher.manual_flush(),
stats_flusher.manual_flush()
);
}
Some(event) = event_bus.rx.recv() => {
match event {
Event::Metric(event) => {
debug!("Metric event: {:?}", event);
Expand Down Expand Up @@ -411,12 +420,14 @@ async fn extension_loop_active(
// pass the invocation deadline to
// flush tasks here, so they can
// retry if we have more time
tokio::join!(
logs_flusher.flush(),
metrics_flusher.flush(),
trace_flusher.manual_flush(),
stats_flusher.manual_flush()
);
if flush_control.should_flush_end() {
tokio::join!(
logs_flusher.flush(),
metrics_flusher.flush(),
trace_flusher.manual_flush(),
stats_flusher.manual_flush()
);
}
break;
}
TelemetryRecord::PlatformReport {
Expand Down Expand Up @@ -453,8 +464,6 @@ async fn extension_loop_active(
}
},
}
} else {
error!("could not get the event");
}
}
}
Expand Down
83 changes: 66 additions & 17 deletions bottlecap/src/config/flush_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ pub struct PeriodicStrategy {
pub enum FlushStrategy {
Default,
End,
EndPeriodically(PeriodicStrategy),
Periodically(PeriodicStrategy),
}

// Deserialize for FlushStrategy
// Flush Strategy can be either "end" or "periodically,<ms>"
// Flush Strategy can be either "end", "end,<ms>", or "periodically,<ms>"
impl<'de> Deserialize<'de> for FlushStrategy {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
Expand All @@ -26,22 +27,22 @@ impl<'de> Deserialize<'de> for FlushStrategy {
} else {
let mut split_value = value.as_str().split(',');
// "periodically,60000"
match split_value.next() {
Some(first_value) if first_value.starts_with("periodically") => {
let interval = split_value.next();
// "60000"
if let Some(interval) = interval {
if let Ok(parsed_interval) = interval.parse() {
return Ok(FlushStrategy::Periodically(PeriodicStrategy {
interval: parsed_interval,
}));
}
debug!("Invalid flush interval: {}, using default", interval);
Ok(FlushStrategy::Default)
} else {
debug!("Invalid flush strategy: {}, using default", value);
Ok(FlushStrategy::Default)
}
// "end,1000"
let strategy = split_value.next();
let interval: Option<u64> = split_value.next().and_then(|v| v.parse().ok());

match (strategy, interval) {
(Some("periodically"), Some(interval)) => {
Ok(FlushStrategy::Periodically(PeriodicStrategy { interval }))
}
(Some("end"), Some(interval)) => {
Ok(FlushStrategy::EndPeriodically(PeriodicStrategy {
interval,
}))
}
(Some(strategy), _) => {
debug!("Invalid flush interval: {}, using default", strategy);
Ok(FlushStrategy::Default)
}
_ => {
debug!("Invalid flush strategy: {}, using default", value);
Expand All @@ -51,3 +52,51 @@ impl<'de> Deserialize<'de> for FlushStrategy {
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn deserialize_end() {
let flush_strategy: FlushStrategy = serde_json::from_str("\"end\"").unwrap();
assert_eq!(flush_strategy, FlushStrategy::End);
}

#[test]
fn deserialize_periodically() {
let flush_strategy: FlushStrategy = serde_json::from_str("\"periodically,60000\"").unwrap();
assert_eq!(
flush_strategy,
FlushStrategy::Periodically(PeriodicStrategy { interval: 60000 })
);
}

#[test]
fn deserialize_end_periodically() {
let flush_strategy: FlushStrategy = serde_json::from_str("\"end,1000\"").unwrap();
assert_eq!(
flush_strategy,
FlushStrategy::EndPeriodically(PeriodicStrategy { interval: 1000 })
);
}

#[test]
fn deserialize_invalid() {
let flush_strategy: FlushStrategy = serde_json::from_str("\"invalid\"").unwrap();
assert_eq!(flush_strategy, FlushStrategy::Default);
}

#[test]
fn deserialize_invalid_interval() {
let flush_strategy: FlushStrategy =
serde_json::from_str("\"periodically,invalid\"").unwrap();
assert_eq!(flush_strategy, FlushStrategy::Default);
}

#[test]
fn deserialize_invalid_end_interval() {
let flush_strategy: FlushStrategy = serde_json::from_str("\"end,invalid\"").unwrap();
assert_eq!(flush_strategy, FlushStrategy::Default);
}
}
121 changes: 54 additions & 67 deletions bottlecap/src/lifecycle/flush_control.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use crate::config::flush_strategy::FlushStrategy;
use crate::lifecycle::invocation_times::InvocationTimes;
use ::std::time;
use tracing::debug;
use tokio::time::Interval;

const TWENTY_SECONDS: u64 = 20 * 1000;
const DEFAULT_FLUSH_INTERVAL: u64 = 1000; // 1s

#[derive(Clone, Copy, Debug, PartialEq)]
pub struct FlushControl {
pub last_flush: u64,
flush_strategy: FlushStrategy,
invocation_times: InvocationTimes,
}

// FlushControl is called at the end of every invocation and decides whether or not we should flush
Expand All @@ -26,49 +23,24 @@ pub struct FlushControl {
impl FlushControl {
#[must_use]
pub fn new(flush_strategy: FlushStrategy) -> FlushControl {
FlushControl {
flush_strategy,
last_flush: 0,
invocation_times: InvocationTimes::new(),
}
FlushControl { flush_strategy }
}

pub fn should_flush(&mut self) -> bool {
let now = match time::SystemTime::now().duration_since(time::UNIX_EPOCH) {
Ok(now) => now.as_secs(),
Err(e) => {
debug!("Failed to get current time: {:?}", e);
return false;
}
};
self.invocation_times.add(now);
#[must_use]
pub fn should_flush_end(&self) -> bool {
!matches!(&self.flush_strategy, FlushStrategy::Periodically(_))
}

#[must_use]
pub fn get_flush_interval(&self) -> Interval {
match &self.flush_strategy {
FlushStrategy::Default => {
if self.invocation_times.should_adapt_to_periodic(now) {
let should_periodic_flush = self.should_periodic_flush(now, TWENTY_SECONDS);
debug!(
"Adapting over to periodic flush strategy. should_periodic_flush: {}",
should_periodic_flush
);
return should_periodic_flush;
}
debug!("Not enough invocations to adapt to periodic flush, flushing at the end of the invocation");
self.last_flush = now;
true
tokio::time::interval(tokio::time::Duration::from_millis(DEFAULT_FLUSH_INTERVAL))
}
FlushStrategy::Periodically(periodic) => {
self.should_periodic_flush(now, periodic.interval)
FlushStrategy::Periodically(p) | FlushStrategy::EndPeriodically(p) => {
tokio::time::interval(tokio::time::Duration::from_millis(p.interval))
}
FlushStrategy::End => true,
}
}

fn should_periodic_flush(&mut self, now: u64, interval: u64) -> bool {
if now - self.last_flush > (interval / 1000) {
self.last_flush = now;
true
} else {
false
FlushStrategy::End => tokio::time::interval(tokio::time::Duration::MAX),
}
}
}
Expand All @@ -78,32 +50,47 @@ mod tests {
use super::*;
use crate::config::flush_strategy::PeriodicStrategy;

#[test]
fn should_flush_default_end() {
let mut flush_control = super::FlushControl::new(FlushStrategy::Default);
assert!(flush_control.should_flush());
}
#[test]
fn should_flush_default_periodic() {
const LOOKBACK_COUNT: usize = 20;
let mut flush_control = super::FlushControl::new(FlushStrategy::Default);
for _ in 0..LOOKBACK_COUNT - 1 {
assert!(flush_control.should_flush());
}
assert!(!flush_control.should_flush());
}
#[test]
fn should_flush_end() {
let mut flush_control = super::FlushControl::new(FlushStrategy::End);
assert!(flush_control.should_flush());
let flush_control = FlushControl::new(FlushStrategy::Default);
assert!(flush_control.should_flush_end());

let flush_control = FlushControl::new(FlushStrategy::EndPeriodically(PeriodicStrategy {
interval: 1,
}));
assert!(flush_control.should_flush_end());

let flush_control = FlushControl::new(FlushStrategy::End);
assert!(flush_control.should_flush_end());

let flush_control = FlushControl::new(FlushStrategy::Periodically(PeriodicStrategy {
interval: 1,
}));
assert!(!flush_control.should_flush_end());
}
#[test]
fn should_flush_periodically() {
let mut flush_control =
super::FlushControl::new(FlushStrategy::Periodically(PeriodicStrategy {
interval: 1,
}));
assert!(flush_control.should_flush());
assert!(!flush_control.should_flush());

#[tokio::test]
async fn get_flush_interval() {
let flush_control = FlushControl::new(FlushStrategy::Default);
assert_eq!(
flush_control.get_flush_interval().period().as_millis(),
DEFAULT_FLUSH_INTERVAL as u128
);

let flush_control = FlushControl::new(FlushStrategy::Periodically(PeriodicStrategy {
interval: 1,
}));
assert_eq!(flush_control.get_flush_interval().period().as_millis(), 1);

let flush_control = FlushControl::new(FlushStrategy::EndPeriodically(PeriodicStrategy {
interval: 1,
}));
assert_eq!(flush_control.get_flush_interval().period().as_millis(), 1);

let flush_control = FlushControl::new(FlushStrategy::End);
assert_eq!(
flush_control.get_flush_interval().period().as_millis(),
tokio::time::Duration::MAX.as_millis()
);
}
}
Loading

0 comments on commit 5b4399e

Please sign in to comment.