diff --git a/autonomi/src/client/data.rs b/autonomi/src/client/data.rs
index 0a6be8598a..86f500bc57 100644
--- a/autonomi/src/client/data.rs
+++ b/autonomi/src/client/data.rs
@@ -40,6 +40,26 @@ pub static CHUNK_UPLOAD_BATCH_SIZE: LazyLock = LazyLock::new(|| {
     batch_size
 });
 
+/// Number of chunks to download in parallel.
+///
+/// Can be overridden by the `CHUNK_DOWNLOAD_BATCH_SIZE` environment variable. A value that is
+/// unset, unparsable or zero falls back to the default of eight downloads per available CPU
+/// core: a zero batch size would leave `buffered` with no capacity to poll any download.
+pub static CHUNK_DOWNLOAD_BATCH_SIZE: LazyLock<usize> = LazyLock::new(|| {
+    let batch_size = std::env::var("CHUNK_DOWNLOAD_BATCH_SIZE")
+        .ok()
+        .and_then(|s| s.parse::<usize>().ok())
+        .filter(|&n| n > 0)
+        .unwrap_or_else(|| {
+            std::thread::available_parallelism()
+                .map(|n| n.get())
+                .unwrap_or(1)
+                * 8
+        });
+    info!("Chunk download batch size: {}", batch_size);
+    batch_size
+});
+
 /// Raw Data Address (points to a DataMap)
 pub type DataAddr = XorName;
 /// Raw Chunk Address (points to a [`Chunk`])
diff --git a/autonomi/src/client/utils.rs b/autonomi/src/client/utils.rs
index 95d70b6e4d..fc3679e01b 100644
--- a/autonomi/src/client/utils.rs
+++ b/autonomi/src/client/utils.rs
@@ -24,7 +24,7 @@ use std::{collections::HashMap, future::Future, num::NonZero};
 use xor_name::XorName;
 
 use super::{
-    data::{CostError, GetError, PayError, PutError},
+    data::{CostError, GetError, PayError, PutError, CHUNK_DOWNLOAD_BATCH_SIZE},
     Client,
 };
 use crate::self_encryption::DataMapLevel;
@@ -35,16 +35,25 @@ impl Client {
     pub(crate) async fn fetch_from_data_map(&self, data_map: &DataMap) -> Result {
         let mut encrypted_chunks = vec![];
 
-        for info in data_map.infos() {
-            let chunk = self
-                .chunk_get(info.dst_hash)
-                .await
-                .inspect_err(|err| error!("Error fetching chunk {:?}: {err:?}", info.dst_hash))?;
-            let chunk = EncryptedChunk {
-                index: info.index,
-                content: chunk.value,
-            };
-            encrypted_chunks.push(chunk);
+        let mut stream = futures::stream::iter(data_map.infos().into_iter())
+            .map(|info| {
+                let dst_hash = info.dst_hash;
+                async move {
+                    self.chunk_get(dst_hash)
+                        .await
+                        .inspect_err(move |err| {
+                            error!("Error fetching chunk {:?}: {err:?}", dst_hash)
+                        })
+                        .map(|chunk| EncryptedChunk {
+                            index: info.index,
+                            content: chunk.value,
+                        })
+                }
+            })
+            .buffered(*CHUNK_DOWNLOAD_BATCH_SIZE);
+
+        while let Some(encrypted_chunk_result) = stream.next().await {
+            encrypted_chunks.push(encrypted_chunk_result?);
         }
 
         let data = decrypt_full_set(data_map, &encrypted_chunks).map_err(|e| {