diff --git a/autonomi/src/client/data.rs b/autonomi/src/client/data.rs
index ec7ebf6d70..ba1831ea4b 100644
--- a/autonomi/src/client/data.rs
+++ b/autonomi/src/client/data.rs
@@ -9,7 +9,7 @@
 
 use bytes::Bytes;
 use libp2p::kad::Quorum;
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
 use std::sync::LazyLock;
 use xor_name::XorName;
 
@@ -17,8 +17,8 @@ use crate::client::payment::PaymentOption;
 use crate::client::utils::process_tasks_with_max_concurrency;
 use crate::client::{ClientEvent, UploadSummary};
 use crate::{self_encryption::encrypt, Client};
-use sn_evm::EvmWalletError;
 use sn_evm::{Amount, AttoTokens};
+use sn_evm::{EvmWalletError, ProofOfPayment};
 use sn_networking::{GetRecordCfg, NetworkError};
 use sn_protocol::{
     storage::{try_deserialize_record, Chunk, ChunkAddress, RecordHeader, RecordKind},
@@ -41,6 +41,9 @@ pub static CHUNK_UPLOAD_BATCH_SIZE: LazyLock<usize> = LazyLock::new(|| {
     batch_size
 });
 
+/// Number of retries to upload chunks.
+pub const RETRY_ATTEMPTS: usize = 3;
+
 /// Number of chunks to download in parallel.
 /// Can be overridden by the `CHUNK_DOWNLOAD_BATCH_SIZE` environment variable.
 pub static CHUNK_DOWNLOAD_BATCH_SIZE: LazyLock<usize> = LazyLock::new(|| {
@@ -164,35 +167,28 @@ impl Client {
         // Upload all the chunks in parallel including the data map chunk
         debug!("Uploading {} chunks", chunks.len());
 
-        let mut upload_tasks = vec![];
-        for chunk in chunks.into_iter().chain(std::iter::once(data_map_chunk)) {
-            let self_clone = self.clone();
-            let address = *chunk.address();
-            if let Some(proof) = receipt.get(chunk.name()) {
-                let proof_clone = proof.clone();
-                upload_tasks.push(async move {
-                    self_clone
-                        .chunk_upload_with_payment(chunk, proof_clone)
-                        .await
-                        .inspect_err(|err| error!("Error uploading chunk {address:?} :{err:?}"))
-                });
-            } else {
-                debug!("Chunk at {address:?} was already paid for so skipping");
-            }
+
+        let mut failed_uploads = self
+            .upload_chunks_with_retries(
+                chunks
+                    .iter()
+                    .chain(std::iter::once(&data_map_chunk))
+                    .collect(),
+                &receipt,
+            )
+            .await;
+
+        // Return the last chunk upload error
+        if let Some(last_chunk_fail) = failed_uploads.pop() {
+            tracing::error!(
+                "Error uploading chunk ({:?}): {:?}",
+                last_chunk_fail.0.address(),
+                last_chunk_fail.1
+            );
+            return Err(last_chunk_fail.1);
         }
-        let uploads =
-            process_tasks_with_max_concurrency(upload_tasks, *CHUNK_UPLOAD_BATCH_SIZE).await;
-
-        // Check for errors
-        let total_uploads = uploads.len();
-        let ok_uploads = uploads
-            .iter()
-            .filter_map(|up| up.is_ok().then_some(()))
-            .count();
-        info!("Uploaded {} chunks out of {}", ok_uploads, total_uploads);
-        let uploads: Result<Vec<()>, _> = uploads.into_iter().collect();
-        uploads.inspect_err(|err| error!("Error uploading chunk: {err:?}"))?;
-        let record_count = ok_uploads;
+
+        let record_count = chunks.len() + 1;
 
         // Reporting
         if let Some(channel) = self.client_event_sender.as_ref() {
@@ -273,4 +269,64 @@ impl Client {
         );
         Ok(total_cost)
     }
+
+    /// Upload chunks and retry failed uploads up to `RETRY_ATTEMPTS` times.
+    pub(crate) async fn upload_chunks_with_retries<'a>(
+        &self,
+        mut chunks: Vec<&'a Chunk>,
+        receipt: &HashMap<XorName, ProofOfPayment>,
+    ) -> Vec<(&'a Chunk, PutError)> {
+        let mut current_attempt: usize = 1;
+
+        loop {
+            let mut upload_tasks = vec![];
+            for chunk in chunks {
+                let self_clone = self.clone();
+                let address = *chunk.address();
+
+                let Some(proof) = receipt.get(chunk.name()) else {
+                    debug!("Chunk at {address:?} was already paid for so skipping");
+                    continue;
+                };
+
+                upload_tasks.push(async move {
+                    self_clone
+                        .chunk_upload_with_payment(chunk, proof.clone())
+                        .await
+                        .inspect_err(|err| error!("Error uploading chunk {address:?} :{err:?}"))
+                        // Return the chunk reference too, so it can be re-used on the next attempt
+                        .map_err(|err| (chunk, err))
+                });
+            }
+            let uploads =
+                process_tasks_with_max_concurrency(upload_tasks, *CHUNK_UPLOAD_BATCH_SIZE).await;
+
+            // Check for errors.
+            let total_uploads = uploads.len();
+            let uploads_failed: Vec<_> = uploads.into_iter().filter_map(|up| up.err()).collect();
+            info!(
+                "Uploaded {} chunks out of {total_uploads}",
+                total_uploads - uploads_failed.len()
+            );
+
+            // All uploads succeeded.
+            if uploads_failed.is_empty() {
+                return vec![];
+            }
+
+            // Max retries reached.
+            if current_attempt > RETRY_ATTEMPTS {
+                return uploads_failed;
+            }
+
+            tracing::info!(
+                "Retrying {} failed chunks (attempt {current_attempt}/{RETRY_ATTEMPTS})",
+                uploads_failed.len()
+            );
+
+            // Re-attempt only the failed chunks on the next iteration
+            chunks = uploads_failed.into_iter().map(|(chunk, _)| chunk).collect();
+            current_attempt += 1;
+        }
+    }
 }
diff --git a/autonomi/src/client/data_private.rs b/autonomi/src/client/data_private.rs
index 29925b915b..2ddac1734a 100644
--- a/autonomi/src/client/data_private.rs
+++ b/autonomi/src/client/data_private.rs
@@ -13,10 +13,8 @@ use serde::{Deserialize, Serialize};
 use sn_evm::Amount;
 use sn_protocol::storage::Chunk;
 
-use super::data::CHUNK_UPLOAD_BATCH_SIZE;
 use super::data::{GetError, PutError};
 use crate::client::payment::PaymentOption;
-use crate::client::utils::process_tasks_with_max_concurrency;
 use crate::client::{ClientEvent, UploadSummary};
 use crate::{self_encryption::encrypt, Client};
 
@@ -81,35 +79,22 @@ impl Client {
         // Upload the chunks with the payments
         debug!("Uploading {} chunks", chunks.len());
 
-        let mut upload_tasks = vec![];
-        for chunk in chunks {
-            let self_clone = self.clone();
-            let address = *chunk.address();
-            if let Some(proof) = receipt.get(chunk.name()) {
-                let proof_clone = proof.clone();
-                upload_tasks.push(async move {
-                    self_clone
-                        .chunk_upload_with_payment(chunk, proof_clone)
-                        .await
-                        .inspect_err(|err| error!("Error uploading chunk {address:?} :{err:?}"))
-                });
-            } else {
-                debug!("Chunk at {address:?} was already paid for so skipping");
-            }
+
+        let mut failed_uploads = self
+            .upload_chunks_with_retries(chunks.iter().collect(), &receipt)
+            .await;
+
+        // Return the last chunk upload error
+        if let Some(last_chunk_fail) = failed_uploads.pop() {
+            tracing::error!(
+                "Error uploading chunk ({:?}): {:?}",
+                last_chunk_fail.0.address(),
+                last_chunk_fail.1
+            );
+            return Err(last_chunk_fail.1);
         }
-        let uploads =
-            process_tasks_with_max_concurrency(upload_tasks, *CHUNK_UPLOAD_BATCH_SIZE).await;
-
-        // Check for errors
-        let total_uploads = uploads.len();
-        let ok_uploads = uploads
-            .iter()
-            .filter_map(|up| up.is_ok().then_some(()))
-            .count();
-        info!("Uploaded {} chunks out of {}", ok_uploads, total_uploads);
-        let uploads: Result<Vec<()>, _> = uploads.into_iter().collect();
-        uploads.inspect_err(|err| error!("Error uploading chunk: {err:?}"))?;
-        let record_count = ok_uploads;
+
+        let record_count = chunks.len();
 
         // Reporting
         if let Some(channel) = self.client_event_sender.as_ref() {
diff --git a/autonomi/src/client/utils.rs b/autonomi/src/client/utils.rs
index e8e8556820..28be35ff9e 100644
--- a/autonomi/src/client/utils.rs
+++ b/autonomi/src/client/utils.rs
@@ -100,7 +100,7 @@ impl Client {
 
     pub(crate) async fn chunk_upload_with_payment(
        &self,
-        chunk: Chunk,
+        chunk: &Chunk,
         payment: ProofOfPayment,
     ) -> Result<(), PutError> {
         let storing_node = payment.to_peer_id_payee().expect("Missing node Peer ID");