Skip to content

Commit

Permalink
pool: reduce atomic operations when cloning
Browse files Browse the repository at this point in the history
Signed-off-by: Yuki Kishimoto <yukikishimoto@protonmail.com>
  • Loading branch information
yukibtc committed Dec 28, 2024
1 parent 748951d commit c78d4ef
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 81 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
* pool: improve shutdown docs ([dluvian])
* pool: rename `FilterOptions` to `ReqExitPolicy` ([Yuki Kishimoto])
* pool: log WebSocket connection error only if different from the last one ([Yuki Kishimoto])
* pool: reduce atomic operations when cloning ([Yuki Kishimoto])
* sdk: refactor POW difficulty management ([Yuki Kishimoto])
* connect: require `fmt::Debug`, `Send` and `Sync` for `AuthUrlHandler` ([Yuki Kishimoto])
* zapper: bump `webln` to 0.4 ([Yuki Kishimoto])
Expand Down
77 changes: 43 additions & 34 deletions crates/nostr-relay-pool/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,21 @@ use crate::{RelayServiceFlags, SubscribeOptions};

type Relays = HashMap<RelayUrl, Relay>;

// Instead of wrap every field in an `Arc<T>`, which increases the number of atomic operations,
// put all fields that require an `Arc` here.
#[derive(Debug)]
struct AtomicPrivateData {
relays: RwLock<Relays>,
subscriptions: RwLock<HashMap<SubscriptionId, Vec<Filter>>>,
shutdown: AtomicBool,
}

#[derive(Debug, Clone)]
pub struct InnerRelayPool {
pub(super) state: SharedState,
relays: Arc<RwLock<Relays>>,
atomic: Arc<AtomicPrivateData>,
notification_sender: broadcast::Sender<RelayPoolNotification>, // TODO: move to shared state?
subscriptions: Arc<RwLock<HashMap<SubscriptionId, Vec<Filter>>>>,
opts: RelayPoolOptions,
shutdown: Arc<AtomicBool>,
}

