From 22604c60ba7f73e52e5c9d28b1504dad115fb592 Mon Sep 17 00:00:00 2001 From: Brian Martin Date: Thu, 11 Apr 2024 13:56:14 -0700 Subject: [PATCH] feat: add approximate compression ratio for messages and values (#191) Adds approximate compression ratio targeting for messages and values. This is done by limiting the number of random bytes in the payload to the first N bytes and iteratively estimating the compression ratio using gzip. --- .github/workflows/cargo.yml | 2 +- Cargo.lock | 25 +++++++------- Cargo.toml | 1 + configs/blabber.toml | 3 ++ configs/kafka.toml | 3 ++ configs/memcached.toml | 3 ++ configs/momento.toml | 3 ++ configs/momento_pubsub.toml | 6 ++++ configs/redis.toml | 3 ++ configs/segcache.toml | 3 ++ configs/smoketest.toml | 44 +++++++++++++++++++++++++ src/config/workload.rs | 12 +++++++ src/workload/mod.rs | 65 ++++++++++++++++++++++++++++++++++--- 13 files changed, 155 insertions(+), 18 deletions(-) create mode 100644 configs/smoketest.toml diff --git a/.github/workflows/cargo.yml b/.github/workflows/cargo.yml index eeb34b5..bf2914d 100644 --- a/.github/workflows/cargo.yml +++ b/.github/workflows/cargo.yml @@ -175,7 +175,7 @@ jobs: shell: bash run: | memcached -t 1 -p 11211 -m 256 & - target/release/rpc-perf configs/memcached.toml + target/release/rpc-perf configs/smoketest.toml check-success: name: verify all tests pass diff --git a/Cargo.lock b/Cargo.lock index 91169a2..1b0ddbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -120,9 +120,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.81" +version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0952808a6c2afd1aa8947271f3a60f1a6763c7b912d210184c5149b5cf147247" +checksum = "f538837af36e6f6a9be0faa67f9a314f8119e4e4b5867c6ab40ed60360142519" [[package]] name = "arrow" @@ -669,9 +669,9 @@ checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" [[package]] name = "combine" -version = "4.6.6" +version = "4.6.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" dependencies = [ "bytes", "futures-core", @@ -844,9 +844,9 @@ checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a" [[package]] name = "encoding_rs" -version = "0.8.33" +version = "0.8.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1" +checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" dependencies = [ "cfg-if", ] @@ -2327,9 +2327,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.35" +version = "1.0.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" +checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" dependencies = [ "proc-macro2", ] @@ -2561,6 +2561,7 @@ dependencies = [ "bytes", "chrono", "clap", + "flate2", "foreign-types-shared 0.3.1", "futures", "histogram 0.10.0", @@ -3023,9 +3024,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.34" +version = "0.3.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8248b6521bb14bc45b4067159b9b6ad792e2d6d754d6c41fb50e29fefe38749" +checksum = "ef89ece63debf11bc32d1ed8d078ac870cbeb44da02afb02a9ff135ae7ca0582" dependencies = [ "deranged", "itoa", @@ -3044,9 +3045,9 @@ checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.17" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ba3a3ef41e6672a2f0f001392bb5dcd3ff0a9992d618ca761a11c3121547774" +checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" dependencies = [ "num-conv", "time-core", diff --git a/Cargo.toml b/Cargo.toml index 7755125..2b04b91 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,7 @@ tokio-openssl = { version = "0.6.4", optional = true } toml = "0.8.2" warp = "0.3.6" zipf = "7.0.1" +flate2 = "1.0.28" [features] default = ["openssl"] diff --git a/configs/blabber.toml b/configs/blabber.toml index 38e5a0b..3f0fa9a 100644 --- a/configs/blabber.toml +++ b/configs/blabber.toml @@ -54,6 +54,9 @@ start = 1 topics = 1 topic_len = 1 message_len = 64 +# optionally, specify an approximate compression ratio for the message payload. +# Defaults to 1.0 meaning the message is high-entropy and not compressible. +# compression_ratio = 1.0 weight = 1 # the total number of clients that will subscribe to this set of topics subscriber_poolsize = 100 diff --git a/configs/kafka.toml b/configs/kafka.toml index 21beda1..6169f42 100644 --- a/configs/kafka.toml +++ b/configs/kafka.toml @@ -84,5 +84,8 @@ topic_names = ["hello", "world"] partitions = 10 # the value length, in bytes message_len = 512 +# optionally, specify an approximate compression ratio for the message payload. +# Defaults to 1.0 meaning the message is high-entropy and not compressible. +compression_ratio = 1.0 # the key length, in bytes key_len = 8 \ No newline at end of file diff --git a/configs/memcached.toml b/configs/memcached.toml index 3f1a218..5daa8e4 100644 --- a/configs/memcached.toml +++ b/configs/memcached.toml @@ -63,6 +63,9 @@ nkeys = 1_000_000 vlen = 128 # use random bytes for the values vkind = "bytes" +# optionally, specify an approximate compression ratio for the value payload. +# Defaults to 1.0 meaning the message is high-entropy and not compressible. +compression_ratio = 1.0 # optionally: specify a TTL for the keys, by default there is no expiration # ttl = "15m" # controls what commands will be used in this keyspace diff --git a/configs/momento.toml b/configs/momento.toml index cb37d30..03f5d84 100644 --- a/configs/momento.toml +++ b/configs/momento.toml @@ -73,6 +73,9 @@ klen = 32 nkeys = 1_000_000 # sets the value length, in bytes vlen = 128 +# optionally, specify an approximate compression ratio for the value payload. +# Defaults to 1.0 meaning the message is high-entropy and not compressible. +compression_ratio = 1.0 # use random bytes for the values vkind = "bytes" # override the default ttl for this keyspace setting it to 15 minutes diff --git a/configs/momento_pubsub.toml b/configs/momento_pubsub.toml index 15ba83f..ecbd463 100644 --- a/configs/momento_pubsub.toml +++ b/configs/momento_pubsub.toml @@ -67,6 +67,8 @@ topics = 10 topic_len = 64 # sets the value length, in bytes message_len = 128 +# specify an approximate compression ratio for the message payload +compression_ratio = 1.0 # An example set of topics using a high number of subscribers per topic. [[workload.topics]] @@ -84,3 +86,7 @@ topics = 1 topic_len = 32 # sets the value length, in bytes message_len = 128 +# optionally, specify an approximate compression ratio for the message payload. +# Defaults to 1.0 meaning the message is high-entropy and not compressible. +compression_ratio = 1.0 + diff --git a/configs/redis.toml b/configs/redis.toml index 999e37c..a6f59dc 100644 --- a/configs/redis.toml +++ b/configs/redis.toml @@ -69,6 +69,9 @@ nkeys = 1_000_000 vlen = 128 # use random bytes for the values vkind = "bytes" +# optionally, specify an approximate compression ratio for the value payload. +# Defaults to 1.0 meaning the message is high-entropy and not compressible. +compression_ratio = 1.0 # optionally: specify a TTL for the keys, by default there is no expiration # ttl = "15m" # controls what commands will be used in this keyspace diff --git a/configs/segcache.toml b/configs/segcache.toml index cec5079..841a44b 100644 --- a/configs/segcache.toml +++ b/configs/segcache.toml @@ -86,6 +86,9 @@ nkeys = 1_000_000 vlen = 128 # use random bytes for the values vkind = "bytes" +# optionally, specify an approximate compression ratio for the value payload. +# Defaults to 1.0 meaning the message is high-entropy and not compressible. +compression_ratio = 1.0 # controls what commands will be used in this keyspace commands = [ # get a value diff --git a/configs/smoketest.toml b/configs/smoketest.toml new file mode 100644 index 0000000..d32dabe --- /dev/null +++ b/configs/smoketest.toml @@ -0,0 +1,44 @@ +# A configuration that can be used as a smoketest against a memcached instance. + +[general] +protocol = "memcache" +interval = 1 +duration = 60 +metrics_output = "rpcperf.parquet" +metrics_format = "parquet" +admin = "127.0.0.1:9090" + +[debug] +log_level = "info" +log_backup = "rpc-perf.log.old" +log_max_size = 1073741824 + +[target] +endpoints = [ + "127.0.0.1:11211", +] + +[client] +threads = 4 +poolsize = 20 +connect_timeout = 10000 +request_timeout = 1000 + +[workload] +threads = 1 + +[workload.ratelimit] +start = 10_000 + +[[workload.keyspace]] +weight = 1 +klen = 32 +nkeys = 1_000_000 +vlen = 128 +vkind = "bytes" +compression_ratio = 10.0 +commands = [ + { verb = "get", weight = 80 }, + { verb = "set", weight = 20 }, + { verb = "delete", weight = 0 }, +] diff --git a/src/config/workload.rs b/src/config/workload.rs index 41a946b..22fd8f8 100644 --- a/src/config/workload.rs +++ b/src/config/workload.rs @@ -48,6 +48,8 @@ pub struct Topics { #[serde(default)] topic_names: Vec, message_len: usize, + #[serde(default)] + compression_ratio: Option, #[serde(default = "one")] key_len: usize, weight: usize, @@ -89,6 +91,10 @@ impl Topics { self.message_len } + pub fn compression_ratio(&self) -> f64 { + self.compression_ratio.unwrap_or(1.0) + } + pub fn subscriber_poolsize(&self) -> usize { self.subscriber_poolsize } @@ -139,6 +145,8 @@ pub struct Keyspace { #[serde(default)] vkind: Option, #[serde(default)] + compression_ratio: Option, + #[serde(default)] // no ttl is treated as no-expires or max ttl for the protocol ttl: Option, } @@ -180,6 +188,10 @@ impl Keyspace { self.vkind.unwrap_or(ValueKind::Bytes) } + pub fn compression_ratio(&self) -> f64 { + self.compression_ratio.unwrap_or(1.0) + } + pub fn ttl(&self) -> Option { self.ttl .as_ref() diff --git a/src/workload/mod.rs b/src/workload/mod.rs index 8bb2a14..74bf4a8 100644 --- a/src/workload/mod.rs +++ b/src/workload/mod.rs @@ -1,15 +1,16 @@ use super::*; use config::{Command, RampCompletionAction, RampType, ValueKind, Verb}; +use flate2::write::GzEncoder; +use flate2::Compression; use rand::distributions::{Alphanumeric, Uniform}; use rand::seq::SliceRandom; -use rand::thread_rng; -use rand::{Rng, RngCore, SeedableRng}; +use rand::{thread_rng, Rng, RngCore, SeedableRng}; use rand_distr::Distribution as RandomDistribution; use rand_distr::WeightedAliasIndex; use rand_xoshiro::{Seed512, Xoshiro512PlusPlus}; use ratelimit::Ratelimiter; use std::collections::{HashMap, HashSet}; -use std::io::Result; +use std::io::{Result, Write}; use std::sync::atomic::AtomicU64; use std::sync::Arc; use tokio::runtime::Runtime; @@ -162,7 +163,7 @@ impl Generator { // add a header [m[0], m[1], m[2], m[3], m[4], m[5], m[6], m[7]] = [0x54, 0x45, 0x53, 0x54, 0x49, 0x4E, 0x47, 0x21]; - rng.fill(&mut m[32..topics.message_len]); + rng.fill(&mut m[32..(topics.message_random_bytes + 32)]); let mut k = vec![0_u8; topics.key_len]; rng.fill(&mut k[0..topics.key_len]); @@ -391,12 +392,16 @@ pub struct Topics { partition_dist: Distribution, key_len: usize, message_len: usize, + message_random_bytes: usize, subscriber_poolsize: usize, subscriber_concurrency: usize, } impl Topics { pub fn new(config: &Config, topics: &config::Topics) -> Self { + let message_random_bytes = + estimate_random_bytes_needed(topics.message_len(), topics.compression_ratio()); + // ntopics must be >= 1 let ntopics = std::cmp::max(1, topics.topics()); // partitions must be >= 1 @@ -458,6 +463,7 @@ impl Topics { partition_dist, key_len, message_len, + message_random_bytes, subscriber_poolsize, subscriber_concurrency, } @@ -490,6 +496,7 @@ pub struct Keyspace { inner_key_dist: Distribution, vlen: usize, vkind: ValueKind, + value_random_bytes: usize, ttl: Option, } @@ -510,6 +517,11 @@ impl Distribution { impl Keyspace { pub fn new(config: &Config, keyspace: &config::Keyspace) -> Self { + let value_random_bytes = estimate_random_bytes_needed( + keyspace.vlen().unwrap_or(0), + keyspace.compression_ratio(), + ); + // nkeys must be >= 1 let nkeys = std::cmp::max(1, keyspace.nkeys()); let klen = keyspace.klen(); @@ -655,6 +667,7 @@ impl Keyspace { inner_key_dist, vlen: keyspace.vlen().unwrap_or(0), vkind: keyspace.vkind(), + value_random_bytes, ttl: keyspace.ttl(), } } @@ -674,7 +687,7 @@ impl Keyspace { ValueKind::I64 => format!("{}", rng.gen::()).into_bytes(), ValueKind::Bytes => { let mut buf = vec![0_u8; self.vlen]; - rng.fill(&mut buf[0..self.vlen]); + rng.fill(&mut buf[0..self.value_random_bytes]); buf } } @@ -807,3 +820,45 @@ impl Ratelimit { limit } } + +fn estimate_random_bytes_needed(length: usize, compression_ratio: f64) -> usize { + // if compression ratio is low, all bytes should be random + if compression_ratio <= 1.0 { + return length; + } + + // we need to approximate the number of random bytes to send, we do + // this iteratively assuming gzip compression. + + // doesn't matter what seed we use here + let mut rng = Xoshiro512PlusPlus::seed_from_u64(0); + + // message buffer + let mut m = vec![0; length]; + + let mut best = 0; + + for idx in 0..m.len() { + // zero all bytes + for b in &mut m { + *b = 0 + } + + // fill first N bytes with pseudorandom data + rng.fill_bytes(&mut m[0..idx]); + + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + let _ = encoder.write_all(&m); + let compressed = encoder.finish().unwrap(); + + let ratio = m.len() as f64 / compressed.len() as f64; + + if ratio < compression_ratio { + break; + } + + best = idx; + } + + best +}