Skip to content

Commit

Permalink
plug into the router service
Browse files Browse the repository at this point in the history
no plugins yet, some tests are failing
  • Loading branch information
Geal committed Aug 23, 2024
1 parent 6cf1866 commit 0a32a72
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 606 deletions.
53 changes: 28 additions & 25 deletions apollo-router/src/protocols/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ pub(crate) enum ProtocolMode {
}

#[derive(Clone, Debug, Serialize)]
struct SubscriptionPayload {
payload: Option<graphql::Response>,
pub(crate) struct SubscriptionPayload {
pub(crate) payload: Option<graphql::Response>,
#[serde(skip_serializing_if = "Vec::is_empty")]
errors: Vec<graphql::Error>,
pub(crate) errors: Vec<graphql::Error>,
}

#[derive(Debug)]
enum MessageKind {
Heartbeat,
Message(graphql::Response),
Message(Value),
Eof,
}

Expand All @@ -54,7 +54,7 @@ pub(crate) struct Multipart {
impl Multipart {
pub(crate) fn new<S>(stream: S, mode: ProtocolMode) -> Self
where
S: Stream<Item = graphql::Response> + Send + 'static,
S: Stream<Item = Value> + Send + 'static,
{
let stream = match mode {
ProtocolMode::Subscription => select(
Expand Down Expand Up @@ -104,9 +104,15 @@ impl Stream for Multipart {

Poll::Ready(Some(Ok(buf)))
}
Some(MessageKind::Message(mut response)) => {
let is_still_open =
response.has_next.unwrap_or(false) || response.subscribed.unwrap_or(false);
Some(MessageKind::Message(response)) => {
let is_still_open = response
.as_object()
.and_then(|o| o.get("has_next").and_then(|v| v.as_bool()))
.unwrap_or(false)
|| response
.as_object()
.and_then(|o| o.get("subscribed").and_then(|v| v.as_bool()))
.unwrap_or(false);
let mut buf = if self.is_first_chunk {
self.is_first_chunk = false;
Vec::from(&b"\r\n--graphql\r\ncontent-type: application/json\r\n\r\n"[..])
Expand All @@ -116,26 +122,23 @@ impl Stream for Multipart {

match self.mode {
ProtocolMode::Subscription => {
let resp = SubscriptionPayload {
errors: if is_still_open {
Vec::new()
} else {
response.errors.drain(..).collect()
},
payload: match response.data {
None | Some(Value::Null) if response.extensions.is_empty() => {
None
}
_ => response.into(),
},
};

// Gracefully closed at the server side
if !is_still_open && resp.payload.is_none() && resp.errors.is_empty() {
if !is_still_open
&& response
.as_object()
.and_then(|o| o.get("payload"))
.is_none()
&& response
.as_object()
.and_then(|o| o.get("errors"))
.and_then(|v| v.as_array())
.map(|v| v.is_empty())
.unwrap_or(true)
{
self.is_terminated = true;
return Poll::Ready(Some(Ok(Bytes::from_static(&b"--\r\n"[..]))));
} else {
serde_json::to_writer(&mut buf, &resp)?;
serde_json::to_writer(&mut buf, &response)?;
}
}
ProtocolMode::Defer => {
Expand Down Expand Up @@ -216,7 +219,7 @@ mod tests {
.build(),
graphql::Response::builder().build(),
];
let gql_responses = stream::iter(responses);
let gql_responses = stream::iter(responses).map(|r| serde_json_bytes::to_value(r).unwrap());

let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription);
let heartbeat =
Expand Down
77 changes: 20 additions & 57 deletions apollo-router/src/services/json/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::configuration::BatchingMode;
use crate::context::CONTAINS_GRAPHQL_ERROR;
use crate::graphql;
use crate::http_ext;
use crate::protocols::multipart::SubscriptionPayload;
use crate::services::layers::apq::APQLayer;
use crate::services::layers::content_negotiation::GRAPHQL_JSON_RESPONSE_HEADER_VALUE;
use crate::services::layers::persisted_queries::PersistedQueryLayer;
Expand Down Expand Up @@ -80,64 +81,13 @@ impl Service<JsonRequest> for JsonServerService {
type Error = BoxError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: JsonRequest) -> Self::Future {
let JsonRequest { request, context } = req;

/*match serde_json_bytes::<graphql::Request>::from_value(body) {
Ok(request) => {
let response = self.supergraph_creator.create().oneshot(request).await.map_err(|e| {
todo!()
})?;
Ok(JsonResponse::new_from_graphql_response(response, context))
}, Err(e) => {
Ok(JsonResponse::error_builder()
.error(e)
.status_code(StatusCode::BAD_REQUEST)
.build())
}
}*/
todo!()

/*// Consume our cloned services and allow ownership to be transferred to the async block.
let clone = self.query_planner_service.clone();
let planning = std::mem::replace(&mut self.query_planner_service, clone);
let schema = self.schema.clone();
let context_cloned = req.context.clone();
let fut = service_call(
planning,
self.execution_service_factory.clone(),
schema,
req,
self.notify.clone(),
)
.or_else(|error: BoxError| async move {
let errors = vec![crate::error::Error {
message: error.to_string(),
extensions: serde_json_bytes::json!({
"code": "INTERNAL_SERVER_ERROR",
})
.as_object()
.unwrap()
.to_owned(),
..Default::default()
}];
Ok(SupergraphResponse::infallible_builder()
.errors(errors)
.status_code(StatusCode::INTERNAL_SERVER_ERROR)
.context(context_cloned)
.build())
});
Box::pin(fut)*/
fn call(&mut self, request: JsonRequest) -> Self::Future {
let service = self.clone();
Box::pin(async move { service.call_inner(request).await })
}
}

Expand Down Expand Up @@ -572,7 +522,7 @@ impl JsonServerService {
.context(context)
.build()?)
}
Some(response) => {
Some(mut response) => {
if !response.has_next.unwrap_or(false)
&& !response.subscribed.unwrap_or(false)
&& (accepts_json || accepts_wildcard)
Expand All @@ -585,7 +535,20 @@ impl JsonServerService {
.headers
.insert(CONTENT_TYPE, APPLICATION_JSON_HEADER_VALUE.clone());
tracing::trace_span!("serialize_response").in_scope(|| {
let body = serde_json_bytes::to_value(&response)?;
let body = if !response.subscribed.unwrap_or(false) {
let resp = SubscriptionPayload {
errors: response.errors.drain(..).collect(),
payload: match response.data {
None | Some(Value::Null) if response.extensions.is_empty() => {
None
}
_ => response.into(),
},
};
serde_json_bytes::to_value(&resp)?
} else {
serde_json_bytes::to_value(&response)?
};
Ok(JsonResponse {
response: http::Response::from_parts(
parts,
Expand Down
Loading

0 comments on commit 0a32a72

Please sign in to comment.