Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Opmon protobuf microservice #115

Merged
merged 40 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
3888fe5
Initial file for the opmon microservice
mroda88 Jul 8, 2024
e6cd1a9
initial draft for testing
mroda88 Jul 10, 2024
8546ed7
Running state for tests
mroda88 Jul 10, 2024
4142862
correct creation of the json structure
mroda88 Jul 10, 2024
4b97499
a bit of cleanup
mroda88 Jul 10, 2024
1851d77
Great progress in the microservice
mroda88 Jul 10, 2024
dc6b778
Some clenup
mroda88 Jul 11, 2024
085bff1
Passing the client to the consumer
mroda88 Jul 11, 2024
8faa41a
Tentative entry file
mroda88 Jul 11, 2024
c479da7
Draft of yaml file
mroda88 Jul 11, 2024
b08dccd
Update dependencies
mroda88 Jul 11, 2024
964a0ff
Implement suggestions from Pat
mroda88 Jul 12, 2024
83a1b6c
Correct dependencies
mroda88 Jul 12, 2024
2bf2a74
Merge remote-tracking branch 'origin/develop' into mroda/opmon_protobuf
mroda88 Jul 12, 2024
4139322
fix typo
mroda88 Jul 12, 2024
0f41c4f
Buildable dependency
mroda88 Jul 15, 2024
3b34b28
Remove debug printouts
mroda88 Jul 15, 2024
0ac1f09
add the right image for documentation
mroda88 Jul 15, 2024
e3c1561
black format and update in the servers
mroda88 Jul 15, 2024
eb62cf5
Fix dbnmae in entrypoint
mroda88 Jul 16, 2024
1f157b5
Add username and password
mroda88 Jul 16, 2024
8e53ef8
Balck format
mroda88 Jul 16, 2024
4d538ae
Correct inputs
mroda88 Jul 16, 2024
fb53755
Add documentation
mroda88 Jul 16, 2024
a914582
Correct indentation
mroda88 Jul 16, 2024
962b73d
add user pass for influx
mroda88 Jul 24, 2024
3fcf1b3
Update dependency, ers version updade. The change is not related to a…
mroda88 Jul 24, 2024
d750aca
fix mistake
mroda88 Jul 24, 2024
d420767
Fix ordering of variables
mroda88 Jul 24, 2024
f6441bb
Better entry point formatting
mroda88 Jul 25, 2024
ea464b5
Better matchLabels
mroda88 Jul 25, 2024
8fe1664
Correct name in deployment
mroda88 Jul 25, 2024
84d1d15
Fix type
mroda88 Jul 25, 2024
26147ef
Correct use of logging
mroda88 Jul 25, 2024
483ca78
Correct queue timeout handling
mroda88 Jul 25, 2024
0781f62
Adding influx exception
mroda88 Jul 25, 2024
4f8a2b3
Better exception handling
mroda88 Jul 25, 2024
18366aa
Testing dict approach
mroda88 Jul 25, 2024
65091f7
Cleanup
mroda88 Jul 25, 2024
e24f34e
Final decoration
mroda88 Jul 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions dockerfiles/microservices-dependencies.dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
FROM cern/alma9-base

ARG ERSVERSION=v1.5.1 # For issue.proto from ers
ARG ERSVERSION=v1.5.2 # For issue.proto from ers
ARG ERSKAFKAVERSION=v1.5.4 # For ERSSubscriber.py from erskafka
ARG OPMONLIBVERSION=v2.0.0 # For opmon_entry.proto from opmonlib
ARG KAFKAOPMONVERSION=v2.0.0 # For OpMonSubscriber.py from kafkaopmon

ARG LOCALPYDIR=/microservices_python

RUN yum clean all \
Expand Down Expand Up @@ -30,10 +33,15 @@ RUN curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v24.3
unzip protoc-24.3-linux-x86_64.zip && \
curl -O https://raw.githubusercontent.com/DUNE-DAQ/ers/$ERSVERSION/schema/ers/issue.proto && \
mkdir -p $LOCALPYDIR/ers && \
protoc --python_out=$LOCALPYDIR/ers issue.proto
protoc --python_out=$LOCALPYDIR/ers issue.proto && \
curl -O https://raw.githubusercontent.com/DUNE-DAQ/opmonlib/$OPMONLIBVERSION/schema/opmonlib/opmon_entry.proto && \
mkdir -p $LOCALPYDIR/opmonlib && \
protoc --python_out=$LOCALPYDIR/opmonlib -I/ -I/include opmon_entry.proto

RUN mkdir -p $LOCALPYDIR/erskafka && \
curl https://raw.githubusercontent.com/DUNE-DAQ/erskafka/$ERSKAFKAVERSION/python/erskafka/ERSSubscriber.py -o $LOCALPYDIR/erskafka/ERSSubscriber.py
curl https://raw.githubusercontent.com/DUNE-DAQ/erskafka/$ERSKAFKAVERSION/python/erskafka/ERSSubscriber.py -o $LOCALPYDIR/erskafka/ERSSubscriber.py && \
mkdir -p $LOCALPYDIR/kafkaopmon && \
curl https://raw.githubusercontent.com/DUNE-DAQ/kafkaopmon/$KAFKAOPMONVERSION/python/kafkaopmon/OpMonSubscriber.py -o $LOCALPYDIR/kafkaopmon/OpMonSubscriber.py

