From 3432737265602098d1db546b95541542360d64e2 Mon Sep 17 00:00:00 2001
From: smtmfft <99081233+smtmfft@users.noreply.github.com>
Date: Fri, 22 Nov 2024 11:17:21 +0800
Subject: [PATCH 1/3] feat(raiko): retry task if previous running failed. (#408)

* feat(raiko): retry task if error

Signed-off-by: smtmfft

* fix in-memory related tests

Signed-off-by: smtmfft

* fix lint

Signed-off-by: smtmfft

* fix CI test

Signed-off-by: smtmfft

---------

Signed-off-by: smtmfft
---
 host/src/interfaces.rs              |  36 ++++----
 host/src/server/api/v2/proof/mod.rs |  62 +++++++-------
 host/src/server/api/v3/proof/mod.rs | 124 ++++++++++++++--------------
 provers/risc0/driver/src/bonsai.rs  |  67 +++++++++------
 provers/risc0/driver/src/lib.rs     |  21 ++---
 taskdb/src/lib.rs                   |  75 ++++++++++++++---
 taskdb/src/mem_db.rs                |  28 ++++---
 taskdb/src/redis_db.rs              |  14 +++-
 taskdb/tests/main.rs                |  16 ++--
 9 files changed, 266 insertions(+), 177 deletions(-)

diff --git a/host/src/interfaces.rs b/host/src/interfaces.rs
index 330446ef4..4800bb71b 100644
--- a/host/src/interfaces.rs
+++ b/host/src/interfaces.rs
@@ -121,15 +121,15 @@ impl From<HostError> for TaskStatus {
             | HostError::JoinHandle(_)
             | HostError::InvalidAddress(_)
             | HostError::InvalidRequestConfig(_) => unreachable!(),
-            HostError::Conversion(e) => TaskStatus::NonDbFailure(e),
-            HostError::Serde(e) => TaskStatus::NonDbFailure(e.to_string()),
-            HostError::Core(e) => TaskStatus::NonDbFailure(e.to_string()),
-            HostError::Anyhow(e) => TaskStatus::NonDbFailure(e.to_string()),
-            HostError::FeatureNotSupportedError(e) => TaskStatus::NonDbFailure(e.to_string()),
-            HostError::Io(e) => TaskStatus::NonDbFailure(e.to_string()),
-            HostError::RPC(_) => TaskStatus::NetworkFailure,
-            HostError::Guest(_) => TaskStatus::ProofFailure_Generic,
-            HostError::TaskManager(_) => TaskStatus::SqlDbCorruption,
+            HostError::Conversion(e) => TaskStatus::IoFailure(e),
+            HostError::Serde(e) => TaskStatus::IoFailure(e.to_string()),
+            HostError::Core(e) => TaskStatus::GuestProverFailure(e.to_string()),
+            HostError::Anyhow(e) => TaskStatus::AnyhowError(e.to_string()),
+            HostError::FeatureNotSupportedError(_) => TaskStatus::InvalidOrUnsupportedBlock,
+            HostError::Io(e) => TaskStatus::IoFailure(e.to_string()),
+            HostError::RPC(e) => TaskStatus::NetworkFailure(e.to_string()),
+            HostError::Guest(e) => TaskStatus::GuestProverFailure(e.to_string()),
+            HostError::TaskManager(e) => TaskStatus::TaskDbCorruption(e.to_string()),
         }
     }
 }
