Skip to content

Commit

Permalink
Upgrade to hyper 1.1
Browse files Browse the repository at this point in the history
  • Loading branch information
alexliesenfeld committed Jan 31, 2024
1 parent be377c9 commit eebfb1b
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 51 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## Version 0.7.1

- Internal mock server implementation is now based on hyper 1.1.
- This release also updates all other dependencies to the most recent version.


## Version 0.7.0

- **BREAKING CHANGES**:
Expand Down
8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "httpmock"
version = "0.7.0"
version = "0.7.1"
authors = ["Alexander Liesenfeld <alexander.liesenfeld@outlook.com>"]
edition = "2018"
description = "HTTP mocking library for Rust"
Expand Down Expand Up @@ -29,18 +29,20 @@ levenshtein = "1.0"
form_urlencoded = "1.2"

hyper = { version = "1.1", features = ["server", "http1"] }
hyper-util = { version = "0.1", features = ["tokio"] }
http-body-util = "0.1"
tokio = { version = "1.35", features = ["sync", "macros", "rt-multi-thread", "signal"] }

isahc = { version = "1.7", optional = true }
basic-cookies = { version = "0.1", optional = true }
colored = { version = "2.1", optional = true }
clap = { version = "4.4", features = ["derive", "env"], optional = true }
env_logger = { version = "0.11", optional = true }
env_logger = { version = "0.10", optional = true }
serde_yaml = { version = "0.9", optional = true }
async-std = { version = "1.12", features = ["attributes", "unstable"] }

[dev-dependencies]
env_logger = "0.11"
env_logger = "0.10"
tokio-test = "0.4"
quote = "1.0"
actix-rt = "2.9"
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# ================================================================================
# Builder
# ================================================================================
FROM rust:1.70 as builder
FROM rust:1.72 as builder
WORKDIR /usr/src/httpmock

COPY Cargo.toml .
Expand Down
122 changes: 75 additions & 47 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, Mutex};

use hyper::body::Buf;
use hyper::body::{Body, Buf, Bytes};
use hyper::header::HeaderValue;
use hyper::http::header::HeaderName;
use hyper::service::{make_service_fn, service_fn};
use hyper::service::{service_fn};
use hyper::{
Body, HeaderMap, Request as HyperRequest, Response as HyperResponse, Result as HyperResult,
Server, StatusCode,
HeaderMap, Request as HyperRequest, Response as HyperResponse, Result as HyperResult, StatusCode,
body::Incoming as IncomingBody
};
use hyper_util::rt::tokio::{TokioIo};

use regex::Regex;

use matchers::generic::SingleValueMatcher;
Expand Down Expand Up @@ -45,14 +47,19 @@ use crate::server::matchers::Matcher;
use crate::server::web::routes;
use futures_util::task::Spawn;
use std::future::Future;
use std::iter::Map;

use std::time::Instant;
use futures_util::{FutureExt};
use http_body_util::{BodyExt, Full};
use tokio::net::{TcpListener};

mod matchers;

mod util;
pub(crate) mod web;



