Commit

[Product Data] Introduce data persistence on gateways (#5022)
* add stats storage to gateways

* config fix

* add stats storage model and logic

* adapt stats collection to new storage

* stats cleanup on start

* change to linux only code

* tweaks

* modified stats cleanup + change session started

* change wrong table name

* store crashed session as 0 duration

* adapt for sqlx 0.7

* remove unused dependencies

* revert changes from gateway config, as it is broken anyway

* copyright and misc stuff

---------

Co-authored-by: Simon Wicky <simon@linode2-2.net>
simonwicky and Simon Wicky authored Oct 28, 2024
1 parent e65bfae commit ab11508
Showing 23 changed files with 2,273 additions and 204 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -54,6 +54,7 @@ members = [
"common/exit-policy",
"common/gateway-requests",
"common/gateway-storage",
"common/gateway-stats-storage",
"common/http-api-client",
"common/http-api-common",
"common/inclusion-probability",
34 changes: 34 additions & 0 deletions common/gateway-stats-storage/Cargo.toml
@@ -0,0 +1,34 @@
[package]
name = "nym-gateway-stats-storage"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true

[dependencies]
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
"sqlite",
"macros",
"migrate",
"time",
] }
time = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }

nym-sphinx = { path = "../nymsphinx" }
nym-credentials-interface = { path = "../credentials-interface" }


[build-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
"sqlite",
"macros",
"migrate",
] }
28 changes: 28 additions & 0 deletions common/gateway-stats-storage/build.rs
@@ -0,0 +1,28 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only

use sqlx::{Connection, SqliteConnection};
use std::env;

#[tokio::main]
async fn main() {
let out_dir = env::var("OUT_DIR").unwrap();
let database_path = format!("{}/gateway-stats-example.sqlite", out_dir);

let mut conn = SqliteConnection::connect(&format!("sqlite://{}?mode=rwc", database_path))
.await
.expect("Failed to create SQLx database connection");

sqlx::migrate!("./migrations")
.run(&mut conn)
.await
.expect("Failed to perform SQLx migrations");

#[cfg(target_family = "unix")]
println!("cargo:rustc-env=DATABASE_URL=sqlite://{}", &database_path);

#[cfg(target_family = "windows")]
// for some strange reason we need to add a leading `/` to the windows path even though it's
// not a valid windows path... but hey, it works...
println!("cargo:rustc-env=DATABASE_URL=sqlite:///{}", &database_path);
}
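
The build script above materialises a throwaway SQLite database in `OUT_DIR`, runs the crate's migrations against it, and exports `DATABASE_URL` so that sqlx's compile-time-checked query macros can validate SQL against the migrated schema while this crate builds. As a purely hypothetical illustration of what that enables (not code from this commit; the function name is made up), a checked query over the schema introduced in the migration below might look like:

// Hypothetical example of a compile-time-checked query made possible by the
// DATABASE_URL exported from build.rs; not part of this commit.
use sqlx::SqlitePool;

async fn active_clients(pool: &SqlitePool) -> Result<Vec<String>, sqlx::Error> {
    // `query!` validates this SQL against the example database at build time.
    let rows = sqlx::query!("SELECT client_address FROM sessions_active")
        .fetch_all(pool)
        .await?;
    Ok(rows.into_iter().map(|r| r.client_address).collect())
}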
26 changes: 26 additions & 0 deletions common/gateway-stats-storage/migrations/…
@@ -0,0 +1,26 @@
/*
* Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
* SPDX-License-Identifier: GPL-3.0-only
*/

CREATE TABLE sessions_active
(
client_address TEXT NOT NULL PRIMARY KEY UNIQUE,
start_time TIMESTAMP WITHOUT TIME ZONE NOT NULL,
typ TEXT NOT NULL
);

CREATE TABLE sessions_finished
(
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
day DATE NOT NULL,
duration_ms INTEGER NOT NULL,
typ TEXT NOT NULL
);

CREATE TABLE sessions_unique_users
(
day DATE NOT NULL,
client_address TEXT NOT NULL,
PRIMARY KEY (day, client_address)
);
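
The crate's own query layer lives in `src/sessions.rs`, which is not expanded in this excerpt. As a rough, self-contained sketch of how the `sessions_finished` table above could be read with sqlx (the row struct, its fields, and the helper are assumptions for illustration, not the crate's actual code):

// Rough sketch only; the real SessionManager in src/sessions.rs is not shown here.
// Uses the runtime-checked `query_as` so the example does not depend on DATABASE_URL.
use sqlx::SqlitePool;
use time::Date;

#[derive(sqlx::FromRow)]
struct FinishedSessionRow {
    duration_ms: i64,
    typ: String,
}

async fn finished_sessions_on(
    pool: &SqlitePool,
    day: Date,
) -> Result<Vec<FinishedSessionRow>, sqlx::Error> {
    // `day` binds as a DATE value thanks to sqlx's "time" feature enabled in Cargo.toml.
    sqlx::query_as::<_, FinishedSessionRow>(
        "SELECT duration_ms, typ FROM sessions_finished WHERE day = ?",
    )
    .bind(day)
    .fetch_all(pool)
    .await
}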
13 changes: 13 additions & 0 deletions common/gateway-stats-storage/src/error.rs
@@ -0,0 +1,13 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only

use thiserror::Error;

#[derive(Error, Debug)]
pub enum StatsStorageError {
#[error("Database experienced an internal error: {0}")]
InternalDatabaseError(#[from] sqlx::Error),

#[error("Failed to perform database migration: {0}")]
MigrationError(#[from] sqlx::migrate::MigrateError),
}
195 changes: 195 additions & 0 deletions common/gateway-stats-storage/src/lib.rs
@@ -0,0 +1,195 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only

use error::StatsStorageError;
use models::{ActiveSession, FinishedSession, SessionType, StoredFinishedSession};
use nym_sphinx::DestinationAddressBytes;
use sessions::SessionManager;
use sqlx::ConnectOptions;
use std::path::Path;
use time::Date;
use tracing::{debug, error};

pub mod error;
pub mod models;
mod sessions;

// note that Clone is fine here: cloning reuses the same underlying connection pool
#[derive(Clone)]
pub struct PersistentStatsStorage {
session_manager: SessionManager,
}

impl PersistentStatsStorage {
/// Initialises `PersistentStatsStorage` using the provided path.
///
/// # Arguments
///
/// * `database_path`: path to the database.
pub async fn init<P: AsRef<Path> + Send>(database_path: P) -> Result<Self, StatsStorageError> {
debug!(
"Attempting to connect to database {:?}",
database_path.as_ref().as_os_str()
);

// TODO: we could inject more configuration here based on the gateway's global config
// struct, e.g. a different pool size or timeout intervals.
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();

// TODO: do we want auto_vacuum ?

let connection_pool = match sqlx::SqlitePool::connect_with(opts).await {
Ok(db) => db,
Err(err) => {
error!("Failed to connect to SQLx database: {err}");
return Err(err.into());
}
};

if let Err(err) = sqlx::migrate!("./migrations").run(&connection_pool).await {
error!("Failed to perform migration on the SQLx database: {err}");
return Err(err.into());
}

// the cloning here is cheap as the connection pool is stored behind an Arc
Ok(PersistentStatsStorage {
session_manager: sessions::SessionManager::new(connection_pool),
})
}

// Session functions
pub async fn insert_finished_session(
&self,
date: Date,
session: FinishedSession,
) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.insert_finished_session(
date,
session.duration.whole_milliseconds() as i64,
session.typ.to_string().into(),
)
.await?)
}

pub async fn get_finished_sessions(
&self,
date: Date,
) -> Result<Vec<StoredFinishedSession>, StatsStorageError> {
Ok(self.session_manager.get_finished_sessions(date).await?)
}

pub async fn delete_finished_sessions(
&self,
before_date: Date,
) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.delete_finished_sessions(before_date)
.await?)
}

pub async fn insert_unique_user(
&self,
date: Date,
client_address_bs58: String,
) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.insert_unique_user(date, client_address_bs58)
.await?)
}

pub async fn get_unique_users_count(&self, date: Date) -> Result<i32, StatsStorageError> {
Ok(self.session_manager.get_unique_users_count(date).await?)
}

pub async fn delete_unique_users(&self, before_date: Date) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.delete_unique_users(before_date)
.await?)
}

pub async fn insert_active_session(
&self,
client_address: DestinationAddressBytes,
session: ActiveSession,
) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.insert_active_session(
client_address.as_base58_string(),
session.start,
session.typ.to_string().into(),
)
.await?)
}

pub async fn update_active_session_type(
&self,
client_address: DestinationAddressBytes,
session_type: SessionType,
) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.update_active_session_type(
client_address.as_base58_string(),
session_type.to_string().into(),
)
.await?)
}

pub async fn get_active_session(
&self,
client_address: DestinationAddressBytes,
) -> Result<Option<ActiveSession>, StatsStorageError> {
Ok(self
.session_manager
.get_active_session(client_address.as_base58_string())
.await?
.map(Into::into))
}

pub async fn get_all_active_sessions(&self) -> Result<Vec<ActiveSession>, StatsStorageError> {
Ok(self
.session_manager
.get_all_active_sessions()
.await?
.into_iter()
.map(Into::into)
.collect())
}

pub async fn get_started_sessions_count(
&self,
start_date: Date,
) -> Result<i32, StatsStorageError> {
Ok(self
.session_manager
.get_started_sessions_count(start_date)
.await?)
}

pub async fn get_active_users(&self) -> Result<Vec<String>, StatsStorageError> {
Ok(self.session_manager.get_active_users().await?)
}

pub async fn delete_active_session(
&self,
client_address: DestinationAddressBytes,
) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.delete_active_session(client_address.as_base58_string())
.await?)
}

pub async fn cleanup_active_sessions(&self) -> Result<(), StatsStorageError> {
Ok(self.session_manager.cleanup_active_sessions().await?)
}
}
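
Finally, a hedged usage sketch of the public API defined above; the gateway-side call sites are in the other changed files of this commit and are not shown here, and the database path is a placeholder.

// Illustrative only: exercises the storage API above with a placeholder path.
use nym_gateway_stats_storage::{error::StatsStorageError, PersistentStatsStorage};
use time::OffsetDateTime;

async fn example() -> Result<(), StatsStorageError> {
    let storage = PersistentStatsStorage::init("/tmp/gateway-stats.sqlite").await?;

    // Presumably the "stats cleanup on start" step from the commit message:
    // clear any sessions left in `sessions_active` by a previous run.
    storage.cleanup_active_sessions().await?;

    let today = OffsetDateTime::now_utc().date();
    let finished = storage.get_finished_sessions(today).await?;
    let unique_users = storage.get_unique_users_count(today).await?;
    println!(
        "{} finished sessions, {} unique users today",
        finished.len(),
        unique_users
    );

    Ok(())
}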
