Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(fortuna): implement metrics #1560

Merged
merged 30 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/fortuna/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/fortuna/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fortuna"
version = "5.2.4"
version = "5.3.0"
edition = "2021"

[dependencies]
Expand Down
91 changes: 46 additions & 45 deletions apps/fortuna/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,45 @@ mod revelation;

pub type ChainId = String;

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct RequestLabel {
pub value: String,
}

pub struct ApiMetrics {
pub http_requests: Family<RequestLabel, Counter>,
}

#[derive(Clone)]
pub struct ApiState {
pub chains: Arc<HashMap<ChainId, BlockchainState>>,

pub metrics_registry: Arc<RwLock<Registry>>,

/// Prometheus metrics
pub metrics: Arc<Metrics>,
pub metrics: Arc<ApiMetrics>,
}

impl ApiState {
pub fn new(chains: &[(ChainId, BlockchainState)]) -> ApiState {
let map: HashMap<ChainId, BlockchainState> = chains.into_iter().cloned().collect();
pub async fn new(
chains: HashMap<ChainId, BlockchainState>,
metrics_registry: Arc<RwLock<Registry>>,
) -> ApiState {
let metrics = ApiMetrics {
http_requests: Family::default(),
};

let http_requests = metrics.http_requests.clone();
metrics_registry.write().await.register(
"http_requests",
"Number of HTTP requests received",
http_requests,
);

ApiState {
chains: Arc::new(map),
metrics: Arc::new(Metrics::new()),
chains: Arc::new(chains),
metrics: Arc::new(metrics),
metrics_registry,
}
}
}
Expand All @@ -89,38 +114,6 @@ pub struct BlockchainState {
pub confirmed_block_status: BlockStatus,
}

pub struct Metrics {
pub registry: RwLock<Registry>,
// TODO: track useful metrics. this counter is just a placeholder to get things set up.
pub request_counter: Family<Label, Counter>,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct Label {
value: String,
}

impl Metrics {
pub fn new() -> Self {
let mut metrics_registry = Registry::default();
let http_requests = Family::<Label, Counter>::default();

// Register the metric family with the registry.
metrics_registry.register(
// With the metric name.
"http_requests",
// And the metric help text.
"Number of HTTP requests received",
http_requests.clone(),
);

Metrics {
registry: RwLock::new(metrics_registry),
request_counter: http_requests,
}
}
}

pub enum RestError {
/// The caller passed a sequence number that isn't within the supported range
InvalidSequenceNumber,
Expand Down Expand Up @@ -225,7 +218,12 @@ mod test {
},
ethers::prelude::Address,
lazy_static::lazy_static,
std::sync::Arc,
prometheus_client::registry::Registry,
std::{
collections::HashMap,
sync::Arc,
},
tokio::sync::RwLock,
};

const PROVIDER: Address = Address::zero();
Expand All @@ -243,7 +241,7 @@ mod test {
));
}

