Skip to content

Commit

Permalink
feat: Add option to enable compression of event payloads (#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
keelerm84 authored Dec 5, 2024
1 parent c239b8a commit 5cc7aac
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 8 deletions.
2 changes: 1 addition & 1 deletion contract-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ actix = "0.13.0"
actix-web = "4.2.1"
env_logger = "0.10.0"
log = "0.4.14"
launchdarkly-server-sdk = { path = "../launchdarkly-server-sdk/", default-features = false}
launchdarkly-server-sdk = { path = "../launchdarkly-server-sdk/", default-features = false, features = ["event-compression"]}
serde = { version = "1.0.132", features = ["derive"] }
serde_json = "1.0.73"
futures = "0.3.12"
Expand Down
3 changes: 3 additions & 0 deletions contract-tests/src/client_entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ impl ClientEntity {
processor_builder.capacity(capacity);
}
processor_builder.all_attributes_private(events.all_attributes_private);
if let Some(e) = events.enable_gzip {
processor_builder.compress_events(e);
}

if let Some(interval) = events.flush_interval_ms {
processor_builder.flush_interval(Duration::from_millis(interval));
Expand Down
3 changes: 3 additions & 0 deletions contract-tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub struct EventParameters {
pub enable_diagnostics: bool,
#[serde(default = "bool::default")]
pub all_attributes_private: bool,
pub enable_gzip: Option<bool>,
pub global_private_attributes: Option<HashSet<Reference>>,
pub flush_interval_ms: Option<u64>,
#[serde(default = "bool::default")]
Expand Down Expand Up @@ -110,6 +111,8 @@ async fn status() -> impl Responder {
"migrations".to_string(),
"event-sampling".to_string(),
"client-prereq-events".to_string(),
"event-gzip".to_string(),
"optional-event-gzip".to_string(),
],
})
}
Expand Down
2 changes: 2 additions & 0 deletions launchdarkly-server-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ uuid = {version = "1.2.2", features = ["v4"] }
hyper = { version = "0.14.19", features = ["client", "http1", "http2", "tcp"] }
hyper-rustls = { version = "0.24.1" , optional = true}
rand = "0.8"
flate2 = { version = "1.0.35", optional = true }

[dev-dependencies]
maplit = "1.0.1"
Expand All @@ -50,6 +51,7 @@ reqwest = { version = "0.12.4", features = ["json"] }
[features]
default = ["rustls"]
rustls = ["hyper-rustls/http1", "hyper-rustls/http2", "eventsource-client/rustls"]
event-compression = ["flate2"]

[[example]]
name = "print_flags"
Expand Down
2 changes: 1 addition & 1 deletion launchdarkly-server-sdk/src/data_source_builders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ impl DataSourceFactory for MockDataSourceBuilder {
_sdk_key: &str,
_tags: Option<String>,
) -> Result<Arc<dyn DataSource>, BuildError> {
return Ok(self.data_source.as_ref().unwrap().clone());
Ok(self.data_source.as_ref().unwrap().clone())
}

fn to_owned(&self) -> Box<dyn DataSourceFactory> {
Expand Down
18 changes: 18 additions & 0 deletions launchdarkly-server-sdk/src/events/processor_builders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub struct EventProcessorBuilder<C> {
private_attributes: HashSet<Reference>,
connector: Option<C>,
omit_anonymous_contexts: bool,
compress_events: bool,
// diagnostic_recording_interval: Duration
}

Expand All @@ -109,6 +110,7 @@ where
}

let event_sender_result: Result<Arc<dyn EventSender>, BuildError> =
// NOTE: This would only be possible under unit testing conditions.
if let Some(event_sender) = &self.event_sender {
Ok(event_sender.clone())
} else if let Some(connector) = &self.connector {
Expand All @@ -117,6 +119,7 @@ where
hyper::Uri::from_str(url_string.as_str()).unwrap(),
sdk_key,
default_headers,
self.compress_events,
)))
} else {
#[cfg(feature = "rustls")]
Expand All @@ -133,6 +136,7 @@ where
hyper::Uri::from_str(url_string.as_str()).unwrap(),
sdk_key,
default_headers,
self.compress_events,
)))
}
#[cfg(not(feature = "rustls"))]
Expand Down Expand Up @@ -178,6 +182,7 @@ impl<C> EventProcessorBuilder<C> {
private_attributes: HashSet::new(),
omit_anonymous_contexts: false,
connector: None,
compress_events: false,
}
}

