Skip to content

Commit

Permalink
refactor(health): dont use channels to update state
Browse files Browse the repository at this point in the history
  • Loading branch information
Threated committed Jan 22, 2025
1 parent 1bc153d commit ba6bbae
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 102 deletions.
29 changes: 8 additions & 21 deletions broker/src/crypto.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
use std::{future::Future, mem::discriminant};
use std::{future::Future, mem::discriminant, sync::Arc};

use axum::http::{header, method, uri::Scheme, Method, Request, StatusCode, Uri};
use serde::{Deserialize, Serialize};
use shared::{
async_trait, config, crypto::{parse_crl, CertificateCache, CertificateCacheUpdate, GetCerts}, errors::SamplyBeamError, http_client::{self, SamplyHttpClient}, openssl::x509::X509Crl, reqwest::{self, Url}
};
use std::time::Duration;
use tokio::time::timeout;
use tokio::{sync::RwLock, time::timeout};
use tracing::{debug, error, warn, info};

use crate::health::{self, VaultStatus};
use crate::health::{self, Health, VaultStatus};

pub struct GetCertsFromPki {
pki_realm: String,
hyper_client: SamplyHttpClient,
health_report_sender: tokio::sync::watch::Sender<health::VaultStatus>,
health: Arc<RwLock<Health>>,
}

