From 63179e5b7690d3a1d147543bec945b5a81ff022a Mon Sep 17 00:00:00 2001 From: Alexander Tesfamichael Date: Fri, 10 Nov 2023 15:43:43 +0100 Subject: [PATCH] feat(discover_slots): retry on connection reset --- src/bin/bundle-submissions/discover_slots.rs | 35 ++++++++++++++++---- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/src/bin/bundle-submissions/discover_slots.rs b/src/bin/bundle-submissions/discover_slots.rs index a89c255..d8de8f0 100644 --- a/src/bin/bundle-submissions/discover_slots.rs +++ b/src/bin/bundle-submissions/discover_slots.rs @@ -25,7 +25,7 @@ async fn discover_slots_for_path( let mut last_sent_slots: VecDeque = VecDeque::with_capacity(SLOT_MEMORY); let mut last_sent_slots_set: HashSet = HashSet::with_capacity(SLOT_MEMORY); - while let Some(block_submission_meta) = block_submission_meta_stream.try_next().await.unwrap() { + while let Some(block_submission_meta) = block_submission_meta_stream.try_next().await? { let path_str = block_submission_meta.location.to_string(); let slot = path_str .split('/') @@ -59,18 +59,39 @@ async fn discover_slots_for_path( Ok(()) } +fn check_if_error_is_connection_reset_by_peer(err: &anyhow::Error) -> bool { + err.downcast_ref::() + .map_or(false, |e| e.kind() == std::io::ErrorKind::ConnectionReset) +} + pub fn run_discover_slots_thread( path_prefix: Option, object_store: Arc, slots_tx: Sender, ) -> JoinHandle<()> { spawn(async move { - match discover_slots_for_path(path_prefix.as_ref(), object_store.as_ref(), slots_tx).await { - Ok(_) => { - info!(?path_prefix, "finished discovering slots for day"); - } - Err(e) => { - panic!("failed to discover slots for day, {}", e); + // OVH Object Storage resets the connection sometimes. If this is the case, we simply + // retry. + loop { + match discover_slots_for_path( + path_prefix.as_ref(), + object_store.as_ref(), + slots_tx.clone(), + ) + .await + { + Ok(_) => { + info!(?path_prefix, "finished discovering slots for day"); + break; + } + Err(e) => { + if check_if_error_is_connection_reset_by_peer(&e) { + warn!("failed to discover slots for day, {}, retrying", e); + continue; + } else { + panic!("failed to discover slots for day, {}", e); + } + } } } })