feat: add kafka support (iopsystems#56)
Adds support for kafka pubsub
yangxi committed Oct 19, 2023
1 parent 3f10af2 commit 64f8104
Showing 14 changed files with 677 additions and 118 deletions.
76 changes: 76 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -47,6 +47,7 @@ rand_distr = "0.4.3"
rand_xoshiro = "0.6.0"
ratelimit = "0.7.0"
redis = { version = "0.22.3", features = ["tokio-comp"] }
rdkafka = { version = "0.25", features = ["cmake-build"] }
ringlog = "0.3.0"
rpcperf-dataspec = { path = "lib/dataspec" }
serde = { workspace = true }
83 changes: 83 additions & 0 deletions configs/kafka.toml
@@ -0,0 +1,83 @@
# An example configuration for benchmarking Kafka (https://kafka.apache.org)
# using the pubsub workload. Publishers and subscribers exercise a configurable
# set of topics, partitions, and message and key sizes.

[general]
# specify the protocol to be used
protocol = "kafka"
# the interval for stats integration and reporting
interval = 1
# the number of intervals to run the test for
duration = 300
# optionally, we can write some detailed stats to a file during the run
#json_output = "stats.json"
# run the admin thread with an HTTP listener at the address provided; this
# allows stats exposition via HTTP
admin = "127.0.0.1:9090"
# optionally, set an initial seed for the PRNGs used to generate the workload.
# The default is to initialize from the OS entropy pool.
#initial_seed = "0"

[debug]
# choose from: error, warn, info, debug, trace
log_level = "error"
# optionally, log to the file below instead of standard out
# log_file = "rpc-perf.log"
# backup file name for use with log rotation
log_backup = "rpc-perf.log.old"
# trigger log rotation when the file grows beyond this size (in bytes). Set this
# option to '0' to disable log rotation.
log_max_size = 1073741824

[target]
# kafka broker ip:port
endpoints = [
"127.0.0.1:9092"
]

[pubsub]
# the connect timeout in milliseconds
connect_timeout = 10000
publish_timeout = 1000
# the number of threads in the publisher runtime
publisher_threads = 4
publisher_poolsize = 1
publisher_concurrency = 20
# the number of threads in the subscriber runtime
subscriber_threads = 4
# kafka-specific client configurations
kafka_acks = "1"
kafka_linger_ms = "1"
#kafka_batch_size
#kafka_batch_num_messages
#kafka_fetch_message_max_bytes
#kafka_request_timeout_ms

[workload]
# the number of threads that will be used to generate requests
threads = 1
# the global ratelimit
ratelimit = 1000

# An example set of topics using a single consumer and multiple producers.
[[workload.topics]]
# the weight relative to other workload components
weight = 1
# the total number of subscriber clients for this set of topics
subscriber_poolsize = 1
# the number of concurrent subscriber sessions per client for this set of topics
subscriber_concurrency = 1
# sets the number of topics
topics = 1
# set the length of the topic names, in bytes
topic_len = 5
# set the topic names; if empty, or if their number and length do not match topics and topic_len, random names are generated
topic_names = ["hello"]
# sets the number of partitions in each topic
partitions = 10
# sets the value length, in bytes
message_len = 512
# sets the key length, in bytes
key_len = 8
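
The new configs/kafka.toml drives the kafka protocol end to end: [target] lists the brokers, [pubsub] sizes the publisher and subscriber runtimes and passes the optional kafka_* client properties through as strings, and [[workload.topics]] describes the topics to publish to and subscribe from. Below is a minimal sketch (not part of this commit) of how such settings might be turned into an rdkafka producer; the function name and the librdkafka property names used here are assumptions.

use rdkafka::config::ClientConfig;
use rdkafka::error::KafkaResult;
use rdkafka::producer::FutureProducer;

// Hypothetical helper: build a producer from the [target] endpoints and a
// couple of the optional kafka_* values shown in the config above.
fn build_producer(
    endpoints: &[String],
    acks: Option<&str>,
    linger_ms: Option<&str>,
) -> KafkaResult<FutureProducer> {
    let mut cfg = ClientConfig::new();
    // the [target] endpoints become the bootstrap servers
    cfg.set("bootstrap.servers", endpoints.join(","));
    // only set properties that were actually provided in the config
    if let Some(acks) = acks {
        cfg.set("acks", acks);
    }
    if let Some(linger_ms) = linger_ms {
        cfg.set("linger.ms", linger_ms);
    }
    cfg.create::<FutureProducer>()
}

In the actual client these values would come from the Pubsub getters added later in this diff.
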
4 changes: 4 additions & 0 deletions src/clients/mod.rs
@@ -46,6 +46,10 @@ pub fn launch_clients(config: &Config, work_receiver: Receiver<WorkItem>) -> Opt
Protocol::Resp => {
clients::redis::launch_tasks(&mut client_rt, config.clone(), work_receiver)
}
Protocol::Kafka => {
error!("keyspace is not supported for the kafka protocol");
std::process::exit(1);
}
}

Some(client_rt)
1 change: 1 addition & 0 deletions src/config/protocol.rs
@@ -9,4 +9,5 @@ pub enum Protocol {
Momento,
Ping,
Resp,
Kafka,
}
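
The single added variant is what lets protocol = "kafka" in [general] select the new code path. The following self-contained sketch shows how such an enum could map from the TOML string; the serde rename_all attribute and the General wrapper are illustrative assumptions, not taken from this commit, and variants hidden by the diff are omitted.

use serde::Deserialize;

// Hypothetical mirror of the config enum; the real derive attributes may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
enum Protocol {
    Momento,
    Ping,
    Resp,
    Kafka,
}

#[derive(Debug, Deserialize)]
struct General {
    protocol: Protocol,
}

fn main() {
    // the same kind of line that appears in configs/kafka.toml
    let general: General = toml::from_str("protocol = \"kafka\"").unwrap();
    assert_eq!(general.protocol, Protocol::Kafka);
}
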
32 changes: 32 additions & 0 deletions src/config/pubsub.rs
@@ -11,6 +11,14 @@ pub struct Pubsub {

publisher_poolsize: usize,
publisher_concurrency: usize,

// kafka specific configs
kafka_acks: Option<String>,
kafka_linger_ms: Option<String>,
kafka_batch_size: Option<String>,
kafka_batch_num_messages: Option<String>,
kafka_fetch_message_max_bytes: Option<String>,
kafka_request_timeout_ms: Option<String>,
}

impl Pubsub {
@@ -33,4 +41,28 @@ impl Pubsub {
pub fn publisher_concurrency(&self) -> usize {
self.publisher_concurrency
}

pub fn kafka_acks(&self) -> &Option<String> {
&self.kafka_acks
}

pub fn kafka_linger_ms(&self) -> &Option<String> {
&self.kafka_linger_ms
}

pub fn kafka_batch_size(&self) -> &Option<String> {
&self.kafka_batch_size
}

pub fn kafka_batch_num_messages(&self) -> &Option<String> {
&self.kafka_batch_num_messages
}

pub fn kafka_fetch_message_max_bytes(&self) -> &Option<String> {
&self.kafka_fetch_message_max_bytes
}

pub fn kafka_request_timeout_ms(&self) -> &Option<String> {
&self.kafka_request_timeout_ms
}
}
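
Each kafka_* option is optional and kept as a raw string, so a client can forward only the values that were actually set. A hedged sketch of how these getters might be folded into librdkafka property overrides follows; the getter names match the impl above, while the property names on the left and the helper itself are assumptions.

// Hypothetical helper living next to the config types; collects only the
// options that were present in the config file.
fn kafka_overrides(pubsub: &Pubsub) -> Vec<(&'static str, String)> {
    let opts = [
        ("acks", pubsub.kafka_acks()),
        ("linger.ms", pubsub.kafka_linger_ms()),
        ("batch.size", pubsub.kafka_batch_size()),
        ("batch.num.messages", pubsub.kafka_batch_num_messages()),
        ("fetch.message.max.bytes", pubsub.kafka_fetch_message_max_bytes()),
        ("request.timeout.ms", pubsub.kafka_request_timeout_ms()),
    ];
    opts.into_iter()
        .filter_map(|(key, value)| value.as_ref().map(|v| (key, v.clone())))
        .collect()
}
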
23 changes: 23 additions & 0 deletions src/config/workload.rs
@@ -44,30 +44,49 @@ impl Workload {
#[derive(Clone, Deserialize)]
pub struct Topics {
topics: usize,
#[serde(default = "one")]
partitions: usize,
topic_len: usize,
#[serde(default)]
topic_names: Vec<String>,
message_len: usize,
#[serde(default = "one")]
key_len: usize,
weight: usize,
subscriber_poolsize: usize,
#[serde(default = "one")]
subscriber_concurrency: usize,
#[serde(default)]
topic_distribution: Distribution,
#[serde(default)]
partition_distribution: Distribution,
}

impl Topics {
pub fn weight(&self) -> usize {
self.weight
}

pub fn partitions(&self) -> usize {
self.partitions
}

pub fn topics(&self) -> usize {
self.topics
}

pub fn topic_names(&self) -> &[String] {
&self.topic_names
}

pub fn topic_len(&self) -> usize {
self.topic_len
}

pub fn key_len(&self) -> usize {
self.key_len
}

pub fn message_len(&self) -> usize {
self.message_len
}
@@ -83,6 +102,10 @@ impl Topics {
pub fn topic_distribution(&self) -> Distribution {
self.topic_distribution
}

pub fn partition_distribution(&self) -> Distribution {
self.partition_distribution
}
}

#[derive(Clone, Copy, PartialEq, Eq, Deserialize)]
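
Topics now carries enough information to build individual publish requests: a topic chosen by topic_distribution, a partition chosen by partition_distribution, and key/value payloads of fixed sizes. The simplified sketch below generates one publish from these getters; it assumes topic_names is already populated and uses uniform selection in place of the configured distributions.

use rand::{Rng, RngCore};
use rand_xoshiro::Xoshiro256PlusPlus;

// Hypothetical illustration only; the real generator also honors
// topic_distribution() and partition_distribution().
fn one_publish(topics: &Topics, rng: &mut Xoshiro256PlusPlus) -> (String, i32, Vec<u8>, Vec<u8>) {
    // pick a topic and a partition uniformly
    let names = topics.topic_names();
    let topic = names[rng.gen_range(0..names.len())].clone();
    let partition = rng.gen_range(0..topics.partitions()) as i32;
    // key and value payloads of the configured lengths
    let mut key = vec![0u8; topics.key_len()];
    rng.fill_bytes(&mut key);
    let mut value = vec![0u8; topics.message_len()];
    rng.fill_bytes(&mut value);
    (topic, partition, key, value)
}
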
2 changes: 1 addition & 1 deletion src/main.rs
@@ -154,7 +154,7 @@ fn main() {

let client_runtime = launch_clients(&config, client_receiver);

let mut pubsub_runtimes = launch_pubsub(&config, pubsub_receiver, workload_components);
let mut pubsub_runtimes = launch_pubsub(&config, pubsub_receiver, &workload_components);

// launch json log output
{
8 changes: 4 additions & 4 deletions src/metrics/mod.rs
@@ -316,7 +316,7 @@ macro_rules! request {
});
paste! {
#[allow(dead_code)]
pub static [<$ident _COUNTER>]: &'static str = $name;
pub static [<$ident _COUNTER>]: &'static str = concat!($name, "/total");
}

paste! {
@@ -330,7 +330,7 @@ macro_rules! request {
});
paste! {
#[allow(dead_code)]
pub static [<$ident _EX_COUNTER>]: &'static str = $name;
pub static [<$ident _EX_COUNTER>]: &'static str = concat!($name, "/exception");
}
}

@@ -345,7 +345,7 @@ macro_rules! request {
});
paste! {
#[allow(dead_code)]
pub static [<$ident _OK_COUNTER>]: &'static str = $name;
pub static [<$ident _OK_COUNTER>]: &'static str = concat!($name, "/ok");
}
}

@@ -360,7 +360,7 @@ macro_rules! request {
});
paste! {
#[allow(dead_code)]
pub static [<$ident _TIMEOUT_COUNTER>]: &'static str = $name;
pub static [<$ident _TIMEOUT_COUNTER>]: &'static str = concat!($name, "/timeout");
}
}
}
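
Previously the four exported name constants for a request family were all the same string; appending an outcome suffix gives each counter a distinct name in stats output. A tiny illustration of the resulting naming scheme, using a hypothetical base name rather than any metric actually registered by the macro:

// concat! joins the string literals at compile time, so a request registered
// as "publish" now exposes four distinct counter names.
const PUBLISH_COUNTER: &str = concat!("publish", "/total");
const PUBLISH_OK_COUNTER: &str = concat!("publish", "/ok");
const PUBLISH_EX_COUNTER: &str = concat!("publish", "/exception");
const PUBLISH_TIMEOUT_COUNTER: &str = concat!("publish", "/timeout");

fn main() {
    assert_eq!(PUBLISH_COUNTER, "publish/total");
    assert_eq!(PUBLISH_TIMEOUT_COUNTER, "publish/timeout");
}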