Skip to content

Commit

Permalink
Return retriable tasks from get_result{,s_since}
Browse files Browse the repository at this point in the history
Modify the queue's get_result() and get_results_since() primitives to
return tasks that failed on an earlier run and are scheduled to rerun
later.

This keeps all callers the same by making the client's wait() and
wait_all() methods ignore retriable tasks until they are fully done.
However, this adds a new wait_once() to wait for a task until it asks
to retry or finishes, whichever happens first.
  • Loading branch information
jmmv committed Oct 22, 2023
1 parent f0e9729 commit 810c706
Show file tree
Hide file tree
Showing 4 changed files with 231 additions and 27 deletions.
76 changes: 63 additions & 13 deletions queue/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,12 @@ pub(crate) async fn get_result(ex: &mut Executor, id: Uuid) -> DbResult<Option<T
#[cfg(feature = "postgres")]
Executor::Postgres(ref mut ex) => {
let query_str = "
SELECT status_code, status_reason
SELECT status_code, status_reason, runs, only_after
FROM tasks
WHERE id = $1 AND status_code != $2
WHERE id = $1 AND (
status_code != $2
OR (status_code = $2 AND runs > 0 AND only_after IS NOT NULL)
)
";
match sqlx::query(query_str)
.bind(id)
Expand All @@ -176,6 +179,9 @@ pub(crate) async fn get_result(ex: &mut Executor, id: Uuid) -> DbResult<Option<T
let code: i16 = row.try_get("status_code").map_err(postgres::map_sqlx_error)?;
let reason: Option<String> =
row.try_get("status_reason").map_err(postgres::map_sqlx_error)?;
let runs: i16 = row.try_get("runs").map_err(postgres::map_sqlx_error)?;
let only_after: Option<OffsetDateTime> =
row.try_get("only_after").map_err(postgres::map_sqlx_error)?;

let code = match i8::try_from(code) {
Ok(code) => code,
Expand All @@ -187,7 +193,7 @@ pub(crate) async fn get_result(ex: &mut Executor, id: Uuid) -> DbResult<Option<T
}
};

