derived_data_service: make enqueuing items into the derivation queue resilient against races

Summary:
When enqueuing an item into the derivation queue, the item being inserted might represent a batch of changesets that shares some changesets with another batch that is being inserted simultaneously, e.g.:
```
       a   d
        \ /
         b
         |
         c
First batch: [a, b, c]
Second batch: [d, b, c]
```
In this case, one of the two batches will lose the race and fail to get enqueued in the derivation queue. If the second batch fails to get enqueued while the first batch is still in the queue, the logic in `build_underived_batched_graph` handles this correctly: it creates a new batch that includes only the changesets that are in the second batch but not in the first, and tries to enqueue it.
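
As an illustration of the handled case, here is a minimal sketch that models batches as slices of changeset names and computes the part of the second batch that the first batch does not already cover. This is not the logic in `build_underived_batched_graph`; `remaining_batch` is a hypothetical helper, and representing changesets as strings is an assumption made for brevity.

```rust
use std::collections::HashSet;

/// Hypothetical helper: returns the changesets of `incoming` that are not
/// already covered by `queued`, preserving the order of `incoming`.
fn remaining_batch<'a>(incoming: &[&'a str], queued: &[&'a str]) -> Vec<&'a str> {
    // Changesets that the already-queued batch will derive.
    let covered: HashSet<&'a str> = queued.iter().copied().collect();
    incoming
        .iter()
        .copied()
        .filter(|cs| !covered.contains(cs))
        .collect()
}
```

With the batches from the example above, `remaining_batch(&["d", "b", "c"], &["a", "b", "c"])` leaves only `d` to enqueue.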

There is, however, another case that is not currently handled correctly: the first batch was processed and partially removed from the queue before the attempt to enqueue the second batch. In this case the enqueue attempt fails, but we do not find the first batch in the queue, so this counts as a failure and we retry enqueuing the same batch, potentially hitting the same failure again. Instead, let's check whether the batch was partially derived and, in that case, try to enqueue only the part of the batch that was not derived, without counting this case as a failure.
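
The three outcomes after a failed enqueue can be sketched as a small decision function. This is a simplified model rather than the code in this commit: `NextStep` and `after_failed_enqueue` are hypothetical names, changesets are plain strings, the batch is assumed to be a linear chain ordered head-first and root-last, and the set of derived changesets is passed in directly instead of being queried.

```rust
use std::collections::HashSet;

/// Hypothetical model of what to do after a failed enqueue attempt.
#[derive(Debug, PartialEq)]
enum NextStep<'a> {
    /// Every changeset in the batch is already derived: nothing left to enqueue.
    AlreadyDerived,
    /// Nothing was derived: retry the same batch and count the failure.
    RetrySameBatch,
    /// Part of the batch was derived: enqueue only the remainder, no failure counted.
    EnqueueRemainder(Vec<&'a str>),
}

/// `batch` is ordered head-first, root-last (an assumption of this model).
fn after_failed_enqueue<'a>(batch: &[&'a str], derived: &HashSet<&'a str>) -> NextStep<'a> {
    // Keep only the changesets that still need deriving, in batch order.
    let underived: Vec<&'a str> = batch
        .iter()
        .copied()
        .filter(|cs| !derived.contains(cs))
        .collect();
    // The last underived changeset is the root of whatever is left.
    let new_root = underived.last().copied();
    match new_root {
        None => NextStep::AlreadyDerived,
        Some(root) if Some(root) == batch.last().copied() => NextStep::RetrySameBatch,
        Some(_) => NextStep::EnqueueRemainder(underived),
    }
}
```

For the batch `[d, b, c]`, if only `c` has been derived by the time the enqueue fails, the model returns `EnqueueRemainder(["d", "b"])` rather than retrying the whole batch.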

Reviewed By: singhsrb

Differential Revision: D64899662

fbshipit-source-id: c45188061de0ca24397eade4fc790154ceef4748
YousefSalama authored and facebook-github-bot committed Oct 24, 2024
1 parent 94198eb commit df16677
Showing 1 changed file with 44 additions and 20 deletions.
@@ -115,8 +115,8 @@ pub async fn build_underived_batched_graph<'a>(
     let item = DerivationDagItem::new(
         repo_id,
         config_name.to_string(),
-        derived_data_type.clone(),
-        root_cs_id.clone(),
+        derived_data_type,
+        root_cs_id,
         head_cs_id,
         bubble_id,
         deps.collect(),
@@ -125,9 +125,9 @@ pub async fn build_underived_batched_graph<'a>(

     let max_failed_attemps = justknobs::get_as::<u64>("scm/mononoke:build_underived_batched_graph_max_failed_attempts", None)?;

-    let mut cur_item = Some(item.clone());
     // Upstream batch will depend on this cs
     let mut upstream_dep = item.id().clone();
+    let mut cur_item = Some(item);
     let mut failed_attempt = 0;
     let mut err_msg = None;
     while let Some(item) = cur_item {
@@ -171,23 +171,47 @@ pub async fn build_underived_batched_graph<'a>(
                 }
             }
             Err(e) => {
-                let is_derived =
-                    ddm.is_derived(ctx, item.head_cs_id(), None, derived_data_type).await?;
-                if is_derived {
-                    let err_msg_str = format!("Failed to enqueue with error: {}, but the data was derived", e);
-                    debug!(ctx.logger(), "{}", err_msg_str);
-                    err_msg = Some(err_msg_str);
-                    // derived, update ready watch and return no dependency
-                    *watch.lock() =
-                        Some(EnqueueResponse::new(Box::new(future::ok(true))));
-                    None
-                } else {
-                    // return same item for enqueue and incremente failures count
-                    failed_attempt += 1;
-                    let err_msg_str = format!("Failed to enqueue into DAG: {}", e);
-                    error!(ctx.logger(), "{}", err_msg_str);
-                    err_msg = Some(err_msg_str);
-                    Some(item)
+                let derived_ancestors = commit_graph.ancestors_frontier_with(ctx, vec![item.head_cs_id()], |cs_id| async move {
+                    Ok(ddm.is_derived(ctx, cs_id, None, derived_data_type).await?)
+                }).await?;
+
+                let mut underived_batch = commit_graph.ancestors_difference(ctx, vec![item.head_cs_id()], derived_ancestors).await?;
+                match underived_batch.pop() {
+                    // All changesets in the batch were derived
+                    None => {
+                        let err_msg_str = format!("Failed to enqueue with error: {}, but the data was derived", e);
+                        debug!(ctx.logger(), "{}", err_msg_str);
+                        err_msg = Some(err_msg_str);
+                        // derived, update ready watch and return no dependency
+                        *watch.lock() =
+                            Some(EnqueueResponse::new(Box::new(future::ok(true))));
+                        None
+                    }
+                    // None of the changesets in the batch were derived, but enqueuing failed
+                    Some(root_cs_id) if root_cs_id == item.root_cs_id() => {
+                        // return same item for enqueue and increment failures count
+                        failed_attempt += 1;
+                        let err_msg_str = format!("Failed to enqueue into DAG: {}", e);
+                        error!(ctx.logger(), "{}", err_msg_str);
+                        err_msg = Some(err_msg_str);
+                        Some(item)
+                    }
+                    // Some of the changesets in the batch were derived
+                    Some(root_cs_id) => {
+                        // Create a new item with only the underived changesets
+                        Some(
+                            DerivationDagItem::new(
+                                item.repo_id(),
+                                item.config_name().to_string(),
+                                item.derived_data_type(),
+                                root_cs_id,
+                                item.head_cs_id(),
+                                item.bubble_id(),
+                                vec![],
+                                item.client_info(),
+                            )?
+                        )
+                    }
                 }
             }
         }