Skip to content

Commit

Permalink
Merge pull request #2334 from maidsafe/revert-2258-optimize_uploads
Browse files Browse the repository at this point in the history
Revert "feat(autonomi): allow the uploader to work with the new apis"
  • Loading branch information
grumbach authored Oct 28, 2024
2 parents cefd686 + f37b4d3 commit 31721be
Show file tree
Hide file tree
Showing 23 changed files with 318 additions and 3,188 deletions.
6 changes: 1 addition & 5 deletions .github/workflows/merge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,6 @@ jobs:

- uses: Swatinem/rust-cache@v2

- name: Run autonomi tests
timeout-minutes: 25
run: cargo test --release --package autonomi --lib --features="full,fs"

- name: Run node tests
timeout-minutes: 25
run: cargo test --release --package sn_node --lib
Expand Down Expand Up @@ -192,7 +188,7 @@ jobs:
# only these unit tests require a network, the rest are run above in unit test section
- name: Run autonomi --tests
run: cargo test --package autonomi --features="full,fs" --tests -- --nocapture
run: cargo test --package autonomi --tests -- --nocapture
env:
SN_LOG: "v"
# only set the target dir for windows to bypass the linker issue.
Expand Down
17 changes: 0 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 13 additions & 28 deletions autonomi-cli/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,28 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use autonomi::client::{Amount, ClientEvent};

/// Summary of the upload operation.
#[derive(Debug, Clone)]
pub struct CliUploadSummary {
/// Total tokens spent during the upload.
pub tokens_spent: Amount,
/// Total number of records uploaded.
pub record_count: usize,
}
use autonomi::client::{Amount, ClientEvent, UploadSummary};

