Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add try_connect methods #689

Merged
merged 3 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,35 @@

-->

## [Unreleased]

### Summary

### Breaking changes

* pool: change `Relay::connect` method signature ([Yuki Kishimoto])

### Changed

* pool: update `Error::WebSocket` variant inner type ([Yuki Kishimoto])

### Added

* pool: add `Relay::try_connect` ([Yuki Kishimoto])
* pool: add `Relay::wait_for_connection` ([Yuki Kishimoto])
* pool: add `RelayPool::try_connect` ([Yuki Kishimoto])
* pool: add `RelayPool::try_connect_relay` ([Yuki Kishimoto])
* pool: add `RelayPool::wait_for_connection` ([Yuki Kishimoto])
* sdk: add `Client::try_connect` ([Yuki Kishimoto])
* sdk: add `Client::try_connect_relay` ([Yuki Kishimoto])
* sdk: add `Client::wait_for_connection` ([Yuki Kishimoto])

### Fixed

### Removed

### Deprecated

## [v0.38.0] - 2024/12/31

### Summary
Expand Down
20 changes: 15 additions & 5 deletions bindings/nostr-sdk-ffi/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,22 @@ impl Client {
self.inner.connect().await
}

/// Connect to all added relays
/// Waits for relays connections
///
/// Wait for relays connections at most for the specified `timeout`.
/// The code continues when the relays are connected or the `timeout` is reached.
pub async fn wait_for_connection(&self, timeout: Duration) {
self.inner.wait_for_connection(timeout).await
}

