From ee5527d943dec76634d3bf78f1e2bf40fddde442 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Fri, 17 Jan 2025 09:43:58 +0000 Subject: [PATCH] refactor: [#1184] remove stats functionality from core tracker --- src/app.rs | 18 +- src/bootstrap/app.rs | 37 +- src/bootstrap/jobs/http_tracker.rs | 19 +- src/bootstrap/jobs/tracker_apis.rs | 66 ++- src/bootstrap/jobs/udp_tracker.rs | 6 +- src/console/profiling.rs | 4 +- src/core/mod.rs | 63 +-- src/core/services/mod.rs | 14 +- src/core/services/statistics/mod.rs | 27 +- src/core/services/torrent.rs | 101 ++-- src/main.rs | 4 +- src/servers/apis/routes.rs | 15 +- src/servers/apis/server.rs | 57 ++- src/servers/apis/v1/context/stats/handlers.rs | 6 +- src/servers/apis/v1/context/stats/routes.rs | 13 +- src/servers/apis/v1/routes.rs | 20 +- src/servers/http/server.rs | 31 +- src/servers/http/v1/handlers/announce.rs | 156 ++++-- src/servers/http/v1/handlers/scrape.rs | 137 +++-- src/servers/http/v1/routes.rs | 25 +- src/servers/http/v1/services/announce.rs | 90 ++-- src/servers/http/v1/services/scrape.rs | 102 ++-- src/servers/udp/handlers.rs | 481 ++++++++++++------ src/servers/udp/server/launcher.rs | 50 +- src/servers/udp/server/mod.rs | 27 +- src/servers/udp/server/processor.rs | 45 +- src/servers/udp/server/spawner.rs | 13 +- src/servers/udp/server/states.rs | 16 +- tests/servers/api/environment.rs | 29 +- tests/servers/http/environment.rs | 16 +- tests/servers/udp/environment.rs | 19 +- 31 files changed, 1159 insertions(+), 548 deletions(-) diff --git a/src/app.rs b/src/app.rs index 1cfc57c2e..14dc0b07f 100644 --- a/src/app.rs +++ b/src/app.rs @@ -29,6 +29,8 @@ use torrust_tracker_configuration::Configuration; use tracing::instrument; use crate::bootstrap::jobs::{health_check_api, http_tracker, torrent_cleanup, tracker_apis, udp_tracker}; +use crate::core::statistics::event::sender::Sender; +use crate::core::statistics::repository::Repository; use crate::servers::registar::Registar; use crate::servers::udp::server::banning::BanService; use crate::{core, servers}; @@ -39,11 +41,13 @@ use crate::{core, servers}; /// /// - Can't retrieve tracker keys from database. /// - Can't load whitelist from database. -#[instrument(skip(config, tracker, ban_service))] +#[instrument(skip(config, tracker, ban_service, stats_event_sender, stats_repository))] pub async fn start( config: &Configuration, tracker: Arc, ban_service: Arc>, + stats_event_sender: Arc>>, + stats_repository: Arc, ) -> Vec> { if config.http_api.is_none() && (config.udp_trackers.is_none() || config.udp_trackers.as_ref().map_or(true, std::vec::Vec::is_empty)) @@ -83,7 +87,14 @@ pub async fn start( ); } else { jobs.push( - udp_tracker::start_job(udp_tracker_config, tracker.clone(), ban_service.clone(), registar.give_form()).await, + udp_tracker::start_job( + udp_tracker_config, + tracker.clone(), + stats_event_sender.clone(), + ban_service.clone(), + registar.give_form(), + ) + .await, ); } } @@ -97,6 +108,7 @@ pub async fn start( if let Some(job) = http_tracker::start_job( http_tracker_config, tracker.clone(), + stats_event_sender.clone(), registar.give_form(), servers::http::Version::V1, ) @@ -115,6 +127,8 @@ pub async fn start( http_api_config, tracker.clone(), ban_service.clone(), + stats_event_sender.clone(), + stats_repository.clone(), registar.give_form(), servers::apis::Version::V1, ) diff --git a/src/bootstrap/app.rs b/src/bootstrap/app.rs index 2c6c23ab9..d63b414e1 100644 --- a/src/bootstrap/app.rs +++ b/src/bootstrap/app.rs @@ -38,8 +38,15 @@ use crate::shared::crypto::keys::{self, Keeper as _}; /// /// Setup can file if the configuration is invalid. #[must_use] +#[allow(clippy::type_complexity)] #[instrument(skip())] -pub fn setup() -> (Configuration, Arc, Arc>) { +pub fn setup() -> ( + Configuration, + Arc, + Arc>, + Arc>>, + Arc, +) { #[cfg(not(test))] check_seed(); @@ -49,13 +56,19 @@ pub fn setup() -> (Configuration, Arc, Arc>) { panic!("Configuration error: {e}"); } - let tracker = initialize_with_configuration(&configuration); + // Initialize services + + let (stats_event_sender, stats_repository) = statistics::setup::factory(configuration.core.tracker_usage_statistics); + let stats_event_sender = Arc::new(stats_event_sender); + let stats_repository = Arc::new(stats_repository); let udp_ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); + let tracker = initialize_with_configuration(&configuration); + tracing::info!("Configuration:\n{}", configuration.clone().mask_secrets().to_json()); - (configuration, tracker, udp_ban_service) + (configuration, tracker, udp_ban_service, stats_event_sender, stats_repository) } /// checks if the seed is the instance seed in production. @@ -109,28 +122,18 @@ pub fn initialize_static() { #[must_use] #[instrument(skip(config))] pub fn initialize_tracker(config: &Configuration) -> Tracker { - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(config); + let (database, whitelist_manager) = initialize_tracker_dependencies(config); - tracker_factory(config, &database, &whitelist_manager, &stats_event_sender, &stats_repository) + tracker_factory(config, &database, &whitelist_manager) } #[allow(clippy::type_complexity)] #[must_use] -pub fn initialize_tracker_dependencies( - config: &Configuration, -) -> ( - Arc>, - Arc, - Arc>>, - Arc, -) { +pub fn initialize_tracker_dependencies(config: &Configuration) -> (Arc>, Arc) { let database = initialize_database(config); let whitelist_manager = initialize_whitelist(database.clone()); - let (stats_event_sender, stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); - let stats_event_sender = Arc::new(stats_event_sender); - let stats_repository = Arc::new(stats_repository); - (database, whitelist_manager, stats_event_sender, stats_repository) + (database, whitelist_manager) } /// It initializes the log threshold, format and channel. diff --git a/src/bootstrap/jobs/http_tracker.rs b/src/bootstrap/jobs/http_tracker.rs index c55723bc6..058acfcf1 100644 --- a/src/bootstrap/jobs/http_tracker.rs +++ b/src/bootstrap/jobs/http_tracker.rs @@ -19,7 +19,8 @@ use torrust_tracker_configuration::HttpTracker; use tracing::instrument; use super::make_rust_tls; -use crate::core; +use crate::core::statistics::event::sender::Sender; +use crate::core::{self, statistics}; use crate::servers::http::server::{HttpServer, Launcher}; use crate::servers::http::Version; use crate::servers::registar::ServiceRegistrationForm; @@ -33,10 +34,11 @@ use crate::servers::registar::ServiceRegistrationForm; /// /// It would panic if the `config::HttpTracker` struct would contain inappropriate values. /// -#[instrument(skip(config, tracker, form))] +#[instrument(skip(config, tracker, stats_event_sender, form))] pub async fn start_job( config: &HttpTracker, tracker: Arc, + stats_event_sender: Arc>>, form: ServiceRegistrationForm, version: Version, ) -> Option> { @@ -47,20 +49,21 @@ pub async fn start_job( .map(|tls| tls.expect("it should have a valid http tracker tls configuration")); match version { - Version::V1 => Some(start_v1(socket, tls, tracker.clone(), form).await), + Version::V1 => Some(start_v1(socket, tls, tracker.clone(), stats_event_sender.clone(), form).await), } } #[allow(clippy::async_yields_async)] -#[instrument(skip(socket, tls, tracker, form))] +#[instrument(skip(socket, tls, tracker, stats_event_sender, form))] async fn start_v1( socket: SocketAddr, tls: Option, tracker: Arc, + stats_event_sender: Arc>>, form: ServiceRegistrationForm, ) -> JoinHandle<()> { let server = HttpServer::new(Launcher::new(socket, tls)) - .start(tracker, form) + .start(tracker, stats_event_sender, form) .await .expect("it should be able to start to the http tracker"); @@ -85,6 +88,7 @@ mod tests { use crate::bootstrap::app::initialize_with_configuration; use crate::bootstrap::jobs::http_tracker::start_job; + use crate::core::services::statistics; use crate::servers::http::Version; use crate::servers::registar::Registar; @@ -93,10 +97,13 @@ mod tests { let cfg = Arc::new(ephemeral_public()); let http_tracker = cfg.http_trackers.clone().expect("missing HTTP tracker configuration"); let config = &http_tracker[0]; + let (stats_event_sender, stats_repository) = statistics::setup::factory(cfg.core.tracker_usage_statistics); + let stats_event_sender = Arc::new(stats_event_sender); + let _stats_repository = Arc::new(stats_repository); let tracker = initialize_with_configuration(&cfg); let version = Version::V1; - start_job(config, tracker, Registar::default().give_form(), version) + start_job(config, tracker, stats_event_sender, Registar::default().give_form(), version) .await .expect("it should be able to join to the http tracker start-job"); } diff --git a/src/bootstrap/jobs/tracker_apis.rs b/src/bootstrap/jobs/tracker_apis.rs index 858888540..d84bb08a9 100644 --- a/src/bootstrap/jobs/tracker_apis.rs +++ b/src/bootstrap/jobs/tracker_apis.rs @@ -31,6 +31,8 @@ use tracing::instrument; use super::make_rust_tls; use crate::core; +use crate::core::statistics::event::sender::Sender; +use crate::core::statistics::repository::Repository; use crate::servers::apis::server::{ApiServer, Launcher}; use crate::servers::apis::Version; use crate::servers::registar::ServiceRegistrationForm; @@ -56,11 +58,13 @@ pub struct ApiServerJobStarted(); /// It would panic if unable to send the `ApiServerJobStarted` notice. /// /// -#[instrument(skip(config, tracker, ban_service, form))] +#[instrument(skip(config, tracker, ban_service, stats_event_sender, stats_repository, form))] pub async fn start_job( config: &HttpApi, tracker: Arc, ban_service: Arc>, + stats_event_sender: Arc>>, + stats_repository: Arc, form: ServiceRegistrationForm, version: Version, ) -> Option> { @@ -73,22 +77,53 @@ pub async fn start_job( let access_tokens = Arc::new(config.access_tokens.clone()); match version { - Version::V1 => Some(start_v1(bind_to, tls, tracker.clone(), ban_service.clone(), form, access_tokens).await), + Version::V1 => Some( + start_v1( + bind_to, + tls, + tracker.clone(), + ban_service.clone(), + stats_event_sender.clone(), + stats_repository.clone(), + form, + access_tokens, + ) + .await, + ), } } #[allow(clippy::async_yields_async)] -#[instrument(skip(socket, tls, tracker, ban_service, form, access_tokens))] +#[allow(clippy::too_many_arguments)] +#[instrument(skip( + socket, + tls, + tracker, + ban_service, + stats_event_sender, + stats_repository, + form, + access_tokens +))] async fn start_v1( socket: SocketAddr, tls: Option, tracker: Arc, ban_service: Arc>, + stats_event_sender: Arc>>, + stats_repository: Arc, form: ServiceRegistrationForm, access_tokens: Arc, ) -> JoinHandle<()> { let server = ApiServer::new(Launcher::new(socket, tls)) - .start(tracker, ban_service, form, access_tokens) + .start( + tracker, + stats_event_sender, + stats_repository, + ban_service, + form, + access_tokens, + ) .await .expect("it should be able to start to the tracker api"); @@ -107,6 +142,7 @@ mod tests { use crate::bootstrap::app::initialize_with_configuration; use crate::bootstrap::jobs::tracker_apis::start_job; + use crate::core::services::statistics; use crate::servers::apis::Version; use crate::servers::registar::Registar; use crate::servers::udp::server::banning::BanService; @@ -116,12 +152,26 @@ mod tests { async fn it_should_start_http_tracker() { let cfg = Arc::new(ephemeral_public()); let config = &cfg.http_api.clone().unwrap(); - let tracker = initialize_with_configuration(&cfg); + let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); + let (stats_event_sender, stats_repository) = statistics::setup::factory(cfg.core.tracker_usage_statistics); + let stats_event_sender = Arc::new(stats_event_sender); + let stats_repository = Arc::new(stats_repository); + + let tracker = initialize_with_configuration(&cfg); + let version = Version::V1; - start_job(config, tracker, ban_service, Registar::default().give_form(), version) - .await - .expect("it should be able to join to the tracker api start-job"); + start_job( + config, + tracker, + ban_service, + stats_event_sender, + stats_repository, + Registar::default().give_form(), + version, + ) + .await + .expect("it should be able to join to the tracker api start-job"); } } diff --git a/src/bootstrap/jobs/udp_tracker.rs b/src/bootstrap/jobs/udp_tracker.rs index 8948811af..105c7f723 100644 --- a/src/bootstrap/jobs/udp_tracker.rs +++ b/src/bootstrap/jobs/udp_tracker.rs @@ -14,6 +14,7 @@ use torrust_tracker_configuration::UdpTracker; use tracing::instrument; use crate::core; +use crate::core::statistics::event::sender::Sender; use crate::servers::registar::ServiceRegistrationForm; use crate::servers::udp::server::banning::BanService; use crate::servers::udp::server::spawner::Spawner; @@ -31,10 +32,11 @@ use crate::servers::udp::UDP_TRACKER_LOG_TARGET; /// It will panic if the task did not finish successfully. #[must_use] #[allow(clippy::async_yields_async)] -#[instrument(skip(config, tracker, ban_service, form))] +#[instrument(skip(config, tracker, stats_event_sender, ban_service, form))] pub async fn start_job( config: &UdpTracker, tracker: Arc, + stats_event_sender: Arc>>, ban_service: Arc>, form: ServiceRegistrationForm, ) -> JoinHandle<()> { @@ -42,7 +44,7 @@ pub async fn start_job( let cookie_lifetime = config.cookie_lifetime; let server = Server::new(Spawner::new(bind_to)) - .start(tracker, ban_service, form, cookie_lifetime) + .start(tracker, stats_event_sender, ban_service, form, cookie_lifetime) .await .expect("it should be able to start the udp tracker"); diff --git a/src/console/profiling.rs b/src/console/profiling.rs index 1d31af3ce..2f6471906 100644 --- a/src/console/profiling.rs +++ b/src/console/profiling.rs @@ -179,9 +179,9 @@ pub async fn run() { return; }; - let (config, tracker, ban_service) = bootstrap::app::setup(); + let (config, tracker, ban_service, stats_event_sender, stats_repository) = bootstrap::app::setup(); - let jobs = app::start(&config, tracker, ban_service).await; + let jobs = app::start(&config, tracker, ban_service, stats_event_sender, stats_repository).await; // Run the tracker for a fixed duration let run_duration = sleep(Duration::from_secs(duration_secs)); diff --git a/src/core/mod.rs b/src/core/mod.rs index d61474c2c..cd911ca28 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -422,7 +422,7 @@ //! For example, the HTTP tracker would send an event like the following when it handles an `announce` request received from a peer using IP version 4. //! //! ```text -//! tracker.send_stats_event(statistics::event::Event::Tcp4Announce).await +//! stats_event_sender.send_stats_event(statistics::event::Event::Tcp4Announce).await //! ``` //! //! Refer to [`statistics`] module for more information about statistics. @@ -458,7 +458,6 @@ use std::time::Duration; use auth::PeerKey; use bittorrent_primitives::info_hash::InfoHash; use error::PeerKeyError; -use tokio::sync::mpsc::error::SendError; use torrust_tracker_clock::clock::Time; use torrust_tracker_configuration::{AnnouncePolicy, Core, TORRENT_PEERS_LIMIT}; use torrust_tracker_located_error::Located; @@ -502,12 +501,6 @@ pub struct Tracker { /// The in-memory torrents repository. torrents: Arc, - - /// Service to send stats events. - stats_event_sender: Arc>>, - - /// The in-memory stats repo. - stats_repository: Arc, } /// How many peers the peer announcing wants in the announce response. @@ -576,8 +569,6 @@ impl Tracker { config: &Core, database: &Arc>, whitelist_manager: &Arc, - stats_event_sender: &Arc>>, - stats_repository: &Arc, ) -> Result { Ok(Tracker { config: config.clone(), @@ -585,8 +576,6 @@ impl Tracker { keys: tokio::sync::RwLock::new(std::collections::HashMap::new()), whitelist_manager: whitelist_manager.clone(), torrents: Arc::default(), - stats_event_sender: stats_event_sender.clone(), - stats_repository: stats_repository.clone(), }) } @@ -1054,26 +1043,6 @@ impl Tracker { }) } - /// It return the `Tracker` [`statistics::metrics::Metrics`]. - /// - /// # Context: Statistics - pub async fn get_stats(&self) -> tokio::sync::RwLockReadGuard<'_, statistics::metrics::Metrics> { - self.stats_repository.get_stats().await - } - - /// It allows to send a statistic events which eventually will be used to update [`statistics::metrics::Metrics`]. - /// - /// # Context: Statistics - pub async fn send_stats_event( - &self, - event: statistics::event::Event, - ) -> Option>> { - match &*self.stats_event_sender { - None => None, - Some(stats_event_sender) => stats_event_sender.send_event(event).await, - } - } - /// It drops the database tables. /// /// # Errors @@ -1113,26 +1082,35 @@ mod tests { use crate::bootstrap::app::initialize_tracker_dependencies; use crate::core::peer::Peer; - use crate::core::services::tracker_factory; + use crate::core::services::{statistics, tracker_factory}; use crate::core::whitelist::WhiteListManager; use crate::core::{TorrentsMetrics, Tracker}; fn public_tracker() -> Tracker { let config = configuration::ephemeral_public(); - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(&config); - tracker_factory(&config, &database, &whitelist_manager, &stats_event_sender, &stats_repository) + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); + let (stats_event_sender, stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); + let _stats_event_sender = Arc::new(stats_event_sender); + let _stats_repository = Arc::new(stats_repository); + tracker_factory(&config, &database, &whitelist_manager) } fn private_tracker() -> Tracker { let config = configuration::ephemeral_private(); - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(&config); - tracker_factory(&config, &database, &whitelist_manager, &stats_event_sender, &stats_repository) + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); + let (stats_event_sender, stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); + let _stats_event_sender = Arc::new(stats_event_sender); + let _stats_repository = Arc::new(stats_repository); + tracker_factory(&config, &database, &whitelist_manager) } fn whitelisted_tracker() -> (Tracker, Arc) { let config = configuration::ephemeral_listed(); - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(&config); - let tracker = tracker_factory(&config, &database, &whitelist_manager, &stats_event_sender, &stats_repository); + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); + let (stats_event_sender, stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); + let _stats_event_sender = Arc::new(stats_event_sender); + let _stats_repository = Arc::new(stats_repository); + let tracker = tracker_factory(&config, &database, &whitelist_manager); (tracker, whitelist_manager) } @@ -1140,8 +1118,11 @@ mod tests { pub fn tracker_persisting_torrents_in_database() -> Tracker { let mut config = configuration::ephemeral_listed(); config.core.tracker_policy.persistent_torrent_completed_stat = true; - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(&config); - tracker_factory(&config, &database, &whitelist_manager, &stats_event_sender, &stats_repository) + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); + let (stats_event_sender, stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); + let _stats_event_sender = Arc::new(stats_event_sender); + let _stats_repository = Arc::new(stats_repository); + tracker_factory(&config, &database, &whitelist_manager) } fn sample_info_hash() -> InfoHash { diff --git a/src/core/services/mod.rs b/src/core/services/mod.rs index 4034925cd..d3336068c 100644 --- a/src/core/services/mod.rs +++ b/src/core/services/mod.rs @@ -14,8 +14,6 @@ use torrust_tracker_configuration::v2_0_0::database; use torrust_tracker_configuration::Configuration; use super::databases::{self, Database}; -use super::statistics::event::sender::Sender; -use super::statistics::repository::Repository; use super::whitelist::persisted::DatabaseWhitelist; use super::whitelist::WhiteListManager; use crate::core::Tracker; @@ -30,18 +28,8 @@ pub fn tracker_factory( config: &Configuration, database: &Arc>, whitelist_manager: &Arc, - stats_event_sender: &Arc>>, - stats_repository: &Arc, ) -> Tracker { - //let (stats_event_sender, stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); - - match Tracker::new( - &Arc::new(config).core, - database, - whitelist_manager, - stats_event_sender, - stats_repository, - ) { + match Tracker::new(&Arc::new(config).core, database, whitelist_manager) { Ok(tracker) => tracker, Err(error) => { panic!("{}", error) diff --git a/src/core/services/statistics/mod.rs b/src/core/services/statistics/mod.rs index 3c44bc310..c001391b0 100644 --- a/src/core/services/statistics/mod.rs +++ b/src/core/services/statistics/mod.rs @@ -44,6 +44,7 @@ use tokio::sync::RwLock; use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; use crate::core::statistics::metrics::Metrics; +use crate::core::statistics::repository::Repository; use crate::core::Tracker; use crate::servers::udp::server::banning::BanService; @@ -62,9 +63,13 @@ pub struct TrackerMetrics { } /// It returns all the [`TrackerMetrics`] -pub async fn get_metrics(tracker: Arc, ban_service: Arc>) -> TrackerMetrics { +pub async fn get_metrics( + tracker: Arc, + ban_service: Arc>, + stats_repository: Arc, +) -> TrackerMetrics { let torrents_metrics = tracker.get_torrents_metrics(); - let stats = tracker.get_stats().await; + let stats = stats_repository.get_stats().await; let udp_banned_ips_total = ban_service.read().await.get_banned_ips_total(); TrackerMetrics { @@ -114,7 +119,7 @@ mod tests { use crate::bootstrap::app::initialize_tracker_dependencies; use crate::core; - use crate::core::services::statistics::{get_metrics, TrackerMetrics}; + use crate::core::services::statistics::{self, get_metrics, TrackerMetrics}; use crate::core::services::tracker_factory; use crate::servers::udp::server::banning::BanService; use crate::servers::udp::server::launcher::MAX_CONNECTION_ID_ERRORS_PER_IP; @@ -127,18 +132,16 @@ mod tests { async fn the_statistics_service_should_return_the_tracker_metrics() { let config = tracker_configuration(); - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(&config); - let tracker = Arc::new(tracker_factory( - &config, - &database, - &whitelist_manager, - &stats_event_sender, - &stats_repository, - )); + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); + let (stats_event_sender, stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); + let _stats_event_sender = Arc::new(stats_event_sender); + let stats_repository = Arc::new(stats_repository); + + let tracker = Arc::new(tracker_factory(&config, &database, &whitelist_manager)); let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); - let tracker_metrics = get_metrics(tracker.clone(), ban_service.clone()).await; + let tracker_metrics = get_metrics(tracker.clone(), ban_service.clone(), stats_repository.clone()).await; assert_eq!( tracker_metrics, diff --git a/src/core/services/torrent.rs b/src/core/services/torrent.rs index a4db67979..aa4749795 100644 --- a/src/core/services/torrent.rs +++ b/src/core/services/torrent.rs @@ -132,7 +132,7 @@ mod tests { use crate::bootstrap::app::initialize_tracker_dependencies; use crate::core::services::torrent::tests::sample_peer; use crate::core::services::torrent::{get_torrent_info, Info}; - use crate::core::services::tracker_factory; + use crate::core::services::{statistics, tracker_factory}; pub fn tracker_configuration() -> Configuration { configuration::ephemeral() @@ -142,8 +142,11 @@ mod tests { async fn should_return_none_if_the_tracker_does_not_have_the_torrent() { let config = tracker_configuration(); - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(&config); - let tracker = tracker_factory(&config, &database, &whitelist_manager, &stats_event_sender, &stats_repository); + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); + let (stats_event_sender, stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); + let _stats_event_sender = Arc::new(stats_event_sender); + let _stats_repository = Arc::new(stats_repository); + let tracker = tracker_factory(&config, &database, &whitelist_manager); let tracker = Arc::new(tracker); @@ -159,14 +162,13 @@ mod tests { #[tokio::test] async fn should_return_the_torrent_info_if_the_tracker_has_the_torrent() { let config = tracker_configuration(); - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(&config); - let tracker = Arc::new(tracker_factory( - &config, - &database, - &whitelist_manager, - &stats_event_sender, - &stats_repository, - )); + + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); + let (stats_event_sender, stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); + let _stats_event_sender = Arc::new(stats_event_sender); + let _stats_repository = Arc::new(stats_repository); + + let tracker = Arc::new(tracker_factory(&config, &database, &whitelist_manager)); let hash = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); let info_hash = InfoHash::from_str(&hash).unwrap(); @@ -199,7 +201,7 @@ mod tests { use crate::bootstrap::app::initialize_tracker_dependencies; use crate::core::services::torrent::tests::sample_peer; use crate::core::services::torrent::{get_torrents_page, BasicInfo, Pagination}; - use crate::core::services::tracker_factory; + use crate::core::services::{statistics, tracker_factory}; pub fn tracker_configuration() -> Configuration { configuration::ephemeral() @@ -208,14 +210,13 @@ mod tests { #[tokio::test] async fn should_return_an_empty_result_if_the_tracker_does_not_have_any_torrent() { let config = tracker_configuration(); - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(&config); - let tracker = Arc::new(tracker_factory( - &config, - &database, - &whitelist_manager, - &stats_event_sender, - &stats_repository, - )); + + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); + let (stats_event_sender, stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); + let _stats_event_sender = Arc::new(stats_event_sender); + let _stats_repository = Arc::new(stats_repository); + + let tracker = Arc::new(tracker_factory(&config, &database, &whitelist_manager)); let torrents = get_torrents_page(tracker.clone(), Some(&Pagination::default())).await; @@ -225,14 +226,13 @@ mod tests { #[tokio::test] async fn should_return_a_summarized_info_for_all_torrents() { let config = tracker_configuration(); - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(&config); - let tracker = Arc::new(tracker_factory( - &config, - &database, - &whitelist_manager, - &stats_event_sender, - &stats_repository, - )); + + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); + let (stats_event_sender, stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); + let _stats_event_sender = Arc::new(stats_event_sender); + let _stats_repository = Arc::new(stats_repository); + + let tracker = Arc::new(tracker_factory(&config, &database, &whitelist_manager)); let hash = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); let info_hash = InfoHash::from_str(&hash).unwrap(); @@ -255,14 +255,13 @@ mod tests { #[tokio::test] async fn should_allow_limiting_the_number_of_torrents_in_the_result() { let config = tracker_configuration(); - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(&config); - let tracker = Arc::new(tracker_factory( - &config, - &database, - &whitelist_manager, - &stats_event_sender, - &stats_repository, - )); + + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); + let (stats_event_sender, stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); + let _stats_event_sender = Arc::new(stats_event_sender); + let _stats_repository = Arc::new(stats_repository); + + let tracker = Arc::new(tracker_factory(&config, &database, &whitelist_manager)); let hash1 = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); let info_hash1 = InfoHash::from_str(&hash1).unwrap(); @@ -283,14 +282,13 @@ mod tests { #[tokio::test] async fn should_allow_using_pagination_in_the_result() { let config = tracker_configuration(); - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(&config); - let tracker = Arc::new(tracker_factory( - &config, - &database, - &whitelist_manager, - &stats_event_sender, - &stats_repository, - )); + + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); + let (stats_event_sender, stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); + let _stats_event_sender = Arc::new(stats_event_sender); + let _stats_repository = Arc::new(stats_repository); + + let tracker = Arc::new(tracker_factory(&config, &database, &whitelist_manager)); let hash1 = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); let info_hash1 = InfoHash::from_str(&hash1).unwrap(); @@ -320,14 +318,13 @@ mod tests { #[tokio::test] async fn should_return_torrents_ordered_by_info_hash() { let config = tracker_configuration(); - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(&config); - let tracker = Arc::new(tracker_factory( - &config, - &database, - &whitelist_manager, - &stats_event_sender, - &stats_repository, - )); + + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); + let (stats_event_sender, stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); + let _stats_event_sender = Arc::new(stats_event_sender); + let _stats_repository = Arc::new(stats_repository); + + let tracker = Arc::new(tracker_factory(&config, &database, &whitelist_manager)); let hash1 = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); let info_hash1 = InfoHash::from_str(&hash1).unwrap(); diff --git a/src/main.rs b/src/main.rs index c93982191..e536124a2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,9 +2,9 @@ use torrust_tracker_lib::{app, bootstrap}; #[tokio::main] async fn main() { - let (config, tracker, udp_ban_service) = bootstrap::app::setup(); + let (config, tracker, udp_ban_service, stats_event_sender, stats_repository) = bootstrap::app::setup(); - let jobs = app::start(&config, tracker, udp_ban_service).await; + let jobs = app::start(&config, tracker, udp_ban_service, stats_event_sender, stats_repository).await; // handle the signals tokio::select! { diff --git a/src/servers/apis/routes.rs b/src/servers/apis/routes.rs index 98442ea97..cb3789a06 100644 --- a/src/servers/apis/routes.rs +++ b/src/servers/apis/routes.rs @@ -30,6 +30,8 @@ use tracing::{instrument, Level, Span}; use super::v1; use super::v1::context::health_check::handlers::health_check_handler; use super::v1::middlewares::auth::State; +use crate::core::statistics::event::sender::Sender; +use crate::core::statistics::repository::Repository; use crate::core::Tracker; use crate::servers::apis::API_LOG_TARGET; use crate::servers::logging::Latency; @@ -37,10 +39,12 @@ use crate::servers::udp::server::banning::BanService; /// Add all API routes to the router. #[allow(clippy::needless_pass_by_value)] -#[instrument(skip(tracker, ban_service, access_tokens))] +#[instrument(skip(tracker, ban_service, stats_event_sender, stats_repository, access_tokens))] pub fn router( tracker: Arc, ban_service: Arc>, + stats_event_sender: Arc>>, + stats_repository: Arc, access_tokens: Arc, server_socket_addr: SocketAddr, ) -> Router { @@ -48,7 +52,14 @@ pub fn router( let api_url_prefix = "/api"; - let router = v1::routes::add(api_url_prefix, router, tracker.clone(), ban_service.clone()); + let router = v1::routes::add( + api_url_prefix, + router, + tracker.clone(), + ban_service.clone(), + stats_event_sender.clone(), + stats_repository.clone(), + ); let state = State { access_tokens }; diff --git a/src/servers/apis/server.rs b/src/servers/apis/server.rs index 9d1c77c03..bf1511edb 100644 --- a/src/servers/apis/server.rs +++ b/src/servers/apis/server.rs @@ -39,7 +39,8 @@ use tracing::{instrument, Level}; use super::routes::router; use crate::bootstrap::jobs::Started; -use crate::core::Tracker; +use crate::core::statistics::repository::Repository; +use crate::core::{statistics, Tracker}; use crate::servers::apis::API_LOG_TARGET; use crate::servers::custom_axum_server::{self, TimeoutAcceptor}; use crate::servers::logging::STARTED_ON; @@ -124,10 +125,12 @@ impl ApiServer { /// # Panics /// /// It would panic if the bound socket address cannot be sent back to this starter. - #[instrument(skip(self, tracker, ban_service, form, access_tokens), err, ret(Display, level = Level::INFO))] + #[instrument(skip(self, tracker, stats_event_sender, ban_service, stats_repository, form, access_tokens), err, ret(Display, level = Level::INFO))] pub async fn start( self, tracker: Arc, + stats_event_sender: Arc>>, + stats_repository: Arc, ban_service: Arc>, form: ServiceRegistrationForm, access_tokens: Arc, @@ -140,7 +143,17 @@ impl ApiServer { let task = tokio::spawn(async move { tracing::debug!(target: API_LOG_TARGET, "Starting with launcher in spawned task ..."); - let _task = launcher.start(tracker, ban_service, access_tokens, tx_start, rx_halt).await; + let _task = launcher + .start( + tracker, + ban_service, + stats_event_sender, + stats_repository, + access_tokens, + tx_start, + rx_halt, + ) + .await; tracing::debug!(target: API_LOG_TARGET, "Started with launcher in spawned task"); @@ -238,11 +251,23 @@ impl Launcher { /// /// Will panic if unable to bind to the socket, or unable to get the address of the bound socket. /// Will also panic if unable to send message regarding the bound socket address. - #[instrument(skip(self, tracker, ban_service, access_tokens, tx_start, rx_halt))] + #[allow(clippy::too_many_arguments)] + #[instrument(skip( + self, + tracker, + ban_service, + stats_event_sender, + stats_repository, + access_tokens, + tx_start, + rx_halt + ))] pub fn start( &self, tracker: Arc, ban_service: Arc>, + stats_event_sender: Arc>>, + stats_repository: Arc, access_tokens: Arc, tx_start: Sender, rx_halt: Receiver, @@ -250,7 +275,14 @@ impl Launcher { let socket = std::net::TcpListener::bind(self.bind_to).expect("Could not bind tcp_listener to address."); let address = socket.local_addr().expect("Could not get local_addr from tcp_listener."); - let router = router(tracker, ban_service, access_tokens, address); + let router = router( + tracker, + ban_service, + stats_event_sender, + stats_repository, + access_tokens, + address, + ); let handle = Handle::new(); @@ -303,6 +335,7 @@ mod tests { use crate::bootstrap::app::initialize_with_configuration; use crate::bootstrap::jobs::make_rust_tls; + use crate::core::services::statistics; use crate::servers::apis::server::{ApiServer, Launcher}; use crate::servers::registar::Registar; use crate::servers::udp::server::banning::BanService; @@ -313,8 +346,11 @@ mod tests { let cfg = Arc::new(ephemeral_public()); let config = &cfg.http_api.clone().unwrap(); - let tracker = initialize_with_configuration(&cfg); let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); + let (stats_event_sender, stats_repository) = statistics::setup::factory(cfg.core.tracker_usage_statistics); + let stats_event_sender = Arc::new(stats_event_sender); + let stats_repository = Arc::new(stats_repository); + let tracker = initialize_with_configuration(&cfg); let bind_to = config.bind_address; @@ -329,7 +365,14 @@ mod tests { let register = &Registar::default(); let started = stopped - .start(tracker, ban_service, register.give_form(), access_tokens) + .start( + tracker, + stats_event_sender, + stats_repository, + ban_service, + register.give_form(), + access_tokens, + ) .await .expect("it should start the server"); let stopped = started.stop().await.expect("it should stop the server"); diff --git a/src/servers/apis/v1/context/stats/handlers.rs b/src/servers/apis/v1/context/stats/handlers.rs index b630c763d..af7e1c239 100644 --- a/src/servers/apis/v1/context/stats/handlers.rs +++ b/src/servers/apis/v1/context/stats/handlers.rs @@ -10,6 +10,7 @@ use tokio::sync::RwLock; use super::responses::{metrics_response, stats_response}; use crate::core::services::statistics::get_metrics; +use crate::core::statistics::repository::Repository; use crate::core::Tracker; use crate::servers::udp::server::banning::BanService; @@ -37,11 +38,12 @@ pub struct QueryParams { /// /// Refer to the [API endpoint documentation](crate::servers::apis::v1::context::stats#get-tracker-statistics) /// for more information about this endpoint. +#[allow(clippy::type_complexity)] pub async fn get_stats_handler( - State(state): State<(Arc, Arc>)>, + State(state): State<(Arc, Arc>, Arc)>, params: Query, ) -> Response { - let metrics = get_metrics(state.0.clone(), state.1.clone()).await; + let metrics = get_metrics(state.0.clone(), state.1.clone(), state.2.clone()).await; match params.0.format { Some(format) => match format { diff --git a/src/servers/apis/v1/context/stats/routes.rs b/src/servers/apis/v1/context/stats/routes.rs index fde1056c3..b5df32963 100644 --- a/src/servers/apis/v1/context/stats/routes.rs +++ b/src/servers/apis/v1/context/stats/routes.rs @@ -10,13 +10,22 @@ use axum::Router; use tokio::sync::RwLock; use super::handlers::get_stats_handler; +use crate::core::statistics::event::sender::Sender; +use crate::core::statistics::repository::Repository; use crate::core::Tracker; use crate::servers::udp::server::banning::BanService; /// It adds the routes to the router for the [`stats`](crate::servers::apis::v1::context::stats) API context. -pub fn add(prefix: &str, router: Router, tracker: Arc, ban_service: Arc>) -> Router { +pub fn add( + prefix: &str, + router: Router, + tracker: Arc, + ban_service: Arc>, + _stats_event_sender: Arc>>, + stats_repository: Arc, +) -> Router { router.route( &format!("{prefix}/stats"), - get(get_stats_handler).with_state((tracker, ban_service)), + get(get_stats_handler).with_state((tracker, ban_service, stats_repository)), ) } diff --git a/src/servers/apis/v1/routes.rs b/src/servers/apis/v1/routes.rs index 4c97c7578..9fbd5da0e 100644 --- a/src/servers/apis/v1/routes.rs +++ b/src/servers/apis/v1/routes.rs @@ -5,15 +5,31 @@ use axum::Router; use tokio::sync::RwLock; use super::context::{auth_key, stats, torrent, whitelist}; +use crate::core::statistics::event::sender::Sender; +use crate::core::statistics::repository::Repository; use crate::core::Tracker; use crate::servers::udp::server::banning::BanService; /// Add the routes for the v1 API. -pub fn add(prefix: &str, router: Router, tracker: Arc, ban_service: Arc>) -> Router { +pub fn add( + prefix: &str, + router: Router, + tracker: Arc, + ban_service: Arc>, + stats_event_sender: Arc>>, + stats_repository: Arc, +) -> Router { let v1_prefix = format!("{prefix}/v1"); let router = auth_key::routes::add(&v1_prefix, router, tracker.clone()); - let router = stats::routes::add(&v1_prefix, router, tracker.clone(), ban_service); + let router = stats::routes::add( + &v1_prefix, + router, + tracker.clone(), + ban_service, + stats_event_sender, + stats_repository, + ); let router = whitelist::routes::add(&v1_prefix, router, &tracker); torrent::routes::add(&v1_prefix, router, tracker) diff --git a/src/servers/http/server.rs b/src/servers/http/server.rs index 560d91681..a392df09c 100644 --- a/src/servers/http/server.rs +++ b/src/servers/http/server.rs @@ -11,7 +11,7 @@ use tracing::instrument; use super::v1::routes::router; use crate::bootstrap::jobs::Started; -use crate::core::Tracker; +use crate::core::{statistics, Tracker}; use crate::servers::custom_axum_server::{self, TimeoutAcceptor}; use crate::servers::http::HTTP_TRACKER_LOG_TARGET; use crate::servers::logging::STARTED_ON; @@ -42,8 +42,14 @@ pub struct Launcher { } impl Launcher { - #[instrument(skip(self, tracker, tx_start, rx_halt))] - fn start(&self, tracker: Arc, tx_start: Sender, rx_halt: Receiver) -> BoxFuture<'static, ()> { + #[instrument(skip(self, tracker, stats_event_sender, tx_start, rx_halt))] + fn start( + &self, + tracker: Arc, + stats_event_sender: Arc>>, + tx_start: Sender, + rx_halt: Receiver, + ) -> BoxFuture<'static, ()> { let socket = std::net::TcpListener::bind(self.bind_to).expect("Could not bind tcp_listener to address."); let address = socket.local_addr().expect("Could not get local_addr from tcp_listener."); @@ -60,7 +66,7 @@ impl Launcher { tracing::info!(target: HTTP_TRACKER_LOG_TARGET, "Starting on: {protocol}://{}", address); - let app = router(tracker, address); + let app = router(tracker, stats_event_sender, address); let running = Box::pin(async { match tls { @@ -153,14 +159,19 @@ impl HttpServer { /// /// It would panic spawned HTTP server launcher cannot send the bound `SocketAddr` /// back to the main thread. - pub async fn start(self, tracker: Arc, form: ServiceRegistrationForm) -> Result, Error> { + pub async fn start( + self, + tracker: Arc, + stats_event_sender: Arc>>, + form: ServiceRegistrationForm, + ) -> Result, Error> { let (tx_start, rx_start) = tokio::sync::oneshot::channel::(); let (tx_halt, rx_halt) = tokio::sync::oneshot::channel::(); let launcher = self.state.launcher; let task = tokio::spawn(async move { - let server = launcher.start(tracker, tx_start, rx_halt); + let server = launcher.start(tracker, stats_event_sender, tx_start, rx_halt); server.await; @@ -233,13 +244,19 @@ mod tests { use crate::bootstrap::app::initialize_with_configuration; use crate::bootstrap::jobs::make_rust_tls; + use crate::core::services::statistics; use crate::servers::http::server::{HttpServer, Launcher}; use crate::servers::registar::Registar; #[tokio::test] async fn it_should_be_able_to_start_and_stop() { let cfg = Arc::new(ephemeral_public()); + + let (stats_event_sender, stats_repository) = statistics::setup::factory(cfg.core.tracker_usage_statistics); + let stats_event_sender = Arc::new(stats_event_sender); + let _stats_repository = Arc::new(stats_repository); let tracker = initialize_with_configuration(&cfg); + let http_trackers = cfg.http_trackers.clone().expect("missing HTTP trackers configuration"); let config = &http_trackers[0]; @@ -253,7 +270,7 @@ mod tests { let stopped = HttpServer::new(Launcher::new(bind_to, tls)); let started = stopped - .start(tracker, register.give_form()) + .start(tracker, stats_event_sender, register.give_form()) .await .expect("it should start the server"); let stopped = started.stop().await.expect("it should stop the server"); diff --git a/src/servers/http/v1/handlers/announce.rs b/src/servers/http/v1/handlers/announce.rs index b21f035da..1c8779625 100644 --- a/src/servers/http/v1/handlers/announce.rs +++ b/src/servers/http/v1/handlers/announce.rs @@ -22,6 +22,7 @@ use torrust_tracker_primitives::core::AnnounceData; use torrust_tracker_primitives::peer; use crate::core::auth::Key; +use crate::core::statistics::event::sender::Sender; use crate::core::{PeersWanted, Tracker}; use crate::servers::http::v1::extractors::announce_request::ExtractRequest; use crate::servers::http::v1::extractors::authentication_key::Extract as ExtractKey; @@ -33,28 +34,30 @@ use crate::CurrentClock; /// It handles the `announce` request when the HTTP tracker does not require /// authentication (no PATH `key` parameter required). #[allow(clippy::unused_async)] +#[allow(clippy::type_complexity)] pub async fn handle_without_key( - State(tracker): State>, + State(state): State<(Arc, Arc>>)>, ExtractRequest(announce_request): ExtractRequest, ExtractClientIpSources(client_ip_sources): ExtractClientIpSources, ) -> Response { tracing::debug!("http announce request: {:#?}", announce_request); - handle(&tracker, &announce_request, &client_ip_sources, None).await + handle(&state.0, &state.1, &announce_request, &client_ip_sources, None).await } /// It handles the `announce` request when the HTTP tracker requires /// authentication (PATH `key` parameter required). #[allow(clippy::unused_async)] +#[allow(clippy::type_complexity)] pub async fn handle_with_key( - State(tracker): State>, + State(state): State<(Arc, Arc>>)>, ExtractRequest(announce_request): ExtractRequest, ExtractClientIpSources(client_ip_sources): ExtractClientIpSources, ExtractKey(key): ExtractKey, ) -> Response { tracing::debug!("http announce request: {:#?}", announce_request); - handle(&tracker, &announce_request, &client_ip_sources, Some(key)).await + handle(&state.0, &state.1, &announce_request, &client_ip_sources, Some(key)).await } /// It handles the `announce` request. @@ -63,11 +66,20 @@ pub async fn handle_with_key( /// `unauthenticated` modes. async fn handle( tracker: &Arc, + opt_stats_event_sender: &Arc>>, announce_request: &Announce, client_ip_sources: &ClientIpSources, maybe_key: Option, ) -> Response { - let announce_data = match handle_announce(tracker, announce_request, client_ip_sources, maybe_key).await { + let announce_data = match handle_announce( + tracker, + opt_stats_event_sender, + announce_request, + client_ip_sources, + maybe_key, + ) + .await + { Ok(announce_data) => announce_data, Err(error) => return (StatusCode::OK, error.write()).into_response(), }; @@ -82,6 +94,7 @@ async fn handle( async fn handle_announce( tracker: &Arc, + opt_stats_event_sender: &Arc>>, announce_request: &Announce, client_ip_sources: &ClientIpSources, maybe_key: Option, @@ -118,7 +131,14 @@ async fn handle_announce( None => PeersWanted::All, }; - let announce_data = services::announce::invoke(tracker.clone(), announce_request.info_hash, &mut peer, &peers_wanted).await; + let announce_data = services::announce::invoke( + tracker.clone(), + opt_stats_event_sender.clone(), + announce_request.info_hash, + &mut peer, + &peers_wanted, + ) + .await; Ok(announce_data) } @@ -186,31 +206,44 @@ mod tests { use torrust_tracker_test_helpers::configuration; use crate::bootstrap::app::initialize_tracker_dependencies; - use crate::core::services::tracker_factory; + use crate::core::services::{statistics, tracker_factory}; + use crate::core::statistics::event::sender::Sender; use crate::core::Tracker; - fn private_tracker() -> Tracker { + fn private_tracker() -> (Tracker, Option>) { let config = configuration::ephemeral_private(); - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(&config); - tracker_factory(&config, &database, &whitelist_manager, &stats_event_sender, &stats_repository) + + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); + let (stats_event_sender, _stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); + + (tracker_factory(&config, &database, &whitelist_manager), stats_event_sender) } - fn whitelisted_tracker() -> Tracker { + fn whitelisted_tracker() -> (Tracker, Option>) { let config = configuration::ephemeral_listed(); - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(&config); - tracker_factory(&config, &database, &whitelist_manager, &stats_event_sender, &stats_repository) + + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); + let (stats_event_sender, _stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); + + (tracker_factory(&config, &database, &whitelist_manager), stats_event_sender) } - fn tracker_on_reverse_proxy() -> Tracker { + fn tracker_on_reverse_proxy() -> (Tracker, Option>) { let config = configuration::ephemeral_with_reverse_proxy(); - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(&config); - tracker_factory(&config, &database, &whitelist_manager, &stats_event_sender, &stats_repository) + + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); + let (stats_event_sender, _stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); + + (tracker_factory(&config, &database, &whitelist_manager), stats_event_sender) } - fn tracker_not_on_reverse_proxy() -> Tracker { + fn tracker_not_on_reverse_proxy() -> (Tracker, Option>) { let config = configuration::ephemeral_without_reverse_proxy(); - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(&config); - tracker_factory(&config, &database, &whitelist_manager, &stats_event_sender, &stats_repository) + + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); + let (stats_event_sender, _stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); + + (tracker_factory(&config, &database, &whitelist_manager), stats_event_sender) } fn sample_announce_request() -> Announce { @@ -253,13 +286,22 @@ mod tests { #[tokio::test] async fn it_should_fail_when_the_authentication_key_is_missing() { - let tracker = Arc::new(private_tracker()); + let (tracker, stats_event_sender) = private_tracker(); + + let tracker = Arc::new(tracker); + let stats_event_sender = Arc::new(stats_event_sender); let maybe_key = None; - let response = handle_announce(&tracker, &sample_announce_request(), &sample_client_ip_sources(), maybe_key) - .await - .unwrap_err(); + let response = handle_announce( + &tracker, + &stats_event_sender, + &sample_announce_request(), + &sample_client_ip_sources(), + maybe_key, + ) + .await + .unwrap_err(); assert_error_response( &response, @@ -269,15 +311,24 @@ mod tests { #[tokio::test] async fn it_should_fail_when_the_authentication_key_is_invalid() { - let tracker = Arc::new(private_tracker()); + let (tracker, stats_event_sender) = private_tracker(); + + let tracker = Arc::new(tracker); + let stats_event_sender = Arc::new(stats_event_sender); let unregistered_key = auth::Key::from_str("YZSl4lMZupRuOpSRC3krIKR5BPB14nrJ").unwrap(); let maybe_key = Some(unregistered_key); - let response = handle_announce(&tracker, &sample_announce_request(), &sample_client_ip_sources(), maybe_key) - .await - .unwrap_err(); + let response = handle_announce( + &tracker, + &stats_event_sender, + &sample_announce_request(), + &sample_client_ip_sources(), + maybe_key, + ) + .await + .unwrap_err(); assert_error_response(&response, "Authentication error: Failed to read key"); } @@ -293,13 +344,22 @@ mod tests { #[tokio::test] async fn it_should_fail_when_the_announced_torrent_is_not_whitelisted() { - let tracker = Arc::new(whitelisted_tracker()); + let (tracker, stats_event_sender) = whitelisted_tracker(); + + let tracker = Arc::new(tracker); + let stats_event_sender = Arc::new(stats_event_sender); let announce_request = sample_announce_request(); - let response = handle_announce(&tracker, &announce_request, &sample_client_ip_sources(), None) - .await - .unwrap_err(); + let response = handle_announce( + &tracker, + &stats_event_sender, + &announce_request, + &sample_client_ip_sources(), + None, + ) + .await + .unwrap_err(); assert_error_response( &response, @@ -323,16 +383,25 @@ mod tests { #[tokio::test] async fn it_should_fail_when_the_right_most_x_forwarded_for_header_ip_is_not_available() { - let tracker = Arc::new(tracker_on_reverse_proxy()); + let (tracker, stats_event_sender) = tracker_on_reverse_proxy(); + + let tracker = Arc::new(tracker); + let stats_event_sender = Arc::new(stats_event_sender); let client_ip_sources = ClientIpSources { right_most_x_forwarded_for: None, connection_info_ip: None, }; - let response = handle_announce(&tracker, &sample_announce_request(), &client_ip_sources, None) - .await - .unwrap_err(); + let response = handle_announce( + &tracker, + &stats_event_sender, + &sample_announce_request(), + &client_ip_sources, + None, + ) + .await + .unwrap_err(); assert_error_response( &response, @@ -353,16 +422,25 @@ mod tests { #[tokio::test] async fn it_should_fail_when_the_client_ip_from_the_connection_info_is_not_available() { - let tracker = Arc::new(tracker_not_on_reverse_proxy()); + let (tracker, stats_event_sender) = tracker_not_on_reverse_proxy(); + + let tracker = Arc::new(tracker); + let stats_event_sender = Arc::new(stats_event_sender); let client_ip_sources = ClientIpSources { right_most_x_forwarded_for: None, connection_info_ip: None, }; - let response = handle_announce(&tracker, &sample_announce_request(), &client_ip_sources, None) - .await - .unwrap_err(); + let response = handle_announce( + &tracker, + &stats_event_sender, + &sample_announce_request(), + &client_ip_sources, + None, + ) + .await + .unwrap_err(); assert_error_response( &response, diff --git a/src/servers/http/v1/handlers/scrape.rs b/src/servers/http/v1/handlers/scrape.rs index 41afb6bbb..6ff8a61cf 100644 --- a/src/servers/http/v1/handlers/scrape.rs +++ b/src/servers/http/v1/handlers/scrape.rs @@ -16,6 +16,7 @@ use hyper::StatusCode; use torrust_tracker_primitives::core::ScrapeData; use crate::core::auth::Key; +use crate::core::statistics::event::sender::Sender; use crate::core::Tracker; use crate::servers::http::v1::extractors::authentication_key::Extract as ExtractKey; use crate::servers::http::v1::extractors::client_ip_sources::Extract as ExtractClientIpSources; @@ -25,14 +26,15 @@ use crate::servers::http::v1::services; /// It handles the `scrape` request when the HTTP tracker is configured /// to run in `public` mode. #[allow(clippy::unused_async)] +#[allow(clippy::type_complexity)] pub async fn handle_without_key( - State(tracker): State>, + State(state): State<(Arc, Arc>>)>, ExtractRequest(scrape_request): ExtractRequest, ExtractClientIpSources(client_ip_sources): ExtractClientIpSources, ) -> Response { tracing::debug!("http scrape request: {:#?}", &scrape_request); - handle(&tracker, &scrape_request, &client_ip_sources, None).await + handle(&state.0, &state.1, &scrape_request, &client_ip_sources, None).await } /// It handles the `scrape` request when the HTTP tracker is configured @@ -40,24 +42,26 @@ pub async fn handle_without_key( /// /// In this case, the authentication `key` parameter is required. #[allow(clippy::unused_async)] +#[allow(clippy::type_complexity)] pub async fn handle_with_key( - State(tracker): State>, + State(state): State<(Arc, Arc>>)>, ExtractRequest(scrape_request): ExtractRequest, ExtractClientIpSources(client_ip_sources): ExtractClientIpSources, ExtractKey(key): ExtractKey, ) -> Response { tracing::debug!("http scrape request: {:#?}", &scrape_request); - handle(&tracker, &scrape_request, &client_ip_sources, Some(key)).await + handle(&state.0, &state.1, &scrape_request, &client_ip_sources, Some(key)).await } async fn handle( tracker: &Arc, + stats_event_sender: &Arc>>, scrape_request: &Scrape, client_ip_sources: &ClientIpSources, maybe_key: Option, ) -> Response { - let scrape_data = match handle_scrape(tracker, scrape_request, client_ip_sources, maybe_key).await { + let scrape_data = match handle_scrape(tracker, stats_event_sender, scrape_request, client_ip_sources, maybe_key).await { Ok(scrape_data) => scrape_data, Err(error) => return (StatusCode::OK, error.write()).into_response(), }; @@ -72,6 +76,7 @@ async fn handle( async fn handle_scrape( tracker: &Arc, + opt_stats_event_sender: &Arc>>, scrape_request: &Scrape, client_ip_sources: &ClientIpSources, maybe_key: Option, @@ -98,9 +103,9 @@ async fn handle_scrape( }; if return_real_scrape_data { - Ok(services::scrape::invoke(tracker, &scrape_request.info_hashes, &peer_ip).await) + Ok(services::scrape::invoke(tracker, opt_stats_event_sender, &scrape_request.info_hashes, &peer_ip).await) } else { - Ok(services::scrape::fake(tracker, &scrape_request.info_hashes, &peer_ip).await) + Ok(services::scrape::fake(opt_stats_event_sender, &scrape_request.info_hashes, &peer_ip).await) } } @@ -122,31 +127,43 @@ mod tests { use torrust_tracker_test_helpers::configuration; use crate::bootstrap::app::initialize_tracker_dependencies; - use crate::core::services::tracker_factory; + use crate::core::services::{statistics, tracker_factory}; use crate::core::Tracker; - fn private_tracker() -> Tracker { + fn private_tracker() -> (Tracker, Option>) { let config = configuration::ephemeral_private(); - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(&config); - tracker_factory(&config, &database, &whitelist_manager, &stats_event_sender, &stats_repository) + + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); + let (stats_event_sender, _stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); + + (tracker_factory(&config, &database, &whitelist_manager), stats_event_sender) } - fn whitelisted_tracker() -> Tracker { + fn whitelisted_tracker() -> (Tracker, Option>) { let config = configuration::ephemeral_listed(); - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(&config); - tracker_factory(&config, &database, &whitelist_manager, &stats_event_sender, &stats_repository) + + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); + let (stats_event_sender, _stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); + + (tracker_factory(&config, &database, &whitelist_manager), stats_event_sender) } - fn tracker_on_reverse_proxy() -> Tracker { + fn tracker_on_reverse_proxy() -> (Tracker, Option>) { let config = configuration::ephemeral_with_reverse_proxy(); - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(&config); - tracker_factory(&config, &database, &whitelist_manager, &stats_event_sender, &stats_repository) + + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); + let (stats_event_sender, _stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); + + (tracker_factory(&config, &database, &whitelist_manager), stats_event_sender) } - fn tracker_not_on_reverse_proxy() -> Tracker { + fn tracker_not_on_reverse_proxy() -> (Tracker, Option>) { let config = configuration::ephemeral_without_reverse_proxy(); - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(&config); - tracker_factory(&config, &database, &whitelist_manager, &stats_event_sender, &stats_repository) + + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); + let (stats_event_sender, _stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); + + (tracker_factory(&config, &database, &whitelist_manager), stats_event_sender) } fn sample_scrape_request() -> Scrape { @@ -181,14 +198,22 @@ mod tests { #[tokio::test] async fn it_should_return_zeroed_swarm_metadata_when_the_authentication_key_is_missing() { - let tracker = Arc::new(private_tracker()); + let (tracker, stats_event_sender) = private_tracker(); + let tracker = Arc::new(tracker); + let stats_event_sender = Arc::new(stats_event_sender); let scrape_request = sample_scrape_request(); let maybe_key = None; - let scrape_data = handle_scrape(&tracker, &scrape_request, &sample_client_ip_sources(), maybe_key) - .await - .unwrap(); + let scrape_data = handle_scrape( + &tracker, + &stats_event_sender, + &scrape_request, + &sample_client_ip_sources(), + maybe_key, + ) + .await + .unwrap(); let expected_scrape_data = ScrapeData::zeroed(&scrape_request.info_hashes); @@ -197,15 +222,23 @@ mod tests { #[tokio::test] async fn it_should_return_zeroed_swarm_metadata_when_the_authentication_key_is_invalid() { - let tracker = Arc::new(private_tracker()); + let (tracker, stats_event_sender) = private_tracker(); + let tracker = Arc::new(tracker); + let stats_event_sender = Arc::new(stats_event_sender); let scrape_request = sample_scrape_request(); let unregistered_key = auth::Key::from_str("YZSl4lMZupRuOpSRC3krIKR5BPB14nrJ").unwrap(); let maybe_key = Some(unregistered_key); - let scrape_data = handle_scrape(&tracker, &scrape_request, &sample_client_ip_sources(), maybe_key) - .await - .unwrap(); + let scrape_data = handle_scrape( + &tracker, + &stats_event_sender, + &scrape_request, + &sample_client_ip_sources(), + maybe_key, + ) + .await + .unwrap(); let expected_scrape_data = ScrapeData::zeroed(&scrape_request.info_hashes); @@ -224,13 +257,21 @@ mod tests { #[tokio::test] async fn it_should_return_zeroed_swarm_metadata_when_the_torrent_is_not_whitelisted() { - let tracker = Arc::new(whitelisted_tracker()); + let (tracker, stats_event_sender) = whitelisted_tracker(); + let tracker: Arc = Arc::new(tracker); + let stats_event_sender = Arc::new(stats_event_sender); let scrape_request = sample_scrape_request(); - let scrape_data = handle_scrape(&tracker, &scrape_request, &sample_client_ip_sources(), None) - .await - .unwrap(); + let scrape_data = handle_scrape( + &tracker, + &stats_event_sender, + &scrape_request, + &sample_client_ip_sources(), + None, + ) + .await + .unwrap(); let expected_scrape_data = ScrapeData::zeroed(&scrape_request.info_hashes); @@ -249,16 +290,24 @@ mod tests { #[tokio::test] async fn it_should_fail_when_the_right_most_x_forwarded_for_header_ip_is_not_available() { - let tracker = Arc::new(tracker_on_reverse_proxy()); + let (tracker, stats_event_sender) = tracker_on_reverse_proxy(); + let tracker: Arc = Arc::new(tracker); + let stats_event_sender = Arc::new(stats_event_sender); let client_ip_sources = ClientIpSources { right_most_x_forwarded_for: None, connection_info_ip: None, }; - let response = handle_scrape(&tracker, &sample_scrape_request(), &client_ip_sources, None) - .await - .unwrap_err(); + let response = handle_scrape( + &tracker, + &stats_event_sender, + &sample_scrape_request(), + &client_ip_sources, + None, + ) + .await + .unwrap_err(); assert_error_response( &response, @@ -278,16 +327,24 @@ mod tests { #[tokio::test] async fn it_should_fail_when_the_client_ip_from_the_connection_info_is_not_available() { - let tracker = Arc::new(tracker_not_on_reverse_proxy()); + let (tracker, stats_event_sender) = tracker_not_on_reverse_proxy(); + let tracker: Arc = Arc::new(tracker); + let stats_event_sender = Arc::new(stats_event_sender); let client_ip_sources = ClientIpSources { right_most_x_forwarded_for: None, connection_info_ip: None, }; - let response = handle_scrape(&tracker, &sample_scrape_request(), &client_ip_sources, None) - .await - .unwrap_err(); + let response = handle_scrape( + &tracker, + &stats_event_sender, + &sample_scrape_request(), + &client_ip_sources, + None, + ) + .await + .unwrap_err(); assert_error_response( &response, diff --git a/src/servers/http/v1/routes.rs b/src/servers/http/v1/routes.rs index 3c6926c37..97eb5b95d 100644 --- a/src/servers/http/v1/routes.rs +++ b/src/servers/http/v1/routes.rs @@ -22,6 +22,7 @@ use tower_http::LatencyUnit; use tracing::{instrument, Level, Span}; use super::handlers::{announce, health_check, scrape}; +use crate::core::statistics::event::sender::Sender; use crate::core::Tracker; use crate::servers::http::HTTP_TRACKER_LOG_TARGET; use crate::servers::logging::Latency; @@ -31,17 +32,29 @@ use crate::servers::logging::Latency; /// > **NOTICE**: it's added a layer to get the client IP from the connection /// > info. The tracker could use the connection info to get the client IP. #[allow(clippy::needless_pass_by_value)] -#[instrument(skip(tracker, server_socket_addr))] -pub fn router(tracker: Arc, server_socket_addr: SocketAddr) -> Router { +#[instrument(skip(tracker, stats_event_sender, server_socket_addr))] +pub fn router(tracker: Arc, stats_event_sender: Arc>>, server_socket_addr: SocketAddr) -> Router { Router::new() // Health check .route("/health_check", get(health_check::handler)) // Announce request - .route("/announce", get(announce::handle_without_key).with_state(tracker.clone())) - .route("/announce/{key}", get(announce::handle_with_key).with_state(tracker.clone())) + .route( + "/announce", + get(announce::handle_without_key).with_state((tracker.clone(), stats_event_sender.clone())), + ) + .route( + "/announce/{key}", + get(announce::handle_with_key).with_state((tracker.clone(), stats_event_sender.clone())), + ) // Scrape request - .route("/scrape", get(scrape::handle_without_key).with_state(tracker.clone())) - .route("/scrape/{key}", get(scrape::handle_with_key).with_state(tracker)) + .route( + "/scrape", + get(scrape::handle_without_key).with_state((tracker.clone(), stats_event_sender.clone())), + ) + .route( + "/scrape/{key}", + get(scrape::handle_with_key).with_state((tracker.clone(), stats_event_sender.clone())), + ) // Add extension to get the client IP from the connection info .layer(SecureClientIpSource::ConnectInfo.into_extension()) .layer(CompressionLayer::new()) diff --git a/src/servers/http/v1/services/announce.rs b/src/servers/http/v1/services/announce.rs index 3a4d4820a..4a7c5707d 100644 --- a/src/servers/http/v1/services/announce.rs +++ b/src/servers/http/v1/services/announce.rs @@ -15,7 +15,9 @@ use bittorrent_primitives::info_hash::InfoHash; use torrust_tracker_primitives::core::AnnounceData; use torrust_tracker_primitives::peer; -use crate::core::{statistics, PeersWanted, Tracker}; +use crate::core::statistics::event::sender::Sender; +use crate::core::statistics::{self}; +use crate::core::{PeersWanted, Tracker}; /// The HTTP tracker `announce` service. /// @@ -29,6 +31,7 @@ use crate::core::{statistics, PeersWanted, Tracker}; /// > each `announce` request. pub async fn invoke( tracker: Arc, + opt_stats_event_sender: Arc>>, info_hash: InfoHash, peer: &mut peer::Peer, peers_wanted: &PeersWanted, @@ -38,12 +41,14 @@ pub async fn invoke( // The tracker could change the original peer ip let announce_data = tracker.announce(&info_hash, peer, &original_peer_ip, peers_wanted); - match original_peer_ip { - IpAddr::V4(_) => { - tracker.send_stats_event(statistics::event::Event::Tcp4Announce).await; - } - IpAddr::V6(_) => { - tracker.send_stats_event(statistics::event::Event::Tcp6Announce).await; + if let Some(stats_event_sender) = opt_stats_event_sender.as_deref() { + match original_peer_ip { + IpAddr::V4(_) => { + stats_event_sender.send_event(statistics::event::Event::Tcp4Announce).await; + } + IpAddr::V6(_) => { + stats_event_sender.send_event(statistics::event::Event::Tcp6Announce).await; + } } } @@ -53,6 +58,7 @@ pub async fn invoke( #[cfg(test)] mod tests { use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + use std::sync::Arc; use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes, PeerId}; use bittorrent_primitives::info_hash::InfoHash; @@ -60,13 +66,21 @@ mod tests { use torrust_tracker_test_helpers::configuration; use crate::bootstrap::app::initialize_tracker_dependencies; - use crate::core::services::tracker_factory; + use crate::core::services::{statistics, tracker_factory}; + use crate::core::statistics::event::sender::Sender; use crate::core::Tracker; - fn public_tracker() -> Tracker { + fn public_tracker() -> (Tracker, Arc>>) { let config = configuration::ephemeral_public(); - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(&config); - tracker_factory(&config, &database, &whitelist_manager, &stats_event_sender, &stats_repository) + + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); + let (stats_event_sender, stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); + let stats_event_sender = Arc::new(stats_event_sender); + let _stats_repository = Arc::new(stats_repository); + + let tracker = tracker_factory(&config, &database, &whitelist_manager); + + (tracker, stats_event_sender) } fn sample_info_hash() -> InfoHash { @@ -115,32 +129,30 @@ mod tests { use crate::servers::http::v1::services::announce::invoke; use crate::servers::http::v1::services::announce::tests::{public_tracker, sample_info_hash, sample_peer}; - fn test_tracker_factory(stats_event_sender: Option>) -> Tracker { + fn test_tracker_factory() -> Tracker { let config = configuration::ephemeral(); - let (database, whitelist_manager, _stats_event_sender, _stats_repository) = initialize_tracker_dependencies(&config); + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); - let stats_event_sender = Arc::new(stats_event_sender); - - let stats_repository = Arc::new(statistics::repository::Repository::new()); - - Tracker::new( - &config.core, - &database, - &whitelist_manager, - &stats_event_sender, - &stats_repository, - ) - .unwrap() + Tracker::new(&config.core, &database, &whitelist_manager).unwrap() } #[tokio::test] async fn it_should_return_the_announce_data() { - let tracker = Arc::new(public_tracker()); + let (tracker, stats_event_sender) = public_tracker(); + + let tracker = Arc::new(tracker); let mut peer = sample_peer(); - let announce_data = invoke(tracker.clone(), sample_info_hash(), &mut peer, &PeersWanted::All).await; + let announce_data = invoke( + tracker.clone(), + stats_event_sender.clone(), + sample_info_hash(), + &mut peer, + &PeersWanted::All, + ) + .await; let expected_announce_data = AnnounceData { peers: vec![], @@ -163,22 +175,23 @@ mod tests { .with(eq(statistics::event::Event::Tcp4Announce)) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); - let stats_event_sender = Box::new(stats_event_sender_mock); + let stats_event_sender: Arc>> = + Arc::new(Some(Box::new(stats_event_sender_mock))); - let tracker = Arc::new(test_tracker_factory(Some(stats_event_sender))); + let tracker = Arc::new(test_tracker_factory()); let mut peer = sample_peer_using_ipv4(); - let _announce_data = invoke(tracker, sample_info_hash(), &mut peer, &PeersWanted::All).await; + let _announce_data = invoke(tracker, stats_event_sender, sample_info_hash(), &mut peer, &PeersWanted::All).await; } - fn tracker_with_an_ipv6_external_ip(stats_event_sender: Box) -> Tracker { + fn tracker_with_an_ipv6_external_ip() -> Tracker { let mut configuration = configuration::ephemeral(); configuration.core.net.external_ip = Some(IpAddr::V6(Ipv6Addr::new( 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, ))); - test_tracker_factory(Some(stats_event_sender)) + test_tracker_factory() } fn peer_with_the_ipv4_loopback_ip() -> peer::Peer { @@ -200,12 +213,14 @@ mod tests { .with(eq(statistics::event::Event::Tcp4Announce)) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); - let stats_event_sender = Box::new(stats_event_sender_mock); + let stats_event_sender: Arc>> = + Arc::new(Some(Box::new(stats_event_sender_mock))); let mut peer = peer_with_the_ipv4_loopback_ip(); let _announce_data = invoke( - tracker_with_an_ipv6_external_ip(stats_event_sender).into(), + tracker_with_an_ipv6_external_ip().into(), + stats_event_sender, sample_info_hash(), &mut peer, &PeersWanted::All, @@ -222,13 +237,14 @@ mod tests { .with(eq(statistics::event::Event::Tcp6Announce)) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); - let stats_event_sender = Box::new(stats_event_sender_mock); + let stats_event_sender: Arc>> = + Arc::new(Some(Box::new(stats_event_sender_mock))); - let tracker = Arc::new(test_tracker_factory(Some(stats_event_sender))); + let tracker = Arc::new(test_tracker_factory()); let mut peer = sample_peer_using_ipv6(); - let _announce_data = invoke(tracker, sample_info_hash(), &mut peer, &PeersWanted::All).await; + let _announce_data = invoke(tracker, stats_event_sender, sample_info_hash(), &mut peer, &PeersWanted::All).await; } } } diff --git a/src/servers/http/v1/services/scrape.rs b/src/servers/http/v1/services/scrape.rs index 01be81db6..9805dd8a4 100644 --- a/src/servers/http/v1/services/scrape.rs +++ b/src/servers/http/v1/services/scrape.rs @@ -14,7 +14,9 @@ use std::sync::Arc; use bittorrent_primitives::info_hash::InfoHash; use torrust_tracker_primitives::core::ScrapeData; -use crate::core::{statistics, Tracker}; +use crate::core::statistics::event::sender::Sender; +use crate::core::statistics::{self}; +use crate::core::Tracker; /// The HTTP tracker `scrape` service. /// @@ -26,10 +28,15 @@ use crate::core::{statistics, Tracker}; /// > **NOTICE**: as the HTTP tracker does not requires a connection request /// > like the UDP tracker, the number of TCP connections is incremented for /// > each `scrape` request. -pub async fn invoke(tracker: &Arc, info_hashes: &Vec, original_peer_ip: &IpAddr) -> ScrapeData { +pub async fn invoke( + tracker: &Arc, + opt_stats_event_sender: &Arc>>, + info_hashes: &Vec, + original_peer_ip: &IpAddr, +) -> ScrapeData { let scrape_data = tracker.scrape(info_hashes).await; - send_scrape_event(original_peer_ip, tracker).await; + send_scrape_event(original_peer_ip, opt_stats_event_sender).await; scrape_data } @@ -40,19 +47,25 @@ pub async fn invoke(tracker: &Arc, info_hashes: &Vec, origina /// the tracker returns empty stats for all the torrents. /// /// > **NOTICE**: tracker statistics are not updated in this case. -pub async fn fake(tracker: &Arc, info_hashes: &Vec, original_peer_ip: &IpAddr) -> ScrapeData { - send_scrape_event(original_peer_ip, tracker).await; +pub async fn fake( + opt_stats_event_sender: &Arc>>, + info_hashes: &Vec, + original_peer_ip: &IpAddr, +) -> ScrapeData { + send_scrape_event(original_peer_ip, opt_stats_event_sender).await; ScrapeData::zeroed(info_hashes) } -async fn send_scrape_event(original_peer_ip: &IpAddr, tracker: &Arc) { - match original_peer_ip { - IpAddr::V4(_) => { - tracker.send_stats_event(statistics::event::Event::Tcp4Scrape).await; - } - IpAddr::V6(_) => { - tracker.send_stats_event(statistics::event::Event::Tcp6Scrape).await; +async fn send_scrape_event(original_peer_ip: &IpAddr, opt_stats_event_sender: &Arc>>) { + if let Some(stats_event_sender) = opt_stats_event_sender.as_deref() { + match original_peer_ip { + IpAddr::V4(_) => { + stats_event_sender.send_event(statistics::event::Event::Tcp4Scrape).await; + } + IpAddr::V6(_) => { + stats_event_sender.send_event(statistics::event::Event::Tcp6Scrape).await; + } } } } @@ -61,7 +74,6 @@ async fn send_scrape_event(original_peer_ip: &IpAddr, tracker: &Arc) { mod tests { use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - use std::sync::Arc; use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes, PeerId}; use bittorrent_primitives::info_hash::InfoHash; @@ -70,12 +82,14 @@ mod tests { use crate::bootstrap::app::initialize_tracker_dependencies; use crate::core::services::tracker_factory; - use crate::core::{statistics, Tracker}; + use crate::core::Tracker; fn public_tracker() -> Tracker { let config = configuration::ephemeral_public(); - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(&config); - tracker_factory(&config, &database, &whitelist_manager, &stats_event_sender, &stats_repository) + + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); + + tracker_factory(&config, &database, &whitelist_manager) } fn sample_info_hashes() -> Vec { @@ -98,23 +112,12 @@ mod tests { } } - fn test_tracker_factory(stats_event_sender: Option>) -> Tracker { + fn test_tracker_factory() -> Tracker { let config = configuration::ephemeral(); - let (database, whitelist_manager, _stats_event_sender, _stats_repository) = initialize_tracker_dependencies(&config); - - let stats_event_sender = Arc::new(stats_event_sender); - - let stats_repository = Arc::new(statistics::repository::Repository::new()); + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); - Tracker::new( - &config.core, - &database, - &whitelist_manager, - &stats_event_sender, - &stats_repository, - ) - .unwrap() + Tracker::new(&config.core, &database, &whitelist_manager).unwrap() } mod with_real_data { @@ -135,6 +138,9 @@ mod tests { #[tokio::test] async fn it_should_return_the_scrape_data_for_a_torrent() { + let (stats_event_sender, _stats_repository) = crate::core::services::statistics::setup::factory(false); + let stats_event_sender = Arc::new(stats_event_sender); + let tracker = Arc::new(public_tracker()); let info_hash = sample_info_hash(); @@ -145,7 +151,7 @@ mod tests { let original_peer_ip = peer.ip(); tracker.announce(&info_hash, &mut peer, &original_peer_ip, &PeersWanted::All); - let scrape_data = invoke(&tracker, &info_hashes, &original_peer_ip).await; + let scrape_data = invoke(&tracker, &stats_event_sender, &info_hashes, &original_peer_ip).await; let mut expected_scrape_data = ScrapeData::empty(); expected_scrape_data.add_file( @@ -168,13 +174,14 @@ mod tests { .with(eq(statistics::event::Event::Tcp4Scrape)) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); - let stats_event_sender = Box::new(stats_event_sender_mock); + let stats_event_sender: Arc>> = + Arc::new(Some(Box::new(stats_event_sender_mock))); - let tracker = Arc::new(test_tracker_factory(Some(stats_event_sender))); + let tracker = Arc::new(test_tracker_factory()); let peer_ip = IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)); - invoke(&tracker, &sample_info_hashes(), &peer_ip).await; + invoke(&tracker, &stats_event_sender, &sample_info_hashes(), &peer_ip).await; } #[tokio::test] @@ -185,13 +192,14 @@ mod tests { .with(eq(statistics::event::Event::Tcp6Scrape)) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); - let stats_event_sender = Box::new(stats_event_sender_mock); + let stats_event_sender: Arc>> = + Arc::new(Some(Box::new(stats_event_sender_mock))); - let tracker = Arc::new(test_tracker_factory(Some(stats_event_sender))); + let tracker = Arc::new(test_tracker_factory()); let peer_ip = IpAddr::V6(Ipv6Addr::new(0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969)); - invoke(&tracker, &sample_info_hashes(), &peer_ip).await; + invoke(&tracker, &stats_event_sender, &sample_info_hashes(), &peer_ip).await; } } @@ -207,11 +215,13 @@ mod tests { use crate::core::{statistics, PeersWanted}; use crate::servers::http::v1::services::scrape::fake; use crate::servers::http::v1::services::scrape::tests::{ - public_tracker, sample_info_hash, sample_info_hashes, sample_peer, test_tracker_factory, + public_tracker, sample_info_hash, sample_info_hashes, sample_peer, }; #[tokio::test] async fn it_should_always_return_the_zeroed_scrape_data_for_a_torrent() { + let (stats_event_sender, _stats_repository) = crate::core::services::statistics::setup::factory(false); + let stats_event_sender = Arc::new(stats_event_sender); let tracker = Arc::new(public_tracker()); let info_hash = sample_info_hash(); @@ -222,7 +232,7 @@ mod tests { let original_peer_ip = peer.ip(); tracker.announce(&info_hash, &mut peer, &original_peer_ip, &PeersWanted::All); - let scrape_data = fake(&tracker, &info_hashes, &original_peer_ip).await; + let scrape_data = fake(&stats_event_sender, &info_hashes, &original_peer_ip).await; let expected_scrape_data = ScrapeData::zeroed(&info_hashes); @@ -237,13 +247,12 @@ mod tests { .with(eq(statistics::event::Event::Tcp4Scrape)) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); - let stats_event_sender = Box::new(stats_event_sender_mock); - - let tracker = Arc::new(test_tracker_factory(Some(stats_event_sender))); + let stats_event_sender: Arc>> = + Arc::new(Some(Box::new(stats_event_sender_mock))); let peer_ip = IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)); - fake(&tracker, &sample_info_hashes(), &peer_ip).await; + fake(&stats_event_sender, &sample_info_hashes(), &peer_ip).await; } #[tokio::test] @@ -254,13 +263,12 @@ mod tests { .with(eq(statistics::event::Event::Tcp6Scrape)) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); - let stats_event_sender = Box::new(stats_event_sender_mock); - - let tracker = Arc::new(test_tracker_factory(Some(stats_event_sender))); + let stats_event_sender: Arc>> = + Arc::new(Some(Box::new(stats_event_sender_mock))); let peer_ip = IpAddr::V6(Ipv6Addr::new(0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969)); - fake(&tracker, &sample_info_hashes(), &peer_ip).await; + fake(&stats_event_sender, &sample_info_hashes(), &peer_ip).await; } } } diff --git a/src/servers/udp/handlers.rs b/src/servers/udp/handlers.rs index 4b20c2ac5..9883de54b 100644 --- a/src/servers/udp/handlers.rs +++ b/src/servers/udp/handlers.rs @@ -20,6 +20,7 @@ use zerocopy::network_endian::I32; use super::connection_cookie::{check, make}; use super::server::banning::BanService; use super::RawRequest; +use crate::core::statistics::event::sender::Sender; use crate::core::{statistics, PeersWanted, Tracker}; use crate::servers::udp::error::Error; use crate::servers::udp::{peer_builder, UDP_TRACKER_LOG_TARGET}; @@ -53,10 +54,11 @@ impl CookieTimeValues { /// - Delegating the request to the correct handler depending on the request type. /// /// It will return an `Error` response if the request is invalid. -#[instrument(fields(request_id), skip(udp_request, tracker, cookie_time_values, ban_service), ret(level = Level::TRACE))] +#[instrument(fields(request_id), skip(udp_request, tracker, opt_stats_event_sender, cookie_time_values, ban_service), ret(level = Level::TRACE))] pub(crate) async fn handle_packet( udp_request: RawRequest, tracker: &Tracker, + opt_stats_event_sender: &Arc>>, local_addr: SocketAddr, cookie_time_values: CookieTimeValues, ban_service: Arc>, @@ -70,7 +72,15 @@ pub(crate) async fn handle_packet( let response = match Request::parse_bytes(&udp_request.payload[..udp_request.payload.len()], MAX_SCRAPE_TORRENTS).map_err(Error::from) { - Ok(request) => match handle_request(request, udp_request.from, tracker, cookie_time_values.clone()).await { + Ok(request) => match handle_request( + request, + udp_request.from, + tracker, + opt_stats_event_sender, + cookie_time_values.clone(), + ) + .await + { Ok(response) => return response, Err((e, transaction_id)) => { match &e { @@ -88,7 +98,7 @@ pub(crate) async fn handle_packet( udp_request.from, local_addr, request_id, - tracker, + opt_stats_event_sender, cookie_time_values.valid_range.clone(), &e, Some(transaction_id), @@ -101,7 +111,7 @@ pub(crate) async fn handle_packet( udp_request.from, local_addr, request_id, - tracker, + opt_stats_event_sender, cookie_time_values.valid_range.clone(), &e, None, @@ -121,24 +131,43 @@ pub(crate) async fn handle_packet( /// # Errors /// /// If a error happens in the `handle_request` function, it will just return the `ServerError`. -#[instrument(skip(request, remote_addr, tracker, cookie_time_values))] +#[instrument(skip(request, remote_addr, tracker, opt_stats_event_sender, cookie_time_values))] pub async fn handle_request( request: Request, remote_addr: SocketAddr, tracker: &Tracker, + opt_stats_event_sender: &Arc>>, cookie_time_values: CookieTimeValues, ) -> Result { tracing::trace!("handle request"); match request { - Request::Connect(connect_request) => { - Ok(handle_connect(remote_addr, &connect_request, tracker, cookie_time_values.issue_time).await) - } + Request::Connect(connect_request) => Ok(handle_connect( + remote_addr, + &connect_request, + opt_stats_event_sender, + cookie_time_values.issue_time, + ) + .await), Request::Announce(announce_request) => { - handle_announce(remote_addr, &announce_request, tracker, cookie_time_values.valid_range).await + handle_announce( + remote_addr, + &announce_request, + tracker, + opt_stats_event_sender, + cookie_time_values.valid_range, + ) + .await } Request::Scrape(scrape_request) => { - handle_scrape(remote_addr, &scrape_request, tracker, cookie_time_values.valid_range).await + handle_scrape( + remote_addr, + &scrape_request, + tracker, + opt_stats_event_sender, + cookie_time_values.valid_range, + ) + .await } } } @@ -149,11 +178,11 @@ pub async fn handle_request( /// # Errors /// /// This function does not ever return an error. -#[instrument(fields(transaction_id), skip(tracker), ret(level = Level::TRACE))] +#[instrument(fields(transaction_id), skip(opt_stats_event_sender), ret(level = Level::TRACE))] pub async fn handle_connect( remote_addr: SocketAddr, request: &ConnectRequest, - tracker: &Tracker, + opt_stats_event_sender: &Arc>>, cookie_issue_time: f64, ) -> Response { tracing::Span::current().record("transaction_id", request.transaction_id.0.to_string()); @@ -167,13 +196,14 @@ pub async fn handle_connect( connection_id, }; - // send stats event - match remote_addr { - SocketAddr::V4(_) => { - tracker.send_stats_event(statistics::event::Event::Udp4Connect).await; - } - SocketAddr::V6(_) => { - tracker.send_stats_event(statistics::event::Event::Udp6Connect).await; + if let Some(stats_event_sender) = opt_stats_event_sender.as_deref() { + match remote_addr { + SocketAddr::V4(_) => { + stats_event_sender.send_event(statistics::event::Event::Udp4Connect).await; + } + SocketAddr::V6(_) => { + stats_event_sender.send_event(statistics::event::Event::Udp6Connect).await; + } } } @@ -186,11 +216,12 @@ pub async fn handle_connect( /// # Errors /// /// If a error happens in the `handle_announce` function, it will just return the `ServerError`. -#[instrument(fields(transaction_id, connection_id, info_hash), skip(tracker), ret(level = Level::TRACE))] +#[instrument(fields(transaction_id, connection_id, info_hash), skip(tracker, opt_stats_event_sender), ret(level = Level::TRACE))] pub async fn handle_announce( remote_addr: SocketAddr, request: &AnnounceRequest, tracker: &Tracker, + opt_stats_event_sender: &Arc>>, cookie_valid_range: Range, ) -> Result { tracing::Span::current() @@ -224,12 +255,14 @@ pub async fn handle_announce( let response = tracker.announce(&info_hash, &mut peer, &remote_client_ip, &peers_wanted); - match remote_client_ip { - IpAddr::V4(_) => { - tracker.send_stats_event(statistics::event::Event::Udp4Announce).await; - } - IpAddr::V6(_) => { - tracker.send_stats_event(statistics::event::Event::Udp6Announce).await; + if let Some(stats_event_sender) = opt_stats_event_sender.as_deref() { + match remote_client_ip { + IpAddr::V4(_) => { + stats_event_sender.send_event(statistics::event::Event::Udp4Announce).await; + } + IpAddr::V6(_) => { + stats_event_sender.send_event(statistics::event::Event::Udp6Announce).await; + } } } @@ -293,11 +326,12 @@ pub async fn handle_announce( /// # Errors /// /// This function does not ever return an error. -#[instrument(fields(transaction_id, connection_id), skip(tracker), ret(level = Level::TRACE))] +#[instrument(fields(transaction_id, connection_id), skip(tracker, opt_stats_event_sender), ret(level = Level::TRACE))] pub async fn handle_scrape( remote_addr: SocketAddr, request: &ScrapeRequest, tracker: &Tracker, + opt_stats_event_sender: &Arc>>, cookie_valid_range: Range, ) -> Result { tracing::Span::current() @@ -338,13 +372,14 @@ pub async fn handle_scrape( torrent_stats.push(scrape_entry); } - // send stats event - match remote_addr { - SocketAddr::V4(_) => { - tracker.send_stats_event(statistics::event::Event::Udp4Scrape).await; - } - SocketAddr::V6(_) => { - tracker.send_stats_event(statistics::event::Event::Udp6Scrape).await; + if let Some(stats_event_sender) = opt_stats_event_sender.as_deref() { + match remote_addr { + SocketAddr::V4(_) => { + stats_event_sender.send_event(statistics::event::Event::Udp4Scrape).await; + } + SocketAddr::V6(_) => { + stats_event_sender.send_event(statistics::event::Event::Udp6Scrape).await; + } } } @@ -356,12 +391,12 @@ pub async fn handle_scrape( Ok(Response::from(response)) } -#[instrument(fields(transaction_id), skip(tracker), ret(level = Level::TRACE))] +#[instrument(fields(transaction_id), skip(opt_stats_event_sender), ret(level = Level::TRACE))] async fn handle_error( remote_addr: SocketAddr, local_addr: SocketAddr, request_id: Uuid, - tracker: &Tracker, + opt_stats_event_sender: &Arc>>, cookie_valid_range: Range, e: &Error, transaction_id: Option, @@ -398,13 +433,14 @@ async fn handle_error( }; if e.1.is_some() { - // send stats event - match remote_addr { - SocketAddr::V4(_) => { - tracker.send_stats_event(statistics::event::Event::Udp4Error).await; - } - SocketAddr::V6(_) => { - tracker.send_stats_event(statistics::event::Event::Udp6Error).await; + if let Some(stats_event_sender) = opt_stats_event_sender.as_deref() { + match remote_addr { + SocketAddr::V4(_) => { + stats_event_sender.send_event(statistics::event::Event::Udp4Error).await; + } + SocketAddr::V6(_) => { + stats_event_sender.send_event(statistics::event::Event::Udp6Error).await; + } } } } @@ -426,7 +462,6 @@ mod tests { use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::ops::Range; - use std::sync::Arc; use aquatic_udp_protocol::{NumberOfBytes, PeerId}; use torrust_tracker_clock::clock::Time; @@ -436,8 +471,9 @@ mod tests { use super::gen_remote_fingerprint; use crate::bootstrap::app::initialize_tracker_dependencies; - use crate::core::services::tracker_factory; - use crate::core::{statistics, Tracker}; + use crate::core::services::{statistics, tracker_factory}; + use crate::core::statistics::event::sender::Sender; + use crate::core::Tracker; use crate::CurrentClock; fn tracker_configuration() -> Configuration { @@ -448,17 +484,19 @@ mod tests { configuration::ephemeral() } - fn public_tracker() -> Arc { + fn public_tracker() -> (Tracker, Option>) { initialized_tracker(&configuration::ephemeral_public()) } - fn whitelisted_tracker() -> Arc { + fn whitelisted_tracker() -> (Tracker, Option>) { initialized_tracker(&configuration::ephemeral_listed()) } - fn initialized_tracker(config: &Configuration) -> Arc { - let (database, whitelist_manager, stats_event_sender, stats_repository) = initialize_tracker_dependencies(config); - tracker_factory(config, &database, &whitelist_manager, &stats_event_sender, &stats_repository).into() + fn initialized_tracker(config: &Configuration) -> (Tracker, Option>) { + let (database, whitelist_manager) = initialize_tracker_dependencies(config); + let (stats_event_sender, _stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); + + (tracker_factory(config, &database, &whitelist_manager), stats_event_sender) } fn sample_ipv4_remote_addr() -> SocketAddr { @@ -555,23 +593,12 @@ mod tests { } } - fn test_tracker_factory(stats_event_sender: Option>) -> Tracker { + fn test_tracker_factory() -> Tracker { let config = tracker_configuration(); - let (database, whitelist_manager, _stats_event_sender, _stats_repository) = initialize_tracker_dependencies(&config); - - let stats_event_sender = Arc::new(stats_event_sender); - - let stats_repository = Arc::new(statistics::repository::Repository::new()); + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); - Tracker::new( - &config.core, - &database, - &whitelist_manager, - &stats_event_sender, - &stats_repository, - ) - .unwrap() + Tracker::new(&config.core, &database, &whitelist_manager).unwrap() } mod connect_request { @@ -587,8 +614,7 @@ mod tests { use crate::servers::udp::connection_cookie::make; use crate::servers::udp::handlers::handle_connect; use crate::servers::udp::handlers::tests::{ - public_tracker, sample_ipv4_remote_addr, sample_ipv4_remote_addr_fingerprint, sample_ipv6_remote_addr_fingerprint, - sample_issue_time, test_tracker_factory, + sample_ipv4_remote_addr, sample_ipv4_remote_addr_fingerprint, sample_ipv6_remote_addr_fingerprint, sample_issue_time, }; fn sample_connect_request() -> ConnectRequest { @@ -599,11 +625,14 @@ mod tests { #[tokio::test] async fn a_connect_response_should_contain_the_same_transaction_id_as_the_connect_request() { + let (stats_event_sender, _stats_repository) = crate::core::services::statistics::setup::factory(false); + let stats_event_sender = Arc::new(stats_event_sender); + let request = ConnectRequest { transaction_id: TransactionId(0i32.into()), }; - let response = handle_connect(sample_ipv4_remote_addr(), &request, &public_tracker(), sample_issue_time()).await; + let response = handle_connect(sample_ipv4_remote_addr(), &request, &stats_event_sender, sample_issue_time()).await; assert_eq!( response, @@ -616,11 +645,14 @@ mod tests { #[tokio::test] async fn a_connect_response_should_contain_a_new_connection_id() { + let (stats_event_sender, _stats_repository) = crate::core::services::statistics::setup::factory(false); + let stats_event_sender = Arc::new(stats_event_sender); + let request = ConnectRequest { transaction_id: TransactionId(0i32.into()), }; - let response = handle_connect(sample_ipv4_remote_addr(), &request, &public_tracker(), sample_issue_time()).await; + let response = handle_connect(sample_ipv4_remote_addr(), &request, &stats_event_sender, sample_issue_time()).await; assert_eq!( response, @@ -633,11 +665,14 @@ mod tests { #[tokio::test] async fn a_connect_response_should_contain_a_new_connection_id_ipv6() { + let (stats_event_sender, _stats_repository) = crate::core::services::statistics::setup::factory(false); + let stats_event_sender = Arc::new(stats_event_sender); + let request = ConnectRequest { transaction_id: TransactionId(0i32.into()), }; - let response = handle_connect(sample_ipv6_remote_addr(), &request, &public_tracker(), sample_issue_time()).await; + let response = handle_connect(sample_ipv6_remote_addr(), &request, &stats_event_sender, sample_issue_time()).await; assert_eq!( response, @@ -656,15 +691,15 @@ mod tests { .with(eq(statistics::event::Event::Udp4Connect)) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); - let stats_event_sender = Box::new(stats_event_sender_mock); + let stats_event_sender: Arc>> = + Arc::new(Some(Box::new(stats_event_sender_mock))); let client_socket_address = sample_ipv4_socket_address(); - let torrent_tracker = Arc::new(test_tracker_factory(Some(stats_event_sender))); handle_connect( client_socket_address, &sample_connect_request(), - &torrent_tracker, + &stats_event_sender, sample_issue_time(), ) .await; @@ -678,13 +713,13 @@ mod tests { .with(eq(statistics::event::Event::Udp6Connect)) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); - let stats_event_sender = Box::new(stats_event_sender_mock); + let stats_event_sender: Arc>> = + Arc::new(Some(Box::new(stats_event_sender_mock))); - let torrent_tracker = Arc::new(test_tracker_factory(Some(stats_event_sender))); handle_connect( sample_ipv6_remote_addr(), &sample_connect_request(), - &torrent_tracker, + &stats_event_sender, sample_issue_time(), ) .await; @@ -787,7 +822,9 @@ mod tests { #[tokio::test] async fn an_announced_peer_should_be_added_to_the_tracker() { - let tracker = public_tracker(); + let (tracker, stats_event_sender) = public_tracker(); + let tracker = Arc::new(tracker); + let stats_event_sender = Arc::new(stats_event_sender); let client_ip = Ipv4Addr::new(126, 0, 0, 1); let client_port = 8080; @@ -804,9 +841,15 @@ mod tests { .with_port(client_port) .into(); - handle_announce(remote_addr, &request, &tracker, sample_cookie_valid_range()) - .await - .unwrap(); + handle_announce( + remote_addr, + &request, + &tracker, + &stats_event_sender, + sample_cookie_valid_range(), + ) + .await + .unwrap(); let peers = tracker.get_torrent_peers(&info_hash.0.into()); @@ -820,15 +863,25 @@ mod tests { #[tokio::test] async fn the_announced_peer_should_not_be_included_in_the_response() { + let (tracker, stats_event_sender) = public_tracker(); + let tracker = Arc::new(tracker); + let stats_event_sender = Arc::new(stats_event_sender); + let remote_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), 8080); let request = AnnounceRequestBuilder::default() .with_connection_id(make(gen_remote_fingerprint(&remote_addr), sample_issue_time()).unwrap()) .into(); - let response = handle_announce(remote_addr, &request, &public_tracker(), sample_cookie_valid_range()) - .await - .unwrap(); + let response = handle_announce( + remote_addr, + &request, + &tracker, + &stats_event_sender, + sample_cookie_valid_range(), + ) + .await + .unwrap(); let empty_peer_vector: Vec> = vec![]; assert_eq!( @@ -851,7 +904,9 @@ mod tests { // From the BEP 15 (https://www.bittorrent.org/beps/bep_0015.html): // "Do note that most trackers will only honor the IP address field under limited circumstances." - let tracker = public_tracker(); + let (tracker, stats_event_sender) = public_tracker(); + let tracker = Arc::new(tracker); + let stats_event_sender = Arc::new(stats_event_sender); let info_hash = AquaticInfoHash([0u8; 20]); let peer_id = AquaticPeerId([255u8; 20]); @@ -871,9 +926,15 @@ mod tests { .with_port(client_port) .into(); - handle_announce(remote_addr, &request, &tracker, sample_cookie_valid_range()) - .await - .unwrap(); + handle_announce( + remote_addr, + &request, + &tracker, + &stats_event_sender, + sample_cookie_valid_range(), + ) + .await + .unwrap(); let peers = tracker.get_torrent_peers(&info_hash.0.into()); @@ -897,19 +958,29 @@ mod tests { } async fn announce_a_new_peer_using_ipv4(tracker: Arc) -> Response { + let (stats_event_sender, _stats_repository) = crate::core::services::statistics::setup::factory(false); + let stats_event_sender = Arc::new(stats_event_sender); + let remote_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), 8080); let request = AnnounceRequestBuilder::default() .with_connection_id(make(gen_remote_fingerprint(&remote_addr), sample_issue_time()).unwrap()) .into(); - handle_announce(remote_addr, &request, &tracker, sample_cookie_valid_range()) - .await - .unwrap() + handle_announce( + remote_addr, + &request, + &tracker, + &stats_event_sender, + sample_cookie_valid_range(), + ) + .await + .unwrap() } #[tokio::test] async fn when_the_announce_request_comes_from_a_client_using_ipv4_the_response_should_not_include_peers_using_ipv6() { - let tracker = public_tracker(); + let (tracker, _stats_event_sender) = public_tracker(); + let tracker = Arc::new(tracker); add_a_torrent_peer_using_ipv6(&tracker); @@ -932,14 +1003,16 @@ mod tests { .with(eq(statistics::event::Event::Udp4Announce)) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); - let stats_event_sender = Box::new(stats_event_sender_mock); + let stats_event_sender: Arc>> = + Arc::new(Some(Box::new(stats_event_sender_mock))); - let tracker = Arc::new(test_tracker_factory(Some(stats_event_sender))); + let tracker = Arc::new(test_tracker_factory()); handle_announce( sample_ipv4_socket_address(), &AnnounceRequestBuilder::default().into(), &tracker, + &stats_event_sender, sample_cookie_valid_range(), ) .await @@ -961,7 +1034,9 @@ mod tests { #[tokio::test] async fn the_peer_ip_should_be_changed_to_the_external_ip_in_the_tracker_configuration_if_defined() { - let tracker = public_tracker(); + let (tracker, stats_event_sender) = public_tracker(); + let tracker = Arc::new(tracker); + let stats_event_sender = Arc::new(stats_event_sender); let client_ip = Ipv4Addr::new(127, 0, 0, 1); let client_port = 8080; @@ -978,9 +1053,15 @@ mod tests { .with_port(client_port) .into(); - handle_announce(remote_addr, &request, &tracker, sample_cookie_valid_range()) - .await - .unwrap(); + handle_announce( + remote_addr, + &request, + &tracker, + &stats_event_sender, + sample_cookie_valid_range(), + ) + .await + .unwrap(); let peers = tracker.get_torrent_peers(&info_hash.0.into()); @@ -1019,7 +1100,9 @@ mod tests { #[tokio::test] async fn an_announced_peer_should_be_added_to_the_tracker() { - let tracker = public_tracker(); + let (tracker, stats_event_sender) = public_tracker(); + let tracker = Arc::new(tracker); + let stats_event_sender = Arc::new(stats_event_sender); let client_ip_v4 = Ipv4Addr::new(126, 0, 0, 1); let client_ip_v6 = client_ip_v4.to_ipv6_compatible(); @@ -1037,9 +1120,15 @@ mod tests { .with_port(client_port) .into(); - handle_announce(remote_addr, &request, &tracker, sample_cookie_valid_range()) - .await - .unwrap(); + handle_announce( + remote_addr, + &request, + &tracker, + &stats_event_sender, + sample_cookie_valid_range(), + ) + .await + .unwrap(); let peers = tracker.get_torrent_peers(&info_hash.0.into()); @@ -1053,6 +1142,10 @@ mod tests { #[tokio::test] async fn the_announced_peer_should_not_be_included_in_the_response() { + let (tracker, stats_event_sender) = public_tracker(); + let tracker = Arc::new(tracker); + let stats_event_sender = Arc::new(stats_event_sender); + let client_ip_v4 = Ipv4Addr::new(126, 0, 0, 1); let client_ip_v6 = client_ip_v4.to_ipv6_compatible(); @@ -1062,9 +1155,15 @@ mod tests { .with_connection_id(make(gen_remote_fingerprint(&remote_addr), sample_issue_time()).unwrap()) .into(); - let response = handle_announce(remote_addr, &request, &public_tracker(), sample_cookie_valid_range()) - .await - .unwrap(); + let response = handle_announce( + remote_addr, + &request, + &tracker, + &stats_event_sender, + sample_cookie_valid_range(), + ) + .await + .unwrap(); let empty_peer_vector: Vec> = vec![]; assert_eq!( @@ -1087,7 +1186,9 @@ mod tests { // From the BEP 15 (https://www.bittorrent.org/beps/bep_0015.html): // "Do note that most trackers will only honor the IP address field under limited circumstances." - let tracker = public_tracker(); + let (tracker, stats_event_sender) = public_tracker(); + let tracker = Arc::new(tracker); + let stats_event_sender = Arc::new(stats_event_sender); let info_hash = AquaticInfoHash([0u8; 20]); let peer_id = AquaticPeerId([255u8; 20]); @@ -1107,9 +1208,15 @@ mod tests { .with_port(client_port) .into(); - handle_announce(remote_addr, &request, &tracker, sample_cookie_valid_range()) - .await - .unwrap(); + handle_announce( + remote_addr, + &request, + &tracker, + &stats_event_sender, + sample_cookie_valid_range(), + ) + .await + .unwrap(); let peers = tracker.get_torrent_peers(&info_hash.0.into()); @@ -1133,6 +1240,9 @@ mod tests { } async fn announce_a_new_peer_using_ipv6(tracker: Arc) -> Response { + let (stats_event_sender, _stats_repository) = crate::core::services::statistics::setup::factory(false); + let stats_event_sender = Arc::new(stats_event_sender); + let client_ip_v4 = Ipv4Addr::new(126, 0, 0, 1); let client_ip_v6 = client_ip_v4.to_ipv6_compatible(); let client_port = 8080; @@ -1141,14 +1251,21 @@ mod tests { .with_connection_id(make(gen_remote_fingerprint(&remote_addr), sample_issue_time()).unwrap()) .into(); - handle_announce(remote_addr, &request, &tracker, sample_cookie_valid_range()) - .await - .unwrap() + handle_announce( + remote_addr, + &request, + &tracker, + &stats_event_sender, + sample_cookie_valid_range(), + ) + .await + .unwrap() } #[tokio::test] async fn when_the_announce_request_comes_from_a_client_using_ipv6_the_response_should_not_include_peers_using_ipv4() { - let tracker = public_tracker(); + let (tracker, _stats_event_sender) = public_tracker(); + let tracker = Arc::new(tracker); add_a_torrent_peer_using_ipv4(&tracker); @@ -1171,9 +1288,10 @@ mod tests { .with(eq(statistics::event::Event::Udp6Announce)) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); - let stats_event_sender = Box::new(stats_event_sender_mock); + let stats_event_sender: Arc>> = + Arc::new(Some(Box::new(stats_event_sender_mock))); - let tracker = Arc::new(test_tracker_factory(Some(stats_event_sender))); + let tracker = Arc::new(test_tracker_factory()); let remote_addr = sample_ipv6_remote_addr(); @@ -1181,20 +1299,27 @@ mod tests { .with_connection_id(make(gen_remote_fingerprint(&remote_addr), sample_issue_time()).unwrap()) .into(); - handle_announce(remote_addr, &announce_request, &tracker, sample_cookie_valid_range()) - .await - .unwrap(); + handle_announce( + remote_addr, + &announce_request, + &tracker, + &stats_event_sender, + sample_cookie_valid_range(), + ) + .await + .unwrap(); } mod from_a_loopback_ip { + use std::future; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::Arc; use aquatic_udp_protocol::{InfoHash as AquaticInfoHash, PeerId as AquaticPeerId}; + use mockall::predicate::eq; use crate::bootstrap::app::initialize_tracker_dependencies; - use crate::core; - use crate::core::statistics::keeper::Keeper; + use crate::core::{self, statistics}; use crate::servers::udp::connection_cookie::make; use crate::servers::udp::handlers::handle_announce; use crate::servers::udp::handlers::tests::announce_request::AnnounceRequestBuilder; @@ -1206,21 +1331,18 @@ mod tests { async fn the_peer_ip_should_be_changed_to_the_external_ip_in_the_tracker_configuration() { let config = Arc::new(TrackerConfigurationBuilder::default().with_external_ip("::126.0.0.1").into()); - let (database, whitelist_manager, _stats_event_sender, _stats_repository) = - initialize_tracker_dependencies(&config); + let (database, whitelist_manager) = initialize_tracker_dependencies(&config); - let (stats_event_sender, stats_repository) = Keeper::new_active_instance(); + let mut stats_event_sender_mock = statistics::event::sender::MockSender::new(); + stats_event_sender_mock + .expect_send_event() + .with(eq(statistics::event::Event::Udp6Announce)) + .times(1) + .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + let stats_event_sender: Arc>> = + Arc::new(Some(Box::new(stats_event_sender_mock))); - let tracker = Arc::new( - core::Tracker::new( - &config.core, - &database, - &whitelist_manager, - &Arc::new(Some(stats_event_sender)), - &Arc::new(stats_repository), - ) - .unwrap(), - ); + let tracker = Arc::new(core::Tracker::new(&config.core, &database, &whitelist_manager).unwrap()); let loopback_ipv4 = Ipv4Addr::new(127, 0, 0, 1); let loopback_ipv6 = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1); @@ -1242,9 +1364,15 @@ mod tests { .with_port(client_port) .into(); - handle_announce(remote_addr, &request, &tracker, sample_cookie_valid_range()) - .await - .unwrap(); + handle_announce( + remote_addr, + &request, + &tracker, + &stats_event_sender, + sample_cookie_valid_range(), + ) + .await + .unwrap(); let peers = tracker.get_torrent_peers(&info_hash.0.into()); @@ -1273,6 +1401,7 @@ mod tests { }; use super::{gen_remote_fingerprint, TorrentPeerBuilder}; + use crate::core::services::statistics; use crate::core::{self}; use crate::servers::udp::connection_cookie::make; use crate::servers::udp::handlers::handle_scrape; @@ -1290,6 +1419,10 @@ mod tests { #[tokio::test] async fn should_return_no_stats_when_the_tracker_does_not_have_any_torrent() { + let (tracker, stats_event_sender) = public_tracker(); + let tracker = Arc::new(tracker); + let stats_event_sender = Arc::new(stats_event_sender); + let remote_addr = sample_ipv4_remote_addr(); let info_hash = InfoHash([0u8; 20]); @@ -1301,9 +1434,15 @@ mod tests { info_hashes, }; - let response = handle_scrape(remote_addr, &request, &public_tracker(), sample_cookie_valid_range()) - .await - .unwrap(); + let response = handle_scrape( + remote_addr, + &request, + &tracker, + &stats_event_sender, + sample_cookie_valid_range(), + ) + .await + .unwrap(); let expected_torrent_stats = vec![zeroed_torrent_statistics()]; @@ -1339,6 +1478,9 @@ mod tests { } async fn add_a_sample_seeder_and_scrape(tracker: Arc) -> Response { + let (stats_event_sender, _stats_repository) = statistics::setup::factory(false); + let stats_event_sender = Arc::new(stats_event_sender); + let remote_addr = sample_ipv4_remote_addr(); let info_hash = InfoHash([0u8; 20]); @@ -1346,9 +1488,15 @@ mod tests { let request = build_scrape_request(&remote_addr, &info_hash); - handle_scrape(remote_addr, &request, &tracker, sample_cookie_valid_range()) - .await - .unwrap() + handle_scrape( + remote_addr, + &request, + &tracker, + &stats_event_sender, + sample_cookie_valid_range(), + ) + .await + .unwrap() } fn match_scrape_response(response: Response) -> Option { @@ -1359,6 +1507,8 @@ mod tests { } mod with_a_public_tracker { + use std::sync::Arc; + use aquatic_udp_protocol::{NumberOfDownloads, NumberOfPeers, TorrentScrapeStatistics}; use crate::servers::udp::handlers::tests::public_tracker; @@ -1366,7 +1516,8 @@ mod tests { #[tokio::test] async fn should_return_torrent_statistics_when_the_tracker_has_the_requested_torrent() { - let tracker = public_tracker(); + let (tracker, _stats_event_sender) = public_tracker(); + let tracker = Arc::new(tracker); let torrent_stats = match_scrape_response(add_a_sample_seeder_and_scrape(tracker.clone()).await); @@ -1381,6 +1532,8 @@ mod tests { } mod with_a_whitelisted_tracker { + use std::sync::Arc; + use aquatic_udp_protocol::{InfoHash, NumberOfDownloads, NumberOfPeers, TorrentScrapeStatistics}; use crate::servers::udp::handlers::handle_scrape; @@ -1391,7 +1544,9 @@ mod tests { #[tokio::test] async fn should_return_the_torrent_statistics_when_the_requested_torrent_is_whitelisted() { - let tracker = whitelisted_tracker(); + let (tracker, stats_event_sender) = whitelisted_tracker(); + let tracker = Arc::new(tracker); + let stats_event_sender = Arc::new(stats_event_sender); let remote_addr = sample_ipv4_remote_addr(); let info_hash = InfoHash([0u8; 20]); @@ -1406,9 +1561,15 @@ mod tests { let request = build_scrape_request(&remote_addr, &info_hash); let torrent_stats = match_scrape_response( - handle_scrape(remote_addr, &request, &tracker, sample_cookie_valid_range()) - .await - .unwrap(), + handle_scrape( + remote_addr, + &request, + &tracker, + &stats_event_sender, + sample_cookie_valid_range(), + ) + .await + .unwrap(), ) .unwrap(); @@ -1423,7 +1584,9 @@ mod tests { #[tokio::test] async fn should_return_zeroed_statistics_when_the_requested_torrent_is_not_whitelisted() { - let tracker = whitelisted_tracker(); + let (tracker, stats_event_sender) = whitelisted_tracker(); + let tracker = Arc::new(tracker); + let stats_event_sender = Arc::new(stats_event_sender); let remote_addr = sample_ipv4_remote_addr(); let info_hash = InfoHash([0u8; 20]); @@ -1433,9 +1596,15 @@ mod tests { let request = build_scrape_request(&remote_addr, &info_hash); let torrent_stats = match_scrape_response( - handle_scrape(remote_addr, &request, &tracker, sample_cookie_valid_range()) - .await - .unwrap(), + handle_scrape( + remote_addr, + &request, + &tracker, + &stats_event_sender, + sample_cookie_valid_range(), + ) + .await + .unwrap(), ) .unwrap(); @@ -1477,15 +1646,17 @@ mod tests { .with(eq(statistics::event::Event::Udp4Scrape)) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); - let stats_event_sender = Box::new(stats_event_sender_mock); + let stats_event_sender: Arc>> = + Arc::new(Some(Box::new(stats_event_sender_mock))); let remote_addr = sample_ipv4_remote_addr(); - let tracker = Arc::new(test_tracker_factory(Some(stats_event_sender))); + let tracker = Arc::new(test_tracker_factory()); handle_scrape( remote_addr, &sample_scrape_request(&remote_addr), &tracker, + &stats_event_sender, sample_cookie_valid_range(), ) .await @@ -1514,15 +1685,17 @@ mod tests { .with(eq(statistics::event::Event::Udp6Scrape)) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); - let stats_event_sender = Box::new(stats_event_sender_mock); + let stats_event_sender: Arc>> = + Arc::new(Some(Box::new(stats_event_sender_mock))); let remote_addr = sample_ipv6_remote_addr(); - let tracker = Arc::new(test_tracker_factory(Some(stats_event_sender))); + let tracker = Arc::new(test_tracker_factory()); handle_scrape( remote_addr, &sample_scrape_request(&remote_addr), &tracker, + &stats_event_sender, sample_cookie_valid_range(), ) .await diff --git a/src/servers/udp/server/launcher.rs b/src/servers/udp/server/launcher.rs index 753dc9915..d71ffcfd1 100644 --- a/src/servers/udp/server/launcher.rs +++ b/src/servers/udp/server/launcher.rs @@ -13,6 +13,7 @@ use tracing::instrument; use super::banning::BanService; use super::request_buffer::ActiveRequests; use crate::bootstrap::jobs::Started; +use crate::core::statistics::event::sender::Sender; use crate::core::{statistics, Tracker}; use crate::servers::logging::STARTED_ON; use crate::servers::registar::ServiceHealthCheckJob; @@ -40,9 +41,10 @@ impl Launcher { /// It panics if unable to send address of socket. /// It panics if the udp server is loaded when the tracker is private. /// - #[instrument(skip(tracker, ban_service, bind_to, tx_start, rx_halt))] + #[instrument(skip(tracker, opt_stats_event_sender, ban_service, bind_to, tx_start, rx_halt))] pub async fn run_with_graceful_shutdown( tracker: Arc, + opt_stats_event_sender: Arc>>, ban_service: Arc>, bind_to: SocketAddr, cookie_lifetime: Duration, @@ -81,7 +83,14 @@ impl Launcher { let local_addr = local_udp_url.clone(); tokio::task::spawn(async move { tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_with_graceful_shutdown::task (listening...)"); - let () = Self::run_udp_server_main(receiver, tracker.clone(), ban_service.clone(), cookie_lifetime).await; + let () = Self::run_udp_server_main( + receiver, + tracker.clone(), + opt_stats_event_sender.clone(), + ban_service.clone(), + cookie_lifetime, + ) + .await; }) }; @@ -118,10 +127,11 @@ impl Launcher { ServiceHealthCheckJob::new(binding, info, job) } - #[instrument(skip(receiver, tracker, ban_service))] + #[instrument(skip(receiver, tracker, opt_stats_event_sender, ban_service))] async fn run_udp_server_main( mut receiver: Receiver, tracker: Arc, + opt_stats_event_sender: Arc>>, ban_service: Arc>, cookie_lifetime: Duration, ) { @@ -165,24 +175,35 @@ impl Launcher { } }; - match req.from.ip() { - IpAddr::V4(_) => { - tracker.send_stats_event(statistics::event::Event::Udp4Request).await; - } - IpAddr::V6(_) => { - tracker.send_stats_event(statistics::event::Event::Udp6Request).await; + if let Some(stats_event_sender) = opt_stats_event_sender.as_deref() { + match req.from.ip() { + IpAddr::V4(_) => { + stats_event_sender.send_event(statistics::event::Event::Udp4Request).await; + } + IpAddr::V6(_) => { + stats_event_sender.send_event(statistics::event::Event::Udp6Request).await; + } } } if ban_service.read().await.is_banned(&req.from.ip()) { tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop continue: (banned ip)"); - tracker.send_stats_event(statistics::event::Event::UdpRequestBanned).await; + if let Some(stats_event_sender) = opt_stats_event_sender.as_deref() { + stats_event_sender + .send_event(statistics::event::Event::UdpRequestBanned) + .await; + } continue; } - let processor = Processor::new(receiver.socket.clone(), tracker.clone(), cookie_lifetime); + let processor = Processor::new( + receiver.socket.clone(), + tracker.clone(), + opt_stats_event_sender.clone(), + cookie_lifetime, + ); /* We spawn the new task even if the active requests buffer is full. This could seem counterintuitive because we are accepting @@ -206,7 +227,12 @@ impl Launcher { if old_request_aborted { // Evicted task from active requests buffer was aborted. - tracker.send_stats_event(statistics::event::Event::UdpRequestAborted).await; + + if let Some(stats_event_sender) = opt_stats_event_sender.as_deref() { + stats_event_sender + .send_event(statistics::event::Event::UdpRequestAborted) + .await; + }; } } else { tokio::task::yield_now().await; diff --git a/src/servers/udp/server/mod.rs b/src/servers/udp/server/mod.rs index 6eb98a7b1..ffccad44e 100644 --- a/src/servers/udp/server/mod.rs +++ b/src/servers/udp/server/mod.rs @@ -64,6 +64,7 @@ mod tests { use super::spawner::Spawner; use super::Server; use crate::bootstrap::app::initialize_with_configuration; + use crate::core::services::statistics; use crate::servers::registar::Registar; use crate::servers::udp::server::banning::BanService; use crate::servers::udp::server::launcher::MAX_CONNECTION_ID_ERRORS_PER_IP; @@ -72,8 +73,11 @@ mod tests { async fn it_should_be_able_to_start_and_stop() { let cfg = Arc::new(ephemeral_public()); - let tracker = initialize_with_configuration(&cfg); + let (stats_event_sender, stats_repository) = statistics::setup::factory(cfg.core.tracker_usage_statistics); + let stats_event_sender = Arc::new(stats_event_sender); + let _stats_repository = Arc::new(stats_repository); let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); + let tracker = initialize_with_configuration(&cfg); let udp_trackers = cfg.udp_trackers.clone().expect("missing UDP trackers configuration"); let config = &udp_trackers[0]; @@ -83,7 +87,13 @@ mod tests { let stopped = Server::new(Spawner::new(bind_to)); let started = stopped - .start(tracker, ban_service, register.give_form(), config.cookie_lifetime) + .start( + tracker, + stats_event_sender, + ban_service, + register.give_form(), + config.cookie_lifetime, + ) .await .expect("it should start the server"); @@ -98,8 +108,11 @@ mod tests { async fn it_should_be_able_to_start_and_stop_with_wait() { let cfg = Arc::new(ephemeral_public()); - let tracker = initialize_with_configuration(&cfg); + let (stats_event_sender, stats_repository) = statistics::setup::factory(cfg.core.tracker_usage_statistics); + let stats_event_sender = Arc::new(stats_event_sender); + let _stats_repository = Arc::new(stats_repository); let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); + let tracker = initialize_with_configuration(&cfg); let config = &cfg.udp_trackers.as_ref().unwrap().first().unwrap(); let bind_to = config.bind_address; @@ -108,7 +121,13 @@ mod tests { let stopped = Server::new(Spawner::new(bind_to)); let started = stopped - .start(tracker, ban_service, register.give_form(), config.cookie_lifetime) + .start( + tracker, + stats_event_sender, + ban_service, + register.give_form(), + config.cookie_lifetime, + ) .await .expect("it should start the server"); diff --git a/src/servers/udp/server/processor.rs b/src/servers/udp/server/processor.rs index e0f7c4624..2ef7cc482 100644 --- a/src/servers/udp/server/processor.rs +++ b/src/servers/udp/server/processor.rs @@ -10,6 +10,7 @@ use tracing::{instrument, Level}; use super::banning::BanService; use super::bound_socket::BoundSocket; +use crate::core::statistics::event::sender::Sender; use crate::core::statistics::event::UdpResponseKind; use crate::core::{statistics, Tracker}; use crate::servers::udp::handlers::CookieTimeValues; @@ -18,14 +19,21 @@ use crate::servers::udp::{handlers, RawRequest}; pub struct Processor { socket: Arc, tracker: Arc, + opt_stats_event_sender: Arc>>, cookie_lifetime: f64, } impl Processor { - pub fn new(socket: Arc, tracker: Arc, cookie_lifetime: f64) -> Self { + pub fn new( + socket: Arc, + tracker: Arc, + opt_stats_event_sender: Arc>>, + cookie_lifetime: f64, + ) -> Self { Self { socket, tracker, + opt_stats_event_sender, cookie_lifetime, } } @@ -39,6 +47,7 @@ impl Processor { let response = handlers::handle_packet( request, &self.tracker, + &self.opt_stats_event_sender, self.socket.address(), CookieTimeValues::new(self.cookie_lifetime), ban_service, @@ -84,22 +93,24 @@ impl Processor { tracing::debug!(%bytes_count, %sent_bytes, "sent {response_type}"); } - match target.ip() { - IpAddr::V4(_) => { - self.tracker - .send_stats_event(statistics::event::Event::Udp4Response { - kind: response_kind, - req_processing_time, - }) - .await; - } - IpAddr::V6(_) => { - self.tracker - .send_stats_event(statistics::event::Event::Udp6Response { - kind: response_kind, - req_processing_time, - }) - .await; + if let Some(stats_event_sender) = self.opt_stats_event_sender.as_deref() { + match target.ip() { + IpAddr::V4(_) => { + stats_event_sender + .send_event(statistics::event::Event::Udp4Response { + kind: response_kind, + req_processing_time, + }) + .await; + } + IpAddr::V6(_) => { + stats_event_sender + .send_event(statistics::event::Event::Udp6Response { + kind: response_kind, + req_processing_time, + }) + .await; + } } } } diff --git a/src/servers/udp/server/spawner.rs b/src/servers/udp/server/spawner.rs index ce2fe8eae..5d7a97877 100644 --- a/src/servers/udp/server/spawner.rs +++ b/src/servers/udp/server/spawner.rs @@ -11,6 +11,7 @@ use tokio::task::JoinHandle; use super::banning::BanService; use super::launcher::Launcher; use crate::bootstrap::jobs::Started; +use crate::core::statistics::event::sender::Sender; use crate::core::Tracker; use crate::servers::signals::Halted; @@ -29,6 +30,7 @@ impl Spawner { pub fn spawn_launcher( &self, tracker: Arc, + opt_stats_event_sender: Arc>>, ban_service: Arc>, cookie_lifetime: Duration, tx_start: oneshot::Sender, @@ -37,7 +39,16 @@ impl Spawner { let spawner = Self::new(self.bind_to); tokio::spawn(async move { - Launcher::run_with_graceful_shutdown(tracker, ban_service, spawner.bind_to, cookie_lifetime, tx_start, rx_halt).await; + Launcher::run_with_graceful_shutdown( + tracker, + opt_stats_event_sender, + ban_service, + spawner.bind_to, + cookie_lifetime, + tx_start, + rx_halt, + ) + .await; spawner }) } diff --git a/src/servers/udp/server/states.rs b/src/servers/udp/server/states.rs index 02742049d..5cdca5a7d 100644 --- a/src/servers/udp/server/states.rs +++ b/src/servers/udp/server/states.rs @@ -13,6 +13,7 @@ use super::banning::BanService; use super::spawner::Spawner; use super::{Server, UdpError}; use crate::bootstrap::jobs::Started; +use crate::core::statistics::event::sender::Sender; use crate::core::Tracker; use crate::servers::registar::{ServiceRegistration, ServiceRegistrationForm}; use crate::servers::signals::Halted; @@ -64,10 +65,11 @@ impl Server { /// /// It panics if unable to receive the bound socket address from service. /// - #[instrument(skip(self, tracker, ban_service, form), err, ret(Display, level = Level::INFO))] + #[instrument(skip(self, tracker, opt_stats_event_sender, ban_service, form), err, ret(Display, level = Level::INFO))] pub async fn start( self, tracker: Arc, + opt_stats_event_sender: Arc>>, ban_service: Arc>, form: ServiceRegistrationForm, cookie_lifetime: Duration, @@ -78,10 +80,14 @@ impl Server { assert!(!tx_halt.is_closed(), "Halt channel for UDP tracker should be open"); // May need to wrap in a task to about a tokio bug. - let task = self - .state - .spawner - .spawn_launcher(tracker, ban_service, cookie_lifetime, tx_start, rx_halt); + let task = self.state.spawner.spawn_launcher( + tracker, + opt_stats_event_sender, + ban_service, + cookie_lifetime, + tx_start, + rx_halt, + ); let local_addr = rx_start.await.expect("it should be able to start the service").address; diff --git a/tests/servers/api/environment.rs b/tests/servers/api/environment.rs index 37d031e1c..6658c27da 100644 --- a/tests/servers/api/environment.rs +++ b/tests/servers/api/environment.rs @@ -8,6 +8,9 @@ use torrust_tracker_api_client::connection_info::{ConnectionInfo, Origin}; use torrust_tracker_configuration::{Configuration, HttpApi}; use torrust_tracker_lib::bootstrap::app::initialize_with_configuration; use torrust_tracker_lib::bootstrap::jobs::make_rust_tls; +use torrust_tracker_lib::core::services::statistics; +use torrust_tracker_lib::core::statistics::event::sender::Sender; +use torrust_tracker_lib::core::statistics::repository::Repository; use torrust_tracker_lib::core::whitelist::WhiteListManager; use torrust_tracker_lib::core::Tracker; use torrust_tracker_lib::servers::apis::server::{ApiServer, Launcher, Running, Stopped}; @@ -22,6 +25,8 @@ where { pub config: Arc, pub tracker: Arc, + pub stats_event_sender: Arc>>, + pub stats_repository: Arc, pub whitelist_manager: Arc, pub ban_service: Arc>, pub registar: Registar, @@ -40,13 +45,16 @@ where impl Environment { pub fn new(configuration: &Arc) -> Self { + let (stats_event_sender, stats_repository) = statistics::setup::factory(configuration.core.tracker_usage_statistics); + let stats_event_sender = Arc::new(stats_event_sender); + let stats_repository = Arc::new(stats_repository); + let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); + let tracker = initialize_with_configuration(configuration); - // todo: get from `initialize_with_configuration` + // todo: instantiate outside of `initialize_with_configuration` let whitelist_manager = tracker.whitelist_manager.clone(); - let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); - let config = Arc::new(configuration.http_api.clone().expect("missing API configuration")); let bind_to = config.bind_address; @@ -58,6 +66,8 @@ impl Environment { Self { config, tracker, + stats_event_sender, + stats_repository, whitelist_manager, ban_service, registar: Registar::default(), @@ -71,12 +81,21 @@ impl Environment { Environment { config: self.config, tracker: self.tracker.clone(), + stats_event_sender: self.stats_event_sender.clone(), + stats_repository: self.stats_repository.clone(), whitelist_manager: self.whitelist_manager.clone(), ban_service: self.ban_service.clone(), registar: self.registar.clone(), server: self .server - .start(self.tracker, self.ban_service, self.registar.give_form(), access_tokens) + .start( + self.tracker, + self.stats_event_sender, + self.stats_repository, + self.ban_service, + self.registar.give_form(), + access_tokens, + ) .await .unwrap(), } @@ -92,6 +111,8 @@ impl Environment { Environment { config: self.config, tracker: self.tracker, + stats_event_sender: self.stats_event_sender, + stats_repository: self.stats_repository, whitelist_manager: self.whitelist_manager, ban_service: self.ban_service, registar: Registar::default(), diff --git a/tests/servers/http/environment.rs b/tests/servers/http/environment.rs index 6d4001e6c..e0eb6ff12 100644 --- a/tests/servers/http/environment.rs +++ b/tests/servers/http/environment.rs @@ -5,6 +5,8 @@ use futures::executor::block_on; use torrust_tracker_configuration::{Configuration, HttpTracker}; use torrust_tracker_lib::bootstrap::app::initialize_with_configuration; use torrust_tracker_lib::bootstrap::jobs::make_rust_tls; +use torrust_tracker_lib::core::services::statistics; +use torrust_tracker_lib::core::statistics::event::sender::Sender; use torrust_tracker_lib::core::whitelist::WhiteListManager; use torrust_tracker_lib::core::Tracker; use torrust_tracker_lib::servers::http::server::{HttpServer, Launcher, Running, Stopped}; @@ -14,6 +16,7 @@ use torrust_tracker_primitives::peer; pub struct Environment { pub config: Arc, pub tracker: Arc, + pub stats_event_sender: Arc>>, pub whitelist_manager: Arc, pub registar: Registar, pub server: HttpServer, @@ -29,8 +32,12 @@ impl Environment { impl Environment { #[allow(dead_code)] pub fn new(configuration: &Arc) -> Self { + let (stats_event_sender, _stats_repository) = statistics::setup::factory(configuration.core.tracker_usage_statistics); + let stats_event_sender = Arc::new(stats_event_sender); + let tracker = initialize_with_configuration(configuration); + // todo: instantiate outside of `initialize_with_configuration` let whitelist_manager = tracker.whitelist_manager.clone(); let http_tracker = configuration @@ -49,6 +56,7 @@ impl Environment { Self { config, tracker, + stats_event_sender, whitelist_manager, registar: Registar::default(), server, @@ -60,9 +68,14 @@ impl Environment { Environment { config: self.config, tracker: self.tracker.clone(), + stats_event_sender: self.stats_event_sender.clone(), whitelist_manager: self.whitelist_manager.clone(), registar: self.registar.clone(), - server: self.server.start(self.tracker, self.registar.give_form()).await.unwrap(), + server: self + .server + .start(self.tracker, self.stats_event_sender, self.registar.give_form()) + .await + .unwrap(), } } } @@ -76,6 +89,7 @@ impl Environment { Environment { config: self.config, tracker: self.tracker, + stats_event_sender: self.stats_event_sender, whitelist_manager: self.whitelist_manager, registar: Registar::default(), diff --git a/tests/servers/udp/environment.rs b/tests/servers/udp/environment.rs index f744809c5..26fe11e27 100644 --- a/tests/servers/udp/environment.rs +++ b/tests/servers/udp/environment.rs @@ -5,6 +5,8 @@ use bittorrent_primitives::info_hash::InfoHash; use tokio::sync::RwLock; use torrust_tracker_configuration::{Configuration, UdpTracker, DEFAULT_TIMEOUT}; use torrust_tracker_lib::bootstrap::app::initialize_with_configuration; +use torrust_tracker_lib::core::services::statistics; +use torrust_tracker_lib::core::statistics::event::sender::Sender; use torrust_tracker_lib::core::Tracker; use torrust_tracker_lib::servers::registar::Registar; use torrust_tracker_lib::servers::udp::server::banning::BanService; @@ -20,6 +22,7 @@ where { pub config: Arc, pub tracker: Arc, + pub stats_event_sender: Arc>>, pub ban_service: Arc>, pub registar: Registar, pub server: Server, @@ -39,9 +42,12 @@ where impl Environment { #[allow(dead_code)] pub fn new(configuration: &Arc) -> Self { - let tracker = initialize_with_configuration(configuration); + let (stats_event_sender, _stats_repository) = statistics::setup::factory(configuration.core.tracker_usage_statistics); + let stats_event_sender = Arc::new(stats_event_sender); let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); + let tracker = initialize_with_configuration(configuration); + let udp_tracker = configuration.udp_trackers.clone().expect("missing UDP tracker configuration"); let config = Arc::new(udp_tracker[0].clone()); @@ -53,6 +59,7 @@ impl Environment { Self { config, tracker, + stats_event_sender, ban_service, registar: Registar::default(), server, @@ -65,11 +72,18 @@ impl Environment { Environment { config: self.config, tracker: self.tracker.clone(), + stats_event_sender: self.stats_event_sender.clone(), ban_service: self.ban_service.clone(), registar: self.registar.clone(), server: self .server - .start(self.tracker, self.ban_service, self.registar.give_form(), cookie_lifetime) + .start( + self.tracker, + self.stats_event_sender, + self.ban_service, + self.registar.give_form(), + cookie_lifetime, + ) .await .unwrap(), } @@ -92,6 +106,7 @@ impl Environment { Environment { config: self.config, tracker: self.tracker, + stats_event_sender: self.stats_event_sender, ban_service: self.ban_service, registar: Registar::default(), server: stopped.expect("it stop the udp tracker service"),