Skip to content

Commit

Permalink
FLEET-19 Use correct metrics form
Browse files Browse the repository at this point in the history
Also improve how we handle refreshing the System
object, rather than doing it in either the callback
or the async task, contain that within a struct
and do it there.
  • Loading branch information
jonathanrainer committed Nov 1, 2024
1 parent 0762c5d commit 492ab28
Showing 1 changed file with 106 additions and 19 deletions.
125 changes: 106 additions & 19 deletions apollo-router/src/plugins/fleet_detector.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,112 @@
use std::env::consts::ARCH;
use std::sync::atomic;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;

use opentelemetry::metrics::MeterProvider;
use opentelemetry_api::metrics::ObservableGauge;
use opentelemetry_api::metrics::Unit;
use schemars::JsonSchema;
use serde::Deserialize;
use sysinfo::System;
use tokio::task::JoinHandle;
use tower::BoxError;
use tracing::debug;
use tracing::info;

use crate::metrics::meter_provider;
use crate::plugin::Plugin;
use crate::plugin::PluginInit;

const REFRESH_INTERVAL: Duration = Duration::from_secs(60);

#[derive(Debug, Default, Deserialize, JsonSchema)]
struct Conf {}

#[derive(Debug)]
struct SystemGetter {
system: System,
start: Instant,
// Duration in milliseconds since the start time, used because Instant is not atomic
// and multiple threads may try to access the system
refresh_at: AtomicU64,
}

impl SystemGetter {
fn new() -> Self {
let mut system = System::new();
system.refresh_all();
Self {
system,
start: Instant::now(),
refresh_at: AtomicU64::new(REFRESH_INTERVAL.as_millis() as u64),
}
}

fn get_system(&mut self) -> &System {
let refresh_at = Duration::from_millis(self.refresh_at.load(atomic::Ordering::Relaxed));
if self.start.elapsed() < refresh_at {
&self.system
} else {
self.system.refresh_cpu_all();
self.system.refresh_memory();
self.refresh_at = ((refresh_at + REFRESH_INTERVAL).as_millis() as u64).into();
&self.system
}
}
}

#[derive(Debug)]
struct FleetDetector {
handle: JoinHandle<()>,
#[allow(dead_code)]
// We have to store a reference to the gauge otherwise it will be dropped once the plugin is
// initialised, even though it still has data to emit
freq_gauge: ObservableGauge<u64>,
}

#[derive(Debug, Default, Deserialize, JsonSchema)]
struct Conf {}

#[async_trait::async_trait]
impl Plugin for FleetDetector {
type Config = Conf;

async fn new(_: PluginInit<Self::Config>) -> Result<Self, BoxError> {
debug!("beginning environment detection");
debug!("spawning continuous detector task");
let handle = tokio::task::spawn(async {
let mut sys = System::new_all();
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
debug!("beginning environment detection, spawning gauges");
let system_getter = Arc::new(Mutex::new(SystemGetter::new()));
let meter = meter_provider().meter("apollo/router");

let gauge_local_system_getter = system_getter.clone();
let freq_gauge = meter
.u64_observable_gauge("apollo.router.instance.cpu_freq")
.with_description(
"The CPU frequency of the underlying instance the router is deployed to",
)
.with_unit(Unit::new("Mhz"))
.with_callback(move |i| {
let mut system_getter = gauge_local_system_getter.lock().unwrap();
let system = system_getter.get_system();
let cpus = system.cpus();
let cpu_freq =
cpus.iter().map(|cpu| cpu.frequency()).sum::<u64>() / cpus.len() as u64;
i.observe(cpu_freq, &[])
})
.init();

debug!("establishing metrics emission task");
let counter_local_system_getter = system_getter.clone();
let handle = tokio::task::spawn(async move {
let mut interval = tokio::time::interval(REFRESH_INTERVAL);
loop {
interval.tick().await;
sys.refresh_cpu_all();
sys.refresh_memory();
detect_cpu_values(&sys);
detect_memory_values(&sys);
let mut system_getter = counter_local_system_getter.lock().unwrap();
let system = system_getter.get_system();
detect_cpu_values(system);
detect_memory_values(system);
}
});

Ok(FleetDetector { handle })
Ok(FleetDetector { handle, freq_gauge })
}
}

Expand All @@ -47,11 +117,13 @@ impl Drop for FleetDetector {
}

fn detect_cpu_values(system: &System) {
let cpus = system.cpus();
let cpu_count = detect_cpu_count(system);
let cpu_freq = cpus.iter().map(|cpu| cpu.frequency()).sum::<u64>() / cpus.len() as u64;
info!(value.apollo.router.instance.cpu_freq = cpu_freq);
info!(counter.apollo.router.instance.cpu_count = cpu_count);
u64_counter!(
"apollo.router.instance.cpu_count",
"The number of CPUs reported by the instance the router is running on",
cpu_count,
host.arch = get_otel_arch()
);
}

#[cfg(not(target_os = "linux"))]
Expand Down Expand Up @@ -120,7 +192,22 @@ fn detect_cpu_count(system: &System) -> u64 {
}

fn detect_memory_values(system: &System) {
info!(counter.apollo.router.instance.total_memory = system.total_memory())
u64_counter!(
"apollo.router.instance.total_memory",
"The amount of memory reported by the instance the router is running on",
system.total_memory()
);
}

fn get_otel_arch() -> &'static str {
match ARCH {
"x86_64" => "amd64",
"aarch64" => "arm64",
"arm" => "arm32",
"powerpc" => "ppc32",
"powerpc64" => "ppc64",
a => a,
}
}

register_plugin!("apollo", "fleet_detector", FleetDetector);

0 comments on commit 492ab28

Please sign in to comment.