From 18ac56e8fc38979dfa6a2f35eb1a011224f72f55 Mon Sep 17 00:00:00 2001 From: Petar Vujovic Date: Tue, 16 Jul 2024 11:45:57 +0200 Subject: [PATCH] fix(tasks): handle inconsistency in status triplet --- tasks/src/adv_sqlite.rs | 18 ++++++++++++------ tasks/src/lib.rs | 16 +++++++++------- tasks/src/mem_db.rs | 10 +++++----- 3 files changed, 26 insertions(+), 18 deletions(-) diff --git a/tasks/src/adv_sqlite.rs b/tasks/src/adv_sqlite.rs index e3a19c90b..261f565ac 100644 --- a/tasks/src/adv_sqlite.rs +++ b/tasks/src/adv_sqlite.rs @@ -297,7 +297,7 @@ impl TaskDb { -- Proofs might also be large, so we isolate them in a dedicated table CREATE TABLE task_proofs( task_id INTEGER UNIQUE NOT NULL PRIMARY KEY, - proof BLOB NOT NULL, + proof TEXT, FOREIGN KEY(task_id) REFERENCES tasks(id) ); @@ -562,7 +562,7 @@ impl TaskDb { ":proofsys_id": proof_system as u8, ":prover": prover, ":status_id": status as i32, - ":proof": proof + ":proof": proof.map(hex::encode) })?; Ok(()) @@ -581,11 +581,12 @@ impl TaskDb { r#" SELECT ts.status_id, - t.prover, + tp.proof, timestamp FROM task_status ts LEFT JOIN tasks t ON ts.task_id = t.id + LEFT JOIN task_proofs tp ON tp.task_id = t.id WHERE t.chain_id = :chain_id AND t.blockhash = :blockhash @@ -605,7 +606,7 @@ impl TaskDb { |row| { Ok(( TaskStatus::from(row.get::<_, i32>(0)?), - Some(row.get::<_, String>(1)?), + row.get::<_, Option>(1)?, row.get::<_, DateTime>(2)?, )) }, @@ -646,10 +647,15 @@ impl TaskDb { ":proofsys_id": *proof_system as u8, ":prover": prover, }, - |row| row.get(0), + |row| row.get::<_, Option>(0), )?; - Ok(query) + let Some(proof) = query else { + return Ok(vec![]); + }; + + hex::decode(proof) + .map_err(|_| TaskManagerError::SqlError("couldn't decode from hex".to_owned())) } pub fn get_db_size(&self) -> TaskManagerResult<(usize, Vec<(String, usize)>)> { diff --git a/tasks/src/lib.rs b/tasks/src/lib.rs index 3ce700c67..e65586997 100644 --- a/tasks/src/lib.rs +++ b/tasks/src/lib.rs @@ -97,6 +97,7 @@ impl From<(ChainId, B256, ProofType, String)> for TaskDescriptor { } } +/// Task status triplet (status, proof, timestamp). pub type TaskProvingStatus = (TaskStatus, Option, DateTime); pub type TaskProvingStatusRecords = Vec; @@ -112,16 +113,16 @@ pub struct TaskReport(pub TaskDescriptor, pub TaskStatus); #[async_trait::async_trait] pub trait TaskManager { - /// new a task manager + /// Create a new task manager. fn new(opts: &TaskManagerOpts) -> Self; - /// enqueue_task + /// Enqueue a new task to the tasks database. async fn enqueue_task( &mut self, request: &TaskDescriptor, ) -> TaskManagerResult; - /// Update the task progress + /// Update a specific tasks progress. async fn update_task_progress( &mut self, key: TaskDescriptor, @@ -129,21 +130,22 @@ pub trait TaskManager { proof: Option<&[u8]>, ) -> TaskManagerResult<()>; - /// Returns the latest triplet (submitter or fulfiller, status, last update time) + /// Returns the latest triplet (status, proof - if any, last update time). async fn get_task_proving_status( &mut self, key: &TaskDescriptor, ) -> TaskManagerResult; - /// Returns the proof for the given task + /// Returns the proof for the given task. async fn get_task_proof(&mut self, key: &TaskDescriptor) -> TaskManagerResult>; - /// Returns the total and detailed database size + /// Returns the total and detailed database size. async fn get_db_size(&mut self) -> TaskManagerResult<(usize, Vec<(String, usize)>)>; - /// Prune old tasks + /// Prune old tasks. async fn prune_db(&mut self) -> TaskManagerResult<()>; + /// List all tasks in the db. async fn list_all_tasks(&mut self) -> TaskManagerResult>; } diff --git a/tasks/src/mem_db.rs b/tasks/src/mem_db.rs index 1fb73f94a..5b447c8ee 100644 --- a/tasks/src/mem_db.rs +++ b/tasks/src/mem_db.rs @@ -12,15 +12,15 @@ use std::{ sync::{Arc, Once}, }; +use chrono::Utc; +use tokio::sync::Mutex; +use tracing::{debug, info}; + use crate::{ ensure, TaskDescriptor, TaskManager, TaskManagerError, TaskManagerOpts, TaskManagerResult, TaskProvingStatusRecords, TaskReport, TaskStatus, }; -use chrono::Utc; -use tokio::sync::Mutex; -use tracing::{debug, info}; - #[derive(Debug)] pub struct InMemoryTaskManager { db: Arc>, @@ -39,7 +39,7 @@ impl InMemoryTaskDb { } fn enqueue_task(&mut self, key: &TaskDescriptor) { - let task_status = (TaskStatus::Registered, Some(key.prover.clone()), Utc::now()); + let task_status = (TaskStatus::Registered, None, Utc::now()); match self.enqueue_task.get(key) { Some(task_proving_records) => {