diff --git a/broker/src/crypto.rs b/broker/src/crypto.rs index c9468148..598f53e9 100644 --- a/broker/src/crypto.rs +++ b/broker/src/crypto.rs @@ -1,4 +1,4 @@ -use std::{future::Future, mem::discriminant}; +use std::{future::Future, mem::discriminant, sync::Arc}; use axum::http::{header, method, uri::Scheme, Method, Request, StatusCode, Uri}; use serde::{Deserialize, Serialize}; @@ -6,15 +6,15 @@ use shared::{ async_trait, config, crypto::{parse_crl, CertificateCache, CertificateCacheUpdate, GetCerts}, errors::SamplyBeamError, http_client::{self, SamplyHttpClient}, openssl::x509::X509Crl, reqwest::{self, Url} }; use std::time::Duration; -use tokio::time::timeout; +use tokio::{sync::RwLock, time::timeout}; use tracing::{debug, error, warn, info}; -use crate::health::{self, VaultStatus}; +use crate::health::{self, Health, VaultStatus}; pub struct GetCertsFromPki { pki_realm: String, hyper_client: SamplyHttpClient, - health_report_sender: tokio::sync::watch::Sender, + health: Arc>, } #[derive(Debug, Deserialize, Clone, Hash)] @@ -35,7 +35,7 @@ struct PkiListResponse { impl GetCertsFromPki { pub(crate) fn new( - health_report_sender: tokio::sync::watch::Sender, + health: Arc>, ) -> Result { let mut certs: Vec = Vec::new(); if let Some(dir) = &config::CONFIG_CENTRAL.tls_ca_certificates_dir { @@ -61,19 +61,12 @@ impl GetCertsFromPki { Ok(Self { pki_realm, hyper_client, - health_report_sender, + health, }) } async fn report_vault_health(&self, status: VaultStatus) { - self.health_report_sender.send_if_modified(|val| { - if discriminant(val) != discriminant(&status) { - *val = status; - true - } else { - false - } - }); + self.health.write().await.vault = status; } pub(crate) async fn check_vault_health(&self) -> Result<(), SamplyBeamError> { @@ -261,12 +254,6 @@ impl GetCerts for GetCertsFromPki { } } -pub(crate) fn build_cert_getter( - sender: tokio::sync::watch::Sender, -) -> Result { - GetCertsFromPki::new(sender) -} - -pub(crate) fn pki_url_builder(location: &str) -> Url { +fn pki_url_builder(location: &str) -> Url { config::CONFIG_CENTRAL.pki_address.join(&format!("/v1/{location}")).unwrap() } diff --git a/broker/src/health.rs b/broker/src/health.rs index b9c115e5..c28e46c3 100644 --- a/broker/src/health.rs +++ b/broker/src/health.rs @@ -19,36 +19,27 @@ impl Default for Verdict { } } -#[derive(Serialize, Clone, Copy)] +#[derive(Debug, Serialize, Clone, Copy, Default)] #[serde(rename_all = "lowercase")] pub enum VaultStatus { Ok, + #[default] Unknown, OtherError, LockedOrSealed, Unreachable, } -impl Default for VaultStatus { - fn default() -> Self { - VaultStatus::Unknown - } -} - -#[derive(Serialize, Clone, Copy)] +#[derive(Debug, Serialize, Clone, Copy, Default)] #[serde(rename_all = "lowercase")] pub enum InitStatus { + #[default] Unknown, FetchingIntermediateCert, Done } -impl Default for InitStatus { - fn default() -> Self { - InitStatus::Unknown - } -} - +#[derive(Debug, Default)] pub struct Health { pub vault: VaultStatus, pub initstatus: InitStatus, @@ -92,61 +83,3 @@ impl ProxyStatus { ProxyStatus { last_connect: SystemTime::now(), connections: 1, last_disconnect: None } } } - -pub struct Senders { - pub vault: tokio::sync::watch::Sender, - pub init: tokio::sync::watch::Sender, -} - -impl Health { - pub fn make() -> (Senders, Arc>) { - let health = Health { - vault: VaultStatus::default(), - initstatus: InitStatus::default(), - proxies: HashMap::default() - }; - let (vault_tx, mut vault_rx) = tokio::sync::watch::channel(VaultStatus::default()); - let (init_tx, mut init_rx) = tokio::sync::watch::channel(InitStatus::default()); - let health = Arc::new(RwLock::new(health)); - let health2 = health.clone(); - let health3 = health.clone(); - - let vault_watcher = async move { - while vault_rx.changed().await.is_ok() { - let new_val = vault_rx.borrow().clone(); - let mut health = health2.write().await; - health.vault = new_val; - match &health.vault { - VaultStatus::Ok => info!("Vault connection is now healthy"), - x => warn!( - "Vault connection is degraded: {}", - serde_json::to_string(x).unwrap_or_default() - ), - } - } - }; - - tokio::task::spawn(vault_watcher); - let initstatus_watcher = async move { - while init_rx.changed().await.is_ok() { - let new_val = init_rx.borrow().clone(); - let mut health = health3.write().await; - health.initstatus = new_val; - match &health.initstatus { - InitStatus::Done => { - info!("Initialization is now complete"); - return; - }, - x => warn!( - "Still initializing: {}", - serde_json::to_string(x).unwrap_or_default() - ), - } - } - }; - tokio::task::spawn(initstatus_watcher); - - let senders = Senders { vault: vault_tx, init: init_tx }; - (senders, health) - } -} diff --git a/broker/src/main.rs b/broker/src/main.rs index 9ad591c4..4fe6d51f 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -14,9 +14,11 @@ mod compare_client_server_version; use std::{collections::HashMap, sync::Arc, time::Duration}; -use health::{Senders, InitStatus}; +use crypto::GetCertsFromPki; +use health::{Health, InitStatus}; +use once_cell::sync::Lazy; use shared::{config::CONFIG_CENTRAL, *, errors::SamplyBeamError}; -use tokio::sync::{RwLock, watch}; +use tokio::sync::RwLock; use tracing::{error, info, warn}; #[tokio::main] @@ -24,25 +26,27 @@ pub async fn main() -> anyhow::Result<()> { shared::logger::init_logger()?; banner::print_banner(); - let (Senders { init: init_status_sender, vault: vault_status_sender}, health) = health::Health::make(); - let cert_getter = crypto::build_cert_getter(vault_status_sender)?; + let health = Arc::new(RwLock::new(Health::default())); + let cert_getter = GetCertsFromPki::new(health.clone())?; shared::crypto::init_cert_getter(cert_getter); - tokio::task::spawn(init_broker_ca_chain(init_status_sender)); + tokio::task::spawn(init_broker_ca_chain(health.clone())); #[cfg(debug_assertions)] if shared::examples::print_example_objects() { return Ok(()); } - let _ = config::CONFIG_CENTRAL.bind_addr; // Initialize config + Lazy::force(&config::CONFIG_CENTRAL); // Initialize config serve::serve(health).await?; Ok(()) } -async fn init_broker_ca_chain(sender: watch::Sender) { - sender.send_replace(health::InitStatus::FetchingIntermediateCert); +async fn init_broker_ca_chain(health: Arc>) { + { + health.write().await.initstatus = health::InitStatus::FetchingIntermediateCert + } shared::crypto::init_ca_chain().await.expect("Failed to init broker ca chain"); - sender.send_replace(health::InitStatus::Done); + health.write().await.initstatus = health::InitStatus::Done; }