Skip to content

Commit

Permalink
fix: don't override successful result with claimed (#224)
Browse files Browse the repository at this point in the history
  • Loading branch information
Threated authored Jan 14, 2025
1 parent cb06ca9 commit 47c0266
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 5 deletions.
19 changes: 14 additions & 5 deletions broker/src/task_manager.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::{
borrow::Cow,
ops::Deref,
time::{Duration, SystemTime}, collections::HashMap, sync::Arc, convert::Infallible,
borrow::Cow, collections::{hash_map::Entry, HashMap}, convert::Infallible, ops::Deref, sync::Arc, time::{Duration, SystemTime}
};

use axum::{response::{IntoResponse, sse::Event, Sse}, Json, http::StatusCode};
Expand Down Expand Up @@ -35,7 +33,18 @@ impl<State: MsgState> Task for MsgTaskRequest<State> {
type Result = MsgSigned<MsgTaskResult<State>>;

/// Stores `result` in `self.results`, keyed by a clone of `result.get_from()`.
///
/// Returns `true` only when an already stored result for that key was replaced.
/// Returns `false` when the key was vacant (the result is inserted), and also when a
/// `Claimed` result arrives for a key whose stored status is `Succeeded` (the stored
/// result is kept and the incoming one is dropped).
// NOTE(review): `false` therefore covers both "newly inserted" and "late claim ignored";
// confirm that callers do not need to tell these two cases apart.
fn insert_result(&mut self, result: Self::Result) -> bool {
// NOTE(review): the next line looks like the pre-change body retained by the diff view
// (unconditional insert, `true` if a previous value existed) — confirm it is not live code.
self.results.insert(result.get_from().clone(), result).is_some()
match self.results.entry(result.get_from().clone()) {
// Don't override a successful result. See tests::task_test::test_claim_after_success for more details
Entry::Occupied(prev) if prev.get().msg.status == WorkStatus::Succeeded && result.msg.status == WorkStatus::Claimed => false,
// Any other update from the same sender replaces the stored result.
Entry::Occupied(mut prev) => {
prev.insert(result);
true
},
// First result from this sender: store it and report that nothing was replaced.
Entry::Vacant(empty) => {
empty.insert(result);
false
},
}
}

fn get_results(&self) -> &HashMap<AppOrProxyId, Self::Result> {
Expand Down Expand Up @@ -179,7 +188,7 @@ impl<T: HasWaitId<MsgId> + Task + Msg> TaskManager<T> {
}
let max_receivers = task.get_to().len();
self.tasks.insert(id.clone(), task);
// Create a large enough buffer that all receivers can at least create one claimed result and a successfull result
// Create a large enough buffer that all receivers can at least create one claimed result and a successful result
// while the receiver channel is not being polled filling up the buffer and causing the channel to lag
let (results_sender, _) = broadcast::channel(1.max(max_receivers) * 2);
self.new_results.insert(id.clone(), results_sender);
Expand Down
15 changes: 15 additions & 0 deletions tests/src/task_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,21 @@ async fn test_task_claiming() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn test_claim_after_success() -> Result<()> {
    // A successful result must never be downgraded to `Claimed`. Such an update is almost
    // always an HTTP race: the client selects on claiming and answering a task at the same time.
    //
    // Example:
    // The claim request has been sent but not yet answered, so its future is still pending
    // (the request may be stuck at some unfair proxy). Meanwhile the result of the task has
    // been computed, so the pending claim future is dropped and the successful result is sent
    // immediately. That result can arrive before the claim request does; when the claim finally
    // arrives it must not override the stored result.
    let task_id = post_task(()).await?;
    put_result(task_id, (), Some(WorkStatus::Succeeded)).await?;
    put_result(task_id, (), Some(WorkStatus::Claimed)).await?;

    // Poll for a single result, giving up after ten seconds.
    let wait_for_one = BlockingOptions::from_count(1);
    let pending_poll = poll_result::<()>(task_id, &wait_for_one);
    let polled = tokio::time::timeout(Duration::from_secs(10), pending_poll).await??;

    assert_eq!(polled.status, WorkStatus::Succeeded);
    Ok(())
}

pub async fn post_task<T: Serialize + 'static>(body: T) -> Result<MsgId> {
let id = MsgId::new();
client1().post_task(&TaskRequest {
Expand Down

0 comments on commit 47c0266

Please sign in to comment.