Skip to content

Commit

Permalink
pool: add wait_for_connection methods
Browse files Browse the repository at this point in the history
* Add `Relay::wait_for_connection`
* Add `RelayPool::wait_for_connection`
* Add `Client::wait_for_connection`

Signed-off-by: Yuki Kishimoto <yukikishimoto@protonmail.com>
  • Loading branch information
yukibtc committed Jan 2, 2025
1 parent 1db1245 commit 72a428c
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 1 deletion.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@
### Added

* pool: add `Relay::try_connect` ([Yuki Kishimoto])
* pool: add `Relay::wait_for_connection` ([Yuki Kishimoto])
* pool: add `RelayPool::try_connect` ([Yuki Kishimoto])
* pool: add `RelayPool::try_connect_relay` ([Yuki Kishimoto])
* pool: add `RelayPool::wait_for_connection` ([Yuki Kishimoto])
* sdk: add `Client::try_connect` ([Yuki Kishimoto])
* sdk: add `Client::try_connect_relay` ([Yuki Kishimoto])
* sdk: add `Client::wait_for_connection` ([Yuki Kishimoto])

### Fixed

Expand Down
8 changes: 8 additions & 0 deletions bindings/nostr-sdk-ffi/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,14 @@ impl Client {
self.inner.connect().await
}

/// Waits for relays connections
///
/// Wait for relays connections at most for the specified `timeout`.
/// The code continues when the relays are connected or the `timeout` is reached.
pub async fn wait_for_connection(&self, timeout: Duration) {
self.inner.wait_for_connection(timeout).await
}

/// Try to establish a connection with the relays.
///
/// Attempts to establish a connection without spawning the connection task if it fails.
Expand Down
9 changes: 9 additions & 0 deletions bindings/nostr-sdk-js/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,15 @@ impl JsClient {
self.inner.connect().await
}

/// Waits for relays connections
///
/// Wait for relays connections at most for the specified `timeout`.
/// The code continues when the relays are connected or the `timeout` is reached.
#[wasm_bindgen(js_name = waitForConnection)]
pub async fn wait_for_connection(&self, timeout: &JsDuration) {
self.inner.wait_for_connection(**timeout).await
}

/// Try to establish a connection with the relays.
///
/// Attempts to establish a connection without spawning the connection task if it fails.
Expand Down
14 changes: 14 additions & 0 deletions crates/nostr-relay-pool/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,20 @@ impl InnerRelayPool {
}
}

pub async fn wait_for_connection(&self, timeout: Duration) {
// Lock with read shared access
let relays = self.atomic.relays.read().await;

// Compose futures
let mut futures = Vec::with_capacity(relays.len());
for relay in relays.values() {
futures.push(relay.wait_for_connection(timeout));
}

// Join futures
future::join_all(futures).await;
}

pub async fn try_connect(&self, timeout: Duration) -> Output<()> {
// Lock with read shared access
let relays = self.atomic.relays.read().await;
Expand Down
9 changes: 9 additions & 0 deletions crates/nostr-relay-pool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,15 @@ impl RelayPool {
self.inner.connect().await
}

/// Waits for relays connections
///
/// Wait for relays connections at most for the specified `timeout`.
/// The code continues when the relays are connected or the `timeout` is reached.
#[inline]
pub async fn wait_for_connection(&self, timeout: Duration) {
self.inner.wait_for_connection(timeout).await
}

/// Try to establish a connection with the relays.
///
/// Attempts to establish a connection without spawning the connection task if it fails.
Expand Down
26 changes: 26 additions & 0 deletions crates/nostr-relay-pool/src/relay/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,32 @@ impl InnerRelay {
self.spawn_and_try_connect(None);
}

pub async fn wait_for_connection(&self, timeout: Duration) {
let status: RelayStatus = self.status();

// Already connected
if status.is_connected() {
return;
}

// Subscribe to notifications
let mut notifications = self.internal_notification_sender.subscribe();

// Set timeout
time::timeout(Some(timeout), async {
while let Ok(notification) = notifications.recv().await {
// Wait for status change. Break loop when connect.
if let RelayNotification::RelayStatus {
status: RelayStatus::Connected,
} = notification
{
break;
}
}
})
.await;
}

pub async fn try_connect(&self, timeout: Duration) -> Result<(), Error> {
// Check if relay can't connect
if !self.status().can_connect() {
Expand Down
33 changes: 33 additions & 0 deletions crates/nostr-relay-pool/src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,15 @@ impl Relay {
self.inner.connect()
}

/// Waits for relay connection
///
/// Wait for relay connection at most for the specified `timeout`.
/// The code continues when the relay is connected or the `timeout` is reached.
#[inline]
pub async fn wait_for_connection(&self, timeout: Duration) {
self.inner.wait_for_connection(timeout).await
}

/// Try to establish a connection with the relay.
///
/// Attempts to establish a connection without spawning the connection task if it fails.
Expand Down Expand Up @@ -644,6 +653,30 @@ mod tests {
assert!(!relay.inner.is_running());
}

#[tokio::test]
async fn test_wait_for_connection() {
// Mock relay
let opts = RelayTestOptions {
unresponsive_connection: Some(Duration::from_secs(2)),
};
let mock = MockRelay::run_with_opts(opts).await.unwrap();
let url = RelayUrl::parse(&mock.url()).unwrap();

let relay = Relay::new(url);

assert_eq!(relay.status(), RelayStatus::Initialized);

relay.connect();

relay.wait_for_connection(Duration::from_millis(500)).await; // This timeout

assert_eq!(relay.status(), RelayStatus::Connecting);

relay.wait_for_connection(Duration::from_secs(3)).await;

assert_eq!(relay.status(), RelayStatus::Connected);
}

#[tokio::test]
async fn test_nip42_send_event() {
// Mock relay
Expand Down
14 changes: 13 additions & 1 deletion crates/nostr-sdk/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,15 @@ impl Client {
self.pool.connect().await;
}

/// Waits for relays connections
///
/// Wait for relays connections at most for the specified `timeout`.
/// The code continues when the relays are connected or the `timeout` is reached.
#[inline]
pub async fn wait_for_connection(&self, timeout: Duration) {
self.pool.wait_for_connection(timeout).await
}

/// Try to establish a connection with the relays.
///
/// Attempts to establish a connection without spawning the connection task if it fails.
Expand All @@ -549,7 +558,10 @@ impl Client {
///
/// Try to connect to the relays and wait for them to be connected at most for the specified `timeout`.
/// The code continues if the `timeout` is reached or if all relays connect.
#[deprecated(since = "0.39.0", note = "Use `try_connect` instead")]
#[deprecated(
since = "0.39.0",
note = "Use `connect` + `wait_for_connection` instead."
)]
pub async fn connect_with_timeout(&self, timeout: Duration) {
self.pool.try_connect(timeout).await;
}
Expand Down

0 comments on commit 72a428c

Please sign in to comment.