From 0ce5ba9f5c920867ca2270703b54de6dd289c5c6 Mon Sep 17 00:00:00 2001 From: Brian Martin Date: Mon, 22 Jul 2024 21:02:59 -0700 Subject: [PATCH] adds h2 implementation for grpc ping --- Cargo.lock | 17 +- Cargo.toml | 5 +- configs/grpc_ping.toml | 4 +- configs/ping.toml | 14 +- src/clients/http2.rs | 8 +- src/clients/mod.rs | 8 +- src/clients/{ping.rs => ping/ascii.rs} | 2 +- src/clients/{grpc_ping.rs => ping/grpc.rs} | 2 +- src/clients/ping/http2.rs | 208 +++++++++++++++++++++ src/clients/ping/mod.rs | 3 + src/config/protocol.rs | 1 + 11 files changed, 251 insertions(+), 21 deletions(-) rename src/clients/{ping.rs => ping/ascii.rs} (99%) rename src/clients/{grpc_ping.rs => ping/grpc.rs} (99%) create mode 100644 src/clients/ping/http2.rs create mode 100644 src/clients/ping/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 412eb8ff..fc157206 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -393,6 +393,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "autocfg" version = "1.3.0" @@ -1160,15 +1166,15 @@ dependencies = [ [[package]] name = "h2" -version = "0.4.4" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "816ec7294445779408f36fe57bc5b7fc1cf59664059096c65f905c1c61f58069" +checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab" dependencies = [ + "atomic-waker", "bytes", "fnv", "futures-core", "futures-sink", - "futures-util", "http 1.1.0", "indexmap 2.2.6", "slab", @@ -1353,7 +1359,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2 0.4.4", + "h2 0.4.5", "http 1.1.0", "http-body 1.0.0", "httparse", @@ -2610,7 +2616,9 @@ dependencies = [ "flate2", "foreign-types-shared 0.3.1", "futures", + "h2 0.4.5", "histogram", + "http 1.1.0", "http-body-util", "humantime", "hyper 1.3.1", @@ -2623,6 +2631,7 @@ dependencies = [ "openssl-sys", "paste", "pelikan-net 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "pin-project", "prost", "protocol-memcache", "protocol-ping", diff --git a/Cargo.toml b/Cargo.toml index e30f5bc4..a1dfefa5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,9 @@ tonic = "0.11" warp = "0.3.6" zipf = "7.0.1" flate2 = "1.0.28" +pin-project = "1.1.5" +h2 = "0.4.5" +http = "1.1.0" [build-dependencies] tonic-build = "0.11" @@ -70,7 +73,7 @@ debug = true rpath = false lto = true debug-assertions = false -codegen-units = 1 +codegen-units = 16 [profile.bench] opt-level = 3 diff --git a/configs/grpc_ping.toml b/configs/grpc_ping.toml index 67aabe4e..ad99f1e0 100644 --- a/configs/grpc_ping.toml +++ b/configs/grpc_ping.toml @@ -3,7 +3,7 @@ [general] # specify the protocol to be used -protocol = "grpc_ping" +protocol = "http2_ping" # the interval for stats integration and reporting interval = 60 # the number of intervals to run the test for @@ -44,7 +44,7 @@ log_max_size = 1073741824 [target] # specify one or more endpoints as IP:PORT pairs endpoints = [ - "http://127.0.0.1:12321", + "http://cache00:12321", ] [client] diff --git a/configs/ping.toml b/configs/ping.toml index da160a4c..3614dc25 100644 --- a/configs/ping.toml +++ b/configs/ping.toml @@ -3,9 +3,13 @@ [general] # specify the protocol to be used -protocol = "ping" +# should be one of: +# ping - ascii ping/pong over TCP (w/ optional TLS) +# grpc_ping - ping/pong using GRPC over HTTP(s)/2 +# http2_ping - ping/pong implemented using HTTP(s)/2 GET +protocol = "grpc_ping" # the interval for stats integration and reporting -interval = 60 +interval = 1 # the number of intervals to run the test for duration = 300 # run the admin thread with a HTTP listener at the address provided, this allows @@ -44,14 +48,14 @@ log_max_size = 1073741824 [target] # specify one or more endpoints as IP:PORT pairs endpoints = [ - "127.0.0.1:12321", + "http://127.0.0.1:8080", ] [client] # number of threads used to drive client requests threads = 4 # the total number of connections to each endpoint -poolsize = 20 +poolsize = 100 # the connect timeout in milliseconds connect_timeout = 10000 # set the timeout in milliseconds @@ -63,7 +67,7 @@ threads = 1 [workload.ratelimit] # set a global ratelimit for the workload -start = 10_000 +#start = 1 # Note, even though the command does not use keys, it's still a member of a # keyspace. diff --git a/src/clients/http2.rs b/src/clients/http2.rs index 85b3f1a5..fa98f9b8 100644 --- a/src/clients/http2.rs +++ b/src/clients/http2.rs @@ -9,7 +9,7 @@ use hyper::{Request, Uri}; use std::future::Future; #[derive(Clone)] -struct Queue { +pub struct Queue { tx: async_channel::Sender, rx: async_channel::Receiver, } @@ -62,7 +62,7 @@ pub fn launch_tasks(runtime: &mut Runtime, config: Config, work_receiver: Receiv } #[derive(Clone)] -struct TokioExecutor; +pub struct TokioExecutor; impl Executor for TokioExecutor where @@ -74,7 +74,7 @@ where } } -async fn pool_manager(endpoint: String, config: Config, queue: Queue>>) { +pub async fn pool_manager(endpoint: String, config: Config, queue: Queue>>) { let connector = Connector::new(&config).expect("failed to init connector"); let mut sender = None; @@ -181,7 +181,7 @@ async fn task( .header(hyper::header::HOST, authority.as_str()) .header( hyper::header::USER_AGENT, - &format!("rpc-perf/5.0.0-alpha (request; seq:{sequence})"), + &format!("rpc-perf/{} (request; seq:{sequence})", env!("CARGO_PKG_VERSION")), ) .body(Empty::::new()) .expect("failed to build request") diff --git a/src/clients/mod.rs b/src/clients/mod.rs index ba2dfef4..1c39253b 100644 --- a/src/clients/mod.rs +++ b/src/clients/mod.rs @@ -11,7 +11,6 @@ use tokio::time::{timeout, Duration}; use std::io::{Error, ErrorKind, Result}; use std::time::Instant; -mod grpc_ping; mod http1; mod http2; mod memcache; @@ -33,7 +32,7 @@ pub fn launch_clients(config: &Config, work_receiver: Receiver) -> Opt match config.general().protocol() { Protocol::GrpcPing => { - clients::grpc_ping::launch_tasks(&mut client_rt, config.clone(), work_receiver) + clients::ping::grpc::launch_tasks(&mut client_rt, config.clone(), work_receiver) } Protocol::Http1 => { clients::http1::launch_tasks(&mut client_rt, config.clone(), work_receiver) @@ -41,6 +40,9 @@ pub fn launch_clients(config: &Config, work_receiver: Receiver) -> Opt Protocol::Http2 => { clients::http2::launch_tasks(&mut client_rt, config.clone(), work_receiver) } + Protocol::Http2Ping => { + clients::ping::http2::launch_tasks(&mut client_rt, config.clone(), work_receiver) + } Protocol::Memcache => { clients::memcache::launch_tasks(&mut client_rt, config.clone(), work_receiver) } @@ -48,7 +50,7 @@ pub fn launch_clients(config: &Config, work_receiver: Receiver) -> Opt clients::momento::launch_tasks(&mut client_rt, config.clone(), work_receiver) } Protocol::Ping => { - clients::ping::launch_tasks(&mut client_rt, config.clone(), work_receiver) + clients::ping::ascii::launch_tasks(&mut client_rt, config.clone(), work_receiver) } Protocol::Resp => { clients::redis::launch_tasks(&mut client_rt, config.clone(), work_receiver) diff --git a/src/clients/ping.rs b/src/clients/ping/ascii.rs similarity index 99% rename from src/clients/ping.rs rename to src/clients/ping/ascii.rs index cae75572..2fca6035 100644 --- a/src/clients/ping.rs +++ b/src/clients/ping/ascii.rs @@ -1,4 +1,4 @@ -use super::*; +use crate::clients::*; use crate::net::Connector; use protocol_ping::{Compose, Parse, Request, Response}; use session::{Buf, BufMut, Buffer}; diff --git a/src/clients/grpc_ping.rs b/src/clients/ping/grpc.rs similarity index 99% rename from src/clients/grpc_ping.rs rename to src/clients/ping/grpc.rs index 89ff339c..58e147b3 100644 --- a/src/clients/grpc_ping.rs +++ b/src/clients/ping/grpc.rs @@ -1,4 +1,4 @@ -use super::*; +use crate::clients::*; use tonic::transport::Channel; use pingpong::ping_client::PingClient; diff --git a/src/clients/ping/http2.rs b/src/clients/ping/http2.rs new file mode 100644 index 00000000..f56c6001 --- /dev/null +++ b/src/clients/ping/http2.rs @@ -0,0 +1,208 @@ +use http::Version; +use chrono::DateTime; +use chrono::Utc; +use bytes::BytesMut; +use http::Method; +use http::uri::Authority; +use tokio::net::TcpStream; +use std::time::Instant; +use crate::workload::ClientRequest; +use std::io::ErrorKind; +use async_channel::Receiver; +use crate::clients::timeout; +use crate::clients::WorkItem; +use tokio::runtime::Runtime; +use crate::*; +use crate::net::Connector; +use crate::clients::http2::TokioExecutor; +use http::HeaderValue; +use http::HeaderName; +use http::Uri; +use h2::client::SendRequest; +use crate::clients::http2::Queue; +use std::borrow::Borrow; +use bytes::BufMut; +use bytes::Buf; +use std::io::Write; +use std::borrow::BorrowMut; +use session::Buffer; +use bytes::Bytes; +use std::io::Error; + +// launch a pool manager and worker tasks since HTTP/2.0 is mux'ed we prepare +// senders in the pool manager and pass them over a queue to our worker tasks +pub fn launch_tasks(runtime: &mut Runtime, config: Config, work_receiver: Receiver) { + debug!("launching http2 protocol tasks"); + + for _ in 0..config.client().unwrap().poolsize() { + for endpoint in config.target().endpoints() { + // for each endpoint have poolsize # of pool_managers, each managing + // a single TCP stream + + let queue = Queue::new(1); + runtime.spawn(pool_manager( + endpoint.clone(), + config.clone(), + queue.clone(), + )); + + // since HTTP/2.0 allows muxing several sessions onto a single TCP + // stream, we launch one task for each session on this TCP stream + for _ in 0..config.client().unwrap().concurrency() { + runtime.spawn(task( + work_receiver.clone(), + endpoint.clone(), + config.clone(), + queue.clone(), + )); + } + } + } +} + +async fn resolve(uri: &str) -> Result<(std::net::SocketAddr, Authority), std::io::Error> { + let uri = uri + .parse::() + .map_err(|_| Error::new(ErrorKind::Other, "failed to parse uri"))?; + + let auth = uri + .authority() + .ok_or(Error::new(ErrorKind::Other, "uri has no authority"))? + .clone(); + + let port = auth.port_u16().unwrap_or(443); + + let addr = tokio::net::lookup_host((auth.host(), port)) + .await? + .next() + .ok_or(Error::new(ErrorKind::Other, "dns found no addresses"))?; + + Ok((addr, auth)) +} + +pub async fn pool_manager(endpoint: String, config: Config, queue: Queue>) { + let mut client = None; + + // let connector = Connector::new(&config).expect("failed to init connector"); + // let mut sender = None; + + while RUNNING.load(Ordering::Relaxed) { + if client.is_none() { + CONNECT.increment(); + + if let Ok((addr, _auth)) = resolve(&endpoint).await { + if let Ok(tcp) = TcpStream::connect(addr).await { + if let Ok((h2, connection)) = ::h2::client::handshake(tcp).await { + tokio::spawn(async move { + connection.await.unwrap(); + }); + + if let Ok(h2) = h2.ready().await { + client = Some(h2); + } + } + } + } + } else if let Ok(s) = client.clone().unwrap().ready().await { + let _ = queue.send(s).await; + } else { + client = None; + } + } +} + +// a task for http/2.0 +#[allow(clippy::slow_vector_initialization)] +async fn task( + work_receiver: Receiver, + endpoint: String, + config: Config, + queue: Queue>, +) -> Result<(), std::io::Error> { + // let mut buffer = Buffer::new(16384); + // let parser = protocol_ping::ResponseParser::new(); + // let mut sender = None; + + let uri = endpoint + .parse::() + .map_err(|_| Error::new(ErrorKind::Other, "failed to parse uri"))?; + + let auth = uri + .authority() + .ok_or(Error::new(ErrorKind::Other, "uri has no authority"))? + .clone(); + + let port = auth.port_u16().unwrap_or(443); + + while RUNNING.load(Ordering::Relaxed) { + let sender = queue.recv().await; + + if sender.is_err() { + continue; + } + + let mut sender = sender.unwrap(); + + let work_item = work_receiver + .recv() + .await + .map_err(|_| Error::new(ErrorKind::Other, "channel closed"))?; + + + + REQUEST.increment(); + + // compose request into buffer + match &work_item { + WorkItem::Request { request, .. } => match request { + ClientRequest::Ping(_) => { } + _ => { + REQUEST_UNSUPPORTED.increment(); + continue; + } + }, + WorkItem::Reconnect => { + REQUEST_UNSUPPORTED.increment(); + continue; + } + }; + + let now: DateTime = Utc::now(); + + let request = http::request::Builder::new() + .version(Version::HTTP_2) + .method(Method::POST) + .uri(&format!("http://192.168.1.205:12321/pingpong.Ping/Ping")) + .header("content-type", "application/grpc") + .header("date", now.to_rfc2822()) + .header("user-agent", "unknown/0.0.0") + .header("te", "trailers") + .body(()) + .unwrap(); + + let start = Instant::now(); + + if let Ok((response, mut stream)) = sender.send_request(request, false) { + if stream.send_data(Bytes::from(vec![0, 0, 0, 0, 0]), true).is_err() { + // REQUEST_EX.increment(); + continue; + } else { + REQUEST_OK.increment(); + } + + if let Ok(_response) = response.await { + let stop = Instant::now(); + + RESPONSE_OK.increment(); + PING_OK.increment(); + + let latency = stop.duration_since(start).as_nanos() as u64; + let _ = RESPONSE_LATENCY.increment(latency); + } else { + RESPONSE_EX.increment(); + } + } + } + + Ok(()) +} diff --git a/src/clients/ping/mod.rs b/src/clients/ping/mod.rs new file mode 100644 index 00000000..e6a3d3f7 --- /dev/null +++ b/src/clients/ping/mod.rs @@ -0,0 +1,3 @@ +pub mod ascii; +pub mod grpc; +pub mod http2; \ No newline at end of file diff --git a/src/config/protocol.rs b/src/config/protocol.rs index 9e8011af..6e2896e3 100644 --- a/src/config/protocol.rs +++ b/src/config/protocol.rs @@ -7,6 +7,7 @@ pub enum Protocol { GrpcPing, Http1, Http2, + Http2Ping, Memcache, Momento, Ping,