From e6cb9f814066948099ad201ae8822e0280f0c27d Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Fri, 27 Dec 2024 11:34:12 +0100 Subject: [PATCH] relay-builder: fix new events are not passed in real-time to subscriptions Signed-off-by: Yuki Kishimoto --- CHANGELOG.md | 1 + crates/nostr-relay-builder/src/local/inner.rs | 278 ++++++++++++------ .../nostr-relay-builder/src/local/session.rs | 3 +- 3 files changed, 183 insertions(+), 99 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6be969f4b..7fd6c9cc1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -95,6 +95,7 @@ * nostr: remove redundant NIP10 tags from `EventBuilder::text_note_reply` ([Yuki Kishimoto]) * sdk: fix NIP42 authentication for auto-closing REQ ([Yuki Kishimoto]) * sdk: fix min POW is not updated to already existing relays ([Yuki Kishimoto]) +* relay-builder: fix new events are not passed in real-time to subscriptions ([Yuki Kishimoto]) * bindings: allow passing empty string as relay url without return an error ([Yuki Kishimoto]) * ffi: fix UniFFI checksum mismatch issue ([Yuki Kishimoto]) * flutter: fix `default` is reserved in dart ([J. Azad EMERY]) diff --git a/crates/nostr-relay-builder/src/local/inner.rs b/crates/nostr-relay-builder/src/local/inner.rs index c698b4c38..9e35821b8 100644 --- a/crates/nostr-relay-builder/src/local/inner.rs +++ b/crates/nostr-relay-builder/src/local/inner.rs @@ -6,14 +6,14 @@ use std::collections::HashMap; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Arc; -use async_utility::futures_util::stream::{self, SplitSink}; +use async_utility::futures_util::stream::{self, SplitSink, SplitStream}; use async_utility::futures_util::{SinkExt, StreamExt}; use async_wsocket::native::{self, Message, WebSocketStream}; use atomic_destructor::AtomicDestroyer; use negentropy::{Bytes, Id, NegentropyStorageVector}; use nostr_database::prelude::*; use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::{broadcast, Semaphore}; +use tokio::sync::{broadcast, mpsc, watch, RwLock, Semaphore}; use super::session::{Nip42Session, RateLimiterResponse, Session, Tokens}; use super::util; @@ -24,6 +24,7 @@ use crate::builder::{ use crate::error::Error; type WsTx = SplitSink, Message>; +type WsRx = SplitStream>; #[derive(Debug, Clone)] pub(super) struct InnerLocalRelay { @@ -173,67 +174,143 @@ impl InnerLocalRelay { tracing::debug!("WebSocket connection established: {addr}"); let mut shutdown_rx = self.shutdown.subscribe(); - let mut new_event = self.new_event.subscribe(); + let new_event = self.new_event.subscribe(); - let (mut tx, mut rx) = ws_stream.split(); + let (ws_tx, ws_rx) = ws_stream.split(); let mut session: Session = Session { - subscriptions: HashMap::new(), negentropy_subscription: HashMap::new(), nip42: Nip42Session::default(), tokens: Tokens::new(self.rate_limit.notes_per_minute), }; + let subscriptions: RwLock>> = + RwLock::new(HashMap::new()); - loop { - tokio::select! { - msg = rx.next() => { - match msg { - Some(Ok(msg)) => { - match msg { - Message::Text(json) => { - tracing::trace!("Received {json}"); - self.handle_client_msg(&mut session, &mut tx, ClientMessage::from_json(json)?, &addr) - .await?; - } - Message::Binary(..) => { - let msg: RelayMessage = - RelayMessage::notice("binary messages are not processed by this relay"); - if let Err(e) = self.send_msg(&mut tx, msg).await { - tracing::error!("Can't send msg to client: {e}"); - } - } - Message::Ping(val) => { - let _ = tx.send(Message::Pong(val)).await; - } - Message::Pong(..) => {} - Message::Close(..) => {} - Message::Frame(..) => {} - } - } - Some(Err(e)) => tracing::error!("Can't handle websocket msg: {e}"), - None => break, - } + let (nostr_tx, nostr_rx) = mpsc::channel::>(256); + let (pong_tx, pong_rx) = watch::channel::>(Vec::new()); + + // Wait that one of the futures terminates/completes + tokio::select! { + res = self.receiver_message_handler(ws_rx, &nostr_tx, pong_tx, &mut session, &subscriptions, &addr) => match res { + Ok(()) => tracing::trace!(addr = %addr, "Relay received exited."), + Err(e) => tracing::error!(addr = %addr, error = %e, "Relay receiver exited with error.") + }, + res = self.sender_message_handler(ws_tx, nostr_rx, pong_rx) => match res { + Ok(()) => tracing::trace!(addr = %addr, "Relay sender exited."), + Err(e) => tracing::error!(addr = %addr, error = %e, "Relay sender exited with error.") + }, + res = self.new_event_dispatcher(new_event, &nostr_tx, &subscriptions) => match res { + Ok(()) => tracing::trace!(addr = %addr, "New event dispatcher exited."), + Err(e) => tracing::error!(addr = %addr, error = %e, "New event dispatcher exited with error.") + }, + _ = shutdown_rx.recv() => {} + } + + // Drop connection permit + drop(permit); + + tracing::debug!(addr = %addr, "WebSocket connection terminated."); + + Ok(()) + } + + async fn receiver_message_handler( + &self, + mut ws_rx: WsRx, + nostr_tx: &mpsc::Sender>, + pong_tx: watch::Sender>, + session: &mut Session, + subscriptions: &RwLock>>, + addr: &SocketAddr, + ) -> Result<()> { + while let Some(msg) = ws_rx.next().await { + match msg? { + Message::Text(json) => { + tracing::trace!("Received {json}"); + self.handle_client_msg( + session, + subscriptions, + nostr_tx, + ClientMessage::from_json(json)?, + addr, + ) + .await?; } - event = new_event.recv() => { - if let Ok(event) = event { - // Iter subscriptions - for (id, filters) in session.subscriptions.iter() { - if filters.iter().any(|f| f.match_event(&event)) { - self.send_msg(&mut tx, RelayMessage::event(id.to_owned(), event.clone())).await?; - } - } + Message::Binary(..) => { + let msg: RelayMessage = + RelayMessage::notice("binary messages are not processed by this relay"); + if let Err(e) = self.send_msg(nostr_tx, msg).await { + tracing::error!("Can't send msg to client: {e}"); } } - _ = shutdown_rx.recv() => { - break; + Message::Ping(val) => { + pong_tx.send(val)?; } + Message::Pong(..) => {} + Message::Close(..) => {} + Message::Frame(..) => {} } } - // Drop connection permit - drop(permit); + Ok(()) + } + + async fn sender_message_handler( + &self, + mut ws_tx: WsTx, + mut nostr_rx: mpsc::Receiver>, + mut pong_rx: watch::Receiver>, + ) -> Result<()> { + loop { + tokio::select! { + // Nostr channel receiver + Some(msgs) = nostr_rx.recv() => { + // Serialize messages to JSON and compose WebSocket text messages + let iter = msgs + .into_iter() + .map(|msg| Ok(Message::Text(msg.as_json()))); + + // Send WebSocket messages + let mut s = stream::iter(iter); + ws_tx.send_all(&mut s).await?; + } + // Ping channel receiver + Ok(()) = pong_rx.changed() => { + // Get ping data and mark as seen + let data: Vec = pong_rx.borrow_and_update().to_vec(); + + // Compose ping message + let msg = Message::Pong(data); - tracing::debug!("WebSocket connection terminated for {addr}"); + // Send WebSocket message + ws_tx.send(msg).await?; + } + else => break + } + } + + // Close WebSocket + ws_tx.close().await?; + + Ok(()) + } + + async fn new_event_dispatcher( + &self, + mut new_event: broadcast::Receiver, + nostr_tx: &mpsc::Sender>, + subscriptions: &RwLock>>, + ) -> Result<()> { + while let Ok(event) = new_event.recv().await { + // Iter subscriptions + let subs = subscriptions.read().await; + for (id, filters) in subs.iter() { + if filters.iter().any(|f| f.match_event(&event)) { + self.send_msg(nostr_tx, RelayMessage::event(id.clone(), event.clone())) + .await?; + } + } + } Ok(()) } @@ -241,7 +318,8 @@ impl InnerLocalRelay { async fn handle_client_msg( &self, session: &mut Session, - ws_tx: &mut WsTx, + subscriptions: &RwLock>>, + tx: &mpsc::Sender>, msg: ClientMessage, addr: &SocketAddr, ) -> Result<()> { @@ -253,7 +331,7 @@ impl InnerLocalRelay { { return self .send_msg( - ws_tx, + tx, RelayMessage::Ok { event_id: event.id, status: false, @@ -271,7 +349,7 @@ impl InnerLocalRelay { if !event.id.check_pow(difficulty) { return self .send_msg( - ws_tx, + tx, RelayMessage::Ok { event_id: event.id, status: false, @@ -293,7 +371,7 @@ impl InnerLocalRelay { if nip42.mode.is_write() && !session.nip42.is_authenticated() { // Generate and send AUTH challenge self.send_msg( - ws_tx, + tx, RelayMessage::Auth { challenge: session.nip42.generate_challenge(), }, @@ -303,7 +381,7 @@ impl InnerLocalRelay { // Return error return self .send_msg( - ws_tx, + tx, RelayMessage::Ok { event_id: event.id, status: false, @@ -323,7 +401,7 @@ impl InnerLocalRelay { if let PolicyResult::Reject(m) = policy.admit_event(&event, addr).await { return self .send_msg( - ws_tx, + tx, RelayMessage::Ok { event_id, status: false, @@ -340,7 +418,7 @@ impl InnerLocalRelay { DatabaseEventStatus::Saved => { return self .send_msg( - ws_tx, + tx, RelayMessage::Ok { event_id: event.id, status: true, @@ -355,7 +433,7 @@ impl InnerLocalRelay { DatabaseEventStatus::Deleted => { return self .send_msg( - ws_tx, + tx, RelayMessage::Ok { event_id: event.id, status: false, @@ -378,7 +456,7 @@ impl InnerLocalRelay { if !authored && !tagged { return self .send_msg( - ws_tx, + tx, RelayMessage::Ok { event_id: event.id, status: false, @@ -395,7 +473,7 @@ impl InnerLocalRelay { if !event.verify_id() { return self .send_msg( - ws_tx, + tx, RelayMessage::Ok { event_id: event.id, status: false, @@ -411,7 +489,7 @@ impl InnerLocalRelay { if !event.verify_signature() { return self .send_msg( - ws_tx, + tx, RelayMessage::Ok { event_id: event.id, status: false, @@ -433,7 +511,7 @@ impl InnerLocalRelay { // Send OK message return self .send_msg( - ws_tx, + tx, RelayMessage::Ok { event_id, status: true, @@ -476,28 +554,31 @@ impl InnerLocalRelay { } }; - self.send_msg(ws_tx, msg).await + self.send_msg(tx, msg).await } ClientMessage::Req { subscription_id, filters, } => { // Check number of subscriptions - if session.subscriptions.len() >= self.rate_limit.max_reqs - && !session.subscriptions.contains_key(&subscription_id) { - return self - .send_msg( - ws_tx, - RelayMessage::Closed { - subscription_id, - message: format!( - "{}: too many REQs", - MachineReadablePrefix::RateLimited - ), - }, - ) - .await; + let subs = subscriptions.read().await; + if subs.len() >= self.rate_limit.max_reqs + && !subs.contains_key(&subscription_id) + { + return self + .send_msg( + tx, + RelayMessage::Closed { + subscription_id, + message: format!( + "{}: too many REQs", + MachineReadablePrefix::RateLimited + ), + }, + ) + .await; + } } // Check NIP42 @@ -508,7 +589,7 @@ impl InnerLocalRelay { if nip42.mode.is_read() && !session.nip42.is_authenticated() { // Generate and send AUTH challenge self.send_msg( - ws_tx, + tx, RelayMessage::Auth { challenge: session.nip42.generate_challenge(), }, @@ -518,7 +599,7 @@ impl InnerLocalRelay { // Return error return self .send_msg( - ws_tx, + tx, RelayMessage::Closed { subscription_id, message: format!( @@ -536,7 +617,7 @@ impl InnerLocalRelay { if let PolicyResult::Reject(msg) = plugin.admit_query(&filters, addr).await { return self .send_msg( - ws_tx, + tx, RelayMessage::Closed { subscription_id, message: format!("{}: {}", MachineReadablePrefix::Error, msg), @@ -547,9 +628,8 @@ impl InnerLocalRelay { } // Update session subscriptions - session - .subscriptions - .insert(subscription_id.clone(), filters.clone()); + let mut subs = subscriptions.write().await; + subs.insert(subscription_id.clone(), filters.clone()); // Query database let events = self.database.query(filters).await?; @@ -567,7 +647,7 @@ impl InnerLocalRelay { ); msgs.push(RelayMessage::eose(subscription_id)); - self.send_msgs(ws_tx, msgs).await?; + self.send_msgs(tx, msgs).await?; Ok(()) } @@ -576,17 +656,18 @@ impl InnerLocalRelay { filters, } => { let count: usize = self.database.count(filters).await?; - self.send_msg(ws_tx, RelayMessage::count(subscription_id, count)) + self.send_msg(tx, RelayMessage::count(subscription_id, count)) .await } ClientMessage::Close(subscription_id) => { - session.subscriptions.remove(&subscription_id); + let mut subs = subscriptions.write().await; + subs.remove(&subscription_id); Ok(()) } ClientMessage::Auth(event) => match session.nip42.check_challenge(&event) { Ok(()) => { self.send_msg( - ws_tx, + tx, RelayMessage::Ok { event_id: event.id, status: true, @@ -597,7 +678,7 @@ impl InnerLocalRelay { } Err(e) => { self.send_msg( - ws_tx, + tx, RelayMessage::Ok { event_id: event.id, status: false, @@ -648,7 +729,7 @@ impl InnerLocalRelay { // Reply self.send_msg( - ws_tx, + tx, RelayMessage::NegMsg { subscription_id, message: message.to_hex(), @@ -668,7 +749,7 @@ impl InnerLocalRelay { // Reply self.send_msg( - ws_tx, + tx, RelayMessage::NegMsg { subscription_id, message: message.to_hex(), @@ -678,7 +759,7 @@ impl InnerLocalRelay { } None => { self.send_msg( - ws_tx, + tx, RelayMessage::NegErr { subscription_id, message: format!( @@ -699,18 +780,21 @@ impl InnerLocalRelay { } #[inline] - async fn send_msg(&self, tx: &mut WsTx, msg: RelayMessage) -> Result<()> { - tx.send(Message::Text(msg.as_json())).await?; - Ok(()) + async fn send_msg( + &self, + tx: &mpsc::Sender>, + msg: RelayMessage, + ) -> Result<()> { + self.send_msgs(tx, vec![msg]).await } #[inline] - async fn send_msgs(&self, tx: &mut WsTx, msgs: I) -> Result<()> - where - I: IntoIterator, - { - let mut stream = stream::iter(msgs.into_iter()).map(|msg| Ok(Message::Text(msg.as_json()))); - tx.send_all(&mut stream).await?; + async fn send_msgs( + &self, + tx: &mpsc::Sender>, + msgs: Vec, + ) -> Result<()> { + tx.send(msgs).await?; Ok(()) } } diff --git a/crates/nostr-relay-builder/src/local/session.rs b/crates/nostr-relay-builder/src/local/session.rs index 2adec8538..2a7c01b6c 100644 --- a/crates/nostr-relay-builder/src/local/session.rs +++ b/crates/nostr-relay-builder/src/local/session.rs @@ -6,7 +6,7 @@ use std::collections::{HashMap, HashSet}; use std::time::{Duration, Instant}; use negentropy::{Negentropy, NegentropyStorageVector}; -use nostr::{Event, Filter, PublicKey, Result, SubscriptionId, Timestamp}; +use nostr::{Event, PublicKey, Result, SubscriptionId, Timestamp}; pub(super) enum RateLimiterResponse { Allowed, @@ -73,7 +73,6 @@ impl Nip42Session { } pub(super) struct Session { - pub subscriptions: HashMap>, pub negentropy_subscription: HashMap>, pub nip42: Nip42Session, pub tokens: Tokens,