Expand Down Expand Up @@ -258,7 +263,20 @@ impl<C> EventProcessorBuilder<C> {
self
}

#[cfg(feature = "event-compression")]
/// Should the event payload sent to LaunchDarkly use gzip compression. By
/// default this is false to prevent backward breaking compatibility issues with
/// older versions of the relay proxy.
//
/// Customers not using the relay proxy are strongly encouraged to enable this
/// feature to reduce egress bandwidth cost.
pub fn compress_events(&mut self, enabled: bool) -> &mut Self {
self.compress_events = enabled;
self
}

#[cfg(test)]
/// Test only functionality that allows us to override the event sender.
pub fn event_sender(&mut self, event_sender: Arc<dyn EventSender>) -> &mut Self {
self.event_sender = Some(event_sender);
self
Expand Down
41 changes: 36 additions & 5 deletions launchdarkly-server-sdk/src/events/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@ use crate::{
reqwest::is_http_error_recoverable, LAUNCHDARKLY_EVENT_SCHEMA_HEADER,
LAUNCHDARKLY_PAYLOAD_ID_HEADER,
};
use std::collections::HashMap;

use chrono::DateTime;
use crossbeam_channel::Sender;
use std::collections::HashMap;

#[cfg(feature = "event-compression")]
use flate2::write::GzEncoder;
#[cfg(feature = "event-compression")]
use flate2::Compression;
#[cfg(feature = "event-compression")]
use std::io::Write;

use futures::future::BoxFuture;
use hyper::{client::connect::Connection, service::Service, Uri};
use tokio::{
Expand Down Expand Up @@ -36,6 +43,10 @@ pub struct HyperEventSender<C> {
sdk_key: String,
http: hyper::Client<C>,
default_headers: HashMap<&'static str, String>,

// used with event-compression feature
#[allow(dead_code)]
compress_events: bool,
}

impl<C> HyperEventSender<C>
Expand All @@ -50,12 +61,14 @@ where
url: hyper::Uri,
sdk_key: &str,
default_headers: HashMap<&'static str, String>,
compress_events: bool,
) -> Self {
Self {
url,
sdk_key: sdk_key.to_owned(),
http: hyper::Client::builder().build(connector),
default_headers,
compress_events,
}
}

Expand Down Expand Up @@ -96,7 +109,9 @@ where
serde_json::to_string_pretty(&events).unwrap_or_else(|e| e.to_string())
);

let json = match serde_json::to_vec(&events) {
// mut is needed for event-compression feature
#[allow(unused_mut)]
let mut payload = match serde_json::to_vec(&events) {
Ok(json) => json,
Err(e) => {
error!(
Expand All @@ -107,6 +122,21 @@ where
}
};

// mut is needed for event-compression feature
#[allow(unused_mut)]
let mut additional_headers = self.default_headers.clone();

#[cfg(feature = "event-compression")]
if self.compress_events {
let mut e = GzEncoder::new(Vec::new(), Compression::default());
if e.write_all(payload.as_slice()).is_ok() {
if let Ok(compressed) = e.finish() {
payload = compressed;
additional_headers.insert("Content-Encoding", "gzip".into());
}
}
}

for attempt in 1..=2 {
if attempt == 2 {
sleep(Duration::from_secs(1)).await;
Expand All @@ -124,11 +154,11 @@ where
)
.header(LAUNCHDARKLY_PAYLOAD_ID_HEADER, uuid.to_string());

for default_header in &self.default_headers {
for default_header in &additional_headers {
request_builder =
request_builder.header(*default_header.0, default_header.1.as_str());
}
let request = request_builder.body(hyper::Body::from(json.clone()));
let request = request_builder.body(hyper::Body::from(payload.clone()));

let result = self.http.request(request.unwrap()).await;

Expand Down Expand Up @@ -334,6 +364,7 @@ mod tests {
url,
"sdk-key",
HashMap::<&str, String>::new(),
false,
)
}
}
2 changes: 1 addition & 1 deletion launchdarkly-server-sdk/src/test_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::Stage;
pub const FLOAT_TO_INT_MAX: i64 = 9007199254740991;

pub fn basic_flag(key: &str) -> Flag {
return basic_flag_with_visibility(key, false);
basic_flag_with_visibility(key, false)
}

pub fn basic_flag_with_visibility(key: &str, visible_to_environment_id: bool) -> Flag {
Expand Down

0 comments on commit 5cc7aac

Please sign in to comment.