Skip to content

Commit

Permalink
pool: add try_connect methods
Browse files Browse the repository at this point in the history
* Add `Relay::try_connect`
* Add `Client::try_connect`
* Add `Client::try_connect_relay`
* Add `RelayPool::try_connect`
* Add `RelayPool::try_connect_relay`
* Change `Relay::connect` method signature

Closes #624

Signed-off-by: Yuki Kishimoto <yukikishimoto@protonmail.com>
  • Loading branch information
yukibtc committed Dec 31, 2024
1 parent 6ccc81a commit f149de3
Show file tree
Hide file tree
Showing 17 changed files with 243 additions and 100 deletions.
24 changes: 24 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,30 @@
-->

## [Unreleased]

### Summary

### Breaking changes

* pool: change `Relay::connect` method signature ([Yuki Kishimoto])

### Changed

### Added

* pool: add `Relay::try_connect` ([Yuki Kishimoto])
* pool: add `RelayPool::try_connect` ([Yuki Kishimoto])
* pool: add `RelayPool::try_connect_relay` ([Yuki Kishimoto])
* sdk: add `Client::try_connect` ([Yuki Kishimoto])
* sdk: add `Client::try_connect_relay` ([Yuki Kishimoto])

### Fixed

### Removed

### Deprecated

## [v0.38.0] - 2024/12/31

