Skip to content

Commit

Permalink
pool: auto-close subscription after Duration of no new events
Browse files Browse the repository at this point in the history
Add `SubscribeAutoCloseOptions::relative_timeout`

Closes #691

Signed-off-by: Yuki Kishimoto <yukikishimoto@protonmail.com>
  • Loading branch information
yukibtc committed Dec 26, 2024
1 parent 7351b86 commit 94167e3
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 48 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
* database: impl PartialEq and Eq for `Events` ([Yuki Kishimoto])
* database: add `SaveEventStatus` enum ([Yuki Kishimoto])
* pool: add `ReceiverStream` ([Yuki Kishimoto])
* Add `SubscribeAutoCloseOptions::relative_timeout` ([Yuki Kishimoto])
* sdk: automatically resend event after NIP-42 authentication ([Yuki Kishimoto])
* sdk: add `Connection::embedded_tor_with_path` ([Yuki Kishimoto])
* connect: add `NostrConnect::status` ([Yuki Kishimoto])
Expand Down
9 changes: 8 additions & 1 deletion bindings/nostr-sdk-ffi/src/relay/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,19 @@ impl SubscribeAutoCloseOptions {
builder
}

/// Automatically close subscription after `Duration`
/// Automatically close subscription after duration.
pub fn timeout(&self, timeout: Option<Duration>) -> Self {
let mut builder = self.clone();
builder.inner = builder.inner.timeout(timeout);
builder
}

/// Automatically close subscription if don't receive any more notifications/events within the duration.
pub fn relative_timeout(&self, timeout: Option<Duration>) -> Self {
let mut builder = self.clone();
builder.inner = builder.inner.relative_timeout(timeout);
builder
}
}

/// Subscribe options
Expand Down
7 changes: 7 additions & 0 deletions bindings/nostr-sdk-js/src/relay/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// Distributed under the MIT software license

use core::ops::Deref;
use std::time::Duration;

use nostr_sdk::prelude::*;
use wasm_bindgen::prelude::*;
Expand Down Expand Up @@ -171,6 +172,12 @@ impl JsSubscribeAutoCloseOptions {
pub fn timeout(self, timeout: Option<JsDuration>) -> Self {
self.inner.timeout(timeout.map(|t| *t)).into()
}

/// Automatically close subscription if don't receive any more notifications/events within the duration.
#[wasm_bindgen(js_name = relativeTimeout)]
pub fn relative_timeout(self, timeout: Option<JsDuration>) -> Self {
self.inner.relative_timeout(timeout.map(|t| *t)).into()
}
}