ENV PYTHONPATH=$LOCALPYDIR:$PYTHONPATH

Expand Down
2 changes: 1 addition & 1 deletion docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ docker run --rm -e MICROSERVICE=<name of microservice> ghcr.io/dune-daq/microser
```

There are a couple of points to note:
* The value of MICROSERVICE should be the name of a given microservice's subdirectory in this repo. As of Oct-6-2023, the available subdirectories are: `config-service`, `elisa-logbook`, `ers-dbwriter`, `ers-protobuf-dbwriter`, `opmon-dbwriter`, `runnumber-rest` and `runregistry-rest`.
* The value of MICROSERVICE should be the name of a given microservice's subdirectory in this repo. As of Jul-16-2024, the available subdirectories are: `config-service`, `elisa-logbook`, `ers-dbwriter`, `ers-protobuf-dbwriter`, `opmon-dbwriter` (now deprecated), `opmon-protobuf-dbwriter`, `runnumber-rest` and `runregistry-rest`.
* Most microservices require additional environment variables to be set, which can be passed using the usual docker syntax: `-e VARIABLE_NAME=<variable value>`
* If you don't know what these additional environment variables are, you can just run the `docker` command as above without setting them; the container will exit out almost immediately but only after telling you what variables are missing
* The microservices image tag will be `microservices:<name-of-branch>` or `microservices:<version-tag>`, i.e. `microservices:develop`.
Expand Down
15 changes: 15 additions & 0 deletions docs/README_opmon-protobuf-dbwriter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
`dbwriter.py` is the script responsible for taking the opmon messages via the OpMonSubscriber
and writing to an InfluxDB database so that the data can be displayed in a
grafana dashboard. To run it manually do:
```python dbwriter.py [options]```


# Running locally
The script can be run locally which can be useful to debug or start up quickly. After setting up a working area and cloning this repo, run:
```
python3 dbwriter.py
```
Passing the appropriate variables.
As this script requires opmonlibs and kafkaopmon, it has to be launched by a developing envirnoment.
It can run at the same time locally and in kubernetes.

244 changes: 244 additions & 0 deletions opmon-protobuf-dbwriter/dbwriter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
# @file dbwriter.py Writing Opmon entries into InfluxDB
# This is part of the DUNE DAQ software, copyright 2020.
# Licensing/copyright details are in the COPYING file that you should have
# received with this code.
#

import kafkaopmon.OpMonSubscriber as opmon_sub
import google.protobuf.json_format as pb_json
from google.protobuf.timestamp_pb2 import Timestamp
import opmonlib.opmon_entry_pb2 as opmon_schema

from influxdb import InfluxDBClient
import influxdb
from functools import partial
import json
import click
import logging
import queue
import threading


CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])


@click.command(context_settings=CONTEXT_SETTINGS)
# subscriber options
@click.option(
"--subscriber-bootstrap",
type=click.STRING,
default="monkafka.cern.ch:30092",
help="boostrap server and port of the OpMonSubscriber",
)
@click.option(
"--subscriber-group",
type=click.STRING,
default=None,
help="group ID of the OpMonSubscriber",
)
@click.option(
"--subscriber-timeout",
type=click.INT,
default=500,
help="timeout in ms used in the OpMonSubscriber",
)
@click.option(
"--subscriber-topic",
type=click.STRING,
multiple=True,
default=["opmon_stream"],
help='The system will add the "monitoring." prefix',
)

# influx options
@click.option(
"--influxdb-address",
type=click.STRING,
default="monkafka.cern.ch",
help="address of the influx db",
)
@click.option(
"--influxdb-port", type=click.INT, default=31002, help="port of the influxdb"
)
@click.option(
"--influxdb-name",
type=click.STRING,
default="test_influx",
help="Table name destination inside influxdb",
)
@click.option(
"--influxdb-create",
type=click.BOOL,
default=True,
help="Creates the influxdb if it does not exists",
)
@click.option(
"--influxdb-timeout",
type=click.INT,
default=500,
help="Size in ms of the batches sent to influx",
)
@click.option(
"--influxdb-username",
type=click.STRING,
default=None,
help="Username to acces influxdb",
)
@click.option(
"--influxdb-password",
type=click.STRING,
default=None,
help="Password to acces influxdb",
)
@click.option("--debug", type=click.BOOL, default=True, help="Set debug print levels")
def cli(
subscriber_bootstrap,
subscriber_group,
subscriber_timeout,
subscriber_topic,
influxdb_address,
influxdb_port,
influxdb_name,
influxdb_create,
influxdb_timeout,
influxdb_username,
influxdb_password,
debug,
):

logging.basicConfig(
format="%(asctime)s %(levelname)-8s %(message)s",
level=logging.DEBUG if debug else logging.INFO,
datefmt="%Y-%m-%d %H:%M:%S",
)

kwargs = dict()
if influxdb_username:
kwargs["username"] = influxdb_username
if influxdb_password:
kwargs["password"] = influxdb_password
influx = InfluxDBClient(host=influxdb_address, port=influxdb_port, **kwargs)
db_list = influx.get_list_database()
logging.info("Available DBs: %s", db_list)
if {"name": influxdb_name} not in db_list:
logging.warning("%s DB not available", influxdb_name )
if influxdb_create:
influx.create_database(influxdb_name)
logging.info("New list of DBs: %s", influx.get_list_database())

influx.switch_database(influxdb_name)

sub = opmon_sub.OpMonSubscriber(
bootstrap=subscriber_bootstrap,
topics=subscriber_topic,
group_id=subscriber_group,
timeout_ms=subscriber_timeout,
)

# this is a list of Entries
q = queue.Queue()

callback_function = partial(process_entry, q=q)

sub.add_callback(name="to_influx", function=callback_function)

thread = threading.Thread(
target=consume, daemon=True, args=(q, influxdb_timeout, influx)
)
thread.start()

sub.start()


def consume(q: queue.Queue, timeout_ms, influx: InfluxDBClient = None):
logging.info("Starting consumer thread")
batch = []
batch_ms = 0
while True:
try:
entry = q.get(timeout=1) ## timeout here is in seconds

if entry.ms - batch_ms < timeout_ms:
# because of the if, batch_ms is not zero.
batch.append(entry.json)
batch_ms = min(batch_ms, entry.ms)

if entry.ms - batch_ms >= timeout_ms:
# note that if we are facing with a late arrival, i.e. entry.ms was smaller than batch_ms, the difference is 0, so this if is skipped
# i.e. there is not double insertion
send_batch(batch)
batch = [entry.json]
batch_ms = entry.ms

except queue.Empty:
logging.debug("Queue is empty")
jcpunk marked this conversation as resolved.
Show resolved Hide resolved
send_batch(batch, influx)
batch = []
batch_ms = 0


def send_batch(batch: list, influx: InfluxDBClient = None):
if len(batch) > 0:
logging.info("Sending %s points", len(batch))
if influx:
try:
influx.write_points(batch)
except influxdb.exceptions.InfluxDBClientError as e:
logging.error(e)
except Exception as e:
logging.error("Something went wrong: json batch not sent")
logging.error("Details: {}".format(str(e)))
else:
print(batch)


def process_entry(entry: opmon_schema.OpMonEntry, q: queue.Queue):
d = to_dict(entry)
e = Entry(json=d, ms=entry.time.ToMilliseconds())
q.put(e)


def to_dict(entry: opmon_schema.OpMonEntry) -> dict:
ret = dict(measurement=entry.measurement)
ret["fields"] = unpack_payload(entry)
ret["tags"] = create_tags(entry)
ret["time"] = entry.time.ToJsonString()
return ret


def unpack_payload(entry: opmon_schema.OpMonEntry) -> dict:
data = entry.data
ret = dict()
for key in data:
value = data[key]
casted_value = getattr(value, value.WhichOneof("kind"))
ret[key] = casted_value

return ret


def create_tags(entry: opmon_schema.OpMonEntry) -> dict:
opmon_id = entry.origin
# session and application
tags = dict(session=opmon_id.session, application=opmon_id.application)

# element and subelements
struct = opmon_id.substructure
for i in range(len(struct)):
name = "sub" * i + "element"
tags[name] = struct[i]

# custom origin
tags |= entry.custom_origin

return tags


class Entry:
def __init__(self, json: dict, ms: int):
self.json = json
self.ms = ms


if __name__ == "__main__":
cli()
16 changes: 16 additions & 0 deletions opmon-protobuf-dbwriter/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/bin/bash

cd $(dirname $0)
source ../entrypoint_functions.sh

ensure_required_variables "OPMON_DBWRITER_KAFKA_BOOTSTRAP_SERVER OPMON_DBWRITER_KAFKA_GROUP OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS OPMON_DBWRITER_TOPIC OPMON_DBWRITER_INFLUX_HOST OPMON_DBWRITER_INFLUX_PORT OPMON_DBWRITER_TABLE OPMON_DBWRITER_BATCH_SIZE_MS OPMON_DBWRITER_INFLUX_USER OPMON_DBWRITER_INFLUX_PASSWORD"

python3 ./dbwriter.py --subscriber-bootstrap $OPMON_DBWRITER_KAFKA_BOOTSTRAP_SERVER \
--subscriber-group $OPMON_DBWRITER_KAFKA_GROUP --subscriber-timeout $OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS \
--subscriber-topic $OPMON_DBWRITER_TOPIC \
--influxdb-address $OPMON_DBWRITER_INFLUX_HOST --influxdb-port $OPMON_DBWRITER_INFLUX_PORT \
--influxdb-name $OPMON_DBWRITER_TABLE --influxdb-timeout $OPMON_DBWRITER_BATCH_SIZE_MS \
--influxdb-create True \
--debug False \
--influxdb-username $OPMON_DBWRITER_INFLUX_USER --influxdb-password $OPMON_DBWRITER_INFLUX_PASSWORD

Loading