diff --git a/apollo-router/src/graphql/request.rs b/apollo-router/src/graphql/request.rs
index 1e51262dbf..e7b30ead4c 100644
--- a/apollo-router/src/graphql/request.rs
+++ b/apollo-router/src/graphql/request.rs
@@ -193,7 +193,9 @@ impl Request {
         }
     }
 
-    fn process_batch_values(value: &serde_json::Value) -> Result<Vec<Request>, serde_json::Error> {
+    pub(crate) fn process_batch_values(
+        value: &serde_json::Value,
+    ) -> Result<Vec<Request>, serde_json::Error> {
         let mut result = Request::allocate_result_array(value);
 
         if value.is_array() {
diff --git a/apollo-router/src/services/json.rs b/apollo-router/src/services/json.rs
index 892ef6de07..eefd3ffbda 100644
--- a/apollo-router/src/services/json.rs
+++ b/apollo-router/src/services/json.rs
@@ -18,6 +18,7 @@ use serde_json_bytes::Value;
 use static_assertions::assert_impl_all;
 use tower::BoxError;
 
+use crate::graphql;
 use crate::http_ext::header_map;
 use crate::http_ext::TryIntoHeaderName;
 use crate::http_ext::TryIntoHeaderValue;
@@ -242,12 +243,14 @@ impl Response {
     /// This is useful for things such as authentication errors.
     #[builder(visibility = "pub")]
     fn error_new(
+        errors: Vec<graphql::Error>,
         status_code: Option<StatusCode>,
         headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
         context: Context,
     ) -> Result<Self, BoxError> {
+        let res = graphql::Response::builder().errors(errors).build();
         Response::new(
-            Value::Null,
+            serde_json_bytes::to_value(res).expect("JSON serialization should not fail"),
             status_code,
             headers,
             context,
diff --git a/apollo-router/src/services/json/service.rs b/apollo-router/src/services/json/service.rs
index 8b13789179..1853e23200 100644
--- a/apollo-router/src/services/json/service.rs
+++ b/apollo-router/src/services/json/service.rs
@@ -1 +1,719 @@
+use std::sync::Arc;
+use std::task::Poll;
+
+use futures::future::join_all;
+use futures::future::BoxFuture;
+use http::header::CONTENT_TYPE;
+use http::request::Parts;
+use http::Method;
+use http::StatusCode;
+use mime::APPLICATION_JSON;
+use serde_json_bytes::Value;
+use tower::BoxError;
+use tower::Service;
+
+use super::Request as JsonRequest;
+use super::Response as JsonResponse;
+use crate::batching::Batch;
+use crate::batching::BatchQuery;
+use crate::configuration::Batching;
+use crate::configuration::BatchingMode;
+use crate::context::CONTAINS_GRAPHQL_ERROR;
+use crate::graphql;
+use crate::http_ext;
+use crate::services::router::ClientRequestAccepts;
+use crate::services::SupergraphCreator;
+use crate::services::SupergraphRequest;
+use crate::Context;
+
+/// Containing [`Service`] in the request lifecycle.
+#[derive(Clone)]
+pub(crate) struct JsonServerService {
+    pub(crate) supergraph_creator: Arc<SupergraphCreator>,
+    batching: Batching,
+}
+
+#[buildstructor::buildstructor]
+impl JsonServerService {
+    #[builder]
+    pub(crate) fn new(supergraph_creator: Arc<SupergraphCreator>, batching: Batching) -> Self {
+        JsonServerService {
+            supergraph_creator,
+            batching,
+        }
+    }
+}
+
+impl Service<JsonRequest> for JsonServerService {
+    type Response = JsonResponse;
+    type Error = BoxError;
+    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
+
+    fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn call(&mut self, req: JsonRequest) -> Self::Future {
+        let JsonRequest { request, context } = req;
+
+        /*match serde_json_bytes::from_value::<graphql::Request>(body) {
+            Ok(request) => {
+                let response = self
+                    .supergraph_creator
+                    .create()
+                    .oneshot(request)
+                    .await
+                    .map_err(|e| todo!())?;
+                Ok(JsonResponse::new_from_graphql_response(response, context))
+            },
+            Err(e) => {
+                Ok(JsonResponse::error_builder()
+                    .error(e)
+                    .status_code(StatusCode::BAD_REQUEST)
+                    .build())
+            }
+        }*/
+        todo!()
+
+        /*// Consume our cloned services and allow ownership to be transferred to the async block.
+        let clone = self.query_planner_service.clone();
+
+        let planning = std::mem::replace(&mut self.query_planner_service, clone);
+
+        let schema = self.schema.clone();
+
+        let context_cloned = req.context.clone();
+        let fut = service_call(
+            planning,
+            self.execution_service_factory.clone(),
+            schema,
+            req,
+            self.notify.clone(),
+        )
+        .or_else(|error: BoxError| async move {
+            let errors = vec![crate::error::Error {
+                message: error.to_string(),
+                extensions: serde_json_bytes::json!({
+                    "code": "INTERNAL_SERVER_ERROR",
+                })
+                .as_object()
+                .unwrap()
+                .to_owned(),
+                ..Default::default()
+            }];
+
+            Ok(SupergraphResponse::infallible_builder()
+                .errors(errors)
+                .status_code(StatusCode::INTERNAL_SERVER_ERROR)
+                .context(context_cloned)
+                .build())
+        });
+
+        Box::pin(fut)*/
+    }
+}
+
+impl JsonServerService {
+    async fn call_inner(&self, req: JsonRequest) -> Result<JsonResponse, BoxError> {
+        let context = req.context.clone();
+
+        let (supergraph_requests, is_batch) = match self.translate_request(req).await {
+            Ok(requests) => requests,
+            Err(err) => {
+                // FIXME: remove?
+                u64_counter!(
+                    "apollo_router_http_requests_total",
+                    "Total number of HTTP requests made.",
+                    1,
+                    status = err.status.as_u16() as i64,
+                    error = err.error.to_string()
+                );
+                // Useful for selector in spans/instruments/events
+                context
+                    .insert_json_value(CONTAINS_GRAPHQL_ERROR, serde_json_bytes::Value::Bool(true));
+
+                return JsonResponse::error_builder()
+                    .error(
+                        graphql::Error::builder()
+                            .message(String::from("Invalid GraphQL request"))
+                            .extension_code(err.extension_code)
+                            .extension("details", err.extension_details)
+                            .build(),
+                    )
+                    .status_code(err.status)
+                    .header(CONTENT_TYPE, APPLICATION_JSON.essence_str())
+                    .context(context)
+                    .build();
+            }
+        };
+
+        // We need to handle cases where a failure is part of a batch and thus must be cancelled.
+        // Requests can be cancelled at any point of the router pipeline, but all failures bubble
+        // back up through here, so we can catch them without having to specially handle batch
+        // queries in other portions of the codebase.
+        let futures = supergraph_requests
+            .into_iter()
+            .map(|supergraph_request| async {
+                // We clone the context here, because if the request results in an Err, the
+                // response context will no longer exist.
+                let context = supergraph_request.context.clone();
+                let result = self.process_supergraph_request(supergraph_request).await;
+
+                // Regardless of the result, we need to make sure that we cancel any potential
+                // batch queries. This is because custom rust plugins, rhai scripts, and
+                // coprocessors can cancel requests at any time and return a GraphQL error wrapped
+                // in an `Ok` or in a `BoxError` wrapped in an `Err`.
+                let batch_query_opt = context
+                    .extensions()
+                    .with_lock(|mut lock| lock.remove::<BatchQuery>());
+                if let Some(batch_query) = batch_query_opt {
+                    // Only proceed with signalling cancelled if the batch_query is not finished
+                    if !batch_query.finished() {
+                        tracing::debug!("cancelling batch query in supergraph response");
+                        batch_query
+                            .signal_cancelled("request terminated by user".to_string())
+                            .await?;
+                    }
+                }
+
+                result
+            });
+
+        // Use join_all to preserve ordering of concurrent operations
+        // (Short circuit processing and propagate any errors in the batch)
+        // Note: We use `join_all` here since it awaits all futures before returning, thus
+        // allowing us to handle cancellation logic without fear of the other futures getting
+        // killed.
+        let mut results: Vec<JsonResponse> = join_all(futures)
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, BoxError>>()?;
+
+        // If we detected we are processing a batch, return an array of results even if there is
+        // only one result
+        if is_batch {
+            let mut results_it = results.into_iter();
+            let first = results_it
+                .next()
+                .expect("we should have at least one response");
+            let (parts, body) = first.response.into_parts();
+            let context = first.context;
+            let mut bytes = BytesMut::new();
+            bytes.put_u8(b'[');
+            bytes.extend_from_slice(&get_body_bytes(body).await?);
+            for result in results_it {
+                bytes.put(&b", "[..]);
+                bytes.extend_from_slice(&get_body_bytes(result.response.into_body()).await?);
+            }
+            bytes.put_u8(b']');
+
+            Ok(JsonResponse {
+                response: http::Response::from_parts(
+                    parts,
+                    RouterBody::from(bytes.freeze()).into_inner(),
+                ),
+                context,
+            })
+        } else {
+            Ok(results.pop().expect("we should have at least one response"))
+        }
+    }
+
+    async fn translate_query_request(
+        &self,
+        parts: &Parts,
+    ) -> Result<(Vec<graphql::Request>, bool), TranslateError> {
+        let mut is_batch = false;
+        parts.uri.query().map(|q| {
+            let mut result = vec![];
+
+            match graphql::Request::from_urlencoded_query(q.to_string()) {
+                Ok(request) => {
+                    result.push(request);
+                }
+                Err(err) => {
+                    // It may be a batch of requests, so try that (if config allows) before
+                    // erroring out
+                    if self.batching.enabled
+                        && matches!(self.batching.mode, BatchingMode::BatchHttpLink)
+                    {
+                        result = graphql::Request::batch_from_urlencoded_query(q.to_string())
+                            .map_err(|e| TranslateError {
+                                status: StatusCode::BAD_REQUEST,
+                                error: "failed to decode a valid GraphQL request from path",
+                                extension_code: "INVALID_GRAPHQL_REQUEST",
+                                extension_details: format!(
+                                    "failed to decode a valid GraphQL request from path {e}"
+                                ),
+                            })?;
+                        if result.is_empty() {
+                            return Err(TranslateError {
+                                status: StatusCode::BAD_REQUEST,
+                                error: "failed to decode a valid GraphQL request from path",
+                                extension_code: "INVALID_GRAPHQL_REQUEST",
+                                extension_details:
+                                    "failed to decode a valid GraphQL request from path: empty array"
+                                        .to_string(),
+                            });
+                        }
+                        is_batch = true;
+                    } else if !q.is_empty() && q.as_bytes()[0] == b'[' {
+                        let extension_details = if self.batching.enabled
+                            && !matches!(self.batching.mode, BatchingMode::BatchHttpLink)
+                        {
+                            format!("batching not supported for mode `{}`", self.batching.mode)
+                        } else {
not enabled".to_string() + }; + return Err(TranslateError { + status: StatusCode::BAD_REQUEST, + error: "batching not enabled", + extension_code: "BATCHING_NOT_ENABLED", + extension_details, + }); + } else { + return Err(TranslateError { + status: StatusCode::BAD_REQUEST, + error: "failed to decode a valid GraphQL request from path", + extension_code: "INVALID_GRAPHQL_REQUEST", + extension_details: format!( + "failed to decode a valid GraphQL request from path {err}" + ), + }); + } + } + }; + Ok((result, is_batch)) + }).unwrap_or_else(|| { + Err(TranslateError { + status: StatusCode::BAD_REQUEST, + error: "There was no GraphQL operation to execute. Use the `query` parameter to send an operation, using either GET or POST.", + extension_code: "INVALID_GRAPHQL_REQUEST", + extension_details: "There was no GraphQL operation to execute. Use the `query` parameter to send an operation, using either GET or POST.".to_string() + }) + }) + } + + fn translate_body_request( + &self, + value: Value, + ) -> Result<(Vec, bool), TranslateError> { + let mut result = vec![]; + let mut is_batch = false; + + match serde_json_bytes::::from_value(value) { + Ok(request) => { + result.push(request); + } + Err(err) => { + if self.batching.enabled + && matches!(self.batching.mode, BatchingMode::BatchHttpLink) + { + //FIXME: batching works on serde_json::Value, not on serde_json_bytes::Value + let v = + serde_json_bytes::from_value::(value).map_err(|e| { + TranslateError { + status: StatusCode::BAD_REQUEST, + error: "failed to deserialize the request body into JSON", + extension_code: "INVALID_GRAPHQL_REQUEST", + extension_details: format!( + "failed to deserialize the request body into JSON: {e}" + ), + } + })?; + result = + graphql::Request::process_batch_values(&v).map_err(|e| TranslateError { + status: StatusCode::BAD_REQUEST, + error: "failed to deserialize the request body into JSON", + extension_code: "INVALID_GRAPHQL_REQUEST", + extension_details: format!( + "failed to deserialize the request body into JSON: {e}" + ), + })?; + if result.is_empty() { + return Err(TranslateError { + status: StatusCode::BAD_REQUEST, + error: "failed to decode a valid GraphQL request from path", + extension_code: "INVALID_GRAPHQL_REQUEST", + extension_details: + "failed to decode a valid GraphQL request from path: empty array " + .to_string(), + }); + } + is_batch = true; + } else if value.is_array() { + let extension_details = if self.batching.enabled + && !matches!(self.batching.mode, BatchingMode::BatchHttpLink) + { + format!("batching not supported for mode `{}`", self.batching.mode) + } else { + "batching not enabled".to_string() + }; + return Err(TranslateError { + status: StatusCode::BAD_REQUEST, + error: "batching not enabled", + extension_code: "BATCHING_NOT_ENABLED", + extension_details, + }); + } else { + return Err(TranslateError { + status: StatusCode::BAD_REQUEST, + error: "failed to deserialize the request body into JSON", + extension_code: "INVALID_GRAPHQL_REQUEST", + extension_details: format!( + "failed to deserialize the request body into JSON: {err}" + ), + }); + } + } + }; + Ok((result, is_batch)) + } + + async fn translate_request( + &self, + req: JsonRequest, + ) -> Result<(Vec, bool), TranslateError> { + let JsonRequest { request, context } = req; + + let (parts, body) = request.into_parts(); + + let graphql_requests: Result<(Vec, bool), TranslateError> = + if parts.method == Method::GET { + self.translate_query_request(&parts).await + } else { + self.translate_body_request(&body).await + /* + // FIXME: use 
+                // FIXME: use a try block when available: https://github.com/rust-lang/rust/issues/31436
+                let content_length = (|| {
+                    parts
+                        .headers
+                        .get(http::header::CONTENT_LENGTH)?
+                        .to_str()
+                        .ok()?
+                        .parse()
+                        .ok()
+                })();
+                if content_length.unwrap_or(0) > self.http_max_request_bytes {
+                    Err(TranslateError {
+                        status: StatusCode::PAYLOAD_TOO_LARGE,
+                        error: "payload too large for the `http_max_request_bytes` configuration",
+                        extension_code: "INVALID_GRAPHQL_REQUEST",
+                        extension_details: "payload too large".to_string(),
+                    })
+                } else {
+                    let body = http_body::Limited::new(body, self.http_max_request_bytes);
+                    get_body_bytes(body)
+                        .instrument(tracing::debug_span!("receive_body"))
+                        .await
+                        .map_err(|e| {
+                            if e.is::<http_body::LengthLimitError>() {
+                                TranslateError {
+                                    status: StatusCode::PAYLOAD_TOO_LARGE,
+                                    error: "payload too large for the `http_max_request_bytes` configuration",
+                                    extension_code: "INVALID_GRAPHQL_REQUEST",
+                                    extension_details: "payload too large".to_string(),
+                                }
+                            } else {
+                                TranslateError {
+                                    status: StatusCode::BAD_REQUEST,
+                                    error: "failed to get the request body",
+                                    extension_code: "INVALID_GRAPHQL_REQUEST",
+                                    extension_details: format!("failed to get the request body: {e}"),
+                                }
+                            }
+                        })
+                        .and_then(|bytes| {
+                            self.translate_bytes_request(&bytes)
+                        })
+                }*/
+            };
+
+        let (ok_results, is_batch) = graphql_requests?;
+        let mut results = Vec::with_capacity(ok_results.len());
+        let batch_size = ok_results.len();
+
+        // Modifying our Context extensions.
+        // If we are processing a batch (is_batch == true), insert our batching configuration.
+        // If subgraph batching configuration exists and is enabled for any of our subgraphs, we
+        // create our shared batch details
+        let shared_batch_details = (is_batch)
+            .then(|| {
+                context
+                    .extensions()
+                    .with_lock(|mut lock| lock.insert(self.batching.clone()));
+
+                self.batching.subgraph.as_ref()
+            })
+            .flatten()
+            .map(|subgraph_batching_config| {
+                subgraph_batching_config.all.enabled
+                    || subgraph_batching_config
+                        .subgraphs
+                        .values()
+                        .any(|v| v.enabled)
+            })
+            .and_then(|a| a.then_some(Arc::new(Batch::spawn_handler(batch_size))));
+
+        let mut ok_results_it = ok_results.into_iter();
+        let first = ok_results_it
+            .next()
+            .expect("we should have at least one request");
+        let sg = http::Request::from_parts(parts, first);
+
+        // Building up the batch of supergraph requests is tricky.
+        // Firstly note that any http extensions are only propagated for the first request sent
+        // through the pipeline. This is because there is simply no way to clone http
+        // extensions.
+        //
+        // Secondly, we can't clone extensions, but we need to propagate at least
+        // ClientRequestAccepts to ensure correct processing of the response. We do that manually,
+        // but the concern is that there may be other extensions that wish to propagate into
+        // each request or we may add them in future and not know about it here...
+        //
+        // (Technically we could clone extensions, since it is held under an `Arc`, but that
+        // would mean all the requests in a batch shared the same set of extensions and review
+        // comments expressed the sentiment that this may be a bad thing...)
+        //
+        // Note: If we enter this loop, then we must be processing a batch.
+        for (index, graphql_request) in ok_results_it.enumerate() {
+            // XXX Lose http extensions, is that ok?
+            let mut new = http_ext::clone_http_request(&sg);
+            *new.body_mut() = graphql_request;
+            // XXX Lose some private entries, is that ok?
+            let new_context = Context::new();
+            new_context.extend(&context);
+            let client_request_accepts_opt = context
+                .extensions()
+                .with_lock(|lock| lock.get::<ClientRequestAccepts>().cloned());
+            // We are only going to insert a BatchQuery if Subgraph processing is enabled
+            let b_for_index_opt = if let Some(shared_batch_details) = &shared_batch_details {
+                Some(
+                    Batch::query_for_index(shared_batch_details.clone(), index + 1).map_err(
+                        |err| TranslateError {
+                            status: StatusCode::INTERNAL_SERVER_ERROR,
+                            error: "failed to create batch",
+                            extension_code: "BATCHING_ERROR",
+                            extension_details: format!("failed to create batch entry: {err}"),
+                        },
+                    )?,
+                )
+            } else {
+                None
+            };
+            new_context.extensions().with_lock(|mut lock| {
+                if let Some(client_request_accepts) = client_request_accepts_opt {
+                    lock.insert(client_request_accepts);
+                }
+                lock.insert(self.batching.clone());
+                // We are only going to insert a BatchQuery if Subgraph processing is enabled
+                if let Some(b_for_index) = b_for_index_opt {
+                    lock.insert(b_for_index);
+                }
+            });
+            results.push(SupergraphRequest {
+                supergraph_request: new,
+                context: new_context,
+            });
+        }
+
+        if let Some(shared_batch_details) = shared_batch_details {
+            let b_for_index =
+                Batch::query_for_index(shared_batch_details, 0).map_err(|err| TranslateError {
+                    status: StatusCode::INTERNAL_SERVER_ERROR,
+                    error: "failed to create batch",
+                    extension_code: "BATCHING_ERROR",
+                    extension_details: format!("failed to create batch entry: {err}"),
+                })?;
+            context
+                .extensions()
+                .with_lock(|mut lock| lock.insert(b_for_index));
+        }
+
+        results.insert(
+            0,
+            SupergraphRequest {
+                supergraph_request: sg,
+                context,
+            },
+        );
+
+        Ok((results, is_batch))
+    }
+
+    async fn process_supergraph_request(
+        &self,
+        supergraph_request: SupergraphRequest,
+    ) -> Result<JsonResponse, BoxError> {
+        let mut request_res = self
+            .persisted_query_layer
+            .supergraph_request(supergraph_request);
+
+        if let Ok(supergraph_request) = request_res {
+            request_res = self.apq_layer.supergraph_request(supergraph_request).await;
+        }
+
+        let SupergraphResponse { response, context } = match request_res {
+            Err(response) => response,
+            Ok(request) => match self.query_analysis_layer.supergraph_request(request).await {
+                Err(response) => response,
+                Ok(request) => match self
+                    .persisted_query_layer
+                    .supergraph_request_with_analyzed_query(request)
+                    .await
+                {
+                    Err(response) => response,
+                    Ok(request) => self.supergraph_creator.create().oneshot(request).await?,
+                },
+            },
+        };
+
+        let ClientRequestAccepts {
+            wildcard: accepts_wildcard,
+            json: accepts_json,
+            multipart_defer: accepts_multipart_defer,
+            multipart_subscription: accepts_multipart_subscription,
+        } = context
+            .extensions()
+            .with_lock(|lock| lock.get().cloned())
+            .unwrap_or_default();
+
+        let (mut parts, mut body) = response.into_parts();
+        process_vary_header(&mut parts.headers);
+
+        if context
+            .extensions()
+            .with_lock(|lock| lock.get::<CanceledRequest>().is_some())
+        {
+            parts.status = StatusCode::from_u16(499)
+                .expect("499 is not a standard status code but common enough");
+        }
+
+        match body.next().await {
+            None => {
+                tracing::error!("router service is not available to process request",);
+                Ok(router::Response {
+                    response: http::Response::builder()
+                        .status(StatusCode::SERVICE_UNAVAILABLE)
+                        .body(
+                            RouterBody::from("router service is not available to process request")
+                                .into_inner(),
+                        )
+                        .expect("cannot fail"),
+                    context,
+                })
+            }
+            Some(response) => {
+                if !response.has_next.unwrap_or(false)
+                    && !response.subscribed.unwrap_or(false)
+                    && (accepts_json || accepts_wildcard)
+                {
+                    if !response.errors.is_empty() {
+                        Self::count_errors(&response.errors);
+                    }
+
+                    parts
+                        .headers
+                        .insert(CONTENT_TYPE, APPLICATION_JSON_HEADER_VALUE.clone());
+                    tracing::trace_span!("serialize_response").in_scope(|| {
+                        let body = serde_json::to_string(&response)?;
+                        Ok(router::Response {
+                            response: http::Response::from_parts(
+                                parts,
+                                RouterBody::from(body).into_inner(),
+                            ),
+                            context,
+                        })
+                    })
+                } else if accepts_multipart_defer || accepts_multipart_subscription {
+                    if accepts_multipart_defer {
+                        parts.headers.insert(
+                            CONTENT_TYPE,
+                            MULTIPART_DEFER_CONTENT_TYPE_HEADER_VALUE.clone(),
+                        );
+                    } else if accepts_multipart_subscription {
+                        parts.headers.insert(
+                            CONTENT_TYPE,
+                            MULTIPART_SUBSCRIPTION_CONTENT_TYPE_HEADER_VALUE.clone(),
+                        );
+                    }
+
+                    if !response.errors.is_empty() {
+                        Self::count_errors(&response.errors);
+                    }
+
+                    // Useful when you're using a proxy like nginx which enables proxy_buffering
+                    // by default (http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering)
+                    parts.headers.insert(
+                        ACCEL_BUFFERING_HEADER_NAME.clone(),
+                        ACCEL_BUFFERING_HEADER_VALUE.clone(),
+                    );
+                    let multipart_stream = match response.subscribed {
+                        Some(true) => StreamBody::new(Multipart::new(
+                            body.inspect(|response| {
+                                if !response.errors.is_empty() {
+                                    Self::count_errors(&response.errors);
+                                }
+                            }),
+                            ProtocolMode::Subscription,
+                        )),
+                        _ => StreamBody::new(Multipart::new(
+                            once(ready(response)).chain(body.inspect(|response| {
+                                if !response.errors.is_empty() {
+                                    Self::count_errors(&response.errors);
+                                }
+                            })),
+                            ProtocolMode::Defer,
+                        )),
+                    };
+                    let response = (parts, multipart_stream).into_response().map(|body| {
+                        // Axum makes this `body` have type:
+                        // https://docs.rs/http-body/0.4.5/http_body/combinators/struct.UnsyncBoxBody.html
+                        let mut body = Box::pin(body);
+                        // We make a stream based on its `poll_data` method
+                        // in order to create a `hyper::Body`.
+                        RouterBody::wrap_stream(stream::poll_fn(move |ctx| {
+                            body.as_mut().poll_data(ctx)
+                        }))
+                        .into_inner()
+                        // … but we ignore the `poll_trailers` method:
+                        // https://docs.rs/http-body/0.4.5/http_body/trait.Body.html#tymethod.poll_trailers
+                        // Apparently HTTP/2 trailers are like headers, except after the response body.
+                        // I (Simon) believe nothing in the Apollo Router uses trailers as of this writing,
+                        // so ignoring `poll_trailers` is fine.
+                        // If we want to use trailers, we may need to remove this conversion to
+                        // `hyper::Body` and return `UnsyncBoxBody` (a.k.a. `axum::BoxBody`) as-is.
+                    });
+
+                    Ok(RouterResponse { response, context })
+                } else {
+                    tracing::info!(
+                        monotonic_counter.apollo.router.graphql_error = 1u64,
+                        code = "INVALID_ACCEPT_HEADER"
+                    );
+                    // Useful for selector in spans/instruments/events
+                    context.insert_json_value(
+                        CONTAINS_GRAPHQL_ERROR,
+                        serde_json_bytes::Value::Bool(true),
+                    );
+
+                    // this should be unreachable due to a previous check, but just to be sure...
+                    Ok(JsonResponse::error_builder()
+                        .error(
+                            graphql::Error::builder()
+                                .message(format!(
+                                    r#"'accept' header must be one of: \"*/*\", {:?}, {:?}, {:?} or {:?}"#,
+                                    APPLICATION_JSON.essence_str(),
+                                    GRAPHQL_JSON_RESPONSE_HEADER_VALUE,
+                                    MULTIPART_DEFER_ACCEPT,
+                                    MULTIPART_SUBSCRIPTION_ACCEPT,
+                                ))
+                                .extension_code("INVALID_ACCEPT_HEADER")
+                                .build(),
+                        )
+                        .status_code(StatusCode::NOT_ACCEPTABLE)
+                        .header(CONTENT_TYPE, APPLICATION_JSON.essence_str())
+                        .context(context)
+                        .build()?)
+                }
+            }
+        }
+    }
+}
+
+struct TranslateError<'a> {
+    status: StatusCode,
+    error: &'a str,
+    extension_code: &'a str,
+    extension_details: String,
+}
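
Note for reviewers: since `errors` is now a required argument to `json::Response::error_new`, callers build JSON error responses through the same buildstructor-generated `error_builder()` that this diff uses in `service.rs`. A minimal sketch of a caller (not part of the diff; the function name, message, extension code, and status here are illustrative placeholders):

```rust
use http::StatusCode;

use crate::graphql;
use crate::services::json;
use crate::Context;

// Hypothetical caller, e.g. an authentication check rejecting a request.
// The response body is now a serialized graphql::Response carrying the
// errors, rather than the previous `Value::Null`.
fn reject_unauthenticated(context: Context) -> Result<json::Response, tower::BoxError> {
    json::Response::error_builder()
        // `.error(...)` is the singular adder buildstructor derives for the
        // `errors: Vec<graphql::Error>` parameter.
        .error(
            graphql::Error::builder()
                .message("unauthenticated request") // placeholder message
                .extension_code("UNAUTHENTICATED") // placeholder code
                .build(),
        )
        .status_code(StatusCode::UNAUTHORIZED)
        .context(context)
        .build()
}
```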