Skip to content

Commit

Permalink
Changed all tokio::fs to std::fs & consolidated persister
Browse files Browse the repository at this point in the history
  • Loading branch information
nobu-maeda committed Jan 31, 2024
1 parent 9c5f483 commit 87bbf39
Show file tree
Hide file tree
Showing 11 changed files with 453 additions and 578 deletions.
2 changes: 1 addition & 1 deletion src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pub mod error;
pub mod persist;
pub mod types;
pub mod utils;
115 changes: 115 additions & 0 deletions src/common/persist.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use ::log::{error, trace};
use std::{
fs::File,
io::Write,
path::Path,
sync::{
mpsc::{self, TrySendError},
Arc, RwLock, RwLockReadGuard,
},
};

use crate::common::{error::N3xbError, types::SerdeGenericTrait};

enum PersisterMsg {
Persist,
Close,
}

pub(crate) struct Persister {
persist_tx: mpsc::SyncSender<PersisterMsg>,
task_handle: std::thread::JoinHandle<()>,
}

impl Persister {
pub(crate) fn restore(data_path: impl AsRef<Path>) -> Result<String, N3xbError> {
let json: String = std::fs::read_to_string(data_path.as_ref())?;
Ok(json)
}

pub(crate) fn new(
store: Arc<RwLock<dyn SerdeGenericTrait>>,
data_path: impl AsRef<Path>,
) -> Self {
let (persist_tx, task_handle) = Self::setup_persistence(store, data_path);

Self {
persist_tx,
task_handle,
}
}

fn setup_persistence(
store: Arc<RwLock<dyn SerdeGenericTrait>>,
data_path: impl AsRef<Path>,
) -> (mpsc::SyncSender<PersisterMsg>, std::thread::JoinHandle<()>) {
let data_path_buf = data_path.as_ref().to_path_buf();

let (persist_tx, persist_rx) = mpsc::sync_channel(1);
let task_handle = std::thread::spawn(move || {
let data_path = data_path_buf.clone();
loop {
match persist_rx.recv() {
Ok(msg) => match msg {
PersisterMsg::Persist => {
let store = match store.read() {
Ok(store) => store,
Err(error) => {
error!("Error reading store - {}", error);
continue;
}
};
if let Some(error) = Self::persist(store, &data_path).err() {
error!(
"Error persisting data to path {} - {}",
data_path.display().to_string(),
error
);
}
}
PersisterMsg::Close => {
break;
}
},
Err(err) => {
error!("Persistance channel recv Error - {}", err);
break;
}
}
}
});
(persist_tx, task_handle)
}

fn persist(
store: RwLockReadGuard<'_, dyn SerdeGenericTrait>,
data_path: impl AsRef<Path>,
) -> Result<(), N3xbError> {
let json = serde_json::to_string(&*store)?;
let mut file = File::create(data_path.as_ref())?;
file.write_all(json.as_bytes())?;
file.sync_all()?;
Ok(())
}

pub(crate) fn queue(&self) {
match self.persist_tx.try_send(PersisterMsg::Persist) {
Ok(_) => {}
Err(error) => match error {
TrySendError::Full(_) => {
trace!("Persistence channel full")
}
TrySendError::Disconnected(_) => {
error!("Persistence channel disconnected")
}
},
}
}

pub(crate) fn terminate(self) {
self.persist_tx.send(PersisterMsg::Close).unwrap();
if let Some(error) = self.task_handle.join().err() {
error!("Error terminating persistence thread - {:?}", error);
}
}
}
13 changes: 0 additions & 13 deletions src/common/utils.rs

This file was deleted.

11 changes: 5 additions & 6 deletions src/comms/comms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ impl CommsActor {
data_dir_path: impl AsRef<Path>,
) -> Self {
let pubkey = client.keys().await.public_key();
let data = match CommsData::new(&data_dir_path, pubkey).await {
let data = match CommsData::new(&data_dir_path, pubkey) {
Ok(data) => data,
Err(error) => {
panic!(
Expand All @@ -396,7 +396,7 @@ impl CommsActor {
);
}
};
let relays = data.relays().await;
let relays = data.relays();

let actor = CommsActor {
rx,
Expand Down Expand Up @@ -614,7 +614,7 @@ impl CommsActor {
}

async fn handle_notification_event(&mut self, url: url::Url, event: Event) {
self.data.set_last_event(SystemTime::now()).await;
self.data.set_last_event(SystemTime::now());

if let Kind::EncryptedDirectMessage = event.kind {
self.handle_direct_message(url, event).await;
Expand Down Expand Up @@ -707,7 +707,7 @@ impl CommsActor {
rsp_tx.send(Err(error)).unwrap(); // Oneshot should not fail
return;
}
self.data.add_relays(relay_addrs.clone()).await;
self.data.add_relays(relay_addrs.clone());

let relay_urls: Vec<url::Url> = relay_addrs.iter().map(|(url, _)| url.clone()).collect();

Expand Down Expand Up @@ -784,7 +784,7 @@ impl CommsActor {
match result {
Ok(_) => {
rsp_tx.send(Ok(())).unwrap();
self.data.remove_relay(&relay_url).await;
self.data.remove_relay(&relay_url);
}
Err(error) => rsp_tx.send(Err(error.into())).unwrap(),
};
Expand Down Expand Up @@ -963,7 +963,6 @@ impl CommsActor {
let unix_epoch_secs = self
.data
.last_event()
.await
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
Expand Down
Loading

0 comments on commit 87bbf39

Please sign in to comment.