Add Momento HTTP Cache API and S3 Storage API (iopsystems#264)
Adds support for sending cache requests to the Momento HTTP Cache API.

Adds support for sending storage requests to the AWS S3 REST API.
brayniac authored Sep 24, 2024
1 parent 2145075 commit a77818b
Showing 15 changed files with 1,445 additions and 113 deletions.
331 changes: 297 additions & 34 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions Cargo.toml
@@ -53,6 +53,17 @@ toml = "0.8.2"
warp = "0.3.6"
zipf = "7.0.1"
flate2 = "1.0.28"
url-escape = "0.1.1"
h2 = "0.4.6"
http = "1.1.0"
tokio-rustls = "0.26.0"
rustls = "0.23.13"
webpki-roots = "0.26.5"
sha1 = "0.10.6"
hmac = "0.12.1"
hyper-rustls = "0.27.3"
hyper-util = { version = "0.1.8", features = ["full"] }
hex = "0.4.3"

[features]
default = ["openssl"]
104 changes: 104 additions & 0 deletions configs/momento_http.toml
@@ -0,0 +1,104 @@
# An example configuration for benchmarking Momento (https://www.gomomento.com)
# over its HTTP Cache API. The workload exercises the `key-value` family of
# commands: get, set, and delete.
#
# Expiration: unless otherwise specified, the default TTL of 15 minutes will be
# used.

[general]
# specify the protocol to be used
protocol = "momento_http"
# the interval for stats integration and reporting
interval = 1
# the number of intervals to run the test for
duration = 30
# run the admin thread with an HTTP listener at the address provided; this
# allows stats exposition via HTTP
admin = "127.0.0.1:9090"
# optionally, set an initial seed for the PRNGs used to generate the workload.
# The default is to initialize from the OS entropy pool.
initial_seed = "0"

#[metrics]
# output file for detailed stats during the run
#output = "stats.json"
# format of the output file (possible values are json, msgpack, parquet)
#format = "json"
# optionally specify batch size for parquet row groups
# only valid for parquet output
#batch_size = 100_000
# optionally specify histogram type (can be standard (default) or sparse)
# only valid for parquet output
#histogram = "sparse"
# optionally, specify the sampling interval for metrics. Input is a string
# with the unit attached; for example "100ms" or "1s". Defaults to 1s.
#interval = "1s"

[debug]
# choose from: error, warn, info, debug, trace
log_level = "info"
# optionally, log to the file below instead of standard out
# log_file = "rpc-perf.log"
# backup file name for use with log rotation
log_backup = "rpc-perf.log.old"
# trigger log rotation when the file grows beyond this size (in bytes). Set this
# option to '0' to disable log rotation.
log_max_size = 1073741824

[target]
# specify the Momento HTTP Cache API Endpoint
endpoints = ["https://api.cache.cell-4-us-west-2-1.prod.a.momentohq.com"]
# specify the name of the target cache
cache_name = "test"

[client]
# number of threads used to drive client requests
threads = 4
# number of HTTP/2 clients to initialize; each maintains at least one TCP
# connection
poolsize = 4
# an upper limit on the number of concurrent requests per HTTP/2 client
concurrency = 128
# the connect timeout in milliseconds
connect_timeout = 10000
# the request timeout in milliseconds
request_timeout = 1000

[workload]
# the number of threads that will be used to generate the workload
threads = 1

[workload.ratelimit]
# set a global ratelimit for the workload, in requests per second
start = 50

# An example keyspace showcasing the use of the `key-value` family of commands.
#
# Note that we constrain the number of keys in the keyspace and generate
# random 128-byte values.
[[workload.keyspace]]
# sets the relative weight of this keyspace: defaults to 1
weight = 1
# sets the length of the key, in bytes
klen = 32
# sets the number of keys that will be generated
nkeys = 100
# sets the value length, in bytes
vlen = 128
# optionally, specify an approximate compression ratio for the value payload.
# Defaults to 1.0, meaning the value is high-entropy and not compressible.
compression_ratio = 1.0
# use random bytes for the values
vkind = "bytes"
# override the default ttl for this keyspace setting it to 15 minutes
ttl = "15m"
# controls what commands will be used in this keyspace; weights are relative,
# so 80/19/1 below yields an 80% get, 19% set, 1% delete mix
commands = [
# get a value
{ verb = "get", weight = 80 },
# set a value
{ verb = "set", weight = 19 },
# delete a value
{ verb = "delete", weight = 1 },
]
80 changes: 80 additions & 0 deletions configs/s3.toml
@@ -0,0 +1,80 @@
# An example configuration for benchmarking AWS S3 over the HTTP REST API.

[general]
# specify the protocol to be used
protocol = "s3"
# the interval for stats integration and reporting
interval = 1
# the number of intervals to run the test for
duration = 30
# run the admin thread with an HTTP listener at the address provided; this
# allows stats exposition via HTTP
admin = "127.0.0.1:9090"
# optionally, set an initial seed for the PRNGs used to generate the workload.
# The default is to initialize from the OS entropy pool.
initial_seed = "0"

#[metrics]
# output file for detailed stats during the run
#output = "stats.json"
# format of the output file (possible values are json, msgpack, parquet)
#format = "json"
# optionally specify batch size for parquet row groups
# only valid for parquet output
#batch_size = 100_000
# optionally specify histogram type (can be standard (default) or sparse)
# only valid for parquet output
#histogram = "sparse"
# optionally, specify the sampling interval for metrics. Input is a string
# with the unit attached; for example "100ms" or "1s". Defaults to 1s.
#interval = "1s"

[debug]
# choose from: error, warn, info, debug, trace
log_level = "info"
# optionally, log to the file below instead of standard out
# log_file = "rpc-perf.log"
# backup file name for use with log rotation
log_backup = "rpc-perf.log.old"
# trigger log rotation when the file grows beyond this size (in bytes). Set this
# option to '0' to disable log rotation.
log_max_size = 1073741824

[target]
# provide a virtual-hosted-style bucket URL as the target:
endpoints = ["https://[BUCKET_NAME].s3.[REGION].amazonaws.com"]
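# for example (hypothetical bucket and region):
#   https://my-bucket.s3.us-east-1.amazonaws.com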

[storage]
# number of threads used to drive requests
threads = 4
# number of concurrent connections to open to S3. Since the underlying
# protocol is HTTP/1.1, there is no multiplexing; each connection serves one
# request at a time.
poolsize = 1

# currently this timeout is ignored but is a required config field
request_timeout = 1000

[workload]
# the number of threads that will be used to generate the workload
threads = 1

[workload.ratelimit]
# set a global ratelimit for the workload, in requests per second
start = 1

[[workload.stores]]
# object name length, in bytes
klen = 32
# sets the number of objects that will be generated
nkeys = 100
# sets the object size, in bytes
vlen = 128
# controls what commands will be used in this store; weights are relative, so
# 80/19/1 below yields an 80% get, 19% put, 1% delete mix
commands = [
# get a value
{ verb = "get", weight = 80 },
# set a value
{ verb = "put", weight = 19 },
# delete a value
{ verb = "delete", weight = 1 },
]
45 changes: 45 additions & 0 deletions src/clients/common.rs
@@ -0,0 +1,45 @@
use http::uri::Authority;

use std::io::{Error, ErrorKind};

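/// A bounded multi-producer, multi-consumer queue used to hand items (for
/// example, HTTP/2 request senders) from a pool manager to worker tasks.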
#[derive(Clone)]
pub struct Queue<T> {
tx: async_channel::Sender<T>,
rx: async_channel::Receiver<T>,
}

impl<T> Queue<T> {
pub fn new(size: usize) -> Self {
let (tx, rx) = async_channel::bounded::<T>(size);

Self { tx, rx }
}

pub async fn send(&self, item: T) -> std::result::Result<(), async_channel::SendError<T>> {
self.tx.send(item).await
}

pub async fn recv(&self) -> std::result::Result<T, async_channel::RecvError> {
self.rx.recv().await
}
}

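/// Resolve a URI string to a socket address and its authority. The port
/// defaults to 443 when the URI does not specify one, and the first resolved
/// address is used.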
pub async fn resolve(uri: &str) -> Result<(std::net::SocketAddr, Authority), std::io::Error> {
let uri = uri
.parse::<http::Uri>()
.map_err(|_| Error::new(ErrorKind::Other, "failed to parse uri"))?;

let auth = uri
.authority()
.ok_or(Error::new(ErrorKind::Other, "uri has no authority"))?
.clone();

let port = auth.port_u16().unwrap_or(443);

let addr = tokio::net::lookup_host((auth.host(), port))
.await?
.next()
.ok_or(Error::new(ErrorKind::Other, "dns found no addresses"))?;

Ok((addr, auth))
}
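
A minimal usage sketch for these helpers (hypothetical: the `example` function
and the URL are illustrative and not part of this commit):

// Hypothetical usage of Queue and resolve; not part of this diff.
async fn example() -> std::io::Result<()> {
    // Bounded channel: a pool manager sends items, worker tasks receive them.
    let queue: Queue<u32> = Queue::new(16);
    queue.send(42).await.expect("queue closed");
    assert_eq!(queue.recv().await.expect("queue closed"), 42);

    // Resolve an endpoint to a socket address; the port defaults to 443.
    let (addr, authority) = resolve("https://example.com").await?;
    println!("connecting to {addr} (authority: {authority})");
    Ok(())
}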
43 changes: 4 additions & 39 deletions src/clients/http2.rs
@@ -1,34 +1,12 @@
use super::*;
use crate::clients::common::*;
use crate::clients::*;
use crate::net::Connector;
use bytes::Bytes;
use http_body_util::Empty;
use hyper::client::conn::http2::SendRequest;
use hyper::header::{HeaderName, HeaderValue};
use hyper::rt::Executor;
use hyper::{Request, Uri};
use std::future::Future;

#[derive(Clone)]
struct Queue<T> {
tx: async_channel::Sender<T>,
rx: async_channel::Receiver<T>,
}

impl<T> Queue<T> {
pub fn new(size: usize) -> Self {
let (tx, rx) = async_channel::bounded::<T>(size);

Self { tx, rx }
}

pub async fn send(&self, item: T) -> std::result::Result<(), async_channel::SendError<T>> {
self.tx.send(item).await
}

pub async fn recv(&self) -> std::result::Result<T, async_channel::RecvError> {
self.rx.recv().await
}
}
use hyper_util::rt::TokioExecutor;

// Launch a pool manager and worker tasks. Since HTTP/2 is multiplexed, we
// prepare senders in the pool manager and pass them over a queue to our
// worker tasks.
@@ -65,19 +43,6 @@ pub fn launch_tasks(
}
}

#[derive(Clone)]
struct TokioExecutor;

impl<F> Executor<F> for TokioExecutor
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
fn execute(&self, future: F) {
tokio::spawn(future);
}
}

async fn pool_manager(endpoint: String, config: Config, queue: Queue<SendRequest<Empty<Bytes>>>) {
let connector = Connector::new(&config).expect("failed to init connector");
let mut sender = None;
@@ -105,7 +70,7 @@ async fn pool_manager(endpoint: String, config: Config, queue: Queue<SendRequest
};

let (s, conn) =
match hyper::client::conn::http2::handshake(TokioExecutor {}, stream).await {
match hyper::client::conn::http2::handshake(TokioExecutor::new(), stream).await {
Ok((s, c)) => (s, c),
Err(_e) => {
CONNECT_EX.increment();
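For context, a short sketch of the hyper 1.x HTTP/2 handshake pattern the pool
manager follows (a hypothetical helper, not part of this commit; it assumes a
connected stream implementing hyper's I/O traits, e.g. one wrapped in
hyper_util::rt::TokioIo, and reuses the imports at the top of this file):

// Hypothetical helper illustrating the handshake pattern above.
async fn connect_h2<S>(stream: S) -> hyper::Result<SendRequest<Empty<Bytes>>>
where
    S: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
{
    let (sender, conn) =
        hyper::client::conn::http2::handshake(TokioExecutor::new(), stream).await?;

    // The connection future must be polled for requests to make progress, so
    // drive it on its own task; `sender` can then be cloned and handed to
    // worker tasks over the queue.
    tokio::spawn(async move {
        let _ = conn.await;
    });

    Ok(sender)
}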
15 changes: 8 additions & 7 deletions src/clients/mod.rs
@@ -11,8 +11,10 @@ use workload::ClientWorkItemKind;
use std::io::{Error, ErrorKind, Result};
use std::time::Instant;

mod common;

mod http1;
mod http2;
pub mod http2;
mod memcache;
mod momento;
mod ping;
@@ -46,18 +48,17 @@ pub fn launch_clients(
Protocol::Momento => {
clients::momento::launch_tasks(&mut client_rt, config.clone(), work_receiver)
}
Protocol::MomentoHttp => {
clients::momento::http::launch_tasks(&mut client_rt, config.clone(), work_receiver)
}
Protocol::Ping => {
clients::ping::launch_tasks(&mut client_rt, config.clone(), work_receiver)
}
Protocol::Resp => {
clients::redis::launch_tasks(&mut client_rt, config.clone(), work_receiver)
}
Protocol::Kafka => {
error!("keyspace is not supported for the kafka protocol");
std::process::exit(1);
}
Protocol::Blabber => {
error!("keyspace is not supported for the blabber protocol");
other => {
error!("keyspace is not supported for the {:?} protocol", other);
std::process::exit(1);
}
}