Skip to content

Commit

Permalink
various stats fixes and cleanup for ping protocols
Browse files Browse the repository at this point in the history
  • Loading branch information
brayniac committed Aug 20, 2024
1 parent 06d5db1 commit 14c48e0
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 20 deletions.
24 changes: 16 additions & 8 deletions src/clients/ping/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,25 @@ pub fn launch_tasks(

for endpoint in config.target().endpoints() {
for _ in 0..config.client().unwrap().poolsize() {
let _ = runtime.enter();
while RUNNING.load(Ordering::Relaxed) {
let endpoint = endpoint.clone();

let endpoint = endpoint.clone();
CONNECT.increment();

let client = runtime
.block_on(async { PingClient::connect(endpoint).await })
.unwrap();
let _ = runtime.enter();

// create one task per channel
for _ in 0..config.client().unwrap().concurrency() {
runtime.spawn(task(config.clone(), client.clone(), work_receiver.clone()));
if let Ok(client) = runtime.block_on(async { PingClient::connect(endpoint).await })
{
CONNECT_OK.increment();
CONNECT_CURR.increment();

// create one task per channel
for _ in 0..config.client().unwrap().concurrency() {
runtime.spawn(task(config.clone(), client.clone(), work_receiver.clone()));
}
} else {
CONNECT_EX.increment();
}
}
}
}
Expand Down
14 changes: 10 additions & 4 deletions src/clients/ping/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,21 @@ pub async fn pool_manager(endpoint: String, _config: Config, queue: Queue<SendRe
});

if let Ok(h2) = h2.ready().await {
CONNECT_OK.increment();
CONNECT_CURR.increment();

client = Some(h2);

continue;
}
}
}
}

// Successfully negotiated connections result in early continue back
// to the top of the loop. If we hit this, that means there was some
// exception during connection establishment / negotiation.
CONNECT_EX.increment();
} else if let Ok(s) = client.clone().unwrap().ready().await {
let _ = queue.send(s).await;
} else {
Expand All @@ -114,10 +124,6 @@ async fn task(
_config: Config,
queue: Queue<SendRequest<Bytes>>,
) -> Result<(), std::io::Error> {
// let mut buffer = Buffer::new(16384);
// let parser = protocol_ping::ResponseParser::new();
// let mut sender = None;

let uri = endpoint
.parse::<http::Uri>()
.map_err(|_| Error::new(ErrorKind::Other, "failed to parse uri"))?;
Expand Down
22 changes: 14 additions & 8 deletions src/clients/ping/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub fn launch_tasks(
for _ in 0..config.client().unwrap().poolsize() {
for endpoint in config.target().endpoints() {
// for each endpoint have poolsize # of pool_managers, each managing
// a single TCP stream
// a single H3 stream

let queue = Queue::new(1);
runtime.spawn(pool_manager(
Expand All @@ -39,8 +39,8 @@ pub fn launch_tasks(
queue.clone(),
));

// since HTTP/2.0 allows muxing several sessions onto a single TCP
// stream, we launch one task for each session on this TCP stream
// since HTTP/3.0 allows muxing several sessions onto a single H3
// stream, we launch one task for each session on this H3 stream
for _ in 0..config.client().unwrap().concurrency() {
runtime.spawn(task(
work_receiver.clone(),
Expand Down Expand Up @@ -147,33 +147,39 @@ pub async fn pool_manager(

if let Ok((mut driver, send_request)) = ::h3::client::new(quinn_conn).await
{
CONNECT_OK.increment();
CONNECT_CURR.increment();

tokio::spawn(async move {
let _ = core::future::poll_fn(|cx| driver.poll_close(cx)).await;
});

client = Some(send_request);

continue;
}
}
}
}

// Successfully negotiated connections result in early continue back
// to the top of the loop. If we hit this, that means there was some
// exception during connection establishment / negotiation.
CONNECT_EX.increment();
} else if let Some(s) = client.clone() {
let _ = queue.send(s).await;
}
}
}

// a task for http/2.0
// a task for HTTP/3.0
#[allow(clippy::slow_vector_initialization)]
async fn task(
work_receiver: Receiver<ClientWorkItemKind<ClientRequest>>,
endpoint: String,
_config: Config,
queue: Queue<SendRequest<h3_quinn::OpenStreams, Bytes>>,
) -> Result<(), std::io::Error> {
// let mut buffer = Buffer::new(16384);
// let parser = protocol_ping::ResponseParser::new();
// let mut sender = None;

let uri = endpoint
.parse::<http::Uri>()
.map_err(|_| Error::new(ErrorKind::Other, "failed to parse uri"))?;
Expand Down

0 comments on commit 14c48e0

Please sign in to comment.