diff --git a/CHANGELOG.md b/CHANGELOG.md index c514dfc9e..8d9c8abef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -81,6 +81,7 @@ * database: impl PartialEq and Eq for `Events` ([Yuki Kishimoto]) * database: add `SaveEventStatus` enum ([Yuki Kishimoto]) * pool: add `ReceiverStream` ([Yuki Kishimoto]) +* Add `SubscribeAutoCloseOptions::relative_timeout` ([Yuki Kishimoto]) * sdk: automatically resend event after NIP-42 authentication ([Yuki Kishimoto]) * sdk: add `Connection::embedded_tor_with_path` ([Yuki Kishimoto]) * connect: add `NostrConnect::status` ([Yuki Kishimoto]) diff --git a/bindings/nostr-sdk-ffi/src/relay/options.rs b/bindings/nostr-sdk-ffi/src/relay/options.rs index d6e1df1db..442ff4acf 100644 --- a/bindings/nostr-sdk-ffi/src/relay/options.rs +++ b/bindings/nostr-sdk-ffi/src/relay/options.rs @@ -212,12 +212,19 @@ impl SubscribeAutoCloseOptions { builder } - /// Automatically close subscription after `Duration` + /// Automatically close subscription after duration. pub fn timeout(&self, timeout: Option) -> Self { let mut builder = self.clone(); builder.inner = builder.inner.timeout(timeout); builder } + + /// Automatically close subscription if don't receive any more notifications/events within the duration. + pub fn relative_timeout(&self, timeout: Option) -> Self { + let mut builder = self.clone(); + builder.inner = builder.inner.relative_timeout(timeout); + builder + } } /// Subscribe options diff --git a/bindings/nostr-sdk-js/src/relay/options.rs b/bindings/nostr-sdk-js/src/relay/options.rs index 4069e2a8d..1d679e6bb 100644 --- a/bindings/nostr-sdk-js/src/relay/options.rs +++ b/bindings/nostr-sdk-js/src/relay/options.rs @@ -3,6 +3,7 @@ // Distributed under the MIT software license use core::ops::Deref; +use std::time::Duration; use nostr_sdk::prelude::*; use wasm_bindgen::prelude::*; @@ -171,6 +172,12 @@ impl JsSubscribeAutoCloseOptions { pub fn timeout(self, timeout: Option) -> Self { self.inner.timeout(timeout.map(|t| *t)).into() } + + /// Automatically close subscription if don't receive any more notifications/events within the duration. + #[wasm_bindgen(js_name = relativeTimeout)] + pub fn relative_timeout(self, timeout: Option) -> Self { + self.inner.relative_timeout(timeout.map(|t| *t)).into() + } } /// Subscribe options diff --git a/crates/nostr-relay-pool/src/relay/inner.rs b/crates/nostr-relay-pool/src/relay/inner.rs index 9a07f9374..7f9d9b478 100644 --- a/crates/nostr-relay-pool/src/relay/inner.rs +++ b/crates/nostr-relay-pool/src/relay/inner.rs @@ -1339,40 +1339,7 @@ impl InnerRelay { // Check if auto-close condition is set match opts.auto_close { - Some(opts) => { - let relay = self.clone(); - task::spawn(async move { - let res: Option<(bool, Option)> = - relay.handle_auto_closing(&id, filters, opts).await; - - // Check if CLOSE needed - let to_close: bool = match res { - Some((to_close, reason)) => { - // Send subscription auto closed notification - if let Some(reason) = reason { - relay.send_notification( - RelayNotification::SubscriptionAutoClosed { reason }, - false, - ); - } - - to_close - } - None => { - tracing::warn!(id = %id, "Timeout reached for subscription, auto-closing."); - true - } - }; - - // Close subscription - if to_close { - tracing::debug!(id = %id, "Auto-closing subscription."); - relay.send_msg(ClientMessage::close(id))?; - } - - Ok::<(), Error>(()) - }); - } + Some(opts) => self.spawn_auto_closing_handler(id, filters, opts), None => { // No auto-close subscription: update subscription filters self.update_subscription(id, filters, true).await; @@ -1382,6 +1349,44 @@ impl InnerRelay { Ok(()) } + fn spawn_auto_closing_handler( + &self, + id: SubscriptionId, + filters: Vec, + opts: SubscribeAutoCloseOptions, + ) { + let relay = self.clone(); + task::spawn(async move { + // Check if CLOSE needed + let to_close: bool = match relay.handle_auto_closing(&id, filters, opts).await { + Some((to_close, reason)) => { + // Send subscription auto-closed notification + if let Some(reason) = reason { + relay.send_notification( + RelayNotification::SubscriptionAutoClosed { reason }, + false, + ); + } + + to_close + } + // Timeout + None => { + tracing::warn!(id = %id, "Timeout reached for subscription, auto-closing."); + true + } + }; + + // Close subscription + if to_close { + tracing::debug!(id = %id, "Auto-closing subscription."); + relay.send_msg(ClientMessage::close(id))?; + } + + Ok::<(), Error>(()) + }); + } + async fn handle_auto_closing( &self, id: &SubscriptionId, @@ -1389,18 +1394,40 @@ impl InnerRelay { opts: SubscribeAutoCloseOptions, ) -> Option<(bool, Option)> { time::timeout(opts.timeout, async move { - let mut counter = 0; + let mut counter: u16 = 0; let mut received_eose: bool = false; let mut require_resubscription: bool = false; + let mut last_event: Option = None; + // Subscribe to notifications let mut notifications = self.internal_notification_sender.subscribe(); - while let Ok(notification) = notifications.recv().await { + + // Listen to notifications with timeout + // If no notification is received within no-events timeout, `None` is returned. + while let Ok(notification) = + time::timeout(opts.relative_timeout, notifications.recv()).await? + { + // Check if no-events timeout is reached + if let (Some(relative_timeout), Some(last_event)) = + (opts.relative_timeout, last_event) + { + if last_event.elapsed() > relative_timeout { + // Close the subscription + return Some((true, None)); // TODO: use SubscriptionAutoClosedReason::Timeout? + } + } + match notification { RelayNotification::Message { message, .. } => match message { RelayMessage::Event { subscription_id, .. } => { if &subscription_id == id { + // If no-events timeout is enabled, update instant of last event received + if opts.relative_timeout.is_some() { + last_event = Some(Instant::now()); + } + if let ReqExitPolicy::WaitForEventsAfterEOSE(num) = opts.exit_policy { if received_eose { @@ -1433,17 +1460,17 @@ impl InnerRelay { if self.state.is_auto_authentication_enabled() { require_resubscription = true; } else { - return ( + return Some(( false, Some(SubscriptionAutoClosedReason::Closed(message)), - ); // No need to send CLOSE msg + )); // No need to send CLOSE msg } } _ => { - return ( + return Some(( false, Some(SubscriptionAutoClosedReason::Closed(message)), - ); // No need to send CLOSE msg + )); // No need to send CLOSE msg } } } @@ -1460,18 +1487,18 @@ impl InnerRelay { } } RelayNotification::AuthenticationFailed => { - return ( + return Some(( false, Some(SubscriptionAutoClosedReason::AuthenticationFailed), - ); // No need to send CLOSE msg + )); // No need to send CLOSE msg } RelayNotification::RelayStatus { status } => { if status.is_disconnected() { - return (false, None); // No need to send CLOSE msg + return Some((false, None)); // No need to send CLOSE msg } } RelayNotification::Shutdown => { - return (false, None); // No need to send CLOSE msg + return Some((false, None)); // No need to send CLOSE msg } _ => (), } @@ -1498,9 +1525,9 @@ impl InnerRelay { .await; } - (true, Some(SubscriptionAutoClosedReason::Completed)) // Need to send CLOSE msg + Some((true, Some(SubscriptionAutoClosedReason::Completed))) // Need to send CLOSE msg }) - .await + .await? } pub async fn unsubscribe(&self, id: SubscriptionId) -> Result<(), Error> { diff --git a/crates/nostr-relay-pool/src/relay/options.rs b/crates/nostr-relay-pool/src/relay/options.rs index af2fa1e08..b1bade87e 100644 --- a/crates/nostr-relay-pool/src/relay/options.rs +++ b/crates/nostr-relay-pool/src/relay/options.rs @@ -156,6 +156,7 @@ impl RelayOptions { pub struct SubscribeAutoCloseOptions { pub(super) exit_policy: ReqExitPolicy, pub(super) timeout: Option, + pub(super) relative_timeout: Option, } impl SubscribeAutoCloseOptions { @@ -172,11 +173,17 @@ impl SubscribeAutoCloseOptions { self } - /// Automatically close subscription after [Duration] + /// Automatically close subscription after [`Duration`]. pub fn timeout(mut self, timeout: Option) -> Self { self.timeout = timeout; self } + + /// Automatically close subscription if don't receive any more notifications/events within the [`Duration`]. + pub fn relative_timeout(mut self, timeout: Option) -> Self { + self.relative_timeout = timeout; + self + } } /// Subscribe options