From 810c706a4d953a1aa8c0f7903fcfeec3b1bed540 Mon Sep 17 00:00:00 2001 From: Julio Merino Date: Thu, 19 Oct 2023 16:46:07 -0700 Subject: [PATCH] Return retriable tasks from get_result{,s_since} Modify the queue's get_result() and get_results_since() primitives to return tasks that failed on an earlier run and are scheduled to rerun later. This keeps all callers the same by making the client's wait() and wait_all() methods ignore retriable tasks until they are fully done. However, this adds a new wait_once() to wait for a task until it asks to retry or finishes, whichever happens first. --- queue/src/db/mod.rs | 76 ++++++++++++++++++++++----- queue/src/db/status.rs | 103 ++++++++++++++++++++++++++++++++----- queue/src/driver/client.rs | 24 +++++++++ queue/src/driver/mod.rs | 55 +++++++++++++++++++- 4 files changed, 231 insertions(+), 27 deletions(-) diff --git a/queue/src/db/mod.rs b/queue/src/db/mod.rs index 223ff30..5f3c46f 100644 --- a/queue/src/db/mod.rs +++ b/queue/src/db/mod.rs @@ -161,9 +161,12 @@ pub(crate) async fn get_result(ex: &mut Executor, id: Uuid) -> DbResult { let query_str = " - SELECT status_code, status_reason + SELECT status_code, status_reason, runs, only_after FROM tasks - WHERE id = $1 AND status_code != $2 + WHERE id = $1 AND ( + status_code != $2 + OR (status_code = $2 AND runs > 0 AND only_after IS NOT NULL) + ) "; match sqlx::query(query_str) .bind(id) @@ -176,6 +179,9 @@ pub(crate) async fn get_result(ex: &mut Executor, id: Uuid) -> DbResult = row.try_get("status_reason").map_err(postgres::map_sqlx_error)?; + let runs: i16 = row.try_get("runs").map_err(postgres::map_sqlx_error)?; + let only_after: Option = + row.try_get("only_after").map_err(postgres::map_sqlx_error)?; let code = match i8::try_from(code) { Ok(code) => code, @@ -187,7 +193,7 @@ pub(crate) async fn get_result(ex: &mut Executor, id: Uuid) -> DbResult DbResult { let query_str = " - SELECT status_code, status_reason + SELECT status_code, status_reason, runs, only_after_sec, only_after_nsec FROM tasks - WHERE id = ? AND status_code != ? + WHERE id = ? AND ( + status_code != ? + OR (status_code = ? AND runs > 0 AND only_after_sec IS NOT NULL) + ) "; match sqlx::query(query_str) .bind(id) .bind(TaskStatus::Runnable as i8) + .bind(TaskStatus::Runnable as i8) .fetch_optional(ex) .await .map_err(sqlite::map_sqlx_error)? @@ -213,8 +223,24 @@ pub(crate) async fn get_result(ex: &mut Executor, id: Uuid) -> DbResult = row.try_get("status_reason").map_err(sqlite::map_sqlx_error)?; + let runs: i16 = row.try_get("runs").map_err(postgres::map_sqlx_error)?; + let only_after_sec: Option = + row.try_get("only_after_sec").map_err(sqlite::map_sqlx_error)?; + let only_after_nsec: Option = + row.try_get("only_after_nsec").map_err(sqlite::map_sqlx_error)?; + + let only_after = match (only_after_sec, only_after_nsec) { + (Some(sec), Some(nsec)) => Some(sqlite::build_timestamp(sec, nsec)?), + (None, None) => None, + (_, _) => { + return Err(DbError::DataIntegrityError(format!( + "Inconsistent only_after sec ({:?}) and nsec ({:?}) values", + only_after_sec, only_after_nsec + ))); + } + }; - let result = status_to_result(id, code, reason)? + let result = status_to_result(id, code, reason, runs, only_after)? .expect("Must not have queried runnable tasks"); Ok(Some(result)) } @@ -239,9 +265,12 @@ pub(crate) async fn get_results_since( #[cfg(feature = "postgres")] Executor::Postgres(ref mut ex) => { let query_str = " - SELECT id, status_code, status_reason + SELECT id, status_code, status_reason, runs, only_after FROM tasks - WHERE status_code != $1 AND updated >= $2 + WHERE ( + status_code != $1 + OR (status_code = $1 AND runs > 0 AND only_after IS NOT NULL) + ) AND updated >= $2 ORDER BY updated ASC "; let mut rows = @@ -252,6 +281,9 @@ pub(crate) async fn get_results_since( let code: i16 = row.try_get("status_code").map_err(postgres::map_sqlx_error)?; let reason: Option = row.try_get("status_reason").map_err(postgres::map_sqlx_error)?; + let runs: i16 = row.try_get("runs").map_err(postgres::map_sqlx_error)?; + let only_after: Option = + row.try_get("only_after").map_err(postgres::map_sqlx_error)?; let code = match i8::try_from(code) { Ok(code) => code, @@ -263,7 +295,7 @@ pub(crate) async fn get_results_since( } }; - let result = status_to_result(id, code, reason)? + let result = status_to_result(id, code, reason, runs, only_after)? .expect("Must not have queried runnable tasks"); results.push((id, result)); } @@ -274,14 +306,16 @@ pub(crate) async fn get_results_since( let (since_sec, since_nsec) = sqlite::unpack_timestamp(since); let query_str = " - SELECT id, status_code, status_reason + SELECT id, status_code, status_reason, runs, only_after_sec, only_after_nsec FROM tasks - WHERE + WHERE ( status_code != ? - AND (updated_sec >= ? OR (updated_sec = ? AND updated_nsec >= ?)) + OR (status_code = ? AND runs > 0 AND only_after_sec IS NOT NULL) + ) AND (updated_sec >= ? OR (updated_sec = ? AND updated_nsec >= ?)) ORDER BY updated_sec ASC, updated_nsec ASC "; let mut rows = sqlx::query(query_str) + .bind(TaskStatus::Runnable as i8) .bind(TaskStatus::Runnable as i8) .bind(since_sec) .bind(since_sec) @@ -293,8 +327,24 @@ pub(crate) async fn get_results_since( let code: i8 = row.try_get("status_code").map_err(sqlite::map_sqlx_error)?; let reason: Option = row.try_get("status_reason").map_err(sqlite::map_sqlx_error)?; + let runs: i16 = row.try_get("runs").map_err(postgres::map_sqlx_error)?; + let only_after_sec: Option = + row.try_get("only_after_sec").map_err(sqlite::map_sqlx_error)?; + let only_after_nsec: Option = + row.try_get("only_after_nsec").map_err(sqlite::map_sqlx_error)?; + + let only_after = match (only_after_sec, only_after_nsec) { + (Some(sec), Some(nsec)) => Some(sqlite::build_timestamp(sec, nsec)?), + (None, None) => None, + (_, _) => { + return Err(DbError::DataIntegrityError(format!( + "Inconsistent only_after sec ({:?}) and msec ({:?}) values", + only_after_sec, only_after_nsec + ))); + } + }; - let result = status_to_result(id, code, reason)? + let result = status_to_result(id, code, reason, runs, only_after)? .expect("Must not have queried runnable tasks"); results.push((id, result)); } diff --git a/queue/src/db/status.rs b/queue/src/db/status.rs index e854662..3ab2946 100644 --- a/queue/src/db/status.rs +++ b/queue/src/db/status.rs @@ -58,16 +58,29 @@ pub(super) fn result_to_status( /// Parses a status `code`/`reason` pair as extracted from the database into a `TaskResult`. /// -/// If the task is still running, there is no result yet. +/// If the task is still running, there is no result yet, unless the task has been deferred after +/// a retry, in which case there will be a result. /// /// The `id` is used for error reporting reasons only. pub(super) fn status_to_result( id: Uuid, code: i8, reason: Option, + runs: i16, + only_after: Option, ) -> DbResult> { match code { - x if x == (TaskStatus::Runnable as i8) => Ok(None), + x if x == (TaskStatus::Runnable as i8) => match (runs, only_after) { + (0, _) => Ok(None), + (_, Some(only_after)) => match reason { + Some(reason) => Ok(Some(TaskResult::Retry(only_after, reason))), + None => Err(DbError::DataIntegrityError(format!( + "Task {} is Retry but status_reason is missing", + id + ))), + }, + (_, None) => Ok(None), + }, x if x == (TaskStatus::Done as i8) => Ok(Some(TaskResult::Done(reason))), @@ -123,27 +136,81 @@ mod tests { #[test] fn test_status_to_result_runnable_is_none() { - match status_to_result(Uuid::new_v4(), TaskStatus::Runnable as i8, None) { + match status_to_result(Uuid::new_v4(), TaskStatus::Runnable as i8, None, 3, None) { + Ok(None) => (), + r => panic!("Unexpected result: {:?}", r), + } + + match status_to_result( + Uuid::new_v4(), + TaskStatus::Runnable as i8, + Some("foo".to_owned()), + 0, + None, + ) { + Ok(None) => (), + r => panic!("Unexpected result: {:?}", r), + } + } + + #[test] + fn test_status_to_result_runnable_in_the_future_is_none() { + let now = datetime!(2023-10-19 15:50:00 UTC); + + match status_to_result(Uuid::new_v4(), TaskStatus::Runnable as i8, None, 0, Some(now)) { Ok(None) => (), r => panic!("Unexpected result: {:?}", r), } - match status_to_result(Uuid::new_v4(), TaskStatus::Runnable as i8, Some("foo".to_owned())) { + match status_to_result( + Uuid::new_v4(), + TaskStatus::Runnable as i8, + Some("foo".to_owned()), + 0, + Some(now), + ) { Ok(None) => (), r => panic!("Unexpected result: {:?}", r), } } + #[test] + fn test_status_to_result_retry_after_failure() { + let now = datetime!(2023-10-19 15:50:00 UTC); + + match status_to_result(Uuid::new_v4(), TaskStatus::Runnable as i8, None, 1, Some(now)) { + Err(DbError::DataIntegrityError(_)) => (), + r => panic!("Unexpected result: {:?}", r), + } + + assert_eq!( + Ok(Some(TaskResult::Retry(now, "foo".to_owned()))), + status_to_result( + Uuid::new_v4(), + TaskStatus::Runnable as i8, + Some("foo".to_owned()), + 1, + Some(now), + ) + ); + } + #[test] fn test_status_to_result_done_may_have_reason() { assert_eq!( Ok(Some(TaskResult::Done(None))), - status_to_result(Uuid::new_v4(), TaskStatus::Done as i8, None) + status_to_result(Uuid::new_v4(), TaskStatus::Done as i8, None, 123, None) ); assert_eq!( Ok(Some(TaskResult::Done(Some("msg".to_owned())))), - status_to_result(Uuid::new_v4(), TaskStatus::Done as i8, Some("msg".to_owned())) + status_to_result( + Uuid::new_v4(), + TaskStatus::Done as i8, + Some("msg".to_owned()), + 0, + None + ) ); } @@ -151,10 +218,16 @@ mod tests { fn test_status_to_result_failed_must_have_reason() { assert_eq!( Ok(Some(TaskResult::Failed("msg".to_owned()))), - status_to_result(Uuid::new_v4(), TaskStatus::Failed as i8, Some("msg".to_owned())) + status_to_result( + Uuid::new_v4(), + TaskStatus::Failed as i8, + Some("msg".to_owned()), + 0, + None + ) ); - match status_to_result(Uuid::new_v4(), TaskStatus::Failed as i8, None) { + match status_to_result(Uuid::new_v4(), TaskStatus::Failed as i8, None, 1, None) { Err(DbError::DataIntegrityError(_)) => (), r => panic!("Unexpected result: {:?}", r), } @@ -164,10 +237,16 @@ mod tests { fn test_status_to_result_abandoned_must_have_reason() { assert_eq!( Ok(Some(TaskResult::Abandoned("msg".to_owned()))), - status_to_result(Uuid::new_v4(), TaskStatus::Abandoned as i8, Some("msg".to_owned())) + status_to_result( + Uuid::new_v4(), + TaskStatus::Abandoned as i8, + Some("msg".to_owned()), + 1, + None + ) ); - match status_to_result(Uuid::new_v4(), TaskStatus::Abandoned as i8, None) { + match status_to_result(Uuid::new_v4(), TaskStatus::Abandoned as i8, None, 0, None) { Err(DbError::DataIntegrityError(_)) => (), r => panic!("Unexpected result: {:?}", r), } @@ -175,12 +254,12 @@ mod tests { #[test] fn test_status_to_result_unknown_code() { - match status_to_result(Uuid::new_v4(), 123, None) { + match status_to_result(Uuid::new_v4(), 123, None, 0, None) { Err(DbError::DataIntegrityError(e)) => assert!(e.contains("unknown")), r => panic!("Unexpected result: {:?}", r), } - match status_to_result(Uuid::new_v4(), 123, Some("foo".to_owned())) { + match status_to_result(Uuid::new_v4(), 123, Some("foo".to_owned()), 0, None) { Err(DbError::DataIntegrityError(e)) => assert!(e.contains("unknown")), r => panic!("Unexpected result: {:?}", r), } diff --git a/queue/src/driver/client.rs b/queue/src/driver/client.rs index 455a235..60d3539 100644 --- a/queue/src/driver/client.rs +++ b/queue/src/driver/client.rs @@ -137,11 +137,35 @@ where } /// Waits for task `id` until it has completed execution by polling its state every `period`. + /// + /// In other words: wait for tasks to be fully done, and in particular, if a task asks to be + /// retried, wait for all necessary retries to happen until the task is done. Use `wait_once` + /// instead to return on the first retry attempt. pub async fn wait( &mut self, db: Arc, id: Uuid, period: Duration, + ) -> DriverResult { + loop { + match self.poll(&mut db.ex().await?, id).await? { + None | Some(TaskResult::Retry(_, _)) => (), + Some(result) => break Ok(result), + } + + self.maybe_notify_worker().await; + + tokio::time::sleep(period).await; + } + } + + /// Waits for task `id` until it has completed execution or until it has attempted to run but + /// has decided to retry by polling its state every `period`. + pub async fn wait_once( + &mut self, + db: Arc, + id: Uuid, + period: Duration, ) -> DriverResult { loop { if let Some(result) = self.poll(&mut db.ex().await?, id).await? { diff --git a/queue/src/driver/mod.rs b/queue/src/driver/mod.rs index 21edfa1..43756a2 100644 --- a/queue/src/driver/mod.rs +++ b/queue/src/driver/mod.rs @@ -234,6 +234,7 @@ mod tests { use crate::driver::testutils::*; use crate::model::TaskResult; use std::time::Duration; + use time::macros::datetime; async fn do_stress_test(num_workers: usize, num_tasks: u16, batch_size: u16) { let opts = WorkerOptions { @@ -518,7 +519,10 @@ mod tests { context.notify_workers(1).await; tokio::time::sleep(Duration::from_millis(1)).await; } - assert_eq!(None, context.client.poll(&mut context.ex().await, id).await.unwrap()); + match context.client.poll(&mut context.ex().await, id).await { + Ok(Some(TaskResult::Retry(_, _))) => (), + e => panic!("{:?}", e), + } context.advance_clock(delay * 2); @@ -556,7 +560,10 @@ mod tests { context.notify_workers(1).await; tokio::time::sleep(Duration::from_millis(1)).await; } - assert_eq!(None, context.client.poll(&mut context.ex().await, id).await.unwrap()); + match context.client.poll(&mut context.ex().await, id).await { + Ok(Some(TaskResult::Retry(_, _))) => (), + e => panic!("{:?}", e), + } context.advance_clock(Duration::from_secs(1)); @@ -601,4 +608,48 @@ mod tests { assert_eq!(4, state.deferred); assert!(!state.done); } + + #[tokio::test] + async fn test_wait_once_returns_retries() { + let opts = WorkerOptions { max_runs: 5, ..Default::default() }; + let mut context = TestContext::setup_one_connected(opts.clone()).await; + + let delay = Duration::from_secs(60); + let task = MockTask { id: 123, defer: Some((2, delay)), ..Default::default() }; + let id = context.client.enqueue(&mut context.ex().await, &task).await.unwrap(); + + // Wait until we know the task has asked to retry the `defer` times we configured. + loop { + { + let state = context.state.lock().await; + assert!(state.len() <= 1); + if let Some(state) = state.get(&123) { + assert!(!state.done); + if state.deferred == task.defer.unwrap().0 { + break; + } + } + } + context.advance_clock(delay); + context.notify_workers(1).await; + tokio::time::sleep(Duration::from_millis(1)).await; + } + + for _ in 0..2 { + let result = + context.client.wait_once(context.db.clone(), id, Duration::from_millis(1)).await; + match result { + Ok(TaskResult::Retry(_, _)) => (), + e => panic!("{:?}", e), + } + } + let result = context.client.wait(context.db.clone(), id, Duration::from_millis(1)).await; + assert_eq!(Ok(TaskResult::Done(None)), result); + + let state = context.state.lock().await; + assert_eq!(1, state.len()); + let state = state.get(&123).unwrap(); + assert_eq!(2, state.deferred); + assert!(state.done); + } }