diff --git a/Cargo.toml b/Cargo.toml index e5f88fc8..e2847a8a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,8 +11,6 @@ license = "MIT OR Apache-2.0" [dependencies] ahash = "0.8.3" async-channel = "1.9.0" -aws-config = { version = "1.1.7", features = ["behavior-version-latest"] } -aws-sdk-s3 = "1.51.0" backtrace = "0.3.69" boring = { version = "3.1.0", optional = true } boring-sys = { version = "3.1.0", optional = true } @@ -65,7 +63,6 @@ sha1 = "0.10.6" hmac = "0.12.1" hyper-rustls = "0.27.3" hyper-util = { version = "0.1.8", features = ["full"] } -client = "0.1.1-beta1" hex = "0.4.3" [features] diff --git a/configs/s3.toml b/configs/s3.toml index 7aaf9947..f8b1cae5 100644 --- a/configs/s3.toml +++ b/configs/s3.toml @@ -76,9 +76,9 @@ vlen = 128 # controls what commands will be used in this keyspace commands = [ # get a value - # { verb = "get", weight = 80 }, + { verb = "get", weight = 80 }, # set a value - # { verb = "put", weight = 19 }, + { verb = "put", weight = 19 }, # # delete a value { verb = "delete", weight = 1 }, ] diff --git a/src/store/s3.rs b/src/store/s3.rs index 5a3cff8f..b4846c2f 100644 --- a/src/store/s3.rs +++ b/src/store/s3.rs @@ -3,9 +3,11 @@ use crate::workload::StoreClientRequest; use crate::*; use async_channel::Receiver; use bytes::Bytes; +use chrono::DateTime; use chrono::Utc; use futures::Future; use hmac::{Hmac, Mac}; +use http::HeaderMap; use http::Method; use http::Version; use http_body_util::BodyExt; @@ -21,8 +23,6 @@ use std::io::ErrorKind; use std::time::Instant; use tokio::runtime::Runtime; -static EMPTY_BODY_SHA256: &str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; - // launch a pool manager and worker tasks since HTTP/2.0 is mux'ed we prepare // senders in the pool manager and pass them over a queue to our worker tasks pub fn launch_tasks( @@ -92,8 +92,8 @@ async fn task( std::process::exit(1); }; - let bucket = parts[0]; - let region = parts[2]; + let bucket = parts[0].to_string(); + let region = parts[2].to_string(); let port = auth.port_u16().unwrap_or(443); @@ -133,14 +133,6 @@ async fn task( SESSION.increment(); continue; - - // session = Some(s); - - // tokio::task::spawn(async move { - // if let Err(err) = conn.await { - // println!("Connection failed: {:?}", err); - // } - // }); } let c = client.take().unwrap(); @@ -152,67 +144,17 @@ async fn task( REQUEST.increment(); - let now = Utc::now(); - - // date with format 20240920 - let date = format!("{}", now.format("%Y%m%d")); - - // datetime with format: 20240920T084700Z - let datetime = format!("{}", now.format("%Y%m%dT%H%M%SZ")); - - // datetime in rfc2822 format: "Fri, 28 Nov 2014 12:00:09 +0000" - let rfc2822 = now.to_rfc2822().to_string(); - match &work_item { ClientWorkItemKind::Request { request, .. } => match request { StoreClientRequest::Get(r) => { let key = &*r.key; - let content_sha256 = EMPTY_BODY_SHA256; - - // form and hash the canonical request - let canonical_request = vec![ - "GET".to_string(), - format!("/{key}"), - "".to_string(), - format!("host:{bucket}.s3.amazonaws.com"), - format!("x-amz-content-sha256:{content_sha256}"), - format!("x-amz-date:{datetime}"), - "".to_string(), - "host;x-amz-content-sha256;x-amz-date".to_string(), - content_sha256.to_string(), - ] - .join("\n"); - - trace!("canonical request:\n{canonical_request}"); - - let request_hash = sha256_sum(canonical_request); - - let scope = format!("{date}/{region}/s3/aws4_request"); - - let string_to_sign = - format!("AWS4-HMAC-SHA256\n{datetime}\n{scope}\n{request_hash}"); - - trace!("string to sign:\n{string_to_sign}"); - - let signing_key = generate_signing_key(&secret_key, &date, region, "s3"); - let signature = calculate_signature(signing_key, string_to_sign.as_bytes()); - // and finally our authorization header - let authorization = format!("AWS4-HMAC-SHA256 Credential={access_key}/{scope},SignedHeaders=host;x-amz-content-sha256;x-amz-date,Signature={signature}"); - - trace!("authorization: {authorization}"); - - // now we can make our request - let request = http::request::Builder::new() - .version(Version::HTTP_11) - .method(Method::GET) - .uri(&format!("https://{bucket}.s3.amazonaws.com/{key}")) - .header("host", &format!("{bucket}.s3.amazonaws.com")) - .header("authorization", authorization) - .header("x-amz-content-sha256", content_sha256) - .header("x-amz-date", datetime) - .body(Full::::new(vec![].into())) - .unwrap(); + let request = S3RequestBuilder::get_object( + region.clone(), + bucket.clone(), + key.to_string(), + ) + .build(&access_key, &secret_key); let start = Instant::now(); @@ -262,56 +204,15 @@ async fn task( } StoreClientRequest::Put(r) => { let key = &*r.key; - let content_sha256 = sha256_sum(&r.value); - - // form and hash the canonical request - let canonical_request = vec![ - "PUT".to_string(), - format!("/{key}"), - "".to_string(), - format!("date:{rfc2822}"), - format!("host:{bucket}.s3.amazonaws.com"), - format!("x-amz-content-sha256:{content_sha256}"), - format!("x-amz-date:{datetime}"), - "x-amz-storage-class:STANDARD".to_string(), - "".to_string(), - "date;host;x-amz-content-sha256;x-amz-date;x-amz-storage-class".to_string(), - content_sha256.to_string(), - ] - .join("\n"); - - trace!("canonical request:\n{canonical_request}"); - - let request_hash = sha256_sum(canonical_request); - - let scope = format!("{date}/{region}/s3/aws4_request"); - - let string_to_sign = - format!("AWS4-HMAC-SHA256\n{datetime}\n{scope}\n{request_hash}"); - - trace!("string to sign:\n{string_to_sign}"); - - let signing_key = generate_signing_key(&secret_key, &date, region, "s3"); - let signature = calculate_signature(signing_key, string_to_sign.as_bytes()); - - // and finally our authorization header - let authorization = format!("AWS4-HMAC-SHA256 Credential={access_key}/{scope},SignedHeaders=date;host;x-amz-content-sha256;x-amz-date;x-amz-storage-class,Signature={signature}"); - - trace!("authorization: {authorization}"); - - // now we can make our request - let request = http::request::Builder::new() - .version(Version::HTTP_11) - .method(Method::PUT) - .uri(&format!("https://{bucket}.s3.amazonaws.com/{key}")) - .header("date", rfc2822) - .header("host", &format!("{bucket}.s3.amazonaws.com")) - .header("authorization", authorization) - .header("x-amz-content-sha256", content_sha256) - .header("x-amz-date", datetime) - .header("x-amz-storage-class", "STANDARD") - .body(Full::::new(r.value.clone().into())) - .unwrap(); + let value = r.value.clone(); + + let request = S3RequestBuilder::put_object( + region.clone(), + bucket.clone(), + key.to_string(), + value, + ) + .build(&access_key, &secret_key); let start = Instant::now(); @@ -352,52 +253,13 @@ async fn task( } StoreClientRequest::Delete(r) => { let key = &*r.key; - let content_sha256 = EMPTY_BODY_SHA256; - - // form and hash the canonical request - let canonical_request = vec![ - "DELETE".to_string(), - format!("/{key}"), - "".to_string(), - format!("host:{bucket}.s3.amazonaws.com"), - format!("x-amz-content-sha256:{content_sha256}"), - format!("x-amz-date:{datetime}"), - "".to_string(), - "host;x-amz-content-sha256;x-amz-date".to_string(), - content_sha256.to_string(), - ] - .join("\n"); - - trace!("canonical request:\n{canonical_request}"); - - let request_hash = sha256_sum(canonical_request); - - let scope = format!("{date}/{region}/s3/aws4_request"); - - let string_to_sign = - format!("AWS4-HMAC-SHA256\n{datetime}\n{scope}\n{request_hash}"); - trace!("string to sign:\n{string_to_sign}"); - - let signing_key = generate_signing_key(&secret_key, &date, region, "s3"); - let signature = calculate_signature(signing_key, string_to_sign.as_bytes()); - - // and finally our authorization header - let authorization = format!("AWS4-HMAC-SHA256 Credential={access_key}/{scope},SignedHeaders=host;x-amz-content-sha256;x-amz-date,Signature={signature}"); - - trace!("authorization: {authorization}"); - - // now we can make our request - let request = http::request::Builder::new() - .version(Version::HTTP_11) - .method(Method::DELETE) - .uri(&format!("https://{bucket}.s3.amazonaws.com/{key}")) - .header("host", &format!("{bucket}.s3.amazonaws.com")) - .header("authorization", authorization) - .header("x-amz-content-sha256", content_sha256) - .header("x-amz-date", datetime) - .body(Full::::new(vec![].into())) - .unwrap(); + let request = S3RequestBuilder::delete_object( + region.clone(), + bucket.clone(), + key.to_string(), + ) + .build(&access_key, &secret_key); let start = Instant::now(); @@ -430,17 +292,17 @@ async fn task( } Err(e) => { error!("error: {e}"); + CONNECT_CURR.decrement(); continue; } } } _ => { REQUEST_UNSUPPORTED.increment(); - continue; } }, ClientWorkItemKind::Reconnect => { - REQUEST_UNSUPPORTED.increment(); + CONNECT_CURR.decrement(); continue; } }; @@ -451,6 +313,137 @@ async fn task( Ok(()) } +pub struct S3RequestBuilder { + inner: http::request::Builder, + region: String, + relative_uri: String, + content: Vec, + content_sha256: String, + timestamp: DateTime, +} + +impl S3RequestBuilder { + fn new( + region: String, + bucket: String, + method: Method, + relative_uri: String, + content: Vec, + ) -> Self { + let now = Utc::now(); + // let date = format!("{}", now.format("%Y%m%d")); + let datetime = format!("{}", now.format("%Y%m%dT%H%M%SZ")); + // let rfc2822 = now.to_rfc2822().to_string(); + + let content_sha256 = sha256_sum(&content); + + let mut headers = HeaderMap::new(); + + headers.insert( + "host", + format!("{bucket}.s3.amazonaws.com").parse().unwrap(), + ); + headers.insert("x-amz-content-sha256", content_sha256.parse().unwrap()); + headers.insert("x-amz-date", datetime.parse().unwrap()); + + let inner = http::Request::builder() + .version(Version::HTTP_11) + .method(method) + .uri(&format!("https://{bucket}.s3.amazonaws.com{relative_uri}")) + .header("host", &format!("{bucket}.s3.amazonaws.com")) + .header("x-amz-content-sha256", &content_sha256) + .header("x-amz-date", datetime); + + Self { + inner, + region, + relative_uri, + content, + content_sha256, + timestamp: now, + } + } + + pub fn build(self, access_key: &str, secret_key: &str) -> http::Request> { + // form and hash the canonical request + let mut canonical_request = vec![ + self.inner.method_ref().unwrap().as_str().to_string(), + self.relative_uri, + String::new(), + ]; + + let mut signed_hdr_names = Vec::new(); + let mut signed_headers = Vec::new(); + + for (key, value) in self.inner.headers_ref().unwrap().iter() { + signed_headers.push(format!("{key}:{}", value.to_str().unwrap())); + signed_hdr_names.push(format!("{key}")); + } + + signed_hdr_names.sort(); + signed_headers.sort(); + + canonical_request.extend_from_slice(&signed_headers); + + let signed_hdr_names = signed_hdr_names.join(";"); + + canonical_request.push(String::new()); + canonical_request.push(signed_hdr_names.clone()); + canonical_request.push(self.content_sha256); + + let canonical_request = canonical_request.join("\n"); + + let date = format!("{}", self.timestamp.format("%Y%m%d")); + let datetime = format!("{}", self.timestamp.format("%Y%m%dT%H%M%SZ")); + + trace!("canonical request:\n{canonical_request}"); + + let region = self.region; + + let request_hash = sha256_sum(canonical_request); + let scope = format!("{date}/{region}/s3/aws4_request"); + let string_to_sign = format!("AWS4-HMAC-SHA256\n{datetime}\n{scope}\n{request_hash}"); + + trace!("string to sign:\n{string_to_sign}"); + + let signing_key = generate_signing_key(secret_key, &date, ®ion, "s3"); + let signature = calculate_signature(signing_key, string_to_sign.as_bytes()); + + // and finally our authorization header + let authorization = format!("AWS4-HMAC-SHA256 Credential={access_key}/{scope},SignedHeaders={signed_hdr_names},Signature={signature}"); + + self.inner + .header("authorization", authorization) + .body(Full::::new(self.content.into())) + .unwrap() + } + + pub fn delete_object(region: String, bucket: String, key: String) -> Self { + Self::new( + region, + bucket, + Method::DELETE, + format!("/{key}"), + Vec::new(), + ) + } + + pub fn get_object(region: String, bucket: String, key: String) -> Self { + Self::new(region, bucket, Method::GET, format!("/{key}"), Vec::new()) + } + + pub fn put_object(region: String, bucket: String, key: String, value: Vec) -> Self { + let mut s = Self::new(region, bucket, Method::PUT, format!("/{key}"), value); + + s.inner = s + .inner + .header("date", s.timestamp.to_rfc2822()) + .header("x-amz-storage-class", "STANDARD"); + + s + } +} + /* the code below was taken from AWS Rust SDK */ /*