Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed May 8, 2024
1 parent 555b020 commit 0c820f0
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 9 deletions.
111 changes: 108 additions & 3 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,25 @@ use crate::metrics::NodeMetrics;
use crate::RunningNode;

use bytes::Bytes;
use libp2p::{identity::Keypair, Multiaddr, PeerId};
use libp2p::{
identity::Keypair,
kad::{Quorum, Record},
Multiaddr, PeerId,
};
#[cfg(feature = "open-metrics")]
use prometheus_client::registry::Registry;
use rand::{rngs::StdRng, thread_rng, Rng, SeedableRng};
use sn_networking::{
close_group_majority, Network, NetworkBuilder, NetworkError, NetworkEvent, NodeIssue,
SwarmDriver, CLOSE_GROUP_SIZE,
PutRecordCfg, SwarmDriver, CLOSE_GROUP_SIZE,
};
use sn_protocol::{
error::Error as ProtocolError,
messages::{ChunkProof, Cmd, CmdResponse, Query, QueryResponse, Request, Response},
storage::{try_serialize_record, RecordKind, SpendAddress},
NetworkAddress, PrettyPrintRecordKey,
};
use sn_transfers::{HotWallet, MainPubkey, MainSecretKey, NanoTokens};
use sn_transfers::{Hash, HotWallet, MainPubkey, MainSecretKey, NanoTokens, NETWORK_ROYALTIES_PK};
use std::{
net::SocketAddr,
path::PathBuf,
Expand Down Expand Up @@ -233,6 +238,17 @@ impl Node {

let mut rolling_index = 0;

// use a random timeout to ensure not sync when transmit messages.
let balance_forward_interval: u64 = 10
* rng.gen_range(
PERIODIC_REPLICATION_INTERVAL_MAX_S / 2..PERIODIC_REPLICATION_INTERVAL_MAX_S,
);
let balance_forward_time = Duration::from_secs(balance_forward_interval);
debug!("BalanceForward interval set to {balance_forward_time:?}");

let mut balance_forward_interval = tokio::time::interval(balance_forward_time);
let _ = balance_forward_interval.tick().await; // first tick completes immediately

loop {
let peers_connected = &peers_connected;

Expand Down Expand Up @@ -284,6 +300,18 @@ impl Node {
rolling_index += 1;
}
}
// runs every balance_forward_interval time
_ = balance_forward_interval.tick() => {
let start = std::time::Instant::now();
trace!("Periodic balance forward triggered");
let network = self.network.clone();
let owner = self.owner.clone();

let _handle = spawn(async move {
let _ = Self::try_forward_blance(network, owner);
info!("Periodic blance forward took {:?}", start.elapsed());
});
}
node_cmd = cmds_receiver.recv() => {
match node_cmd {
Ok(cmd) => {
Expand Down Expand Up @@ -726,6 +754,83 @@ impl Node {
}
}
}

fn try_forward_blance(network: Network, owner: String) -> Result<()> {
let mut spend_requests = vec![];
{
// load wallet
let mut wallet = HotWallet::load_from(&network.root_dir_path)?;
let balance = wallet.balance();

let payee = vec![(
"reward collector".to_string(),
balance,
*NETWORK_ROYALTIES_PK,
)];

spend_requests.extend(
wallet
.prepare_forward_signed_spend(payee, Some(Hash::hash(&owner.into_bytes())))?,
);
}

let record_kind = RecordKind::Spend;
let put_cfg = PutRecordCfg {
put_quorum: Quorum::Majority,
retry_strategy: None,
use_put_record_to: None,
verification: None,
};

info!(
"Reward forwarding sending {} spends in this iteration.",
spend_requests.len()
);

for spend_request in spend_requests {
let network_clone = network.clone();
let put_cfg_clone = put_cfg.clone();

// Sent out spend in separate thread to avoid blocking the main one
let _handle = spawn(async move {
let unique_pubkey = *spend_request.unique_pubkey();
let cash_note_addr = SpendAddress::from_unique_pubkey(&unique_pubkey);
let network_address = NetworkAddress::from_spend_address(cash_note_addr);

let record_key = network_address.to_record_key();
let pretty_key = PrettyPrintRecordKey::from(&record_key);

debug!("Reward forwarding in spend {pretty_key:?}: {spend_request:#?}");

let value = if let Ok(value) = try_serialize_record(&[spend_request], record_kind) {
value
} else {
error!("Reward forwarding: Failed to serialise spend {pretty_key:?}");
return;
};

let record = Record {
key: record_key.clone(),
value: value.to_vec(),
publisher: None,
expires: None,
};

let result = network_clone.put_record(record, &put_cfg_clone).await;

match result {
Ok(_) => info!("Reward forwarding: sending spend {pretty_key:?} completed"),
Err(err) => {
info!("Reward forwarding: sending spend {pretty_key:?} failed with {err:?}")
}
}
});

std::thread::sleep(Duration::from_millis(500));
}

Ok(())
}
}

async fn chunk_proof_verify_peer(
Expand Down
50 changes: 44 additions & 6 deletions sn_transfers/src/wallet/hot_wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ impl HotWallet {

let created_cash_notes = transfer.cash_notes_for_recipient.clone();

self.update_local_wallet(transfer, exclusive_access)?;
self.update_local_wallet(transfer, exclusive_access, true)?;

trace!("Releasing wallet lock"); // by dropping _exclusive_access
Ok(created_cash_notes)
Expand All @@ -370,12 +370,48 @@ impl HotWallet {
self.reload()?;
trace!("Wallet locked and loaded!");

self.update_local_wallet(transfer, exclusive_access)?;
self.update_local_wallet(transfer, exclusive_access, true)?;

trace!("Releasing wallet lock"); // by dropping _exclusive_access
Ok(created_cash_notes)
}

pub fn prepare_forward_signed_spend(
&mut self,
to: Vec<TransactionPayeeDetails>,
reason_hash: Option<Hash>,
) -> Result<Vec<SignedSpend>> {
let (available_cash_notes, exclusive_access) = self.available_cash_notes()?;
debug!(
"Available CashNotes for local send: {:#?}",
available_cash_notes
);

let reason_hash = reason_hash.unwrap_or_default();

// create a unique key for each output
let mut rng = &mut rand::rngs::OsRng;
let to_unique_keys: Vec<_> = to
.into_iter()
.map(|(purpose, amount, address)| {
(amount, purpose, address, DerivationIndex::random(&mut rng))
})
.collect();

let transfer = OfflineTransfer::new(
available_cash_notes,
to_unique_keys,
self.address(),
reason_hash,
)?;

let signed_spends = transfer.all_spend_requests.clone();

self.update_local_wallet(transfer, exclusive_access, false)?;

Ok(signed_spends)
}

/// Performs a payment for each content address.
/// Includes payment of network royalties.
/// Returns the amount paid for storage, including the network royalties fee paid.
Expand Down Expand Up @@ -524,7 +560,7 @@ impl HotWallet {

// write all changes to local wallet
let start = Instant::now();
self.update_local_wallet(offline_transfer, exclusive_access)?;
self.update_local_wallet(offline_transfer, exclusive_access, true)?;
trace!(
"local_send_storage_payment completed local wallet update in {:?}",
start.elapsed()
Expand All @@ -537,6 +573,7 @@ impl HotWallet {
&mut self,
transfer: OfflineTransfer,
exclusive_access: WalletExclusiveAccess,
insert_into_pending_spends: bool,
) -> Result<()> {
// First of all, update client local state.
let spent_unique_pubkeys: BTreeSet<_> = transfer
Expand Down Expand Up @@ -569,9 +606,10 @@ impl HotWallet {
start.elapsed()
);
}

for request in transfer.all_spend_requests {
self.unconfirmed_spend_requests.insert(request);
if insert_into_pending_spends {
for request in transfer.all_spend_requests {
self.unconfirmed_spend_requests.insert(request);
}
}

// store wallet to disk
Expand Down

0 comments on commit 0c820f0

Please sign in to comment.