Move heavy computation to a thread pool with a priority queue
These components can take non-trivial amounts of CPU time:

* GraphQL parsing
* GraphQL validation
* Query planning
* Schema introspection

To avoid blocking threads that execute asynchronous code,
these components (in their respective Rust implementations) now run
in a new thread pool sized to the number of available CPU cores.
Previously we used Tokio’s [`spawn_blocking`] for this purpose,
but it appears to be intended for blocking I/O
and uses up to 512 threads, so it isn’t a great fit for computation tasks.

[`spawn_blocking`]: https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html
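
For context, a minimal sketch of how async code hands work to the new pool, using the `compute_job::execute` API added in this commit (defined in `apollo-router/src/compute_job.rs` below; the priority choice and the `parse_and_validate` closure are illustrative):

```rust
// Sketch: offloading a CPU-bound closure from async code (illustrative).
// `execute` queues the job at the given priority and returns a future that
// resolves once a pool thread has run it; `Err` means the job panicked.
let result = compute_job::execute(Priority::P4, move || {
    parse_and_validate(&query) // hypothetical CPU-bound work
})
.await;
```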

This PR supersedes and closes #6122

The ageing priority algorithm is based on @garypen’s work
in https://github.com/apollographql/ageing
SimonSapin committed Nov 7, 2024
1 parent 0189a16 commit a6ea805
Showing 9 changed files with 256 additions and 13 deletions.
19 changes: 19 additions & 0 deletions .changesets/fix_simon_compute_jobs.md
@@ -0,0 +1,19 @@
### Move heavy computation to a thread pool with a priority queue

These components can take non-trivial amounts of CPU time:

* GraphQL parsing
* GraphQL validation
* Query planning
* Schema introspection

To avoid blocking threads that execute asynchronous code,
these components (in their respective Rust implementations) now run
in a new thread pool sized to the number of available CPU cores.
Previously we used Tokio’s [`spawn_blocking`] for this purpose,
but it appears to be intended for blocking I/O
and uses up to 512 threads, so it isn’t a great fit for computation tasks.

[`spawn_blocking`]: https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html

By [@SimonSapin](https://github.com/SimonSapin) in https://github.com/apollographql/router/pull/6247
124 changes: 124 additions & 0 deletions apollo-router/src/ageing_priority_queue.rs
@@ -0,0 +1,124 @@
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;

/// Items with higher priority value get handled sooner
#[allow(unused)]
pub(crate) enum Priority {
P1 = 1,
P2,
P3,
P4,
P5,
P6,
P7,
P8,
}

const INNER_QUEUES_COUNT: usize = Priority::P8 as usize - Priority::P1 as usize + 1;

/// Indices start at 0 for highest priority
const fn index_from_priority(priority: Priority) -> usize {
Priority::P8 as usize - priority as usize
}

const _: () = {
assert!(index_from_priority(Priority::P1) == 7);
assert!(index_from_priority(Priority::P8) == 0);
};

pub(crate) struct AgeingPriorityQueue<T>
where
T: Send + 'static,
{
/// Items in **lower** indices queues are handled sooner
inner_queues:
[(crossbeam_channel::Sender<T>, crossbeam_channel::Receiver<T>); INNER_QUEUES_COUNT],
queued_count: AtomicUsize,
soft_capacity: usize,
}

pub(crate) struct Receiver<'a, T>
where
T: Send + 'static,
{
shared: &'a AgeingPriorityQueue<T>,
select: crossbeam_channel::Select<'a>,
}

impl<T> AgeingPriorityQueue<T>
where
T: Send + 'static,
{
pub(crate) fn soft_bounded(soft_capacity: usize) -> Self {
Self {
// Using unbounded channels: callers must use `is_full` to implement backpressure
inner_queues: std::array::from_fn(|_| crossbeam_channel::unbounded()),
queued_count: AtomicUsize::new(0),
soft_capacity,
}
}

pub(crate) fn queued_count(&self) -> usize {
self.queued_count.load(Ordering::Relaxed)
}

pub(crate) fn is_full(&self) -> bool {
self.queued_count() >= self.soft_capacity
}

/// Panics if `priority` is not in `AVAILABLE_PRIORITIES`
pub(crate) fn send(&self, priority: Priority, message: T) {
self.queued_count.fetch_add(1, Ordering::Relaxed);
let (inner_sender, _) = &self.inner_queues[index_from_priority(priority)];
inner_sender.send(message).expect("disconnected channel")
}

pub(crate) fn receiver(&self) -> Receiver<'_, T> {
let mut select = crossbeam_channel::Select::new();
for (_, inner_receiver) in &self.inner_queues {
select.recv(inner_receiver);
}
Receiver {
shared: self,
select,
}
}
}

impl<'a, T> Receiver<'a, T>
where
T: Send + 'static,
{
pub(crate) fn blocking_recv(&mut self) -> T {
loop {
// Block until something is ready.
// Ignore the returned index because it is "random" when multiple operations are ready.
self.select.ready();
// Check inner channels in priority order instead:
for (index, (_, inner_receiver)) in self.shared.inner_queues.iter().enumerate() {
if let Ok(message) = inner_receiver.try_recv() {
self.shared.queued_count.fetch_sub(1, Ordering::Relaxed);
self.age(index);
return message;
}
}
// Another thread raced us to it or `ready()` returned spuriously, try again
}
}

// Promote some messages from priorities lower (higher indices) than `message_consumed_at_index`
fn age(&self, message_consumed_at_index: usize) {
for window in self.shared.inner_queues[message_consumed_at_index..].windows(2) {
let [higher_priority, lower_priority] = window else {
panic!("expected windows of length 2")
};
let (higher_priority_sender, _) = higher_priority;
let (_, lower_priority_receiver) = lower_priority;
if let Ok(message) = lower_priority_receiver.try_recv() {
higher_priority_sender
.send(message)
.expect("disconnected channel")
}
}
}
}
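
To make the ageing step concrete, here is a hedged usage sketch (crate-internal API from the file above; the values are illustrative). Each `blocking_recv` drains the highest-priority non-empty queue first, then promotes one message from every lower-priority queue one level up, so low-priority work cannot starve indefinitely:

```rust
// Sketch (crate-internal API from above): higher priorities drain first,
// and each receive ages lower-priority items one level upward.
let queue: AgeingPriorityQueue<&str> = AgeingPriorityQueue::soft_bounded(100);
queue.send(Priority::P1, "low");
queue.send(Priority::P8, "high");

let mut receiver = queue.receiver();
assert_eq!(receiver.blocking_recv(), "high"); // index 0 is checked first
// Consuming "high" also promoted "low" one level (P1 -> P2).
assert_eq!(receiver.blocking_recv(), "low");
```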
78 changes: 78 additions & 0 deletions apollo-router/src/compute_job.rs
@@ -0,0 +1,78 @@
use std::future::Future;
use std::num::NonZeroUsize;
use std::panic::UnwindSafe;
use std::sync::OnceLock;

use opentelemetry::metrics::MeterProvider as _;
use opentelemetry::metrics::ObservableGauge;
use tokio::sync::oneshot;

use crate::ageing_priority_queue::AgeingPriorityQueue;
pub(crate) use crate::ageing_priority_queue::Priority;
use crate::metrics::meter_provider;

/// We generate backpressure in tower `poll_ready` when reaching this many queued items
// TODO: what’s a good number? should it be configurable?
const QUEUE_SOFT_CAPACITY: usize = 100;

// TODO: should this be configurable?
fn thread_pool_size() -> NonZeroUsize {
std::thread::available_parallelism().expect("available_parallelism() failed")
}

type Job = Box<dyn FnOnce() + Send + 'static>;

fn queue() -> &'static AgeingPriorityQueue<Job> {
static QUEUE: OnceLock<AgeingPriorityQueue<Job>> = OnceLock::new();
QUEUE.get_or_init(|| {
for _ in 0..thread_pool_size().get() {
std::thread::spawn(|| {
// This looks like we need the queue before creating the queue,
// but it happens in a child thread where OnceLock will block
// until `get_or_init` in the parent thread is finished
// and the parent is *not* blocked on the child thread making progress.
let queue = queue();

let mut receiver = queue.receiver();
loop {
let job = receiver.blocking_recv();
job();
}
});
}
AgeingPriorityQueue::soft_bounded(QUEUE_SOFT_CAPACITY)
})
}

/// Returns a future that resolves to a `Result` that is `Ok` if `f` returned or `Err` if it panicked.
pub(crate) fn execute<T, F>(
priority: Priority,
job: F,
) -> impl Future<Output = std::thread::Result<T>>
where
F: FnOnce() -> T + Send + UnwindSafe + 'static,
T: Send + 'static,
{
let (tx, rx) = oneshot::channel();
let job = Box::new(move || {
// Ignore the error if the oneshot receiver was dropped
let _ = tx.send(std::panic::catch_unwind(job));
});
queue().send(priority, job);
async { rx.await.expect("channel disconnected") }
}

pub(crate) fn is_full() -> bool {
queue().is_full()
}

pub(crate) fn create_queue_size_gauge() -> ObservableGauge<u64> {
meter_provider()
.meter("apollo/router")
.u64_observable_gauge("apollo.router.compute_jobs.queued")
.with_description(
"Number of computation jobs (parsing, planning, …) waiting to be scheduled",
)
.with_callback(move |m| m.observe(queue().queued_count() as u64, &[]))
.init()
}
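
As a usage note, a hedged sketch of calling `execute` from async code (API from the file above; the job body is illustrative). The returned future yields `std::thread::Result<T>`: `Ok` with the job’s return value, or `Err` with the panic payload captured by `catch_unwind`:

```rust
// Sketch (crate-internal API from above), run from an async context.
let result = compute_job::execute(Priority::P8, || 2 + 2).await;
match result {
    Ok(n) => assert_eq!(n, 4),
    Err(panic_payload) => {
        // The job panicked; `panic_payload` is the boxed panic value.
    }
}
```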
4 changes: 3 additions & 1 deletion apollo-router/src/introspection.rs
@@ -6,6 +6,7 @@ use apollo_compiler::executable::Selection;
use serde_json_bytes::json;

use crate::cache::storage::CacheStorage;
use crate::compute_job;
use crate::graphql;
use crate::query_planner::QueryKey;
use crate::services::layers::query_analysis::ParsedDocument;
@@ -130,8 +131,9 @@ impl IntrospectionCache {
}
let schema = schema.clone();
let doc = doc.clone();
let priority = compute_job::Priority::P1; // Low priority
let response =
tokio::task::spawn_blocking(move || Self::execute_introspection(&schema, &doc))
compute_job::execute(priority, move || Self::execute_introspection(&schema, &doc))
.await
.expect("Introspection panicked");
storage.insert(cache_key, response.clone()).await;
2 changes: 2 additions & 0 deletions apollo-router/src/lib.rs
@@ -52,10 +52,12 @@ pub mod plugin;
#[macro_use]
pub(crate) mod metrics;

mod ageing_priority_queue;
mod apollo_studio_interop;
pub(crate) mod axum_factory;
mod batching;
mod cache;
mod compute_job;
mod configuration;
mod context;
mod error;
4 changes: 3 additions & 1 deletion apollo-router/src/query_planner/bridge_query_planner.rs
@@ -29,6 +29,7 @@ use tower::Service;
use super::PlanNode;
use super::QueryKey;
use crate::apollo_studio_interop::generate_usage_reporting;
use crate::compute_job;
use crate::configuration::QueryPlannerMode;
use crate::error::PlanErrors;
use crate::error::QueryPlannerError;
@@ -259,7 +260,8 @@ impl PlannerMode {
PlannerMode::Rust(rust_planner) => {
let doc = doc.clone();
let rust_planner = rust_planner.clone();
let (plan, mut root_node) = tokio::task::spawn_blocking(move || {
let priority = compute_job::Priority::P8; // High priority
let (plan, mut root_node) = compute_job::execute(priority, move || {
let start = Instant::now();

let query_plan_options = QueryPlanOptions {
19 changes: 13 additions & 6 deletions apollo-router/src/query_planner/bridge_query_planner_pool.rs
@@ -4,6 +4,7 @@ use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::task::Poll;
use std::time::Instant;

use apollo_compiler::validation::Valid;
@@ -42,6 +43,7 @@ pub(crate) struct BridgeQueryPlannerPool {
)>,
schema: Arc<Schema>,
subgraph_schemas: Arc<HashMap<String, Arc<Valid<apollo_compiler::Schema>>>>,
compute_jobs_queue_size_gauge: Arc<Mutex<Option<ObservableGauge<u64>>>>,
pool_size_gauge: Arc<Mutex<Option<ObservableGauge<u64>>>>,
v8_heap_used: Arc<AtomicU64>,
v8_heap_used_gauge: Arc<Mutex<Option<ObservableGauge<u64>>>>,
@@ -151,6 +153,7 @@ impl BridgeQueryPlannerPool {
sender,
schema,
subgraph_schemas,
compute_jobs_queue_size_gauge: Default::default(),
pool_size_gauge: Default::default(),
v8_heap_used,
v8_heap_used_gauge: Default::default(),
@@ -228,6 +231,10 @@ impl BridgeQueryPlannerPool {
pub(super) fn activate(&self) {
// Gauges MUST be initialized after a meter provider is created.
// When a hot reload happens this means that the gauges must be re-initialized.
*self
.compute_jobs_queue_size_gauge
.lock()
.expect("lock poisoned") = Some(crate::compute_job::create_queue_size_gauge());
*self.pool_size_gauge.lock().expect("lock poisoned") = Some(self.create_pool_size_gauge());
*self.v8_heap_used_gauge.lock().expect("lock poisoned") =
Some(self.create_heap_used_gauge());
@@ -244,16 +251,16 @@ impl tower::Service<QueryPlannerRequest> for BridgeQueryPlannerPool {

type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(
&mut self,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
if crate::compute_job::is_full() {
return Poll::Pending;
}
if self.sender.is_full() {
std::task::Poll::Ready(Err(QueryPlannerError::PoolProcessing(
Poll::Ready(Err(QueryPlannerError::PoolProcessing(
"query plan queue is full".into(),
)))
} else {
std::task::Poll::Ready(Ok(()))
Poll::Ready(Ok(()))
}
}

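
For context on the backpressure here: tower callers must wait for `poll_ready` before invoking `call`, so returning `Poll::Pending` while the compute queue is saturated pushes back on upstream services. A minimal generic sketch of that calling pattern (standalone, using tower’s `ServiceExt::ready`; not code from this commit):

```rust
use tower::{Service, ServiceExt};

// Sketch: `ready()` resolves only once `poll_ready` returns `Ready(Ok(()))`,
// i.e. once the compute queue has drained below its soft capacity.
async fn call_when_ready<S, Req>(service: &mut S, request: Req) -> Result<S::Response, S::Error>
where
    S: Service<Req>,
{
    let service = service.ready().await?;
    service.call(request).await
}
```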
14 changes: 9 additions & 5 deletions apollo-router/src/services/layers/query_analysis.rs
@@ -13,10 +13,10 @@ use http::StatusCode;
use lru::LruCache;
use router_bridge::planner::UsageReporting;
use tokio::sync::Mutex;
use tokio::task;

use crate::apollo_studio_interop::generate_extended_references;
use crate::apollo_studio_interop::ExtendedReferenceStats;
use crate::compute_job;
use crate::context::OPERATION_KIND;
use crate::context::OPERATION_NAME;
use crate::graphql::Error;
@@ -89,7 +89,8 @@ impl QueryAnalysisLayer {
// parent
let span = tracing::info_span!(QUERY_PARSING_SPAN_NAME, "otel.kind" = "INTERNAL");

task::spawn_blocking(move || {
let priority = compute_job::Priority::P4; // Medium priority
let job = move || {
span.in_scope(|| {
Query::parse_document(
&query,
@@ -98,9 +99,12 @@
conf.as_ref(),
)
})
})
.await
.expect("parse_document task panicked")
};
// TODO: is this correct?
let job = std::panic::AssertUnwindSafe(job);
compute_job::execute(priority, job)
.await
.expect("Query::parse_document panicked")
}

pub(crate) async fn supergraph_request(
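
On the `AssertUnwindSafe` question above: `compute_job::execute` requires `F: UnwindSafe`, and closures capturing mutable or interior-mutable state don’t get that trait automatically, so the wrapper opts in explicitly. A small standalone illustration with std’s `catch_unwind` (not code from this commit):

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};

fn main() {
    // A closure capturing `&mut` state is not UnwindSafe; AssertUnwindSafe
    // asserts that no broken invariants can be observed after a panic.
    let mut count = 0;
    let result = catch_unwind(AssertUnwindSafe(|| {
        count += 1;
        panic!("boom");
    }));
    assert!(result.is_err()); // the panic was caught as an Err payload
    assert_eq!(count, 1); // the side effect before the panic persists
}
```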
5 changes: 5 additions & 0 deletions apollo-router/src/services/router/service.rs
@@ -214,6 +214,11 @@ impl Service<RouterRequest> for RouterService {
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
// This service eventually calls `QueryAnalysisLayer::parse_document()`
// which calls `compute_job::execute()`
if crate::compute_job::is_full() {
return Poll::Pending;
}
Poll::Ready(Ok(()))
}