### Summary
Expand Down
4 changes: 2 additions & 2 deletions bindings/nostr-sdk-ffi/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ impl Client {
///
/// Try to connect to the relays and wait for them to be connected at most for the specified `timeout`.
/// The code continues if the `timeout` is reached or if all relays connect.
pub async fn connect_with_timeout(&self, timeout: Duration) {
self.inner.connect_with_timeout(timeout).await
pub async fn try_connect(&self, timeout: Duration) {
self.inner.try_connect(timeout).await
}

pub async fn disconnect(&self) -> Result<()> {
Expand Down
17 changes: 14 additions & 3 deletions bindings/nostr-sdk-ffi/src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,20 @@ impl Relay {

// TODO: add notifications

/// Connect to relay and keep alive connection
pub async fn connect(&self, connection_timeout: Option<Duration>) {
self.inner.connect(connection_timeout).await
/// Connect to relay
///
/// This method returns immediately and doesn't provide any information on if the connection was successful or not.
pub fn connect(&self) {
self.inner.connect()
}

/// Try to connect to relay
///
/// This method returns an error if the connection fails.
/// If the connection fails,
/// a task will continue to retry in the background (unless configured differently in `RelayOptions`.
pub async fn try_connect(&self, timeout: Duration) -> Result<()> {
Ok(self.inner.try_connect(timeout).await?)
}

/// Disconnect from relay and set status to 'Terminated'
Expand Down
6 changes: 3 additions & 3 deletions bindings/nostr-sdk-js/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,9 @@ impl JsClient {
/// Try to connect to the relays and wait for them to be connected at most for the specified `timeout`.
/// The code continues if the `timeout` is reached or if all relays connect.
#[inline]
#[wasm_bindgen(js_name = connectWithTimeout)]
pub async fn connect_with_timeout(&self, timeout: &JsDuration) {
self.inner.connect_with_timeout(**timeout).await
#[wasm_bindgen(js_name = tryConnect)]
pub async fn try_connect(&self, timeout: &JsDuration) {
self.inner.try_connect(**timeout).await
}

/// Disconnect from all relays
Expand Down
18 changes: 15 additions & 3 deletions bindings/nostr-sdk-js/src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,21 @@ impl JsRelay {
self.inner.queue() as u64
}

/// Connect to relay and keep alive connection
pub async fn connect(&self, connection_timeout: Option<JsDuration>) {
self.inner.connect(connection_timeout.map(|d| *d)).await
/// Connect to relay
///
/// This method returns immediately and doesn't provide any information on if the connection was successful or not.
pub fn connect(&self) {
self.inner.connect()
}

/// Try to connect to relay
///
/// This method returns an error if the connection fails.
/// If the connection fails,
/// a task will continue to retry in the background (unless configured differently in `RelayOptions`.
#[wasm_bindgen(js_name = tryConnect)]
pub async fn try_connect(&self, timeout: &JsDuration) -> Result<()> {
self.inner.try_connect(**timeout).await.map_err(into_err)
}

/// Disconnect from relay and set status to 'Terminated'
Expand Down
2 changes: 1 addition & 1 deletion crates/nostr-connect/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl NostrConnect {
}

// Connect to relays
self.pool.connect(None).await;
self.pool.connect().await;

// Subscribe
let notifications = self.subscribe().await?;
Expand Down
3 changes: 1 addition & 2 deletions crates/nostr-connect/src/signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use nostr::nips::nip46::{Message, Request, ResponseResult};
use nostr_relay_pool::prelude::*;
Expand Down Expand Up @@ -130,7 +129,7 @@ impl NostrConnectRemoteSigner {
}

// Connect
self.pool.connect(Some(Duration::from_secs(10))).await;
self.pool.connect().await;

let filter = Filter::new()
.pubkey(self.keys.signer.public_key())
Expand Down
2 changes: 1 addition & 1 deletion crates/nostr-relay-pool/examples/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async fn main() -> Result<()> {

pool.add_relay("wss://relay.damus.io", RelayOptions::default())
.await?;
pool.connect(None).await;
pool.connect().await;

let event = Event::from_json(r#"{"content":"","created_at":1698412975,"id":"f55c30722f056e330d8a7a6a9ba1522f7522c0f1ced1c93d78ea833c78a3d6ec","kind":3,"pubkey":"f831caf722214748c72db4829986bd0cbb2bb8b3aeade1c959624a52a9629046","sig":"5092a9ffaecdae7d7794706f085ff5852befdf79df424cc3419bb797bf515ae05d4f19404cb8324b8b4380a4bd497763ac7b0f3b1b63ef4d3baa17e5f5901808","tags":[["p","4ddeb9109a8cd29ba279a637f5ec344f2479ee07df1f4043f3fe26d8948cfef9","",""],["p","bb6fd06e156929649a73e6b278af5e648214a69d88943702f1fb627c02179b95","",""],["p","b8b8210f33888fdbf5cedee9edf13c3e9638612698fe6408aff8609059053420","",""],["p","9dcee4fabcd690dc1da9abdba94afebf82e1e7614f4ea92d61d52ef9cd74e083","",""],["p","3eea9e831fefdaa8df35187a204d82edb589a36b170955ac5ca6b88340befaa0","",""],["p","885238ab4568f271b572bf48b9d6f99fa07644731f288259bd395998ee24754e","",""],["p","568a25c71fba591e39bebe309794d5c15d27dbfa7114cacb9f3586ea1314d126","",""]]}"#).unwrap();
pool.send_event(event).await?;
Expand Down
2 changes: 0 additions & 2 deletions crates/nostr-relay-pool/src/pool/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,5 @@

//! Constants
pub(super) const MAX_CONNECTING_CHUNK: usize = 100;

/// Relay Pool default notification channel size
pub const DEFAULT_NOTIFICATION_CHANNEL_SIZE: usize = 4096;
75 changes: 52 additions & 23 deletions crates/nostr-relay-pool/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use atomic_destructor::AtomicDestroyer;
use nostr_database::prelude::*;
use tokio::sync::{broadcast, mpsc, Mutex, RwLock, RwLockReadGuard};

use super::constants::MAX_CONNECTING_CHUNK;
use super::options::RelayPoolOptions;
use super::{Error, Output, RelayPoolNotification};
use crate::relay::options::{RelayOptions, ReqExitPolicy, SyncOptions};
Expand Down Expand Up @@ -922,34 +921,48 @@ impl InnerRelayPool {
Ok(ReceiverStream::new(rx))
}

pub async fn connect(&self, connection_timeout: Option<Duration>) {
pub async fn connect(&self) {
// Lock with read shared access
let relays = self.atomic.relays.read().await;

// Connect
for relay in relays.values() {
relay.connect()
}
}

pub async fn try_connect(&self, timeout: Duration) -> Output<()> {
// Lock with read shared access
let relays = self.atomic.relays.read().await;

let mut urls: Vec<RelayUrl> = Vec::with_capacity(relays.len());
let mut futures = Vec::with_capacity(relays.len());
let mut output: Output<()> = Output::default();

// Filter only relays that can connect and compose futures
for relay in relays.values().filter(|r| r.status().can_connect()) {
futures.push(relay.connect(connection_timeout));
urls.push(relay.url().clone());
futures.push(relay.try_connect(timeout));
}

// Check number of futures
if futures.len() <= MAX_CONNECTING_CHUNK {
future::join_all(futures).await;
return;
}
// TODO: use semaphore to limit number concurrent connections

tracing::warn!(
"Too many relays ({}). Connecting in chunks of {MAX_CONNECTING_CHUNK} relays...",
futures.len()
);
// Join futures
let list = future::join_all(futures).await;

// Join in chunks
while !futures.is_empty() {
let upper: usize = cmp::min(MAX_CONNECTING_CHUNK, futures.len());
let chunk = futures.drain(..upper);
future::join_all(chunk).await;
// Iterate results and compose output
for (url, result) in urls.into_iter().zip(list.into_iter()) {
match result {
Ok(..) => {
output.success.insert(url);
}
Err(e) => {
output.failed.insert(url, e.to_string());
}
}
}

output
}

pub async fn disconnect(&self) -> Result<(), Error> {
Expand All @@ -964,11 +977,7 @@ impl InnerRelayPool {
Ok(())
}

pub async fn connect_relay<U>(
&self,
url: U,
connection_timeout: Option<Duration>,
) -> Result<(), Error>
pub async fn connect_relay<U>(&self, url: U) -> Result<(), Error>
where
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
Expand All @@ -983,7 +992,27 @@ impl InnerRelayPool {
let relay: &Relay = self.internal_relay(&relays, &url)?;

// Connect
relay.connect(connection_timeout).await;
relay.connect();

Ok(())
}

pub async fn try_connect_relay<U>(&self, url: U, timeout: Duration) -> Result<(), Error>
where
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
{
// Convert url
let url: RelayUrl = url.try_into_url()?;

// Lock with read shared access
let relays = self.atomic.relays.read().await;

// Get relay
let relay: &Relay = self.internal_relay(&relays, &url)?;

// Try to connect
relay.try_connect(timeout).await?;

Ok(())
}
Expand Down
40 changes: 29 additions & 11 deletions crates/nostr-relay-pool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,16 @@ impl RelayPool {
self.inner.remove_all_relays(true).await
}

/// Connect to all added relays and keep connection alive
/// Connect to all added relays
#[inline]
pub async fn connect(&self, connection_timeout: Option<Duration>) {
self.inner.connect(connection_timeout).await
pub async fn connect(&self) {
self.inner.connect().await
}

/// Connect to all added relays
#[inline]
pub async fn try_connect(&self, timeout: Duration) -> Output<()> {
self.inner.try_connect(timeout).await
}

/// Disconnect from all relays
Expand All @@ -263,18 +269,30 @@ impl RelayPool {
self.inner.disconnect().await
}

/// Connect to relay
/// Connect to a previously added relay
///
/// This method doesn't provide any information on if the connection was successful or not.
///
/// Return [`Error::RelayNotFound`] if the relay doesn't exist in the pool.
#[inline]
pub async fn connect_relay<U>(
&self,
url: U,
connection_timeout: Option<Duration>,
) -> Result<(), Error>
pub async fn connect_relay<U>(&self, url: U) -> Result<(), Error>
where
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
{
self.inner.connect_relay(url).await
}

/// Try to connect to a previously added relay
///
/// This method returns an error if the connection fails.
#[inline]
pub async fn try_connect_relay<U>(&self, url: U, timeout: Duration) -> Result<(), Error>
where
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
{
self.inner.connect_relay(url, connection_timeout).await
self.inner.try_connect_relay(url, timeout).await
}

/// Disconnect relay
Expand Down Expand Up @@ -633,7 +651,7 @@ mod tests {

pool.add_relay(&url, RelayOptions::default()).await.unwrap();

pool.connect(None).await;
pool.connect().await;

assert!(!pool.inner.is_shutdown());

Expand Down
3 changes: 3 additions & 0 deletions crates/nostr-relay-pool/src/relay/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub enum Error {
NotReady,
/// Relay not connected
NotConnected,
/// Connection failed
ConnectionFailed,
/// Received shutdown
ReceivedShutdown,
/// Relay message
Expand Down Expand Up @@ -145,6 +147,7 @@ impl fmt::Display for Error {
}
Self::NotReady => write!(f, "relay is initialized but not ready"),
Self::NotConnected => write!(f, "relay not connected"),
Self::ConnectionFailed => write!(f, "connection failed"),
Self::ReceivedShutdown => write!(f, "received shutdown"),
Self::RelayMessage(message) => write!(f, "{message}"),
Self::BatchMessagesEmpty => write!(f, "can't batch empty list of messages"),
Expand Down
Loading

0 comments on commit f149de3

Please sign in to comment.