Skip to content

Commit

Permalink
feat(raiko): retry task if previous running failed. (#408)
Browse files Browse the repository at this point in the history
* feat(raiko): retry task if error

Signed-off-by: smtmfft <smtm@taiko.xyz>

* fix in-memory related tests

Signed-off-by: smtmfft <smtm@taiko.xyz>

* fix lint

Signed-off-by: smtmfft <smtm@taiko.xyz>

* fix CI test

Signed-off-by: smtmfft <smtm@taiko.xyz>

---------

Signed-off-by: smtmfft <smtm@taiko.xyz>
  • Loading branch information
smtmfft authored Nov 22, 2024
1 parent 987241b commit 3432737
Show file tree
Hide file tree
Showing 9 changed files with 266 additions and 177 deletions.
36 changes: 18 additions & 18 deletions host/src/interfaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ impl From<HostError> for TaskStatus {
| HostError::JoinHandle(_)
| HostError::InvalidAddress(_)
| HostError::InvalidRequestConfig(_) => unreachable!(),
HostError::Conversion(e) => TaskStatus::NonDbFailure(e),
HostError::Serde(e) => TaskStatus::NonDbFailure(e.to_string()),
HostError::Core(e) => TaskStatus::NonDbFailure(e.to_string()),
HostError::Anyhow(e) => TaskStatus::NonDbFailure(e.to_string()),
HostError::FeatureNotSupportedError(e) => TaskStatus::NonDbFailure(e.to_string()),
HostError::Io(e) => TaskStatus::NonDbFailure(e.to_string()),
HostError::RPC(_) => TaskStatus::NetworkFailure,
HostError::Guest(_) => TaskStatus::ProofFailure_Generic,
HostError::TaskManager(_) => TaskStatus::SqlDbCorruption,
HostError::Conversion(e) => TaskStatus::IoFailure(e),
HostError::Serde(e) => TaskStatus::IoFailure(e.to_string()),
HostError::Core(e) => TaskStatus::GuestProverFailure(e.to_string()),
HostError::Anyhow(e) => TaskStatus::AnyhowError(e.to_string()),
HostError::FeatureNotSupportedError(_) => TaskStatus::InvalidOrUnsupportedBlock,
HostError::Io(e) => TaskStatus::IoFailure(e.to_string()),
HostError::RPC(e) => TaskStatus::NetworkFailure(e.to_string()),
HostError::Guest(e) => TaskStatus::GuestProverFailure(e.to_string()),
HostError::TaskManager(e) => TaskStatus::TaskDbCorruption(e.to_string()),
}
}
}
Expand All @@ -142,15 +142,15 @@ impl From<&HostError> for TaskStatus {
| HostError::JoinHandle(_)
| HostError::InvalidAddress(_)
| HostError::InvalidRequestConfig(_) => unreachable!(),
HostError::Conversion(e) => TaskStatus::NonDbFailure(e.to_owned()),
HostError::Serde(e) => TaskStatus::NonDbFailure(e.to_string()),
HostError::Core(e) => TaskStatus::NonDbFailure(e.to_string()),
HostError::Anyhow(e) => TaskStatus::NonDbFailure(e.to_string()),
HostError::FeatureNotSupportedError(e) => TaskStatus::NonDbFailure(e.to_string()),
HostError::Io(e) => TaskStatus::NonDbFailure(e.to_string()),
HostError::RPC(_) => TaskStatus::NetworkFailure,
HostError::Guest(_) => TaskStatus::ProofFailure_Generic,
HostError::TaskManager(_) => TaskStatus::SqlDbCorruption,
HostError::Conversion(e) => TaskStatus::GuestProverFailure(e.to_owned()),
HostError::Serde(e) => TaskStatus::GuestProverFailure(e.to_string()),
HostError::Core(e) => TaskStatus::GuestProverFailure(e.to_string()),
HostError::Anyhow(e) => TaskStatus::AnyhowError(e.to_string()),
HostError::FeatureNotSupportedError(e) => TaskStatus::GuestProverFailure(e.to_string()),
HostError::Io(e) => TaskStatus::GuestProverFailure(e.to_string()),
HostError::RPC(e) => TaskStatus::NetworkFailure(e.to_string()),
HostError::Guest(e) => TaskStatus::GuestProverFailure(e.to_string()),
HostError::TaskManager(e) => TaskStatus::TaskDbCorruption(e.to_string()),
}
}
}
62 changes: 33 additions & 29 deletions host/src/server/api/v2/proof/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,42 +64,46 @@ async fn proof_handler(

let mut manager = prover_state.task_manager();
let status = manager.get_task_proving_status(&key).await?;

let Some((latest_status, ..)) = status.0.last() else {
// If there are no tasks with provided config, create a new one.
manager.enqueue_task(&key).await?;

prover_state
.task_channel
.try_send(Message::from(&proof_request))?;

return Ok(TaskStatus::Registered.into());
};

match latest_status {
// If task has been cancelled add it to the queue again
TaskStatus::Cancelled
| TaskStatus::Cancelled_Aborted
| TaskStatus::Cancelled_NeverStarted
| TaskStatus::CancellationInProgress => {
manager
.update_task_progress(key, TaskStatus::Registered, None)
.await?;
match status.0.last() {
Some((latest_status, ..)) => {
match latest_status {
// If task has been cancelled
TaskStatus::Cancelled
| TaskStatus::Cancelled_Aborted
| TaskStatus::Cancelled_NeverStarted
| TaskStatus::CancellationInProgress
// or if the task is failed, add it to the queue again
| TaskStatus::GuestProverFailure(_)
| TaskStatus::UnspecifiedFailureReason => {
manager
.update_task_progress(key, TaskStatus::Registered, None)
.await?;

prover_state
.task_channel
.try_send(Message::from(&proof_request))?;

Ok(TaskStatus::Registered.into())
}
// If the task has succeeded, return the proof.
TaskStatus::Success => {
let proof = manager.get_task_proof(&key).await?;

Ok(proof.into())
}
// For all other statuses just return the status.
status => Ok(status.clone().into()),
}
}
None => {
manager.enqueue_task(&key).await?;

prover_state
.task_channel
.try_send(Message::from(&proof_request))?;

Ok(TaskStatus::Registered.into())
}
// If the task has succeeded, return the proof.
TaskStatus::Success => {
let proof = manager.get_task_proof(&key).await?;

Ok(proof.into())
}
// For all other statuses just return the status.
status => Ok(status.clone().into()),
}
}

Expand Down
124 changes: 64 additions & 60 deletions host/src/server/api/v3/proof/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,39 +84,42 @@ async fn proof_handler(
for (key, req) in tasks.iter() {
let status = manager.get_task_proving_status(key).await?;

let Some((latest_status, ..)) = status.0.last() else {
if let Some((latest_status, ..)) = status.0.last() {
match latest_status {
// If task has been cancelled
TaskStatus::Cancelled
| TaskStatus::Cancelled_Aborted
| TaskStatus::Cancelled_NeverStarted
| TaskStatus::CancellationInProgress
// or if the task is failed, add it to the queue again
| TaskStatus::GuestProverFailure(_)
| TaskStatus::UnspecifiedFailureReason
=> {
manager
.update_task_progress(key.clone(), TaskStatus::Registered, None)
.await?;
prover_state.task_channel.try_send(Message::from(req))?;

is_registered = true;
is_success = false;
}
// If the task has succeeded, return the proof.
TaskStatus::Success => {}
// For all other statuses just return the status.
status => {
statuses.push(status.clone());
is_registered = false;
is_success = false;
}
}
} else {
// If there are no tasks with provided config, create a new one.
manager.enqueue_task(key).await?;

prover_state.task_channel.try_send(Message::from(req))?;
is_registered = true;
continue;
};

match latest_status {
// If task has been cancelled add it to the queue again
TaskStatus::Cancelled
| TaskStatus::Cancelled_Aborted
| TaskStatus::Cancelled_NeverStarted
| TaskStatus::CancellationInProgress => {
manager
.update_task_progress(key.clone(), TaskStatus::Registered, None)
.await?;

prover_state.task_channel.try_send(Message::from(req))?;

is_registered = true;
is_success = false;
}
// If the task has succeeded, return the proof.
TaskStatus::Success => {}
// For all other statuses just return the status.
status => {
statuses.push(status.clone());
is_registered = false;
is_success = false;
}
}
}

if is_registered {
Expand Down Expand Up @@ -147,7 +150,40 @@ async fn proof_handler(
.get_aggregation_task_proving_status(&aggregation_request)
.await?;

let Some((latest_status, ..)) = status.0.last() else {
if let Some((latest_status, ..)) = status.0.last() {
match latest_status {
// If task has been cancelled add it to the queue again
TaskStatus::Cancelled
| TaskStatus::Cancelled_Aborted
| TaskStatus::Cancelled_NeverStarted
| TaskStatus::CancellationInProgress
// or if the task is failed, add it to the queue again
| TaskStatus::GuestProverFailure(_)
| TaskStatus::UnspecifiedFailureReason
=> {
manager
.update_aggregation_task_progress(
&aggregation_request,
TaskStatus::Registered,
None,
)
.await?;
prover_state
.task_channel
.try_send(Message::from(aggregation_request))?;
Ok(Status::from(TaskStatus::Registered))
}
// If the task has succeeded, return the proof.
TaskStatus::Success => {
let proof = manager
.get_aggregation_task_proof(&aggregation_request)
.await?;
Ok(proof.into())
}
// For all other statuses just return the status.
status => Ok(status.clone().into()),
}
} else {
// If there are no tasks with provided config, create a new one.
manager
.enqueue_aggregation_task(&aggregation_request)
Expand All @@ -156,39 +192,7 @@ async fn proof_handler(
prover_state
.task_channel
.try_send(Message::from(aggregation_request.clone()))?;
return Ok(Status::from(TaskStatus::Registered));
};

match latest_status {
// If task has been cancelled add it to the queue again
TaskStatus::Cancelled
| TaskStatus::Cancelled_Aborted
| TaskStatus::Cancelled_NeverStarted
| TaskStatus::CancellationInProgress => {
manager
.update_aggregation_task_progress(
&aggregation_request,
TaskStatus::Registered,
None,
)
.await?;

prover_state
.task_channel
.try_send(Message::from(aggregation_request))?;

Ok(Status::from(TaskStatus::Registered))
}
// If the task has succeeded, return the proof.
TaskStatus::Success => {
let proof = manager
.get_aggregation_task_proof(&aggregation_request)
.await?;

Ok(proof.into())
}
// For all other statuses just return the status.
status => Ok(status.clone().into()),
Ok(Status::from(TaskStatus::Registered))
}
} else {
let status = statuses.into_iter().collect::<TaskStatus>();
Expand Down
67 changes: 42 additions & 25 deletions provers/risc0/driver/src/bonsai.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ pub async fn maybe_prove<I: Serialize, O: Eq + Debug + Serialize + DeserializeOw
assumptions: (Vec<impl Into<AssumptionReceipt>>, Vec<String>),
proof_key: ProofKey,
id_store: &mut Option<&mut dyn IdWrite>,
) -> Option<(String, Receipt)> {
) -> ProverResult<(String, Receipt)> {
let (assumption_instances, assumption_uuids) = assumptions;

let encoded_output =
Expand All @@ -152,38 +152,52 @@ pub async fn maybe_prove<I: Serialize, O: Eq + Debug + Serialize + DeserializeOw
Ok(_) => {}
Err(e) => {
error!("Failed to scale up bonsai: {e:?}");
return None;
return Err(ProverError::GuestError(
"Failed to scale up bonsai".to_string(),
));
}
}
// query bonsai service until it works
loop {
match prove_bonsai(
macro_rules! retry_with_backoff {
($max_retries:expr, $retry_delay:expr, $operation:expr, $err_transform:expr) => {{
let mut attempt = 0;
loop {
match $operation {
Ok(result) => break Ok(result),
Err(e) => {
if attempt >= $max_retries {
error!("Max retries ({}) reached, aborting...", $max_retries);
break Err($err_transform(e));
}
warn!(
"Operation failed (attempt {}/{}): {:?}",
attempt + 1,
$max_retries,
e
);
tokio_async_sleep(Duration::from_secs($retry_delay)).await;
attempt += 1;
}
}
}
}};
}

let (uuid, receipt) = retry_with_backoff!(
MAX_REQUEST_RETRY,
20,
prove_bonsai(
encoded_input.clone(),
elf,
expected_output,
assumption_uuids.clone(),
proof_key,
id_store,
)
.await
{
Ok((receipt_uuid, receipt)) => {
break (receipt_uuid, receipt, false);
}
Err(BonsaiExecutionError::SdkFailure(err)) => {
warn!("Bonsai SDK fail: {err:?}, keep tracking...");
tokio_async_sleep(Duration::from_secs(15)).await;
}
Err(BonsaiExecutionError::Other(err)) => {
warn!("Something wrong: {err:?}, keep tracking...");
tokio_async_sleep(Duration::from_secs(15)).await;
}
Err(BonsaiExecutionError::Fatal(err)) => {
error!("Fatal error on Bonsai: {err:?}");
return None;
}
}
}
.await,
|e| ProverError::GuestError(format!("Bonsai SDK call fail: {e:?}").to_string())
)?;
(uuid, receipt, false)
} else {
// run prover
info!("start running local prover");
Expand All @@ -197,7 +211,9 @@ pub async fn maybe_prove<I: Serialize, O: Eq + Debug + Serialize + DeserializeOw
Ok(receipt) => (Default::default(), receipt, false),
Err(e) => {
warn!("Failed to prove locally: {e:?}");
return None;
return Err(ProverError::GuestError(
"Failed to prove locally".to_string(),
));
}
}
};
Expand All @@ -211,6 +227,7 @@ pub async fn maybe_prove<I: Serialize, O: Eq + Debug + Serialize + DeserializeOw
info!("Prover succeeded");
} else {
error!("Output mismatch! Prover: {output_guest:?}, expected: {expected_output:?}");
return Err(ProverError::GuestError("Output mismatch!".to_string()));
}

// upload receipt to bonsai
Expand All @@ -229,7 +246,7 @@ pub async fn maybe_prove<I: Serialize, O: Eq + Debug + Serialize + DeserializeOw
}

// return result
Some(result)
Ok(result)
}

pub async fn upload_receipt(receipt: &Receipt) -> anyhow::Result<String> {
Expand Down
Loading

0 comments on commit 3432737

Please sign in to comment.