/// Collects upload summary from the event receiver.
/// Send a signal to the returned sender to stop collecting and to return the result via the join handle.
pub fn collect_upload_summary(
mut event_receiver: tokio::sync::mpsc::Receiver<ClientEvent>,
) -> (
tokio::task::JoinHandle<CliUploadSummary>,
tokio::task::JoinHandle<UploadSummary>,
tokio::sync::oneshot::Sender<()>,
) {
let (upload_completed_tx, mut upload_completed_rx) = tokio::sync::oneshot::channel::<()>();
let stats_thread = tokio::spawn(async move {
let mut tokens: Amount = Amount::from(0);
let mut records = 0;
let mut tokens_spent: Amount = Amount::from(0);
let mut record_count = 0;

loop {
tokio::select! {
event = event_receiver.recv() => {
match event {
Some(ClientEvent::UploadComplete {
tokens_spent,
record_count
}) => {
tokens += tokens_spent;
records += record_count;
Some(ClientEvent::UploadComplete(upload_summary)) => {
tokens_spent += upload_summary.tokens_spent;
record_count += upload_summary.record_count;
}
None => break,
}
Expand All @@ -51,19 +39,16 @@ pub fn collect_upload_summary(
// try to drain the event receiver in case there are any more events
while let Ok(event) = event_receiver.try_recv() {
match event {
ClientEvent::UploadComplete {
tokens_spent,
record_count,
} => {
tokens += tokens_spent;
records += record_count;
ClientEvent::UploadComplete(upload_summary) => {
tokens_spent += upload_summary.tokens_spent;
record_count += upload_summary.record_count;
}
}
}

CliUploadSummary {
tokens_spent: tokens,
record_count: records,
UploadSummary {
tokens_spent,
record_count,
}
});

Expand Down
5 changes: 0 additions & 5 deletions autonomi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@ bytes = { version = "1.0.1", features = ["serde"] }
curv = { version = "0.10.1", package = "sn_curv", default-features = false, features = [
"num-bigint",
] }
custom_debug = "~0.6.1"
eip2333 = { version = "0.2.1", package = "sn_bls_ckd" }
const-hex = "1.12.0"
hex = "~0.4.3"
itertools = "~0.12.1"
libp2p = "0.54.1"
rand = "0.8.5"
rmp-serde = "1.1.1"
Expand All @@ -60,13 +58,10 @@ blstrs = "0.7.1"

[dev-dependencies]
alloy = { version = "0.5.3", default-features = false, features = ["std", "reqwest-rustls-tls", "provider-anvil-node", "sol-types", "json", "signers", "contract", "signer-local", "network"] }
assert_matches = "1.5.0"
eyre = "0.6.5"
sha2 = "0.10.6"
sn_logging = { path = "../sn_logging", version = "0.2.37" }
sn_peers_acquisition = { path = "../sn_peers_acquisition", version = "0.5.4" }
sn_registers = { path = "../sn_registers", version = "0.4.0", features = ["test-utils"] }
tempfile = "3.6.0"
# Do not specify the version field. Release process expects even the local dev deps to be published.
# Removing the version field is a workaround.
test_utils = { path = "../test_utils" }
Expand Down
78 changes: 60 additions & 18 deletions autonomi/src/client/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,22 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::client::ClientEvent;
use crate::uploader::{UploadError, Uploader};
use crate::{self_encryption::encrypt, Client};
use bytes::Bytes;
use libp2p::kad::Quorum;
use tokio::task::{JoinError, JoinSet};

use std::collections::HashSet;
use xor_name::XorName;

use crate::client::{ClientEvent, UploadSummary};
use crate::{self_encryption::encrypt, Client};
use sn_evm::{Amount, AttoTokens};
use sn_evm::{EvmWallet, EvmWalletError};
use sn_networking::{GetRecordCfg, NetworkError};
use sn_protocol::{
storage::{try_deserialize_record, Chunk, ChunkAddress, RecordHeader, RecordKind},
NetworkAddress,
};
use std::collections::HashSet;
use xor_name::XorName;

/// Raw Data Address (points to a DataMap)
pub type DataAddr = XorName;
Expand All @@ -39,14 +41,14 @@ pub enum PutError {
PayError(#[from] PayError),
#[error("Serialization error: {0}")]
Serialization(String),
#[error("Upload Error")]
Upload(#[from] UploadError),
#[error("A wallet error occurred.")]
Wallet(#[from] sn_evm::EvmError),
#[error("The vault owner key does not match the client's public key")]
VaultBadOwner,
#[error("Payment unexpectedly invalid for {0:?}")]
PaymentUnexpectedlyInvalid(NetworkAddress),
#[error("Could not simultaneously upload chunks: {0:?}")]
JoinError(tokio::task::JoinError),
}

/// Errors that can occur during the pay operation.
Expand Down Expand Up @@ -78,6 +80,8 @@ pub enum GetError {
/// Errors that can occur during the cost calculation.
#[derive(Debug, thiserror::Error)]
pub enum CostError {
#[error("Could not simultaneously fetch store costs: {0:?}")]
JoinError(JoinError),
#[error("Failed to self-encrypt data.")]
SelfEncryption(#[from] crate::self_encryption::Error),
#[error("Could not get store quote for: {0:?} after several retries")]
Expand Down Expand Up @@ -114,24 +118,62 @@ impl Client {
debug!("Encryption took: {:.2?}", now.elapsed());

let map_xor_name = *data_map_chunk.address().xorname();
let mut xor_names = vec![map_xor_name];

let mut uploader = Uploader::new(self.clone(), wallet.clone());
uploader.insert_chunks(chunks);
uploader.insert_chunks(vec![data_map_chunk]);
for chunk in &chunks {
xor_names.push(*chunk.name());
}

let summary = uploader.start_upload().await?;
// Pay for all chunks + data map chunk
info!("Paying for {} addresses", xor_names.len());
let (payment_proofs, _free_chunks) = self
.pay(xor_names.into_iter(), wallet)
.await
.inspect_err(|err| error!("Error paying for data: {err:?}"))?;

let mut record_count = 0;

// Upload all the chunks in parallel including the data map chunk
debug!("Uploading {} chunks", chunks.len());
let mut tasks = JoinSet::new();
for chunk in chunks.into_iter().chain(std::iter::once(data_map_chunk)) {
let self_clone = self.clone();
let address = *chunk.address();
if let Some(proof) = payment_proofs.get(chunk.name()) {
let proof_clone = proof.clone();
tasks.spawn(async move {
self_clone
.chunk_upload_with_payment(chunk, proof_clone)
.await
.inspect_err(|err| error!("Error uploading chunk {address:?} :{err:?}"))
});
} else {
debug!("Chunk at {address:?} was already paid for so skipping");
}
}
while let Some(result) = tasks.join_next().await {
result
.inspect_err(|err| error!("Join error uploading chunk: {err:?}"))
.map_err(PutError::JoinError)?
.inspect_err(|err| error!("Error uploading chunk: {err:?}"))?;
record_count += 1;
}

if let Some(channel) = self.client_event_sender.as_ref() {
if let Err(err) = channel
.send(ClientEvent::UploadComplete {
record_count: summary.uploaded_count,
tokens_spent: summary.storage_cost,
})
.await
{
let tokens_spent = payment_proofs
.values()
.map(|proof| proof.quote.cost.as_atto())
.sum::<Amount>();

let summary = UploadSummary {
record_count,
tokens_spent,
};
if let Err(err) = channel.send(ClientEvent::UploadComplete(summary)).await {
error!("Failed to send client event: {err:?}");
}
}

Ok(map_xor_name)
}

Expand Down
70 changes: 52 additions & 18 deletions autonomi/src/client/data_private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use super::data::{GetError, PutError};
use crate::client::ClientEvent;
use crate::uploader::Uploader;
use crate::{self_encryption::encrypt, Client};
use std::hash::{DefaultHasher, Hash, Hasher};

use bytes::Bytes;
use serde::{Deserialize, Serialize};
use sn_evm::EvmWallet;
use sn_evm::{Amount, EvmWallet};
use sn_protocol::storage::Chunk;
use std::hash::{DefaultHasher, Hash, Hasher};
use tokio::task::JoinSet;

use super::data::{GetError, PutError};
use crate::client::{ClientEvent, UploadSummary};
use crate::{self_encryption::encrypt, Client};

/// Private data on the network can be accessed with this
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, Ord)]
Expand Down Expand Up @@ -67,21 +69,53 @@ impl Client {
let (data_map_chunk, chunks) = encrypt(data)?;
debug!("Encryption took: {:.2?}", now.elapsed());

// Upload the chunks with the payments
let mut uploader = Uploader::new(self.clone(), wallet.clone());
uploader.insert_chunks(chunks);
uploader.insert_chunks(vec![data_map_chunk.clone()]);
// Pay for all chunks
let xor_names: Vec<_> = chunks.iter().map(|chunk| *chunk.name()).collect();
info!("Paying for {} addresses", xor_names.len());
let (payment_proofs, _free_chunks) = self
.pay(xor_names.into_iter(), wallet)
.await
.inspect_err(|err| error!("Error paying for data: {err:?}"))?;

let summary = uploader.start_upload().await?;
// Upload the chunks with the payments
let mut record_count = 0;
debug!("Uploading {} chunks", chunks.len());
let mut tasks = JoinSet::new();
for chunk in chunks {
let self_clone = self.clone();
let address = *chunk.address();
if let Some(proof) = payment_proofs.get(chunk.name()) {
let proof_clone = proof.clone();
tasks.spawn(async move {
self_clone
.chunk_upload_with_payment(chunk, proof_clone)
.await
.inspect_err(|err| error!("Error uploading chunk {address:?} :{err:?}"))
});
} else {
debug!("Chunk at {address:?} was already paid for so skipping");
}
}
while let Some(result) = tasks.join_next().await {
result
.inspect_err(|err| error!("Join error uploading chunk: {err:?}"))
.map_err(PutError::JoinError)?
.inspect_err(|err| error!("Error uploading chunk: {err:?}"))?;
record_count += 1;
}

// Reporting
if let Some(channel) = self.client_event_sender.as_ref() {
if let Err(err) = channel
.send(ClientEvent::UploadComplete {
record_count: summary.uploaded_count,
tokens_spent: summary.storage_cost,
})
.await
{
let tokens_spent = payment_proofs
.values()
.map(|proof| proof.quote.cost.as_atto())
.sum::<Amount>();

let summary = UploadSummary {
record_count,
tokens_spent,
};
if let Err(err) = channel.send(ClientEvent::UploadComplete(summary)).await {
error!("Failed to send client event: {err:?}");
}
}
Expand Down
Loading

0 comments on commit 31721be

Please sign in to comment.