impl AtomicDestroyer for InnerRelayPool {
Expand All @@ -55,11 +62,13 @@ impl InnerRelayPool {

Self {
state,
relays: Arc::new(RwLock::new(HashMap::new())),
atomic: Arc::new(AtomicPrivateData {
relays: RwLock::new(HashMap::new()),
subscriptions: RwLock::new(HashMap::new()),
shutdown: AtomicBool::new(false),
}),
notification_sender,
subscriptions: Arc::new(RwLock::new(HashMap::new())),
opts,
shutdown: Arc::new(AtomicBool::new(false)),
}
}

Expand All @@ -73,22 +82,22 @@ impl InnerRelayPool {
.send(RelayPoolNotification::Shutdown);

// Mark as shutdown
self.shutdown.store(true, Ordering::SeqCst);
self.atomic.shutdown.store(true, Ordering::SeqCst);

Ok(())
}

#[inline]
pub(super) fn is_shutdown(&self) -> bool {
self.shutdown.load(Ordering::SeqCst)
self.atomic.shutdown.load(Ordering::SeqCst)
}

pub fn notifications(&self) -> broadcast::Receiver<RelayPoolNotification> {
self.notification_sender.subscribe()
}

pub async fn all_relays(&self) -> Relays {
let relays = self.relays.read().await;
let relays = self.atomic.relays.read().await;
relays.clone()
}

Expand All @@ -114,15 +123,15 @@ impl InnerRelayPool {

/// Get relays that have a certain [RelayServiceFlag] enabled
pub async fn relays_with_flag(&self, flag: RelayServiceFlags, check: FlagCheck) -> Relays {
let relays = self.relays.read().await;
let relays = self.atomic.relays.read().await;
self.internal_relays_with_flag(&relays, flag, check)
.map(|(k, v)| (k.clone(), v.clone()))
.collect()
}

/// Get relays with `READ` or `WRITE` relays
async fn relay_urls(&self) -> Vec<RelayUrl> {
let relays = self.relays.read().await;
let relays = self.atomic.relays.read().await;
self.internal_relays_with_flag(
&relays,
RelayServiceFlags::READ | RelayServiceFlags::WRITE,
Expand All @@ -133,14 +142,14 @@ impl InnerRelayPool {
}

async fn read_relay_urls(&self) -> Vec<RelayUrl> {
let relays = self.relays.read().await;
let relays = self.atomic.relays.read().await;
self.internal_relays_with_flag(&relays, RelayServiceFlags::READ, FlagCheck::All)
.map(|(k, ..)| k.clone())
.collect()
}

async fn write_relay_urls(&self) -> Vec<RelayUrl> {
let relays = self.relays.read().await;
let relays = self.atomic.relays.read().await;
self.internal_relays_with_flag(&relays, RelayServiceFlags::WRITE, FlagCheck::All)
.map(|(k, ..)| k.clone())
.collect()
Expand All @@ -161,32 +170,32 @@ impl InnerRelayPool {
Error: From<<U as TryIntoUrl>::Err>,
{
let url: RelayUrl = url.try_into_url()?;
let relays = self.relays.read().await;
let relays = self.atomic.relays.read().await;
self.internal_relay(&relays, &url).cloned()
}

pub async fn subscriptions(&self) -> HashMap<SubscriptionId, Vec<Filter>> {
self.subscriptions.read().await.clone()
self.atomic.subscriptions.read().await.clone()
}

pub async fn subscription(&self, id: &SubscriptionId) -> Option<Vec<Filter>> {
let subscriptions = self.subscriptions.read().await;
let subscriptions = self.atomic.subscriptions.read().await;
subscriptions.get(id).cloned()
}

pub async fn save_subscription(&self, id: SubscriptionId, filters: Vec<Filter>) {
let mut subscriptions = self.subscriptions.write().await;
let mut subscriptions = self.atomic.subscriptions.write().await;
let current: &mut Vec<Filter> = subscriptions.entry(id).or_default();
*current = filters;
}

pub(crate) async fn remove_subscription(&self, id: &SubscriptionId) {
let mut subscriptions = self.subscriptions.write().await;
let mut subscriptions = self.atomic.subscriptions.write().await;
subscriptions.remove(id);
}

pub(crate) async fn remove_all_subscriptions(&self) {
let mut subscriptions = self.subscriptions.write().await;
let mut subscriptions = self.atomic.subscriptions.write().await;
subscriptions.clear();
}

Expand All @@ -209,7 +218,7 @@ impl InnerRelayPool {
}

// Get relays
let mut relays = self.relays.write().await;
let mut relays = self.atomic.relays.write().await;

// Check if map already contains url
if relays.contains_key(&url) {
Expand Down Expand Up @@ -302,15 +311,15 @@ impl InnerRelayPool {
let url: RelayUrl = url.try_into_url()?;

// Acquire write lock
let mut relays = self.relays.write().await;
let mut relays = self.atomic.relays.write().await;

// Remove
self.internal_remove_relay(&mut relays, url, force).await
}

pub async fn remove_all_relays(&self, force: bool) -> Result<(), Error> {
// Acquire write lock
let mut relays = self.relays.write().await;
let mut relays = self.atomic.relays.write().await;

// Collect all relay urls
let urls: Vec<RelayUrl> = relays.keys().cloned().collect();
Expand Down Expand Up @@ -354,7 +363,7 @@ impl InnerRelayPool {
}

// Lock with read shared access
let relays = self.relays.read().await;
let relays = self.atomic.relays.read().await;

if relays.is_empty() {
return Err(Error::NoRelays);
Expand Down Expand Up @@ -418,7 +427,7 @@ impl InnerRelayPool {
}

// Lock with read shared access
let relays = self.relays.read().await;
let relays = self.atomic.relays.read().await;

if relays.is_empty() {
return Err(Error::NoRelays);
Expand Down Expand Up @@ -566,7 +575,7 @@ impl InnerRelayPool {
}

// Lock with read shared access
let relays = self.relays.read().await;
let relays = self.atomic.relays.read().await;

// Check if relays map is empty
if relays.is_empty() {
Expand Down Expand Up @@ -618,7 +627,7 @@ impl InnerRelayPool {
self.remove_subscription(&id).await;

// Lock with read shared access
let relays = self.relays.read().await;
let relays = self.atomic.relays.read().await;

// TODO: use join_all and return `Output`?

Expand All @@ -635,7 +644,7 @@ impl InnerRelayPool {
self.remove_all_subscriptions().await;

// Lock with read shared access
let relays = self.relays.read().await;
let relays = self.atomic.relays.read().await;

// TODO: use join_all and return `Output`?

Expand Down Expand Up @@ -708,7 +717,7 @@ impl InnerRelayPool {
}

// Lock with read shared access
let relays = self.relays.read().await;
let relays = self.atomic.relays.read().await;

// Check if empty
if relays.is_empty() {
Expand Down Expand Up @@ -846,7 +855,7 @@ impl InnerRelayPool {
}

// Lock with read shared access
let relays = self.relays.read().await;
let relays = self.atomic.relays.read().await;

// Check if empty
if relays.is_empty() {
Expand All @@ -868,7 +877,7 @@ impl InnerRelayPool {
let this = self.clone();
task::spawn(async move {
// Lock with read shared access
let relays = this.relays.read().await;
let relays = this.atomic.relays.read().await;

let ids: Mutex<HashSet<EventId>> = Mutex::new(HashSet::new());

Expand Down Expand Up @@ -915,7 +924,7 @@ impl InnerRelayPool {

pub async fn connect(&self, connection_timeout: Option<Duration>) {
// Lock with read shared access
let relays = self.relays.read().await;
let relays = self.atomic.relays.read().await;

let mut futures = Vec::with_capacity(relays.len());

Expand Down Expand Up @@ -945,7 +954,7 @@ impl InnerRelayPool {

pub async fn disconnect(&self) -> Result<(), Error> {
// Lock with read shared access
let relays = self.relays.read().await;
let relays = self.atomic.relays.read().await;

// Iter values and disconnect
for relay in relays.values() {
Expand All @@ -968,7 +977,7 @@ impl InnerRelayPool {
let url: RelayUrl = url.try_into_url()?;

// Lock with read shared access
let relays = self.relays.read().await;
let relays = self.atomic.relays.read().await;

// Get relay
let relay: &Relay = self.internal_relay(&relays, &url)?;
Expand All @@ -988,7 +997,7 @@ impl InnerRelayPool {
let url: RelayUrl = url.try_into_url()?;

// Lock with read shared access
let relays = self.relays.read().await;
let relays = self.atomic.relays.read().await;

// Get relay
let relay: &Relay = self.internal_relay(&relays, &url)?;
Expand Down
2 changes: 1 addition & 1 deletion crates/nostr-relay-pool/src/relay/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub const MAX_CONTACT_LIST_EVENT_SIZE: u32 = 840 * 1024; // 840 kB

pub(super) const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(10);
// Not increase the max retry interval too much.
// Keep it small avoid huge waits before reconnection if internet was gone for much time and then come back.
// Keep it small, avoid huge waits before reconnection if internet was gone for much time and then come back.
pub(super) const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(60);
pub(super) const JITTER_RANGE: RangeInclusive<i8> = -3..=3;

Expand Down
Loading

0 comments on commit c78d4ef

Please sign in to comment.