/// The shared state accessible to all handlers
pub struct MockServerState {
id_counter: AtomicUsize,
Expand Down Expand Up @@ -146,7 +153,7 @@ impl MockServerState {
}),
// Cookie exact
#[cfg(feature = "cookies")]
Box::new(MultiValueMatcher {
Box::new(MultiValueMatcher {
entity_name: "cookie",
key_comparator: Box::new(StringExactMatchComparator::new(true)),
value_comparator: Box::new(StringExactMatchComparator::new(true)),
Expand All @@ -160,7 +167,7 @@ impl MockServerState {
}),
// Cookie exists
#[cfg(feature = "cookies")]
Box::new(MultiValueMatcher {
Box::new(MultiValueMatcher {
entity_name: "cookie",
key_comparator: Box::new(StringExactMatchComparator::new(true)),
value_comparator: Box::new(AnyValueComparator::new()),
Expand Down Expand Up @@ -311,7 +318,7 @@ pub(crate) struct ServerRequestHeader {
}

impl ServerRequestHeader {
pub fn from(req: &HyperRequest<Body>) -> Result<ServerRequestHeader, String> {
pub fn from(req: &HyperRequest<IncomingBody>) -> Result<ServerRequestHeader, String> {
let headers = extract_headers(req.headers());
if let Err(e) = headers {
return Err(format!("error parsing headers: {}", e));
Expand Down Expand Up @@ -374,13 +381,13 @@ fn extract_headers(header_map: &HeaderMap) -> Result<Vec<(String, String)>, Stri
}

async fn access_log_middleware<T>(
req: HyperRequest<Body>,
req: HyperRequest<IncomingBody>,
state: Arc<MockServerState>,
print_access_log: bool,
next: fn(req: HyperRequest<Body>, state: Arc<MockServerState>) -> T,
) -> HyperResult<HyperResponse<Body>>
where
T: Future<Output = HyperResult<HyperResponse<Body>>>,
next: fn(req: HyperRequest<IncomingBody>, state: Arc<MockServerState>) -> T,
) -> HyperResult<HyperResponse<Full<Bytes>>>
where
T: Future<Output=HyperResult<HyperResponse<Full<Bytes>>>>,
{
let time_request_received = Instant::now();

Expand All @@ -407,26 +414,28 @@ where
}

async fn handle_server_request(
req: HyperRequest<Body>,
req: HyperRequest<IncomingBody>,
state: Arc<MockServerState>,
) -> HyperResult<HyperResponse<Body>> {
) -> HyperResult<HyperResponse<Full<Bytes>>> {
let request_header = ServerRequestHeader::from(&req);

if let Err(e) = request_header {
return Ok(error_response(format!("Cannot parse request: {}", e)));
}

let body = hyper::body::to_bytes(req.into_body()).await;
if let Err(e) = body {
return Ok(error_response(format!("Cannot read request body: {}", e)));
let body_parts = BodyExt::collect(req).await;
if let Err(e) = body_parts {
return Ok(error_response(format!("Cannot read request body chunks: {}", e)));
}

let full_body_bytes = body_parts.unwrap().to_bytes();

let routing_result = route_request(
state.borrow(),
&request_header.unwrap(),
body.unwrap().to_vec(),
full_body_bytes.to_vec(),
)
.await;
.await;
if let Err(e) = routing_result {
return Ok(error_response(format!("Request handler error: {}", e)));
}
Expand All @@ -450,26 +459,16 @@ pub(crate) async fn start_server<F>(
print_access_log: bool,
shutdown: F,
) -> Result<(), String>
where
F: Future<Output = ()>,
where
F: Future<Output=()>,
{
let host = if expose { "0.0.0.0" } else { "127.0.0.1" };

let state = state.clone();
let new_service = make_service_fn(move |_| {
let state = state.clone();
async move {
Ok::<_, GenericError>(service_fn(move |req: HyperRequest<Body>| {
let state = state.clone();
access_log_middleware(req, state, print_access_log, handle_server_request)
}))
}
});

let server = Server::bind(&format!("{}:{}", host, port).parse().unwrap()).serve(new_service);
let addr = server.local_addr();
let addr: SocketAddr = format!("{}:{}", host, port).parse().expect("cannot parse hostname and port");
let listener: TcpListener = TcpListener::bind(addr).await.expect("cannot bind to port");

if let Some(socket_addr_sender) = socket_addr_sender {
let addr = listener.local_addr().expect("cannot read local TCP address");
if let Err(e) = socket_addr_sender.send(addr) {
return Err(format!(
"Cannot send socket information to the test thread: {:?}",
Expand All @@ -478,19 +477,47 @@ where
}
}

// And now add a graceful shutdown signal...
let graceful = server.with_graceful_shutdown(shutdown);

log::info!("Listening on {}", addr);
if let Err(e) = graceful.await {
return Err(format!("Err: {}", e));

let shutdown = shutdown.shared();

loop {
tokio::select! {
accepted = listener.accept() => {
match accepted {
Ok((tcp_stream, remote_address)) => {
let io = TokioIo::new(tcp_stream);
let state = state.clone();

tokio::task::spawn_local(async move {
if let Err(err) = hyper::server::conn::http1::Builder::new()
.serve_connection(io, service_fn(move |req: HyperRequest<IncomingBody>| {
let state = state.clone();
access_log_middleware(req, state, print_access_log, handle_server_request)
})).await
{
// TODO: Do something useful with the error
println!("Error serving connection: {:?}", err);
}
});
},
Err(e) => {
// TODO: Do something useful with the error
println!("error serving connection: {:?}", e)
},
};
}
_ = shutdown.clone() => {
break;
}
}
}

Ok(())
}

/// Maps a server response to a hyper response.
fn map_response(route_response: ServerResponse) -> Result<HyperResponse<Body>, String> {
fn map_response(route_response: ServerResponse) -> Result<HyperResponse<Full<Bytes>>, String>{
let mut builder = HyperResponse::builder();
builder = builder.status(route_response.status);

Expand All @@ -514,7 +541,7 @@ fn map_response(route_response: ServerResponse) -> Result<HyperResponse<Body>, S
builder = builder.header(name.unwrap(), value.unwrap());
}

let result = builder.body(Body::from(route_response.body));
let result = builder.body(Full::new(Bytes::from(route_response.body)));
if let Err(e) = result {
return Err(format!("Cannot create HTTP response: {}", e));
}
Expand Down Expand Up @@ -605,10 +632,10 @@ fn get_path_param(regex: &Regex, idx: usize, path: &str) -> Result<usize, String
}

/// Creates a default error response.
fn error_response(body: String) -> HyperResponse<Body> {
fn error_response(body: String) -> HyperResponse<Full<Bytes>>{
HyperResponse::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(body))
.body(Full::new(Bytes::from(body)))
.expect("Cannot build route error response")
}

Expand All @@ -627,6 +654,7 @@ mod test {
use std::collections::BTreeMap;

use futures_util::TryStreamExt;
use http_body_util::BodyExt;

use crate::server::{
error_response, get_path_param, map_response, ServerResponse, HISTORY_PATH, MOCKS_PATH,
Expand Down Expand Up @@ -687,13 +715,13 @@ mod test {
let (parts, body) = res.into_parts();

let body = async_std::task::block_on(async {
return match hyper::body::to_bytes(body).await {
Ok(bytes) => bytes.to_vec(),
return match BodyExt::collect(body).await {
Ok(collected_bytes) => collected_bytes.to_bytes(),
Err(e) => panic!(e),
};
});

assert_eq!(String::from_utf8(body).unwrap(), "test".to_string())
assert_eq!(String::from_utf8(body.to_vec()).unwrap(), "test".to_string())
}

/// Makes sure an error is return if there is a header parsing error
Expand Down

0 comments on commit eebfb1b

Please sign in to comment.