From 25ce977484830d053e7074e9325c52e59dc63538 Mon Sep 17 00:00:00 2001 From: jonathanrainer Date: Mon, 28 Oct 2024 08:33:34 +0000 Subject: [PATCH] FLEET-19 Use correct metrics form Also improve how we handle refreshing the System object, rather than doing it in either the callback or the async task, contain that within a struct and do it there. --- apollo-router/src/plugins/fleet_detector.rs | 125 +++++++++++++++++--- 1 file changed, 106 insertions(+), 19 deletions(-) diff --git a/apollo-router/src/plugins/fleet_detector.rs b/apollo-router/src/plugins/fleet_detector.rs index 78b5d06c998..1a8c784841c 100644 --- a/apollo-router/src/plugins/fleet_detector.rs +++ b/apollo-router/src/plugins/fleet_detector.rs @@ -1,42 +1,112 @@ +use std::env::consts::ARCH; +use std::sync::atomic; +use std::sync::atomic::AtomicU64; +use std::sync::Arc; +use std::sync::Mutex; +use std::time::Duration; +use std::time::Instant; + +use opentelemetry::metrics::MeterProvider; +use opentelemetry_api::metrics::ObservableGauge; +use opentelemetry_api::metrics::Unit; use schemars::JsonSchema; use serde::Deserialize; use sysinfo::System; use tokio::task::JoinHandle; use tower::BoxError; use tracing::debug; -use tracing::info; +use crate::metrics::meter_provider; use crate::plugin::Plugin; use crate::plugin::PluginInit; +const REFRESH_INTERVAL: Duration = Duration::from_secs(60); + +#[derive(Debug, Default, Deserialize, JsonSchema)] +struct Conf {} + +#[derive(Debug)] +struct SystemGetter { + system: System, + start: Instant, + // Duration in milliseconds since the start time, used because Instant is not atomic + // and multiple threads may try to access the system + refresh_at: AtomicU64, +} + +impl SystemGetter { + fn new() -> Self { + let mut system = System::new(); + system.refresh_all(); + Self { + system, + start: Instant::now(), + refresh_at: AtomicU64::new(REFRESH_INTERVAL.as_millis() as u64), + } + } + + fn get_system(&mut self) -> &System { + let refresh_at = Duration::from_millis(self.refresh_at.load(atomic::Ordering::Relaxed)); + if self.start.elapsed() < refresh_at { + &self.system + } else { + self.system.refresh_cpu_all(); + self.system.refresh_memory(); + self.refresh_at = ((refresh_at + REFRESH_INTERVAL).as_millis() as u64).into(); + &self.system + } + } +} + #[derive(Debug)] struct FleetDetector { handle: JoinHandle<()>, + #[allow(dead_code)] + // We have to store a reference to the gauge otherwise it will be dropped once the plugin is + // initialised, even though it still has data to emit + freq_gauge: ObservableGauge, } -#[derive(Debug, Default, Deserialize, JsonSchema)] -struct Conf {} - #[async_trait::async_trait] impl Plugin for FleetDetector { type Config = Conf; async fn new(_: PluginInit) -> Result { - debug!("beginning environment detection"); - debug!("spawning continuous detector task"); - let handle = tokio::task::spawn(async { - let mut sys = System::new_all(); - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); + debug!("beginning environment detection, spawning gauges"); + let system_getter = Arc::new(Mutex::new(SystemGetter::new())); + let meter = meter_provider().meter("apollo/router"); + + let gauge_local_system_getter = system_getter.clone(); + let freq_gauge = meter + .u64_observable_gauge("apollo.router.instance.cpu_freq") + .with_description( + "The CPU frequency of the underlying instance the router is deployed to", + ) + .with_unit(Unit::new("Mhz")) + .with_callback(move |i| { + let mut system_getter = gauge_local_system_getter.lock().unwrap(); + let system = system_getter.get_system(); + let cpus = system.cpus(); + let cpu_freq = + cpus.iter().map(|cpu| cpu.frequency()).sum::() / cpus.len() as u64; + i.observe(cpu_freq, &[]) + }) + .init(); + + debug!("establishing metrics emission task"); + let counter_local_system_getter = system_getter.clone(); + let handle = tokio::task::spawn(async move { + let mut interval = tokio::time::interval(REFRESH_INTERVAL); loop { interval.tick().await; - sys.refresh_cpu_all(); - sys.refresh_memory(); - detect_cpu_values(&sys); - detect_memory_values(&sys); + let mut system_getter = counter_local_system_getter.lock().unwrap(); + let system = system_getter.get_system(); + detect_cpu_values(system); + detect_memory_values(system); } }); - Ok(FleetDetector { handle }) + Ok(FleetDetector { handle, freq_gauge }) } } @@ -47,11 +117,13 @@ impl Drop for FleetDetector { } fn detect_cpu_values(system: &System) { - let cpus = system.cpus(); let cpu_count = detect_cpu_count(system); - let cpu_freq = cpus.iter().map(|cpu| cpu.frequency()).sum::() / cpus.len() as u64; - info!(value.apollo.router.instance.cpu_freq = cpu_freq); - info!(counter.apollo.router.instance.cpu_count = cpu_count); + u64_counter!( + "apollo.router.instance.cpu_count", + "The number of CPUs reported by the instance the router is running on", + cpu_count, + host.arch = get_otel_arch() + ); } #[cfg(not(target_os = "linux"))] @@ -120,7 +192,22 @@ fn detect_cpu_count(system: &System) -> u64 { } fn detect_memory_values(system: &System) { - info!(counter.apollo.router.instance.total_memory = system.total_memory()) + u64_counter!( + "apollo.router.instance.total_memory", + "The amount of memory reported by the instance the router is running on", + system.total_memory() + ); +} + +fn get_otel_arch() -> &'static str { + match ARCH { + "x86_64" => "amd64", + "aarch64" => "arm64", + "arm" => "arm32", + "powerpc" => "ppc32", + "powerpc64" => "ppc64", + a => a, + } } register_plugin!("apollo", "fleet_detector", FleetDetector);