let result = status_to_result(id, code, reason)?
let result = status_to_result(id, code, reason, runs, only_after)?
.expect("Must not have queried runnable tasks");
Ok(Some(result))
}
Expand All @@ -198,13 +204,17 @@ pub(crate) async fn get_result(ex: &mut Executor, id: Uuid) -> DbResult<Option<T
#[cfg(any(feature = "sqlite", test))]
Executor::Sqlite(ref mut ex) => {
let query_str = "
SELECT status_code, status_reason
SELECT status_code, status_reason, runs, only_after_sec, only_after_nsec
FROM tasks
WHERE id = ? AND status_code != ?
WHERE id = ? AND (
status_code != ?
OR (status_code = ? AND runs > 0 AND only_after_sec IS NOT NULL)
)
";
match sqlx::query(query_str)
.bind(id)
.bind(TaskStatus::Runnable as i8)
.bind(TaskStatus::Runnable as i8)
.fetch_optional(ex)
.await
.map_err(sqlite::map_sqlx_error)?
Expand All @@ -213,8 +223,24 @@ pub(crate) async fn get_result(ex: &mut Executor, id: Uuid) -> DbResult<Option<T
let code: i8 = row.try_get("status_code").map_err(sqlite::map_sqlx_error)?;
let reason: Option<String> =
row.try_get("status_reason").map_err(sqlite::map_sqlx_error)?;
// Read through the SQLite error mapper, like every other column in this branch.
let runs: i16 = row.try_get("runs").map_err(sqlite::map_sqlx_error)?;
let only_after_sec: Option<i64> =
    row.try_get("only_after_sec").map_err(sqlite::map_sqlx_error)?;
let only_after_nsec: Option<i64> =
    row.try_get("only_after_nsec").map_err(sqlite::map_sqlx_error)?;

// The deferral time is read from two nullable columns, which must be either both set or
// both unset.  Anything else is reported as a data integrity problem.
let only_after = match (only_after_sec, only_after_nsec) {
    (Some(sec), Some(nsec)) => Some(sqlite::build_timestamp(sec, nsec)?),
    (None, None) => None,
    (_, _) => {
        return Err(DbError::DataIntegrityError(format!(
            "Inconsistent only_after sec ({:?}) and nsec ({:?}) values",
            only_after_sec, only_after_nsec
        )));
    }
};

let result = status_to_result(id, code, reason)?
let result = status_to_result(id, code, reason, runs, only_after)?
.expect("Must not have queried runnable tasks");
Ok(Some(result))
}
Expand All @@ -239,9 +265,12 @@ pub(crate) async fn get_results_since(
#[cfg(feature = "postgres")]
Executor::Postgres(ref mut ex) => {
let query_str = "
SELECT id, status_code, status_reason
SELECT id, status_code, status_reason, runs, only_after
FROM tasks
WHERE status_code != $1 AND updated >= $2
WHERE (
status_code != $1
OR (status_code = $1 AND runs > 0 AND only_after IS NOT NULL)
) AND updated >= $2
ORDER BY updated ASC
";
let mut rows =
Expand All @@ -252,6 +281,9 @@ pub(crate) async fn get_results_since(
let code: i16 = row.try_get("status_code").map_err(postgres::map_sqlx_error)?;
let reason: Option<String> =
row.try_get("status_reason").map_err(postgres::map_sqlx_error)?;
let runs: i16 = row.try_get("runs").map_err(postgres::map_sqlx_error)?;
let only_after: Option<OffsetDateTime> =
row.try_get("only_after").map_err(postgres::map_sqlx_error)?;

let code = match i8::try_from(code) {
Ok(code) => code,
Expand All @@ -263,7 +295,7 @@ pub(crate) async fn get_results_since(
}
};

let result = status_to_result(id, code, reason)?
let result = status_to_result(id, code, reason, runs, only_after)?
.expect("Must not have queried runnable tasks");
results.push((id, result));
}
Expand All @@ -274,14 +306,16 @@ pub(crate) async fn get_results_since(
let (since_sec, since_nsec) = sqlite::unpack_timestamp(since);

let query_str = "
SELECT id, status_code, status_reason
SELECT id, status_code, status_reason, runs, only_after_sec, only_after_nsec
FROM tasks
WHERE
WHERE (
status_code != ?
AND (updated_sec >= ? OR (updated_sec = ? AND updated_nsec >= ?))
OR (status_code = ? AND runs > 0 AND only_after_sec IS NOT NULL)
) AND (updated_sec >= ? OR (updated_sec = ? AND updated_nsec >= ?))
ORDER BY updated_sec ASC, updated_nsec ASC
";
let mut rows = sqlx::query(query_str)
.bind(TaskStatus::Runnable as i8)
.bind(TaskStatus::Runnable as i8)
.bind(since_sec)
.bind(since_sec)
Expand All @@ -293,8 +327,24 @@ pub(crate) async fn get_results_since(
let code: i8 = row.try_get("status_code").map_err(sqlite::map_sqlx_error)?;
let reason: Option<String> =
row.try_get("status_reason").map_err(sqlite::map_sqlx_error)?;
// Read through the SQLite error mapper, like every other column in this branch.
let runs: i16 = row.try_get("runs").map_err(sqlite::map_sqlx_error)?;
let only_after_sec: Option<i64> =
    row.try_get("only_after_sec").map_err(sqlite::map_sqlx_error)?;
let only_after_nsec: Option<i64> =
    row.try_get("only_after_nsec").map_err(sqlite::map_sqlx_error)?;

// The deferral time is read from two nullable columns, which must be either both set or
// both unset.  Anything else is reported as a data integrity problem.
let only_after = match (only_after_sec, only_after_nsec) {
    (Some(sec), Some(nsec)) => Some(sqlite::build_timestamp(sec, nsec)?),
    (None, None) => None,
    (_, _) => {
        // The second value printed is the nanoseconds column, so name it as such.
        return Err(DbError::DataIntegrityError(format!(
            "Inconsistent only_after sec ({:?}) and nsec ({:?}) values",
            only_after_sec, only_after_nsec
        )));
    }
};

let result = status_to_result(id, code, reason)?
let result = status_to_result(id, code, reason, runs, only_after)?
.expect("Must not have queried runnable tasks");
results.push((id, result));
}
Expand Down
103 changes: 91 additions & 12 deletions queue/src/db/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,29 @@ pub(super) fn result_to_status(

/// Parses a status `code`/`reason` pair as extracted from the database into a `TaskResult`.
///
/// If the task is still running, there is no result yet.
/// If the task is still running, there is no result yet, unless the task has been deferred after
/// a retry, in which case there will be a result.
///
/// The `id` is used for error reporting reasons only.
pub(super) fn status_to_result(
id: Uuid,
code: i8,
reason: Option<String>,
runs: i16,
only_after: Option<OffsetDateTime>,
) -> DbResult<Option<TaskResult>> {
match code {
x if x == (TaskStatus::Runnable as i8) => Ok(None),
x if x == (TaskStatus::Runnable as i8) => match (runs, only_after) {
(0, _) => Ok(None),
(_, Some(only_after)) => match reason {
Some(reason) => Ok(Some(TaskResult::Retry(only_after, reason))),
None => Err(DbError::DataIntegrityError(format!(
"Task {} is Retry but status_reason is missing",
id
))),
},
(_, None) => Ok(None),
},

x if x == (TaskStatus::Done as i8) => Ok(Some(TaskResult::Done(reason))),

Expand Down Expand Up @@ -123,38 +136,98 @@ mod tests {

#[test]
fn test_status_to_result_runnable_is_none() {
match status_to_result(Uuid::new_v4(), TaskStatus::Runnable as i8, None) {
match status_to_result(Uuid::new_v4(), TaskStatus::Runnable as i8, None, 3, None) {
Ok(None) => (),
r => panic!("Unexpected result: {:?}", r),
}

match status_to_result(
Uuid::new_v4(),
TaskStatus::Runnable as i8,
Some("foo".to_owned()),
0,
None,
) {
Ok(None) => (),
r => panic!("Unexpected result: {:?}", r),
}
}

#[test]
fn test_status_to_result_runnable_in_the_future_is_none() {
let now = datetime!(2023-10-19 15:50:00 UTC);

match status_to_result(Uuid::new_v4(), TaskStatus::Runnable as i8, None, 0, Some(now)) {
Ok(None) => (),
r => panic!("Unexpected result: {:?}", r),
}

match status_to_result(Uuid::new_v4(), TaskStatus::Runnable as i8, Some("foo".to_owned())) {
match status_to_result(
Uuid::new_v4(),
TaskStatus::Runnable as i8,
Some("foo".to_owned()),
0,
Some(now),
) {
Ok(None) => (),
r => panic!("Unexpected result: {:?}", r),
}
}

#[test]
fn test_status_to_result_retry_after_failure() {
    let deferred_until = datetime!(2023-10-19 15:50:00 UTC);
    let runnable = TaskStatus::Runnable as i8;

    // A runnable task that already ran once and is deferred must carry a reason.
    let missing_reason = status_to_result(Uuid::new_v4(), runnable, None, 1, Some(deferred_until));
    if !matches!(missing_reason, Err(DbError::DataIntegrityError(_))) {
        panic!("Unexpected result: {:?}", missing_reason);
    }

    // With a reason present, the same inputs yield a retry result.
    let with_reason =
        status_to_result(Uuid::new_v4(), runnable, Some("foo".to_owned()), 1, Some(deferred_until));
    assert_eq!(Ok(Some(TaskResult::Retry(deferred_until, "foo".to_owned()))), with_reason);
}

#[test]
fn test_status_to_result_done_may_have_reason() {
assert_eq!(
Ok(Some(TaskResult::Done(None))),
status_to_result(Uuid::new_v4(), TaskStatus::Done as i8, None)
status_to_result(Uuid::new_v4(), TaskStatus::Done as i8, None, 123, None)
);

assert_eq!(
Ok(Some(TaskResult::Done(Some("msg".to_owned())))),
status_to_result(Uuid::new_v4(), TaskStatus::Done as i8, Some("msg".to_owned()))
status_to_result(
Uuid::new_v4(),
TaskStatus::Done as i8,
Some("msg".to_owned()),
0,
None
)
);
}

#[test]
fn test_status_to_result_failed_must_have_reason() {
assert_eq!(
Ok(Some(TaskResult::Failed("msg".to_owned()))),
status_to_result(Uuid::new_v4(), TaskStatus::Failed as i8, Some("msg".to_owned()))
status_to_result(
Uuid::new_v4(),
TaskStatus::Failed as i8,
Some("msg".to_owned()),
0,
None
)
);

match status_to_result(Uuid::new_v4(), TaskStatus::Failed as i8, None) {
match status_to_result(Uuid::new_v4(), TaskStatus::Failed as i8, None, 1, None) {
Err(DbError::DataIntegrityError(_)) => (),
r => panic!("Unexpected result: {:?}", r),
}
Expand All @@ -164,23 +237,29 @@ mod tests {
fn test_status_to_result_abandoned_must_have_reason() {
assert_eq!(
Ok(Some(TaskResult::Abandoned("msg".to_owned()))),
status_to_result(Uuid::new_v4(), TaskStatus::Abandoned as i8, Some("msg".to_owned()))
status_to_result(
Uuid::new_v4(),
TaskStatus::Abandoned as i8,
Some("msg".to_owned()),
1,
None
)
);

match status_to_result(Uuid::new_v4(), TaskStatus::Abandoned as i8, None) {
match status_to_result(Uuid::new_v4(), TaskStatus::Abandoned as i8, None, 0, None) {
Err(DbError::DataIntegrityError(_)) => (),
r => panic!("Unexpected result: {:?}", r),
}
}

#[test]
fn test_status_to_result_unknown_code() {
match status_to_result(Uuid::new_v4(), 123, None) {
match status_to_result(Uuid::new_v4(), 123, None, 0, None) {
Err(DbError::DataIntegrityError(e)) => assert!(e.contains("unknown")),
r => panic!("Unexpected result: {:?}", r),
}

match status_to_result(Uuid::new_v4(), 123, Some("foo".to_owned())) {
match status_to_result(Uuid::new_v4(), 123, Some("foo".to_owned()), 0, None) {
Err(DbError::DataIntegrityError(e)) => assert!(e.contains("unknown")),
r => panic!("Unexpected result: {:?}", r),
}
Expand Down
24 changes: 24 additions & 0 deletions queue/src/driver/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,35 @@ where
}

/// Blocks until task `id` reaches a final result, checking on it once per `period`.
///
/// A task that has asked to be retried is not considered finished: polling continues across
/// retries until a non-retry result shows up.  Callers that want to stop at the first retry
/// request should use `wait_once` instead.
pub async fn wait(
    &mut self,
    db: Arc<dyn Db + Send + Sync>,
    id: Uuid,
    period: Duration,
) -> DriverResult<TaskResult> {
    loop {
        let outcome = self.poll(&mut db.ex().await?, id).await?;
        if let Some(result) = outcome {
            // Retry results are intermediate states; keep waiting for the final one.
            if !matches!(result, TaskResult::Retry(_, _)) {
                return Ok(result);
            }
        }

        self.maybe_notify_worker().await;

        tokio::time::sleep(period).await;
    }
}

/// Waits for task `id` until it has completed execution or until it has attempted to run but
/// has decided to retry by polling its state every `period`.
pub async fn wait_once(
&mut self,
db: Arc<dyn Db + Send + Sync>,
id: Uuid,
period: Duration,
) -> DriverResult<TaskResult> {
loop {
if let Some(result) = self.poll(&mut db.ex().await?, id).await? {
Expand Down
Loading

0 comments on commit 810c706

Please sign in to comment.