Skip to content

Commit

Permalink
feat(host): add concurrency limit to proof tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
petarvujovic98 committed Jul 3, 2024
1 parent 8f03ef0 commit b768d2a
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
4 changes: 3 additions & 1 deletion host/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,9 @@ impl ProverState {
let (task_channel, receiver) = mpsc::channel::<TaskChannelOpts>(opts.concurrency_limit);

tokio::spawn(async move {
ProofActor::new(receiver).run().await;
ProofActor::new(receiver, opts.concurrency_limit)
.run()
.await;
});

Ok(Self {
Expand Down
12 changes: 9 additions & 3 deletions host/src/proof.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::sync::Arc;

use raiko_core::{
interfaces::{ProofRequest, RaikoError},
provider::{get_task_data, rpc::RpcBlockDataProvider},
Raiko,
};
use raiko_lib::{consts::SupportedChainSpecs, Measurement};
use raiko_task_manager::{get_task_manager, TaskManager, TaskStatus};
use tokio::sync::mpsc::Receiver;
use tokio::sync::{mpsc::Receiver, Semaphore};
use tracing::{error, info};

use crate::{
Expand All @@ -24,16 +26,20 @@ use crate::{

pub struct ProofActor {
rx: Receiver<TaskChannelOpts>,
task_count: usize,
}

impl ProofActor {
pub fn new(rx: Receiver<TaskChannelOpts>) -> Self {
Self { rx }
pub fn new(rx: Receiver<TaskChannelOpts>, task_count: usize) -> Self {
Self { rx, task_count }
}

pub async fn run(&mut self) {
let semaphore = Arc::new(Semaphore::new(self.task_count));
while let Some(message) = self.rx.recv().await {
let permit = Arc::clone(&semaphore).acquire_owned().await;
tokio::spawn(async move {
let _permit = permit;
if let Err(error) = Self::handle_message(message).await {
error!("Worker failed due to: {error:?}");
}
Expand Down

0 comments on commit b768d2a

Please sign in to comment.