Move heavy computation to a thread pool with a priority queue #6247

Merged: 8 commits, Nov 19, 2024
Changes from 6 commits
19 changes: 19 additions & 0 deletions .changesets/fix_simon_compute_jobs.md
@@ -0,0 +1,19 @@
### Move heavy computation to a thread pool with a priority queue

These components can take non-trivial amounts of CPU time:

* GraphQL parsing
* GraphQL validation
* Query planning
* Schema introspection

In order to avoid blocking threads that execute asynchronous code,
these components are now run (in their respective Rust implementations)
in a new thread pool with as many threads as there are available CPU cores.
Previously we used Tokio’s [`spawn_blocking`] for this purpose,
but it appears to be intended for blocking I/O:
it uses up to 512 threads, so it isn’t a great fit for CPU-bound work.

[`spawn_blocking`]: https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html

By [@SimonSapin](https://github.com/SimonSapin) in https://github.com/apollographql/router/pull/6247
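In practice the call-site change is small: a `tokio::task::spawn_blocking` call becomes a `compute_job::execute` call with an explicit priority, as the `introspection.rs` and `bridge_query_planner.rs` diffs below show. A minimal sketch of the pattern (the `run_heavy_work` and `expensive` helpers are illustrative, not part of the router):

```rust
use crate::compute_job::{self, Priority};

// Hypothetical call site; only `compute_job::execute` and `Priority` come from this PR.
async fn run_heavy_work(input: String) -> usize {
    // Before: tokio::task::spawn_blocking(move || expensive(&input)).await.unwrap()
    // After: run on the dedicated compute pool, with an explicit priority.
    compute_job::execute(Priority::P4, move || expensive(&input))
        .await
        .expect("compute job panicked")
}

fn expensive(input: &str) -> usize {
    // Stand-in for parsing / validation / planning work.
    input.len()
}
```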
147 changes: 147 additions & 0 deletions apollo-router/src/ageing_priority_queue.rs
@@ -0,0 +1,147 @@
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;

/// Items with higher priority value get handled sooner
#[allow(unused)]
pub(crate) enum Priority {
P1 = 1,
P2,
P3,
P4,
P5,
P6,
P7,
P8,
}

const INNER_QUEUES_COUNT: usize = Priority::P8 as usize - Priority::P1 as usize + 1;

/// Indices start at 0 for highest priority
const fn index_from_priority(priority: Priority) -> usize {
Priority::P8 as usize - priority as usize
}

const _: () = {
assert!(index_from_priority(Priority::P1) == 7);
assert!(index_from_priority(Priority::P8) == 0);
};

pub(crate) struct AgeingPriorityQueue<T>
where
T: Send + 'static,
{
/// Items in **lower** indices queues are handled sooner
inner_queues:
[(crossbeam_channel::Sender<T>, crossbeam_channel::Receiver<T>); INNER_QUEUES_COUNT],
queued_count: AtomicUsize,
soft_capacity: usize,
}

pub(crate) struct Receiver<'a, T>
where
T: Send + 'static,
{
shared: &'a AgeingPriorityQueue<T>,
select: crossbeam_channel::Select<'a>,
}

impl<T> AgeingPriorityQueue<T>
where
T: Send + 'static,
{
pub(crate) fn soft_bounded(soft_capacity: usize) -> Self {
Self {
// Using unbounded channels: callers must use `is_full` to implement backpressure
inner_queues: std::array::from_fn(|_| crossbeam_channel::unbounded()),
queued_count: AtomicUsize::new(0),
soft_capacity,
}
}

pub(crate) fn queued_count(&self) -> usize {
self.queued_count.load(Ordering::Relaxed)
}

pub(crate) fn is_full(&self) -> bool {
self.queued_count() >= self.soft_capacity
}

/// Sending never blocks: the inner channels are unbounded, so `soft_capacity` is only enforced by callers checking `is_full`
pub(crate) fn send(&self, priority: Priority, message: T) {
self.queued_count.fetch_add(1, Ordering::Relaxed);
let (inner_sender, _) = &self.inner_queues[index_from_priority(priority)];
inner_sender.send(message).expect("disconnected channel")
}

pub(crate) fn receiver(&self) -> Receiver<'_, T> {
let mut select = crossbeam_channel::Select::new();
for (_, inner_receiver) in &self.inner_queues {
select.recv(inner_receiver);
}
Receiver {
shared: self,
select,
}
}
}

impl<'a, T> Receiver<'a, T>
where
T: Send + 'static,
{
pub(crate) fn blocking_recv(&mut self) -> T {
loop {
// Block until something is ready.
// Ignore the returned index because it is "random" when multiple operations are ready.
self.select.ready();
// Check inner channels in priority order instead:
for (index, (_, inner_receiver)) in self.shared.inner_queues.iter().enumerate() {
if let Ok(message) = inner_receiver.try_recv() {
self.shared.queued_count.fetch_sub(1, Ordering::Relaxed);
self.age(index);
return message;
}
}
// Another thread raced us to it or `ready()` returned spuriously, try again
}
}

// Promote some messages from priorities lower (higher indices) than `message_consumed_at_index`
fn age(&self, message_consumed_at_index: usize) {
for window in self.shared.inner_queues[message_consumed_at_index..].windows(2) {
let [higher_priority, lower_priority] = window else {
panic!("expected windows of length 2")
};
let (higher_priority_sender, _) = higher_priority;
let (_, lower_priority_receiver) = lower_priority;
if let Ok(message) = lower_priority_receiver.try_recv() {
higher_priority_sender
.send(message)
.expect("disconnected channel")
}
}
}
}

#[test]
fn test_priorities() {
let queue = AgeingPriorityQueue::soft_bounded(3);
assert_eq!(queue.queued_count(), 0);
assert!(!queue.is_full());
queue.send(Priority::P1, "p1");
assert!(!queue.is_full());
queue.send(Priority::P2, "p2");
assert!(!queue.is_full());
queue.send(Priority::P3, "p3");
// The queue is now "full" but sending still works, it’s up to the caller to stop sending
assert!(queue.is_full());
queue.send(Priority::P2, "p2 again");
assert_eq!(queue.queued_count(), 4);

let mut receiver = queue.receiver();
assert_eq!(receiver.blocking_recv(), "p3");
assert_eq!(receiver.blocking_recv(), "p2");
assert_eq!(receiver.blocking_recv(), "p2 again");
assert_eq!(receiver.blocking_recv(), "p1");
assert_eq!(queue.queued_count(), 0);
}
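The ageing step runs inside `blocking_recv`, but its effect is not directly visible in the assertions above. A minimal sketch of the promotion behaviour, assuming it lives in this same module so it can peek at the crate-private `inner_queues` field (illustrative only):

```rust
#[test]
fn test_ageing_sketch() {
    let queue = AgeingPriorityQueue::soft_bounded(10);
    queue.send(Priority::P8, "urgent");
    queue.send(Priority::P1, "background");

    let mut receiver = queue.receiver();
    // Consuming the P8 message (index 0) triggers `age(0)`, which walks the
    // lower-priority queues and promotes one queued message per adjacent pair.
    assert_eq!(receiver.blocking_recv(), "urgent");

    // "background" was queued at index 7 (P1) and has been promoted to index 6,
    // i.e. it is now treated as P2 and is one step closer to being received.
    let (_, promoted) = &queue.inner_queues[6];
    assert_eq!(promoted.try_recv().unwrap(), "background");
}
```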
116 changes: 116 additions & 0 deletions apollo-router/src/compute_job.rs
@@ -0,0 +1,116 @@
use std::future::Future;
use std::num::NonZeroUsize;
use std::panic::UnwindSafe;
use std::sync::OnceLock;

use opentelemetry::metrics::MeterProvider as _;
use opentelemetry::metrics::ObservableGauge;
use tokio::sync::oneshot;

use crate::ageing_priority_queue::AgeingPriorityQueue;
pub(crate) use crate::ageing_priority_queue::Priority;
use crate::metrics::meter_provider;

/// We generate backpressure in tower `poll_ready` when reaching this many queued items
// TODO: what’s a good number? should it be configurable?
const QUEUE_SOFT_CAPACITY: usize = 100;
Contributor

I don't think it should be configurable initially. I think this queue represents the tradeoff between memory and losing time here vs being re-scheduled onto a different router, if one is available. If we are rejected from the queue here, then we know at least we have to spend the time/work to move the job elsewhere.

That's hard to quantify, but it's likely on the order of milliseconds. Perhaps we can work up a rough calculation based on this thinking?
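The backpressure mentioned in the code comment above (`poll_ready` rejecting work once the queue holds `QUEUE_SOFT_CAPACITY` items) is not part of this diff. A hypothetical tower middleware wired to `compute_job::is_full()` might look like the sketch below; this is an illustration of the idea, not the router's actual wiring:

```rust
use std::task::{Context, Poll};

use tower::Service;

/// Hypothetical middleware: report overload while the shared compute queue is
/// at its soft capacity, otherwise defer readiness to the inner service.
struct ComputeBackpressure<S> {
    inner: S,
}

impl<S, Req> Service<Req> for ComputeBackpressure<S>
where
    S: Service<Req>,
    S::Error: From<&'static str>,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        if crate::compute_job::is_full() {
            // Shed load: callers can retry or route the request elsewhere.
            return Poll::Ready(Err(S::Error::from("compute job queue is full")));
        }
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: Req) -> Self::Future {
        self.inner.call(req)
    }
}
```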


// TODO: should this be configurable?
fn thread_pool_size() -> NonZeroUsize {
std::thread::available_parallelism().expect("available_parallelism() failed")
}
Contributor Author

Some open questions here

My opinion is that if we make something configurable just because we don’t know what a good value would be, most users won’t know either.

Contributor

I agree. Better to try and think of a good default and only make it configurable (if ever) later.

Member

Should it be available - 1 so we keep 1 core free to handle traffic? Or is it fine to rely on the OS scheduler to still let traffic go through the router?

Contributor Author

My thinking with the initial PR was to rely on the OS scheduler, but minus one might be ok too. The downside is that, for example, minus one out of 2 available cores has a much bigger impact than minus one out of 32 cores.

Contributor

Here's my proposal:

size: max(1, available - (ceiling(available/8)))

WORKINGS:
AVAILABLE: 1 POOL SIZE: 1
AVAILABLE: 2 POOL SIZE: 1
AVAILABLE: 3 POOL SIZE: 2
AVAILABLE: 4 POOL SIZE: 3
AVAILABLE: 5 POOL SIZE: 4
...
AVAILABLE: 8 POOL SIZE: 7
AVAILABLE: 9 POOL SIZE: 7
...
AVAILABLE: 16 POOL SIZE: 14
AVAILABLE: 17 POOL SIZE: 14
...
AVAILABLE: 32 POOL SIZE: 28

Tweaks on the basic approach are welcome, but it seems to offer reasonable scaling for query planning. We can always refine it later.
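Translated directly into Rust, the proposed sizing would look roughly like this (a sketch of the reviewer's suggestion, not necessarily what the merged commits use):

```rust
use std::num::NonZeroUsize;

/// Proposed sizing: reserve roughly one core in eight for the async runtime,
/// but never drop below a single compute thread.
fn proposed_pool_size() -> NonZeroUsize {
    let available = std::thread::available_parallelism()
        .expect("available_parallelism() failed")
        .get();
    let reserved = available.div_ceil(8); // ceiling(available / 8)
    NonZeroUsize::new((available - reserved).max(1)).expect("pool size is at least 1")
}
```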


type Job = Box<dyn FnOnce() + Send + 'static>;

fn queue() -> &'static AgeingPriorityQueue<Job> {
static QUEUE: OnceLock<AgeingPriorityQueue<Job>> = OnceLock::new();
QUEUE.get_or_init(|| {
for _ in 0..thread_pool_size().get() {
std::thread::spawn(|| {
// This looks like we need the queue before creating the queue,
// but it happens in a child thread where OnceLock will block
// until `get_or_init` in the parent thread is finished
// and the parent is *not* blocked on the child thread making progress.
let queue = queue();

let mut receiver = queue.receiver();
loop {
let job = receiver.blocking_recv();
job();
}
});
}
AgeingPriorityQueue::soft_bounded(QUEUE_SOFT_CAPACITY)
})
}

/// Returns a future that resolves to a `Result` that is `Ok` if `job` returned or `Err` if it panicked.
pub(crate) fn execute<T, F>(
priority: Priority,
job: F,
) -> impl Future<Output = std::thread::Result<T>>
where
F: FnOnce() -> T + Send + UnwindSafe + 'static,
T: Send + 'static,
{
let (tx, rx) = oneshot::channel();
let job = Box::new(move || {
// Ignore the error if the oneshot receiver was dropped
let _ = tx.send(std::panic::catch_unwind(job));
});
queue().send(priority, job);
async { rx.await.expect("channel disconnected") }
}

pub(crate) fn is_full() -> bool {
queue().is_full()
}

pub(crate) fn create_queue_size_gauge() -> ObservableGauge<u64> {
meter_provider()
.meter("apollo/router")
.u64_observable_gauge("apollo.router.compute_jobs.queued")
.with_description(
"Number of computation jobs (parsing, planning, …) waiting to be scheduled",
)
.with_callback(move |m| m.observe(queue().queued_count() as u64, &[]))
.init()
Comment on lines +92 to +99
Contributor Author

This is a new gauge metric. `apollo.router.query_planning.queued` will only exist when the legacy planner is used. It is somewhat replaced by the new metric, but not exactly, since the new queue also contains parsing+validation jobs and introspection jobs.

Contributor

This new metric should be documented.

Contributor Author

I’ve added a short description in docs/source/reference/router/telemetry/instrumentation/standard-instruments.mdx. Are there other places to add it to?

Contributor

I think that's the correct location.

}

#[cfg(test)]
mod tests {
use std::time::Duration;
use std::time::Instant;

use super::*;

#[tokio::test]
async fn test_executes_on_different_thread() {
let test_thread = std::thread::current().id();
let job_thread = execute(Priority::P4, || std::thread::current().id())
.await
.unwrap();
assert_ne!(job_thread, test_thread)
}

#[tokio::test]
async fn test_parallelism() {
if thread_pool_size().get() < 2 {
Contributor

:)

return;
}
let start = Instant::now();
let one = execute(Priority::P8, || {
std::thread::sleep(Duration::from_millis(1_000));
1
});
let two = execute(Priority::P8, || {
std::thread::sleep(Duration::from_millis(1_000));
1 + 1
});
tokio::time::sleep(Duration::from_millis(500)).await;
assert_eq!(one.await.unwrap(), 1);
assert_eq!(two.await.unwrap(), 2);
// Evidence of fearless parallel sleep:
assert!(start.elapsed() < Duration::from_millis(1_400));
}
}
4 changes: 3 additions & 1 deletion apollo-router/src/introspection.rs
@@ -6,6 +6,7 @@ use apollo_compiler::executable::Selection;
use serde_json_bytes::json;

use crate::cache::storage::CacheStorage;
use crate::compute_job;
use crate::graphql;
use crate::query_planner::QueryKey;
use crate::services::layers::query_analysis::ParsedDocument;
@@ -130,8 +131,9 @@ impl IntrospectionCache {
}
let schema = schema.clone();
let doc = doc.clone();
let priority = compute_job::Priority::P1; // Low priority
let response =
tokio::task::spawn_blocking(move || Self::execute_introspection(&schema, &doc))
compute_job::execute(priority, move || Self::execute_introspection(&schema, &doc))
.await
.expect("Introspection panicked");
storage.insert(cache_key, response.clone()).await;
2 changes: 2 additions & 0 deletions apollo-router/src/lib.rs
@@ -52,10 +52,12 @@ pub mod plugin;
#[macro_use]
pub(crate) mod metrics;

mod ageing_priority_queue;
mod apollo_studio_interop;
pub(crate) mod axum_factory;
mod batching;
mod cache;
mod compute_job;
mod configuration;
mod context;
mod error;
4 changes: 3 additions & 1 deletion apollo-router/src/query_planner/bridge_query_planner.rs
@@ -29,6 +29,7 @@ use tower::Service;
use super::PlanNode;
use super::QueryKey;
use crate::apollo_studio_interop::generate_usage_reporting;
use crate::compute_job;
use crate::configuration::QueryPlannerMode;
use crate::error::PlanErrors;
use crate::error::QueryPlannerError;
@@ -258,7 +259,8 @@ impl PlannerMode {
PlannerMode::Rust(rust_planner) => {
let doc = doc.clone();
let rust_planner = rust_planner.clone();
let (plan, mut root_node) = tokio::task::spawn_blocking(move || {
let priority = compute_job::Priority::P8; // High priority
let (plan, mut root_node) = compute_job::execute(priority, move || {
let start = Instant::now();

let query_plan_options = QueryPlanOptions {