backport celery queue metrics from the cookiecutter template
mpnowacki-reef committed Nov 3, 2024
1 parent 219f7f7 commit 613e28e
Showing 3 changed files with 30 additions and 0 deletions.
7 changes: 7 additions & 0 deletions validator/app/src/compute_horde_validator/celery.py
@@ -39,6 +39,8 @@
"compute_horde_validator.validator.tasks.fetch_dynamic_config": DEFAULT_QUEUE,
}

CELERY_TASK_QUEUES = list(set(TASK_QUEUE_MAP.values()))


def route_task(name, args, kwargs, options, task=None, **kw):
    if name not in TASK_QUEUE_MAP:
@@ -60,3 +62,8 @@ def apply_startup_hook(*args, **kwargs):
        importlib.import_module(hook_script_file)
    else:
        print("Not loading any startup hook")


def get_num_tasks_in_queue(queue_name: str) -> int:
    with app.pool.acquire(block=True) as conn:
        return conn.default_channel.client.llen(queue_name)
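
The new helper counts pending messages by running LLEN against the list that backs each queue, so it assumes a Redis broker and borrows a connection from the Celery app's pool. A minimal usage sketch (the logging loop is illustrative, not part of this commit):

# Sketch only: assumes a Redis broker, since llen() counts entries in the
# Redis list that backs each Celery queue.
from compute_horde_validator.celery import CELERY_TASK_QUEUES, get_num_tasks_in_queue

def log_queue_depths() -> None:
    for queue_name in CELERY_TASK_QUEUES:
        # Each call briefly borrows a broker connection from app.pool.
        print(f"{queue_name}: {get_num_tasks_in_queue(queue_name)} pending task(s)")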
2 changes: 2 additions & 0 deletions validator/app/src/compute_horde_validator/settings.py
@@ -464,6 +464,8 @@ def wrapped(*args, **kwargs):
CELERY_TASK_ROUTES = ["compute_horde_validator.celery.route_task"]
CELERY_TASK_TIME_LIMIT = int(timedelta(hours=2, minutes=5).total_seconds())
CELERY_TASK_ALWAYS_EAGER = env.bool("CELERY_TASK_ALWAYS_EAGER", default=False)
CELERY_WORKER_SEND_TASK_EVENTS = True
CELERY_TASK_SEND_SENT_EVENT = True
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
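CELERY_WORKER_SEND_TASK_EVENTS makes workers publish task lifecycle events, and CELERY_TASK_SEND_SENT_EVENT adds a task-sent event from publishers; monitoring tools such as Flower consume these. A hedged sketch of reading the events with Celery's Receiver API (the broker URL and handler below are placeholders, not this project's configuration):

# Sketch only: a standalone consumer of the task events enabled above.
# "redis://localhost:6379/0" is a placeholder broker URL.
from celery import Celery

app = Celery(broker="redis://localhost:6379/0")

def on_task_sent(event):
    # task-sent events are emitted because CELERY_TASK_SEND_SENT_EVENT is on.
    print("sent:", event.get("name"), event.get("uuid"))

with app.connection() as connection:
    receiver = app.events.Receiver(connection, handlers={"task-sent": on_task_sent})
    receiver.capture(limit=10, timeout=5)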
21 changes: 21 additions & 0 deletions validator/app/src/compute_horde_validator/validator/metrics.py
@@ -1,10 +1,15 @@
import glob
import os
from collections.abc import Iterator

import prometheus_client
from django.http import HttpResponse
from django_prometheus.exports import ExportToDjangoView
from prometheus_client import multiprocess
from prometheus_client.core import REGISTRY, GaugeMetricFamily, Metric
from prometheus_client.registry import Collector

from ..celery import get_num_tasks_in_queue, CELERY_TASK_QUEUES


class RecursiveMultiProcessCollector(multiprocess.MultiProcessCollector):
@@ -23,9 +28,25 @@ def metrics_view(request):
    if os.environ.get(ENV_VAR_NAME):
        registry = prometheus_client.CollectorRegistry()
        RecursiveMultiProcessCollector(registry)
        registry.register(CustomCeleryCollector())
        return HttpResponse(
            prometheus_client.generate_latest(registry),
            content_type=prometheus_client.CONTENT_TYPE_LATEST,
        )
    else:
        return ExportToDjangoView(request)


class CustomCeleryCollector(Collector):
    def collect(self) -> Iterator[Metric]:
        num_tasks_in_queue = GaugeMetricFamily(
            "celery_queue_len",
            "How many tasks are there in a queue",
            labels=("queue",),
        )
        for queue in CELERY_TASK_QUEUES:
            num_tasks_in_queue.add_metric([queue], get_num_tasks_in_queue(queue))
        yield num_tasks_in_queue


REGISTRY.register(CustomCeleryCollector())

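A hedged sketch of exercising the new collector in isolation, assuming Django settings are already configured (the module pulls in the Celery app at import time); the broker lookup is stubbed out, so it only illustrates the exposition format, roughly celery_queue_len{queue="..."} N:

# Sketch only: stubs out the broker call so no Redis connection is needed.
from unittest import mock

import prometheus_client

from compute_horde_validator.validator import metrics

with mock.patch.object(metrics, "CELERY_TASK_QUEUES", ["celery"]), \
        mock.patch.object(metrics, "get_num_tasks_in_queue", return_value=3):
    registry = prometheus_client.CollectorRegistry()
    registry.register(metrics.CustomCeleryCollector())
    # Expected to contain a line like: celery_queue_len{queue="celery"} 3.0
    print(prometheus_client.generate_latest(registry).decode())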