From 1534e97d07c9366812f5bc7d2fd35646a2be44e3 Mon Sep 17 00:00:00 2001 From: Karl Knutsson Date: Fri, 23 Aug 2024 10:51:46 +0200 Subject: [PATCH] Add metrics module for prometheus counters Example: blockperf_header_delta 140.0 blockperf_block_req_delta 0.0 blockperf_block_rsp_delta 10.0 blockperf_block_adopt_delta 50.0 blockperf_block_delay 200.0 blockperf_block_no 1.0740607e+07 blockperf_valid_samples_total 24.0 blockperf_valid_samples_created 1.7244026992243912e+09 blockperf_invalid_samples_total 0.0 blockperf_invalid_samples_created 1.7244026992244196e+09 --- README.md | 7 ++-- pyproject.toml | 1 + src/blockperf/app.py | 30 ++++++++++++++-- src/blockperf/blocksample.py | 4 +-- src/blockperf/metrics.py | 66 ++++++++++++++++++++++++++++++++++++ 5 files changed, 101 insertions(+), 7 deletions(-) create mode 100644 src/blockperf/metrics.py diff --git a/README.md b/README.md index 7b2b520..bec7b62 100644 --- a/README.md +++ b/README.md @@ -92,6 +92,9 @@ BLOCKPERF_RELAY_PUBLIC_PORT="3001" # If you do not want to share certain ips of your setup, list them here and # blockperf will not send these out to the broker. BLOCKPERF_MASKED_ADDRESSES="x.x.x.x,x.x.x.x" + +# Optional: Specify a port number for prometheus metrics server. Defaults to disabled +BLOCKPERF_METRICS_PORT="8082" ``` @@ -161,10 +164,10 @@ need to reload systemd and start the service. 
```bash sudo systemctl daemon-reloadsudo systemctl daemon-reload -sudo systemctl start blockperf +sudo systemctl start blockperf ``` -Inspect logs with +Inspect logs with ```bash journalctl -fu blockperf diff --git a/pyproject.toml b/pyproject.toml index 61d9799..e6d8e68 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,7 @@ classifiers = [ dependencies = [ "paho-mqtt==1.6.1", "psutil==5.9.6", + "prometheus-client==0.20.0", ] [project.optional-dependencies] # Optional diff --git a/src/blockperf/app.py b/src/blockperf/app.py index 5316324..48b30eb 100644 --- a/src/blockperf/app.py +++ b/src/blockperf/app.py @@ -12,6 +12,7 @@ from blockperf import __version__ as blockperf_version from blockperf.blocksample import BlockSample, slot_time_of from blockperf.config import AppConfig +from blockperf.metrics import Metrics from blockperf.mqtt import MQTTClient from blockperf.nodelogs import LogEvent, LogEventKind @@ -23,6 +24,7 @@ class App: node_config: dict mqtt_client: MQTTClient start_time: int + metrics: Metrics # holds a dictionairy for each kind of events for each block_hash logevents: dict = {} @@ -35,6 +37,7 @@ def __init__(self, config: AppConfig) -> None: self.q: queue.Queue = queue.Queue(maxsize=50) self.app_config = config self.start_time = int(datetime.now().timestamp()) + self.metrics = Metrics() def run(self): """Runs the App by creating the mqtt client and two threads. @@ -126,7 +129,15 @@ def ensure_maxblocks(self): * logevents holds all events recorded for all hashes seen. * published_blocks holds hashes of all published blocks. - LogEvents hashes eventuallnetworkize, the hashes that are added first will + LogEvents hashes eventually get adopted (or not). But this may + take some time. I want to wait for some time (config.max_concurrent_blocks) + before i drop that hash. + + Samples for blocks that already have a sample published should not get + republished. Thus the list of published_blocks. 
+ + To not have both lists grow indefinitely I use the deque in self.working_hashes. + Once it reaches a certain size, the hashes that are added first will get popped of and delete from the other two lists. """ if len(self.working_hashes) > self.app_config.max_concurrent_blocks: @@ -168,7 +179,6 @@ def run_blocksample_loop(self): Once that is the case a new sample is created by collecting all events and instanciating BlockSample(). If the sample is complete it published. - The """ for event in self.logevents_logfile(): @@ -232,10 +242,24 @@ def run_blocksample_loop(self): # Check values are in acceptable ranges if not new_sample.is_sane(): - logger.info("Insane values for block \n%s\n", new_sample) + logger.debug("Insane values for sample %s", new_sample) + self.metrics.inc("invalid_samples") continue logger.info("Sample for %s created", _block_hash_short) + self.metrics.set("header_delta", new_sample.header_delta) + self.metrics.set("block_request_delta", new_sample.block_request_delta) + self.metrics.set("block_response_delta", new_sample.block_response_delta) + self.metrics.set("block_adopt_delta", new_sample.block_adopt_delta) + self.metrics.set( + "block_delay", + new_sample.header_delta + + new_sample.block_request_delta + + new_sample.block_response_delta + + new_sample.block_adopt_delta, + ) + self.metrics.set("block_no", new_sample.block_num) + self.metrics.inc("valid_samples") # The sample is ready to be published, create the payload for mqtt, # determine the topic and publish that sample diff --git a/src/blockperf/blocksample.py b/src/blockperf/blocksample.py index c2e77cd..469bbf6 100644 --- a/src/blockperf/blocksample.py +++ b/src/blockperf/blocksample.py @@ -25,7 +25,7 @@ def slot_time_of(slot_num: int, network: int) -> datetime: """Calculate the timestamp that given slot should have occured. Works only if the networks slots are 1 second lenghts! 
""" - logger.info("slot_time_of(%s, %s", slot_num, network) + logger.debug("slot_time_of(%s, %s", slot_num, network) if network not in NETWORK_STARTTIMES: raise ValueError(f"No starttime for {network} available") @@ -113,7 +113,7 @@ def first_trace_header(self) -> Union[LogEvent, None]: """Returnms first TRACE_DOWNLOADED_HEADER received""" for event in self.trace_events: if event.kind == LogEventKind.TRACE_DOWNLOADED_HEADER: - logger.info("Found first TraceHeader %s", event.atstr) + logger.debug("Found first TraceHeader %s", event.atstr) return event return None diff --git a/src/blockperf/metrics.py b/src/blockperf/metrics.py new file mode 100644 index 0000000..aaaddab --- /dev/null +++ b/src/blockperf/metrics.py @@ -0,0 +1,66 @@ +import logging +import os + +from prometheus_client import Counter, Gauge, start_http_server + +logger = logging.getLogger(__name__) + + +class Metrics: + enabled: bool = False + header_delta: Gauge = None + block_request_delta: Gauge = None + block_response_delta: Gauge = None + block_adopt_delta: Gauge = None + block_delay: Gauge = None + block_no: Gauge = None + valid_samples: Counter = None + invalid_samples: Counter = None + + def __init__(self): + port = os.getenv("BLOCKPERF_METRICS_PORT", None) + # If not given or not a number, dont setup anything + if not port or not port.isdigit(): + return + port = int(port) + self.enabled = True + self.header_delta = Gauge( + "blockperf_header_delta", + "time from when a block was forged until received (ms)", + ) + self.block_request_delta = Gauge( + "blockperf_block_req_delta", + "time between the header was received until the block request was sent (ms)", + ) + self.block_response_delta = Gauge( + "blockperf_block_rsp_delta", + "time between the block request was sent until the block responce was received (ms)", + ) + self.block_adopt_delta = Gauge( + "blockperf_block_adopt_delta", "time for adopting the block (ms)" + ) + self.block_delay = Gauge("blockperf_block_delay", "Total block delay 
(ms)") + self.block_no = Gauge("blockperf_block_no", "Block number of latest sample") + self.valid_samples = Counter( + "blockperf_valid_samples", "valid samples collected" + ) + self.invalid_samples = Counter( + "blockperf_invalid_samples", "invalid samples discarded" + ) + start_http_server(port) + + def set(self, metric, value): + """Calls set() on given metric with given value""" + if not self.enabled: + return + logger.info("set %s to %s", metric, value) + prom_metric = getattr(self, metric) + prom_metric.set(value) + + def inc(self, metric): + """Calls inc() on given metric""" + if not self.enabled: + return + logger.info("inc %s", metric) + prom_metric = getattr(self, metric) + prom_metric.inc()