
feat: supported storage metrics (#307)
homura committed Jun 3, 2020
1 parent 1237354 commit 2531b8d
Showing 9 changed files with 200 additions and 23 deletions.
10 changes: 7 additions & 3 deletions common/apm/src/metrics.rs
@@ -2,13 +2,17 @@ pub mod api;
pub mod consensus;
pub mod mempool;
pub mod network;
pub mod storage;

pub use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec};
pub use prometheus::{
CounterVec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
};

use derive_more::Display;
use prometheus::{
exponential_buckets, register_histogram, register_histogram_vec, register_int_counter,
register_int_counter_vec, register_int_gauge, register_int_gauge_vec, Encoder, TextEncoder,
exponential_buckets, register_counter_vec, register_histogram, register_histogram_vec,
register_int_counter, register_int_counter_vec, register_int_gauge, register_int_gauge_vec,
Encoder, TextEncoder,
};
use prometheus_static_metric::{auto_flush_from, make_auto_flush_static_metric};
use protocol::{ProtocolError, ProtocolErrorKind, ProtocolResult};
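The duration_to_sec helper that the new storage and sync call sites import from this module sits outside the shown hunk; a minimal sketch of what such a conversion typically looks like (only the name and usage come from the diff, the body here is an assumption):

use std::time::Duration;

// Assumed implementation: convert a Duration into fractional seconds so it
// can be fed to a prometheus Counter or Histogram.
pub fn duration_to_sec(d: Duration) -> f64 {
    d.as_secs() as f64 + f64::from(d.subsec_nanos()) / 1e9
}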
8 changes: 7 additions & 1 deletion common/apm/src/metrics/consensus.rs
@@ -81,9 +81,15 @@ lazy_static! {
"The counter for sync blocks from remote"
)
.unwrap();
pub static ref ENGINE_SYNC_BLOCK_HISTOGRAM: Histogram = register_histogram!(
"muta_consensus_sync_block_duration",
"Histogram of consensus sync duration",
exponential_buckets(0.5, 1.2, 20).expect("consensus duration time exponential")
)
.unwrap();
pub static ref ENGINE_CONSENSUS_COST_TIME: Histogram = register_histogram!(
"muta_consensus_duration_seconds",
"Consensus duration from last block",
"Histogram of consensus duration from last block",
exponential_buckets(1.0, 1.2, 15).expect("consensus duration time exponential")
)
.unwrap();
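exponential_buckets(start, factor, count) yields count histogram upper bounds of the form start * factor^i; a quick sketch of the boundaries the new sync histogram gets with the arguments above (0.5 s growing by 1.2x across 20 buckets):

use prometheus::exponential_buckets;

fn main() {
    // Same arguments as ENGINE_SYNC_BLOCK_HISTOGRAM above.
    let buckets = exponential_buckets(0.5, 1.2, 20).expect("valid bucket spec");
    // Prints 0.5, 0.6, 0.72, ... up to roughly 0.5 * 1.2^19 ≈ 16 seconds.
    println!("{:?}", buckets);
}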
34 changes: 26 additions & 8 deletions common/apm/src/metrics/network.rs
@@ -1,11 +1,11 @@
use lazy_static::lazy_static;

use crate::metrics::{
auto_flush_from, exponential_buckets, make_auto_flush_static_metric, register_histogram_vec,
register_int_counter_vec, register_int_gauge, register_int_gauge_vec, HistogramVec,
IntCounterVec, IntGauge, IntGaugeVec,
};

use lazy_static::lazy_static;

make_auto_flush_static_metric! {
pub label_enum MessageDirection {
sent,
@@ -21,6 +21,12 @@ make_auto_flush_static_metric! {
timeout,
}

pub label_enum MessageTarget {
single,
multi,
all,
}

pub struct MessageCounterVec: LocalIntCounter {
"direction" => MessageDirection,
}
@@ -38,7 +44,7 @@ lazy_static! {
pub static ref NETWORK_MESSAGE_COUNT_VEC: IntCounterVec = register_int_counter_vec!(
"muta_network_message_total",
"Total number of network message",
&["direction", "type", "module", "action"]
&["direction", "target", "type", "module", "action"]
)
.expect("network message total");
pub static ref NETWORK_RPC_RESULT_COUNT_VEC: IntCounterVec = register_int_counter_vec!(
@@ -79,21 +85,33 @@ lazy_static! {
.expect("network ip pending data size");
}

fn on_network_message(direction: &str, url: &str) {
fn on_network_message(direction: &str, target: &str, url: &str, inc: i64) {
let spliced: Vec<&str> = url.split('/').collect();
if spliced.len() < 4 {
return;
}

let network_type = spliced[1];
let module = spliced[2];
let action = spliced[3];

NETWORK_MESSAGE_COUNT_VEC
.with_label_values(&[direction, spliced[1], spliced[2], spliced[3]])
.inc();
.with_label_values(&[direction, target, network_type, module, action])
.inc_by(inc);
}

pub fn on_network_message_sent_all_target(url: &str) {
on_network_message("sent", "all", url, 1)
}

pub fn on_network_message_sent_multi_target(url: &str, target_count: i64) {
on_network_message("sent", "multi", url, target_count);
}

pub fn on_network_message_sent(url: &str) {
on_network_message("sent", url);
on_network_message("sent", "single", url, 1);
}

pub fn on_network_message_received(url: &str) {
on_network_message("received", url);
on_network_message("received", "single", url, 1);
}
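The label extraction above assumes endpoint URLs shaped like /<type>/<module>/<action>, so split('/') produces an empty first element followed by the three label values; a standalone sketch of that parsing with a hypothetical endpoint:

fn parse_endpoint(url: &str) -> Option<(&str, &str, &str)> {
    // "/gossip/consensus/broadcast_proposal" splits into
    // ["", "gossip", "consensus", "broadcast_proposal"].
    let spliced: Vec<&str> = url.split('/').collect();
    if spliced.len() < 4 {
        return None;
    }
    Some((spliced[1], spliced[2], spliced[3]))
}

fn main() {
    // Hypothetical endpoint, used only to illustrate the label layout.
    let labels = parse_endpoint("/gossip/consensus/broadcast_proposal");
    assert_eq!(labels, Some(("gossip", "consensus", "broadcast_proposal")));
}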
127 changes: 127 additions & 0 deletions common/apm/src/metrics/storage.rs
@@ -0,0 +1,127 @@
use std::time::Duration;

use lazy_static::lazy_static;
use protocol::traits::StorageCategory;

use crate::metrics::{
auto_flush_from, duration_to_sec, make_auto_flush_static_metric, register_counter_vec,
register_int_counter_vec, CounterVec, IntCounterVec,
};

make_auto_flush_static_metric! {
pub label_enum COLUMN_FAMILY_TYPES {
block,
receipt,
signed_tx,
wal,
hash_height,
}

pub struct StoragePutCfTimeUsageVec: LocalCounter {
"cf" => COLUMN_FAMILY_TYPES
}

pub struct StoragePutCfBytesVec: LocalIntCounter {
"cf" => COLUMN_FAMILY_TYPES
}

pub struct StorageGetCfTimeUsageVec: LocalCounter {
"cf" => COLUMN_FAMILY_TYPES
}

pub struct StorageGetCfTotalVec: LocalIntCounter {
"cf" => COLUMN_FAMILY_TYPES
}
}

lazy_static! {
pub static ref STORAGE_PUT_CF_TIME_USAGE_VEC: CounterVec = register_counter_vec!(
"muta_storage_put_cf_seconds",
"Storage put_cf time usage",
&["cf"]
)
.unwrap();
pub static ref STORAGE_PUT_CF_BYTES_COUNTER_VEC: IntCounterVec = register_int_counter_vec!(
"muta_storage_put_cf_bytes",
"Storage total insert bytes",
&["cf"]
)
.unwrap();
pub static ref STORAGE_GET_CF_TIME_USAGE_VEC: CounterVec = register_counter_vec!(
"muta_storage_get_cf_seconds",
"Storage get_cf time usage",
&["cf"]
)
.unwrap();
pub static ref STORAGE_GET_CF_COUNTER_VEC: IntCounterVec = register_int_counter_vec!(
"muta_storage_get_cf_total",
"Storage total get_cf keys number",
&["cf"]
)
.unwrap();
}

lazy_static! {
pub static ref STORAGE_PUT_CF_TIME_USAGE: StoragePutCfTimeUsageVec =
auto_flush_from!(STORAGE_PUT_CF_TIME_USAGE_VEC, StoragePutCfTimeUsageVec);
pub static ref STORAGE_PUT_CF_BYTES_COUNTER: StoragePutCfBytesVec =
auto_flush_from!(STORAGE_PUT_CF_BYTES_COUNTER_VEC, StoragePutCfBytesVec);
pub static ref STORAGE_GET_CF_TIME_USAGE: StorageGetCfTimeUsageVec =
auto_flush_from!(STORAGE_GET_CF_TIME_USAGE_VEC, StorageGetCfTimeUsageVec);
pub static ref STORAGE_GET_CF_COUNTER: StorageGetCfTotalVec =
auto_flush_from!(STORAGE_GET_CF_COUNTER_VEC, StorageGetCfTotalVec);
}

pub fn on_storage_get_cf(sc: StorageCategory, duration: Duration, keys: i64) {
let seconds = duration_to_sec(duration);

match sc {
StorageCategory::Block => {
STORAGE_GET_CF_TIME_USAGE.block.inc_by(seconds);
STORAGE_GET_CF_COUNTER.block.inc_by(keys);
}
StorageCategory::Receipt => {
STORAGE_GET_CF_TIME_USAGE.receipt.inc_by(seconds);
STORAGE_GET_CF_COUNTER.receipt.inc_by(keys);
}
StorageCategory::Wal => {
STORAGE_GET_CF_TIME_USAGE.wal.inc_by(seconds);
STORAGE_GET_CF_COUNTER.wal.inc_by(keys);
}
StorageCategory::SignedTransaction => {
STORAGE_GET_CF_TIME_USAGE.signed_tx.inc_by(seconds);
STORAGE_GET_CF_COUNTER.signed_tx.inc_by(keys);
}
StorageCategory::HashHeight => {
STORAGE_GET_CF_TIME_USAGE.hash_height.inc_by(seconds);
STORAGE_GET_CF_COUNTER.hash_height.inc_by(keys);
}
}
}

pub fn on_storage_put_cf(sc: StorageCategory, duration: Duration, size: i64) {
let seconds = duration_to_sec(duration);

match sc {
StorageCategory::Block => {
STORAGE_PUT_CF_TIME_USAGE.block.inc_by(seconds);
STORAGE_PUT_CF_BYTES_COUNTER.block.inc_by(size);
}
StorageCategory::Receipt => {
STORAGE_PUT_CF_TIME_USAGE.receipt.inc_by(seconds);
STORAGE_PUT_CF_BYTES_COUNTER.receipt.inc_by(size);
}
StorageCategory::Wal => {
STORAGE_PUT_CF_TIME_USAGE.wal.inc_by(seconds);
STORAGE_PUT_CF_BYTES_COUNTER.wal.inc_by(size);
}
StorageCategory::SignedTransaction => {
STORAGE_PUT_CF_TIME_USAGE.signed_tx.inc_by(seconds);
STORAGE_PUT_CF_BYTES_COUNTER.signed_tx.inc_by(size);
}
StorageCategory::HashHeight => {
STORAGE_PUT_CF_TIME_USAGE.hash_height.inc_by(seconds);
STORAGE_PUT_CF_BYTES_COUNTER.hash_height.inc_by(size);
}
}
}
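A minimal sketch of how a storage adapter is expected to feed these counters, assuming the StorageCategory variants shown above; record_block_put and its argument are hypothetical, only the metric hook comes from this file:

use std::time::Instant;

use common_apm::metrics::storage::on_storage_put_cf;
use protocol::traits::StorageCategory;

// Hypothetical call site: time the write, then record the elapsed seconds
// and the payload size under the `block` column-family label.
fn record_block_put(encoded_block: &[u8]) {
    let inst = Instant::now();
    // ... perform the actual put against the database here ...
    on_storage_put_cf(StorageCategory::Block, inst.elapsed(), encoded_block.len() as i64);
}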
1 change: 0 additions & 1 deletion core/consensus/src/engine.rs
@@ -416,7 +416,6 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
common_apm::metrics::consensus::ENGINE_CONSENSUS_COST_TIME.observe(elapsed / 1e3);
let mut last_commit_time = self.last_commit_time.write();
*last_commit_time = now;
// pill.block.header.timestamp
Ok(status)
}

3 changes: 3 additions & 0 deletions core/consensus/src/synchronization.rs
@@ -45,6 +45,7 @@ impl<Adapter: SynchronizationAdapter> Synchronization for OverlordSynchronizatio
logs = "{'remote_height': 'remote_height'}"
)]
async fn receive_remote_block(&self, ctx: Context, remote_height: u64) -> ProtocolResult<()> {
let inst = Instant::now();
let syncing_lock = self.syncing.try_lock();
if syncing_lock.is_none() {
return Ok(());
@@ -93,6 +94,8 @@ impl<Adapter: SynchronizationAdapter> Synchronization for OverlordSynchronizatio

common_apm::metrics::consensus::ENGINE_SYNC_BLOCK_COUNTER
.inc_by((remote_height - current_height) as i64);
common_apm::metrics::consensus::ENGINE_SYNC_BLOCK_HISTOGRAM
.observe(common_apm::metrics::duration_to_sec(inst.elapsed()));

self.status.replace(sync_status.clone());
self.adapter.update_status(
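Because the Instant is created at the top of receive_remote_block and observed only after the catch-up completes, the new histogram measures one whole sync round rather than a single block; a condensed sketch of that pattern with the sync loop elided:

use std::time::Instant;

// Condensed timing pattern from the change above; block fetching, execution
// and status bookkeeping are elided.
fn record_sync_round(remote_height: u64, current_height: u64) {
    let inst = Instant::now();

    // ... pull and execute blocks from current_height + 1 ..= remote_height ...

    common_apm::metrics::consensus::ENGINE_SYNC_BLOCK_COUNTER
        .inc_by((remote_height - current_height) as i64);
    common_apm::metrics::consensus::ENGINE_SYNC_BLOCK_HISTOGRAM
        .observe(common_apm::metrics::duration_to_sec(inst.elapsed()));
}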
5 changes: 3 additions & 2 deletions core/network/src/outbound/gossip.rs
@@ -43,7 +43,6 @@ where
headers.set_span_id(state.span_id());
log::info!("no trace id found for gossip {}", endpoint.full_url());
}
common_apm::metrics::network::on_network_message_sent(endpoint.full_url());
let net_msg = NetworkMessage::new(endpoint, data, headers)
.encode()
.await?;
@@ -85,7 +84,7 @@ where
{
let msg = self.package_message(cx.clone(), end, msg).await?;
self.send(cx, TargetSession::All, msg, p)?;

common_apm::metrics::network::on_network_message_sent_all_target(end);
Ok(())
}

@@ -101,7 +100,9 @@ where
M: MessageCodec,
{
let msg = self.package_message(cx.clone(), end, msg).await?;
let user_count = users.len();
self.users_send(cx, users, msg, p).await?;
common_apm::metrics::network::on_network_message_sent_multi_target(end, user_count as i64);

Ok(())
}
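With these hooks, each outbound path reports a target label and an increment that matches how many peers actually receive the message; a small sketch of the intended accounting (report_send and the peer-list shape are hypothetical, only the three helpers come from the diff):

use common_apm::metrics::network::{
    on_network_message_sent, on_network_message_sent_all_target,
    on_network_message_sent_multi_target,
};

fn report_send(endpoint: &str, peers: Option<&[&str]>) {
    match peers {
        // Broadcast to every connected session: one `all`-target increment.
        None => on_network_message_sent_all_target(endpoint),
        // Multicast: one increment per addressed peer under the `multi` target.
        Some(list) if list.len() > 1 => {
            on_network_message_sent_multi_target(endpoint, list.len() as i64)
        }
        // Single recipient (e.g. an RPC call).
        Some(_) => on_network_message_sent(endpoint),
    }
}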
24 changes: 17 additions & 7 deletions core/storage/src/adapter/rocks.rs
@@ -2,11 +2,14 @@ use std::error::Error;
use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;
use std::time::Instant;

use async_trait::async_trait;
use derive_more::{Display, From};
use rocksdb::{ColumnFamily, DBIterator, Options, WriteBatch, DB};

use async_trait::async_trait;

use common_apm::metrics::storage::on_storage_put_cf;
use protocol::codec::ProtocolCodecSync;
use protocol::traits::{
IntoIteratorByRef, StorageAdapter, StorageBatchModify, StorageCategory, StorageIterator,
@@ -105,16 +108,16 @@ impl<'c, S: StorageSchema, P: AsRef<[u8]>> IntoIteratorByRef<S> for RocksIntoIte

#[async_trait]
impl StorageAdapter for RocksAdapter {
async fn insert<S: StorageSchema>(
&self,
key: <S as StorageSchema>::Key,
val: <S as StorageSchema>::Value,
) -> ProtocolResult<()> {
async fn insert<S: StorageSchema>(&self, key: S::Key, val: S::Value) -> ProtocolResult<()> {
let inst = Instant::now();

let column = get_column::<S>(&self.db)?;
let key = key.encode_sync()?.to_vec();
let val = val.encode_sync()?.to_vec();
let size = val.len() as i64;

db!(self.db, put_cf, column, key, val)?;
on_storage_put_cf(S::category(), inst.elapsed(), size);

Ok(())
}
@@ -182,13 +185,20 @@ impl StorageAdapter for RocksAdapter {
}

let mut batch = WriteBatch::default();
let mut insert_size = 0usize;
let inst = Instant::now();
for (key, value) in pairs.into_iter() {
match value {
Some(value) => batch.put_cf(column, key, value),
Some(value) => {
insert_size += value.len();
batch.put_cf(column, key, value)
}
None => batch.delete_cf(column, key),
}
}

on_storage_put_cf(S::category(), inst.elapsed(), insert_size as i64);

self.db.write(batch).map_err(RocksAdapterError::from)?;
Ok(())
}