Skip to content

Commit

Permalink
Merge pull request #25 from cardano-foundation/develop
Browse files Browse the repository at this point in the history
  • Loading branch information
oversize authored Aug 28, 2024
2 parents ad75468 + 1534e97 commit 3497385
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 30 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ BLOCKPERF_RELAY_PUBLIC_PORT="3001"
# If you do not want to share certain ips of your setup, list them here and
# blockperf will not send these out to the broker.
BLOCKPERF_MASKED_ADDRESSES="x.x.x.x,x.x.x.x"

# Optional: Specify a port number for the prometheus metrics server. Defaults to disabled.
BLOCKPERF_METRICS_PORT="8082"
```


Expand Down Expand Up @@ -161,10 +164,10 @@ need to reload systemd and start the service.

```bash
sudo systemctl daemon-reload
sudo systemctl start blockperf
sudo systemctl start blockperf
```

Inspect logs with
Inspect logs with

```bash
journalctl -fu blockperf
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ classifiers = [
dependencies = [
"paho-mqtt==1.6.1",
"psutil==5.9.6",
"prometheus-client==0.20.0",
]

[project.optional-dependencies] # Optional
Expand Down
34 changes: 24 additions & 10 deletions src/blockperf/app.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
import sys
import collections
from datetime import datetime, timezone, timedelta
import json
import logging
import queue
import os
import queue
import sys
import threading
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path

from blockperf import __version__ as blockperf_version
from blockperf.config import AppConfig

from blockperf.blocksample import BlockSample, slot_time_of
from blockperf.nodelogs import LogEventKind, LogEvent
from blockperf.config import AppConfig
from blockperf.metrics import Metrics
from blockperf.mqtt import MQTTClient

from blockperf.nodelogs import LogEvent, LogEventKind

logger = logging.getLogger(__name__)

Expand All @@ -25,6 +24,7 @@ class App:
node_config: dict
mqtt_client: MQTTClient
start_time: int
metrics: Metrics

    # holds a dictionary for each kind of event for each block_hash
logevents: dict = {}
Expand All @@ -37,6 +37,7 @@ def __init__(self, config: AppConfig) -> None:
self.q: queue.Queue = queue.Queue(maxsize=50)
self.app_config = config
self.start_time = int(datetime.now().timestamp())
self.metrics = Metrics()

def run(self):
"""Runs the App by creating the mqtt client and two threads.
Expand Down Expand Up @@ -178,7 +179,6 @@ def run_blocksample_loop(self):
Once that is the case a new sample is created by collecting all events
        and instantiating BlockSample(). If the sample is complete it is published.
The
"""

for event in self.logevents_logfile():
Expand Down Expand Up @@ -233,7 +233,7 @@ def run_blocksample_loop(self):
for event_kind_list in self.logevents[_block_hash].values():
all_events.extend(event_kind_list)

new_sample = BlockSample(all_events)
new_sample = BlockSample(all_events, self.app_config.network_magic)

# Check BlockSample has all needed Events to produce sample
if not new_sample.is_complete():
Expand All @@ -243,9 +243,23 @@ def run_blocksample_loop(self):
# Check values are in acceptable ranges
if not new_sample.is_sane():
logger.debug("Insane values for sample %s", new_sample)
self.metrics.inc("invalid_samples")
continue

logger.info("Sample for %s created", _block_hash_short)
self.metrics.set("header_delta", new_sample.header_delta)
self.metrics.set("block_request_delta", new_sample.block_request_delta)
self.metrics.set("block_response_delta", new_sample.block_response_delta)
self.metrics.set("block_adopt_delta", new_sample.block_adopt_delta)
self.metrics.set(
"block_delay",
new_sample.header_delta
+ new_sample.block_request_delta
+ new_sample.block_response_delta
+ new_sample.block_adopt_delta,
)
self.metrics.set("block_no", new_sample.block_num)
self.metrics.inc("valid_samples")

# The sample is ready to be published, create the payload for mqtt,
# determine the topic and publish that sample
Expand Down Expand Up @@ -299,7 +313,7 @@ def slot_is_too_old(self, logevents: list) -> bool:

# If there is one, check its slot_time
trace_header = trace_headers.pop(0)
slot_time = slot_time_of(trace_header.slot_num)
slot_time = slot_time_of(trace_header.slot_num, self.app_config.network_magic)
if slot_time < datetime.now(tz=timezone.utc) - timedelta(hours=12):
logger.info(
"Slot %s is too old (%s)",
Expand Down
36 changes: 26 additions & 10 deletions src/blockperf/blocksample.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,31 @@
# import json
# import sys
# from enum import Enum
from typing import Union
from datetime import datetime, timezone
import logging
from datetime import datetime, timezone
from typing import Union

from blockperf import __version__ as blockperf_version

# from blockperf.config import AppConfig
from blockperf.nodelogs import LogEventKind, LogEvent
from blockperf.nodelogs import LogEvent, LogEventKind

# logging.basicConfig(level=logging.DEBUG, format="(%(threadName)-9s) %(message)s")
logger = logging.getLogger(__name__)


NETWORK_STARTTIMES = {
"mainnet": 1591566291,
"preview": 1655683200,
"preprod": 1660003200,
# mainnet
764824073: 1591566291,
# preprod
1: 1655683200,
}


def slot_time_of(slot_num: int, network: str = "mainnet") -> datetime:
def slot_time_of(slot_num: int, network: int) -> datetime:
"""Calculate the timestamp that given slot should have occured.
Works only if the networks slots are 1 second lenghts!
"""
logger.debug("slot_time_of(%s, %s", slot_num, network)
if network not in NETWORK_STARTTIMES:
raise ValueError(f"No starttime for {network} available")

Expand Down Expand Up @@ -76,10 +77,24 @@ class BlockSample:

trace_events: list = []

def __init__(self, events: list) -> None:
def __init__(self, events: list, network_magic: int) -> None:
"""Creates LogEvent and orders the events by at field"""
events.sort(key=lambda x: x.at)
self.trace_events = events
self.network_magic = network_magic

def __str__(self):
""" """
return (
f"block_num: {self.block_num} \n"
f"slot_num {self.slot_num} \n"
f"block_hash len: {len(self.block_hash)} \n"
f"block_size: {self.block_size} \n"
f"header_delta {self.header_delta} \n"
f"block_request_delta {self.block_request_delta}\n"
f"block_response_delta {self.block_response_delta}\n"
f"block_adopt_delta {self.block_adopt_delta} \n"
)

def is_complete(self) -> bool:
"""Determines if all needed LogEvents are in this sample"""
Expand All @@ -98,6 +113,7 @@ def first_trace_header(self) -> Union[LogEvent, None]:
"""Returnms first TRACE_DOWNLOADED_HEADER received"""
for event in self.trace_events:
if event.kind == LogEventKind.TRACE_DOWNLOADED_HEADER:
logger.debug("Found first TraceHeader %s", event.atstr)
return event
return None

Expand Down Expand Up @@ -156,7 +172,7 @@ def slot_num(self) -> int:
@property
def slot_time(self) -> datetime:
"""Determine the time that current slot_num should have happened."""
_slot_time = slot_time_of(self.slot_num)
_slot_time = slot_time_of(self.slot_num, self.network_magic)
return _slot_time

@property
Expand Down
6 changes: 4 additions & 2 deletions src/blockperf/cli.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""CLI Entrypoint for blockperf"""
import logging
from logging.config import dictConfig

import argparse
import logging
import sys
from logging.config import dictConfig

import psutil

from blockperf.app import App
Expand Down
3 changes: 2 additions & 1 deletion src/blockperf/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
App Configuration is done either via Environment variables or the stdlib
configparser module.
"""

import ipaddress
import json
import logging
import os
import sys
import logging
from configparser import ConfigParser
from pathlib import Path
from typing import Union
Expand Down
66 changes: 66 additions & 0 deletions src/blockperf/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import logging
import os

from prometheus_client import Counter, Gauge, start_http_server

logger = logging.getLogger(__name__)


class Metrics:
enabled: bool = False
header_delta: Gauge = None
block_request_delta: Gauge = None
block_response_delta: Gauge = None
block_adopt_delta: Gauge = None
block_delay: Gauge = None
block_no: Gauge = None
valid_samples: Counter = None
invalid_samples: Counter = None

def __init__(self):
port = os.getenv("BLOCKPERF_METRICS_PORT", None)
# If not given or not a number, dont setup anything
if not port or not port.isdigit():
return
port = int(port)
self.enabled = True
self.header_delta = Gauge(
"blockperf_header_delta",
"time from when a block was forged until received (ms)",
)
self.block_request_delta = Gauge(
"blockperf_block_req_delta",
"time between the header was received until the block request was sent (ms)",
)
self.block_response_delta = Gauge(
"blockperf_block_rsp_delta",
"time between the block request was sent until the block responce was received (ms)",
)
self.block_adopt_delta = Gauge(
"blockperf_block_adopt_delta", "time for adopting the block (ms)"
)
self.block_delay = Gauge("blockperf_block_delay", "Total block delay (ms)")
self.block_no = Gauge("blockperf_block_no", "Block number of latest sample")
self.valid_samples = Counter(
"blockperf_valid_samples", "valid samples collected"
)
self.invalid_samples = Counter(
"blockperf_invalid_samples", "invalid samples discarded"
)
start_http_server(port)

def set(self, metric, value):
"""Calls set() on given metric with given value"""
if not self.enabled:
return
logger.info("set %s to %s", metric, value)
prom_metric = getattr(self, metric)
prom_metric.set(value)

def inc(self, metric):
"""Calls inc() on given metric"""
if not self.enabled:
return
logger.info("inc %s", metric)
prom_metric = getattr(self, metric)
prom_metric.inc()
8 changes: 5 additions & 3 deletions src/blockperf/mqtt.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
"""MQTT Client
"""
import sys
import logging

import json
import logging
import sys

from paho.mqtt.client import MQTTMessageInfo
from paho.mqtt.properties import Properties as Properties

try:
import paho.mqtt.client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties
except ImportError:
sys.exit(
"This script needs paho-mqtt package.\n"
Expand Down
4 changes: 2 additions & 2 deletions src/blockperf/nodelogs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
"""

import json
import logging
import sys
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Union
from datetime import datetime, timezone, timedelta
import logging

logger = logging.getLogger(__name__)

Expand Down

0 comments on commit 3497385

Please sign in to comment.