Skip to content

Commit

Permalink
allow providiing a ca certificate
Browse files Browse the repository at this point in the history
  • Loading branch information
brayniac committed Aug 13, 2024
1 parent 3beb977 commit 0161dc3
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 33 deletions.
6 changes: 5 additions & 1 deletion src/clients/ping/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ pub mod pingpong {

// launch a pool manager and worker tasks since HTTP/2.0 is mux'ed we prepare
// senders in the pool manager and pass them over a queue to our worker tasks
pub fn launch_tasks(runtime: &mut Runtime, config: Config, work_receiver: Receiver<ClientWorkItemKind<ClientRequest>>) {
pub fn launch_tasks(
runtime: &mut Runtime,
config: Config,
work_receiver: Receiver<ClientWorkItemKind<ClientRequest>>,
) {
debug!("launching http2 protocol tasks");

for endpoint in config.target().endpoints() {
Expand Down
12 changes: 8 additions & 4 deletions src/clients/ping/http2.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use http::HeaderValue;
use crate::workload::ClientWorkItemKind;
use crate::clients::http2::Queue;
use crate::workload::ClientRequest;
use crate::workload::ClientWorkItemKind;
use crate::*;
use async_channel::Receiver;
use bytes::Bytes;
use chrono::Utc;
use h2::client::SendRequest;
use http::uri::Authority;
use http::HeaderValue;
use http::Method;
use http::Version;
use std::io::Error;
Expand All @@ -18,7 +18,11 @@ use tokio::runtime::Runtime;

// launch a pool manager and worker tasks since HTTP/2.0 is mux'ed we prepare
// senders in the pool manager and pass them over a queue to our worker tasks
pub fn launch_tasks(runtime: &mut Runtime, config: Config, work_receiver: Receiver<ClientWorkItemKind<ClientRequest>>) {
pub fn launch_tasks(
runtime: &mut Runtime,
config: Config,
work_receiver: Receiver<ClientWorkItemKind<ClientRequest>>,
) {
debug!("launching http2 protocol tasks");

for _ in 0..config.client().unwrap().poolsize() {
Expand Down Expand Up @@ -158,7 +162,7 @@ async fn task(

let mut date = HeaderValue::from_str(&Utc::now().to_rfc2822()).unwrap();
date.set_sensitive(true);

let request = http::request::Builder::new()
.version(Version::HTTP_2)
.method(Method::POST)
Expand Down
71 changes: 43 additions & 28 deletions src/clients/ping/http3.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use rustls::pki_types::CertificateDer;
use http::HeaderValue;
use crate::workload::ClientWorkItemKind;
use crate::clients::http2::Queue;
use crate::workload::ClientRequest;
use crate::workload::ClientWorkItemKind;
use crate::*;
use async_channel::Receiver;
use bytes::Bytes;
use chrono::Utc;
use h3::client::SendRequest;
use http::uri::Authority;
use http::HeaderValue;
use http::Method;
use http::Version;
use rustls::pki_types::CertificateDer;
use std::io::Error;
use std::io::ErrorKind;
use std::time::Instant;
Expand All @@ -20,7 +20,11 @@ static ALPN: &[u8] = b"h3";

// launch a pool manager and worker tasks since HTTP/2.0 is mux'ed we prepare
// senders in the pool manager and pass them over a queue to our worker tasks
pub fn launch_tasks(runtime: &mut Runtime, config: Config, work_receiver: Receiver<ClientWorkItemKind<ClientRequest>>) {
pub fn launch_tasks(
runtime: &mut Runtime,
config: Config,
work_receiver: Receiver<ClientWorkItemKind<ClientRequest>>,
) {
debug!("launching http2 protocol tasks");

for _ in 0..config.client().unwrap().poolsize() {
Expand Down Expand Up @@ -69,7 +73,7 @@ async fn resolve(uri: &str) -> Result<(std::net::SocketAddr, Authority), std::io
Ok((addr, auth))
}

fn root_cert_store() -> rustls::RootCertStore {
fn root_cert_store(config: &Config) -> rustls::RootCertStore {
// load system CA certs
let mut roots = rustls::RootCertStore::empty();
match rustls_native_certs::load_native_certs() {
Expand All @@ -85,41 +89,42 @@ fn root_cert_store() -> rustls::RootCertStore {
}
};

// if let Err(e) = roots.add(CertificateDer::from(std::fs::read("ca.cert").unwrap())) {
// eprintln!("failed to parse trust anchor: {}", e);
// }
if let Some(Some(ca_file)) = config.tls().map(|c| c.ca_file()) {
if let Err(e) = roots.add(CertificateDer::from(std::fs::read(ca_file).unwrap())) {
eprintln!("failed to parse trust anchor: {}", e);
}
}

roots
}

pub async fn pool_manager(endpoint: String, _config: Config, queue: Queue<SendRequest<h3_quinn::OpenStreams, Bytes>>) {
pub async fn pool_manager(
endpoint: String,
config: Config,
queue: Queue<SendRequest<h3_quinn::OpenStreams, Bytes>>,
) {
let mut client = None;

let mut tls_config = rustls::ClientConfig::builder()
.with_root_certificates(root_cert_store())
.with_root_certificates(root_cert_store(&config))
.with_no_client_auth();

tls_config.enable_early_data = true;
tls_config.alpn_protocols = vec![ALPN.into()];

let quic_client_config = Arc::new(quinn::crypto::rustls::QuicClientConfig::try_from(tls_config).expect("failed to initialize quic client config"));

// let connector = Connector::new(&config).expect("failed to init connector");
// let mut sender = None;
let quic_client_config = Arc::new(
quinn::crypto::rustls::QuicClientConfig::try_from(tls_config)
.expect("failed to initialize quic client config"),
);

while RUNNING.load(Ordering::Relaxed) {
if client.is_none() {
CONNECT.increment();

if let Ok((addr, auth)) = resolve(&endpoint).await {
let mut tls_config = rustls::ClientConfig::builder()
.with_root_certificates(root_cert_store())
.with_no_client_auth();

tls_config.enable_early_data = true;
tls_config.alpn_protocols = vec![ALPN.into()];

if let Ok(mut client_endpoint) = h3_quinn::quinn::Endpoint::client("[::]:0".parse().unwrap()) {
if let Ok(mut client_endpoint) =
h3_quinn::quinn::Endpoint::client("[::]:0".parse().unwrap())
{
let mut client_config = quinn::ClientConfig::new(quic_client_config.clone());

let mut transport_config = quinn::TransportConfig::default();
Expand All @@ -128,12 +133,18 @@ pub async fn pool_manager(endpoint: String, _config: Config, queue: Queue<SendRe

client_endpoint.set_default_client_config(client_config);

if let Ok(quic_conn) = client_endpoint.connect(addr, auth.host()).unwrap().await.map_err(|e| {
eprintln!("failed to create http3 client: {e}");
}) {
if let Ok(quic_conn) = client_endpoint
.connect(addr, auth.host())
.unwrap()
.await
.map_err(|e| {
eprintln!("failed to create http3 client: {e}");
})
{
let quinn_conn = h3_quinn::Connection::new(quic_conn);

if let Ok((mut driver, send_request)) = ::h3::client::new(quinn_conn).await {
if let Ok((mut driver, send_request)) = ::h3::client::new(quinn_conn).await
{
tokio::spawn(async move {
let _ = core::future::poll_fn(|cx| driver.poll_close(cx)).await;
});
Expand Down Expand Up @@ -205,7 +216,7 @@ async fn task(

let mut date = HeaderValue::from_str(&Utc::now().to_rfc2822()).unwrap();
date.set_sensitive(true);

let request = http::request::Builder::new()
.version(Version::HTTP_3)
.method(Method::POST)
Expand All @@ -220,7 +231,11 @@ async fn task(
let start = Instant::now();

if let Ok(mut stream) = sender.send_request(request).await {
if stream.send_data(Bytes::from(vec![0, 0, 0, 0, 0])).await.is_ok() {
if stream
.send_data(Bytes::from(vec![0, 0, 0, 0, 0]))
.await
.is_ok()
{
REQUEST_OK.increment();

if let Ok(_response) = stream.recv_response().await {
Expand Down

0 comments on commit 0161dc3

Please sign in to comment.