Skip to content

Commit

Permalink
fix(tasks): handle inconsistency in status triplet
Browse files Browse the repository at this point in the history
  • Loading branch information
petarvujovic98 committed Jul 16, 2024
1 parent e7c72b6 commit 18ac56e
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 18 deletions.
18 changes: 12 additions & 6 deletions tasks/src/adv_sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ impl TaskDb {
-- Proofs might also be large, so we isolate them in a dedicated table
CREATE TABLE task_proofs(
task_id INTEGER UNIQUE NOT NULL PRIMARY KEY,
proof BLOB NOT NULL,
proof TEXT,
FOREIGN KEY(task_id) REFERENCES tasks(id)
);
Expand Down Expand Up @@ -562,7 +562,7 @@ impl TaskDb {
":proofsys_id": proof_system as u8,
":prover": prover,
":status_id": status as i32,
":proof": proof
":proof": proof.map(hex::encode)
})?;

Ok(())
Expand All @@ -581,11 +581,12 @@ impl TaskDb {
r#"
SELECT
ts.status_id,
t.prover,
tp.proof,
timestamp
FROM
task_status ts
LEFT JOIN tasks t ON ts.task_id = t.id
LEFT JOIN task_proofs tp ON tp.task_id = t.id
WHERE
t.chain_id = :chain_id
AND t.blockhash = :blockhash
Expand All @@ -605,7 +606,7 @@ impl TaskDb {
|row| {
Ok((
TaskStatus::from(row.get::<_, i32>(0)?),
Some(row.get::<_, String>(1)?),
row.get::<_, Option<String>>(1)?,
row.get::<_, DateTime<Utc>>(2)?,
))
},
Expand Down Expand Up @@ -646,10 +647,15 @@ impl TaskDb {
":proofsys_id": *proof_system as u8,
":prover": prover,
},
|row| row.get(0),
|row| row.get::<_, Option<String>>(0),
)?;

Ok(query)
let Some(proof) = query else {
return Ok(vec![]);
};

hex::decode(proof)
.map_err(|_| TaskManagerError::SqlError("couldn't decode from hex".to_owned()))
}

pub fn get_db_size(&self) -> TaskManagerResult<(usize, Vec<(String, usize)>)> {
Expand Down
16 changes: 9 additions & 7 deletions tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ impl From<(ChainId, B256, ProofType, String)> for TaskDescriptor {
}
}

/// Task status triplet (status, proof, timestamp).
pub type TaskProvingStatus = (TaskStatus, Option<String>, DateTime<Utc>);

pub type TaskProvingStatusRecords = Vec<TaskProvingStatus>;
Expand All @@ -112,38 +113,39 @@ pub struct TaskReport(pub TaskDescriptor, pub TaskStatus);

#[async_trait::async_trait]
pub trait TaskManager {
/// new a task manager
/// Create a new task manager.
fn new(opts: &TaskManagerOpts) -> Self;

/// enqueue_task
/// Enqueue a new task to the tasks database.
async fn enqueue_task(
&mut self,
request: &TaskDescriptor,
) -> TaskManagerResult<TaskProvingStatusRecords>;

/// Update the task progress
/// Update a specific tasks progress.
async fn update_task_progress(
&mut self,
key: TaskDescriptor,
status: TaskStatus,
proof: Option<&[u8]>,
) -> TaskManagerResult<()>;

/// Returns the latest triplet (submitter or fulfiller, status, last update time)
/// Returns the latest triplet (status, proof - if any, last update time).
async fn get_task_proving_status(
&mut self,
key: &TaskDescriptor,
) -> TaskManagerResult<TaskProvingStatusRecords>;

/// Returns the proof for the given task
/// Returns the proof for the given task.
async fn get_task_proof(&mut self, key: &TaskDescriptor) -> TaskManagerResult<Vec<u8>>;

/// Returns the total and detailed database size
/// Returns the total and detailed database size.
async fn get_db_size(&mut self) -> TaskManagerResult<(usize, Vec<(String, usize)>)>;

/// Prune old tasks
/// Prune old tasks.
async fn prune_db(&mut self) -> TaskManagerResult<()>;

/// List all tasks in the db.
async fn list_all_tasks(&mut self) -> TaskManagerResult<Vec<TaskReport>>;
}

Expand Down
10 changes: 5 additions & 5 deletions tasks/src/mem_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ use std::{
sync::{Arc, Once},
};

use chrono::Utc;
use tokio::sync::Mutex;
use tracing::{debug, info};

use crate::{
ensure, TaskDescriptor, TaskManager, TaskManagerError, TaskManagerOpts, TaskManagerResult,
TaskProvingStatusRecords, TaskReport, TaskStatus,
};

use chrono::Utc;
use tokio::sync::Mutex;
use tracing::{debug, info};

#[derive(Debug)]
pub struct InMemoryTaskManager {
db: Arc<Mutex<InMemoryTaskDb>>,
Expand All @@ -39,7 +39,7 @@ impl InMemoryTaskDb {
}

fn enqueue_task(&mut self, key: &TaskDescriptor) {
let task_status = (TaskStatus::Registered, Some(key.prover.clone()), Utc::now());
let task_status = (TaskStatus::Registered, None, Utc::now());

match self.enqueue_task.get(key) {
Some(task_proving_records) => {
Expand Down

0 comments on commit 18ac56e

Please sign in to comment.