fn test_server() -> (TestServer, Arc<MockEntropyReader>, Arc<MockEntropyReader>) {
async fn test_server() -> (TestServer, Arc<MockEntropyReader>, Arc<MockEntropyReader>) {
let eth_read = Arc::new(MockEntropyReader::with_requests(10, &[]));

let eth_state = BlockchainState {
Expand All @@ -255,6 +253,8 @@ mod test {
confirmed_block_status: BlockStatus::Latest,
};

let metrics_registry = Arc::new(RwLock::new(Registry::default()));

let avax_read = Arc::new(MockEntropyReader::with_requests(10, &[]));

let avax_state = BlockchainState {
Expand All @@ -266,10 +266,11 @@ mod test {
confirmed_block_status: BlockStatus::Latest,
};

let api_state = ApiState::new(&[
("ethereum".into(), eth_state),
("avalanche".into(), avax_state),
]);
let mut chains = HashMap::new();
0xfirefist marked this conversation as resolved.
Show resolved Hide resolved
chains.insert("ethereum".into(), eth_state);
chains.insert("avalanche".into(), avax_state);

let api_state = ApiState::new(chains, metrics_registry).await;

let app = api::routes(api_state);
(TestServer::new(app).unwrap(), eth_read, avax_read)
Expand All @@ -287,7 +288,7 @@ mod test {

#[tokio::test]
async fn test_revelation() {
let (server, eth_contract, avax_contract) = test_server();
let (server, eth_contract, avax_contract) = test_server().await;

// Can't access a revelation if it hasn't been requested
get_and_assert_status(
Expand Down Expand Up @@ -416,7 +417,7 @@ mod test {

#[tokio::test]
async fn test_revelation_confirmation_delay() {
let (server, eth_contract, avax_contract) = test_server();
let (server, eth_contract, avax_contract) = test_server().await;

eth_contract.insert(PROVIDER, 0, 10, false);
eth_contract.insert(PROVIDER, 1, 11, false);
Expand Down
2 changes: 1 addition & 1 deletion apps/fortuna/src/api/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use {
};

pub async fn metrics(State(state): State<crate::api::ApiState>) -> impl IntoResponse {
let registry = state.metrics.registry.read().await;
let registry = state.metrics_registry.read().await;
let mut buffer = String::new();

// Should not fail if the metrics are valid and there is memory available
Expand Down
6 changes: 3 additions & 3 deletions apps/fortuna/src/api/revelation.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
crate::api::{
ChainId,
Label,
RequestLabel,
RestError,
},
anyhow::Result,
Expand Down Expand Up @@ -45,8 +45,8 @@ pub async fn revelation(
) -> Result<Json<GetRandomValueResponse>, RestError> {
state
.metrics
.request_counter
.get_or_create(&Label {
.http_requests
.get_or_create(&RequestLabel {
value: "/v1/chains/{chain_id}/revelations/{sequence}".to_string(),
})
.inc();
Expand Down
118 changes: 110 additions & 8 deletions apps/fortuna/src/command/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,52 @@ use {
Result,
},
axum::Router,
ethers::{
middleware::Middleware,
providers::{
Http,
Provider,
},
types::BlockNumber,
},
prometheus_client::{
encoding::EncodeLabelSet,
metrics::{
family::Family,
gauge::Gauge,
},
registry::Registry,
},
std::{
collections::HashMap,
net::SocketAddr,
sync::Arc,
time::{
Duration,
SystemTime,
UNIX_EPOCH,
},
},
tokio::{
spawn,
sync::watch,
sync::{
watch,
RwLock,
},
time,
},
tower_http::cors::CorsLayer,
utoipa::OpenApi,
utoipa_swagger_ui::SwaggerUi,
};

/// Track metrics in this interval
const TRACK_INTERVAL: Duration = Duration::from_secs(10);

pub async fn run_api(
socket_addr: SocketAddr,
chains: HashMap<String, api::BlockchainState>,
metrics_registry: Arc<RwLock<Registry>>,
mut rx_exit: watch::Receiver<bool>,
) -> Result<()> {
#[derive(OpenApi)]
Expand All @@ -63,11 +92,7 @@ pub async fn run_api(
)]
struct ApiDoc;

let metrics_registry = api::Metrics::new();
let api_state = api::ApiState {
chains: Arc::new(chains),
metrics: Arc::new(metrics_registry),
};
let api_state = api::ApiState::new(chains, metrics_registry).await;

// Initialize Axum Router. Note the type here is a `Router<State>` due to the use of the
// `with_state` method which replaces `Body` with `State` in the type signature.
Expand Down Expand Up @@ -101,6 +126,7 @@ pub async fn run_keeper(
chains: HashMap<String, api::BlockchainState>,
config: Config,
private_key: String,
metrics_registry: Arc<RwLock<Registry>>,
) -> Result<()> {
let mut handles = Vec::new();
for (chain_id, chain_config) in chains {
Expand All @@ -114,6 +140,7 @@ pub async fn run_keeper(
private_key,
chain_eth_config,
chain_config.clone(),
metrics_registry.clone(),
)));
}

Expand Down Expand Up @@ -218,11 +245,86 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
Ok::<(), Error>(())
});

let metrics_registry = Arc::new(RwLock::new(Registry::default()));

if let Some(keeper_private_key) = opts.load_keeper_private_key()? {
spawn(run_keeper(chains.clone(), config, keeper_private_key));
spawn(run_keeper(
chains.clone(),
config.clone(),
keeper_private_key,
metrics_registry.clone(),
));
}

run_api(opts.addr.clone(), chains, rx_exit).await?;
// Spawn a thread to track latest block lag. This helps us know if the rpc is up and updated with the latest block.
spawn(track_block_timestamp_lag(config, metrics_registry.clone()));

run_api(opts.addr.clone(), chains, metrics_registry, rx_exit).await?;

Ok(())
}


#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct ChainLabel {
pub chain_id: String,
}

/// Tracks the difference between the server timestamp and the latest block timestamp for each chain
pub async fn track_block_timestamp_lag(config: Config, metrics_registry: Arc<RwLock<Registry>>) {
let metrics = Family::<ChainLabel, Gauge>::default();
metrics_registry.write().await.register(
"block_timestamp_lag",
"The difference between server timestamp and latest block timestamp",
metrics.clone(),
);
loop {
for (chain_id, chain_config) in &config.chains {
let chain_id = chain_id.clone();
let chain_config = chain_config.clone();
let metrics = metrics.clone();

spawn(async move {
let chain_id = chain_id.clone();
let chain_config = chain_config.clone();

let provider = match Provider::<Http>::try_from(&chain_config.geth_rpc_addr) {
Ok(r) => r,
Err(e) => {
tracing::error!(
"Failed to create provider for chain id {} - {:?}",
&chain_id,
e
);
return;
}
};

match provider.get_block(BlockNumber::Latest).await {
Ok(b) => {
if let Some(block) = b {
let block_timestamp = block.timestamp;
let server_timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let lag: i64 =
(server_timestamp as i64) - (block_timestamp.as_u64() as i64);

metrics
.get_or_create(&ChainLabel {
chain_id: chain_id.clone(),
})
.set(lag);
}
}
Err(e) => {
tracing::error!("Failed to get block for chain id {} - {:?}", &chain_id, e);
}
};
});
}

time::sleep(TRACK_INTERVAL).await;
}
}
Loading
Loading