diff --git a/src/clients/memcache/commands/add.rs b/src/clients/cache/memcache/commands/add.rs
similarity index 100%
rename from src/clients/memcache/commands/add.rs
rename to src/clients/cache/memcache/commands/add.rs
diff --git a/src/clients/memcache/commands/delete.rs b/src/clients/cache/memcache/commands/delete.rs
similarity index 100%
rename from src/clients/memcache/commands/delete.rs
rename to src/clients/cache/memcache/commands/delete.rs
diff --git a/src/clients/memcache/commands/get.rs b/src/clients/cache/memcache/commands/get.rs
similarity index 100%
rename from src/clients/memcache/commands/get.rs
rename to src/clients/cache/memcache/commands/get.rs
diff --git a/src/clients/memcache/commands/mod.rs b/src/clients/cache/memcache/commands/mod.rs
similarity index 100%
rename from src/clients/memcache/commands/mod.rs
rename to src/clients/cache/memcache/commands/mod.rs
diff --git a/src/clients/memcache/commands/replace.rs b/src/clients/cache/memcache/commands/replace.rs
similarity index 100%
rename from src/clients/memcache/commands/replace.rs
rename to src/clients/cache/memcache/commands/replace.rs
diff --git a/src/clients/memcache/commands/set.rs b/src/clients/cache/memcache/commands/set.rs
similarity index 100%
rename from src/clients/memcache/commands/set.rs
rename to src/clients/cache/memcache/commands/set.rs
diff --git a/src/clients/memcache/mod.rs b/src/clients/cache/memcache/mod.rs
similarity index 99%
rename from src/clients/memcache/mod.rs
rename to src/clients/cache/memcache/mod.rs
index c6698093..a0876ee2 100644
--- a/src/clients/memcache/mod.rs
+++ b/src/clients/cache/memcache/mod.rs
@@ -1,3 +1,4 @@
+use crate::clients::ResponseError;
 use super::*;
 use crate::net::Connector;
 use protocol_memcache::{Compose, Parse, Request, Response, Ttl};
diff --git a/src/clients/cache/mod.rs b/src/clients/cache/mod.rs
new file mode 100644
index 00000000..0742bf1e
--- /dev/null
+++ b/src/clients/cache/mod.rs
@@ -0,0 +1,52 @@
+use crate::workload::ClientRequest;
+use crate::*;
+
+use async_channel::Receiver;
+use tokio::io::*;
+use tokio::runtime::Runtime;
+use tokio::time::{timeout, Duration};
+use workload::ClientWorkItemKind;
+
+use std::io::{Error, ErrorKind, Result};
+use std::time::Instant;
+
+mod memcache;
+mod momento;
+mod redis;
+
+pub fn launch(
+    config: &Config,
+    work_receiver: Receiver<ClientWorkItemKind<ClientRequest>>,
+) -> Option<Runtime> {
+    debug!("Launching clients...");
+
+    config.client()?;
+
+    // spawn the request drivers on their own runtime
+    let mut client_rt = Builder::new_multi_thread()
+        .enable_all()
+        .worker_threads(config.client().unwrap().threads())
+        .build()
+        .expect("failed to initialize tokio runtime");
+
+    match config.general().protocol() {
+        Protocol::Memcache => {
+            memcache::launch_tasks(&mut client_rt, config.clone(), work_receiver)
+        }
+        Protocol::Momento => {
+            momento::launch_tasks(&mut client_rt, config.clone(), work_receiver)
+        }
+        Protocol::Ping => {
+            crate::clients::ping::ascii::launch_tasks(&mut client_rt, config.clone(), work_receiver)
+        }
+        Protocol::Resp => {
+            redis::launch_tasks(&mut client_rt, config.clone(), work_receiver)
+        }
+        protocol => {
+            error!("keyspace is not supported for the {:?} protocol", protocol);
+            std::process::exit(1);
+        }
+    }
+
+    Some(client_rt)
+}
diff --git a/src/clients/momento/commands/delete.rs b/src/clients/cache/momento/commands/delete.rs
similarity index 100%
rename from src/clients/momento/commands/delete.rs
rename to src/clients/cache/momento/commands/delete.rs
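Note: the new src/clients/cache/mod.rs above exposes a single launch() entry point that builds a dedicated multi-threaded Tokio runtime and dispatches on the configured protocol. A minimal sketch of how a caller might consume the returned Option<Runtime>; the call site itself appears in the src/main.rs hunk at the end of this diff, while the shutdown handling shown here is only an illustrative assumption, not code from the repo:

    // Hypothetical call site: `config` and `client_receiver` come from the
    // surrounding application, as in src/main.rs below.
    let client_runtime = clients::cache::launch(&config, client_receiver);

    // `launch` returns None when no client section is configured (the
    // `config.client()?` early return), so teardown stays conditional.
    // `shutdown_timeout` is one possible way to wind the runtime down.
    if let Some(rt) = client_runtime {
        rt.shutdown_timeout(std::time::Duration::from_millis(100));
    }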
diff --git a/src/clients/momento/commands/get.rs b/src/clients/cache/momento/commands/get.rs
similarity index 100%
rename from src/clients/momento/commands/get.rs
rename to src/clients/cache/momento/commands/get.rs
diff --git a/src/clients/momento/commands/hash_delete.rs b/src/clients/cache/momento/commands/hash_delete.rs
similarity index 100%
rename from src/clients/momento/commands/hash_delete.rs
rename to src/clients/cache/momento/commands/hash_delete.rs
diff --git a/src/clients/momento/commands/hash_get.rs b/src/clients/cache/momento/commands/hash_get.rs
similarity index 100%
rename from src/clients/momento/commands/hash_get.rs
rename to src/clients/cache/momento/commands/hash_get.rs
diff --git a/src/clients/momento/commands/hash_get_all.rs b/src/clients/cache/momento/commands/hash_get_all.rs
similarity index 100%
rename from src/clients/momento/commands/hash_get_all.rs
rename to src/clients/cache/momento/commands/hash_get_all.rs
diff --git a/src/clients/momento/commands/hash_increment.rs b/src/clients/cache/momento/commands/hash_increment.rs
similarity index 100%
rename from src/clients/momento/commands/hash_increment.rs
rename to src/clients/cache/momento/commands/hash_increment.rs
diff --git a/src/clients/momento/commands/hash_set.rs b/src/clients/cache/momento/commands/hash_set.rs
similarity index 100%
rename from src/clients/momento/commands/hash_set.rs
rename to src/clients/cache/momento/commands/hash_set.rs
diff --git a/src/clients/momento/commands/list_fetch.rs b/src/clients/cache/momento/commands/list_fetch.rs
similarity index 100%
rename from src/clients/momento/commands/list_fetch.rs
rename to src/clients/cache/momento/commands/list_fetch.rs
diff --git a/src/clients/momento/commands/list_length.rs b/src/clients/cache/momento/commands/list_length.rs
similarity index 100%
rename from src/clients/momento/commands/list_length.rs
rename to src/clients/cache/momento/commands/list_length.rs
diff --git a/src/clients/momento/commands/list_pop_back.rs b/src/clients/cache/momento/commands/list_pop_back.rs
similarity index 100%
rename from src/clients/momento/commands/list_pop_back.rs
rename to src/clients/cache/momento/commands/list_pop_back.rs
diff --git a/src/clients/momento/commands/list_pop_front.rs b/src/clients/cache/momento/commands/list_pop_front.rs
similarity index 100%
rename from src/clients/momento/commands/list_pop_front.rs
rename to src/clients/cache/momento/commands/list_pop_front.rs
diff --git a/src/clients/momento/commands/list_push_back.rs b/src/clients/cache/momento/commands/list_push_back.rs
similarity index 100%
rename from src/clients/momento/commands/list_push_back.rs
rename to src/clients/cache/momento/commands/list_push_back.rs
diff --git a/src/clients/momento/commands/list_push_front.rs b/src/clients/cache/momento/commands/list_push_front.rs
similarity index 100%
rename from src/clients/momento/commands/list_push_front.rs
rename to src/clients/cache/momento/commands/list_push_front.rs
diff --git a/src/clients/momento/commands/list_remove.rs b/src/clients/cache/momento/commands/list_remove.rs
similarity index 100%
rename from src/clients/momento/commands/list_remove.rs
rename to src/clients/cache/momento/commands/list_remove.rs
diff --git a/src/clients/momento/commands/mod.rs b/src/clients/cache/momento/commands/mod.rs
similarity index 100%
rename from src/clients/momento/commands/mod.rs
rename to src/clients/cache/momento/commands/mod.rs
diff --git a/src/clients/momento/commands/set.rs b/src/clients/cache/momento/commands/set.rs
similarity index 100%
rename from src/clients/momento/commands/set.rs
rename to src/clients/cache/momento/commands/set.rs
diff --git a/src/clients/momento/commands/set_add.rs b/src/clients/cache/momento/commands/set_add.rs
similarity index 100%
rename from src/clients/momento/commands/set_add.rs
rename to src/clients/cache/momento/commands/set_add.rs
diff --git a/src/clients/momento/commands/set_members.rs b/src/clients/cache/momento/commands/set_members.rs
similarity index 100%
rename from src/clients/momento/commands/set_members.rs
rename to src/clients/cache/momento/commands/set_members.rs
diff --git a/src/clients/momento/commands/set_remove.rs b/src/clients/cache/momento/commands/set_remove.rs
similarity index 100%
rename from src/clients/momento/commands/set_remove.rs
rename to src/clients/cache/momento/commands/set_remove.rs
diff --git a/src/clients/momento/commands/sorted_set_add.rs b/src/clients/cache/momento/commands/sorted_set_add.rs
similarity index 100%
rename from src/clients/momento/commands/sorted_set_add.rs
rename to src/clients/cache/momento/commands/sorted_set_add.rs
diff --git a/src/clients/momento/commands/sorted_set_increment.rs b/src/clients/cache/momento/commands/sorted_set_increment.rs
similarity index 100%
rename from src/clients/momento/commands/sorted_set_increment.rs
rename to src/clients/cache/momento/commands/sorted_set_increment.rs
diff --git a/src/clients/momento/commands/sorted_set_range.rs b/src/clients/cache/momento/commands/sorted_set_range.rs
similarity index 100%
rename from src/clients/momento/commands/sorted_set_range.rs
rename to src/clients/cache/momento/commands/sorted_set_range.rs
diff --git a/src/clients/momento/commands/sorted_set_rank.rs b/src/clients/cache/momento/commands/sorted_set_rank.rs
similarity index 100%
rename from src/clients/momento/commands/sorted_set_rank.rs
rename to src/clients/cache/momento/commands/sorted_set_rank.rs
diff --git a/src/clients/momento/commands/sorted_set_remove.rs b/src/clients/cache/momento/commands/sorted_set_remove.rs
similarity index 100%
rename from src/clients/momento/commands/sorted_set_remove.rs
rename to src/clients/cache/momento/commands/sorted_set_remove.rs
diff --git a/src/clients/momento/commands/sorted_set_score.rs b/src/clients/cache/momento/commands/sorted_set_score.rs
similarity index 100%
rename from src/clients/momento/commands/sorted_set_score.rs
rename to src/clients/cache/momento/commands/sorted_set_score.rs
diff --git a/src/clients/momento/mod.rs b/src/clients/cache/momento/mod.rs
similarity index 99%
rename from src/clients/momento/mod.rs
rename to src/clients/cache/momento/mod.rs
index 963dcdb9..8a29ba36 100644
--- a/src/clients/momento/mod.rs
+++ b/src/clients/cache/momento/mod.rs
@@ -1,4 +1,5 @@
 use super::*;
+use crate::clients::*;
 
 use ::momento::cache::configurations::LowLatency;
 use ::momento::*;
diff --git a/src/clients/redis/commands/add.rs b/src/clients/cache/redis/commands/add.rs
similarity index 100%
rename from src/clients/redis/commands/add.rs
rename to src/clients/cache/redis/commands/add.rs
diff --git a/src/clients/redis/commands/delete.rs b/src/clients/cache/redis/commands/delete.rs
similarity index 100%
rename from src/clients/redis/commands/delete.rs
rename to src/clients/cache/redis/commands/delete.rs
diff --git a/src/clients/redis/commands/get.rs b/src/clients/cache/redis/commands/get.rs
similarity index 100%
rename from src/clients/redis/commands/get.rs
rename to src/clients/cache/redis/commands/get.rs
diff --git a/src/clients/redis/commands/hash_delete.rs b/src/clients/cache/redis/commands/hash_delete.rs
similarity index 100%
rename from src/clients/redis/commands/hash_delete.rs
rename to src/clients/cache/redis/commands/hash_delete.rs
diff --git a/src/clients/redis/commands/hash_exists.rs b/src/clients/cache/redis/commands/hash_exists.rs
similarity index 100%
rename from src/clients/redis/commands/hash_exists.rs
rename to src/clients/cache/redis/commands/hash_exists.rs
diff --git a/src/clients/redis/commands/hash_get.rs b/src/clients/cache/redis/commands/hash_get.rs
similarity index 100%
rename from src/clients/redis/commands/hash_get.rs
rename to src/clients/cache/redis/commands/hash_get.rs
diff --git a/src/clients/redis/commands/hash_get_all.rs b/src/clients/cache/redis/commands/hash_get_all.rs
similarity index 100%
rename from src/clients/redis/commands/hash_get_all.rs
rename to src/clients/cache/redis/commands/hash_get_all.rs
diff --git a/src/clients/redis/commands/hash_increment.rs b/src/clients/cache/redis/commands/hash_increment.rs
similarity index 100%
rename from src/clients/redis/commands/hash_increment.rs
rename to src/clients/cache/redis/commands/hash_increment.rs
diff --git a/src/clients/redis/commands/hash_set.rs b/src/clients/cache/redis/commands/hash_set.rs
similarity index 100%
rename from src/clients/redis/commands/hash_set.rs
rename to src/clients/cache/redis/commands/hash_set.rs
diff --git a/src/clients/redis/commands/list_fetch.rs b/src/clients/cache/redis/commands/list_fetch.rs
similarity index 100%
rename from src/clients/redis/commands/list_fetch.rs
rename to src/clients/cache/redis/commands/list_fetch.rs
diff --git a/src/clients/redis/commands/list_length.rs b/src/clients/cache/redis/commands/list_length.rs
similarity index 100%
rename from src/clients/redis/commands/list_length.rs
rename to src/clients/cache/redis/commands/list_length.rs
diff --git a/src/clients/redis/commands/list_pop_back.rs b/src/clients/cache/redis/commands/list_pop_back.rs
similarity index 100%
rename from src/clients/redis/commands/list_pop_back.rs
rename to src/clients/cache/redis/commands/list_pop_back.rs
diff --git a/src/clients/redis/commands/list_pop_front.rs b/src/clients/cache/redis/commands/list_pop_front.rs
similarity index 100%
rename from src/clients/redis/commands/list_pop_front.rs
rename to src/clients/cache/redis/commands/list_pop_front.rs
diff --git a/src/clients/redis/commands/list_push_back.rs b/src/clients/cache/redis/commands/list_push_back.rs
similarity index 100%
rename from src/clients/redis/commands/list_push_back.rs
rename to src/clients/cache/redis/commands/list_push_back.rs
diff --git a/src/clients/redis/commands/list_push_front.rs b/src/clients/cache/redis/commands/list_push_front.rs
similarity index 100%
rename from src/clients/redis/commands/list_push_front.rs
rename to src/clients/cache/redis/commands/list_push_front.rs
diff --git a/src/clients/redis/commands/mod.rs b/src/clients/cache/redis/commands/mod.rs
similarity index 100%
rename from src/clients/redis/commands/mod.rs
rename to src/clients/cache/redis/commands/mod.rs
diff --git a/src/clients/redis/commands/ping.rs b/src/clients/cache/redis/commands/ping.rs
similarity index 100%
rename from src/clients/redis/commands/ping.rs
rename to src/clients/cache/redis/commands/ping.rs
diff --git a/src/clients/redis/commands/replace.rs b/src/clients/cache/redis/commands/replace.rs
similarity index 100%
rename from src/clients/redis/commands/replace.rs
rename to src/clients/cache/redis/commands/replace.rs
diff --git a/src/clients/redis/commands/set.rs b/src/clients/cache/redis/commands/set.rs
similarity index 100%
rename from src/clients/redis/commands/set.rs
rename to src/clients/cache/redis/commands/set.rs
diff --git a/src/clients/redis/commands/set_add.rs b/src/clients/cache/redis/commands/set_add.rs
similarity index 100%
rename from src/clients/redis/commands/set_add.rs
rename to src/clients/cache/redis/commands/set_add.rs
diff --git a/src/clients/redis/commands/set_members.rs b/src/clients/cache/redis/commands/set_members.rs
similarity index 100%
rename from src/clients/redis/commands/set_members.rs
rename to src/clients/cache/redis/commands/set_members.rs
diff --git a/src/clients/redis/commands/set_remove.rs b/src/clients/cache/redis/commands/set_remove.rs
similarity index 100%
rename from src/clients/redis/commands/set_remove.rs
rename to src/clients/cache/redis/commands/set_remove.rs
diff --git a/src/clients/redis/commands/sorted_set_add.rs b/src/clients/cache/redis/commands/sorted_set_add.rs
similarity index 100%
rename from src/clients/redis/commands/sorted_set_add.rs
rename to src/clients/cache/redis/commands/sorted_set_add.rs
diff --git a/src/clients/redis/commands/sorted_set_increment.rs b/src/clients/cache/redis/commands/sorted_set_increment.rs
similarity index 100%
rename from src/clients/redis/commands/sorted_set_increment.rs
rename to src/clients/cache/redis/commands/sorted_set_increment.rs
diff --git a/src/clients/redis/commands/sorted_set_range.rs b/src/clients/cache/redis/commands/sorted_set_range.rs
similarity index 100%
rename from src/clients/redis/commands/sorted_set_range.rs
rename to src/clients/cache/redis/commands/sorted_set_range.rs
diff --git a/src/clients/redis/commands/sorted_set_rank.rs b/src/clients/cache/redis/commands/sorted_set_rank.rs
similarity index 100%
rename from src/clients/redis/commands/sorted_set_rank.rs
rename to src/clients/cache/redis/commands/sorted_set_rank.rs
diff --git a/src/clients/redis/commands/sorted_set_remove.rs b/src/clients/cache/redis/commands/sorted_set_remove.rs
similarity index 100%
rename from src/clients/redis/commands/sorted_set_remove.rs
rename to src/clients/cache/redis/commands/sorted_set_remove.rs
diff --git a/src/clients/redis/commands/sorted_set_score.rs b/src/clients/cache/redis/commands/sorted_set_score.rs
similarity index 100%
rename from src/clients/redis/commands/sorted_set_score.rs
rename to src/clients/cache/redis/commands/sorted_set_score.rs
diff --git a/src/clients/redis/mod.rs b/src/clients/cache/redis/mod.rs
similarity index 99%
rename from src/clients/redis/mod.rs
rename to src/clients/cache/redis/mod.rs
index a129a2bf..2a325d57 100644
--- a/src/clients/redis/mod.rs
+++ b/src/clients/cache/redis/mod.rs
@@ -1,4 +1,5 @@
-use super::*;
+use crate::clients::cache::*;
+use crate::clients::*;
 use crate::net::Connector;
 use ::redis::aio::Connection;
 use ::redis::{AsyncCommands, RedisConnectionInfo};
diff --git a/src/clients/http1.rs b/src/clients/http1.rs
deleted file mode 100644
index 0842459d..00000000
--- a/src/clients/http1.rs
+++ /dev/null
@@ -1,237 +0,0 @@
-use super::*;
-use crate::net::Connector;
-use bytes::Bytes;
-use http_body_util::Empty;
-use hyper::header::{HeaderName, HeaderValue};
-use hyper::{Request, Uri};
-
-/// Launch tasks with one connection per task as http/1.1 is not mux'd
-pub fn launch_tasks(
-    runtime: &mut Runtime,
-    config: Config,
-    work_receiver: Receiver<ClientWorkItemKind<ClientRequest>>,
-) {
-    debug!("launching http1 protocol tasks");
-
-    if config.client().unwrap().concurrency() > 1 {
-        error!("HTTP/1.1 does not support multiplexing sessions onto single streams. Ignoring the concurrency parameter.");
-    }
-
-    for _ in 0..config.client().unwrap().poolsize() {
-        for endpoint in config.target().endpoints() {
-            runtime.spawn(task(
-                work_receiver.clone(),
-                endpoint.clone(),
-                config.clone(),
-            ));
-        }
-    }
-}
-
-// a task for http/1.1
-#[allow(clippy::slow_vector_initialization)]
-async fn task(
-    work_receiver: Receiver<ClientWorkItemKind<ClientRequest>>,
-    endpoint: String,
-    config: Config,
-) -> Result<()> {
-    let connector = Connector::new(&config)?;
-    let mut session = None;
-    let mut session_requests = 0;
-    let mut session_start = Instant::now();
-
-    while RUNNING.load(Ordering::Relaxed) {
-        if session.is_none() {
-            if session_requests != 0 {
-                let stop = Instant::now();
-                let lifecycle_ns = (stop - session_start).as_nanos() as u64;
-                let _ = SESSION_LIFECYCLE_REQUESTS.increment(lifecycle_ns);
-            }
-            CONNECT.increment();
-            let stream = match timeout(
-                config.client().unwrap().connect_timeout(),
-                connector.connect(&endpoint),
-            )
-            .await
-            {
-                Ok(Ok(s)) => s,
-                Ok(Err(_)) => {
-                    CONNECT_EX.increment();
-                    sleep(Duration::from_millis(100)).await;
-                    continue;
-                }
-                Err(_) => {
-                    CONNECT_TIMEOUT.increment();
-                    sleep(Duration::from_millis(100)).await;
-                    continue;
-                }
-            };
-
-            let (s, conn) = match hyper::client::conn::http1::handshake(stream).await {
-                Ok((s, c)) => {
-                    CONNECT_OK.increment();
-                    (s, c)
-                }
-                Err(_e) => {
-                    CONNECT_EX.increment();
-                    sleep(Duration::from_millis(100)).await;
-                    continue;
-                }
-            };
-
-            session_start = Instant::now();
-            session_requests = 0;
-            CONNECT_CURR.increment();
-            SESSION.increment();
-
-            session = Some(s);
-
-            tokio::task::spawn(async move {
-                if let Err(err) = conn.await {
-                    println!("Connection failed: {:?}", err);
-                }
-            });
-        }
-
-        let mut s = session.take().unwrap();
-
-        let work_item = work_receiver
-            .recv()
-            .await
-            .map_err(|_| Error::new(ErrorKind::Other, "channel closed"))?;
-
-        REQUEST.increment();
-
-        // compose request into buffer
-        let request = match &work_item {
-            ClientWorkItemKind::Request { request, sequence } => match request {
-                ClientRequest::Get(r) => {
-                    let key = unsafe { std::str::from_utf8_unchecked(&r.key) };
-                    let url: Uri = if config.tls().is_none() {
-                        format!("http://{endpoint}/{key}").parse().unwrap()
-                    } else {
-                        format!("https://{endpoint}/{key}").parse().unwrap()
-                    };
-                    let authority = url.authority().unwrap().clone();
-
-                    Request::builder()
-                        .uri(url)
-                        .header(hyper::header::HOST, authority.as_str())
-                        .header(
-                            hyper::header::USER_AGENT,
-                            &format!("rpc-perf/5.0.0-alpha (request; seq:{sequence})"),
-                        )
-                        .body(Empty::<Bytes>::new())
-                        .expect("failed to build request")
-                }
-                _ => {
-                    // skip any requests that aren't supported and preserve the
-                    // session for reuse
-                    REQUEST_UNSUPPORTED.increment();
-                    session = Some(s);
-                    continue;
-                }
-            },
-            ClientWorkItemKind::Reconnect => {
-                // we want to reconnect, update stats and implicitly drop the
-                // session
-                SESSION_CLOSED_CLIENT.increment();
-                REQUEST_RECONNECT.increment();
-                CONNECT_CURR.increment();
-                continue;
-            }
-        };
-
-        REQUEST_OK.increment();
-
-        // send request
-        let start = Instant::now();
-        let response = timeout(
-            config.client().unwrap().request_timeout(),
-            s.send_request(request),
-        )
-        .await;
-        let stop = Instant::now();
-
-        match response {
-            Ok(Ok(response)) => {
-                // validate response
-                match &work_item {
-                    ClientWorkItemKind::Request { request, .. } => match request {
-                        ClientRequest::Get { .. } => {
-                            GET_OK.increment();
-                        }
-                        _ => {
-                            error!("unexpected request");
-                            unimplemented!();
-                        }
-                    },
-                    _ => {
-                        error!("unexpected work item");
-                        unimplemented!();
-                    }
-                }
-
-                RESPONSE_OK.increment();
-
-                let latency = stop.duration_since(start).as_nanos() as u64;
-
-                let _ = RESPONSE_LATENCY.increment(latency);
-
-                if let Some(header) = response
-                    .headers()
-                    .get(HeaderName::from_bytes(b"Connection").unwrap())
-                {
-                    if header == HeaderValue::from_static("close") {
-                        SESSION_CLOSED_SERVER.increment();
-                    }
-                }
-
-                session_requests += 1;
-
-                // if we get an error when checking if the session is ready for
-                // another request, we update the connection gauge and allow the
-                // session to be dropped
-                if let Err(_e) = s.ready().await {
-                    CONNECT_CURR.decrement();
-                } else {
-                    // preserve the session for reuse
-                    session = Some(s);
-                }
-            }
-            Ok(Err(_e)) => {
-                // an actual error was returned, do the necessary bookkeeping
-                // and allow the session to be dropped
-                RESPONSE_EX.increment();
-
-                // record exception
-                match work_item {
-                    ClientWorkItemKind::Request { request, .. } => match request {
-                        ClientRequest::Get { .. } => {
-                            GET_EX.increment();
-                        }
-                        _ => {
-                            error!("unexpected request");
-                            unimplemented!();
-                        }
-                    },
-                    _ => {
-                        error!("unexpected work item");
-                        unimplemented!();
-                    }
-                }
-                SESSION_CLOSED_CLIENT.increment();
-                CONNECT_CURR.decrement();
-            }
-            Err(_) => {
-                // increment timeout related stats and allow the session to be
-                // dropped
-                RESPONSE_TIMEOUT.increment();
-                SESSION_CLOSED_CLIENT.increment();
-                CONNECT_CURR.decrement();
-            }
-        }
-    }
-
-    Ok(())
-}
diff --git a/src/clients/http2.rs b/src/clients/http2.rs
deleted file mode 100644
index 397ad912..00000000
--- a/src/clients/http2.rs
+++ /dev/null
@@ -1,286 +0,0 @@
-use super::*;
-use crate::net::Connector;
-use bytes::Bytes;
-use http_body_util::Empty;
-use hyper::client::conn::http2::SendRequest;
-use hyper::header::{HeaderName, HeaderValue};
-use hyper::rt::Executor;
-use hyper::{Request, Uri};
-use std::future::Future;
-
-#[derive(Clone)]
-struct Queue<T> {
-    tx: async_channel::Sender<T>,
-    rx: async_channel::Receiver<T>,
-}
-
-impl<T> Queue<T> {
-    pub fn new(size: usize) -> Self {
-        let (tx, rx) = async_channel::bounded::<T>(size);
-
-        Self { tx, rx }
-    }
-
-    pub async fn send(&self, item: T) -> std::result::Result<(), async_channel::SendError<T>> {
-        self.tx.send(item).await
-    }
-
-    pub async fn recv(&self) -> std::result::Result<T, async_channel::RecvError> {
-        self.rx.recv().await
-    }
-}
-
-// launch a pool manager and worker tasks since HTTP/2.0 is mux'ed we prepare
-// senders in the pool manager and pass them over a queue to our worker tasks
-pub fn launch_tasks(
-    runtime: &mut Runtime,
-    config: Config,
-    work_receiver: Receiver<ClientWorkItemKind<ClientRequest>>,
-) {
-    debug!("launching http2 protocol tasks");
-
-    for _ in 0..config.client().unwrap().poolsize() {
-        for endpoint in config.target().endpoints() {
-            // for each endpoint have poolsize # of pool_managers, each managing
-            // a single TCP stream
-
-            let queue = Queue::new(1);
-            runtime.spawn(pool_manager(
-                endpoint.clone(),
-                config.clone(),
-                queue.clone(),
-            ));
-
-            // since HTTP/2.0 allows muxing several sessions onto a single TCP
-            // stream, we launch one task for each session on this TCP stream
-            for _ in 0..config.client().unwrap().concurrency() {
-                runtime.spawn(task(
-                    work_receiver.clone(),
-                    endpoint.clone(),
-                    config.clone(),
-                    queue.clone(),
-                ));
-            }
-        }
-    }
-}
-
-#[derive(Clone)]
-struct TokioExecutor;
-
-impl<F> Executor<F> for TokioExecutor
-where
-    F: Future + Send + 'static,
-    F::Output: Send + 'static,
-{
-    fn execute(&self, future: F) {
-        tokio::spawn(future);
-    }
-}
-
-async fn pool_manager(endpoint: String, config: Config, queue: Queue<SendRequest<Empty<Bytes>>>) {
-    let connector = Connector::new(&config).expect("failed to init connector");
-    let mut sender = None;
-
-    while RUNNING.load(Ordering::Relaxed) {
-        if sender.is_none() {
-            CONNECT.increment();
-            let stream = match timeout(
-                config.client().unwrap().connect_timeout(),
-                connector.connect(&endpoint),
-            )
-            .await
-            {
-                Ok(Ok(s)) => s,
-                Ok(Err(_)) => {
-                    CONNECT_EX.increment();
-                    sleep(Duration::from_millis(100)).await;
-                    continue;
-                }
-                Err(_) => {
-                    CONNECT_TIMEOUT.increment();
-                    sleep(Duration::from_millis(100)).await;
-                    continue;
-                }
-            };
-
-            let (s, conn) =
-                match hyper::client::conn::http2::handshake(TokioExecutor {}, stream).await {
-                    Ok((s, c)) => (s, c),
-                    Err(_e) => {
-                        CONNECT_EX.increment();
-                        sleep(Duration::from_millis(100)).await;
-                        continue;
-                    }
-                };
-
-            SESSION.increment();
-
-            sender = Some(s);
-
-            tokio::task::spawn(async move {
-                if let Err(err) = conn.await {
-                    println!("Connection failed: {:?}", err);
-                }
-            });
-        }
-
-        let mut s = sender.take().unwrap();
-
-        if let Err(_e) = s.ready().await {
-            continue;
-        }
-
-        if queue.send(s.clone()).await.is_err() {
-            return;
-        }
-
-        sender = Some(s);
-    }
-}
-
-// a task for http/2.0
-#[allow(clippy::slow_vector_initialization)]
-async fn task(
-    work_receiver: Receiver<ClientWorkItemKind<ClientRequest>>,
-    endpoint: String,
-    config: Config,
-    queue: Queue<SendRequest<Empty<Bytes>>>,
-) -> Result<()> {
-    // let connector = Connector::new(&config)?;
-    let mut sender = None;
-
-    while RUNNING.load(Ordering::Relaxed) {
-        if sender.is_none() {
-            let s = queue
-                .recv()
-                .await
-                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
-            sender = Some(s);
-        }
-
-        let mut s = sender.take().unwrap();
-
-        let work_item = work_receiver
-            .recv()
-            .await
-            .map_err(|_| Error::new(ErrorKind::Other, "channel closed"))?;
-
-        REQUEST.increment();
-
-        // compose request into buffer
-        let request = match &work_item {
-            ClientWorkItemKind::Request { request, sequence } => match request {
-                ClientRequest::Get(r) => {
-                    let key = unsafe { std::str::from_utf8_unchecked(&r.key) };
-                    let url: Uri = if config.tls().is_none() {
-                        format!("http://{endpoint}/{key}").parse().unwrap()
-                    } else {
-                        format!("https://{endpoint}/{key}").parse().unwrap()
-                    };
-                    let authority = url.authority().unwrap().clone();
-
-                    Request::builder()
-                        .uri(url)
-                        .header(hyper::header::HOST, authority.as_str())
-                        .header(
-                            hyper::header::USER_AGENT,
-                            &format!("rpc-perf/5.0.0-alpha (request; seq:{sequence})"),
-                        )
-                        .body(Empty::<Bytes>::new())
-                        .expect("failed to build request")
-                }
-                _ => {
-                    REQUEST_UNSUPPORTED.increment();
-                    sender = Some(s);
-                    continue;
-                }
-            },
-            ClientWorkItemKind::Reconnect => {
-                SESSION_CLOSED_CLIENT.increment();
-                REQUEST_RECONNECT.increment();
-                continue;
-            }
-        };
-
-        REQUEST_OK.increment();
-
-        // send request
-        let start = Instant::now();
-        let response = timeout(
-            config.client().unwrap().request_timeout(),
-            s.send_request(request),
-        )
-        .await;
-        let stop = Instant::now();
-
-        match response {
-            Ok(Ok(response)) => {
-                // validate response
-                match work_item {
-                    ClientWorkItemKind::Request { request, .. } => match request {
-                        ClientRequest::Get { .. } => {
-                            GET_OK.increment();
-                        }
-                        _ => {
-                            error!("unexpected request");
-                            unimplemented!();
-                        }
-                    },
-                    _ => {
-                        error!("unexpected work item");
-                        unimplemented!();
-                    }
-                }
-
-                RESPONSE_OK.increment();
-
-                let latency = stop.duration_since(start).as_nanos() as u64;
-
-                let _ = RESPONSE_LATENCY.increment(latency);
-
-                if let Some(header) = response
-                    .headers()
-                    .get(HeaderName::from_bytes(b"connection").unwrap())
-                {
-                    if header == HeaderValue::from_static("close") {
-                        SESSION_CLOSED_SERVER.increment();
-                    }
-                }
-            }
-            Ok(Err(_e)) => {
-                // record exception
-                match work_item {
-                    ClientWorkItemKind::Request { request, .. } => match request {
-                        ClientRequest::Get { .. } => {
-                            GET_EX.increment();
-                        }
-                        _ => {
-                            error!("unexpected request");
-                            unimplemented!();
-                        }
-                    },
-                    _ => {
-                        error!("unexpected work item");
-                        unimplemented!();
-                    }
-                }
-                SESSION_CLOSED_CLIENT.increment();
-                continue;
-            }
-            Err(_) => {
-                RESPONSE_TIMEOUT.increment();
-                SESSION_CLOSED_CLIENT.increment();
-                continue;
-            }
-        }
-
-        if let Err(_e) = s.ready().await {
-            continue;
-        }
-
-        sender = Some(s);
-    }
-
-    Ok(())
-}
diff --git a/src/clients/mod.rs b/src/clients/mod.rs
index e16d9d2d..bdd29ed7 100644
--- a/src/clients/mod.rs
+++ b/src/clients/mod.rs
@@ -1,69 +1,10 @@
-use crate::workload::ClientRequest;
-use crate::*;
+use momento::MomentoError;
+use momento::MomentoErrorCode;
-
-use ::momento::{MomentoError, MomentoErrorCode};
-use async_channel::Receiver;
-use tokio::io::*;
-use tokio::runtime::Runtime;
-use tokio::time::{timeout, Duration};
-use workload::ClientWorkItemKind;
-
-use std::io::{Error, ErrorKind, Result};
-use std::time::Instant;
-
-mod http1;
-mod http2;
-mod memcache;
-mod momento;
-mod ping;
-mod redis;
-
-pub fn launch_clients(
-    config: &Config,
-    work_receiver: Receiver<ClientWorkItemKind<ClientRequest>>,
-) -> Option<Runtime> {
-    debug!("Launching clients...");
-
-    config.client()?;
-
-    // spawn the request drivers on their own runtime
-    let mut client_rt = Builder::new_multi_thread()
-        .enable_all()
-        .worker_threads(config.client().unwrap().threads())
-        .build()
-        .expect("failed to initialize tokio runtime");
-
-    match config.general().protocol() {
-        Protocol::Http1 => {
-            clients::http1::launch_tasks(&mut client_rt, config.clone(), work_receiver)
-        }
-        Protocol::Http2 => {
-            clients::http2::launch_tasks(&mut client_rt, config.clone(), work_receiver)
-        }
-        Protocol::Memcache => {
-            clients::memcache::launch_tasks(&mut client_rt, config.clone(), work_receiver)
-        }
-        Protocol::Momento => {
-            clients::momento::launch_tasks(&mut client_rt, config.clone(), work_receiver)
-        }
-        Protocol::Ping => {
-            clients::ping::launch_tasks(&mut client_rt, config.clone(), work_receiver)
-        }
-        Protocol::Resp => {
-            clients::redis::launch_tasks(&mut client_rt, config.clone(), work_receiver)
-        }
-        Protocol::Kafka => {
-            error!("keyspace is not supported for the kafka protocol");
-            std::process::exit(1);
-        }
-        Protocol::Blabber => {
-            error!("keyspace is not supported for the blabber protocol");
-            std::process::exit(1);
-        }
-    }
-
-    Some(client_rt)
-}
+
+pub mod cache;
+pub mod ping;
+pub mod pubsub;
+pub mod store;
 
 pub enum ResponseError {
     /// Some exception while reading the response
@@ -84,4 +25,4 @@ impl From<MomentoError> for ResponseError {
             _ => ResponseError::Exception,
         }
     }
-}
+}
\ No newline at end of file
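Note: with the HTTP clients removed, src/clients/mod.rs shrinks to the submodule declarations plus the shared ResponseError type and its From<MomentoError> conversion. A minimal sketch of how a client task might lean on that conversion to classify an SDK error; the label strings here are placeholders for whatever counters the real tasks increment, not names taken from the crate:

    use momento::MomentoError;
    use crate::clients::ResponseError;

    // Hypothetical helper: map a Momento SDK error onto the shared error
    // categories so the caller can bump the matching metric.
    fn classify(err: MomentoError) -> &'static str {
        match ResponseError::from(err) {
            ResponseError::Ratelimited => "ratelimited",       // backend rate limit
            ResponseError::BackendTimeout => "backend_timeout", // backend-side timeout
            ResponseError::Timeout => "timeout",                // client-side timeout
            ResponseError::Exception => "exception",            // anything else
        }
    }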
diff --git a/src/clients/ping.rs b/src/clients/ping/ascii.rs
similarity index 95%
rename from src/clients/ping.rs
rename to src/clients/ping/ascii.rs
index 506d1997..c7f9f223 100644
--- a/src/clients/ping.rs
+++ b/src/clients/ping/ascii.rs
@@ -1,4 +1,14 @@
-use super::*;
+use std::io::ErrorKind;
+use crate::clients::*;
+use tokio::io::AsyncReadExt;
+use tokio::io::AsyncWriteExt;
+use std::time::Instant;
+use std::io::{Error, Result};
+use tokio::time::timeout;
+use crate::workload::*;
+use async_channel::Receiver;
+use tokio::runtime::Runtime;
+use crate::*;
 use crate::net::Connector;
 use protocol_ping::{Compose, Parse, Request, Response};
 use session::{Buf, BufMut, Buffer};
@@ -75,7 +85,7 @@ async fn task(
         let work_item = work_receiver
             .recv()
             .await
-            .map_err(|_| Error::new(ErrorKind::Other, "channel closed"))?;
+            .map_err(|_| Error::other("channel closed"))?;
 
         REQUEST.increment();
 
diff --git a/src/clients/ping/mod.rs b/src/clients/ping/mod.rs
new file mode 100644
index 00000000..db3ba8ce
--- /dev/null
+++ b/src/clients/ping/mod.rs
@@ -0,0 +1 @@
+pub mod ascii;
\ No newline at end of file
diff --git a/src/pubsub/blabber.rs b/src/clients/pubsub/blabber.rs
similarity index 97%
rename from src/pubsub/blabber.rs
rename to src/clients/pubsub/blabber.rs
index f8429f64..f1783eea 100644
--- a/src/pubsub/blabber.rs
+++ b/src/clients/pubsub/blabber.rs
@@ -1,14 +1,13 @@
-use super::*;
+use crate::clients::pubsub::*;
 use crate::net::Connector;
-use bytes::Buf;
-use bytes::BufMut;
+
+use bytes::{Buf, BufMut};
 use session::Buffer;
-use std::borrow::Borrow;
-use std::borrow::BorrowMut;
 use tokio::io::AsyncReadExt;
-
 use tokio::time::timeout;
 
+use std::borrow::{Borrow, BorrowMut};
+
 // blabber has a header before the standard pubsub message
 //
 // ___________________________________
diff --git a/src/pubsub/kafka.rs b/src/clients/pubsub/kafka.rs
similarity index 99%
rename from src/pubsub/kafka.rs
rename to src/clients/pubsub/kafka.rs
index df7e47b3..9452331b 100644
--- a/src/pubsub/kafka.rs
+++ b/src/clients/pubsub/kafka.rs
@@ -1,4 +1,5 @@
-use super::*;
+use crate::clients::pubsub::*;
+
 use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
 use rdkafka::client::DefaultClientContext;
 use rdkafka::config::ClientConfig;
diff --git a/src/pubsub/mod.rs b/src/clients/pubsub/mod.rs
similarity index 98%
rename from src/pubsub/mod.rs
rename to src/clients/pubsub/mod.rs
index bdcd7ccf..42d3d6bf 100644
--- a/src/pubsub/mod.rs
+++ b/src/clients/pubsub/mod.rs
@@ -1,4 +1,3 @@
-use crate::clients::*;
 use crate::workload::Component;
 use crate::workload::PublisherWorkItem as WorkItem;
 use crate::*;
@@ -111,7 +110,7 @@ impl PubsubRuntimes {
     }
 }
 
-pub fn launch_pubsub(
+pub fn launch(
     config: &Config,
     work_receiver: Receiver<WorkItem>,
     workload_components: &[Component],
@@ -152,8 +151,8 @@ fn launch_publishers(
             kafka::create_topics(&mut publisher_rt, config.clone(), workload_components);
             kafka::launch_publishers(&mut publisher_rt, config.clone(), work_receiver);
         }
-        _ => {
-            error!("pubsub is not supported for the selected protocol");
+        protocol => {
+            error!("pubsub is not supported for the selected protocol: {:?}", protocol);
             std::process::exit(1);
         }
     }
diff --git a/src/pubsub/momento.rs b/src/clients/pubsub/momento.rs
similarity index 98%
rename from src/pubsub/momento.rs
rename to src/clients/pubsub/momento.rs
index 7320bdf8..c6d1f61b 100644
--- a/src/pubsub/momento.rs
+++ b/src/clients/pubsub/momento.rs
@@ -1,4 +1,8 @@
-use super::*;
+use crate::clients::pubsub::*;
+use async_channel::Receiver;
+use crate::workload::*;
+use tokio::runtime::Runtime;
+use crate::clients::*;
 
 use ::momento::topics::configurations::LowLatency;
 use ::momento::topics::{TopicClient, ValueKind};
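Note: the map_err changes in ping/ascii.rs above (and in store/momento.rs below) swap Error::new(ErrorKind::Other, ..) for std::io::Error::other, a std constructor (stable since Rust 1.74) that builds the same ErrorKind::Other error with less noise. A small stand-alone illustration:

    use std::io::{Error, ErrorKind};

    fn main() {
        // Both constructors produce an ErrorKind::Other error; `other` is just terser.
        let a = Error::new(ErrorKind::Other, "channel closed");
        let b = Error::other("channel closed");
        assert_eq!(a.kind(), b.kind());
        assert_eq!(b.to_string(), "channel closed");
    }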
diff --git a/src/store/mod.rs b/src/clients/store/mod.rs
similarity index 51%
rename from src/store/mod.rs
rename to src/clients/store/mod.rs
index 5ccdae70..4e3d44f6 100644
--- a/src/store/mod.rs
+++ b/src/clients/store/mod.rs
@@ -1,15 +1,12 @@
 use crate::*;
 
-use ::momento::{MomentoError, MomentoErrorCode};
 use async_channel::Receiver;
-use std::io::{Error, ErrorKind, Result};
-use std::time::Instant;
 use tokio::runtime::Runtime;
 use workload::{ClientWorkItemKind, StoreClientRequest};
 
 mod momento;
 
-pub fn launch_store_clients(
+pub fn launch(
     config: &Config,
     work_receiver: Receiver<ClientWorkItemKind<StoreClientRequest>>,
 ) -> Option<Runtime> {
@@ -30,32 +27,11 @@ pub fn launch_store_clients(
     match config.general().protocol() {
         Protocol::Momento => momento::launch_tasks(&mut client_rt, config.clone(), work_receiver),
-        _ => {
-            error!("momento protocol is the only supported store protocol");
+        protocol => {
+            error!("store commands are not supported for the {:?} protocol", protocol);
             std::process::exit(1);
         }
     }
 
     Some(client_rt)
 }
-
-pub enum ResponseError {
-    /// Some exception while reading the response
-    Exception,
-    /// A timeout while awaiting the response
-    Timeout,
-    /// Some backends may have rate limits
-    Ratelimited,
-    /// Some backends may have their own timeout
-    BackendTimeout,
-}
-
-impl From<MomentoError> for ResponseError {
-    fn from(other: MomentoError) -> Self {
-        match other.error_code {
-            MomentoErrorCode::LimitExceededError { .. } => ResponseError::Ratelimited,
-            MomentoErrorCode::TimeoutError { .. } => ResponseError::BackendTimeout,
-            _ => ResponseError::Exception,
-        }
-    }
-}
diff --git a/src/store/momento.rs b/src/clients/store/momento.rs
similarity index 96%
rename from src/store/momento.rs
rename to src/clients/store/momento.rs
index 87a43a13..6885d39e 100644
--- a/src/store/momento.rs
+++ b/src/clients/store/momento.rs
@@ -1,14 +1,18 @@
-use super::*;
+use crate::clients::*;
+use crate::*;
+use crate::workload::*;
+use async_channel::Receiver;
+use tokio::runtime::Runtime;
 use ::momento::storage::configurations::LowLatency;
 use ::momento::*;
-
 use paste::paste;
-
 use ::momento::storage::PutRequest;
 use storage::GetResponse;
 use tokio::time::timeout;
-use workload::StoreClientRequest;
+
+use std::time::Instant;
+use std::io::{Error, Result};
 
 /// Launch tasks with one channel per task as gRPC is mux-enabled.
 pub fn launch_tasks(
@@ -78,7 +82,7 @@ async fn task(
     let work_item = work_receiver
         .recv()
         .await
-        .map_err(|_| Error::new(ErrorKind::Other, "channel closed"))?;
+        .map_err(|_| Error::other("channel closed"))?;
 
     STORE_REQUEST.increment();
     let start = Instant::now();
diff --git a/src/main.rs b/src/main.rs
index f99a2c41..ba63c550 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,3 @@
-use crate::clients::launch_clients;
-use crate::pubsub::launch_pubsub;
 use crate::workload::{launch_workload, Generator, Ratelimit};
 use async_channel::{bounded, Sender};
 use backtrace::Backtrace;
@@ -10,7 +8,6 @@ use ringlog::*;
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
-use store::launch_store_clients;
 use tokio::runtime::Builder;
 use tokio::sync::RwLock;
 use tokio::time::sleep;
@@ -21,8 +18,6 @@ mod config;
 mod metrics;
 mod net;
 mod output;
-mod pubsub;
-mod store;
 mod workload;
 
 use config::*;
@@ -167,13 +162,13 @@ fn main() {
     );
 
     // start client(s)
-    let client_runtime = launch_clients(&config, client_receiver);
+    let client_runtime = clients::cache::launch(&config, client_receiver);
 
     // start store client(s)
-    let store_runtime = launch_store_clients(&config, store_receiver);
+    let store_runtime = clients::store::launch(&config, store_receiver);
 
     // start publisher(s) and subscriber(s)
-    let mut pubsub_runtimes = launch_pubsub(&config, pubsub_receiver, &workload_components);
+    let mut pubsub_runtimes = clients::pubsub::launch(&config, pubsub_receiver, &workload_components);
 
     // start ratelimit controller thread if a dynamic ratelimit is configured
     {
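Note: after this reorganization all three launch entry points live under clients::*, and the call sites in the final src/main.rs hunk reduce to roughly the following (paths taken from the hunks above, surrounding setup elided):

    // start client(s) for the cache protocols (memcache, momento, ping, resp)
    let client_runtime = clients::cache::launch(&config, client_receiver);

    // start store client(s) (currently Momento-only)
    let store_runtime = clients::store::launch(&config, store_receiver);

    // start publisher(s) and subscriber(s)
    let mut pubsub_runtimes =
        clients::pubsub::launch(&config, pubsub_receiver, &workload_components);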