Skip to content

Commit

Permalink
fix(core,host,lib): handle async boundary
Browse files Browse the repository at this point in the history
  • Loading branch information
petarvujovic98 committed Jul 17, 2024
1 parent 6551a65 commit f1af18d
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 15 deletions.
4 changes: 2 additions & 2 deletions core/src/interfaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl ProofType {
input: GuestInput,
output: &GuestOutput,
config: &Value,
store: &mut dyn IdWrite,
store: Option<&mut dyn IdWrite>,
) -> RaikoResult<Proof> {
let mut proof = match self {
ProofType::Native => NativeProver::run(input.clone(), output, config, store)
Expand Down Expand Up @@ -212,7 +212,7 @@ impl ProofType {
pub async fn cancel_proof(
&self,
proof_key: ProofKey,
read: &mut dyn IdStore,
read: Box<&mut dyn IdStore>,
) -> RaikoResult<()> {
let _ = match self {
ProofType::Native => NativeProver::cancel(proof_key.clone(), read)
Expand Down
8 changes: 6 additions & 2 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl Raiko {
&self,
input: GuestInput,
output: &GuestOutput,
store: &mut dyn IdWrite,
store: Option<&mut dyn IdWrite>,
) -> RaikoResult<Proof> {
let config = serde_json::to_value(&self.request)?;
self.request
Expand All @@ -114,7 +114,11 @@ impl Raiko {
.await
}

pub async fn cancel(&self, proof_key: ProofKey, read: &mut dyn IdStore) -> RaikoResult<()> {
pub async fn cancel(
&self,
proof_key: ProofKey,
read: Box<&mut dyn IdStore>,
) -> RaikoResult<()> {
self.request.proof_type.cancel_proof(proof_key, read).await
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/prover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl Prover for NativeProver {
input: GuestInput,
output: &GuestOutput,
config: &ProverConfig,
_store: &mut dyn IdWrite,
_store: Option<&mut dyn IdWrite>,
) -> ProverResult<Proof> {
let param =
config
Expand Down Expand Up @@ -64,7 +64,7 @@ impl Prover for NativeProver {
})
}

async fn cancel(_proof_key: ProofKey, _read: &mut dyn IdStore) -> ProverResult<()> {
async fn cancel(_proof_key: ProofKey, _read: Box<&mut dyn IdStore>) -> ProverResult<()> {
Ok(())
}
}
14 changes: 8 additions & 6 deletions host/src/proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use raiko_lib::{
prover::{IdWrite, Proof},
Measurement,
};
use raiko_tasks::{get_task_manager, TaskDescriptor, TaskManager, TaskStatus};
use raiko_tasks::{get_task_manager, TaskDescriptor, TaskManager, TaskManagerWrapper, TaskStatus};
use tokio::{
select,
sync::{mpsc::Receiver, Mutex, OwnedSemaphorePermit, Semaphore},
Expand Down Expand Up @@ -59,7 +59,7 @@ impl ProofActor {

let mut manager = get_task_manager(&self.opts.clone().into());
key.proof_system
.cancel_proof((key.chain_id, key.blockhash), &mut manager)
.cancel_proof((key.chain_id, key.blockhash), Box::new(&mut manager))
.await?;
task.cancel();
Ok(())
Expand Down Expand Up @@ -120,7 +120,9 @@ impl ProofActor {
while let Some(message) = self.receiver.recv().await {
match message {
Message::Cancel(key) => {
self.cancel_task(key).await;
if let Err(error) = self.cancel_task(key).await {
error!("Failed to cancel task: {error}")
}
}
Message::Task(proof_request) => {
let permit = Arc::clone(&semaphore)
Expand Down Expand Up @@ -154,7 +156,7 @@ impl ProofActor {
.await?;

let (status, proof) =
match handle_proof(&proof_request, opts, chain_specs, &mut manager).await {
match handle_proof(&proof_request, opts, chain_specs, Some(&mut manager)).await {
Err(error) => {
error!("{error}");
(error.into(), None)
Expand All @@ -173,7 +175,7 @@ pub async fn handle_proof(
proof_request: &ProofRequest,
opts: &Opts,
chain_specs: &SupportedChainSpecs,
store: &mut dyn IdWrite,
store: Option<&mut TaskManagerWrapper>,
) -> HostResult<Proof> {
info!(
"# Generating proof for block {} on {}",
Expand Down Expand Up @@ -227,7 +229,7 @@ pub async fn handle_proof(
memory::reset_stats();
let measurement = Measurement::start("Generating proof...", false);
let proof = raiko
.prove(input.clone(), &output, store)
.prove(input.clone(), &output, store.map(|s| s as &mut dyn IdWrite))
.await
.map_err(|e| {
let total_time = total_time.stop_with("====> Proof generation failed");
Expand Down
1 change: 1 addition & 0 deletions host/src/server/api/v1/proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ async fn proof_handler(
&proof_request,
&prover_state.opts,
&prover_state.chain_specs,
None,
)
.await
.map_err(|e| {
Expand Down
6 changes: 3 additions & 3 deletions lib/src/prover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub struct Proof {
pub kzg_proof: Option<String>,
}

pub trait IdWrite {
pub trait IdWrite: Send {
fn store_id(&mut self, key: ProofKey, id: String) -> ProverResult<()>;

fn remove_id(&mut self, key: ProofKey) -> ProverResult<()>;
Expand All @@ -53,8 +53,8 @@ pub trait Prover {
input: GuestInput,
output: &GuestOutput,
config: &ProverConfig,
store: &mut dyn IdWrite,
store: Option<&mut dyn IdWrite>,
) -> ProverResult<Proof>;

async fn cancel(proof_key: ProofKey, read: &mut dyn IdStore) -> ProverResult<()>;
async fn cancel(proof_key: ProofKey, read: Box<&mut dyn IdStore>) -> ProverResult<()>;
}

0 comments on commit f1af18d

Please sign in to comment.