/// Subscribe options
Expand Down
119 changes: 73 additions & 46 deletions crates/nostr-relay-pool/src/relay/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1339,40 +1339,7 @@ impl InnerRelay {

// Check if auto-close condition is set
match opts.auto_close {
Some(opts) => {
let relay = self.clone();
task::spawn(async move {
let res: Option<(bool, Option<SubscriptionAutoClosedReason>)> =
relay.handle_auto_closing(&id, filters, opts).await;

// Check if CLOSE needed
let to_close: bool = match res {
Some((to_close, reason)) => {
// Send subscription auto closed notification
if let Some(reason) = reason {
relay.send_notification(
RelayNotification::SubscriptionAutoClosed { reason },
false,
);
}

to_close
}
None => {
tracing::warn!(id = %id, "Timeout reached for subscription, auto-closing.");
true
}
};

// Close subscription
if to_close {
tracing::debug!(id = %id, "Auto-closing subscription.");
relay.send_msg(ClientMessage::close(id))?;
}

Ok::<(), Error>(())
});
}
Some(opts) => self.spawn_auto_closing_handler(id, filters, opts),
None => {
// No auto-close subscription: update subscription filters
self.update_subscription(id, filters, true).await;
Expand All @@ -1382,25 +1349,85 @@ impl InnerRelay {
Ok(())
}

fn spawn_auto_closing_handler(
&self,
id: SubscriptionId,
filters: Vec<Filter>,
opts: SubscribeAutoCloseOptions,
) {
let relay = self.clone();
task::spawn(async move {
// Check if CLOSE needed
let to_close: bool = match relay.handle_auto_closing(&id, filters, opts).await {
Some((to_close, reason)) => {
// Send subscription auto-closed notification
if let Some(reason) = reason {
relay.send_notification(
RelayNotification::SubscriptionAutoClosed { reason },
false,
);
}

to_close
}
// Timeout
None => {
tracing::warn!(id = %id, "Timeout reached for subscription, auto-closing.");
true
}
};

// Close subscription
if to_close {
tracing::debug!(id = %id, "Auto-closing subscription.");
relay.send_msg(ClientMessage::close(id))?;
}

Ok::<(), Error>(())
});
}

async fn handle_auto_closing(
&self,
id: &SubscriptionId,
filters: Vec<Filter>,
opts: SubscribeAutoCloseOptions,
) -> Option<(bool, Option<SubscriptionAutoClosedReason>)> {
time::timeout(opts.timeout, async move {
let mut counter = 0;
let mut counter: u16 = 0;
let mut received_eose: bool = false;
let mut require_resubscription: bool = false;
let mut last_event: Option<Instant> = None;

// Subscribe to notifications
let mut notifications = self.internal_notification_sender.subscribe();
while let Ok(notification) = notifications.recv().await {

// Listen to notifications with timeout
// If no notification is received within no-events timeout, `None` is returned.
while let Ok(notification) =
time::timeout(opts.relative_timeout, notifications.recv()).await?
{
// Check if no-events timeout is reached
if let (Some(relative_timeout), Some(last_event)) =
(opts.relative_timeout, last_event)
{
if last_event.elapsed() > relative_timeout {
// Close the subscription
return Some((true, None)); // TODO: use SubscriptionAutoClosedReason::Timeout?
}
}

match notification {
RelayNotification::Message { message, .. } => match message {
RelayMessage::Event {
subscription_id, ..
} => {
if &subscription_id == id {
// If no-events timeout is enabled, update instant of last event received
if opts.relative_timeout.is_some() {
last_event = Some(Instant::now());
}

if let ReqExitPolicy::WaitForEventsAfterEOSE(num) = opts.exit_policy
{
if received_eose {
Expand Down Expand Up @@ -1433,17 +1460,17 @@ impl InnerRelay {
if self.state.is_auto_authentication_enabled() {
require_resubscription = true;
} else {
return (
return Some((
false,
Some(SubscriptionAutoClosedReason::Closed(message)),
); // No need to send CLOSE msg
)); // No need to send CLOSE msg
}
}
_ => {
return (
return Some((
false,
Some(SubscriptionAutoClosedReason::Closed(message)),
); // No need to send CLOSE msg
)); // No need to send CLOSE msg
}
}
}
Expand All @@ -1460,18 +1487,18 @@ impl InnerRelay {
}
}
RelayNotification::AuthenticationFailed => {
return (
return Some((
false,
Some(SubscriptionAutoClosedReason::AuthenticationFailed),
); // No need to send CLOSE msg
)); // No need to send CLOSE msg
}
RelayNotification::RelayStatus { status } => {
if status.is_disconnected() {
return (false, None); // No need to send CLOSE msg
return Some((false, None)); // No need to send CLOSE msg
}
}
RelayNotification::Shutdown => {
return (false, None); // No need to send CLOSE msg
return Some((false, None)); // No need to send CLOSE msg
}
_ => (),
}
Expand All @@ -1498,9 +1525,9 @@ impl InnerRelay {
.await;
}

(true, Some(SubscriptionAutoClosedReason::Completed)) // Need to send CLOSE msg
Some((true, Some(SubscriptionAutoClosedReason::Completed))) // Need to send CLOSE msg
})
.await
.await?
}

pub async fn unsubscribe(&self, id: SubscriptionId) -> Result<(), Error> {
Expand Down
9 changes: 8 additions & 1 deletion crates/nostr-relay-pool/src/relay/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ impl RelayOptions {
pub struct SubscribeAutoCloseOptions {
pub(super) exit_policy: ReqExitPolicy,
pub(super) timeout: Option<Duration>,
pub(super) relative_timeout: Option<Duration>,
}

impl SubscribeAutoCloseOptions {
Expand All @@ -172,11 +173,17 @@ impl SubscribeAutoCloseOptions {
self
}

/// Automatically close subscription after [Duration]
/// Automatically close subscription after [`Duration`].
pub fn timeout(mut self, timeout: Option<Duration>) -> Self {
self.timeout = timeout;
self
}

/// Automatically close subscription if don't receive any more notifications/events within the [`Duration`].
pub fn relative_timeout(mut self, timeout: Option<Duration>) -> Self {
self.relative_timeout = timeout;
self
}
}

/// Subscribe options
Expand Down

0 comments on commit 94167e3

Please sign in to comment.