From 144e4bd933a8629977120fd10bcda01a5e1237f7 Mon Sep 17 00:00:00 2001 From: Petar Vujovic Date: Wed, 3 Jul 2024 13:30:19 +0200 Subject: [PATCH] feat(host): handle task cancell status and refactor handler --- host/src/server/api/v2/proof.rs | 103 +++++++++++++++++++------------- 1 file changed, 60 insertions(+), 43 deletions(-) diff --git a/host/src/server/api/v2/proof.rs b/host/src/server/api/v2/proof.rs index 9d89b85e2..5a2b36c70 100644 --- a/host/src/server/api/v2/proof.rs +++ b/host/src/server/api/v2/proof.rs @@ -1,8 +1,9 @@ use axum::{debug_handler, extract::State, routing::post, Json, Router}; use raiko_core::{interfaces::ProofRequest, provider::get_task_data}; -use raiko_task_manager::{get_task_manager, EnqueueTaskParams, TaskManager, TaskStatus}; +use raiko_task_manager::{ + get_task_manager, EnqueueTaskParams, TaskManager, TaskProvingStatus, TaskStatus, +}; use serde_json::Value; -use tracing::info; use utoipa::OpenApi; use crate::{ @@ -62,12 +63,8 @@ async fn proof_handler( ) .await?; - if status.is_empty() { - info!( - "# Generating proof for block {} on {}", - proof_request.block_number, proof_request.network - ); - + let Some(TaskProvingStatus(latest_status, ..)) = status.last() else { + // If there are no tasks with provided config, create a new one. manager .enqueue_task(&EnqueueTaskParams { chain_id, @@ -84,45 +81,65 @@ async fn proof_handler( prover_state.chain_specs, ))?; - return Ok(Json(serde_json::json!( - { - "status": "ok", - "data": { - "status": TaskStatus::Registered, + return Ok(status_into_response(TaskStatus::Registered)); + }; + + match latest_status { + // If task has been cancelled add it to the queue again + TaskStatus::Cancelled + | TaskStatus::Cancelled_Aborted + | TaskStatus::Cancelled_NeverStarted + | TaskStatus::CancellationInProgress => { + manager + .enqueue_task(&EnqueueTaskParams { + chain_id, + blockhash: block_hash, + proof_type: proof_request.proof_type, + prover: proof_request.prover.to_string(), + block_number: proof_request.block_number, + }) + .await?; + + prover_state.task_channel.try_send(( + proof_request.clone(), + prover_state.opts, + prover_state.chain_specs, + ))?; + + Ok(status_into_response(TaskStatus::Registered)) + } + // If the task has succeeded, return the proof. + TaskStatus::Success => { + let proof = manager + .get_task_proof( + chain_id, + block_hash, + proof_request.proof_type, + Some(proof_request.prover.to_string()), + ) + .await?; + + Ok(Json( + ProofResponse { + proof: Some(String::from_utf8(proof).unwrap()), + output: None, + quote: None, } - } - ))); - } - - let status = status.last().unwrap().0; - - if matches!(status, TaskStatus::Success) { - let proof = manager - .get_task_proof( - chain_id, - block_hash, - proof_request.proof_type, - Some(proof_request.prover.to_string()), - ) - .await?; - - let response = ProofResponse { - proof: Some(String::from_utf8(proof).unwrap()), - output: None, - quote: None, - }; - - return Ok(Json(response.to_response())); + .to_response(), + )) + } + // For all other statuses just return the status. + _ => Ok(status_into_response(*latest_status)), } +} - Ok(Json(serde_json::json!( - { - "status": "ok", - "data": { - "status": status, - } +pub fn status_into_response(status: TaskStatus) -> Json { + Json(serde_json::json!({ + "status": "ok", + "data": { + "status": status, } - ))) + })) } #[derive(OpenApi)]