Skip to content

Commit

Permalink
implement the JsonService
Browse files Browse the repository at this point in the history
not plugged to anything yet
  • Loading branch information
Geal committed Aug 23, 2024
1 parent b1bdca8 commit 6cf1866
Showing 1 changed file with 86 additions and 116 deletions.
202 changes: 86 additions & 116 deletions apollo-router/src/services/json/service.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use std::collections::HashMap;
use std::future::ready;
use std::sync::Arc;
use std::task::Poll;

use futures::future::join_all;
use futures::future::BoxFuture;
use futures::stream::once;
use futures::StreamExt;
use http::header::CONTENT_TYPE;
use http::request::Parts;
use http::Method;
Expand All @@ -11,34 +15,61 @@ use mime::APPLICATION_JSON;
use serde_json_bytes::Value;
use tower::BoxError;
use tower::Service;
use tower::ServiceExt;

use super::JsonStream;
use super::Request as JsonRequest;
use super::Response as JsonResponse;
use crate::axum_factory::CanceledRequest;
use crate::batching::Batch;
use crate::batching::BatchQuery;
use crate::configuration::Batching;
use crate::configuration::BatchingMode;
use crate::context::CONTAINS_GRAPHQL_ERROR;
use crate::graphql;
use crate::http_ext;
use crate::services::layers::apq::APQLayer;
use crate::services::layers::content_negotiation::GRAPHQL_JSON_RESPONSE_HEADER_VALUE;
use crate::services::layers::persisted_queries::PersistedQueryLayer;
use crate::services::layers::query_analysis::QueryAnalysisLayer;
use crate::services::new_service::ServiceFactory;
use crate::services::router::service::process_vary_header;
use crate::services::router::service::MULTIPART_DEFER_CONTENT_TYPE_HEADER_VALUE;
use crate::services::router::service::MULTIPART_SUBSCRIPTION_CONTENT_TYPE_HEADER_VALUE;
use crate::services::router::ClientRequestAccepts;
use crate::services::SupergraphCreator;
use crate::services::SupergraphRequest;
use crate::services::SupergraphResponse;
use crate::services::APPLICATION_JSON_HEADER_VALUE;
use crate::services::MULTIPART_DEFER_ACCEPT;
use crate::services::MULTIPART_SUBSCRIPTION_ACCEPT;
use crate::Context;

/// Containing [`Service`] in the request lifecyle.
#[derive(Clone)]
pub(crate) struct JsonServerService {
pub(crate) supergraph_creator: Arc<SupergraphCreator>,
apq_layer: APQLayer,
persisted_query_layer: Arc<PersistedQueryLayer>,
query_analysis_layer: QueryAnalysisLayer,
batching: Batching,
}

