Skip to content

Commit

Permalink
Add metrics module for prometheus counters
Browse files Browse the repository at this point in the history
Example:

blockperf_header_delta 140.0
blockperf_block_req_delta 0.0
blockperf_block_rsp_delta 10.0
blockperf_block_adopt_delta 50.0
blockperf_block_delay 200.0
blockperf_block_no 1.0740607e+07
blockperf_valid_samples_total 24.0
blockperf_valid_samples_created 1.7244026992243912e+09
blockperf_invalid_samples_total 0.0
blockperf_invalid_samples_created 1.7244026992244196e+09
  • Loading branch information
karknu authored and oversize committed Aug 28, 2024
1 parent 8576d5a commit 1534e97
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 7 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ BLOCKPERF_RELAY_PUBLIC_PORT="3001"
# If you do not want to share certain ips of your setup, list them here and
# blockperf will not send these out to the broker.
BLOCKPERF_MASKED_ADDRESSES="x.x.x.x,x.x.x.x"

# Optional: Specify a port number for the prometheus metrics server. Defaults to disabled.
BLOCKPERF_METRICS_PORT="8082"
```


Expand Down Expand Up @@ -161,10 +164,10 @@ need to reload systemd and start the service.

```bash
sudo systemctl daemon-reload
sudo systemctl start blockperf
sudo systemctl start blockperf
```

Inspect logs with
Inspect logs with

```bash
journalctl -fu blockperf
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ classifiers = [
dependencies = [
"paho-mqtt==1.6.1",
"psutil==5.9.6",
"prometheus-client==0.20.0",
]

[project.optional-dependencies] # Optional
Expand Down
30 changes: 27 additions & 3 deletions src/blockperf/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from blockperf import __version__ as blockperf_version
from blockperf.blocksample import BlockSample, slot_time_of
from blockperf.config import AppConfig
from blockperf.metrics import Metrics
from blockperf.mqtt import MQTTClient
from blockperf.nodelogs import LogEvent, LogEventKind

Expand All @@ -23,6 +24,7 @@ class App:
node_config: dict
mqtt_client: MQTTClient
start_time: int
metrics: Metrics

# holds a dictionairy for each kind of events for each block_hash
logevents: dict = {}
Expand All @@ -35,6 +37,7 @@ def __init__(self, config: AppConfig) -> None:
self.q: queue.Queue = queue.Queue(maxsize=50)
self.app_config = config
self.start_time = int(datetime.now().timestamp())
self.metrics = Metrics()

def run(self):
"""Runs the App by creating the mqtt client and two threads.
Expand Down Expand Up @@ -126,7 +129,15 @@ def ensure_maxblocks(self):
* logevents holds all events recorded for all hashes seen.
* published_blocks holds hashes of all published blocks.
LogEvents hashes eventually age out; the hashes that are added first will
LogEvents hashes eventually get adopted (or not). But this may
take some time. I want to wait for some time (config.max_concurrent_blocks)
before I drop that hash.
Samples for blocks that already have a sample published should not get
republished. Thus the list of published_blocks.
To not have both lists grow indefinitely I use the deque in self.working_hashes.
Once it reaches a certain size, the hashes that are added first will
get popped off and deleted from the other two lists.
"""
if len(self.working_hashes) > self.app_config.max_concurrent_blocks:
Expand Down Expand Up @@ -168,7 +179,6 @@ def run_blocksample_loop(self):
Once that is the case a new sample is created by collecting all events
and instantiating BlockSample(). If the sample is complete, it is published.
The
"""

for event in self.logevents_logfile():
Expand Down Expand Up @@ -232,10 +242,24 @@ def run_blocksample_loop(self):

# Check values are in acceptable ranges
if not new_sample.is_sane():
logger.info("Insane values for block \n%s\n", new_sample)
logger.debug("Insane values for sample %s", new_sample)
self.metrics.inc("invalid_samples")
continue

logger.info("Sample for %s created", _block_hash_short)
self.metrics.set("header_delta", new_sample.header_delta)
self.metrics.set("block_request_delta", new_sample.block_request_delta)
self.metrics.set("block_response_delta", new_sample.block_response_delta)
self.metrics.set("block_adopt_delta", new_sample.block_adopt_delta)
self.metrics.set(
"block_delay",
new_sample.header_delta
+ new_sample.block_request_delta
+ new_sample.block_response_delta
+ new_sample.block_adopt_delta,
)
self.metrics.set("block_no", new_sample.block_num)
self.metrics.inc("valid_samples")

# The sample is ready to be published, create the payload for mqtt,
# determine the topic and publish that sample
Expand Down
4 changes: 2 additions & 2 deletions src/blockperf/blocksample.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def slot_time_of(slot_num: int, network: int) -> datetime:
"""Calculate the timestamp that the given slot should have occurred at.
Works only if the network's slots are 1 second long!
"""
logger.info("slot_time_of(%s, %s", slot_num, network)
logger.debug("slot_time_of(%s, %s", slot_num, network)
if network not in NETWORK_STARTTIMES:
raise ValueError(f"No starttime for {network} available")

Expand Down Expand Up @@ -113,7 +113,7 @@ def first_trace_header(self) -> Union[LogEvent, None]:
"""Returns the first TRACE_DOWNLOADED_HEADER received"""
for event in self.trace_events:
if event.kind == LogEventKind.TRACE_DOWNLOADED_HEADER:
logger.info("Found first TraceHeader %s", event.atstr)
logger.debug("Found first TraceHeader %s", event.atstr)
return event
return None

Expand Down
66 changes: 66 additions & 0 deletions src/blockperf/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import logging
import os
from typing import Optional

from prometheus_client import Counter, Gauge, start_http_server

logger = logging.getLogger(__name__)


class Metrics:
    """Optional prometheus metrics exporter for blockperf.

    If the environment variable BLOCKPERF_METRICS_PORT holds a valid port
    number, an http metrics server is started on that port and the gauges
    and counters below are registered. Otherwise the instance stays
    disabled and every call to set()/inc() is a no-op, so callers never
    need to check whether metrics are configured.
    """

    # False until a valid port is found in the environment; guards set()/inc().
    enabled: bool = False
    # Gauges/counters are created lazily in __init__ only when enabled;
    # None otherwise (hence Optional).
    header_delta: Optional[Gauge] = None
    block_request_delta: Optional[Gauge] = None
    block_response_delta: Optional[Gauge] = None
    block_adopt_delta: Optional[Gauge] = None
    block_delay: Optional[Gauge] = None
    block_no: Optional[Gauge] = None
    valid_samples: Optional[Counter] = None
    invalid_samples: Optional[Counter] = None

    def __init__(self):
        """Read BLOCKPERF_METRICS_PORT and, if valid, start the server."""
        port = os.getenv("BLOCKPERF_METRICS_PORT", None)
        # If not given or not a number, don't set anything up; the
        # instance remains disabled and all metric calls are no-ops.
        if not port or not port.isdigit():
            return
        port = int(port)
        self.enabled = True
        self.header_delta = Gauge(
            "blockperf_header_delta",
            "time from when a block was forged until received (ms)",
        )
        self.block_request_delta = Gauge(
            "blockperf_block_req_delta",
            "time between the header was received until the block request was sent (ms)",
        )
        self.block_response_delta = Gauge(
            "blockperf_block_rsp_delta",
            "time between the block request was sent until the block response was received (ms)",
        )
        self.block_adopt_delta = Gauge(
            "blockperf_block_adopt_delta", "time for adopting the block (ms)"
        )
        self.block_delay = Gauge("blockperf_block_delay", "Total block delay (ms)")
        self.block_no = Gauge("blockperf_block_no", "Block number of latest sample")
        self.valid_samples = Counter(
            "blockperf_valid_samples", "valid samples collected"
        )
        self.invalid_samples = Counter(
            "blockperf_invalid_samples", "invalid samples discarded"
        )
        start_http_server(port)

    def set(self, metric, value):
        """Calls set() on given metric with given value.

        No-op when metrics are disabled; a typoed metric name is logged
        instead of raising AttributeError into the sample loop.
        """
        if not self.enabled:
            return
        # debug, not info: this runs once per sample and would flood logs
        logger.debug("set %s to %s", metric, value)
        prom_metric = getattr(self, metric, None)
        if prom_metric is None:
            logger.warning("Unknown metric %s", metric)
            return
        prom_metric.set(value)

    def inc(self, metric):
        """Calls inc() on given metric.

        No-op when metrics are disabled; a typoed metric name is logged
        instead of raising AttributeError into the sample loop.
        """
        if not self.enabled:
            return
        # debug, not info: this runs once per sample and would flood logs
        logger.debug("inc %s", metric)
        prom_metric = getattr(self, metric, None)
        if prom_metric is None:
            logger.warning("Unknown metric %s", metric)
            return
        prom_metric.inc()

0 comments on commit 1534e97

Please sign in to comment.