From 16e13f800cf69ecd5cede0315542d104eaf5996e Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Thu, 22 Aug 2024 11:09:35 +0200 Subject: [PATCH] request and response types --- apollo-router/src/services/json.rs | 349 +++++++++++++++++++++ apollo-router/src/services/json/service.rs | 1 + apollo-router/src/services/json/tests.rs | 1 + apollo-router/src/services/mod.rs | 5 +- 4 files changed, 355 insertions(+), 1 deletion(-) create mode 100644 apollo-router/src/services/json.rs create mode 100644 apollo-router/src/services/json/tests.rs diff --git a/apollo-router/src/services/json.rs b/apollo-router/src/services/json.rs new file mode 100644 index 0000000000..892ef6de07 --- /dev/null +++ b/apollo-router/src/services/json.rs @@ -0,0 +1,349 @@ +use std::pin::Pin; + +use futures::future::ready; +use futures::stream::once; +use futures::stream::StreamExt; +use futures::Stream; +use http::header::HeaderName; +use http::method::Method; +use http::HeaderValue; +use http::StatusCode; +use http::Uri; +use mime::APPLICATION_JSON; +use multimap::MultiMap; +use serde_json_bytes::json; +use serde_json_bytes::ByteString; +use serde_json_bytes::Map as JsonMap; +use serde_json_bytes::Value; +use static_assertions::assert_impl_all; +use tower::BoxError; + +use crate::http_ext::header_map; +use crate::http_ext::TryIntoHeaderName; +use crate::http_ext::TryIntoHeaderValue; +use crate::Context; + +pub(crate) mod service; +#[cfg(test)] +mod tests; + +pub type BoxService = tower::util::BoxService; +pub type BoxCloneService = tower::util::BoxCloneService; +pub type ServiceResult = Result; + +assert_impl_all!(Request: Send); +/// Represents the router processing step of the processing pipeline. +/// +/// This consists of the parsed graphql Request, HTTP headers and contextual data for extensions. +#[non_exhaustive] +pub struct Request { + /// Original request to the Router. + pub request: http::Request, + + /// Context for extension + pub context: Context, +} + +impl From> for Request { + fn from(request: http::Request) -> Self { + Self { + request, + context: Context::new(), + } + } +} + +impl std::fmt::Debug for Request { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Request") + .field("context", &self.context) + .finish() + } +} + +#[buildstructor::buildstructor] +impl Request { + /// This is the constructor (or builder) to use when constructing a real Request. + /// + /// Required parameters are required in non-testing code to create a Request. + #[allow(clippy::too_many_arguments)] + #[builder(visibility = "pub")] + fn new( + body: Value, + context: Context, + headers: MultiMap, + uri: Uri, + method: Method, + ) -> Result { + let mut request = http::Request::builder() + .uri(uri) + .method(method) + .body(body)?; + *request.headers_mut() = header_map(headers)?; + Ok(Self { request, context }) + } + + /// This is the constructor (or builder) to use when constructing a "fake" Request. + /// + /// This does not enforce the provision of the data that is required for a fully functional + /// Request. It's usually enough for testing, when a fully constructed Request is + /// difficult to construct and not required for the purposes of the test. + /// + /// In addition, fake requests are expected to be valid, and will panic if given invalid values. + #[builder(visibility = "pub")] + fn fake_new( + body: Option, + context: Option, + mut headers: MultiMap, + method: Option, + ) -> Result { + // Avoid testing requests getting blocked by the CSRF-prevention plugin + headers + .entry(http::header::CONTENT_TYPE.into()) + .or_insert(HeaderValue::from_static(APPLICATION_JSON.essence_str()).into()); + let context = context.unwrap_or_default(); + + Request::new( + body.unwrap_or(Value::Null), + context, + headers, + Uri::from_static("http://default"), + method.unwrap_or(Method::POST), + ) + } + + /// Create a request with an example query, for tests + #[builder(visibility = "pub")] + fn canned_new( + body: Option, + context: Option, + headers: MultiMap, + ) -> Result { + let default_body = json!({ + "query": " + query TopProducts($first: Int) { + topProducts(first: $first) { + upc + name + reviews { + id + product { name } + author { id name } + } + } + }", + "variables": { + "first": 2 + } + }); + + Self::fake_new(Some(default_body), context, headers, None) + } +} + +assert_impl_all!(Response: Send); +#[non_exhaustive] +pub struct Response { + pub response: http::Response, + pub context: Context, +} + +/// An asynchronous [`Stream`] of JSON objects. +/// +/// In some cases such as with `@defer`, a single HTTP response from the Router +/// may contain multiple GraphQL responses that will be sent at different times +/// (as more data becomes available). +/// +/// We represent this in Rust as a stream, +/// even if that stream happens to only contain one item. +pub type JsonStream = Pin + Send>>; + +#[buildstructor::buildstructor] +impl Response { + /// This is the constructor (or builder) to use when constructing a real Response.. + /// + /// Required parameters are required in non-testing code to create a Response.. + #[allow(clippy::too_many_arguments)] + #[builder(visibility = "pub")] + fn new( + body: Value, + status_code: Option, + headers: MultiMap, + context: Context, + ) -> Result { + // Build an http Response + let mut builder = http::Response::builder().status(status_code.unwrap_or(StatusCode::OK)); + for (key, values) in headers { + let header_name: HeaderName = key.try_into()?; + for value in values { + let header_value: HeaderValue = value.try_into()?; + builder = builder.header(header_name.clone(), header_value); + } + } + + let response = builder.body(once(ready(body)).boxed())?; + + Ok(Self { response, context }) + } + + /// This is the constructor (or builder) to use when constructing a "fake" Response. + /// + /// This does not enforce the provision of the data that is required for a fully functional + /// Response. It's usually enough for testing, when a fully constructed Response is + /// difficult to construct and not required for the purposes of the test. + /// + /// In addition, fake responses are expected to be valid, and will panic if given invalid values. + #[allow(clippy::too_many_arguments)] + #[builder(visibility = "pub")] + fn fake_new( + body: Option, + status_code: Option, + headers: MultiMap, + context: Option, + ) -> Result { + Response::new( + body.unwrap_or(Value::Null), + status_code, + headers, + context.unwrap_or_default(), + ) + } + + /// This is the constructor (or builder) to use when constructing a "fake" Response stream. + /// + /// This does not enforce the provision of the data that is required for a fully functional + /// Response. It's usually enough for testing, when a fully constructed Response is + /// difficult to construct and not required for the purposes of the test. + /// + /// In addition, fake responses are expected to be valid, and will panic if given invalid values. + #[builder(visibility = "pub")] + fn fake_stream_new( + responses: Vec, + status_code: Option, + headers: MultiMap, + context: Context, + ) -> Result { + let mut builder = http::Response::builder().status(status_code.unwrap_or(StatusCode::OK)); + for (key, values) in headers { + let header_name: HeaderName = key.try_into()?; + for value in values { + let header_value: HeaderValue = value.try_into()?; + builder = builder.header(header_name.clone(), header_value); + } + } + + let stream = futures::stream::iter(responses); + let response = builder.body(stream.boxed())?; + Ok(Self { response, context }) + } + + /// This is the constructor (or builder) to use when constructing a Response that represents a global error. + /// It has no path and no response data. + /// This is useful for things such as authentication errors. + #[builder(visibility = "pub")] + fn error_new( + status_code: Option, + headers: MultiMap, + context: Context, + ) -> Result { + Response::new( + Value::Null, + status_code, + headers, + context, + ) + } + + /// This is the constructor (or builder) to use when constructing a real Response.. + /// + /// Required parameters are required in non-testing code to create a Response.. + #[allow(clippy::too_many_arguments)] + #[builder(visibility = "pub(crate)")] + fn infallible_new( + body: Value, + // Skip the `Object` type alias in order to use buildstructor’s map special-casing + extensions: JsonMap, + status_code: Option, + headers: MultiMap, + context: Context, + ) -> Self { + // Build an http Response + let mut builder = http::Response::builder().status(status_code.unwrap_or(StatusCode::OK)); + for (header_name, values) in headers { + for header_value in values { + builder = builder.header(header_name.clone(), header_value); + } + } + + let response = builder.body(once(ready(body)).boxed()).expect("can't fail"); + + Self { response, context } + } +} + +impl Response { + pub async fn next_response(&mut self) -> Option { + self.response.body_mut().next().await + } + + pub(crate) fn new_from_response( + response: http::Response, + context: Context, + ) -> Self { + Self { response, context } + } + + pub fn map(self, f: F) -> Response + where + F: FnOnce(JsonStream) -> JsonStream, + { + Response { + context: self.context, + response: self.response.map(f), + } + } + + /// Returns a new supergraph response where each [`graphql::Response`] is mapped through `f`. + /// + /// In supergraph and execution services, the service response contains + /// not just one GraphQL response but a stream of them, + /// in order to support features such as `@defer`. + /// This method uses [`futures::stream::StreamExt::map`] to map over each item in the stream. + /// + /// # Example + /// + /// ``` + /// use apollo_router::services::supergraph; + /// use apollo_router::layers::ServiceExt as _; + /// use tower::ServiceExt as _; + /// + /// struct ExamplePlugin; + /// + /// #[async_trait::async_trait] + /// impl apollo_router::plugin::Plugin for ExamplePlugin { + /// # type Config = (); + /// # async fn new( + /// # _init: apollo_router::plugin::PluginInit, + /// # ) -> Result { + /// # Ok(Self) + /// # } + /// // … + /// fn supergraph_service(&self, inner: supergraph::BoxService) -> supergraph::BoxService { + /// inner + /// .map_response(|supergraph_response| { + /// supergraph_response.map_stream(|graphql_response| { + /// // Something interesting here + /// graphql_response + /// }) + /// }) + /// .boxed() + /// } + /// } + /// ``` + pub fn map_stream(self, f: F) -> Self + where + F: 'static + Send + FnMut(Value) -> Value, + { + self.map(move |stream| stream.map(f).boxed()) + } +} diff --git a/apollo-router/src/services/json/service.rs b/apollo-router/src/services/json/service.rs index e69de29bb2..8b13789179 100644 --- a/apollo-router/src/services/json/service.rs +++ b/apollo-router/src/services/json/service.rs @@ -0,0 +1 @@ + diff --git a/apollo-router/src/services/json/tests.rs b/apollo-router/src/services/json/tests.rs new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/apollo-router/src/services/json/tests.rs @@ -0,0 +1 @@ + diff --git a/apollo-router/src/services/mod.rs b/apollo-router/src/services/mod.rs index 76c31331c1..b96d0386d9 100644 --- a/apollo-router/src/services/mod.rs +++ b/apollo-router/src/services/mod.rs @@ -3,6 +3,7 @@ use std::sync::Arc; pub(crate) use self::execution::service::*; +pub(crate) use self::json::service::*; pub(crate) use self::query_planner::*; pub(crate) use self::subgraph_service::*; pub(crate) use self::supergraph::service::*; @@ -13,6 +14,8 @@ pub use crate::http_ext::TryIntoHeaderValue; pub use crate::query_planner::OperationKind; pub(crate) use crate::services::execution::Request as ExecutionRequest; pub(crate) use crate::services::execution::Response as ExecutionResponse; +pub(crate) use crate::services::json::Request as JsonRequest; +pub(crate) use crate::services::json::Response as JsonResponse; pub(crate) use crate::services::query_planner::Request as QueryPlannerRequest; pub(crate) use crate::services::query_planner::Response as QueryPlannerResponse; pub(crate) use crate::services::router::Request as RouterRequest; @@ -26,10 +29,10 @@ pub(crate) use crate::services::supergraph::Response as SupergraphResponse; pub mod execution; pub(crate) mod external; pub(crate) mod http; +pub(crate) mod json; pub(crate) mod layers; pub(crate) mod new_service; pub(crate) mod query_planner; -pub(crate) mod json; pub mod router; pub mod subgraph; pub(crate) mod subgraph_service;