Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add option to enable compression of event payloads #102

Merged
merged 7 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contract-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ actix = "0.13.0"
actix-web = "4.2.1"
env_logger = "0.10.0"
log = "0.4.14"
launchdarkly-server-sdk = { path = "../launchdarkly-server-sdk/", default-features = false}
launchdarkly-server-sdk = { path = "../launchdarkly-server-sdk/", default-features = false, features = ["event-compression"]}
serde = { version = "1.0.132", features = ["derive"] }
serde_json = "1.0.73"
futures = "0.3.12"
Expand Down
3 changes: 3 additions & 0 deletions contract-tests/src/client_entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ impl ClientEntity {
processor_builder.capacity(capacity);
}
processor_builder.all_attributes_private(events.all_attributes_private);
if let Some(e) = events.enable_gzip {
processor_builder.compress_events(e);
}

if let Some(interval) = events.flush_interval_ms {
processor_builder.flush_interval(Duration::from_millis(interval));
Expand Down
3 changes: 3 additions & 0 deletions contract-tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub struct EventParameters {
pub enable_diagnostics: bool,
#[serde(default = "bool::default")]
pub all_attributes_private: bool,
pub enable_gzip: Option<bool>,
pub global_private_attributes: Option<HashSet<Reference>>,
pub flush_interval_ms: Option<u64>,
#[serde(default = "bool::default")]
Expand Down Expand Up @@ -110,6 +111,8 @@ async fn status() -> impl Responder {
"migrations".to_string(),
"event-sampling".to_string(),
"client-prereq-events".to_string(),
"event-gzip".to_string(),
"optional-event-gzip".to_string(),
],
})
}
Expand Down
2 changes: 2 additions & 0 deletions launchdarkly-server-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ uuid = {version = "1.2.2", features = ["v4"] }
hyper = { version = "0.14.19", features = ["client", "http1", "http2", "tcp"] }
hyper-rustls = { version = "0.24.1" , optional = true}
rand = "0.8"
flate2 = { version = "1.0.35", optional = true }

[dev-dependencies]
maplit = "1.0.1"
Expand All @@ -50,6 +51,7 @@ reqwest = { version = "0.12.4", features = ["json"] }
[features]
default = ["rustls"]
rustls = ["hyper-rustls/http1", "hyper-rustls/http2", "eventsource-client/rustls"]
event-compression = ["flate2"]

[[example]]
name = "print_flags"
Expand Down
2 changes: 1 addition & 1 deletion launchdarkly-server-sdk/src/data_source_builders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ impl DataSourceFactory for MockDataSourceBuilder {
_sdk_key: &str,
_tags: Option<String>,
) -> Result<Arc<dyn DataSource>, BuildError> {
return Ok(self.data_source.as_ref().unwrap().clone());
Ok(self.data_source.as_ref().unwrap().clone())
}

fn to_owned(&self) -> Box<dyn DataSourceFactory> {
Expand Down
18 changes: 18 additions & 0 deletions launchdarkly-server-sdk/src/events/processor_builders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub struct EventProcessorBuilder<C> {
private_attributes: HashSet<Reference>,
connector: Option<C>,
omit_anonymous_contexts: bool,
compress_events: bool,
// diagnostic_recording_interval: Duration
}

Expand All @@ -109,6 +110,7 @@ where
}

let event_sender_result: Result<Arc<dyn EventSender>, BuildError> =
// NOTE: This would only be possible under unit testing conditions.
if let Some(event_sender) = &self.event_sender {
Ok(event_sender.clone())
} else if let Some(connector) = &self.connector {
Expand All @@ -117,6 +119,7 @@ where
hyper::Uri::from_str(url_string.as_str()).unwrap(),
sdk_key,
default_headers,
self.compress_events,
)))
} else {
#[cfg(feature = "rustls")]
Expand All @@ -133,6 +136,7 @@ where
hyper::Uri::from_str(url_string.as_str()).unwrap(),
sdk_key,
default_headers,
self.compress_events,
)))
}
#[cfg(not(feature = "rustls"))]
Expand Down Expand Up @@ -178,6 +182,7 @@ impl<C> EventProcessorBuilder<C> {
private_attributes: HashSet::new(),
omit_anonymous_contexts: false,
connector: None,
compress_events: false,
}
}

Expand Down Expand Up @@ -258,7 +263,20 @@ impl<C> EventProcessorBuilder<C> {
self
}

