From 5cc7aacc3ce641f94faffa06f24ac5cbeff08ce6 Mon Sep 17 00:00:00 2001 From: "Matthew M. Keeler" Date: Thu, 5 Dec 2024 12:33:43 -0500 Subject: [PATCH] feat: Add option to enable compression of event payloads (#102) --- contract-tests/Cargo.toml | 2 +- contract-tests/src/client_entity.rs | 3 ++ contract-tests/src/main.rs | 3 ++ launchdarkly-server-sdk/Cargo.toml | 2 + .../src/data_source_builders.rs | 2 +- .../src/events/processor_builders.rs | 18 ++++++++ launchdarkly-server-sdk/src/events/sender.rs | 41 ++++++++++++++++--- launchdarkly-server-sdk/src/test_common.rs | 2 +- 8 files changed, 65 insertions(+), 8 deletions(-) diff --git a/contract-tests/Cargo.toml b/contract-tests/Cargo.toml index a27aca0..2e66d1b 100644 --- a/contract-tests/Cargo.toml +++ b/contract-tests/Cargo.toml @@ -10,7 +10,7 @@ actix = "0.13.0" actix-web = "4.2.1" env_logger = "0.10.0" log = "0.4.14" -launchdarkly-server-sdk = { path = "../launchdarkly-server-sdk/", default-features = false} +launchdarkly-server-sdk = { path = "../launchdarkly-server-sdk/", default-features = false, features = ["event-compression"]} serde = { version = "1.0.132", features = ["derive"] } serde_json = "1.0.73" futures = "0.3.12" diff --git a/contract-tests/src/client_entity.rs b/contract-tests/src/client_entity.rs index 4b3b158..3538178 100644 --- a/contract-tests/src/client_entity.rs +++ b/contract-tests/src/client_entity.rs @@ -113,6 +113,9 @@ impl ClientEntity { processor_builder.capacity(capacity); } processor_builder.all_attributes_private(events.all_attributes_private); + if let Some(e) = events.enable_gzip { + processor_builder.compress_events(e); + } if let Some(interval) = events.flush_interval_ms { processor_builder.flush_interval(Duration::from_millis(interval)); diff --git a/contract-tests/src/main.rs b/contract-tests/src/main.rs index d5f0d7d..575caa8 100644 --- a/contract-tests/src/main.rs +++ b/contract-tests/src/main.rs @@ -41,6 +41,7 @@ pub struct EventParameters { pub enable_diagnostics: bool, #[serde(default = "bool::default")] pub all_attributes_private: bool, + pub enable_gzip: Option, pub global_private_attributes: Option>, pub flush_interval_ms: Option, #[serde(default = "bool::default")] @@ -110,6 +111,8 @@ async fn status() -> impl Responder { "migrations".to_string(), "event-sampling".to_string(), "client-prereq-events".to_string(), + "event-gzip".to_string(), + "optional-event-gzip".to_string(), ], }) } diff --git a/launchdarkly-server-sdk/Cargo.toml b/launchdarkly-server-sdk/Cargo.toml index 2602f50..a82942c 100644 --- a/launchdarkly-server-sdk/Cargo.toml +++ b/launchdarkly-server-sdk/Cargo.toml @@ -35,6 +35,7 @@ uuid = {version = "1.2.2", features = ["v4"] } hyper = { version = "0.14.19", features = ["client", "http1", "http2", "tcp"] } hyper-rustls = { version = "0.24.1" , optional = true} rand = "0.8" +flate2 = { version = "1.0.35", optional = true } [dev-dependencies] maplit = "1.0.1" @@ -50,6 +51,7 @@ reqwest = { version = "0.12.4", features = ["json"] } [features] default = ["rustls"] rustls = ["hyper-rustls/http1", "hyper-rustls/http2", "eventsource-client/rustls"] +event-compression = ["flate2"] [[example]] name = "print_flags" diff --git a/launchdarkly-server-sdk/src/data_source_builders.rs b/launchdarkly-server-sdk/src/data_source_builders.rs index 530253a..ec32390 100644 --- a/launchdarkly-server-sdk/src/data_source_builders.rs +++ b/launchdarkly-server-sdk/src/data_source_builders.rs @@ -345,7 +345,7 @@ impl DataSourceFactory for MockDataSourceBuilder { _sdk_key: &str, _tags: Option, ) -> Result, BuildError> { - return Ok(self.data_source.as_ref().unwrap().clone()); + Ok(self.data_source.as_ref().unwrap().clone()) } fn to_owned(&self) -> Box { diff --git a/launchdarkly-server-sdk/src/events/processor_builders.rs b/launchdarkly-server-sdk/src/events/processor_builders.rs index a1574a0..75b7e8e 100644 --- a/launchdarkly-server-sdk/src/events/processor_builders.rs +++ b/launchdarkly-server-sdk/src/events/processor_builders.rs @@ -84,6 +84,7 @@ pub struct EventProcessorBuilder { private_attributes: HashSet, connector: Option, omit_anonymous_contexts: bool, + compress_events: bool, // diagnostic_recording_interval: Duration } @@ -109,6 +110,7 @@ where } let event_sender_result: Result, BuildError> = + // NOTE: This would only be possible under unit testing conditions. if let Some(event_sender) = &self.event_sender { Ok(event_sender.clone()) } else if let Some(connector) = &self.connector { @@ -117,6 +119,7 @@ where hyper::Uri::from_str(url_string.as_str()).unwrap(), sdk_key, default_headers, + self.compress_events, ))) } else { #[cfg(feature = "rustls")] @@ -133,6 +136,7 @@ where hyper::Uri::from_str(url_string.as_str()).unwrap(), sdk_key, default_headers, + self.compress_events, ))) } #[cfg(not(feature = "rustls"))] @@ -178,6 +182,7 @@ impl EventProcessorBuilder { private_attributes: HashSet::new(), omit_anonymous_contexts: false, connector: None, + compress_events: false, } } @@ -258,7 +263,20 @@ impl EventProcessorBuilder { self } + #[cfg(feature = "event-compression")] + /// Should the event payload sent to LaunchDarkly use gzip compression. By + /// default this is false to prevent backward breaking compatibility issues with + /// older versions of the relay proxy. + // + /// Customers not using the relay proxy are strongly encouraged to enable this + /// feature to reduce egress bandwidth cost. + pub fn compress_events(&mut self, enabled: bool) -> &mut Self { + self.compress_events = enabled; + self + } + #[cfg(test)] + /// Test only functionality that allows us to override the event sender. pub fn event_sender(&mut self, event_sender: Arc) -> &mut Self { self.event_sender = Some(event_sender); self diff --git a/launchdarkly-server-sdk/src/events/sender.rs b/launchdarkly-server-sdk/src/events/sender.rs index 44edea3..f8b6a54 100644 --- a/launchdarkly-server-sdk/src/events/sender.rs +++ b/launchdarkly-server-sdk/src/events/sender.rs @@ -2,10 +2,17 @@ use crate::{ reqwest::is_http_error_recoverable, LAUNCHDARKLY_EVENT_SCHEMA_HEADER, LAUNCHDARKLY_PAYLOAD_ID_HEADER, }; -use std::collections::HashMap; - use chrono::DateTime; use crossbeam_channel::Sender; +use std::collections::HashMap; + +#[cfg(feature = "event-compression")] +use flate2::write::GzEncoder; +#[cfg(feature = "event-compression")] +use flate2::Compression; +#[cfg(feature = "event-compression")] +use std::io::Write; + use futures::future::BoxFuture; use hyper::{client::connect::Connection, service::Service, Uri}; use tokio::{ @@ -36,6 +43,10 @@ pub struct HyperEventSender { sdk_key: String, http: hyper::Client, default_headers: HashMap<&'static str, String>, + + // used with event-compression feature + #[allow(dead_code)] + compress_events: bool, } impl HyperEventSender @@ -50,12 +61,14 @@ where url: hyper::Uri, sdk_key: &str, default_headers: HashMap<&'static str, String>, + compress_events: bool, ) -> Self { Self { url, sdk_key: sdk_key.to_owned(), http: hyper::Client::builder().build(connector), default_headers, + compress_events, } } @@ -96,7 +109,9 @@ where serde_json::to_string_pretty(&events).unwrap_or_else(|e| e.to_string()) ); - let json = match serde_json::to_vec(&events) { + // mut is needed for event-compression feature + #[allow(unused_mut)] + let mut payload = match serde_json::to_vec(&events) { Ok(json) => json, Err(e) => { error!( @@ -107,6 +122,21 @@ where } }; + // mut is needed for event-compression feature + #[allow(unused_mut)] + let mut additional_headers = self.default_headers.clone(); + + #[cfg(feature = "event-compression")] + if self.compress_events { + let mut e = GzEncoder::new(Vec::new(), Compression::default()); + if e.write_all(payload.as_slice()).is_ok() { + if let Ok(compressed) = e.finish() { + payload = compressed; + additional_headers.insert("Content-Encoding", "gzip".into()); + } + } + } + for attempt in 1..=2 { if attempt == 2 { sleep(Duration::from_secs(1)).await; @@ -124,11 +154,11 @@ where ) .header(LAUNCHDARKLY_PAYLOAD_ID_HEADER, uuid.to_string()); - for default_header in &self.default_headers { + for default_header in &additional_headers { request_builder = request_builder.header(*default_header.0, default_header.1.as_str()); } - let request = request_builder.body(hyper::Body::from(json.clone())); + let request = request_builder.body(hyper::Body::from(payload.clone())); let result = self.http.request(request.unwrap()).await; @@ -334,6 +364,7 @@ mod tests { url, "sdk-key", HashMap::<&str, String>::new(), + false, ) } } diff --git a/launchdarkly-server-sdk/src/test_common.rs b/launchdarkly-server-sdk/src/test_common.rs index 4b70123..c5f167c 100644 --- a/launchdarkly-server-sdk/src/test_common.rs +++ b/launchdarkly-server-sdk/src/test_common.rs @@ -7,7 +7,7 @@ use crate::Stage; pub const FLOAT_TO_INT_MAX: i64 = 9007199254740991; pub fn basic_flag(key: &str) -> Flag { - return basic_flag_with_visibility(key, false); + basic_flag_with_visibility(key, false) } pub fn basic_flag_with_visibility(key: &str, visible_to_environment_id: bool) -> Flag {