add blabber protocol support (iopsystems#136)
Adds support for the `blabber` protocol in which the server will
periodically publish messages to subscribers.

This is implemented as a pubsub protocol.

Adds latency distribution information to the dataspec for pubsub.
brayniac committed Dec 26, 2023
1 parent bbd23ff commit 4735c72
Showing 14 changed files with 457 additions and 161 deletions.
319 changes: 175 additions & 144 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -37,7 +37,7 @@ hyper = { version = "1.0.0-rc.4", features = ["http1", "http2", "client"]}
metriken = "0.3.3"
mio = "0.8.8"
momento = "0.32.1"
-net = { git = "https://github.com/pelikan-io/pelikan" }
+pelikan-net = "0.1.0"
once_cell = "1.18.0"
paste = "1.0.14"
protocol-memcache = { git = "https://github.com/pelikan-io/pelikan" }
58 changes: 58 additions & 0 deletions configs/blabber.toml
@@ -0,0 +1,58 @@
# An example configuration for a Pelikan Blabber client. Blabber is a protocol
# in which the server unidirectionally publishes small messages to all
# subscribed clients.

[general]
# specify the protocol to be used
protocol = "blabber"
# the interval for stats integration and reporting
interval = 1
# the number of intervals to run the test for
duration = 300
# optionally, we can write some detailed stats to a file during the run
#json_output = "stats.json"
# run the admin thread with an HTTP listener at the address provided; this
# allows stats exposition via HTTP
admin = "127.0.0.1:9090"
# optionally, set an initial seed for the PRNGs used to generate the workload.
# The default is to initialize from the OS entropy pool.
#initial_seed = "0"

[debug]
# choose from: error, warn, info, debug, trace
log_level = "info"
# optionally, log to the file below instead of standard out
# log_file = "rpc-perf.log"
# backup file name for use with log rotation
log_backup = "rpc-perf.log.old"
# trigger log rotation when the file grows beyond this size (in bytes). Set this
# option to '0' to disable log rotation.
log_max_size = 1073741824

[target]
# specify one or more endpoints as IP:PORT pairs
endpoints = ["127.0.0.1:12321"]

[pubsub]
# the connect timeout in milliseconds
connect_timeout = 10000
# the publish timeout in milliseconds
publish_timeout = 1000
# the number of threads used to drive publisher tasks
publisher_threads = 1
# the number of threads used to drive subscriber tasks
subscriber_threads = 6
# the total number of publisher clients
# note: blabber has no client-side publishers (the server publishes), but
# these settings remain part of the pubsub config
publisher_poolsize = 1
# the number of concurrent publish requests per publisher client
publisher_concurrency = 20

[workload]
# the number of threads that will be used to generate requests
threads = 1
# the global ratelimit
ratelimit = 1

# An example set of topics using a low number of subscribers per topic.
[[workload.topics]]
topics = 1
topic_len = 1
message_len = 64
weight = 1
# the total number of clients that will subscribe to this set of topics
subscriber_poolsize = 100

8 changes: 5 additions & 3 deletions configs/momento_pubsub.toml
@@ -1,7 +1,5 @@
# An example configuration for benchmarking Momento (https://www.gomomento.com)
-# and demonstrating the use of the preview functionality for collections. Each
-# command family is using its own keyspace and covers key-value, hash, list,
-# set, and sorted set.
+# and demonstrating how to use RPC-Perf with Momento Topics (pubsub).

[general]
# specify the protocol to be used
@@ -55,6 +53,8 @@ ratelimit = 10
[[workload.topics]]
# the weight relative to other workload components
weight = 1
# limits the rate at which new subscribers are created (per-second rate)
# subscribe_ratelimit = 1
# the total number of Momento clients for subscribers to this set of topics
subscriber_poolsize = 1
# the total number of gRPC sessions per Momento client for this set of topics
@@ -70,6 +70,8 @@ message_len = 128
[[workload.topics]]
# the weight relative to other workload components
weight = 1
# limits the rate at which new subscribers are created (per-second rate)
# subscribe_ratelimit = 1
# the total number of Momento clients for subscribers to this set of topics
subscriber_poolsize = 1
# the total number of gRPC sessions per Momento client for this set of topics
4 changes: 4 additions & 0 deletions lib/dataspec/src/lib.rs
@@ -64,6 +64,10 @@ pub struct ClientStats {
pub struct PubsubStats {
pub publishers: Publishers,
pub subscribers: Subscribers,
#[serde(skip_serializing_if = "Option::is_none")]
pub publish_latency: Option<SparseHistogram>,
#[serde(skip_serializing_if = "Option::is_none")]
pub total_latency: Option<SparseHistogram>,
}

#[derive(Default, Deserialize, Serialize)]
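The two latency fields are optional so that existing consumers of the JSON output see no change when the histograms are absent. A minimal sketch of how `skip_serializing_if = "Option::is_none"` behaves, using a stand-in `u64` payload rather than `SparseHistogram` (`serde` and `serde_json` assumed as dependencies):

use serde::Serialize;

#[derive(Serialize)]
struct PubsubStatsSketch {
    // stand-in for the Option<SparseHistogram> fields above
    #[serde(skip_serializing_if = "Option::is_none")]
    publish_latency: Option<u64>,
}

fn main() {
    // when the histogram is absent, the field is omitted: prints {}
    let absent = PubsubStatsSketch { publish_latency: None };
    println!("{}", serde_json::to_string(&absent).unwrap());

    // when present, the field is serialized: prints {"publish_latency":42}
    let present = PubsubStatsSketch { publish_latency: Some(42) };
    println!("{}", serde_json::to_string(&present).unwrap());
}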
4 changes: 4 additions & 0 deletions src/clients/mod.rs
@@ -51,6 +51,10 @@ pub fn launch_clients(config: &Config, work_receiver: Receiver<WorkItem>) -> Opt
error!("keyspace is not supported for the kafka protocol");
std::process::exit(1);
}
Protocol::Blabber => {
error!("keyspace is not supported for the blabber protocol");
std::process::exit(1);
}
}

Some(client_rt)
6 changes: 5 additions & 1 deletion src/clients/ping.rs
@@ -44,7 +44,11 @@ async fn task(work_receiver: Receiver<WorkItem>, endpoint: String, config: Confi
)
.await
{
-Ok(Ok(s)) => Some(s),
+Ok(Ok(s)) => {
+    CONNECT_OK.increment();
+    CONNECT_CURR.increment();
+    Some(s)
+}
Ok(Err(_)) => {
CONNECT_EX.increment();
sleep(Duration::from_millis(100)).await;
9 changes: 1 addition & 8 deletions src/config/client.rs
@@ -1,12 +1,5 @@
use super::*;

-const PAGESIZE: usize = 4096;
-const DEFAULT_BUFFER_SIZE: usize = 4 * PAGESIZE;
-
-fn default_buffer_size() -> usize {
-    DEFAULT_BUFFER_SIZE
-}

#[derive(Clone, Deserialize)]
pub struct Client {
/// The number of connections this process will have to each endpoint.
@@ -29,7 +22,7 @@ pub struct Client {
/// It is useful to increase the sizes if you expect to send and/or receive
/// large requests/responses as part of the workload.
///
-/// The default is a single page (4KB) for each the read and write buffers.
+/// The default is 16KB for each of the read and write buffers.
///
/// Not all client implementations allow setting these values, so they are
/// applied on a best-effort basis.
7 changes: 7 additions & 0 deletions src/config/mod.rs
@@ -21,6 +21,13 @@ pub use target::Target;
pub use tls::Tls;
pub use workload::{Command, Distribution, Keyspace, Topics, ValueKind, Verb, Workload};

pub const PAGESIZE: usize = 4096;
pub const DEFAULT_BUFFER_SIZE: usize = 4 * PAGESIZE; // four pages = 16KB

pub fn default_buffer_size() -> usize {
DEFAULT_BUFFER_SIZE
}

#[derive(Clone, Deserialize)]
pub struct Config {
general: General,
1 change: 1 addition & 0 deletions src/config/protocol.rs
@@ -3,6 +3,7 @@ use super::*;
#[derive(Clone, Copy, Deserialize, Debug)]
#[serde(rename_all = "snake_case")]
pub enum Protocol {
Blabber,
Http1,
Http2,
Memcache,
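Because of the `snake_case` rename, the new variant is selected with `protocol = "blabber"` in the `[general]` section of the config. A self-contained sketch of that mapping, with the enum shortened to two variants for illustration (`toml` crate assumed):

use serde::Deserialize;

#[derive(Deserialize)]
#[serde(rename_all = "snake_case")]
#[allow(dead_code)]
enum Protocol {
    Blabber,
    Ping,
}

#[derive(Deserialize)]
struct General {
    protocol: Protocol,
}

fn main() {
    // "blabber" in the TOML config selects Protocol::Blabber
    let general: General = toml::from_str(r#"protocol = "blabber""#).unwrap();
    assert!(matches!(general.protocol, Protocol::Blabber));
}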
26 changes: 26 additions & 0 deletions src/config/pubsub.rs
@@ -14,6 +14,19 @@ pub struct Pubsub {
publisher_poolsize: usize,
publisher_concurrency: usize,

/// Specify the default sizes for the read and write buffers (in bytes).
/// It is useful to increase the sizes if you expect to send and/or receive
/// large requests/responses as part of the workload.
///
/// The default is 16KB for each of the read and write buffers.
///
/// Not all client implementations allow setting these values, so they are
/// applied on a best-effort basis.
#[serde(default = "default_buffer_size")]
read_buffer_size: usize,
#[serde(default = "default_buffer_size")]
write_buffer_size: usize,

// kafka specific configs
kafka_acks: Option<String>,
kafka_linger_ms: Option<String>,
@@ -48,6 +61,19 @@ impl Pubsub {
self.publisher_concurrency
}

pub fn read_buffer_size(&self) -> usize {
// rounds the read buffer size up to the next multiple of the pagesize
((std::cmp::max(1, self.read_buffer_size) + PAGESIZE - 1) / PAGESIZE) * PAGESIZE
}

#[allow(dead_code)]
pub fn write_buffer_size(&self) -> usize {
// rounds the write buffer size up to the next multiple of the pagesize
((std::cmp::max(1, self.write_buffer_size) + PAGESIZE - 1) / PAGESIZE) * PAGESIZE
}

pub fn kafka_acks(&self) -> &Option<String> {
&self.kafka_acks
}
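The rounding in `read_buffer_size()` and `write_buffer_size()` can be checked in isolation. A standalone sketch with a few worked cases, assuming the 4096-byte `PAGESIZE` from `src/config/mod.rs`:

const PAGESIZE: usize = 4096;

// the same expression used by read_buffer_size() and write_buffer_size()
fn round_to_pagesize(size: usize) -> usize {
    ((std::cmp::max(1, size) + PAGESIZE - 1) / PAGESIZE) * PAGESIZE
}

fn main() {
    assert_eq!(round_to_pagesize(0), 4096); // zero is clamped up to one page
    assert_eq!(round_to_pagesize(1), 4096); // a partial page rounds up
    assert_eq!(round_to_pagesize(4096), 4096); // an exact multiple is unchanged
    assert_eq!(round_to_pagesize(16000), 16384); // 16000 rounds up to four pages
}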
6 changes: 6 additions & 0 deletions src/output/mod.rs
@@ -292,6 +292,12 @@ pub fn json(config: Config, ratelimit: Option<&Ratelimiter>) {
subscribers: Subscribers {
current: PUBSUB_SUBSCRIBER_CURR.value(),
},
publish_latency: snapshot
    .histogram_delta(PUBSUB_PUBLISH_LATENCY_HISTOGRAM)
    .map(histogram::SparseHistogram::from),
total_latency: snapshot
    .histogram_delta(PUBSUB_LATENCY_HISTOGRAM)
    .map(histogram::SparseHistogram::from),
},
};

154 changes: 154 additions & 0 deletions src/pubsub/blabber.rs
@@ -0,0 +1,154 @@
use super::*;
use crate::net::Connector;
use bytes::Buf;
use bytes::BufMut;
use session::Buffer;
use std::borrow::Borrow;
use std::borrow::BorrowMut;
use tokio::io::AsyncReadExt;

use tokio::time::timeout;

// blabber has a header before the standard pubsub message
//
//  ___________________________________
// |  0 ..  |  4 ..   |  8 .. length   |
// |        |         |                |
// | length | padding | pubsub message |
// |________|_________|________________|

const HEADER_LEN: u32 = 8;

/// Launch tasks with one connection per task, as the blabber protocol is not mux-enabled.
pub fn launch_subscribers(
runtime: &mut Runtime,
config: Config,
workload_components: &[Component],
) {
debug!("launching blabber subscriber tasks");

for component in workload_components {
if let Component::Topics(topics) = component {
let connections = topics.subscriber_poolsize() * topics.subscriber_concurrency();

// create one task per "connection"
// note: these may be channels instead of connections for multiplexed protocols
for _ in 0..connections {
for endpoint in config.target().endpoints() {
runtime.spawn(subscriber_task(endpoint.clone(), config.clone()));
}
}
}
}
}

// a subscriber task for blabber servers (eg: Pelikan Blabber)
#[allow(clippy::slow_vector_initialization)]
async fn subscriber_task(endpoint: String, config: Config) -> Result<()> {
let validator = MessageValidator::new();

let connector = Connector::new(&config)?;

// this unwrap will succeed because we wouldn't be creating these tasks if
// there wasn't a pubsub config.
let pubsub_config = config.pubsub().unwrap();

let mut stream = None;
let mut read_buffer = Buffer::new(pubsub_config.read_buffer_size());

while RUNNING.load(Ordering::Relaxed) {
if stream.is_none() {
CONNECT.increment();
PUBSUB_SUBSCRIBE.increment();

stream = match timeout(
pubsub_config.connect_timeout(),
connector.connect(&endpoint),
)
.await
{
Ok(Ok(s)) => {
CONNECT_OK.increment();
CONNECT_CURR.increment();
PUBSUB_SUBSCRIBER_CURR.add(1);
PUBSUB_SUBSCRIBE_OK.increment();

Some(s)
}
Ok(Err(_)) => {
CONNECT_EX.increment();
PUBSUB_SUBSCRIBE_EX.increment();

sleep(Duration::from_millis(100)).await;
continue;
}
Err(_) => {
CONNECT_TIMEOUT.increment();
PUBSUB_SUBSCRIBE_EX.increment();

sleep(Duration::from_millis(100)).await;
continue;
}
}
}

let mut s = stream.take().unwrap();

// read and process published messages until the connection fails
loop {
match s.read(read_buffer.borrow_mut()).await {
    // an empty read means the server closed the connection; reconnect
    Ok(0) => {
        PUBSUB_SUBSCRIBER_CURR.sub(1);
        break;
    }
    Ok(n) => {
unsafe {
read_buffer.advance_mut(n);
}
{
loop {
let consumed = {
let rbuf: &[u8] = read_buffer.borrow();

if rbuf.len() >= HEADER_LEN as usize {
let len =
u32::from_be_bytes(rbuf[0..4].try_into().unwrap()) as usize;

// check if we have only a partial message
if rbuf.len() < len {
break;
}

let mut mbuf = rbuf[8..len].to_owned();

let _ = validator.validate(&mut mbuf);

// return the number of bytes consumed
len
} else {
break;
}
};

read_buffer.advance(consumed);
}
}
}
Err(_) => {
    PUBSUB_RECEIVE.increment();
    PUBSUB_RECEIVE_EX.increment();
    PUBSUB_SUBSCRIBER_CURR.sub(1);

    // the connection failed; drop it and reconnect
    break;
}
}
}
}

Ok(())
}

/// Blabber requires no publisher tasks; the server itself publishes messages to subscribers.
pub fn launch_publishers(
_runtime: &mut Runtime,
_config: Config,
_work_receiver: Receiver<WorkItem>,
) {
// note: there are no publish tasks for blabber, instead the server is
// expected to publish compatible messages to the subscribers
debug!("skipping blabber publisher tasks");
}
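To make the framing concrete, here is a standalone sketch (not part of this commit) that encodes a message using the header layout documented at the top of this file and parses it back the way the subscriber loop does. Note that the big-endian length field covers the whole frame, header included:

const HEADER_LEN: usize = 8;

// encode a message into a blabber frame: total length (4 bytes, big endian),
// 4 bytes of padding, then the message body
fn encode_frame(message: &[u8]) -> Vec<u8> {
    let len = HEADER_LEN + message.len();
    let mut frame = Vec::with_capacity(len);
    frame.extend_from_slice(&(len as u32).to_be_bytes());
    frame.extend_from_slice(&[0u8; 4]); // padding
    frame.extend_from_slice(message);
    frame
}

// parse one frame from the front of a buffer, mirroring the subscriber loop:
// returns the message and the bytes consumed, or None on a partial frame
fn parse_frame(buf: &[u8]) -> Option<(&[u8], usize)> {
    if buf.len() < HEADER_LEN {
        return None;
    }
    let len = u32::from_be_bytes(buf[0..4].try_into().unwrap()) as usize;
    if len < HEADER_LEN || buf.len() < len {
        return None;
    }
    Some((&buf[HEADER_LEN..len], len))
}

fn main() {
    let frame = encode_frame(b"hello");
    let (message, consumed) = parse_frame(&frame).unwrap();
    assert_eq!(message, b"hello");
    assert_eq!(consumed, frame.len());
}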