diff --git a/.changesets/fix_simon_compute_jobs.md b/.changesets/fix_simon_compute_jobs.md new file mode 100644 index 0000000000..604f1624c5 --- /dev/null +++ b/.changesets/fix_simon_compute_jobs.md @@ -0,0 +1,24 @@ +### Move heavy computation to a thread pool with a priority queue + +These components can take non-trivial amounts of CPU time: + +* GraphQL parsing +* GraphQL validation +* Query planning +* Schema introspection + +In order to avoid blocking threads that execute asynchronous code, +they are now run (in their respective Rust implementations) +in a new thread pool whose size is based on available CPU cores, +with a priority queue. +Previously we used Tokio’s [`spawn_blocking`] for this purpose, +but it appears to be intended for blocking I/O +and uses up to 512 threads so it isn’t a great fit for computation tasks. + +`apollo.router.compute_jobs.queued` is a new gauge metric for the number of items in this new queue. +When the new query planner is enabled, the dedicated queue is no longer used +and the `apollo.router.query_planning.queued` metric is no longer emitted. 
+ +[`spawn_blocking`]: https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html + +By [@SimonSapin](https://github.com/SimonSapin) in https://github.com/apollographql/router/pull/6247 diff --git a/apollo-router/src/ageing_priority_queue.rs b/apollo-router/src/ageing_priority_queue.rs new file mode 100644 index 0000000000..d206283093 --- /dev/null +++ b/apollo-router/src/ageing_priority_queue.rs @@ -0,0 +1,147 @@ +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; + +/// Items with higher priority value get handled sooner +#[allow(unused)] +pub(crate) enum Priority { + P1 = 1, + P2, + P3, + P4, + P5, + P6, + P7, + P8, +} + +const INNER_QUEUES_COUNT: usize = Priority::P8 as usize - Priority::P1 as usize + 1; + +/// Indices start at 0 for highest priority +const fn index_from_priority(priority: Priority) -> usize { + Priority::P8 as usize - priority as usize +} + +const _: () = { + assert!(index_from_priority(Priority::P1) == 7); + assert!(index_from_priority(Priority::P8) == 0); +}; + +pub(crate) struct AgeingPriorityQueue +where + T: Send + 'static, +{ + /// Items in **lower** indices queues are handled sooner + inner_queues: + [(crossbeam_channel::Sender, crossbeam_channel::Receiver); INNER_QUEUES_COUNT], + queued_count: AtomicUsize, + soft_capacity: usize, +} + +pub(crate) struct Receiver<'a, T> +where + T: Send + 'static, +{ + shared: &'a AgeingPriorityQueue, + select: crossbeam_channel::Select<'a>, +} + +impl AgeingPriorityQueue +where + T: Send + 'static, +{ + pub(crate) fn soft_bounded(soft_capacity: usize) -> Self { + Self { + // Using unbounded channels: callers must use `is_full` to implement backpressure + inner_queues: std::array::from_fn(|_| crossbeam_channel::unbounded()), + queued_count: AtomicUsize::new(0), + soft_capacity, + } + } + + pub(crate) fn queued_count(&self) -> usize { + self.queued_count.load(Ordering::Relaxed) + } + + pub(crate) fn is_full(&self) -> bool { + self.queued_count() >= self.soft_capacity + } + + /// 
Panics if the internal channel is disconnected. This is never expected to happen, since the queue owns both ends of each channel. + pub(crate) fn send(&self, priority: Priority, message: T) { + self.queued_count.fetch_add(1, Ordering::Relaxed); + let (inner_sender, _) = &self.inner_queues[index_from_priority(priority)]; + inner_sender.send(message).expect("disconnected channel") + } + + pub(crate) fn receiver(&self) -> Receiver<'_, T> { + let mut select = crossbeam_channel::Select::new(); + for (_, inner_receiver) in &self.inner_queues { + select.recv(inner_receiver); + } + Receiver { + shared: self, + select, + } + } +} + +impl<'a, T> Receiver<'a, T> +where + T: Send + 'static, +{ + pub(crate) fn blocking_recv(&mut self) -> T { + loop { + // Block until something is ready. + // Ignore the returned index because it is "random" when multiple operations are ready. + self.select.ready(); + // Check inner channels in priority order instead: + for (index, (_, inner_receiver)) in self.shared.inner_queues.iter().enumerate() { + if let Ok(message) = inner_receiver.try_recv() { + self.shared.queued_count.fetch_sub(1, Ordering::Relaxed); + self.age(index); + return message; + } + } + // Another thread raced us to it or `ready()` returned spuriously, try again + } + } + + // Promote some messages from priorities lower (higher indices) than `message_consumed_at_index` + fn age(&self, message_consumed_at_index: usize) { + for window in self.shared.inner_queues[message_consumed_at_index..].windows(2) { + let [higher_priority, lower_priority] = window else { + panic!("expected windows of length 2") + }; + let (higher_priority_sender, _) = higher_priority; + let (_, lower_priority_receiver) = lower_priority; + if let Ok(message) = lower_priority_receiver.try_recv() { + higher_priority_sender + .send(message) + .expect("disconnected channel") + } + } + } +} + +#[test] +fn test_priorities() { + let queue = AgeingPriorityQueue::soft_bounded(3); + assert_eq!(queue.queued_count(), 0); + assert!(!queue.is_full()); + queue.send(Priority::P1, "p1"); + 
assert!(!queue.is_full()); + queue.send(Priority::P2, "p2"); + assert!(!queue.is_full()); + queue.send(Priority::P3, "p3"); + // The queue is now "full" but sending still works, it’s up to the caller to stop sending + assert!(queue.is_full()); + queue.send(Priority::P2, "p2 again"); + assert_eq!(queue.queued_count(), 4); + + let mut receiver = queue.receiver(); + assert_eq!(receiver.blocking_recv(), "p3"); + assert_eq!(receiver.blocking_recv(), "p2"); + assert_eq!(receiver.blocking_recv(), "p2 again"); + assert_eq!(receiver.blocking_recv(), "p1"); + assert_eq!(queue.queued_count(), 0); +} diff --git a/apollo-router/src/compute_job.rs b/apollo-router/src/compute_job.rs new file mode 100644 index 0000000000..7672dfea83 --- /dev/null +++ b/apollo-router/src/compute_job.rs @@ -0,0 +1,148 @@ +use std::future::Future; +use std::panic::UnwindSafe; +use std::sync::OnceLock; + +use opentelemetry::metrics::MeterProvider as _; +use opentelemetry::metrics::ObservableGauge; +use tokio::sync::oneshot; + +use crate::ageing_priority_queue::AgeingPriorityQueue; +pub(crate) use crate::ageing_priority_queue::Priority; +use crate::metrics::meter_provider; + +/// We generate backpressure in tower `poll_ready` when the number of queued jobs +/// reaches `QUEUE_SOFT_CAPACITY_PER_THREAD * thread_pool_size()` +const QUEUE_SOFT_CAPACITY_PER_THREAD: usize = 20; + +/// Leave a fraction of CPU cores free to run Tokio threads even if this thread pool is very busy: +/// +/// available: 1 pool size: 1 +/// available: 2 pool size: 1 +/// available: 3 pool size: 2 +/// available: 4 pool size: 3 +/// available: 5 pool size: 4 +/// ... +/// available: 8 pool size: 7 +/// available: 9 pool size: 7 +/// ... +/// available: 16 pool size: 14 +/// available: 17 pool size: 14 +/// ... 
+/// available: 32 pool size: 28 +fn thread_pool_size() -> usize { + let available = std::thread::available_parallelism() + .expect("available_parallelism() failed") + .get(); + thread_poll_size_for_available_parallelism(available) +} + +fn thread_poll_size_for_available_parallelism(available: usize) -> usize { + let reserved = available.div_ceil(8); + (available - reserved).max(1) +} + +type Job = Box; + +fn queue() -> &'static AgeingPriorityQueue { + static QUEUE: OnceLock> = OnceLock::new(); + QUEUE.get_or_init(|| { + let pool_size = thread_pool_size(); + for _ in 0..pool_size { + std::thread::spawn(|| { + // This looks like we need the queue before creating the queue, + // but it happens in a child thread where OnceLock will block + // until `get_or_init` in the parent thread is finished + // and the parent is *not* blocked on the child thread making progress. + let queue = queue(); + + let mut receiver = queue.receiver(); + loop { + let job = receiver.blocking_recv(); + job(); + } + }); + } + AgeingPriorityQueue::soft_bounded(QUEUE_SOFT_CAPACITY_PER_THREAD * pool_size) + }) +} + +/// Returns a future that resolves to a `Result` that is `Ok` if `f` returned or `Err` if it panicked. 
+pub(crate) fn execute( + priority: Priority, + job: F, +) -> impl Future> +where + F: FnOnce() -> T + Send + UnwindSafe + 'static, + T: Send + 'static, +{ + let (tx, rx) = oneshot::channel(); + let job = Box::new(move || { + // Ignore the error if the oneshot receiver was dropped + let _ = tx.send(std::panic::catch_unwind(job)); + }); + queue().send(priority, job); + async { rx.await.expect("channel disconnected") } +} + +pub(crate) fn is_full() -> bool { + queue().is_full() +} + +pub(crate) fn create_queue_size_gauge() -> ObservableGauge { + meter_provider() + .meter("apollo/router") + .u64_observable_gauge("apollo.router.compute_jobs.queued") + .with_description( + "Number of computation jobs (parsing, planning, …) waiting to be scheduled", + ) + .with_callback(move |m| m.observe(queue().queued_count() as u64, &[])) + .init() +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + use std::time::Instant; + + use super::*; + + #[tokio::test] + async fn test_executes_on_different_thread() { + let test_thread = std::thread::current().id(); + let job_thread = execute(Priority::P4, || std::thread::current().id()) + .await + .unwrap(); + assert_ne!(job_thread, test_thread) + } + + #[tokio::test] + async fn test_parallelism() { + if thread_pool_size() < 2 { + return; + } + let start = Instant::now(); + let one = execute(Priority::P8, || { + std::thread::sleep(Duration::from_millis(1_000)); + 1 + }); + let two = execute(Priority::P8, || { + std::thread::sleep(Duration::from_millis(1_000)); + 1 + 1 + }); + tokio::time::sleep(Duration::from_millis(500)).await; + assert_eq!(one.await.unwrap(), 1); + assert_eq!(two.await.unwrap(), 2); + // Evidence of fearless parallel sleep: + assert!(start.elapsed() < Duration::from_millis(1_400)); + } + + #[test] + fn pool_size() { + assert_eq!(thread_poll_size_for_available_parallelism(1), 1); + assert_eq!(thread_poll_size_for_available_parallelism(2), 1); + assert_eq!(thread_poll_size_for_available_parallelism(3), 2); + 
assert_eq!(thread_poll_size_for_available_parallelism(4), 3); + assert_eq!(thread_poll_size_for_available_parallelism(31), 27); + assert_eq!(thread_poll_size_for_available_parallelism(32), 28); + } +} diff --git a/apollo-router/src/introspection.rs b/apollo-router/src/introspection.rs index a0a539895d..b69b03c68b 100644 --- a/apollo-router/src/introspection.rs +++ b/apollo-router/src/introspection.rs @@ -6,6 +6,7 @@ use apollo_compiler::executable::Selection; use serde_json_bytes::json; use crate::cache::storage::CacheStorage; +use crate::compute_job; use crate::graphql; use crate::query_planner::QueryKey; use crate::services::layers::query_analysis::ParsedDocument; @@ -130,8 +131,9 @@ impl IntrospectionCache { } let schema = schema.clone(); let doc = doc.clone(); + let priority = compute_job::Priority::P1; // Low priority let response = - tokio::task::spawn_blocking(move || Self::execute_introspection(&schema, &doc)) + compute_job::execute(priority, move || Self::execute_introspection(&schema, &doc)) .await .expect("Introspection panicked"); storage.insert(cache_key, response.clone()).await; diff --git a/apollo-router/src/lib.rs b/apollo-router/src/lib.rs index 9ac39c9e23..6f30ab6239 100644 --- a/apollo-router/src/lib.rs +++ b/apollo-router/src/lib.rs @@ -52,10 +52,12 @@ pub mod plugin; #[macro_use] pub(crate) mod metrics; +mod ageing_priority_queue; mod apollo_studio_interop; pub(crate) mod axum_factory; mod batching; mod cache; +mod compute_job; mod configuration; mod context; mod error; diff --git a/apollo-router/src/query_planner/bridge_query_planner.rs b/apollo-router/src/query_planner/bridge_query_planner.rs index e3cc52dcdd..cc058b5555 100644 --- a/apollo-router/src/query_planner/bridge_query_planner.rs +++ b/apollo-router/src/query_planner/bridge_query_planner.rs @@ -29,6 +29,7 @@ use tower::Service; use super::PlanNode; use super::QueryKey; use crate::apollo_studio_interop::generate_usage_reporting; +use crate::compute_job; use 
crate::configuration::QueryPlannerMode; use crate::error::PlanErrors; use crate::error::QueryPlannerError; @@ -258,7 +259,8 @@ impl PlannerMode { PlannerMode::Rust(rust_planner) => { let doc = doc.clone(); let rust_planner = rust_planner.clone(); - let (plan, mut root_node) = tokio::task::spawn_blocking(move || { + let priority = compute_job::Priority::P8; // High priority + let (plan, mut root_node) = compute_job::execute(priority, move || { let start = Instant::now(); let query_plan_options = QueryPlanOptions { diff --git a/apollo-router/src/query_planner/bridge_query_planner_pool.rs b/apollo-router/src/query_planner/bridge_query_planner_pool.rs index 652bc997b9..5246b79901 100644 --- a/apollo-router/src/query_planner/bridge_query_planner_pool.rs +++ b/apollo-router/src/query_planner/bridge_query_planner_pool.rs @@ -4,6 +4,7 @@ use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::Mutex; +use std::task::Poll; use std::time::Instant; use apollo_compiler::validation::Valid; @@ -21,6 +22,7 @@ use tower::ServiceExt; use super::bridge_query_planner::BridgeQueryPlanner; use super::QueryPlanResult; +use crate::configuration::QueryPlannerMode; use crate::error::QueryPlannerError; use crate::error::ServiceBuildError; use crate::introspection::IntrospectionCache; @@ -36,13 +38,10 @@ static CHANNEL_SIZE: usize = 1_000; #[derive(Clone)] pub(crate) struct BridgeQueryPlannerPool { js_planners: Vec>>, - sender: Sender<( - QueryPlannerRequest, - oneshot::Sender>, - )>, + pool_mode: PoolMode, schema: Arc, subgraph_schemas: Arc>>>, - pool_size_gauge: Arc>>>, + compute_jobs_queue_size_gauge: Arc>>>, v8_heap_used: Arc, v8_heap_used_gauge: Arc>>>, v8_heap_total: Arc, @@ -50,6 +49,20 @@ pub(crate) struct BridgeQueryPlannerPool { introspection_cache: Arc, } +#[derive(Clone)] +enum PoolMode { + Pool { + sender: Sender<( + QueryPlannerRequest, + oneshot::Sender>, + )>, + pool_size_gauge: Arc>>>, + }, + PassThrough { + delegate: 
BridgeQueryPlanner, + }, +} + impl BridgeQueryPlannerPool { pub(crate) async fn new( old_js_planners: Vec>>, @@ -59,79 +72,100 @@ impl BridgeQueryPlannerPool { ) -> Result { let rust_planner = PlannerMode::maybe_rust(&schema, &configuration)?; - let mut join_set = JoinSet::new(); - - let (sender, receiver) = bounded::<( - QueryPlannerRequest, - oneshot::Sender>, - )>(CHANNEL_SIZE); - let mut old_js_planners_iterator = old_js_planners.into_iter(); // All query planners in the pool now share the same introspection cache. // This allows meaningful gauges, and it makes sense that queries should be cached across all planners. let introspection_cache = Arc::new(IntrospectionCache::new(&configuration)); - for _ in 0..size.into() { - let schema = schema.clone(); - let configuration = configuration.clone(); - let rust_planner = rust_planner.clone(); - let introspection_cache = introspection_cache.clone(); - + let pool_mode; + let js_planners; + let subgraph_schemas; + if let QueryPlannerMode::New = configuration.experimental_query_planner_mode { let old_planner = old_js_planners_iterator.next(); - join_set.spawn(async move { - BridgeQueryPlanner::new( - schema, - configuration, - old_planner, - rust_planner, - introspection_cache, - ) - .await - }); - } - - let mut bridge_query_planners = Vec::new(); - - while let Some(task_result) = join_set.join_next().await { - let bridge_query_planner = - task_result.map_err(|e| ServiceBuildError::ServiceError(Box::new(e)))??; - bridge_query_planners.push(bridge_query_planner); - } - - let subgraph_schemas = bridge_query_planners - .first() - .ok_or_else(|| { - ServiceBuildError::QueryPlannerError(QueryPlannerError::PoolProcessing( - "There should be at least 1 Query Planner service in pool".to_string(), - )) - })? 
- .subgraph_schemas(); - - let js_planners: Vec<_> = bridge_query_planners - .iter() - .filter_map(|p| p.js_planner()) - .collect(); - - for mut planner in bridge_query_planners.into_iter() { - let receiver = receiver.clone(); - - tokio::spawn(async move { - while let Ok((request, res_sender)) = receiver.recv().await { - let svc = match planner.ready().await { - Ok(svc) => svc, - Err(e) => { - let _ = res_sender.send(Err(e)); + let delegate = BridgeQueryPlanner::new( + schema.clone(), + configuration, + old_planner, + rust_planner, + introspection_cache.clone(), + ) + .await?; + js_planners = delegate.js_planner().into_iter().collect::>(); + subgraph_schemas = delegate.subgraph_schemas(); + pool_mode = PoolMode::PassThrough { delegate } + } else { + let mut join_set = JoinSet::new(); + let (sender, receiver) = bounded::<( + QueryPlannerRequest, + oneshot::Sender>, + )>(CHANNEL_SIZE); + + for _ in 0..size.into() { + let schema = schema.clone(); + let configuration = configuration.clone(); + let rust_planner = rust_planner.clone(); + let introspection_cache = introspection_cache.clone(); + + let old_planner = old_js_planners_iterator.next(); + join_set.spawn(async move { + BridgeQueryPlanner::new( + schema, + configuration, + old_planner, + rust_planner, + introspection_cache, + ) + .await + }); + } - continue; - } - }; + let mut bridge_query_planners = Vec::new(); - let res = svc.call(request).await; + while let Some(task_result) = join_set.join_next().await { + let bridge_query_planner = + task_result.map_err(|e| ServiceBuildError::ServiceError(Box::new(e)))??; + bridge_query_planners.push(bridge_query_planner); + } - let _ = res_sender.send(res); - } - }); + subgraph_schemas = bridge_query_planners + .first() + .ok_or_else(|| { + ServiceBuildError::QueryPlannerError(QueryPlannerError::PoolProcessing( + "There should be at least 1 Query Planner service in pool".to_string(), + )) + })? 
+ .subgraph_schemas(); + + js_planners = bridge_query_planners + .iter() + .filter_map(|p| p.js_planner()) + .collect(); + + for mut planner in bridge_query_planners.into_iter() { + let receiver = receiver.clone(); + + tokio::spawn(async move { + while let Ok((request, res_sender)) = receiver.recv().await { + let svc = match planner.ready().await { + Ok(svc) => svc, + Err(e) => { + let _ = res_sender.send(Err(e)); + + continue; + } + }; + + let res = svc.call(request).await; + + let _ = res_sender.send(res); + } + }); + } + pool_mode = PoolMode::Pool { + sender, + pool_size_gauge: Default::default(), + } } let v8_heap_used: Arc = Default::default(); let v8_heap_total: Arc = Default::default(); @@ -148,10 +182,10 @@ impl BridgeQueryPlannerPool { Ok(Self { js_planners, - sender, + pool_mode, schema, subgraph_schemas, - pool_size_gauge: Default::default(), + compute_jobs_queue_size_gauge: Default::default(), v8_heap_used, v8_heap_used_gauge: Default::default(), v8_heap_total, @@ -160,15 +194,22 @@ impl BridgeQueryPlannerPool { }) } - fn create_pool_size_gauge(&self) -> ObservableGauge { - let sender = self.sender.clone(); - let meter = meter_provider().meter("apollo/router"); - meter - .u64_observable_gauge("apollo.router.query_planning.queued") - .with_description("Number of queries waiting to be planned") - .with_unit(Unit::new("query")) - .with_callback(move |m| m.observe(sender.len() as u64, &[])) - .init() + fn create_pool_size_gauge(&self) { + if let PoolMode::Pool { + sender, + pool_size_gauge, + } = &self.pool_mode + { + let sender = sender.clone(); + let meter = meter_provider().meter("apollo/router"); + let gauge = meter + .u64_observable_gauge("apollo.router.query_planning.queued") + .with_description("Number of queries waiting to be planned") + .with_unit(Unit::new("query")) + .with_callback(move |m| m.observe(sender.len() as u64, &[])) + .init(); + *pool_size_gauge.lock().expect("lock poisoned") = Some(gauge); + } } fn create_heap_used_gauge(&self) -> 
ObservableGauge { @@ -228,7 +269,11 @@ impl BridgeQueryPlannerPool { pub(super) fn activate(&self) { // Gauges MUST be initialized after a meter provider is created. // When a hot reload happens this means that the gauges must be re-initialized. - *self.pool_size_gauge.lock().expect("lock poisoned") = Some(self.create_pool_size_gauge()); + *self + .compute_jobs_queue_size_gauge + .lock() + .expect("lock poisoned") = Some(crate::compute_job::create_queue_size_gauge()); + self.create_pool_size_gauge(); *self.v8_heap_used_gauge.lock().expect("lock poisoned") = Some(self.create_heap_used_gauge()); *self.v8_heap_total_gauge.lock().expect("lock poisoned") = @@ -244,22 +289,20 @@ impl tower::Service for BridgeQueryPlannerPool { type Future = BoxFuture<'static, Result>; - fn poll_ready( - &mut self, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - if self.sender.is_full() { - std::task::Poll::Ready(Err(QueryPlannerError::PoolProcessing( - "query plan queue is full".into(), - ))) - } else { - std::task::Poll::Ready(Ok(())) + fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll> { + if crate::compute_job::is_full() { + return Poll::Pending; + } + match &self.pool_mode { + PoolMode::Pool { sender, .. 
} if sender.is_full() => Poll::Ready(Err( + QueryPlannerError::PoolProcessing("query plan queue is full".into()), + )), + _ => Poll::Ready(Ok(())), } } fn call(&mut self, req: QueryPlannerRequest) -> Self::Future { - let (response_sender, response_receiver) = oneshot::channel(); - let sender = self.sender.clone(); + let pool_mode = self.pool_mode.clone(); let get_metrics_future = if let Some(bridge_query_planner) = self.js_planners.first().cloned() { @@ -273,12 +316,22 @@ impl tower::Service for BridgeQueryPlannerPool { }; Box::pin(async move { - let start = Instant::now(); - let _ = sender.send((req, response_sender)).await; - - let res = response_receiver - .await - .map_err(|_| QueryPlannerError::UnhandledPlannerResult)?; + let start; + let res = match pool_mode { + PoolMode::Pool { sender, .. } => { + let (response_sender, response_receiver) = oneshot::channel(); + start = Instant::now(); + let _ = sender.send((req, response_sender)).await; + + response_receiver + .await + .map_err(|_| QueryPlannerError::UnhandledPlannerResult)? 
+ } + PoolMode::PassThrough { mut delegate } => { + start = Instant::now(); + delegate.call(req).await + } + }; f64_histogram!( "apollo.router.query_planning.total.duration", diff --git a/apollo-router/src/services/layers/query_analysis.rs b/apollo-router/src/services/layers/query_analysis.rs index 4af57a5f38..2e2fe77eff 100644 --- a/apollo-router/src/services/layers/query_analysis.rs +++ b/apollo-router/src/services/layers/query_analysis.rs @@ -13,10 +13,10 @@ use http::StatusCode; use lru::LruCache; use router_bridge::planner::UsageReporting; use tokio::sync::Mutex; -use tokio::task; use crate::apollo_studio_interop::generate_extended_references; use crate::apollo_studio_interop::ExtendedReferenceStats; +use crate::compute_job; use crate::context::OPERATION_KIND; use crate::context::OPERATION_NAME; use crate::graphql::Error; @@ -89,7 +89,8 @@ impl QueryAnalysisLayer { // parent let span = tracing::info_span!(QUERY_PARSING_SPAN_NAME, "otel.kind" = "INTERNAL"); - task::spawn_blocking(move || { + let priority = compute_job::Priority::P4; // Medium priority + let job = move || { span.in_scope(|| { Query::parse_document( &query, @@ -98,9 +99,12 @@ impl QueryAnalysisLayer { conf.as_ref(), ) }) - }) - .await - .expect("parse_document task panicked") + }; + // TODO: is this correct? 
+ let job = std::panic::AssertUnwindSafe(job); + compute_job::execute(priority, job) + .await + .expect("Query::parse_document panicked") } pub(crate) async fn supergraph_request( diff --git a/apollo-router/src/services/router/service.rs b/apollo-router/src/services/router/service.rs index 9e379ae3c4..e5792c1a4d 100644 --- a/apollo-router/src/services/router/service.rs +++ b/apollo-router/src/services/router/service.rs @@ -214,6 +214,11 @@ impl Service for RouterService { type Future = BoxFuture<'static, Result>; fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll> { + // This service eventually calls `QueryAnalysisLayer::parse_document()` + // which calls `compute_job::execute()` + if crate::compute_job::is_full() { + return Poll::Pending; + } Poll::Ready(Ok(())) } diff --git a/docs/source/reference/router/telemetry/instrumentation/standard-instruments.mdx b/docs/source/reference/router/telemetry/instrumentation/standard-instruments.mdx index bf0101d802..da9b655036 100644 --- a/docs/source/reference/router/telemetry/instrumentation/standard-instruments.mdx +++ b/docs/source/reference/router/telemetry/instrumentation/standard-instruments.mdx @@ -65,11 +65,15 @@ The coprocessor operations metric has the following attributes: - `apollo_router.query_planning.warmup.duration` - Time spent warming up the query planner queries in seconds. - `apollo.router.query_planning.plan.duration` - Histogram of plan durations isolated to query planning time only. - `apollo.router.query_planning.total.duration` - Histogram of plan durations including queue time. -- `apollo.router.query_planning.queued` - A gauge of the number of queued plans requests. +- `apollo.router.query_planning.queued` - When the legacy planner is used, a gauge of the number of queued plans requests. - `apollo.router.query_planning.plan.evaluated_plans` - Histogram of the number of evaluated query plans. - `apollo.router.v8.heap.used` - heap memory used by V8, in bytes. 
- `apollo.router.v8.heap.total` - total heap allocated by V8, in bytes. +### Compute jobs + +- `apollo.router.compute_jobs.queued` - A gauge of the number of jobs queued for the thread pool dedicated to CPU-heavy components like GraphQL parsing and validation, and the (new) query planner. + ### Uplink