Skip to content

Commit

Permalink
Merge branch 'master' into 2318-make-all-limits-of-the-of-the-complex…
Browse files Browse the repository at this point in the history
…ity-to-be-configurable-via-the-cli
  • Loading branch information
xgreenx authored Oct 14, 2024
2 parents 645db49 + b9e7083 commit 1290d66
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 36 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]
### Added
- [2350](https://github.com/FuelLabs/fuel-core/pull/2350): Added a new CLI flag `graphql-number-of-threads` to limit the number of threads used by the GraphQL service. The default value is `2`, `0` enables the old behavior.

### Fixed
- [2345](https://github.com/FuelLabs/fuel-core/pull/2345): In PoA increase priority of block creation timer trigger compare to txpool event management

### Changed
- [2334](https://github.com/FuelLabs/fuel-core/pull/2334): Prepare the GraphQL service for the switching to `async` methods.
- [2350](https://github.com/FuelLabs/fuel-core/pull/2350): Limited the number of threads used by the GraphQL service.

### Added
- [2335](https://github.com/FuelLabs/fuel-core/pull/2335): Added CLI arguments for configuring GraphQL query costs.
Expand Down
1 change: 1 addition & 0 deletions bin/fuel-core/src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ impl Command {
let config = Config {
graphql_config: GraphQLConfig {
addr,
number_of_threads: graphql.graphql_number_of_threads,
max_queries_depth: graphql.graphql_max_depth,
max_queries_complexity: graphql.graphql_max_complexity,
max_queries_recursive_depth: graphql.graphql_max_recursive_depth,
Expand Down
4 changes: 4 additions & 0 deletions bin/fuel-core/src/cli/run/graphql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ pub struct GraphQLArgs {
#[clap(long = "port", default_value = "4000", env)]
pub port: u16,

/// The number of threads to use for the GraphQL service.
#[clap(long = "graphql-number-of-threads", default_value = "2", env)]
pub graphql_number_of_threads: usize,

/// The max depth of GraphQL queries.
#[clap(long = "graphql-max-depth", default_value = "16", env)]
pub graphql_max_depth: usize,
Expand Down
1 change: 1 addition & 0 deletions crates/fuel-core/src/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub struct Config {
#[derive(Clone, Debug)]
pub struct ServiceConfig {
pub addr: SocketAddr,
pub number_of_threads: usize,
pub max_queries_depth: usize,
pub max_queries_complexity: usize,
pub max_queries_recursive_depth: usize,
Expand Down
39 changes: 26 additions & 13 deletions crates/fuel-core/src/graphql_api/api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,8 @@ use axum::{
Json,
Router,
};
use fuel_core_metrics::futures::{
metered_future::MeteredFuture,
FuturesMetrics,
};
use fuel_core_services::{
AsyncProcessor,
RunnableService,
RunnableTask,
StateWatcher,
Expand All @@ -80,6 +77,7 @@ use std::{
TcpListener,
},
pin::Pin,
sync::Arc,
};
use tokio_stream::StreamExt;
use tower::limit::ConcurrencyLimitLayer;
Expand Down Expand Up @@ -116,6 +114,7 @@ pub struct GraphqlService {
pub struct ServerParams {
router: Router,
listener: TcpListener,
number_of_threads: usize,
}

pub struct Task {
Expand All @@ -125,7 +124,7 @@ pub struct Task {

#[derive(Clone)]
struct ExecutorWithMetrics {
metric: FuturesMetrics,
processor: Arc<AsyncProcessor>,
}

impl<F> Executor<F> for ExecutorWithMetrics
Expand All @@ -134,9 +133,7 @@ where
F::Output: Send + 'static,
{
fn execute(&self, fut: F) {
let future = MeteredFuture::new(fut, self.metric.clone());

tokio::task::spawn(future);
let _ = self.processor.try_spawn(fut);
}
}

Expand All @@ -160,14 +157,25 @@ impl RunnableService for GraphqlService {
params: Self::TaskParams,
) -> anyhow::Result<Self::Task> {
let mut state = state.clone();
let ServerParams { router, listener } = params;
let metric = ExecutorWithMetrics {
metric: FuturesMetrics::obtain_futures_metrics("GraphQLFutures"),
let ServerParams {
router,
listener,
number_of_threads,
} = params;

let processor = AsyncProcessor::new(
"GraphQLFutures",
number_of_threads,
tokio::sync::Semaphore::MAX_PERMITS,
)?;

let executor = ExecutorWithMetrics {
processor: Arc::new(processor),
};

let server = axum::Server::from_tcp(listener)
.unwrap()
.executor(metric)
.executor(executor)
.serve(router.into_make_service())
.with_graceful_shutdown(async move {
state
Expand Down Expand Up @@ -231,6 +239,7 @@ where
let body_limit = config.config.request_body_bytes_limit;
let max_queries_resolver_recursive_depth =
config.config.max_queries_resolver_recursive_depth;
let number_of_threads = config.config.number_of_threads;

let schema = schema
.limit_complexity(config.config.max_queries_complexity)
Expand Down Expand Up @@ -295,7 +304,11 @@ where

Ok(Service::new_with_params(
GraphqlService { bound_address },
ServerParams { router, listener },
ServerParams {
router,
listener,
number_of_threads,
},
))
}

Expand Down
1 change: 1 addition & 0 deletions crates/fuel-core/src/service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ impl Config {
std::net::Ipv4Addr::new(127, 0, 0, 1).into(),
0,
),
number_of_threads: 0,
max_queries_depth: 16,
max_queries_complexity: 80000,
max_queries_recursive_depth: 16,
Expand Down
8 changes: 4 additions & 4 deletions crates/services/consensus_module/poa/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,10 +573,6 @@ where
should_continue = false;
}
}
_ = self.new_txs_watcher.changed() => {
self.on_txpool_event().await.context("While processing txpool event")?;
should_continue = true;
}
_ = next_block_production => {
match self.on_timer().await.context("While processing timer event") {
Ok(()) => should_continue = true,
Expand All @@ -587,6 +583,10 @@ where
}
};
}
_ = self.new_txs_watcher.changed() => {
self.on_txpool_event().await.context("While processing txpool event")?;
should_continue = true;
}
}

Ok(should_continue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,3 +478,32 @@ async fn interval_trigger_produces_blocks_in_the_future_when_time_rewinds() {
// similarly to how it works when time is lagging.
assert_eq!(second_block_time, start_time + block_time.as_secs() * 2);
}

#[tokio::test]
async fn interval_trigger_even_if_queued_tx_events() {
let block_time = Duration::from_secs(2);
let mut ctx = DefaultContext::new(Config {
trigger: Trigger::Interval { block_time },
signer: SignMode::Key(test_signing_key()),
metrics: false,
..Default::default()
})
.await;
let block_creation_notifier = Arc::new(Notify::new());
tokio::task::spawn({
let notifier = ctx.new_txs_notifier.clone();
async move {
loop {
time::sleep(Duration::from_nanos(10)).await;
notifier.send_replace(());
}
}
});
let block_creation_waiter = block_creation_notifier.clone();
tokio::task::spawn(async move {
ctx.block_import.recv().await.unwrap();
dbg!("First block produced");
block_creation_notifier.notify_waiters();
});
block_creation_waiter.notified().await;
}
84 changes: 65 additions & 19 deletions crates/services/src/async_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tokio::{
OwnedSemaphorePermit,
Semaphore,
},
task::JoinHandle,
};

/// A processor that can execute async tasks with a limit on the number of tasks that can be
Expand Down Expand Up @@ -76,40 +77,50 @@ impl AsyncProcessor {
}

/// Spawn a task with a reservation.
pub fn spawn_reserved<F>(&self, reservation: AsyncReservation, future: F)
pub fn spawn_reserved<F>(
&self,
reservation: AsyncReservation,
future: F,
) -> JoinHandle<F::Output>
where
F: Future<Output = ()> + Send + 'static,
F: Future + Send + 'static,
F::Output: Send,
{
let permit = reservation.0;
let future = async move {
let permit = permit;
future.await;
drop(permit)
let result = future.await;
drop(permit);
result
};
let metered_future = MeteredFuture::new(future, self.metric.clone());
if let Some(runtime) = &self.thread_pool {
runtime.spawn(metered_future);
runtime.spawn(metered_future)
} else {
tokio::spawn(metered_future);
tokio::spawn(metered_future)
}
}

/// Tries to spawn a task. If the task cannot be spawned, returns an error.
pub fn try_spawn<F>(&self, future: F) -> Result<(), OutOfCapacity>
pub fn try_spawn<F>(&self, future: F) -> Result<JoinHandle<F::Output>, OutOfCapacity>
where
F: Future<Output = ()> + Send + 'static,
F: Future + Send + 'static,
F::Output: Send,
{
let reservation = self.reserve()?;
self.spawn_reserved(reservation, future);
Ok(())
Ok(self.spawn_reserved(reservation, future))
}
}

#[cfg(test)]
#[allow(clippy::bool_assert_comparison)]
#[allow(non_snake_case)]
mod tests {
use super::*;
use futures::future::join_all;
use std::{
collections::HashSet,
iter,
thread::sleep,
time::Duration,
};
Expand All @@ -129,11 +140,45 @@ mod tests {
});

// Then
assert_eq!(result, Ok(()));
result.expect("Expected Ok result");
sleep(Duration::from_secs(1));
receiver.try_recv().unwrap();
}

#[tokio::test]
async fn one_spawn_single_tasks_works__thread_id_is_different_than_main() {
// Given
let number_of_threads = 10;
let number_of_pending_tasks = 10000;
let heavy_task_processor =
AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks)
.unwrap();
let main_handler = tokio::spawn(async move { std::thread::current().id() });
let main_id = main_handler.await.unwrap();

// When
let futures = iter::repeat_with(|| {
heavy_task_processor
.try_spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
std::thread::current().id()
})
.unwrap()
})
.take(number_of_pending_tasks)
.collect::<Vec<_>>();

// Then
let thread_ids = join_all(futures).await;
let unique_thread_ids = thread_ids
.into_iter()
.map(|r| r.unwrap())
.collect::<HashSet<_>>();

assert!(!unique_thread_ids.contains(&main_id));
assert_eq!(unique_thread_ids.len(), number_of_threads);
}

#[test]
fn second_spawn_fails_when_limit_is_one_and_first_in_progress() {
// Given
Expand All @@ -143,15 +188,16 @@ mod tests {
let first_spawn_result = heavy_task_processor.try_spawn(async move {
sleep(Duration::from_secs(1));
});
assert_eq!(first_spawn_result, Ok(()));
first_spawn_result.expect("Expected Ok result");

// When
let second_spawn_result = heavy_task_processor.try_spawn(async move {
sleep(Duration::from_secs(1));
});

// Then
assert_eq!(second_spawn_result, Err(OutOfCapacity));
let err = second_spawn_result.expect_err("Expected Ok result");
assert_eq!(err, OutOfCapacity);
}

#[test]
Expand All @@ -166,7 +212,7 @@ mod tests {
sleep(Duration::from_secs(1));
sender.send(()).unwrap();
});
assert_eq!(first_spawn, Ok(()));
first_spawn.expect("Expected Ok result");
futures::executor::block_on(async move {
receiver.await.unwrap();
});
Expand All @@ -177,7 +223,7 @@ mod tests {
});

// Then
assert_eq!(second_spawn, Ok(()));
second_spawn.expect("Expected Ok result");
}

#[test]
Expand All @@ -194,7 +240,7 @@ mod tests {
});

// Then
assert_eq!(result, Ok(()));
result.expect("Expected Ok result");
}
}

Expand All @@ -217,7 +263,7 @@ mod tests {
sleep(Duration::from_secs(1));
broadcast_sender.send(()).unwrap();
});
assert_eq!(result, Ok(()));
result.expect("Expected Ok result");
}
drop(broadcast_sender);

Expand Down Expand Up @@ -249,7 +295,7 @@ mod tests {
sleep(Duration::from_secs(1));
broadcast_sender.send(()).unwrap();
});
assert_eq!(result, Ok(()));
result.expect("Expected Ok result");
}
drop(broadcast_sender);

Expand Down Expand Up @@ -281,7 +327,7 @@ mod tests {
tokio::time::sleep(Duration::from_secs(1)).await;
broadcast_sender.send(()).unwrap();
});
assert_eq!(result, Ok(()));
result.expect("Expected Ok result");
}
drop(broadcast_sender);

Expand Down

0 comments on commit 1290d66

Please sign in to comment.