#[buildstructor::buildstructor]
impl JsonServerService {
#[builder]
pub(crate) fn new(supergraph_creator: Arc<SupergraphCreator>, batching: Batching) -> Self {
pub(crate) fn new(
supergraph_creator: Arc<SupergraphCreator>,
apq_layer: APQLayer,
persisted_query_layer: Arc<PersistedQueryLayer>,
query_analysis_layer: QueryAnalysisLayer,
batching: Batching,
) -> Self {
JsonServerService {
supergraph_creator,
apq_layer,
persisted_query_layer,
query_analysis_layer,
batching,
}
}
Expand Down Expand Up @@ -192,21 +223,12 @@ impl JsonServerService {
.next()
.expect("we should have at least one response");
let (parts, body) = first.response.into_parts();
let context = first.context;
let mut bytes = BytesMut::new();
bytes.put_u8(b'[');
bytes.extend_from_slice(&get_body_bytes(body).await?);
for result in results_it {
bytes.put(&b", "[..]);
bytes.extend_from_slice(&get_body_bytes(result.response.into_body()).await?);
}
bytes.put_u8(b']');
let bodies = body.collect().await;
let body = Value::Array(bodies);

let context = first.context;
Ok(JsonResponse {
response: http::Response::from_parts(
parts,
RouterBody::from(bytes.freeze()).into_inner(),
),
response: http::Response::from_parts(parts, Box::pin(once(ready(body)))),
context,
})
} else {
Expand Down Expand Up @@ -293,7 +315,7 @@ impl JsonServerService {
let mut result = vec![];
let mut is_batch = false;

match serde_json_bytes::<graphql::Request>::from_value(value) {
match serde_json_bytes::from_value::<graphql::Request>(value.clone()) {
Ok(request) => {
result.push(request);
}
Expand Down Expand Up @@ -374,51 +396,7 @@ impl JsonServerService {
if parts.method == Method::GET {
self.translate_query_request(&parts).await
} else {
self.translate_body_request(&body).await
/*
// FIXME: use a try block when available: https://github.com/rust-lang/rust/issues/31436
let content_length = (|| {
parts
.headers
.get(http::header::CONTENT_LENGTH)?
.to_str()
.ok()?
.parse()
.ok()
})();
if content_length.unwrap_or(0) > self.http_max_request_bytes {
Err(TranslateError {
status: StatusCode::PAYLOAD_TOO_LARGE,
error: "payload too large for the `http_max_request_bytes` configuration",
extension_code: "INVALID_GRAPHQL_REQUEST",
extension_details: "payload too large".to_string(),
})
} else {
let body = http_body::Limited::new(body, self.http_max_request_bytes);
get_body_bytes(body)
.instrument(tracing::debug_span!("receive_body"))
.await
.map_err(|e| {
if e.is::<http_body::LengthLimitError>() {
TranslateError {
status: StatusCode::PAYLOAD_TOO_LARGE,
error: "payload too large for the `http_max_request_bytes` configuration",
extension_code: "INVALID_GRAPHQL_REQUEST",
extension_details: "payload too large".to_string(),
}
} else {
TranslateError {
status: StatusCode::BAD_REQUEST,
error: "failed to get the request body",
extension_code: "INVALID_GRAPHQL_REQUEST",
extension_details: format!("failed to get the request body: {e}"),
}
}
})
.and_then(|bytes| {
self.translate_bytes_request(&bytes)
})
}*/
self.translate_body_request(body)
};

let (ok_results, is_batch) = graphql_requests?;
Expand Down Expand Up @@ -535,7 +513,7 @@ impl JsonServerService {
async fn process_supergraph_request(
&self,
supergraph_request: SupergraphRequest,
) -> Result<router::Response, BoxError> {
) -> Result<JsonResponse, BoxError> {
let mut request_res = self
.persisted_query_layer
.supergraph_request(supergraph_request);
Expand Down Expand Up @@ -583,16 +561,16 @@ impl JsonServerService {
match body.next().await {
None => {
tracing::error!("router service is not available to process request",);
Ok(router::Response {
response: http::Response::builder()
.status(StatusCode::SERVICE_UNAVAILABLE)
.body(
RouterBody::from("router service is not available to process request")
.into_inner(),
)
.expect("cannot fail"),
context,
})
Ok(JsonResponse::error_builder()
.status_code(StatusCode::SERVICE_UNAVAILABLE)
.error::<graphql::Error>(
graphql::Error::builder()
.message("router service is not available to process request")
.extension_code("SERVICE_UNAVAILABLE")
.build(),
)
.context(context)
.build()?)
}
Some(response) => {
if !response.has_next.unwrap_or(false)
Expand All @@ -607,11 +585,11 @@ impl JsonServerService {
.headers
.insert(CONTENT_TYPE, APPLICATION_JSON_HEADER_VALUE.clone());
tracing::trace_span!("serialize_response").in_scope(|| {
let body = serde_json::to_string(&response)?;
Ok(router::Response {
let body = serde_json_bytes::to_value(&response)?;
Ok(JsonResponse {
response: http::Response::from_parts(
parts,
RouterBody::from(body).into_inner(),
Box::pin(once(ready(body))),
),
context,
})
Expand All @@ -632,50 +610,19 @@ impl JsonServerService {
if !response.errors.is_empty() {
Self::count_errors(&response.errors);
}
let response = serde_json_bytes::to_value(&response)?;

// Useful when you're using a proxy like nginx which enable proxy_buffering by default (http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering)
parts.headers.insert(
ACCEL_BUFFERING_HEADER_NAME.clone(),
ACCEL_BUFFERING_HEADER_VALUE.clone(),
);
let multipart_stream = match response.subscribed {
Some(true) => StreamBody::new(Multipart::new(
body.inspect(|response| {
if !response.errors.is_empty() {
Self::count_errors(&response.errors);
}
}),
ProtocolMode::Subscription,
)),
_ => StreamBody::new(Multipart::new(
once(ready(response)).chain(body.inspect(|response| {
if !response.errors.is_empty() {
Self::count_errors(&response.errors);
}
})),
ProtocolMode::Defer,
)),
};
let response = (parts, multipart_stream).into_response().map(|body| {
// Axum makes this `body` have type:
// https://docs.rs/http-body/0.4.5/http_body/combinators/struct.UnsyncBoxBody.html
let mut body = Box::pin(body);
// We make a stream based on its `poll_data` method
// in order to create a `hyper::Body`.
RouterBody::wrap_stream(stream::poll_fn(move |ctx| {
body.as_mut().poll_data(ctx)
}))
.into_inner()
// … but we ignore the `poll_trailers` method:
// https://docs.rs/http-body/0.4.5/http_body/trait.Body.html#tymethod.poll_trailers
// Apparently HTTP/2 trailers are like headers, except after the response body.
// I (Simon) believe nothing in the Apollo Router uses trailers as of this writing,
// so ignoring `poll_trailers` is fine.
// If we want to use trailers, we may need remove this convertion to `hyper::Body`
// and return `UnsyncBoxBody` (a.k.a. `axum::BoxBody`) as-is.
});
let stream = Box::pin(once(ready(response)).chain(body.map(|response| {
if !response.errors.is_empty() {
Self::count_errors(&response.errors);
}
serde_json_bytes::to_value(&response)
.expect("response should be serializable; qed")
}))) as JsonStream;

let response = http::Response::from_parts(parts, stream);

Ok(RouterResponse { response, context })
Ok(JsonResponse { response, context })
} else {
tracing::info!(
monotonic_counter.apollo.router.graphql_error = 1u64,
Expand Down Expand Up @@ -709,6 +656,29 @@ impl JsonServerService {
}
}
}

fn count_errors(errors: &[graphql::Error]) {
let mut map = HashMap::new();
for error in errors {
let code = error.extensions.get("code").and_then(|c| c.as_str());
let entry = map.entry(code).or_insert(0u64);
*entry += 1;
}

for (code, count) in map {
match code {
None => {
tracing::info!(monotonic_counter.apollo.router.graphql_error = count,);
}
Some(code) => {
tracing::info!(
monotonic_counter.apollo.router.graphql_error = count,
code = code
);
}
}
}
}
}

struct TranslateError<'a> {
Expand Down

0 comments on commit 6cf1866

Please sign in to comment.