Skip to content

Commit

Permalink
feat: add approximate compression ratio for messages and values (iops…
Browse files Browse the repository at this point in the history
…ystems#191)

Adds approximate compression ratio targeting for messages and
values.

This is done by limiting the number of random bytes in the payload
to the first N bytes and iteratively estimating the compression
ratio using gzip.
  • Loading branch information
brayniac committed Apr 11, 2024
1 parent ed01ba7 commit 22604c6
Show file tree
Hide file tree
Showing 13 changed files with 155 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cargo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ jobs:
shell: bash
run: |
memcached -t 1 -p 11211 -m 256 &
target/release/rpc-perf configs/memcached.toml
target/release/rpc-perf configs/smoketest.toml
check-success:
name: verify all tests pass
Expand Down
25 changes: 13 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ tokio-openssl = { version = "0.6.4", optional = true }
toml = "0.8.2"
warp = "0.3.6"
zipf = "7.0.1"
flate2 = "1.0.28"

[features]
default = ["openssl"]
Expand Down
3 changes: 3 additions & 0 deletions configs/blabber.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ start = 1
topics = 1
topic_len = 1
message_len = 64
# optionally, specify an approximate compression ratio for the message payload.
# Defaults to 1.0 meaning the message is high-entropy and not compressible.
# compression_ratio = 1.0
weight = 1
# the total number of clients that will subscribe to this set of topics
subscriber_poolsize = 100
Expand Down
3 changes: 3 additions & 0 deletions configs/kafka.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,8 @@ topic_names = ["hello", "world"]
partitions = 10
# the value length, in bytes
message_len = 512
# optionally, specify an approximate compression ratio for the message payload.
# Defaults to 1.0 meaning the message is high-entropy and not compressible.
compression_ratio = 1.0
# the key length, in bytes
key_len = 8
3 changes: 3 additions & 0 deletions configs/memcached.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ nkeys = 1_000_000
vlen = 128
# use random bytes for the values
vkind = "bytes"
# optionally, specify an approximate compression ratio for the value payload.
# Defaults to 1.0 meaning the message is high-entropy and not compressible.
compression_ratio = 1.0
# optionally: specify a TTL for the keys, by default there is no expiration
# ttl = "15m"
# controls what commands will be used in this keyspace
Expand Down
3 changes: 3 additions & 0 deletions configs/momento.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ klen = 32
nkeys = 1_000_000
# sets the value length, in bytes
vlen = 128
# optionally, specify an approximate compression ratio for the value payload.
# Defaults to 1.0 meaning the message is high-entropy and not compressible.
compression_ratio = 1.0
# use random bytes for the values
vkind = "bytes"
# override the default ttl for this keyspace setting it to 15 minutes
Expand Down
6 changes: 6 additions & 0 deletions configs/momento_pubsub.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ topics = 10
topic_len = 64
# sets the value length, in bytes
message_len = 128
# specify an approximate compression ratio for the message payload
compression_ratio = 1.0

# An example set of topics using a high number of subscribers per topic.
[[workload.topics]]
Expand All @@ -84,3 +86,7 @@ topics = 1
topic_len = 32
# sets the value length, in bytes
message_len = 128
# optionally, specify an approximate compression ratio for the message payload.
# Defaults to 1.0 meaning the message is high-entropy and not compressible.
compression_ratio = 1.0

3 changes: 3 additions & 0 deletions configs/redis.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ nkeys = 1_000_000
vlen = 128
# use random bytes for the values
vkind = "bytes"
# optionally, specify an approximate compression ratio for the value payload.
# Defaults to 1.0 meaning the message is high-entropy and not compressible.
compression_ratio = 1.0
# optionally: specify a TTL for the keys, by default there is no expiration
# ttl = "15m"
# controls what commands will be used in this keyspace
Expand Down
3 changes: 3 additions & 0 deletions configs/segcache.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ nkeys = 1_000_000
vlen = 128
# use random bytes for the values
vkind = "bytes"
# optionally, specify an approximate compression ratio for the value payload.
# Defaults to 1.0 meaning the message is high-entropy and not compressible.
compression_ratio = 1.0
# controls what commands will be used in this keyspace
commands = [
# get a value
Expand Down
44 changes: 44 additions & 0 deletions configs/smoketest.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# A configuration that can be used as a smoketest against a memcached instance.

[general]
protocol = "memcache"
interval = 1
duration = 60
metrics_output = "rpcperf.parquet"
metrics_format = "parquet"
admin = "127.0.0.1:9090"

[debug]
log_level = "info"
log_backup = "rpc-perf.log.old"
log_max_size = 1073741824

[target]
endpoints = [
"127.0.0.1:11211",
]

[client]
threads = 4
poolsize = 20
connect_timeout = 10000
request_timeout = 1000

[workload]
threads = 1

[workload.ratelimit]
start = 10_000

[[workload.keyspace]]
weight = 1
klen = 32
nkeys = 1_000_000
vlen = 128
vkind = "bytes"
compression_ratio = 10.0
commands = [
{ verb = "get", weight = 80 },
{ verb = "set", weight = 20 },
{ verb = "delete", weight = 0 },
]
12 changes: 12 additions & 0 deletions src/config/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ pub struct Topics {
#[serde(default)]
topic_names: Vec<String>,
message_len: usize,
#[serde(default)]
compression_ratio: Option<f64>,
#[serde(default = "one")]
key_len: usize,
weight: usize,
Expand Down Expand Up @@ -89,6 +91,10 @@ impl Topics {
self.message_len
}

pub fn compression_ratio(&self) -> f64 {
self.compression_ratio.unwrap_or(1.0)
}

pub fn subscriber_poolsize(&self) -> usize {
self.subscriber_poolsize
}
Expand Down Expand Up @@ -139,6 +145,8 @@ pub struct Keyspace {
#[serde(default)]
vkind: Option<ValueKind>,
#[serde(default)]
compression_ratio: Option<f64>,
#[serde(default)]
// no ttl is treated as no-expires or max ttl for the protocol
ttl: Option<String>,
}
Expand Down Expand Up @@ -180,6 +188,10 @@ impl Keyspace {
self.vkind.unwrap_or(ValueKind::Bytes)
}

pub fn compression_ratio(&self) -> f64 {
self.compression_ratio.unwrap_or(1.0)
}

pub fn ttl(&self) -> Option<Duration> {
self.ttl
.as_ref()
Expand Down
65 changes: 60 additions & 5 deletions src/workload/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use super::*;
use config::{Command, RampCompletionAction, RampType, ValueKind, Verb};
use flate2::write::GzEncoder;
use flate2::Compression;
use rand::distributions::{Alphanumeric, Uniform};
use rand::seq::SliceRandom;
use rand::thread_rng;
use rand::{Rng, RngCore, SeedableRng};
use rand::{thread_rng, Rng, RngCore, SeedableRng};
use rand_distr::Distribution as RandomDistribution;
use rand_distr::WeightedAliasIndex;
use rand_xoshiro::{Seed512, Xoshiro512PlusPlus};
use ratelimit::Ratelimiter;
use std::collections::{HashMap, HashSet};
use std::io::Result;
use std::io::{Result, Write};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tokio::runtime::Runtime;
Expand Down Expand Up @@ -162,7 +163,7 @@ impl Generator {
// add a header
[m[0], m[1], m[2], m[3], m[4], m[5], m[6], m[7]] =
[0x54, 0x45, 0x53, 0x54, 0x49, 0x4E, 0x47, 0x21];
rng.fill(&mut m[32..topics.message_len]);
rng.fill(&mut m[32..(topics.message_random_bytes + 32)]);
let mut k = vec![0_u8; topics.key_len];
rng.fill(&mut k[0..topics.key_len]);

Expand Down Expand Up @@ -391,12 +392,16 @@ pub struct Topics {
partition_dist: Distribution,
key_len: usize,
message_len: usize,
message_random_bytes: usize,
subscriber_poolsize: usize,
subscriber_concurrency: usize,
}

impl Topics {
pub fn new(config: &Config, topics: &config::Topics) -> Self {
let message_random_bytes =
estimate_random_bytes_needed(topics.message_len(), topics.compression_ratio());

// ntopics must be >= 1
let ntopics = std::cmp::max(1, topics.topics());
// partitions must be >= 1
Expand Down Expand Up @@ -458,6 +463,7 @@ impl Topics {
partition_dist,
key_len,
message_len,
message_random_bytes,
subscriber_poolsize,
subscriber_concurrency,
}
Expand Down Expand Up @@ -490,6 +496,7 @@ pub struct Keyspace {
inner_key_dist: Distribution,
vlen: usize,
vkind: ValueKind,
value_random_bytes: usize,
ttl: Option<Duration>,
}

Expand All @@ -510,6 +517,11 @@ impl Distribution {

impl Keyspace {
pub fn new(config: &Config, keyspace: &config::Keyspace) -> Self {
let value_random_bytes = estimate_random_bytes_needed(
keyspace.vlen().unwrap_or(0),
keyspace.compression_ratio(),
);

// nkeys must be >= 1
let nkeys = std::cmp::max(1, keyspace.nkeys());
let klen = keyspace.klen();
Expand Down Expand Up @@ -655,6 +667,7 @@ impl Keyspace {
inner_key_dist,
vlen: keyspace.vlen().unwrap_or(0),
vkind: keyspace.vkind(),
value_random_bytes,
ttl: keyspace.ttl(),
}
}
Expand All @@ -674,7 +687,7 @@ impl Keyspace {
ValueKind::I64 => format!("{}", rng.gen::<i64>()).into_bytes(),
ValueKind::Bytes => {
let mut buf = vec![0_u8; self.vlen];
rng.fill(&mut buf[0..self.vlen]);
rng.fill(&mut buf[0..self.value_random_bytes]);
buf
}
}
Expand Down Expand Up @@ -807,3 +820,45 @@ impl Ratelimit {
limit
}
}

fn estimate_random_bytes_needed(length: usize, compression_ratio: f64) -> usize {
// if compression ratio is low, all bytes should be random
if compression_ratio <= 1.0 {
return length;
}

// we need to approximate the number of random bytes to send, we do
// this iteratively assuming gzip compression.

// doesn't matter what seed we use here
let mut rng = Xoshiro512PlusPlus::seed_from_u64(0);

// message buffer
let mut m = vec![0; length];

let mut best = 0;

for idx in 0..m.len() {
// zero all bytes
for b in &mut m {
*b = 0
}

// fill first N bytes with pseudorandom data
rng.fill_bytes(&mut m[0..idx]);

let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
let _ = encoder.write_all(&m);
let compressed = encoder.finish().unwrap();

let ratio = m.len() as f64 / compressed.len() as f64;

if ratio < compression_ratio {
break;
}

best = idx;
}

best
}

0 comments on commit 22604c6

Please sign in to comment.