From c78d4efc8248a8c9b118d53994d28bda4bb50c18 Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Thu, 26 Dec 2024 12:12:37 +0100 Subject: [PATCH] pool: reduce atomic operations when cloning Signed-off-by: Yuki Kishimoto --- CHANGELOG.md | 1 + crates/nostr-relay-pool/src/pool/inner.rs | 77 ++++++++------- .../nostr-relay-pool/src/relay/constants.rs | 2 +- crates/nostr-relay-pool/src/relay/inner.rs | 98 ++++++++++--------- crates/nostr-relay-pool/src/shared.rs | 3 +- 5 files changed, 100 insertions(+), 81 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6be969f4b..383e34bf0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -66,6 +66,7 @@ * pool: improve shutdown docs ([dluvian]) * pool: rename `FilterOptions` to `ReqExitPolicy` ([Yuki Kishimoto]) * pool: log WebSocket connection error only if different from the last one ([Yuki Kishimoto]) +* pool: reduce atomic operations when cloning ([Yuki Kishimoto]) * sdk: refactor POW difficulty management ([Yuki Kishimoto]) * connect: require `fmt::Debug`, `Send` and `Sync` for `AuthUrlHandler` ([Yuki Kishimoto]) * zapper: bump `webln` to 0.4 ([Yuki Kishimoto]) diff --git a/crates/nostr-relay-pool/src/pool/inner.rs b/crates/nostr-relay-pool/src/pool/inner.rs index bd5c4e6d0..b75cf80cb 100644 --- a/crates/nostr-relay-pool/src/pool/inner.rs +++ b/crates/nostr-relay-pool/src/pool/inner.rs @@ -27,14 +27,21 @@ use crate::{RelayServiceFlags, SubscribeOptions}; type Relays = HashMap; +// Instead of wrapping every field in an `Arc`, which increases the number of atomic operations, +// put all fields that require an `Arc` here. +#[derive(Debug)] +struct AtomicPrivateData { + relays: RwLock, + subscriptions: RwLock>>, + shutdown: AtomicBool, +} + #[derive(Debug, Clone)] pub struct InnerRelayPool { pub(super) state: SharedState, - relays: Arc>, + atomic: Arc, notification_sender: broadcast::Sender, // TODO: move to shared state? 
- subscriptions: Arc>>>, opts: RelayPoolOptions, - shutdown: Arc, } impl AtomicDestroyer for InnerRelayPool { @@ -55,11 +62,13 @@ impl InnerRelayPool { Self { state, - relays: Arc::new(RwLock::new(HashMap::new())), + atomic: Arc::new(AtomicPrivateData { + relays: RwLock::new(HashMap::new()), + subscriptions: RwLock::new(HashMap::new()), + shutdown: AtomicBool::new(false), + }), notification_sender, - subscriptions: Arc::new(RwLock::new(HashMap::new())), opts, - shutdown: Arc::new(AtomicBool::new(false)), } } @@ -73,14 +82,14 @@ impl InnerRelayPool { .send(RelayPoolNotification::Shutdown); // Mark as shutdown - self.shutdown.store(true, Ordering::SeqCst); + self.atomic.shutdown.store(true, Ordering::SeqCst); Ok(()) } #[inline] pub(super) fn is_shutdown(&self) -> bool { - self.shutdown.load(Ordering::SeqCst) + self.atomic.shutdown.load(Ordering::SeqCst) } pub fn notifications(&self) -> broadcast::Receiver { @@ -88,7 +97,7 @@ impl InnerRelayPool { } pub async fn all_relays(&self) -> Relays { - let relays = self.relays.read().await; + let relays = self.atomic.relays.read().await; relays.clone() } @@ -114,7 +123,7 @@ impl InnerRelayPool { /// Get relays that have a certain [RelayServiceFlag] enabled pub async fn relays_with_flag(&self, flag: RelayServiceFlags, check: FlagCheck) -> Relays { - let relays = self.relays.read().await; + let relays = self.atomic.relays.read().await; self.internal_relays_with_flag(&relays, flag, check) .map(|(k, v)| (k.clone(), v.clone())) .collect() @@ -122,7 +131,7 @@ impl InnerRelayPool { /// Get relays with `READ` or `WRITE` relays async fn relay_urls(&self) -> Vec { - let relays = self.relays.read().await; + let relays = self.atomic.relays.read().await; self.internal_relays_with_flag( &relays, RelayServiceFlags::READ | RelayServiceFlags::WRITE, @@ -133,14 +142,14 @@ impl InnerRelayPool { } async fn read_relay_urls(&self) -> Vec { - let relays = self.relays.read().await; + let relays = self.atomic.relays.read().await; 
self.internal_relays_with_flag(&relays, RelayServiceFlags::READ, FlagCheck::All) .map(|(k, ..)| k.clone()) .collect() } async fn write_relay_urls(&self) -> Vec { - let relays = self.relays.read().await; + let relays = self.atomic.relays.read().await; self.internal_relays_with_flag(&relays, RelayServiceFlags::WRITE, FlagCheck::All) .map(|(k, ..)| k.clone()) .collect() @@ -161,32 +170,32 @@ impl InnerRelayPool { Error: From<::Err>, { let url: RelayUrl = url.try_into_url()?; - let relays = self.relays.read().await; + let relays = self.atomic.relays.read().await; self.internal_relay(&relays, &url).cloned() } pub async fn subscriptions(&self) -> HashMap> { - self.subscriptions.read().await.clone() + self.atomic.subscriptions.read().await.clone() } pub async fn subscription(&self, id: &SubscriptionId) -> Option> { - let subscriptions = self.subscriptions.read().await; + let subscriptions = self.atomic.subscriptions.read().await; subscriptions.get(id).cloned() } pub async fn save_subscription(&self, id: SubscriptionId, filters: Vec) { - let mut subscriptions = self.subscriptions.write().await; + let mut subscriptions = self.atomic.subscriptions.write().await; let current: &mut Vec = subscriptions.entry(id).or_default(); *current = filters; } pub(crate) async fn remove_subscription(&self, id: &SubscriptionId) { - let mut subscriptions = self.subscriptions.write().await; + let mut subscriptions = self.atomic.subscriptions.write().await; subscriptions.remove(id); } pub(crate) async fn remove_all_subscriptions(&self) { - let mut subscriptions = self.subscriptions.write().await; + let mut subscriptions = self.atomic.subscriptions.write().await; subscriptions.clear(); } @@ -209,7 +218,7 @@ impl InnerRelayPool { } // Get relays - let mut relays = self.relays.write().await; + let mut relays = self.atomic.relays.write().await; // Check if map already contains url if relays.contains_key(&url) { @@ -302,7 +311,7 @@ impl InnerRelayPool { let url: RelayUrl = url.try_into_url()?; // 
Acquire write lock - let mut relays = self.relays.write().await; + let mut relays = self.atomic.relays.write().await; // Remove self.internal_remove_relay(&mut relays, url, force).await @@ -310,7 +319,7 @@ impl InnerRelayPool { pub async fn remove_all_relays(&self, force: bool) -> Result<(), Error> { // Acquire write lock - let mut relays = self.relays.write().await; + let mut relays = self.atomic.relays.write().await; // Collect all relay urls let urls: Vec = relays.keys().cloned().collect(); @@ -354,7 +363,7 @@ impl InnerRelayPool { } // Lock with read shared access - let relays = self.relays.read().await; + let relays = self.atomic.relays.read().await; if relays.is_empty() { return Err(Error::NoRelays); @@ -418,7 +427,7 @@ impl InnerRelayPool { } // Lock with read shared access - let relays = self.relays.read().await; + let relays = self.atomic.relays.read().await; if relays.is_empty() { return Err(Error::NoRelays); @@ -566,7 +575,7 @@ impl InnerRelayPool { } // Lock with read shared access - let relays = self.relays.read().await; + let relays = self.atomic.relays.read().await; // Check if relays map is empty if relays.is_empty() { @@ -618,7 +627,7 @@ impl InnerRelayPool { self.remove_subscription(&id).await; // Lock with read shared access - let relays = self.relays.read().await; + let relays = self.atomic.relays.read().await; // TODO: use join_all and return `Output`? @@ -635,7 +644,7 @@ impl InnerRelayPool { self.remove_all_subscriptions().await; // Lock with read shared access - let relays = self.relays.read().await; + let relays = self.atomic.relays.read().await; // TODO: use join_all and return `Output`? 
@@ -708,7 +717,7 @@ impl InnerRelayPool { } // Lock with read shared access - let relays = self.relays.read().await; + let relays = self.atomic.relays.read().await; // Check if empty if relays.is_empty() { @@ -846,7 +855,7 @@ impl InnerRelayPool { } // Lock with read shared access - let relays = self.relays.read().await; + let relays = self.atomic.relays.read().await; // Check if empty if relays.is_empty() { @@ -868,7 +877,7 @@ impl InnerRelayPool { let this = self.clone(); task::spawn(async move { // Lock with read shared access - let relays = this.relays.read().await; + let relays = this.atomic.relays.read().await; let ids: Mutex> = Mutex::new(HashSet::new()); @@ -915,7 +924,7 @@ impl InnerRelayPool { pub async fn connect(&self, connection_timeout: Option) { // Lock with read shared access - let relays = self.relays.read().await; + let relays = self.atomic.relays.read().await; let mut futures = Vec::with_capacity(relays.len()); @@ -945,7 +954,7 @@ impl InnerRelayPool { pub async fn disconnect(&self) -> Result<(), Error> { // Lock with read shared access - let relays = self.relays.read().await; + let relays = self.atomic.relays.read().await; // Iter values and disconnect for relay in relays.values() { @@ -968,7 +977,7 @@ impl InnerRelayPool { let url: RelayUrl = url.try_into_url()?; // Lock with read shared access - let relays = self.relays.read().await; + let relays = self.atomic.relays.read().await; // Get relay let relay: &Relay = self.internal_relay(&relays, &url)?; @@ -988,7 +997,7 @@ impl InnerRelayPool { let url: RelayUrl = url.try_into_url()?; // Lock with read shared access - let relays = self.relays.read().await; + let relays = self.atomic.relays.read().await; // Get relay let relay: &Relay = self.internal_relay(&relays, &url)?; diff --git a/crates/nostr-relay-pool/src/relay/constants.rs b/crates/nostr-relay-pool/src/relay/constants.rs index da1636ee9..f347d0645 100644 --- a/crates/nostr-relay-pool/src/relay/constants.rs +++ 
b/crates/nostr-relay-pool/src/relay/constants.rs @@ -20,7 +20,7 @@ pub const MAX_CONTACT_LIST_EVENT_SIZE: u32 = 840 * 1024; // 840 kB pub(super) const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(10); // Not increase the max retry interval too much. -// Keep it small avoid huge waits before reconnection if internet was gone for much time and then come back. +// Keep it small to avoid huge waits before reconnection if internet was gone for much time and then come back. pub(super) const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(60); pub(super) const JITTER_RANGE: RangeInclusive = -3..=3; diff --git a/crates/nostr-relay-pool/src/relay/inner.rs b/crates/nostr-relay-pool/src/relay/inner.rs index 20b113ebd..d172a8b88 100644 --- a/crates/nostr-relay-pool/src/relay/inner.rs +++ b/crates/nostr-relay-pool/src/relay/inner.rs @@ -137,23 +137,30 @@ impl Default for SubscriptionData { } } +// Instead of wrapping every field in an `Arc`, which increases the number of atomic operations, +// put all fields that require an `Arc` here. 
+#[derive(Debug)] +pub(super) struct AtomicPrivateData { + status: AtomicRelayStatus, + #[cfg(feature = "nip11")] + document: RwLock, + #[cfg(feature = "nip11")] + last_document_fetch: AtomicU64, + channels: RelayChannels, + subscriptions: RwLock>, + running: AtomicBool, +} + #[derive(Debug, Clone)] pub(crate) struct InnerRelay { pub(super) url: RelayUrl, - status: Arc, - #[cfg(feature = "nip11")] - document: Arc>, - #[cfg(feature = "nip11")] - last_document_fetch: Arc, + pub(super) atomic: Arc, pub(super) opts: RelayOptions, pub(super) flags: AtomicRelayServiceFlags, pub(super) stats: RelayConnectionStats, pub(super) state: SharedState, - channels: Arc, pub(super) internal_notification_sender: broadcast::Sender, external_notification_sender: OnceCell>, - subscriptions: Arc>>, - running: Arc, } impl AtomicDestroyer for InnerRelay { @@ -170,20 +177,22 @@ impl InnerRelay { Self { url, - status: Arc::new(AtomicRelayStatus::default()), - #[cfg(feature = "nip11")] - document: Arc::new(RwLock::new(RelayInformationDocument::new())), - #[cfg(feature = "nip11")] - last_document_fetch: Arc::new(AtomicU64::new(0)), + atomic: Arc::new(AtomicPrivateData { + status: AtomicRelayStatus::default(), + #[cfg(feature = "nip11")] + document: RwLock::new(RelayInformationDocument::new()), + #[cfg(feature = "nip11")] + last_document_fetch: AtomicU64::new(0), + channels: RelayChannels::new(), + subscriptions: RwLock::new(HashMap::new()), + running: AtomicBool::new(false), + }), flags: AtomicRelayServiceFlags::new(opts.flags), opts, stats: RelayConnectionStats::default(), state, - channels: Arc::new(RelayChannels::new()), internal_notification_sender: relay_notification_sender, external_notification_sender: OnceCell::new(), - subscriptions: Arc::new(RwLock::new(HashMap::new())), - running: Arc::new(AtomicBool::new(false)), } } @@ -195,17 +204,17 @@ impl InnerRelay { /// Is connection task running? 
#[inline] pub(super) fn is_running(&self) -> bool { - self.running.load(Ordering::SeqCst) + self.atomic.running.load(Ordering::SeqCst) } #[inline] pub fn status(&self) -> RelayStatus { - self.status.load() + self.atomic.status.load() } fn set_status(&self, status: RelayStatus, log: bool) { // Change status - self.status.set(status); + self.atomic.status.set(status); // Log if log { @@ -265,7 +274,7 @@ impl InnerRelay { #[cfg(feature = "nip11")] pub async fn document(&self) -> RelayInformationDocument { - let document = self.document.read().await; + let document = self.atomic.document.read().await; document.clone() } @@ -283,17 +292,17 @@ impl InnerRelay { let now: u64 = Timestamp::now().as_u64(); // Check last fetch - if self.last_document_fetch.load(Ordering::SeqCst) + 3600 < now { + if self.atomic.last_document_fetch.load(Ordering::SeqCst) + 3600 < now { // Update last fetch - self.last_document_fetch.store(now, Ordering::SeqCst); + self.atomic.last_document_fetch.store(now, Ordering::SeqCst); // Fetch let url = self.url.clone(); - let d = self.document.clone(); + let atomic = self.atomic.clone(); task::spawn(async move { match RelayInformationDocument::get(url.clone().into(), proxy).await { Ok(document) => { - let mut d = d.write().await; + let mut d = atomic.document.write().await; *d = document } Err(e) => { @@ -306,7 +315,7 @@ impl InnerRelay { } pub async fn subscriptions(&self) -> HashMap> { - let subscription = self.subscriptions.read().await; + let subscription = self.atomic.subscriptions.read().await; subscription .iter() .map(|(k, v)| (k.clone(), v.filters.clone())) @@ -314,7 +323,7 @@ impl InnerRelay { } pub async fn subscription(&self, id: &SubscriptionId) -> Option> { - let subscription = self.subscriptions.read().await; + let subscription = self.atomic.subscriptions.read().await; subscription.get(id).map(|d| d.filters.clone()) } @@ -324,7 +333,7 @@ impl InnerRelay { filters: Vec, update_subscribed_at: bool, ) { - let mut subscriptions = 
self.subscriptions.write().await; + let mut subscriptions = self.atomic.subscriptions.write().await; let data: &mut SubscriptionData = subscriptions.entry(id).or_default(); data.filters = filters; @@ -335,7 +344,7 @@ impl InnerRelay { /// Mark subscription as closed async fn subscription_closed(&self, id: &SubscriptionId) { - let mut subscriptions = self.subscriptions.write().await; + let mut subscriptions = self.atomic.subscriptions.write().await; if let Some(data) = subscriptions.get_mut(id) { data.closed = true; } @@ -343,7 +352,7 @@ impl InnerRelay { /// Check if it should subscribe for current websocket session pub(crate) async fn should_resubscribe(&self, id: &SubscriptionId) -> bool { - let subscriptions = self.subscriptions.read().await; + let subscriptions = self.atomic.subscriptions.read().await; match subscriptions.get(id) { Some(SubscriptionData { subscribed_at, @@ -365,13 +374,13 @@ impl InnerRelay { } pub(crate) async fn remove_subscription(&self, id: &SubscriptionId) { - let mut subscriptions = self.subscriptions.write().await; + let mut subscriptions = self.atomic.subscriptions.write().await; subscriptions.remove(id); } #[inline] pub fn queue(&self) -> usize { - self.channels.nostr_queue() + self.atomic.channels.nostr_queue() } pub(crate) fn set_notification_sender( @@ -467,10 +476,10 @@ impl InnerRelay { let relay = self.clone(); task::spawn(async move { // Set that connection task is running - relay.running.store(true, Ordering::SeqCst); + relay.atomic.running.store(true, Ordering::SeqCst); // Acquire service watcher - let mut rx_service = relay.channels.rx_service().await; + let mut rx_service = relay.atomic.channels.rx_service().await; // Last websocket error // Store it to avoid to print every time the same connection error @@ -485,7 +494,7 @@ impl InnerRelay { tokio::select! 
{ // Connect and run message handler _ = relay.connect_and_run(connection_timeout, &mut last_ws_error) => {}, - // Handle terminate + // Handle "terminate" message _ = relay.handle_terminate(&mut rx_service) => { // Update status relay.set_status(RelayStatus::Terminated, true); @@ -498,7 +507,7 @@ // Get status let status: RelayStatus = relay.status(); - // If status is set to terminated, break loop. + // If the status is set to "terminated", break loop. if status.is_terminated() { break; } @@ -522,7 +531,7 @@ tokio::select! { // Sleep _ = time::sleep(interval) => {}, - // Handle terminate + // Handle "terminate" message _ = relay.handle_terminate(&mut rx_service) => { // Update status relay.set_status(RelayStatus::Terminated, true); @@ -530,7 +539,7 @@ } } } else { - // Reconnection disabled, set status to terminated + // Reconnection disabled, set status to "terminated" relay.set_status(RelayStatus::Terminated, true); // Break loop and exit @@ -540,7 +549,7 @@ } // Set that connection task is no longer running - relay.running.store(false, Ordering::SeqCst); + relay.atomic.running.store(false, Ordering::SeqCst); tracing::debug!(url = %relay.url, "Auto connect loop terminated."); }); @@ -670,7 +679,7 @@ let ping: PingTracker = PingTracker::default(); - // Wait that one of the futures terminate/complete + // Wait until one of the futures terminates/completes tokio::select! { res = self.receiver_message_handler(ws_rx, &ping) => match res { Ok(()) => tracing::trace!(url = %self.url, "Relay received exited."), @@ -696,8 +705,8 @@ let _ping = ping; // Lock receivers - let mut rx_nostr = self.channels.rx_nostr().await; - let mut rx_ping = self.channels.rx_ping().await; + let mut rx_nostr = self.atomic.channels.rx_nostr().await; + let mut rx_ping = self.atomic.channels.rx_ping().await; loop { tokio::select! 
{ @@ -824,7 +833,7 @@ impl InnerRelay { ping.set_replied(false); // Try to ping - self.channels.ping(nonce)?; + self.atomic.channels.ping(nonce)?; // Sleep time::sleep(PING_INTERVAL).await; @@ -1115,7 +1124,8 @@ impl InnerRelay { pub fn disconnect(&self) -> Result<(), Error> { // Check if it's NOT terminated if !self.status().is_terminated() { - self.channels + self.atomic + .channels .send_service_msg(RelayServiceEvent::Terminate)?; self.send_notification(RelayNotification::Shutdown, false); } @@ -1148,7 +1158,7 @@ impl InnerRelay { } // Send message - self.channels.send_client_msgs(msgs) + self.atomic.channels.send_client_msgs(msgs) } #[inline] diff --git a/crates/nostr-relay-pool/src/shared.rs b/crates/nostr-relay-pool/src/shared.rs index 1ac897db4..587a8b587 100644 --- a/crates/nostr-relay-pool/src/shared.rs +++ b/crates/nostr-relay-pool/src/shared.rs @@ -28,8 +28,7 @@ impl fmt::Display for SharedStateError { } } -// TODO: add SharedStateBuilder? - +// TODO: reduce atomic operations #[derive(Debug, Clone)] pub struct SharedState { pub(crate) database: Arc,