From 0c820f03e17bb4981a2288553bacaf083ee756c1 Mon Sep 17 00:00:00 2001 From: qima Date: Thu, 9 May 2024 03:23:28 +0800 Subject: [PATCH] WIP --- sn_node/src/node.rs | 111 +++++++++++++++++++++++++- sn_transfers/src/wallet/hot_wallet.rs | 50 ++++++++++-- 2 files changed, 152 insertions(+), 9 deletions(-) diff --git a/sn_node/src/node.rs b/sn_node/src/node.rs index cc93f9f564..0c4148697d 100644 --- a/sn_node/src/node.rs +++ b/sn_node/src/node.rs @@ -17,20 +17,25 @@ use crate::metrics::NodeMetrics; use crate::RunningNode; use bytes::Bytes; -use libp2p::{identity::Keypair, Multiaddr, PeerId}; +use libp2p::{ + identity::Keypair, + kad::{Quorum, Record}, + Multiaddr, PeerId, +}; #[cfg(feature = "open-metrics")] use prometheus_client::registry::Registry; use rand::{rngs::StdRng, thread_rng, Rng, SeedableRng}; use sn_networking::{ close_group_majority, Network, NetworkBuilder, NetworkError, NetworkEvent, NodeIssue, - SwarmDriver, CLOSE_GROUP_SIZE, + PutRecordCfg, SwarmDriver, CLOSE_GROUP_SIZE, }; use sn_protocol::{ error::Error as ProtocolError, messages::{ChunkProof, Cmd, CmdResponse, Query, QueryResponse, Request, Response}, + storage::{try_serialize_record, RecordKind, SpendAddress}, NetworkAddress, PrettyPrintRecordKey, }; -use sn_transfers::{HotWallet, MainPubkey, MainSecretKey, NanoTokens}; +use sn_transfers::{Hash, HotWallet, MainPubkey, MainSecretKey, NanoTokens, NETWORK_ROYALTIES_PK}; use std::{ net::SocketAddr, path::PathBuf, @@ -233,6 +238,17 @@ impl Node { let mut rolling_index = 0; + // use a random timeout to ensure not sync when transmit messages. + let balance_forward_interval: u64 = 10 + * rng.gen_range( + PERIODIC_REPLICATION_INTERVAL_MAX_S / 2..PERIODIC_REPLICATION_INTERVAL_MAX_S, + ); + let balance_forward_time = Duration::from_secs(balance_forward_interval); + debug!("BalanceForward interval set to {balance_forward_time:?}"); + + let mut balance_forward_interval = tokio::time::interval(balance_forward_time); + let _ = balance_forward_interval.tick().await; // first tick completes immediately + loop { let peers_connected = &peers_connected; @@ -284,6 +300,18 @@ impl Node { rolling_index += 1; } } + // runs every balance_forward_interval time + _ = balance_forward_interval.tick() => { + let start = std::time::Instant::now(); + trace!("Periodic balance forward triggered"); + let network = self.network.clone(); + let owner = self.owner.clone(); + + let _handle = spawn(async move { + let _ = Self::try_forward_blance(network, owner); + info!("Periodic blance forward took {:?}", start.elapsed()); + }); + } node_cmd = cmds_receiver.recv() => { match node_cmd { Ok(cmd) => { @@ -726,6 +754,83 @@ impl Node { } } } + + fn try_forward_blance(network: Network, owner: String) -> Result<()> { + let mut spend_requests = vec![]; + { + // load wallet + let mut wallet = HotWallet::load_from(&network.root_dir_path)?; + let balance = wallet.balance(); + + let payee = vec![( + "reward collector".to_string(), + balance, + *NETWORK_ROYALTIES_PK, + )]; + + spend_requests.extend( + wallet + .prepare_forward_signed_spend(payee, Some(Hash::hash(&owner.into_bytes())))?, + ); + } + + let record_kind = RecordKind::Spend; + let put_cfg = PutRecordCfg { + put_quorum: Quorum::Majority, + retry_strategy: None, + use_put_record_to: None, + verification: None, + }; + + info!( + "Reward forwarding sending {} spends in this iteration.", + spend_requests.len() + ); + + for spend_request in spend_requests { + let network_clone = network.clone(); + let put_cfg_clone = put_cfg.clone(); + + // Sent out spend in separate thread to avoid blocking the main one + let _handle = spawn(async move { + let unique_pubkey = *spend_request.unique_pubkey(); + let cash_note_addr = SpendAddress::from_unique_pubkey(&unique_pubkey); + let network_address = NetworkAddress::from_spend_address(cash_note_addr); + + let record_key = network_address.to_record_key(); + let pretty_key = PrettyPrintRecordKey::from(&record_key); + + debug!("Reward forwarding in spend {pretty_key:?}: {spend_request:#?}"); + + let value = if let Ok(value) = try_serialize_record(&[spend_request], record_kind) { + value + } else { + error!("Reward forwarding: Failed to serialise spend {pretty_key:?}"); + return; + }; + + let record = Record { + key: record_key.clone(), + value: value.to_vec(), + publisher: None, + expires: None, + }; + + let result = network_clone.put_record(record, &put_cfg_clone).await; + + match result { + Ok(_) => info!("Reward forwarding: sending spend {pretty_key:?} completed"), + Err(err) => { + info!("Reward forwarding: sending spend {pretty_key:?} failed with {err:?}") + } + } + }); + + std::thread::sleep(Duration::from_millis(500)); + } + + Ok(()) + } } async fn chunk_proof_verify_peer( diff --git a/sn_transfers/src/wallet/hot_wallet.rs b/sn_transfers/src/wallet/hot_wallet.rs index 60846f0a58..08cb087909 100644 --- a/sn_transfers/src/wallet/hot_wallet.rs +++ b/sn_transfers/src/wallet/hot_wallet.rs @@ -345,7 +345,7 @@ impl HotWallet { let created_cash_notes = transfer.cash_notes_for_recipient.clone(); - self.update_local_wallet(transfer, exclusive_access)?; + self.update_local_wallet(transfer, exclusive_access, true)?; trace!("Releasing wallet lock"); // by dropping _exclusive_access Ok(created_cash_notes) @@ -370,12 +370,48 @@ impl HotWallet { self.reload()?; trace!("Wallet locked and loaded!"); - self.update_local_wallet(transfer, exclusive_access)?; + self.update_local_wallet(transfer, exclusive_access, true)?; trace!("Releasing wallet lock"); // by dropping _exclusive_access Ok(created_cash_notes) } + pub fn prepare_forward_signed_spend( + &mut self, + to: Vec, + reason_hash: Option, + ) -> Result> { + let (available_cash_notes, exclusive_access) = self.available_cash_notes()?; + debug!( + "Available CashNotes for local send: {:#?}", + available_cash_notes + ); + + let reason_hash = reason_hash.unwrap_or_default(); + + // create a unique key for each output + let mut rng = &mut rand::rngs::OsRng; + let to_unique_keys: Vec<_> = to + .into_iter() + .map(|(purpose, amount, address)| { + (amount, purpose, address, DerivationIndex::random(&mut rng)) + }) + .collect(); + + let transfer = OfflineTransfer::new( + available_cash_notes, + to_unique_keys, + self.address(), + reason_hash, + )?; + + let signed_spends = transfer.all_spend_requests.clone(); + + self.update_local_wallet(transfer, exclusive_access, false)?; + + Ok(signed_spends) + } + /// Performs a payment for each content address. /// Includes payment of network royalties. /// Returns the amount paid for storage, including the network royalties fee paid. @@ -524,7 +560,7 @@ impl HotWallet { // write all changes to local wallet let start = Instant::now(); - self.update_local_wallet(offline_transfer, exclusive_access)?; + self.update_local_wallet(offline_transfer, exclusive_access, true)?; trace!( "local_send_storage_payment completed local wallet update in {:?}", start.elapsed() @@ -537,6 +573,7 @@ impl HotWallet { &mut self, transfer: OfflineTransfer, exclusive_access: WalletExclusiveAccess, + insert_into_pending_spends: bool, ) -> Result<()> { // First of all, update client local state. let spent_unique_pubkeys: BTreeSet<_> = transfer @@ -569,9 +606,10 @@ impl HotWallet { start.elapsed() ); } - - for request in transfer.all_spend_requests { - self.unconfirmed_spend_requests.insert(request); + if insert_into_pending_spends { + for request in transfer.all_spend_requests { + self.unconfirmed_spend_requests.insert(request); + } } // store wallet to disk