diff --git a/host/src/lib.rs b/host/src/lib.rs index 3cff28cd5..8d30f9228 100644 --- a/host/src/lib.rs +++ b/host/src/lib.rs @@ -169,7 +169,9 @@ impl ProverState { let (task_channel, receiver) = mpsc::channel::(opts.concurrency_limit); tokio::spawn(async move { - ProofActor::new(receiver).run().await; + ProofActor::new(receiver, opts.concurrency_limit) + .run() + .await; }); Ok(Self { diff --git a/host/src/proof.rs b/host/src/proof.rs index d5835a734..c8fa3e87b 100644 --- a/host/src/proof.rs +++ b/host/src/proof.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use raiko_core::{ interfaces::{ProofRequest, RaikoError}, provider::{get_task_data, rpc::RpcBlockDataProvider}, @@ -5,7 +7,7 @@ use raiko_core::{ }; use raiko_lib::{consts::SupportedChainSpecs, Measurement}; use raiko_task_manager::{get_task_manager, TaskManager, TaskStatus}; -use tokio::sync::mpsc::Receiver; +use tokio::sync::{mpsc::Receiver, Semaphore}; use tracing::{error, info}; use crate::{ @@ -24,16 +26,20 @@ use crate::{ pub struct ProofActor { rx: Receiver, + task_count: usize, } impl ProofActor { - pub fn new(rx: Receiver) -> Self { - Self { rx } + pub fn new(rx: Receiver, task_count: usize) -> Self { + Self { rx, task_count } } pub async fn run(&mut self) { + let semaphore = Arc::new(Semaphore::new(self.task_count)); while let Some(message) = self.rx.recv().await { + let permit = Arc::clone(&semaphore).acquire_owned().await; tokio::spawn(async move { + let _permit = permit; if let Err(error) = Self::handle_message(message).await { error!("Worker failed due to: {error:?}"); }