#[derive(Debug, Deserialize, Clone, Hash)]
Expand All @@ -35,7 +35,7 @@ struct PkiListResponse {

impl GetCertsFromPki {
pub(crate) fn new(
health_report_sender: tokio::sync::watch::Sender<health::VaultStatus>,
health: Arc<RwLock<Health>>,
) -> Result<Self, SamplyBeamError> {
let mut certs: Vec<String> = Vec::new();
if let Some(dir) = &config::CONFIG_CENTRAL.tls_ca_certificates_dir {
Expand All @@ -61,19 +61,12 @@ impl GetCertsFromPki {
Ok(Self {
pki_realm,
hyper_client,
health_report_sender,
health,
})
}

async fn report_vault_health(&self, status: VaultStatus) {
self.health_report_sender.send_if_modified(|val| {
if discriminant(val) != discriminant(&status) {
*val = status;
true
} else {
false
}
});
self.health.write().await.vault = status;
}

pub(crate) async fn check_vault_health(&self) -> Result<(), SamplyBeamError> {
Expand Down Expand Up @@ -261,12 +254,6 @@ impl GetCerts for GetCertsFromPki {
}
}

pub(crate) fn build_cert_getter(
sender: tokio::sync::watch::Sender<VaultStatus>,
) -> Result<GetCertsFromPki, SamplyBeamError> {
GetCertsFromPki::new(sender)
}

pub(crate) fn pki_url_builder(location: &str) -> Url {
fn pki_url_builder(location: &str) -> Url {
config::CONFIG_CENTRAL.pki_address.join(&format!("/v1/{location}")).unwrap()
}
77 changes: 5 additions & 72 deletions broker/src/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,36 +19,27 @@ impl Default for Verdict {
}
}

#[derive(Serialize, Clone, Copy)]
#[derive(Debug, Serialize, Clone, Copy, Default)]
#[serde(rename_all = "lowercase")]
pub enum VaultStatus {
Ok,
#[default]
Unknown,
OtherError,
LockedOrSealed,
Unreachable,
}

impl Default for VaultStatus {
fn default() -> Self {
VaultStatus::Unknown
}
}

#[derive(Serialize, Clone, Copy)]
#[derive(Debug, Serialize, Clone, Copy, Default)]
#[serde(rename_all = "lowercase")]
pub enum InitStatus {
#[default]
Unknown,
FetchingIntermediateCert,
Done
}

impl Default for InitStatus {
fn default() -> Self {
InitStatus::Unknown
}
}

#[derive(Debug, Default)]
pub struct Health {
pub vault: VaultStatus,
pub initstatus: InitStatus,
Expand Down Expand Up @@ -92,61 +83,3 @@ impl ProxyStatus {
ProxyStatus { last_connect: SystemTime::now(), connections: 1, last_disconnect: None }
}
}

pub struct Senders {
pub vault: tokio::sync::watch::Sender<VaultStatus>,
pub init: tokio::sync::watch::Sender<InitStatus>,
}

impl Health {
pub fn make() -> (Senders, Arc<RwLock<Self>>) {
let health = Health {
vault: VaultStatus::default(),
initstatus: InitStatus::default(),
proxies: HashMap::default()
};
let (vault_tx, mut vault_rx) = tokio::sync::watch::channel(VaultStatus::default());
let (init_tx, mut init_rx) = tokio::sync::watch::channel(InitStatus::default());
let health = Arc::new(RwLock::new(health));
let health2 = health.clone();
let health3 = health.clone();

let vault_watcher = async move {
while vault_rx.changed().await.is_ok() {
let new_val = vault_rx.borrow().clone();
let mut health = health2.write().await;
health.vault = new_val;
match &health.vault {
VaultStatus::Ok => info!("Vault connection is now healthy"),
x => warn!(
"Vault connection is degraded: {}",
serde_json::to_string(x).unwrap_or_default()
),
}
}
};

tokio::task::spawn(vault_watcher);
let initstatus_watcher = async move {
while init_rx.changed().await.is_ok() {
let new_val = init_rx.borrow().clone();
let mut health = health3.write().await;
health.initstatus = new_val;
match &health.initstatus {
InitStatus::Done => {
info!("Initialization is now complete");
return;
},
x => warn!(
"Still initializing: {}",
serde_json::to_string(x).unwrap_or_default()
),
}
}
};
tokio::task::spawn(initstatus_watcher);

let senders = Senders { vault: vault_tx, init: init_tx };
(senders, health)
}
}
22 changes: 13 additions & 9 deletions broker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,39 @@ mod compare_client_server_version;

use std::{collections::HashMap, sync::Arc, time::Duration};

use health::{Senders, InitStatus};
use crypto::GetCertsFromPki;
use health::{Health, InitStatus};
use once_cell::sync::Lazy;
use shared::{config::CONFIG_CENTRAL, *, errors::SamplyBeamError};
use tokio::sync::{RwLock, watch};
use tokio::sync::RwLock;
use tracing::{error, info, warn};

#[tokio::main]
pub async fn main() -> anyhow::Result<()> {
shared::logger::init_logger()?;
banner::print_banner();

let (Senders { init: init_status_sender, vault: vault_status_sender}, health) = health::Health::make();
let cert_getter = crypto::build_cert_getter(vault_status_sender)?;
let health = Arc::new(RwLock::new(Health::default()));
let cert_getter = GetCertsFromPki::new(health.clone())?;

shared::crypto::init_cert_getter(cert_getter);
tokio::task::spawn(init_broker_ca_chain(init_status_sender));
tokio::task::spawn(init_broker_ca_chain(health.clone()));
#[cfg(debug_assertions)]
if shared::examples::print_example_objects() {
return Ok(());
}

let _ = config::CONFIG_CENTRAL.bind_addr; // Initialize config
Lazy::force(&config::CONFIG_CENTRAL); // Initialize config

serve::serve(health).await?;

Ok(())
}

async fn init_broker_ca_chain(sender: watch::Sender<InitStatus>) {
sender.send_replace(health::InitStatus::FetchingIntermediateCert);
async fn init_broker_ca_chain(health: Arc<RwLock<Health>>) {
{
health.write().await.initstatus = health::InitStatus::FetchingIntermediateCert
}
shared::crypto::init_ca_chain().await.expect("Failed to init broker ca chain");
sender.send_replace(health::InitStatus::Done);
health.write().await.initstatus = health::InitStatus::Done;
}

0 comments on commit ba6bbae

Please sign in to comment.