From df1667746a1923b0554cf66a88891a353a3c4efc Mon Sep 17 00:00:00 2001 From: Youssef Ibrahim Date: Thu, 24 Oct 2024 07:18:24 -0700 Subject: [PATCH] derived_data_service: make enqueuing items into the derivation queue resilient against races Summary: When enqueuing an item into the derivation queue, the item being inserted might represent a batch of changesets that share some changesets with another batch that's simultaneously being inserted, e.g.: ``` a d \ / b | c First batch: [a, b, c] Second batch: [d, b, c] ``` In this case, one of the two batches will lose the race and will fail to get enqueued in the derivation queue. If the second batch failed to get enqueued while the first batch was still in the queue, then the logic in `build_underived_batched_graph` will handle this correctly by creating a new batch that includes only the changesets that are in the second batch but not in the first, and trying to enqueue it. There is, however, another case that is not currently handled correctly: the first batch was processed and partially removed from the queue before the attempt to enqueue the second batch. In this case, the enqueue attempt will fail, but we will not find the first batch in the queue, so this will count as a failure and we will retry enqueuing the same batch, potentially hitting the same failure again. Instead, let's check whether the batch was partially derived and, in that case, try to enqueue only the part of the batch that was not derived, without counting this case as a failure. 
Reviewed By: singhsrb Differential Revision: D64899662 fbshipit-source-id: c45188061de0ca24397eade4fc790154ceef4748 --- .../repo_derivation_queues/src/underived.rs | 64 +++++++++++++------ 1 file changed, 44 insertions(+), 20 deletions(-) diff --git a/eden/mononoke/repo_attributes/repo_derivation_queues/src/underived.rs b/eden/mononoke/repo_attributes/repo_derivation_queues/src/underived.rs index f3437a3a6e111..b176a2a2ebaad 100644 --- a/eden/mononoke/repo_attributes/repo_derivation_queues/src/underived.rs +++ b/eden/mononoke/repo_attributes/repo_derivation_queues/src/underived.rs @@ -115,8 +115,8 @@ pub async fn build_underived_batched_graph<'a>( let item = DerivationDagItem::new( repo_id, config_name.to_string(), - derived_data_type.clone(), - root_cs_id.clone(), + derived_data_type, + root_cs_id, head_cs_id, bubble_id, deps.collect(), @@ -125,9 +125,9 @@ pub async fn build_underived_batched_graph<'a>( let max_failed_attemps = justknobs::get_as::("scm/mononoke:build_underived_batched_graph_max_failed_attempts", None)?; - let mut cur_item = Some(item.clone()); // Upstream batch will depend on this cs let mut upstream_dep = item.id().clone(); + let mut cur_item = Some(item); let mut failed_attempt = 0; let mut err_msg = None; while let Some(item) = cur_item { @@ -171,23 +171,47 @@ pub async fn build_underived_batched_graph<'a>( } } Err(e) => { - let is_derived = - ddm.is_derived(ctx, item.head_cs_id(), None, derived_data_type).await?; - if is_derived { - let err_msg_str = format!("Failed to enqueue with error: {}, but the data was derived", e); - debug!(ctx.logger(), "{}", err_msg_str); - err_msg = Some(err_msg_str); - // derived, update ready watch and return no dependency - *watch.lock() = - Some(EnqueueResponse::new(Box::new(future::ok(true)))); - None - } else { - // return same item for enqueue and incremente failures count - failed_attempt += 1; - let err_msg_str = format!("Failed to enqueue into DAG: {}", e); - error!(ctx.logger(), "{}", err_msg_str); - 
err_msg = Some(err_msg_str); - Some(item) + let derived_ancestors = commit_graph.ancestors_frontier_with(ctx, vec![item.head_cs_id()], |cs_id| async move { + Ok(ddm.is_derived(ctx, cs_id, None, derived_data_type).await?) + }).await?; + + let mut underived_batch = commit_graph.ancestors_difference(ctx, vec![item.head_cs_id()], derived_ancestors).await?; + match underived_batch.pop() { + // All changesets in the batch were derived + None => { + let err_msg_str = format!("Failed to enqueue with error: {}, but the data was derived", e); + debug!(ctx.logger(), "{}", err_msg_str); + err_msg = Some(err_msg_str); + // derived, update ready watch and return no dependency + *watch.lock() = + Some(EnqueueResponse::new(Box::new(future::ok(true)))); + None + } + // None of the changesets in the batch were derived, but enqueuing failed + Some(root_cs_id) if root_cs_id == item.root_cs_id() => { + // return same item for enqueue and increment failures count + failed_attempt += 1; + let err_msg_str = format!("Failed to enqueue into DAG: {}", e); + error!(ctx.logger(), "{}", err_msg_str); + err_msg = Some(err_msg_str); + Some(item) + } + // Some of the changesets in the batch were derived + Some(root_cs_id) => { + // Create a new item with only the underived changesets + Some( + DerivationDagItem::new( + item.repo_id(), + item.config_name().to_string(), + item.derived_data_type(), + root_cs_id, + item.head_cs_id(), + item.bubble_id(), + vec![], + item.client_info(), + )? + ) + } } } }