Skip to content

Commit

Permalink
chore: use universal batching fn
Browse files · Browse the repository at this point in the history
  • Loading branch information
grumbach committed Oct 30, 2024
1 parent 6c2c3e8 commit b671669
Showing 1 changed file with 23 additions and 20 deletions.
43 changes: 23 additions & 20 deletions autonomi/src/client/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,29 +33,32 @@ use crate::utils::payment_proof_from_quotes_and_payments;
impl Client {
/// Fetch and decrypt all chunks in the data map.
pub(crate) async fn fetch_from_data_map(&self, data_map: &DataMap) -> Result<Bytes, GetError> {
let mut encrypted_chunks = vec![];

let mut stream = futures::stream::iter(data_map.infos().into_iter())
.map(|info| {
let dst_hash = info.dst_hash;
async move {
self.chunk_get(dst_hash)
.await
.inspect_err(move |err| {
error!("Error fetching chunk {:?}: {err:?}", dst_hash)
})
.map(|chunk| EncryptedChunk {
index: info.index,
content: chunk.value,
})
let mut download_tasks = vec![];
for info in data_map.infos() {
download_tasks.push(async move {
match self
.chunk_get(info.dst_hash)
.await
.inspect_err(|err| error!("Error fetching chunk {:?}: {err:?}", info.dst_hash))
{
Ok(chunk) => Ok(EncryptedChunk {
index: info.index,
content: chunk.value,
}),
Err(err) => {
error!("Error fetching chunk {:?}: {err:?}", info.dst_hash);
Err(err)
}
}
})
.buffered(*CHUNK_DOWNLOAD_BATCH_SIZE);

while let Some(encrypted_chunk_result) = stream.next().await {
encrypted_chunks.push(encrypted_chunk_result?);
});
}

let encrypted_chunks =
process_tasks_with_max_concurrency(download_tasks, *CHUNK_DOWNLOAD_BATCH_SIZE)
.await
.into_iter()
.collect::<Result<Vec<EncryptedChunk>, GetError>>()?;

let data = decrypt_full_set(data_map, &encrypted_chunks).map_err(|e| {
error!("Error decrypting encrypted_chunks: {e:?}");
GetError::Decryption(crate::self_encryption::Error::SelfEncryption(e))
Expand Down

0 comments on commit b671669

Please sign in to comment.