@@ -142,15 +142,15 @@ impl From<&HostError> for TaskStatus {
             | HostError::JoinHandle(_)
             | HostError::InvalidAddress(_)
             | HostError::InvalidRequestConfig(_) => unreachable!(),
-            HostError::Conversion(e) => TaskStatus::NonDbFailure(e.to_owned()),
-            HostError::Serde(e) => TaskStatus::NonDbFailure(e.to_string()),
-            HostError::Core(e) => TaskStatus::NonDbFailure(e.to_string()),
-            HostError::Anyhow(e) => TaskStatus::NonDbFailure(e.to_string()),
-            HostError::FeatureNotSupportedError(e) => TaskStatus::NonDbFailure(e.to_string()),
-            HostError::Io(e) => TaskStatus::NonDbFailure(e.to_string()),
-            HostError::RPC(_) => TaskStatus::NetworkFailure,
-            HostError::Guest(_) => TaskStatus::ProofFailure_Generic,
-            HostError::TaskManager(_) => TaskStatus::SqlDbCorruption,
+            HostError::Conversion(e) => TaskStatus::GuestProverFailure(e.to_owned()),
+            HostError::Serde(e) => TaskStatus::GuestProverFailure(e.to_string()),
+            HostError::Core(e) => TaskStatus::GuestProverFailure(e.to_string()),
+            HostError::Anyhow(e) => TaskStatus::AnyhowError(e.to_string()),
+            HostError::FeatureNotSupportedError(e) =>
TaskStatus::GuestProverFailure(e.to_string()), + HostError::Io(e) => TaskStatus::GuestProverFailure(e.to_string()), + HostError::RPC(e) => TaskStatus::NetworkFailure(e.to_string()), + HostError::Guest(e) => TaskStatus::GuestProverFailure(e.to_string()), + HostError::TaskManager(e) => TaskStatus::TaskDbCorruption(e.to_string()), } } } diff --git a/host/src/server/api/v2/proof/mod.rs b/host/src/server/api/v2/proof/mod.rs index 1f3473ecc..12472fc56 100644 --- a/host/src/server/api/v2/proof/mod.rs +++ b/host/src/server/api/v2/proof/mod.rs @@ -64,27 +64,39 @@ async fn proof_handler( let mut manager = prover_state.task_manager(); let status = manager.get_task_proving_status(&key).await?; - - let Some((latest_status, ..)) = status.0.last() else { - // If there are no tasks with provided config, create a new one. - manager.enqueue_task(&key).await?; - - prover_state - .task_channel - .try_send(Message::from(&proof_request))?; - - return Ok(TaskStatus::Registered.into()); - }; - - match latest_status { - // If task has been cancelled add it to the queue again - TaskStatus::Cancelled - | TaskStatus::Cancelled_Aborted - | TaskStatus::Cancelled_NeverStarted - | TaskStatus::CancellationInProgress => { - manager - .update_task_progress(key, TaskStatus::Registered, None) - .await?; + match status.0.last() { + Some((latest_status, ..)) => { + match latest_status { + // If task has been cancelled + TaskStatus::Cancelled + | TaskStatus::Cancelled_Aborted + | TaskStatus::Cancelled_NeverStarted + | TaskStatus::CancellationInProgress + // or if the task is failed, add it to the queue again + | TaskStatus::GuestProverFailure(_) + | TaskStatus::UnspecifiedFailureReason => { + manager + .update_task_progress(key, TaskStatus::Registered, None) + .await?; + + prover_state + .task_channel + .try_send(Message::from(&proof_request))?; + + Ok(TaskStatus::Registered.into()) + } + // If the task has succeeded, return the proof. + TaskStatus::Success => { + let proof = manager.get_task_proof(&key).await?; + + Ok(proof.into()) + } + // For all other statuses just return the status. + status => Ok(status.clone().into()), + } + } + None => { + manager.enqueue_task(&key).await?; prover_state .task_channel @@ -92,14 +104,6 @@ async fn proof_handler( Ok(TaskStatus::Registered.into()) } - // If the task has succeeded, return the proof. - TaskStatus::Success => { - let proof = manager.get_task_proof(&key).await?; - - Ok(proof.into()) - } - // For all other statuses just return the status. 
- status => Ok(status.clone().into()), } } diff --git a/host/src/server/api/v3/proof/mod.rs b/host/src/server/api/v3/proof/mod.rs index 5429f4217..db5b51382 100644 --- a/host/src/server/api/v3/proof/mod.rs +++ b/host/src/server/api/v3/proof/mod.rs @@ -84,7 +84,35 @@ async fn proof_handler( for (key, req) in tasks.iter() { let status = manager.get_task_proving_status(key).await?; - let Some((latest_status, ..)) = status.0.last() else { + if let Some((latest_status, ..)) = status.0.last() { + match latest_status { + // If task has been cancelled + TaskStatus::Cancelled + | TaskStatus::Cancelled_Aborted + | TaskStatus::Cancelled_NeverStarted + | TaskStatus::CancellationInProgress + // or if the task is failed, add it to the queue again + | TaskStatus::GuestProverFailure(_) + | TaskStatus::UnspecifiedFailureReason + => { + manager + .update_task_progress(key.clone(), TaskStatus::Registered, None) + .await?; + prover_state.task_channel.try_send(Message::from(req))?; + + is_registered = true; + is_success = false; + } + // If the task has succeeded, return the proof. + TaskStatus::Success => {} + // For all other statuses just return the status. + status => { + statuses.push(status.clone()); + is_registered = false; + is_success = false; + } + } + } else { // If there are no tasks with provided config, create a new one. manager.enqueue_task(key).await?; @@ -92,31 +120,6 @@ async fn proof_handler( is_registered = true; continue; }; - - match latest_status { - // If task has been cancelled add it to the queue again - TaskStatus::Cancelled - | TaskStatus::Cancelled_Aborted - | TaskStatus::Cancelled_NeverStarted - | TaskStatus::CancellationInProgress => { - manager - .update_task_progress(key.clone(), TaskStatus::Registered, None) - .await?; - - prover_state.task_channel.try_send(Message::from(req))?; - - is_registered = true; - is_success = false; - } - // If the task has succeeded, return the proof. - TaskStatus::Success => {} - // For all other statuses just return the status. - status => { - statuses.push(status.clone()); - is_registered = false; - is_success = false; - } - } } if is_registered { @@ -147,7 +150,40 @@ async fn proof_handler( .get_aggregation_task_proving_status(&aggregation_request) .await?; - let Some((latest_status, ..)) = status.0.last() else { + if let Some((latest_status, ..)) = status.0.last() { + match latest_status { + // If task has been cancelled add it to the queue again + TaskStatus::Cancelled + | TaskStatus::Cancelled_Aborted + | TaskStatus::Cancelled_NeverStarted + | TaskStatus::CancellationInProgress + // or if the task is failed, add it to the queue again + | TaskStatus::GuestProverFailure(_) + | TaskStatus::UnspecifiedFailureReason + => { + manager + .update_aggregation_task_progress( + &aggregation_request, + TaskStatus::Registered, + None, + ) + .await?; + prover_state + .task_channel + .try_send(Message::from(aggregation_request))?; + Ok(Status::from(TaskStatus::Registered)) + } + // If the task has succeeded, return the proof. + TaskStatus::Success => { + let proof = manager + .get_aggregation_task_proof(&aggregation_request) + .await?; + Ok(proof.into()) + } + // For all other statuses just return the status. + status => Ok(status.clone().into()), + } + } else { // If there are no tasks with provided config, create a new one. 
manager .enqueue_aggregation_task(&aggregation_request) @@ -156,39 +192,7 @@ async fn proof_handler( prover_state .task_channel .try_send(Message::from(aggregation_request.clone()))?; - return Ok(Status::from(TaskStatus::Registered)); - }; - - match latest_status { - // If task has been cancelled add it to the queue again - TaskStatus::Cancelled - | TaskStatus::Cancelled_Aborted - | TaskStatus::Cancelled_NeverStarted - | TaskStatus::CancellationInProgress => { - manager - .update_aggregation_task_progress( - &aggregation_request, - TaskStatus::Registered, - None, - ) - .await?; - - prover_state - .task_channel - .try_send(Message::from(aggregation_request))?; - - Ok(Status::from(TaskStatus::Registered)) - } - // If the task has succeeded, return the proof. - TaskStatus::Success => { - let proof = manager - .get_aggregation_task_proof(&aggregation_request) - .await?; - - Ok(proof.into()) - } - // For all other statuses just return the status. - status => Ok(status.clone().into()), + Ok(Status::from(TaskStatus::Registered)) } } else { let status = statuses.into_iter().collect::(); diff --git a/provers/risc0/driver/src/bonsai.rs b/provers/risc0/driver/src/bonsai.rs index f983d5347..7648566f3 100644 --- a/provers/risc0/driver/src/bonsai.rs +++ b/provers/risc0/driver/src/bonsai.rs @@ -128,7 +128,7 @@ pub async fn maybe_prove>, Vec), proof_key: ProofKey, id_store: &mut Option<&mut dyn IdWrite>, -) -> Option<(String, Receipt)> { +) -> ProverResult<(String, Receipt)> { let (assumption_instances, assumption_uuids) = assumptions; let encoded_output = @@ -152,12 +152,41 @@ pub async fn maybe_prove {} Err(e) => { error!("Failed to scale up bonsai: {e:?}"); - return None; + return Err(ProverError::GuestError( + "Failed to scale up bonsai".to_string(), + )); } } // query bonsai service until it works - loop { - match prove_bonsai( + macro_rules! 
retry_with_backoff { + ($max_retries:expr, $retry_delay:expr, $operation:expr, $err_transform:expr) => {{ + let mut attempt = 0; + loop { + match $operation { + Ok(result) => break Ok(result), + Err(e) => { + if attempt >= $max_retries { + error!("Max retries ({}) reached, aborting...", $max_retries); + break Err($err_transform(e)); + } + warn!( + "Operation failed (attempt {}/{}): {:?}", + attempt + 1, + $max_retries, + e + ); + tokio_async_sleep(Duration::from_secs($retry_delay)).await; + attempt += 1; + } + } + } + }}; + } + + let (uuid, receipt) = retry_with_backoff!( + MAX_REQUEST_RETRY, + 20, + prove_bonsai( encoded_input.clone(), elf, expected_output, @@ -165,25 +194,10 @@ pub async fn maybe_prove { - break (receipt_uuid, receipt, false); - } - Err(BonsaiExecutionError::SdkFailure(err)) => { - warn!("Bonsai SDK fail: {err:?}, keep tracking..."); - tokio_async_sleep(Duration::from_secs(15)).await; - } - Err(BonsaiExecutionError::Other(err)) => { - warn!("Something wrong: {err:?}, keep tracking..."); - tokio_async_sleep(Duration::from_secs(15)).await; - } - Err(BonsaiExecutionError::Fatal(err)) => { - error!("Fatal error on Bonsai: {err:?}"); - return None; - } - } - } + .await, + |e| ProverError::GuestError(format!("Bonsai SDK call fail: {e:?}").to_string()) + )?; + (uuid, receipt, false) } else { // run prover info!("start running local prover"); @@ -197,7 +211,9 @@ pub async fn maybe_prove (Default::default(), receipt, false), Err(e) => { warn!("Failed to prove locally: {e:?}"); - return None; + return Err(ProverError::GuestError( + "Failed to prove locally".to_string(), + )); } } }; @@ -211,6 +227,7 @@ pub async fn maybe_prove anyhow::Result { diff --git a/provers/risc0/driver/src/lib.rs b/provers/risc0/driver/src/lib.rs index 1c5b1c510..d5b813ccb 100644 --- a/provers/risc0/driver/src/lib.rs +++ b/provers/risc0/driver/src/lib.rs @@ -90,33 +90,28 @@ impl Prover for Risc0Prover { proof_key, &mut id_store, ) - .await; + .await?; - let receipt = result.clone().unwrap().1.clone(); - let uuid = result.clone().unwrap().0; - - let proof_gen_result = if result.is_some() { + let proof_gen_result = { if config.snark && config.bonsai { - let (stark_uuid, stark_receipt) = result.clone().unwrap(); + let (stark_uuid, stark_receipt) = result.clone(); bonsai::bonsai_stark_to_snark(stark_uuid, stark_receipt, output.hash) .await .map(|r0_response| r0_response.into()) .map_err(|e| ProverError::GuestError(e.to_string())) } else { - warn!("proof is not in snark mode, please check."); - let (_, stark_receipt) = result.clone().unwrap(); + if !config.snark { + warn!("proof is not in snark mode, please check."); + } + let (uuid, stark_receipt) = result.clone(); Ok(Risc0Response { proof: stark_receipt.journal.encode_hex_with_prefix(), - receipt: serde_json::to_string(&receipt).unwrap(), + receipt: serde_json::to_string(&stark_receipt).unwrap(), uuid, input: output.hash, } .into()) } - } else { - Err(ProverError::GuestError( - "Failed to generate proof".to_string(), - )) }; #[cfg(feature = "bonsai-auto-scaling")] diff --git a/taskdb/src/lib.rs b/taskdb/src/lib.rs index b83339faf..1e7d517d1 100644 --- a/taskdb/src/lib.rs +++ b/taskdb/src/lib.rs @@ -83,15 +83,17 @@ pub enum TaskStatus { WorkInProgress, ProofFailure_Generic, ProofFailure_OutOfMemory, - NetworkFailure, + NetworkFailure(String), Cancelled, Cancelled_NeverStarted, Cancelled_Aborted, CancellationInProgress, InvalidOrUnsupportedBlock, - NonDbFailure(String), + IoFailure(String), + AnyhowError(String), + GuestProverFailure(String), 
UnspecifiedFailureReason, - SqlDbCorruption, + TaskDbCorruption(String), } impl From for i32 { @@ -102,15 +104,17 @@ impl From for i32 { TaskStatus::WorkInProgress => 2000, TaskStatus::ProofFailure_Generic => -1000, TaskStatus::ProofFailure_OutOfMemory => -1100, - TaskStatus::NetworkFailure => -2000, + TaskStatus::NetworkFailure(_) => -2000, TaskStatus::Cancelled => -3000, TaskStatus::Cancelled_NeverStarted => -3100, TaskStatus::Cancelled_Aborted => -3200, TaskStatus::CancellationInProgress => -3210, TaskStatus::InvalidOrUnsupportedBlock => -4000, - TaskStatus::NonDbFailure(_) => -5000, - TaskStatus::UnspecifiedFailureReason => -9999, - TaskStatus::SqlDbCorruption => -99999, + TaskStatus::IoFailure(_) => -5000, + TaskStatus::AnyhowError(_) => -6000, + TaskStatus::GuestProverFailure(_) => -7000, + TaskStatus::UnspecifiedFailureReason => -8000, + TaskStatus::TaskDbCorruption(_) => -9000, } } } @@ -123,15 +127,17 @@ impl From for TaskStatus { 2000 => TaskStatus::WorkInProgress, -1000 => TaskStatus::ProofFailure_Generic, -1100 => TaskStatus::ProofFailure_OutOfMemory, - -2000 => TaskStatus::NetworkFailure, + -2000 => TaskStatus::NetworkFailure("".to_string()), -3000 => TaskStatus::Cancelled, -3100 => TaskStatus::Cancelled_NeverStarted, -3200 => TaskStatus::Cancelled_Aborted, -3210 => TaskStatus::CancellationInProgress, -4000 => TaskStatus::InvalidOrUnsupportedBlock, - -5000 => TaskStatus::NonDbFailure("".to_string()), - -9999 => TaskStatus::UnspecifiedFailureReason, - -99999 => TaskStatus::SqlDbCorruption, + -5000 => TaskStatus::IoFailure("".to_string()), + -6000 => TaskStatus::AnyhowError("".to_string()), + -7000 => TaskStatus::GuestProverFailure("".to_string()), + -8000 => TaskStatus::UnspecifiedFailureReason, + -9000 => TaskStatus::TaskDbCorruption("".to_string()), _ => TaskStatus::UnspecifiedFailureReason, } } @@ -466,4 +472,51 @@ mod test { 1 ); } + + #[tokio::test] + async fn test_enqueue_twice() { + let sqlite_file: &Path = Path::new("test.db"); + // remove existed one + if sqlite_file.exists() { + std::fs::remove_file(sqlite_file).unwrap(); + } + + let opts = TaskManagerOpts { + sqlite_file: sqlite_file.to_path_buf(), + max_db_size: 1024 * 1024, + redis_url: "redis://localhost:6379".to_string(), + redis_ttl: 3600, + }; + let mut task_manager = get_task_manager(&opts); + let key = ProofTaskDescriptor { + chain_id: 1, + block_id: 0, + blockhash: B256::default(), + proof_system: ProofType::Native, + prover: "test".to_string(), + }; + + assert_eq!(task_manager.enqueue_task(&key).await.unwrap().0.len(), 1); + // enqueue again + assert_eq!(task_manager.enqueue_task(&key).await.unwrap().0.len(), 1); + + let status = task_manager.get_task_proving_status(&key).await.unwrap(); + assert_eq!(status.0.len(), 1); + + task_manager + .update_task_progress(key.clone(), TaskStatus::InvalidOrUnsupportedBlock, None) + .await + .expect("update task failed"); + let status = task_manager.get_task_proving_status(&key).await.unwrap(); + assert_eq!(status.0.len(), 2); + + task_manager + .update_task_progress(key.clone(), TaskStatus::Registered, None) + .await + .expect("update task failed"); + let status = task_manager.get_task_proving_status(&key).await.unwrap(); + assert_eq!(status.0.len(), 3); + assert_eq!(status.0.first().unwrap().0, TaskStatus::Registered); + assert_eq!(status.0.last().unwrap().0, TaskStatus::Registered); + } } diff --git a/taskdb/src/mem_db.rs b/taskdb/src/mem_db.rs index 5f55a6df4..508c819dc 100644 --- a/taskdb/src/mem_db.rs +++ b/taskdb/src/mem_db.rs @@ -16,7 +16,7 @@ use chrono::Utc; 
 use raiko_core::interfaces::AggregationOnlyRequest;
 use raiko_lib::prover::{IdStore, IdWrite, ProofKey, ProverError, ProverResult};
 use tokio::sync::Mutex;
-use tracing::{debug, info};
+use tracing::{info, warn};
 
 use crate::{
     ensure, AggregationTaskDescriptor, ProofTaskDescriptor, TaskDescriptor, TaskManager,
@@ -45,15 +45,16 @@ impl InMemoryTaskDb {
         }
     }
 
-    fn enqueue_task(&mut self, key: &ProofTaskDescriptor) {
+    fn enqueue_task(&mut self, key: &ProofTaskDescriptor) -> TaskManagerResult<()> {
         let task_status = (TaskStatus::Registered, None, Utc::now());
 
         match self.tasks_queue.get(key) {
             Some(task_proving_records) => {
-                debug!(
-                    "Task already exists: {:?}",
-                    task_proving_records.0.last().unwrap().0
-                );
+                let previous_status = &task_proving_records.0.last().unwrap().0;
+                warn!("Task already exists: {key:?} with previous status {previous_status:?}");
+                if previous_status != &TaskStatus::Success {
+                    self.update_task_progress(key.clone(), TaskStatus::Registered, None)?;
+                }
             } // do nothing
             None => {
                 info!("Enqueue new task: {key:?}");
@@ -61,6 +62,8 @@ impl InMemoryTaskDb {
                     .insert(key.clone(), TaskProvingStatusRecords(vec![task_status]));
             }
         }
+
+        Ok(())
     }
 
     fn update_task_progress(
@@ -173,10 +176,11 @@ impl InMemoryTaskDb {
 
         match self.aggregation_tasks_queue.get(request) {
             Some(task_proving_records) => {
-                debug!(
-                    "Task already exists: {:?}",
-                    task_proving_records.0.last().unwrap().0
-                );
+                let previous_status = &task_proving_records.0.last().unwrap().0;
+                warn!("Task already exists: {request} with previous status {previous_status:?}");
+                if previous_status != &TaskStatus::Success {
+                    self.update_aggregation_task_progress(request, TaskStatus::Registered, None)?;
+                }
             } // do nothing
             None => {
                 info!("Enqueue new task: {request}");
@@ -305,7 +309,7 @@ impl TaskManager for InMemoryTaskManager {
             return Ok(status);
         }
 
-        db.enqueue_task(params);
+        db.enqueue_task(params)?;
         db.get_task_proving_status(params)
     }
 
@@ -411,7 +415,7 @@ mod tests {
             proof_system: ProofType::Native,
             prover: "0x1234".to_owned(),
         };
-        db.enqueue_task(&params);
+        db.enqueue_task(&params).expect("enqueue task");
         let status = db.get_task_proving_status(&params);
         assert!(status.is_ok());
     }
diff --git a/taskdb/src/redis_db.rs b/taskdb/src/redis_db.rs
index 518d8edeb..506e414d8 100644
--- a/taskdb/src/redis_db.rs
+++ b/taskdb/src/redis_db.rs
@@ -288,8 +288,12 @@ impl RedisTaskDb {
 
         match self.query_proof_task(key) {
             Ok(Some(task_proving_records)) => {
-                warn!("Task already exists: {:?}", task_proving_records.0.last());
-                Ok(task_proving_records.0.last().unwrap().clone())
+                warn!(
+                    "Task status exists: {:?}, register again",
+                    task_proving_records.0.last()
+                );
+                self.insert_proof_task(key, &TaskProvingStatusRecords(vec![task_status.clone()]))?;
+                Ok(task_status)
             } // do nothing
             Ok(None) => {
                 info!("Enqueue new task: {key:?}");
@@ -691,7 +695,11 @@ mod tests {
 
     #[test]
     fn test_db_enqueue() {
-        let mut db = RedisTaskDb::new("redis://localhost:6379").unwrap();
+        let mut db = RedisTaskDb::new(RedisConfig {
+            url: "redis://localhost:6379".to_owned(),
+            ttl: 3600,
+        })
+        .unwrap();
         let params = ProofTaskDescriptor {
             chain_id: 1,
             block_id: 1,
diff --git a/taskdb/tests/main.rs b/taskdb/tests/main.rs
index 061ee1f41..13134e34b 100644
--- a/taskdb/tests/main.rs
+++ b/taskdb/tests/main.rs
@@ -302,14 +302,18 @@ mod tests {
 
         std::thread::sleep(Duration::from_millis(1));
 
-        tama.update_task_progress(task_3_desc.clone(), TaskStatus::NetworkFailure, None)
-            .await
-            .unwrap();
+        tama.update_task_progress(
+            task_3_desc.clone(),
TaskStatus::UnspecifiedFailureReason, + None, + ) + .await + .unwrap(); { let task_status = tama.get_task_proving_status(task_3_desc).await.unwrap().0; assert_eq!(task_status.len(), 3); - assert_eq!(task_status[2].0, TaskStatus::NetworkFailure); + assert_eq!(task_status[2].0, TaskStatus::UnspecifiedFailureReason); assert_eq!(task_status[1].0, TaskStatus::WorkInProgress); assert_eq!(task_status[0].0, TaskStatus::Registered); } @@ -324,7 +328,7 @@ mod tests { let task_status = tama.get_task_proving_status(task_3_desc).await.unwrap().0; assert_eq!(task_status.len(), 4); assert_eq!(task_status[3].0, TaskStatus::WorkInProgress); - assert_eq!(task_status[2].0, TaskStatus::NetworkFailure); + assert_eq!(task_status[2].0, TaskStatus::UnspecifiedFailureReason); assert_eq!(task_status[1].0, TaskStatus::WorkInProgress); assert_eq!(task_status[0].0, TaskStatus::Registered); } @@ -345,7 +349,7 @@ mod tests { assert_eq!(task_status.len(), 5); assert_eq!(task_status[4].0, TaskStatus::Success); assert_eq!(task_status[3].0, TaskStatus::WorkInProgress); - assert_eq!(task_status[2].0, TaskStatus::NetworkFailure); + assert_eq!(task_status[2].0, TaskStatus::UnspecifiedFailureReason); assert_eq!(task_status[1].0, TaskStatus::WorkInProgress); assert_eq!(task_status[0].0, TaskStatus::Registered); } From bd7bf99e488b5712cdb8f7da66f9ae70a4a1fa86 Mon Sep 17 00:00:00 2001 From: Roger <50648015+RogerLamTd@users.noreply.github.com> Date: Thu, 21 Nov 2024 23:01:25 -0800 Subject: [PATCH 2/3] chore(repo): ignore docs changes in workflows (#411) * chore(repo): ignore docs changes in workflows * . * . * chore(repo): remove workflows from merge_group so they don't trigger every time a PR gets merged to main, add merge-gatekeeper --- .github/workflows/ci-all.yml | 1 - .github/workflows/ci-lint.yml | 3 ++- .github/workflows/ci-provers.yml | 3 ++- .github/workflows/ci-risc0.yml | 3 ++- .github/workflows/ci-sgx-all.yml | 3 ++- .github/workflows/ci-sgx-docker.yml | 3 ++- .github/workflows/ci-sp1.yml | 3 ++- .github/workflows/openapi-deploy.yml | 7 ++++--- .github/workflows/repo--merge-gatekeeper.yml | 21 +++++++++++++++++++ ...se-please.yml => repo--release-please.yml} | 0 .../{typo-check.yml => repo--typo-check.yml} | 0 ...-title.yml => repo--validate-pr-title.yml} | 0 12 files changed, 37 insertions(+), 10 deletions(-) create mode 100644 .github/workflows/repo--merge-gatekeeper.yml rename .github/workflows/{release-please.yml => repo--release-please.yml} (100%) rename .github/workflows/{typo-check.yml => repo--typo-check.yml} (100%) rename .github/workflows/{validate-pr-title.yml => repo--validate-pr-title.yml} (100%) diff --git a/.github/workflows/ci-all.yml b/.github/workflows/ci-all.yml index 0a1130276..c52c5bf3b 100644 --- a/.github/workflows/ci-all.yml +++ b/.github/workflows/ci-all.yml @@ -14,7 +14,6 @@ on: - "lib/**" - "script/**" - "!docs/**" - merge_group: env: CARGO_TERM_COLOR: always diff --git a/.github/workflows/ci-lint.yml b/.github/workflows/ci-lint.yml index c42d8363e..8cdaa66a6 100644 --- a/.github/workflows/ci-lint.yml +++ b/.github/workflows/ci-lint.yml @@ -3,7 +3,8 @@ name: CI - Lint on: pull_request: types: [opened, reopened, edited, synchronize] - merge_group: + paths: + - "!docs/**" env: CARGO_TERM_COLOR: always diff --git a/.github/workflows/ci-provers.yml b/.github/workflows/ci-provers.yml index 16dd7cb1c..ca09162aa 100644 --- a/.github/workflows/ci-provers.yml +++ b/.github/workflows/ci-provers.yml @@ -7,11 +7,12 @@ on: paths: - "pipelines/**" - "harness/**" + - "!docs/**" pull_request: paths: - 
"pipelines/**" - "harness/**" - merge_group: + - "!docs/**" env: CARGO_TERM_COLOR: always diff --git a/.github/workflows/ci-risc0.yml b/.github/workflows/ci-risc0.yml index 3fd4cb08a..792707fa0 100644 --- a/.github/workflows/ci-risc0.yml +++ b/.github/workflows/ci-risc0.yml @@ -6,10 +6,11 @@ on: branches: ["main"] paths: - "provers/risc0/**" + - "!docs/**" pull_request: paths: - "provers/risc0/**" - merge_group: + - "!docs/**" jobs: build-test-risc0: diff --git a/.github/workflows/ci-sgx-all.yml b/.github/workflows/ci-sgx-all.yml index 9a020125c..d6879a8e2 100644 --- a/.github/workflows/ci-sgx-all.yml +++ b/.github/workflows/ci-sgx-all.yml @@ -6,10 +6,11 @@ on: branches: ["main"] paths: - "provers/sgx/**" + - "!docs/**" pull_request: paths: - "provers/sgx/**" - merge_group: + - "!docs/**" jobs: build-test-sgx: diff --git a/.github/workflows/ci-sgx-docker.yml b/.github/workflows/ci-sgx-docker.yml index 15335598f..9050bd3d3 100644 --- a/.github/workflows/ci-sgx-docker.yml +++ b/.github/workflows/ci-sgx-docker.yml @@ -6,10 +6,11 @@ on: branches: ["main"] paths: - "docker/**" + - "!docs/**" pull_request: paths: - "docker/**" - merge_group: + - "!docs/**" jobs: build-test-sgx-with-docker: diff --git a/.github/workflows/ci-sp1.yml b/.github/workflows/ci-sp1.yml index 1e7adbf66..127e8a088 100644 --- a/.github/workflows/ci-sp1.yml +++ b/.github/workflows/ci-sp1.yml @@ -6,10 +6,11 @@ on: branches: ["main"] paths: - "provers/sp1/**" + - "!docs/**" pull_request: paths: - "provers/sp1/**" - merge_group: + - "!docs/**" jobs: build-test-sp1: diff --git a/.github/workflows/openapi-deploy.yml b/.github/workflows/openapi-deploy.yml index 2e01cea6e..e1e57b187 100644 --- a/.github/workflows/openapi-deploy.yml +++ b/.github/workflows/openapi-deploy.yml @@ -3,10 +3,11 @@ name: OpenAPI on: push: branches: ["main"] - paths-ignore: - - "docs/**" + paths: + - "!docs/**" pull_request: - merge_group: + paths: + - "!docs/**" permissions: contents: read diff --git a/.github/workflows/repo--merge-gatekeeper.yml b/.github/workflows/repo--merge-gatekeeper.yml new file mode 100644 index 000000000..795a81aa7 --- /dev/null +++ b/.github/workflows/repo--merge-gatekeeper.yml @@ -0,0 +1,21 @@ +name: Merge Gatekeeper + +on: + pull_request: + branches: + - main + merge_group: # Trigger in merge queue to pass the required status check + +jobs: + merge-gatekeeper: + if: ${{ github.event_name == 'pull_request' && github.event.pull_request.draft == false }} + runs-on: [arc-runner-set] + permissions: + checks: read + statuses: read + steps: + - name: Run Merge Gatekeeper + uses: upsidr/merge-gatekeeper@v1 + with: + timeout: 1200 + token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/release-please.yml b/.github/workflows/repo--release-please.yml similarity index 100% rename from .github/workflows/release-please.yml rename to .github/workflows/repo--release-please.yml diff --git a/.github/workflows/typo-check.yml b/.github/workflows/repo--typo-check.yml similarity index 100% rename from .github/workflows/typo-check.yml rename to .github/workflows/repo--typo-check.yml diff --git a/.github/workflows/validate-pr-title.yml b/.github/workflows/repo--validate-pr-title.yml similarity index 100% rename from .github/workflows/validate-pr-title.yml rename to .github/workflows/repo--validate-pr-title.yml From 6e984840a8038a6654ab1ecbb1caea7c036feb3a Mon Sep 17 00:00:00 2001 From: Kero Date: Fri, 22 Nov 2024 16:43:54 +0800 Subject: [PATCH 3/3] perf(host): release running_tasks lock asap (#417) Co-authored-by: smtmfft 
<99081233+smtmfft@users.noreply.github.com>
---
 host/src/proof.rs | 21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/host/src/proof.rs b/host/src/proof.rs
index 1712572a4..70da9574c 100644
--- a/host/src/proof.rs
+++ b/host/src/proof.rs
@@ -79,10 +79,15 @@ impl ProofActor {
     }
 
     pub async fn cancel_task(&mut self, key: ProofTaskDescriptor) -> HostResult<()> {
-        let tasks_map = self.running_tasks.lock().await;
-        let Some(task) = tasks_map.get(&key) else {
-            warn!("No task with those keys to cancel");
-            return Ok(());
+        let task = {
+            let tasks_map = self.running_tasks.lock().await;
+            match tasks_map.get(&key) {
+                Some(task) => task.to_owned(),
+                None => {
+                    warn!("No task with those keys to cancel");
+                    return Ok(());
+                }
+            }
         };
 
         let mut manager = get_task_manager(&self.opts.clone().into());
@@ -134,10 +139,12 @@ impl ProofActor {
             proof_request.prover.clone().to_string(),
         ));
 
-        let mut tasks = self.running_tasks.lock().await;
-        tasks.insert(key.clone(), cancel_token.clone());
-
-        let sender = self.sender.clone();
+        {
+            let mut tasks = self.running_tasks.lock().await;
+            tasks.insert(key.clone(), cancel_token.clone());
+        }
+
+        let sender = self.sender.clone();
         let tasks = self.running_tasks.clone();
         let opts = self.opts.clone();
         let chain_specs = self.chain_specs.clone();