Skip to content

Commit

Permalink
Track processed event IDs instead of last event time
Browse files Browse the repository at this point in the history
  • Loading branch information
nobu-maeda committed Apr 24, 2024
1 parent e285fc0 commit 8b40cd3
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 69 deletions.
78 changes: 17 additions & 61 deletions src/comms/comms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::path::Path;
use std::str::FromStr;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::time::Duration;
use tracing::{debug, error, info, trace, warn};

use nostr_sdk::prelude::*;
Expand Down Expand Up @@ -611,10 +611,11 @@ impl CommsActor {
self.handle_notification_event(url::Url::from_str(url.as_str()).unwrap(), event)
.await;
}
RelayPoolNotification::Message(url, _relay_message) => {
RelayPoolNotification::Message(url, relay_message) => {
trace!(
"Comms w/ pubkey {} handle_notification(), dropping Relay Message from url {}",
"Comms w/ pubkey {} handle_notification(), dropping Relay Message {:?} from url {}",
self.pubkey,
relay_message,
url.to_string()
);
}
Expand All @@ -624,10 +625,11 @@ impl CommsActor {
self.pubkey
);
}
RelayPoolNotification::RelayStatus { url, status: _ } => {
RelayPoolNotification::RelayStatus { url, status } => {
trace!(
"Comms w/ pubkey {} handle_notification(), dropping Relay Status from url {}",
"Comms w/ pubkey {} handle_notification(), dropping Relay Status {:?} from url {}",
self.pubkey,
status,
url.to_string()
);
}
Expand All @@ -636,7 +638,15 @@ impl CommsActor {
}

async fn handle_notification_event(&mut self, url: url::Url, event: Event) {
self.data.set_last_event(SystemTime::now());
let event_id = event.id.to_string();
if self.data.event_id_seen(&event_id) {
debug!(
"Comms w/ pubkey {} handle_notification_event() EventID {} already seen",
self.pubkey, event.id
);
return;
}
self.data.store_event_id(&event_id);

if let Kind::EncryptedDirectMessage = event.kind {
self.handle_direct_message(url, event).await;
Expand Down Expand Up @@ -863,7 +873,6 @@ impl CommsActor {

async fn connect_all_relays(&mut self, rsp_tx: oneshot::Sender<Result<(), N3xbError>>) {
self.client.connect().await;
self.resync_peer_messages().await.unwrap();
rsp_tx.send(Ok(())).unwrap();
}

Expand All @@ -872,7 +881,7 @@ impl CommsActor {
// Need a way to correlate State Machines to Subscriptions as to remove filters as necessary

// Subscribe to all DM to own pubkey. Filter unrecognized DM out some other way. Can be spam prone
let dm_filter = Filter::new().since(Timestamp::now()).pubkey(pubkey);
let dm_filter = Filter::new().pubkey(pubkey);
vec![dm_filter]
}

Expand Down Expand Up @@ -984,59 +993,6 @@ impl CommsActor {
.collect()
}

#[allow(dead_code)]
async fn handle_resync_events(&mut self, events: Vec<Event>) {
for event in events {
// Get relays this event is seen in
let relay_urls = self
.client
.database()
.event_recently_seen_on_relays(event.id)
.await
.unwrap()
.unwrap();

let urls: Vec<url::Url> = relay_urls
.iter()
.map(|url| url::Url::parse(url.as_str()).unwrap())
.collect();

let url = urls.first().unwrap().to_owned();

self.handle_notification_event(url, event).await;
}
}

async fn resync_peer_messages(&mut self) -> Result<(), N3xbError> {
let pubkey = self.pubkey;
let unix_epoch_secs = self
.data
.last_event()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
println!("unix_epoch_secs: {}", unix_epoch_secs);
let timestamp = Timestamp::from(unix_epoch_secs);
let filters = vec![Filter::new().since(timestamp).pubkey(pubkey)];
let timeout = Duration::from_secs(1);

match self.client.get_events_of(filters, Some(timeout)).await {
Ok(events) => {
debug!(
"Comms w/ pubkey {} got {} events on resync_peer_messages()",
pubkey,
events.len()
);
// These events somehow also get caught in the subscription filter,
// but only if get_events_of is called. If we handle these here also,
// we end up with duplicate event notifications
// self.handle_resync_events(events).await;
Ok(())
}
Err(error) => Err(error.into()),
}
}

// Query Order Notes

async fn query_orders(
Expand Down
15 changes: 7 additions & 8 deletions src/comms/data.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::{
collections::HashMap,
collections::{HashMap, HashSet},
net::SocketAddr,
path::{Path, PathBuf},
sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
time::SystemTime,
};
use tracing::debug;

Expand All @@ -16,7 +15,7 @@ use crate::common::{error::N3xbError, persist::Persister, types::SerdeGenericTra
struct CommsDataStore {
relays: HashMap<url::Url, Option<SocketAddr>>,
// filters:
last_event: SystemTime,
event_ids: HashSet<String>,
}

#[typetag::serde(name = "n3xb_comms_data")]
Expand All @@ -40,7 +39,7 @@ impl CommsData {

let mut store = CommsDataStore {
relays: HashMap::new(),
last_event: SystemTime::now(),
event_ids: HashSet::new(),
};

if data_path.exists() {
Expand Down Expand Up @@ -126,13 +125,13 @@ impl CommsData {
self.persister.queue();
}

pub(crate) fn last_event(&self) -> SystemTime {
self.read_store().last_event
pub(crate) fn event_id_seen(&self, event_id: impl Into<String>) -> bool {
self.read_store().event_ids.contains(&event_id.into())
}

pub(crate) fn set_last_event(&self, last_event: SystemTime) {
pub(crate) fn store_event_id(&self, event_id: impl Into<String>) {
let mut store = self.write_store();
store.last_event = last_event;
store.event_ids.insert(event_id.into());
self.persister.queue();
}

Expand Down

0 comments on commit 8b40cd3

Please sign in to comment.