From 72a428c4feb2cccd2c0aad2797a9d5ed784c19d3 Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Thu, 2 Jan 2025 11:16:07 +0100 Subject: [PATCH] pool: add `wait_for_connection` methods * Add `Relay::wait_for_connection` * Add `RelayPool::wait_for_connection` * Add `Client::wait_for_connection` Signed-off-by: Yuki Kishimoto --- CHANGELOG.md | 3 ++ bindings/nostr-sdk-ffi/src/client/mod.rs | 8 ++++++ bindings/nostr-sdk-js/src/client/mod.rs | 9 ++++++ crates/nostr-relay-pool/src/pool/inner.rs | 14 +++++++++ crates/nostr-relay-pool/src/pool/mod.rs | 9 ++++++ crates/nostr-relay-pool/src/relay/inner.rs | 26 +++++++++++++++++ crates/nostr-relay-pool/src/relay/mod.rs | 33 ++++++++++++++++++++++ crates/nostr-sdk/src/client/mod.rs | 14 ++++++++- 8 files changed, 115 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e79aa699d..6c630914a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,10 +40,13 @@ ### Added * pool: add `Relay::try_connect` ([Yuki Kishimoto]) +* pool: add `Relay::wait_for_connection` ([Yuki Kishimoto]) * pool: add `RelayPool::try_connect` ([Yuki Kishimoto]) * pool: add `RelayPool::try_connect_relay` ([Yuki Kishimoto]) +* pool: add `RelayPool::wait_for_connection` ([Yuki Kishimoto]) * sdk: add `Client::try_connect` ([Yuki Kishimoto]) * sdk: add `Client::try_connect_relay` ([Yuki Kishimoto]) +* sdk: add `Client::wait_for_connection` ([Yuki Kishimoto]) ### Fixed diff --git a/bindings/nostr-sdk-ffi/src/client/mod.rs b/bindings/nostr-sdk-ffi/src/client/mod.rs index 4bd6df211..578369308 100644 --- a/bindings/nostr-sdk-ffi/src/client/mod.rs +++ b/bindings/nostr-sdk-ffi/src/client/mod.rs @@ -191,6 +191,14 @@ impl Client { self.inner.connect().await } + /// Waits for relays connections + /// + /// Wait for relays connections at most for the specified `timeout`. + /// The code continues when the relays are connected or the `timeout` is reached. + pub async fn wait_for_connection(&self, timeout: Duration) { + self.inner.wait_for_connection(timeout).await + } + /// Try to establish a connection with the relays. /// /// Attempts to establish a connection without spawning the connection task if it fails. diff --git a/bindings/nostr-sdk-js/src/client/mod.rs b/bindings/nostr-sdk-js/src/client/mod.rs index 4c70649eb..e93da31dd 100644 --- a/bindings/nostr-sdk-js/src/client/mod.rs +++ b/bindings/nostr-sdk-js/src/client/mod.rs @@ -224,6 +224,15 @@ impl JsClient { self.inner.connect().await } + /// Waits for relays connections + /// + /// Wait for relays connections at most for the specified `timeout`. + /// The code continues when the relays are connected or the `timeout` is reached. + #[wasm_bindgen(js_name = waitForConnection)] + pub async fn wait_for_connection(&self, timeout: &JsDuration) { + self.inner.wait_for_connection(**timeout).await + } + /// Try to establish a connection with the relays. /// /// Attempts to establish a connection without spawning the connection task if it fails. diff --git a/crates/nostr-relay-pool/src/pool/inner.rs b/crates/nostr-relay-pool/src/pool/inner.rs index 2651ab89f..44755a2aa 100644 --- a/crates/nostr-relay-pool/src/pool/inner.rs +++ b/crates/nostr-relay-pool/src/pool/inner.rs @@ -930,6 +930,20 @@ impl InnerRelayPool { } } + pub async fn wait_for_connection(&self, timeout: Duration) { + // Lock with read shared access + let relays = self.atomic.relays.read().await; + + // Compose futures + let mut futures = Vec::with_capacity(relays.len()); + for relay in relays.values() { + futures.push(relay.wait_for_connection(timeout)); + } + + // Join futures + future::join_all(futures).await; + } + pub async fn try_connect(&self, timeout: Duration) -> Output<()> { // Lock with read shared access let relays = self.atomic.relays.read().await; diff --git a/crates/nostr-relay-pool/src/pool/mod.rs b/crates/nostr-relay-pool/src/pool/mod.rs index 8a37358c4..ef2a3d1a1 100644 --- a/crates/nostr-relay-pool/src/pool/mod.rs +++ b/crates/nostr-relay-pool/src/pool/mod.rs @@ -257,6 +257,15 @@ impl RelayPool { self.inner.connect().await } + /// Waits for relays connections + /// + /// Wait for relays connections at most for the specified `timeout`. + /// The code continues when the relays are connected or the `timeout` is reached. + #[inline] + pub async fn wait_for_connection(&self, timeout: Duration) { + self.inner.wait_for_connection(timeout).await + } + /// Try to establish a connection with the relays. /// /// Attempts to establish a connection without spawning the connection task if it fails. diff --git a/crates/nostr-relay-pool/src/relay/inner.rs b/crates/nostr-relay-pool/src/relay/inner.rs index ed1f81d2d..89d7caf83 100644 --- a/crates/nostr-relay-pool/src/relay/inner.rs +++ b/crates/nostr-relay-pool/src/relay/inner.rs @@ -447,6 +447,32 @@ impl InnerRelay { self.spawn_and_try_connect(None); } + pub async fn wait_for_connection(&self, timeout: Duration) { + let status: RelayStatus = self.status(); + + // Already connected + if status.is_connected() { + return; + } + + // Subscribe to notifications + let mut notifications = self.internal_notification_sender.subscribe(); + + // Set timeout + time::timeout(Some(timeout), async { + while let Ok(notification) = notifications.recv().await { + // Wait for status change. Break loop when connect. + if let RelayNotification::RelayStatus { + status: RelayStatus::Connected, + } = notification + { + break; + } + } + }) + .await; + } + pub async fn try_connect(&self, timeout: Duration) -> Result<(), Error> { // Check if relay can't connect if !self.status().can_connect() { diff --git a/crates/nostr-relay-pool/src/relay/mod.rs b/crates/nostr-relay-pool/src/relay/mod.rs index 3cc6fc6f2..9b41523ff 100644 --- a/crates/nostr-relay-pool/src/relay/mod.rs +++ b/crates/nostr-relay-pool/src/relay/mod.rs @@ -262,6 +262,15 @@ impl Relay { self.inner.connect() } + /// Waits for relay connection + /// + /// Wait for relay connection at most for the specified `timeout`. + /// The code continues when the relay is connected or the `timeout` is reached. + #[inline] + pub async fn wait_for_connection(&self, timeout: Duration) { + self.inner.wait_for_connection(timeout).await + } + /// Try to establish a connection with the relay. /// /// Attempts to establish a connection without spawning the connection task if it fails. @@ -644,6 +653,30 @@ mod tests { assert!(!relay.inner.is_running()); } + #[tokio::test] + async fn test_wait_for_connection() { + // Mock relay + let opts = RelayTestOptions { + unresponsive_connection: Some(Duration::from_secs(2)), + }; + let mock = MockRelay::run_with_opts(opts).await.unwrap(); + let url = RelayUrl::parse(&mock.url()).unwrap(); + + let relay = Relay::new(url); + + assert_eq!(relay.status(), RelayStatus::Initialized); + + relay.connect(); + + relay.wait_for_connection(Duration::from_millis(500)).await; // This timeout + + assert_eq!(relay.status(), RelayStatus::Connecting); + + relay.wait_for_connection(Duration::from_secs(3)).await; + + assert_eq!(relay.status(), RelayStatus::Connected); + } + #[tokio::test] async fn test_nip42_send_event() { // Mock relay diff --git a/crates/nostr-sdk/src/client/mod.rs b/crates/nostr-sdk/src/client/mod.rs index 60d264f54..e0a247fa6 100644 --- a/crates/nostr-sdk/src/client/mod.rs +++ b/crates/nostr-sdk/src/client/mod.rs @@ -532,6 +532,15 @@ impl Client { self.pool.connect().await; } + /// Waits for relays connections + /// + /// Wait for relays connections at most for the specified `timeout`. + /// The code continues when the relays are connected or the `timeout` is reached. + #[inline] + pub async fn wait_for_connection(&self, timeout: Duration) { + self.pool.wait_for_connection(timeout).await + } + /// Try to establish a connection with the relays. /// /// Attempts to establish a connection without spawning the connection task if it fails. @@ -549,7 +558,10 @@ impl Client { /// /// Try to connect to the relays and wait for them to be connected at most for the specified `timeout`. /// The code continues if the `timeout` is reached or if all relays connect. - #[deprecated(since = "0.39.0", note = "Use `try_connect` instead")] + #[deprecated( + since = "0.39.0", + note = "Use `connect` + `wait_for_connection` instead." + )] pub async fn connect_with_timeout(&self, timeout: Duration) { self.pool.try_connect(timeout).await; }