#[cfg(feature = "event-compression")]
/// Should the event payload sent to LaunchDarkly use gzip compression. By
/// default this is false to prevent backward breaking compatibility issues with
/// older versions of the relay proxy.
//
/// Customers not using the relay proxy are strongly encouraged to enable this
/// feature to reduce egress bandwidth cost.
pub fn compress_events(&mut self, enabled: bool) -> &mut Self {
self.compress_events = enabled;
self
}

#[cfg(test)]
/// Test only functionality that allows us to override the event sender.
pub fn event_sender(&mut self, event_sender: Arc<dyn EventSender>) -> &mut Self {
self.event_sender = Some(event_sender);
self
Expand Down
42 changes: 37 additions & 5 deletions launchdarkly-server-sdk/src/events/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@ use crate::{
reqwest::is_http_error_recoverable, LAUNCHDARKLY_EVENT_SCHEMA_HEADER,
LAUNCHDARKLY_PAYLOAD_ID_HEADER,
};
use std::collections::HashMap;

use chrono::DateTime;
use crossbeam_channel::Sender;
use std::collections::HashMap;

#[cfg(feature = "event-compression")]
use flate2::write::GzEncoder;
#[cfg(feature = "event-compression")]
use flate2::Compression;
#[cfg(feature = "event-compression")]
use std::io::Write;

use futures::future::BoxFuture;
use hyper::{client::connect::Connection, service::Service, Uri};
use tokio::{
Expand Down Expand Up @@ -36,6 +43,10 @@ pub struct HyperEventSender<C> {
sdk_key: String,
http: hyper::Client<C>,
default_headers: HashMap<&'static str, String>,

// used with event-compression feature
#[allow(dead_code)]
compress_events: bool,
}

impl<C> HyperEventSender<C>
Expand All @@ -50,12 +61,14 @@ where
url: hyper::Uri,
sdk_key: &str,
default_headers: HashMap<&'static str, String>,
compress_events: bool,
) -> Self {
Self {
url,
sdk_key: sdk_key.to_owned(),
http: hyper::Client::builder().build(connector),
default_headers,
compress_events,
}
}

Expand Down Expand Up @@ -96,7 +109,9 @@ where
serde_json::to_string_pretty(&events).unwrap_or_else(|e| e.to_string())
);

let json = match serde_json::to_vec(&events) {
// mut is needed for event-compression feature
#[allow(unused_mut)]
let mut payload = match serde_json::to_vec(&events) {
Ok(json) => json,
Err(e) => {
error!(
Expand All @@ -107,6 +122,21 @@ where
}
};

// mut is needed for event-compression feature
#[allow(unused_mut)]
let mut additional_headers = self.default_headers.clone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be not worth it in terms of readability, but possibly this could conditionally be mut if event-compression is enabled, and not mut otherwise.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have to do the same thing with the json bit above, so the readability doesn't seem good. I'm inclined to leave it.


#[cfg(feature = "event-compression")]
if self.compress_events {
let mut e = GzEncoder::new(Vec::new(), Compression::default());
if e.write_all(payload.as_slice()).is_ok() {
if let Ok(compressed) = e.finish() {
payload = compressed;
additional_headers.insert("Content-Encoding", "gzip".into());
}
}
}

for attempt in 1..=2 {
if attempt == 2 {
sleep(Duration::from_secs(1)).await;
Expand All @@ -124,11 +154,12 @@ where
)
.header(LAUNCHDARKLY_PAYLOAD_ID_HEADER, uuid.to_string());

for default_header in &self.default_headers {
for default_header in &additional_headers {
error!("Adding header: {} = {}", default_header.0, default_header.1);
cwaldren-ld marked this conversation as resolved.
Show resolved Hide resolved
request_builder =
request_builder.header(*default_header.0, default_header.1.as_str());
}
let request = request_builder.body(hyper::Body::from(json.clone()));
let request = request_builder.body(hyper::Body::from(payload.clone()));

let result = self.http.request(request.unwrap()).await;

Expand Down Expand Up @@ -334,6 +365,7 @@ mod tests {
url,
"sdk-key",
HashMap::<&str, String>::new(),
false,
)
}
}
2 changes: 1 addition & 1 deletion launchdarkly-server-sdk/src/test_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::Stage;
pub const FLOAT_TO_INT_MAX: i64 = 9007199254740991;

pub fn basic_flag(key: &str) -> Flag {
return basic_flag_with_visibility(key, false);
basic_flag_with_visibility(key, false)
}

pub fn basic_flag_with_visibility(key: &str, visible_to_environment_id: bool) -> Flag {
Expand Down
Loading