From d87fc3eed8eec015cbc0039d6a28c72b6f1748d4 Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Mon, 23 Dec 2024 10:11:21 +0100 Subject: [PATCH] pool: add `try_connect` methods * Add `Relay::try_connect` * Add `Client::try_connect` * Add `Client::try_connect_relay` * Add `RelayPool::try_connect` * Add `RelayPool::try_connect_relay` * Change `Relay::connect` method signature Closes https://github.com/rust-nostr/nostr/issues/624 Signed-off-by: Yuki Kishimoto --- CHANGELOG.md | 24 ++++++ bindings/nostr-sdk-ffi/src/client/mod.rs | 4 +- bindings/nostr-sdk-ffi/src/relay/mod.rs | 17 +++- bindings/nostr-sdk-js/src/client/mod.rs | 6 +- bindings/nostr-sdk-js/src/relay/mod.rs | 18 ++++- crates/nostr-connect/src/client.rs | 2 +- crates/nostr-connect/src/signer.rs | 3 +- crates/nostr-relay-pool/examples/pool.rs | 2 +- crates/nostr-relay-pool/src/pool/constants.rs | 2 - crates/nostr-relay-pool/src/pool/inner.rs | 76 ++++++++++++------ crates/nostr-relay-pool/src/pool/mod.rs | 40 +++++++--- crates/nostr-relay-pool/src/relay/error.rs | 3 + crates/nostr-relay-pool/src/relay/inner.rs | 78 +++++++++++-------- crates/nostr-relay-pool/src/relay/mod.rs | 37 ++++++--- crates/nostr-relay-pool/src/shared.rs | 1 + crates/nostr-sdk/src/client/mod.rs | 29 ++++++- crates/nwc/src/lib.rs | 2 +- 17 files changed, 243 insertions(+), 101 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c819a70c4..c2087cacb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,30 @@ --> +## [Unreleased] + +### Summary + +### Breaking changes + +* pool: change `Relay::connect` method signature ([Yuki Kishimoto]) + +### Changed + +### Added + +* pool: add `Relay::try_connect` ([Yuki Kishimoto]) +* pool: add `RelayPool::try_connect` ([Yuki Kishimoto]) +* pool: add `RelayPool::try_connect_relay` ([Yuki Kishimoto]) +* sdk: add `Client::try_connect` ([Yuki Kishimoto]) +* sdk: add `Client::try_connect_relay` ([Yuki Kishimoto]) + +### Fixed + +### Removed + +### Deprecated + ## [v0.38.0] - 2024/12/31 ### Summary diff --git a/bindings/nostr-sdk-ffi/src/client/mod.rs b/bindings/nostr-sdk-ffi/src/client/mod.rs index 808f35ed5..6f0307b98 100644 --- a/bindings/nostr-sdk-ffi/src/client/mod.rs +++ b/bindings/nostr-sdk-ffi/src/client/mod.rs @@ -195,8 +195,8 @@ impl Client { /// /// Try to connect to the relays and wait for them to be connected at most for the specified `timeout`. /// The code continues if the `timeout` is reached or if all relays connect. - pub async fn connect_with_timeout(&self, timeout: Duration) { - self.inner.connect_with_timeout(timeout).await + pub async fn try_connect(&self, timeout: Duration) { + self.inner.try_connect(timeout).await } pub async fn disconnect(&self) -> Result<()> { diff --git a/bindings/nostr-sdk-ffi/src/relay/mod.rs b/bindings/nostr-sdk-ffi/src/relay/mod.rs index 1c89cfd04..d959deb2d 100644 --- a/bindings/nostr-sdk-ffi/src/relay/mod.rs +++ b/bindings/nostr-sdk-ffi/src/relay/mod.rs @@ -200,9 +200,20 @@ impl Relay { // TODO: add notifications - /// Connect to relay and keep alive connection - pub async fn connect(&self, connection_timeout: Option) { - self.inner.connect(connection_timeout).await + /// Connect to relay + /// + /// This method returns immediately and doesn't provide any information on if the connection was successful or not. + pub fn connect(&self) { + self.inner.connect() + } + + /// Try to connect to relay + /// + /// This method returns an error if the connection fails. + /// If the connection fails, + /// a task will continue to retry in the background (unless configured differently in `RelayOptions`. + pub async fn try_connect(&self, timeout: Duration) -> Result<()> { + Ok(self.inner.try_connect(timeout).await?) } /// Disconnect from relay and set status to 'Terminated' diff --git a/bindings/nostr-sdk-js/src/client/mod.rs b/bindings/nostr-sdk-js/src/client/mod.rs index 78a292545..23dfa4fd7 100644 --- a/bindings/nostr-sdk-js/src/client/mod.rs +++ b/bindings/nostr-sdk-js/src/client/mod.rs @@ -229,9 +229,9 @@ impl JsClient { /// Try to connect to the relays and wait for them to be connected at most for the specified `timeout`. /// The code continues if the `timeout` is reached or if all relays connect. #[inline] - #[wasm_bindgen(js_name = connectWithTimeout)] - pub async fn connect_with_timeout(&self, timeout: &JsDuration) { - self.inner.connect_with_timeout(**timeout).await + #[wasm_bindgen(js_name = tryConnect)] + pub async fn try_connect(&self, timeout: &JsDuration) { + self.inner.try_connect(**timeout).await } /// Disconnect from all relays diff --git a/bindings/nostr-sdk-js/src/relay/mod.rs b/bindings/nostr-sdk-js/src/relay/mod.rs index 567c38172..3fd9571de 100644 --- a/bindings/nostr-sdk-js/src/relay/mod.rs +++ b/bindings/nostr-sdk-js/src/relay/mod.rs @@ -158,9 +158,21 @@ impl JsRelay { self.inner.queue() as u64 } - /// Connect to relay and keep alive connection - pub async fn connect(&self, connection_timeout: Option) { - self.inner.connect(connection_timeout.map(|d| *d)).await + /// Connect to relay + /// + /// This method returns immediately and doesn't provide any information on if the connection was successful or not. + pub fn connect(&self) { + self.inner.connect() + } + + /// Try to connect to relay + /// + /// This method returns an error if the connection fails. + /// If the connection fails, + /// a task will continue to retry in the background (unless configured differently in `RelayOptions`. + #[wasm_bindgen(js_name = tryConnect)] + pub async fn try_connect(&self, timeout: &JsDuration) -> Result<()> { + self.inner.try_connect(**timeout).await.map_err(into_err) } /// Disconnect from relay and set status to 'Terminated' diff --git a/crates/nostr-connect/src/client.rs b/crates/nostr-connect/src/client.rs index 7e6deac0d..de9890449 100644 --- a/crates/nostr-connect/src/client.rs +++ b/crates/nostr-connect/src/client.rs @@ -120,7 +120,7 @@ impl NostrConnect { } // Connect to relays - self.pool.connect(None).await; + self.pool.connect().await; // Subscribe let notifications = self.subscribe().await?; diff --git a/crates/nostr-connect/src/signer.rs b/crates/nostr-connect/src/signer.rs index b1c139552..492a3c927 100644 --- a/crates/nostr-connect/src/signer.rs +++ b/crates/nostr-connect/src/signer.rs @@ -6,7 +6,6 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::time::Duration; use nostr::nips::nip46::{Message, Request, ResponseResult}; use nostr_relay_pool::prelude::*; @@ -130,7 +129,7 @@ impl NostrConnectRemoteSigner { } // Connect - self.pool.connect(Some(Duration::from_secs(10))).await; + self.pool.connect().await; let filter = Filter::new() .pubkey(self.keys.signer.public_key()) diff --git a/crates/nostr-relay-pool/examples/pool.rs b/crates/nostr-relay-pool/examples/pool.rs index c8dfdf4f6..0abc9108d 100644 --- a/crates/nostr-relay-pool/examples/pool.rs +++ b/crates/nostr-relay-pool/examples/pool.rs @@ -19,7 +19,7 @@ async fn main() -> Result<()> { pool.add_relay("wss://relay.damus.io", RelayOptions::default()) .await?; - pool.connect(None).await; + pool.connect().await; let event = Event::from_json(r#"{"content":"","created_at":1698412975,"id":"f55c30722f056e330d8a7a6a9ba1522f7522c0f1ced1c93d78ea833c78a3d6ec","kind":3,"pubkey":"f831caf722214748c72db4829986bd0cbb2bb8b3aeade1c959624a52a9629046","sig":"5092a9ffaecdae7d7794706f085ff5852befdf79df424cc3419bb797bf515ae05d4f19404cb8324b8b4380a4bd497763ac7b0f3b1b63ef4d3baa17e5f5901808","tags":[["p","4ddeb9109a8cd29ba279a637f5ec344f2479ee07df1f4043f3fe26d8948cfef9","",""],["p","bb6fd06e156929649a73e6b278af5e648214a69d88943702f1fb627c02179b95","",""],["p","b8b8210f33888fdbf5cedee9edf13c3e9638612698fe6408aff8609059053420","",""],["p","9dcee4fabcd690dc1da9abdba94afebf82e1e7614f4ea92d61d52ef9cd74e083","",""],["p","3eea9e831fefdaa8df35187a204d82edb589a36b170955ac5ca6b88340befaa0","",""],["p","885238ab4568f271b572bf48b9d6f99fa07644731f288259bd395998ee24754e","",""],["p","568a25c71fba591e39bebe309794d5c15d27dbfa7114cacb9f3586ea1314d126","",""]]}"#).unwrap(); pool.send_event(event).await?; diff --git a/crates/nostr-relay-pool/src/pool/constants.rs b/crates/nostr-relay-pool/src/pool/constants.rs index 798fba7aa..f31f7291b 100644 --- a/crates/nostr-relay-pool/src/pool/constants.rs +++ b/crates/nostr-relay-pool/src/pool/constants.rs @@ -4,7 +4,5 @@ //! Constants -pub(super) const MAX_CONNECTING_CHUNK: usize = 100; - /// Relay Pool default notification channel size pub const DEFAULT_NOTIFICATION_CHANNEL_SIZE: usize = 4096; diff --git a/crates/nostr-relay-pool/src/pool/inner.rs b/crates/nostr-relay-pool/src/pool/inner.rs index b75cf80cb..d0f486180 100644 --- a/crates/nostr-relay-pool/src/pool/inner.rs +++ b/crates/nostr-relay-pool/src/pool/inner.rs @@ -4,7 +4,6 @@ //! Relay Pool -use std::cmp; use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -16,7 +15,6 @@ use atomic_destructor::AtomicDestroyer; use nostr_database::prelude::*; use tokio::sync::{broadcast, mpsc, Mutex, RwLock, RwLockReadGuard}; -use super::constants::MAX_CONNECTING_CHUNK; use super::options::RelayPoolOptions; use super::{Error, Output, RelayPoolNotification}; use crate::relay::options::{RelayOptions, ReqExitPolicy, SyncOptions}; @@ -922,34 +920,48 @@ impl InnerRelayPool { Ok(ReceiverStream::new(rx)) } - pub async fn connect(&self, connection_timeout: Option) { + pub async fn connect(&self) { // Lock with read shared access let relays = self.atomic.relays.read().await; + // Connect + for relay in relays.values() { + relay.connect() + } + } + + pub async fn try_connect(&self, timeout: Duration) -> Output<()> { + // Lock with read shared access + let relays = self.atomic.relays.read().await; + + let mut urls: Vec = Vec::with_capacity(relays.len()); let mut futures = Vec::with_capacity(relays.len()); + let mut output: Output<()> = Output::default(); // Filter only relays that can connect and compose futures for relay in relays.values().filter(|r| r.status().can_connect()) { - futures.push(relay.connect(connection_timeout)); + urls.push(relay.url().clone()); + futures.push(relay.try_connect(timeout)); } - // Check number of futures - if futures.len() <= MAX_CONNECTING_CHUNK { - future::join_all(futures).await; - return; - } + // TODO: use semaphore to limit number concurrent connections - tracing::warn!( - "Too many relays ({}). Connecting in chunks of {MAX_CONNECTING_CHUNK} relays...", - futures.len() - ); + // Join futures + let list = future::join_all(futures).await; - // Join in chunks - while !futures.is_empty() { - let upper: usize = cmp::min(MAX_CONNECTING_CHUNK, futures.len()); - let chunk = futures.drain(..upper); - future::join_all(chunk).await; + // Iterate results and compose output + for (url, result) in urls.into_iter().zip(list.into_iter()) { + match result { + Ok(..) => { + output.success.insert(url); + } + Err(e) => { + output.failed.insert(url, e.to_string()); + } + } } + + output } pub async fn disconnect(&self) -> Result<(), Error> { @@ -964,11 +976,7 @@ impl InnerRelayPool { Ok(()) } - pub async fn connect_relay( - &self, - url: U, - connection_timeout: Option, - ) -> Result<(), Error> + pub async fn connect_relay(&self, url: U) -> Result<(), Error> where U: TryIntoUrl, Error: From<::Err>, @@ -983,7 +991,27 @@ impl InnerRelayPool { let relay: &Relay = self.internal_relay(&relays, &url)?; // Connect - relay.connect(connection_timeout).await; + relay.connect(); + + Ok(()) + } + + pub async fn try_connect_relay(&self, url: U, timeout: Duration) -> Result<(), Error> + where + U: TryIntoUrl, + Error: From<::Err>, + { + // Convert url + let url: RelayUrl = url.try_into_url()?; + + // Lock with read shared access + let relays = self.atomic.relays.read().await; + + // Get relay + let relay: &Relay = self.internal_relay(&relays, &url)?; + + // Try to connect + relay.try_connect(timeout).await?; Ok(()) } diff --git a/crates/nostr-relay-pool/src/pool/mod.rs b/crates/nostr-relay-pool/src/pool/mod.rs index f3df7a2df..a88c6ecad 100644 --- a/crates/nostr-relay-pool/src/pool/mod.rs +++ b/crates/nostr-relay-pool/src/pool/mod.rs @@ -251,10 +251,16 @@ impl RelayPool { self.inner.remove_all_relays(true).await } - /// Connect to all added relays and keep connection alive + /// Connect to all added relays #[inline] - pub async fn connect(&self, connection_timeout: Option) { - self.inner.connect(connection_timeout).await + pub async fn connect(&self) { + self.inner.connect().await + } + + /// Connect to all added relays + #[inline] + pub async fn try_connect(&self, timeout: Duration) -> Output<()> { + self.inner.try_connect(timeout).await } /// Disconnect from all relays @@ -263,18 +269,30 @@ impl RelayPool { self.inner.disconnect().await } - /// Connect to relay + /// Connect to a previously added relay + /// + /// This method doesn't provide any information on if the connection was successful or not. + /// + /// Return [`Error::RelayNotFound`] if the relay doesn't exist in the pool. #[inline] - pub async fn connect_relay( - &self, - url: U, - connection_timeout: Option, - ) -> Result<(), Error> + pub async fn connect_relay(&self, url: U) -> Result<(), Error> + where + U: TryIntoUrl, + Error: From<::Err>, + { + self.inner.connect_relay(url).await + } + + /// Try to connect to a previously added relay + /// + /// This method returns an error if the connection fails. + #[inline] + pub async fn try_connect_relay(&self, url: U, timeout: Duration) -> Result<(), Error> where U: TryIntoUrl, Error: From<::Err>, { - self.inner.connect_relay(url, connection_timeout).await + self.inner.try_connect_relay(url, timeout).await } /// Disconnect relay @@ -633,7 +651,7 @@ mod tests { pool.add_relay(&url, RelayOptions::default()).await.unwrap(); - pool.connect(None).await; + pool.connect().await; assert!(!pool.inner.is_shutdown()); diff --git a/crates/nostr-relay-pool/src/relay/error.rs b/crates/nostr-relay-pool/src/relay/error.rs index 6f030c460..377a4ff93 100644 --- a/crates/nostr-relay-pool/src/relay/error.rs +++ b/crates/nostr-relay-pool/src/relay/error.rs @@ -59,6 +59,8 @@ pub enum Error { NotReady, /// Relay not connected NotConnected, + /// Connection failed + ConnectionFailed, /// Received shutdown ReceivedShutdown, /// Relay message @@ -145,6 +147,7 @@ impl fmt::Display for Error { } Self::NotReady => write!(f, "relay is initialized but not ready"), Self::NotConnected => write!(f, "relay not connected"), + Self::ConnectionFailed => write!(f, "connection failed"), Self::ReceivedShutdown => write!(f, "received shutdown"), Self::RelayMessage(message) => write!(f, "{message}"), Self::BatchMessagesEmpty => write!(f, "can't batch empty list of messages"), diff --git a/crates/nostr-relay-pool/src/relay/inner.rs b/crates/nostr-relay-pool/src/relay/inner.rs index f4fea8091..7122e9284 100644 --- a/crates/nostr-relay-pool/src/relay/inner.rs +++ b/crates/nostr-relay-pool/src/relay/inner.rs @@ -432,42 +432,58 @@ impl InnerRelay { } } - pub async fn connect(&self, connection_timeout: Option) { - // Return if relay can't connect - if !self.status().can_connect() { - return; - } - + fn _connect(&self, timeout: Duration) { // Update status // Change it to pending to avoid issues with the health check (initialized check) self.set_status(RelayStatus::Pending, false); - // If connection timeout is `Some`, try to connect waiting for connection - match connection_timeout { - Some(timeout) => { - let mut notifications = self.internal_notification_sender.subscribe(); + // Spawn connection task + self.spawn_and_try_connect(timeout); + } + + pub fn connect(&self) { + if self.status().can_connect() { + self._connect(DEFAULT_CONNECTION_TIMEOUT); + } + } - // Spawn and try connect - self.spawn_and_try_connect(timeout); + pub async fn try_connect(&self, timeout: Duration) -> Result<(), Error> { + // Check if relay can't connect + if !self.status().can_connect() { + // TODO: should return `Error::AlreadyConnected`? + return Ok(()); + } - // Wait for status change (connected or disconnected) - tracing::debug!(url = %self.url, "Waiting for status change before continue"); - while let Ok(notification) = notifications.recv().await { - if let RelayNotification::RelayStatus { - status: RelayStatus::Connected | RelayStatus::Disconnected, - } = notification - { - break; + // Subscribe to notifications + let mut notifications = self.internal_notification_sender.subscribe(); + + // Connect + self._connect(timeout); + + // Wait for status change + while let Ok(notification) = notifications.recv().await { + if let RelayNotification::RelayStatus { status } = notification { + match status { + // This status shouldn't happen, break the loop + RelayStatus::Initialized => break, + // Waiting for connection, stay in the loop + RelayStatus::Pending | RelayStatus::Connecting => continue, + // Success + RelayStatus::Connected => return Ok(()), + // Failed + RelayStatus::Disconnected | RelayStatus::Terminated => { + return Err(Error::ConnectionFailed) } } } - None => { - self.spawn_and_try_connect(DEFAULT_CONNECTION_TIMEOUT); - } } + + // TODO: return the websocket error instead of just `ConnectionFailed` + + Err(Error::PrematureExit) } - fn spawn_and_try_connect(&self, connection_timeout: Duration) { + fn spawn_and_try_connect(&self, timeout: Duration) { if self.is_running() { tracing::warn!(url = %self.url, "Connection task is already running."); return; @@ -482,7 +498,7 @@ impl InnerRelay { let mut rx_service = relay.atomic.channels.rx_service().await; // Last websocket error - // Store it to avoid to print every time the same connection error + // Store it to avoid printing every time the same connection error let mut last_ws_error = None; // Auto-connect loop @@ -493,7 +509,7 @@ impl InnerRelay { tokio::select! { // Connect and run message handler - _ = relay.connect_and_run(connection_timeout, &mut last_ws_error) => {}, + _ = relay.connect_and_run(timeout, &mut last_ws_error) => {}, // Handle "terminate" message _ = relay.handle_terminate(&mut rx_service) => { // Update status @@ -557,7 +573,7 @@ impl InnerRelay { /// Depending on attempts and success, use default or incremental retry interval fn calculate_retry_interval(&self) -> Duration { - // Check if incremental interval is enabled + // Check if the incremental interval is enabled if self.opts.adjust_retry_interval { // Calculate the difference between attempts and success let diff: u32 = self.stats.attempts().saturating_sub(self.stats.success()) as u32; @@ -609,11 +625,7 @@ impl InnerRelay { } /// Connect and run message handler - async fn connect_and_run( - &self, - connection_timeout: Duration, - last_ws_error: &mut Option, - ) { + async fn connect_and_run(&self, timeout: Duration, last_ws_error: &mut Option) { // Update status self.set_status(RelayStatus::Connecting, true); @@ -626,7 +638,7 @@ impl InnerRelay { DEFAULT_CONNECTION_TIMEOUT } else { // First attempt, use external timeout - connection_timeout + timeout }; // Connect diff --git a/crates/nostr-relay-pool/src/relay/mod.rs b/crates/nostr-relay-pool/src/relay/mod.rs index 28cd72dd5..886c671c8 100644 --- a/crates/nostr-relay-pool/src/relay/mod.rs +++ b/crates/nostr-relay-pool/src/relay/mod.rs @@ -254,10 +254,22 @@ impl Relay { self.inner.internal_notification_sender.subscribe() } - /// Connect to relay and keep alive connection + /// Connect to relay + /// + /// This method returns immediately and doesn't provide any information on if the connection was successful or not. + #[inline] + pub fn connect(&self) { + self.inner.connect() + } + + /// Try to connect to relay + /// + /// This method returns an error if the connection fails. + /// If the connection fails, + /// a task will continue to retry in the background (unless configured differently in [`RelayOptions`]. #[inline] - pub async fn connect(&self, connection_timeout: Option) { - self.inner.connect(connection_timeout).await + pub async fn try_connect(&self, timeout: Duration) -> Result<(), Error> { + self.inner.try_connect(timeout).await } /// Disconnect from relay and set status to 'Terminated' @@ -443,7 +455,7 @@ mod tests { let relay = Relay::new(url); - relay.connect(Some(Duration::from_millis(100))).await; + relay.try_connect(Duration::from_millis(100)).await.unwrap(); let keys = Keys::generate(); let event = EventBuilder::text_note("Test") @@ -462,7 +474,7 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(Some(Duration::from_millis(100))).await; + relay.try_connect(Duration::from_millis(100)).await.unwrap(); assert_eq!(relay.status(), RelayStatus::Connected); @@ -485,7 +497,7 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(Some(Duration::from_millis(100))).await; + relay.try_connect(Duration::from_millis(100)).await.unwrap(); assert_eq!(relay.status(), RelayStatus::Connected); @@ -508,7 +520,7 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(Some(Duration::from_millis(100))).await; + relay.try_connect(Duration::from_millis(100)).await.unwrap(); assert_eq!(relay.status(), RelayStatus::Connected); @@ -532,7 +544,8 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(Some(Duration::from_millis(100))).await; + let res = relay.try_connect(Duration::from_millis(100)).await; + assert!(matches!(res.unwrap_err(), Error::ConnectionFailed)); assert!(relay.inner.is_running()); @@ -562,7 +575,7 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(None).await; + relay.connect(); time::sleep(Duration::from_secs(1)).await; @@ -594,7 +607,7 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(None).await; + relay.connect(); time::sleep(Duration::from_secs(1)).await; @@ -623,7 +636,7 @@ mod tests { relay.inner.state.automatic_authentication(true); - relay.connect(Some(Duration::from_millis(100))).await; + relay.connect(); // Signer let keys = Keys::generate(); @@ -664,7 +677,7 @@ mod tests { let relay = Relay::new(url); - relay.connect(Some(Duration::from_millis(100))).await; + relay.connect(); // Signer let keys = Keys::generate(); diff --git a/crates/nostr-relay-pool/src/shared.rs b/crates/nostr-relay-pool/src/shared.rs index 587a8b587..85f73683f 100644 --- a/crates/nostr-relay-pool/src/shared.rs +++ b/crates/nostr-relay-pool/src/shared.rs @@ -36,6 +36,7 @@ pub struct SharedState { nip42_auto_authentication: Arc, min_pow_difficulty: Arc, pub(crate) filtering: RelayFiltering, + // TODO: add a semaphore to limit number of concurrent websocket connections attempts? } impl Default for SharedState { diff --git a/crates/nostr-sdk/src/client/mod.rs b/crates/nostr-sdk/src/client/mod.rs index 2b42f7221..226120897 100644 --- a/crates/nostr-sdk/src/client/mod.rs +++ b/crates/nostr-sdk/src/client/mod.rs @@ -493,13 +493,27 @@ impl Client { } /// Connect to a previously added relay + /// + /// Check [`RelayPool::connect_relay`] docs to learn more. #[inline] pub async fn connect_relay(&self, url: U) -> Result<(), Error> where U: TryIntoUrl, pool::Error: From<::Err>, { - Ok(self.pool.connect_relay(url, None).await?) + Ok(self.pool.connect_relay(url).await?) + } + + /// Try to connect to a previously added relay + /// + /// Check [`RelayPool::try_connect_relay`] docs to learn more. + #[inline] + pub async fn try_connect_relay(&self, url: U, timeout: Duration) -> Result<(), Error> + where + U: TryIntoUrl, + pool::Error: From<::Err>, + { + Ok(self.pool.try_connect_relay(url, timeout).await?) } /// Disconnect relay @@ -515,7 +529,7 @@ impl Client { /// Connect to all added relays #[inline] pub async fn connect(&self) { - self.pool.connect(None).await; + self.pool.connect().await; } /// Connect to all added relays @@ -523,8 +537,17 @@ impl Client { /// Try to connect to the relays and wait for them to be connected at most for the specified `timeout`. /// The code continues if the `timeout` is reached or if all relays connect. #[inline] + pub async fn try_connect(&self, timeout: Duration) -> Output<()> { + self.pool.try_connect(timeout).await + } + + /// Connect to all added relays + /// + /// Try to connect to the relays and wait for them to be connected at most for the specified `timeout`. + /// The code continues if the `timeout` is reached or if all relays connect. + #[deprecated(since = "0.39.0", note = "Use `try_connect` instead")] pub async fn connect_with_timeout(&self, timeout: Duration) { - self.pool.connect(Some(timeout)).await + self.pool.try_connect(timeout).await; } /// Disconnect from all relays diff --git a/crates/nwc/src/lib.rs b/crates/nwc/src/lib.rs index 6b6830059..fecdb6d23 100644 --- a/crates/nwc/src/lib.rs +++ b/crates/nwc/src/lib.rs @@ -73,7 +73,7 @@ impl NWC { } // Connect - self.relay.connect(None).await; + self.relay.connect(); let filter = Filter::new() .author(self.uri.public_key)