/// Try to establish a connection with the relays.
///
/// Try to connect to the relays and wait for them to be connected at most for the specified `timeout`.
/// The code continues if the `timeout` is reached or if all relays connect.
pub async fn connect_with_timeout(&self, timeout: Duration) {
self.inner.connect_with_timeout(timeout).await
/// Attempts to establish a connection without spawning the connection task if it fails.
/// This means that if the connection fails, no automatic retries are scheduled.
/// Use [`Client::connect`] if you want to immediately spawn a connection task,
/// regardless of whether the initial connection succeeds.
pub async fn try_connect(&self, timeout: Duration) -> Output {
self.inner.try_connect(timeout).await.into()
}

pub async fn disconnect(&self) -> Result<()> {
Expand Down
17 changes: 14 additions & 3 deletions bindings/nostr-sdk-ffi/src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,20 @@ impl Relay {

// TODO: add notifications

/// Connect to relay and keep alive connection
pub async fn connect(&self, connection_timeout: Option<Duration>) {
self.inner.connect(connection_timeout).await
/// Connect to relay
///
/// This method returns immediately and doesn't provide any information on if the connection was successful or not.
pub fn connect(&self) {
self.inner.connect()
}

/// Try to connect to relay
///
/// This method returns an error if the connection fails.
/// If the connection fails,
/// a task will continue to retry in the background (unless configured differently in `RelayOptions`.
pub async fn try_connect(&self, timeout: Duration) -> Result<()> {
Ok(self.inner.try_connect(timeout).await?)
}

/// Disconnect from relay and set status to 'Terminated'
Expand Down
24 changes: 17 additions & 7 deletions bindings/nostr-sdk-js/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,14 +224,24 @@ impl JsClient {
self.inner.connect().await
}

/// Connect to all added relays
/// Waits for relays connections
///
/// Try to connect to the relays and wait for them to be connected at most for the specified `timeout`.
/// The code continues if the `timeout` is reached or if all relays connect.
#[inline]
#[wasm_bindgen(js_name = connectWithTimeout)]
pub async fn connect_with_timeout(&self, timeout: &JsDuration) {
self.inner.connect_with_timeout(**timeout).await
/// Wait for relays connections at most for the specified `timeout`.
/// The code continues when the relays are connected or the `timeout` is reached.
#[wasm_bindgen(js_name = waitForConnection)]
pub async fn wait_for_connection(&self, timeout: &JsDuration) {
self.inner.wait_for_connection(**timeout).await
}

/// Try to establish a connection with the relays.
///
/// Attempts to establish a connection without spawning the connection task if it fails.
/// This means that if the connection fails, no automatic retries are scheduled.
/// Use [`Client::connect`] if you want to immediately spawn a connection task,
/// regardless of whether the initial connection succeeds.
#[wasm_bindgen(js_name = tryConnect)]
pub async fn try_connect(&self, timeout: &JsDuration) -> JsOutput {
self.inner.try_connect(**timeout).await.into()
}

/// Disconnect from all relays
Expand Down
18 changes: 15 additions & 3 deletions bindings/nostr-sdk-js/src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,21 @@ impl JsRelay {
self.inner.queue() as u64
}

/// Connect to relay and keep alive connection
pub async fn connect(&self, connection_timeout: Option<JsDuration>) {
self.inner.connect(connection_timeout.map(|d| *d)).await
/// Connect to relay
///
/// This method returns immediately and doesn't provide any information on if the connection was successful or not.
pub fn connect(&self) {
self.inner.connect()
}

/// Try to connect to relay
///
/// This method returns an error if the connection fails.
/// If the connection fails,
/// a task will continue to retry in the background (unless configured differently in `RelayOptions`.
#[wasm_bindgen(js_name = tryConnect)]
pub async fn try_connect(&self, timeout: &JsDuration) -> Result<()> {
self.inner.try_connect(**timeout).await.map_err(into_err)
}

/// Disconnect from relay and set status to 'Terminated'
Expand Down
2 changes: 1 addition & 1 deletion crates/nostr-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ async fn handle_command(command: ShellCommand, client: &Client) -> Result<()> {
println!("Connecting to relays...");

// Connect and wait for connection
client.connect_with_timeout(Duration::from_secs(60)).await;
client.try_connect(Duration::from_secs(60)).await;

relays.clone()
} else {
Expand Down
2 changes: 1 addition & 1 deletion crates/nostr-connect/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl NostrConnect {
}

// Connect to relays
self.pool.connect(None).await;
self.pool.connect().await;

// Subscribe
let notifications = self.subscribe().await?;
Expand Down
3 changes: 1 addition & 2 deletions crates/nostr-connect/src/signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use nostr::nips::nip46::{Message, Request, ResponseResult};
use nostr_relay_pool::prelude::*;
Expand Down Expand Up @@ -130,7 +129,7 @@ impl NostrConnectRemoteSigner {
}

// Connect
self.pool.connect(Some(Duration::from_secs(10))).await;
self.pool.connect().await;

let filter = Filter::new()
.pubkey(self.keys.signer.public_key())
Expand Down
2 changes: 1 addition & 1 deletion crates/nostr-relay-pool/examples/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async fn main() -> Result<()> {

pool.add_relay("wss://relay.damus.io", RelayOptions::default())
.await?;
pool.connect(None).await;
pool.connect().await;

let event = Event::from_json(r#"{"content":"","created_at":1698412975,"id":"f55c30722f056e330d8a7a6a9ba1522f7522c0f1ced1c93d78ea833c78a3d6ec","kind":3,"pubkey":"f831caf722214748c72db4829986bd0cbb2bb8b3aeade1c959624a52a9629046","sig":"5092a9ffaecdae7d7794706f085ff5852befdf79df424cc3419bb797bf515ae05d4f19404cb8324b8b4380a4bd497763ac7b0f3b1b63ef4d3baa17e5f5901808","tags":[["p","4ddeb9109a8cd29ba279a637f5ec344f2479ee07df1f4043f3fe26d8948cfef9","",""],["p","bb6fd06e156929649a73e6b278af5e648214a69d88943702f1fb627c02179b95","",""],["p","b8b8210f33888fdbf5cedee9edf13c3e9638612698fe6408aff8609059053420","",""],["p","9dcee4fabcd690dc1da9abdba94afebf82e1e7614f4ea92d61d52ef9cd74e083","",""],["p","3eea9e831fefdaa8df35187a204d82edb589a36b170955ac5ca6b88340befaa0","",""],["p","885238ab4568f271b572bf48b9d6f99fa07644731f288259bd395998ee24754e","",""],["p","568a25c71fba591e39bebe309794d5c15d27dbfa7114cacb9f3586ea1314d126","",""]]}"#).unwrap();
pool.send_event(event).await?;
Expand Down
2 changes: 0 additions & 2 deletions crates/nostr-relay-pool/src/pool/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,5 @@

//! Constants

pub(super) const MAX_CONNECTING_CHUNK: usize = 100;

/// Relay Pool default notification channel size
pub const DEFAULT_NOTIFICATION_CHANNEL_SIZE: usize = 4096;
90 changes: 66 additions & 24 deletions crates/nostr-relay-pool/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

//! Relay Pool

use std::cmp;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
Expand All @@ -16,7 +15,6 @@ use atomic_destructor::AtomicDestroyer;
use nostr_database::prelude::*;
use tokio::sync::{broadcast, mpsc, Mutex, RwLock, RwLockReadGuard};

use super::constants::MAX_CONNECTING_CHUNK;
use super::options::RelayPoolOptions;
use super::{Error, Output, RelayPoolNotification};
use crate::relay::options::{RelayOptions, ReqExitPolicy, SyncOptions};
Expand Down Expand Up @@ -922,34 +920,62 @@ impl InnerRelayPool {
Ok(ReceiverStream::new(rx))
}

pub async fn connect(&self, connection_timeout: Option<Duration>) {
pub async fn connect(&self) {
// Lock with read shared access
let relays = self.atomic.relays.read().await;

// Connect
for relay in relays.values() {
relay.connect()
}
}

pub async fn wait_for_connection(&self, timeout: Duration) {
// Lock with read shared access
let relays = self.atomic.relays.read().await;

// Compose futures
let mut futures = Vec::with_capacity(relays.len());
for relay in relays.values() {
futures.push(relay.wait_for_connection(timeout));
}

// Join futures
future::join_all(futures).await;
}

pub async fn try_connect(&self, timeout: Duration) -> Output<()> {
// Lock with read shared access
let relays = self.atomic.relays.read().await;

let mut urls: Vec<RelayUrl> = Vec::with_capacity(relays.len());
let mut futures = Vec::with_capacity(relays.len());
let mut output: Output<()> = Output::default();

// Filter only relays that can connect and compose futures
for relay in relays.values().filter(|r| r.status().can_connect()) {
futures.push(relay.connect(connection_timeout));
urls.push(relay.url().clone());
futures.push(relay.try_connect(timeout));
}

// Check number of futures
if futures.len() <= MAX_CONNECTING_CHUNK {
future::join_all(futures).await;
return;
}
// TODO: use semaphore to limit number concurrent connections?

tracing::warn!(
"Too many relays ({}). Connecting in chunks of {MAX_CONNECTING_CHUNK} relays...",
futures.len()
);
// Join futures
let list = future::join_all(futures).await;

// Join in chunks
while !futures.is_empty() {
let upper: usize = cmp::min(MAX_CONNECTING_CHUNK, futures.len());
let chunk = futures.drain(..upper);
future::join_all(chunk).await;
// Iterate results and compose output
for (url, result) in urls.into_iter().zip(list.into_iter()) {
match result {
Ok(..) => {
output.success.insert(url);
}
Err(e) => {
output.failed.insert(url, e.to_string());
}
}
}

output
}

pub async fn disconnect(&self) -> Result<(), Error> {
Expand All @@ -964,11 +990,7 @@ impl InnerRelayPool {
Ok(())
}

pub async fn connect_relay<U>(
&self,
url: U,
connection_timeout: Option<Duration>,
) -> Result<(), Error>
pub async fn connect_relay<U>(&self, url: U) -> Result<(), Error>
where
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
Expand All @@ -983,7 +1005,27 @@ impl InnerRelayPool {
let relay: &Relay = self.internal_relay(&relays, &url)?;

// Connect
relay.connect(connection_timeout).await;
relay.connect();

Ok(())
}

pub async fn try_connect_relay<U>(&self, url: U, timeout: Duration) -> Result<(), Error>
where
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
{
// Convert url
let url: RelayUrl = url.try_into_url()?;

// Lock with read shared access
let relays = self.atomic.relays.read().await;

// Get relay
let relay: &Relay = self.internal_relay(&relays, &url)?;

// Try to connect
relay.try_connect(timeout).await?;

Ok(())
}
Expand Down
Loading
Loading