diff --git a/.changesets/fix_simon_compute_jobs.md b/.changesets/fix_simon_compute_jobs.md new file mode 100644 index 0000000000..f3861ec975 --- /dev/null +++ b/.changesets/fix_simon_compute_jobs.md @@ -0,0 +1,19 @@ +### Move heavy computation to a thread pool with a priority queue + +These components can take non-trivial amounts of CPU time: + +* GraphQL parsing +* GraphQL validation +* Query planning +* Schema introspection + +In order to avoid blocking threads that execute asynchronous code, +they are now run (in their respective Rust implementations) +in a new pool of as many threads as CPU cores are available. +Previously we used Tokio’s [`spawn_blocking`] for this purpose, +but it appears to be intended for blocking I/O +and uses up to 512 threads so it isn’t a great fit for computation tasks. + +[`spawn_blocking`]: https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html + +By [@SimonSapin](https://github.com/SimonSapin) in https://github.com/apollographql/router/pull/6247 diff --git a/apollo-router/src/ageing_priority_queue.rs b/apollo-router/src/ageing_priority_queue.rs new file mode 100644 index 0000000000..6bf742106b --- /dev/null +++ b/apollo-router/src/ageing_priority_queue.rs @@ -0,0 +1,124 @@ +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; + +/// Items with higher priority value get handled sooner +#[allow(unused)] +pub(crate) enum Priority { + P1 = 1, + P2, + P3, + P4, + P5, + P6, + P7, + P8, +} + +const INNER_QUEUES_COUNT: usize = Priority::P8 as usize - Priority::P1 as usize + 1; + +/// Indices start at 0 for highest priority +const fn index_from_priority(priority: Priority) -> usize { + Priority::P8 as usize - priority as usize +} + +const _: () = { + assert!(index_from_priority(Priority::P1) == 7); + assert!(index_from_priority(Priority::P8) == 0); +}; + +pub(crate) struct AgeingPriorityQueue +where + T: Send + 'static, +{ + /// Items in **lower** indices queues are handled sooner + inner_queues: + 
[(crossbeam_channel::Sender, crossbeam_channel::Receiver); INNER_QUEUES_COUNT], + queued_count: AtomicUsize, + soft_capacity: usize, +} + +pub(crate) struct Receiver<'a, T> +where + T: Send + 'static, +{ + shared: &'a AgeingPriorityQueue, + select: crossbeam_channel::Select<'a>, +} + +impl AgeingPriorityQueue +where + T: Send + 'static, +{ + pub(crate) fn soft_bounded(soft_capacity: usize) -> Self { + Self { + // Using unbounded channels: callers must use `is_full` to implement backpressure + inner_queues: std::array::from_fn(|_| crossbeam_channel::unbounded()), + queued_count: AtomicUsize::new(0), + soft_capacity, + } + } + + pub(crate) fn queued_count(&self) -> usize { + self.queued_count.load(Ordering::Relaxed) + } + + pub(crate) fn is_full(&self) -> bool { + self.queued_count() >= self.soft_capacity + } + + /// Panics if the inner channel for `priority` is disconnected + pub(crate) fn send(&self, priority: Priority, message: T) { + self.queued_count.fetch_add(1, Ordering::Relaxed); + let (inner_sender, _) = &self.inner_queues[index_from_priority(priority)]; + inner_sender.send(message).expect("disconnected channel") + } + + pub(crate) fn receiver(&self) -> Receiver<'_, T> { + let mut select = crossbeam_channel::Select::new(); + for (_, inner_receiver) in &self.inner_queues { + select.recv(inner_receiver); + } + Receiver { + shared: self, + select, + } + } +} + +impl<'a, T> Receiver<'a, T> +where + T: Send + 'static, +{ + pub(crate) fn blocking_recv(&mut self) -> T { + loop { + // Block until something is ready. + // Ignore the returned index because it is "random" when multiple operations are ready. 
+ self.select.ready(); + // Check inner channels in priority order instead: + for (index, (_, inner_receiver)) in self.shared.inner_queues.iter().enumerate() { + if let Ok(message) = inner_receiver.try_recv() { + self.shared.queued_count.fetch_sub(1, Ordering::Relaxed); + self.age(index); + return message; + } + } + // Another thread raced us to it or `ready()` returned spuriously, try again + } + } + + // Promote some messages from priorities lower (higher indices) than `message_consumed_at_index` + fn age(&self, message_consumed_at_index: usize) { + for window in self.shared.inner_queues[message_consumed_at_index..].windows(2) { + let [higher_priority, lower_priority] = window else { + panic!("expected windows of length 2") + }; + let (higher_priority_sender, _) = higher_priority; + let (_, lower_priority_receiver) = lower_priority; + if let Ok(message) = lower_priority_receiver.try_recv() { + higher_priority_sender + .send(message) + .expect("disconnected channel") + } + } + } +} diff --git a/apollo-router/src/compute_job.rs b/apollo-router/src/compute_job.rs new file mode 100644 index 0000000000..8370d05882 --- /dev/null +++ b/apollo-router/src/compute_job.rs @@ -0,0 +1,78 @@ +use std::future::Future; +use std::num::NonZeroUsize; +use std::panic::UnwindSafe; +use std::sync::OnceLock; + +use opentelemetry::metrics::MeterProvider as _; +use opentelemetry::metrics::ObservableGauge; +use tokio::sync::oneshot; + +use crate::ageing_priority_queue::AgeingPriorityQueue; +pub(crate) use crate::ageing_priority_queue::Priority; +use crate::metrics::meter_provider; + +/// We generate backpressure in tower `poll_ready` when reaching this many queued items +// TODO: what’s a good number? should it be configurable? +const QUEUE_SOFT_CAPACITY: usize = 100; + +// TODO: should this be configurable? 
+fn thread_pool_size() -> NonZeroUsize { + std::thread::available_parallelism().expect("available_parallelism() failed") +} + +type Job = Box; + +fn queue() -> &'static AgeingPriorityQueue { + static QUEUE: OnceLock> = OnceLock::new(); + QUEUE.get_or_init(|| { + for _ in 0..thread_pool_size().get() { + std::thread::spawn(|| { + // This looks like we need the queue before creating the queue, + // but it happens in a child thread where OnceLock will block + // until `get_or_init` in the parent thread is finished + // and the parent is *not* blocked on the child thread making progress. + let queue = queue(); + + let mut receiver = queue.receiver(); + loop { + let job = receiver.blocking_recv(); + job(); + } + }); + } + AgeingPriorityQueue::soft_bounded(QUEUE_SOFT_CAPACITY) + }) +} + +/// Returns a future that resolves to a `Result` that is `Ok` if `f` returned or `Err` if it panicked. +pub(crate) fn execute( + priority: Priority, + job: F, +) -> impl Future> +where + F: FnOnce() -> T + Send + UnwindSafe + 'static, + T: Send + 'static, +{ + let (tx, rx) = oneshot::channel(); + let job = Box::new(move || { + // Ignore the error if the oneshot receiver was dropped + let _ = tx.send(std::panic::catch_unwind(job)); + }); + queue().send(priority, job); + async { rx.await.expect("channel disconnected") } +} + +pub(crate) fn is_full() -> bool { + queue().is_full() +} + +pub(crate) fn create_queue_size_gauge() -> ObservableGauge { + meter_provider() + .meter("apollo/router") + .u64_observable_gauge("apollo.router.compute_jobs.queued") + .with_description( + "Number of computation jobs (parsing, planning, …) waiting to be scheduled", + ) + .with_callback(move |m| m.observe(queue().queued_count() as u64, &[])) + .init() +} diff --git a/apollo-router/src/introspection.rs b/apollo-router/src/introspection.rs index a0a539895d..b69b03c68b 100644 --- a/apollo-router/src/introspection.rs +++ b/apollo-router/src/introspection.rs @@ -6,6 +6,7 @@ use 
apollo_compiler::executable::Selection; use serde_json_bytes::json; use crate::cache::storage::CacheStorage; +use crate::compute_job; use crate::graphql; use crate::query_planner::QueryKey; use crate::services::layers::query_analysis::ParsedDocument; @@ -130,8 +131,9 @@ impl IntrospectionCache { } let schema = schema.clone(); let doc = doc.clone(); + let priority = compute_job::Priority::P1; // Low priority let response = - tokio::task::spawn_blocking(move || Self::execute_introspection(&schema, &doc)) + compute_job::execute(priority, move || Self::execute_introspection(&schema, &doc)) .await .expect("Introspection panicked"); storage.insert(cache_key, response.clone()).await; diff --git a/apollo-router/src/lib.rs b/apollo-router/src/lib.rs index 9ac39c9e23..6f30ab6239 100644 --- a/apollo-router/src/lib.rs +++ b/apollo-router/src/lib.rs @@ -52,10 +52,12 @@ pub mod plugin; #[macro_use] pub(crate) mod metrics; +mod ageing_priority_queue; mod apollo_studio_interop; pub(crate) mod axum_factory; mod batching; mod cache; +mod compute_job; mod configuration; mod context; mod error; diff --git a/apollo-router/src/query_planner/bridge_query_planner.rs b/apollo-router/src/query_planner/bridge_query_planner.rs index f10e424bb3..aa83aa5dcc 100644 --- a/apollo-router/src/query_planner/bridge_query_planner.rs +++ b/apollo-router/src/query_planner/bridge_query_planner.rs @@ -29,6 +29,7 @@ use tower::Service; use super::PlanNode; use super::QueryKey; use crate::apollo_studio_interop::generate_usage_reporting; +use crate::compute_job; use crate::configuration::QueryPlannerMode; use crate::error::PlanErrors; use crate::error::QueryPlannerError; @@ -259,7 +260,8 @@ impl PlannerMode { PlannerMode::Rust(rust_planner) => { let doc = doc.clone(); let rust_planner = rust_planner.clone(); - let (plan, mut root_node) = tokio::task::spawn_blocking(move || { + let priority = compute_job::Priority::P8; // High priority + let (plan, mut root_node) = compute_job::execute(priority, move || { let 
start = Instant::now(); let query_plan_options = QueryPlanOptions { diff --git a/apollo-router/src/query_planner/bridge_query_planner_pool.rs b/apollo-router/src/query_planner/bridge_query_planner_pool.rs index 722a965efd..a6a3b341e6 100644 --- a/apollo-router/src/query_planner/bridge_query_planner_pool.rs +++ b/apollo-router/src/query_planner/bridge_query_planner_pool.rs @@ -4,6 +4,7 @@ use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::Mutex; +use std::task::Poll; use std::time::Instant; use apollo_compiler::validation::Valid; @@ -42,6 +43,7 @@ pub(crate) struct BridgeQueryPlannerPool { )>, schema: Arc, subgraph_schemas: Arc>>>, + compute_jobs_queue_size_gauge: Arc>>>, pool_size_gauge: Arc>>>, v8_heap_used: Arc, v8_heap_used_gauge: Arc>>>, @@ -151,6 +153,7 @@ impl BridgeQueryPlannerPool { sender, schema, subgraph_schemas, + compute_jobs_queue_size_gauge: Default::default(), pool_size_gauge: Default::default(), v8_heap_used, v8_heap_used_gauge: Default::default(), @@ -228,6 +231,10 @@ impl BridgeQueryPlannerPool { pub(super) fn activate(&self) { // Gauges MUST be initialized after a meter provider is created. // When a hot reload happens this means that the gauges must be re-initialized. 
+ *self + .compute_jobs_queue_size_gauge + .lock() + .expect("lock poisoned") = Some(crate::compute_job::create_queue_size_gauge()); *self.pool_size_gauge.lock().expect("lock poisoned") = Some(self.create_pool_size_gauge()); *self.v8_heap_used_gauge.lock().expect("lock poisoned") = Some(self.create_heap_used_gauge()); @@ -244,16 +251,16 @@ impl tower::Service for BridgeQueryPlannerPool { type Future = BoxFuture<'static, Result>; - fn poll_ready( - &mut self, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { + fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll> { + if crate::compute_job::is_full() { + return Poll::Pending; + } if self.sender.is_full() { - std::task::Poll::Ready(Err(QueryPlannerError::PoolProcessing( + Poll::Ready(Err(QueryPlannerError::PoolProcessing( "query plan queue is full".into(), ))) } else { - std::task::Poll::Ready(Ok(())) + Poll::Ready(Ok(())) } } diff --git a/apollo-router/src/services/layers/query_analysis.rs b/apollo-router/src/services/layers/query_analysis.rs index 4af57a5f38..2e2fe77eff 100644 --- a/apollo-router/src/services/layers/query_analysis.rs +++ b/apollo-router/src/services/layers/query_analysis.rs @@ -13,10 +13,10 @@ use http::StatusCode; use lru::LruCache; use router_bridge::planner::UsageReporting; use tokio::sync::Mutex; -use tokio::task; use crate::apollo_studio_interop::generate_extended_references; use crate::apollo_studio_interop::ExtendedReferenceStats; +use crate::compute_job; use crate::context::OPERATION_KIND; use crate::context::OPERATION_NAME; use crate::graphql::Error; @@ -89,7 +89,8 @@ impl QueryAnalysisLayer { // parent let span = tracing::info_span!(QUERY_PARSING_SPAN_NAME, "otel.kind" = "INTERNAL"); - task::spawn_blocking(move || { + let priority = compute_job::Priority::P4; // Medium priority + let job = move || { span.in_scope(|| { Query::parse_document( &query, @@ -98,9 +99,12 @@ impl QueryAnalysisLayer { conf.as_ref(), ) }) - }) - .await - .expect("parse_document task 
panicked") + }; + // TODO: is this correct? + let job = std::panic::AssertUnwindSafe(job); + compute_job::execute(priority, job) + .await + .expect("Query::parse_document panicked") } pub(crate) async fn supergraph_request( diff --git a/apollo-router/src/services/router/service.rs b/apollo-router/src/services/router/service.rs index 9e379ae3c4..e5792c1a4d 100644 --- a/apollo-router/src/services/router/service.rs +++ b/apollo-router/src/services/router/service.rs @@ -214,6 +214,11 @@ impl Service for RouterService { type Future = BoxFuture<'static, Result>; fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll> { + // This service eventually calls `QueryAnalysisLayer::parse_document()` + // which calls `compute_job::execute()` + if crate::compute_job::is_full() { + return Poll::Pending; + } Poll::Ready(Ok(())) }