Skip to content

Commit

Permalink
Merge pull request #334 from backend-developers-ltd/worker-healthcheck
Browse files Browse the repository at this point in the history
Make celery services restart if they stop doing tasks
  • Loading branch information
mpnowacki-reef authored Jan 9, 2025
2 parents 1655aa8 + b7288d4 commit cfcc724
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 5 deletions.
2 changes: 2 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions validator/app/envs/prod/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ COPY ./app/envs/prod/gunicorn.conf.py /root/src/
COPY ./app/envs/prod/celery-entrypoint.sh /root/src/
COPY ./app/envs/prod/celery-beat-entrypoint.sh /root/src/
COPY ./app/envs/prod/prometheus-cleanup.sh /root/src/
COPY ./app/envs/prod/celery-worker-healthcheck.sh /root/src/
COPY ./app/envs/prod/celery_beat_healthcheck.py /root/src/

RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --frozen --no-dev --no-editable
Expand Down
22 changes: 22 additions & 0 deletions validator/app/envs/prod/celery-worker-healthcheck.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/bin/bash
# Liveness probe for a Celery worker container.
#
# The worker is expected to refresh the modification time of a heartbeat file
# periodically. This script exits non-zero when that file has not been
# modified within WORKER_HEALTHCHECK_TIMEOUT seconds.
#
# Environment:
#   WORKER_HEALTHCHECK_FILE_PATH  heartbeat file (default: /tmp/worker-healthcheck)
#   WORKER_HEALTHCHECK_TIMEOUT    maximum allowed age in seconds (default: 10)
set -eu

# Use the same default as the application settings. Without it, `set -u`
# aborts the probe with "unbound variable" whenever the variable is not
# exported, even though the worker itself falls back to this path.
WORKER_HEALTHCHECK_FILE_PATH=${WORKER_HEALTHCHECK_FILE_PATH:-/tmp/worker-healthcheck}

# Default timeout - 10 seconds
WORKER_HEALTHCHECK_TIMEOUT=${WORKER_HEALTHCHECK_TIMEOUT:-10}

# Check if the file does not exist
if [[ ! -f "$WORKER_HEALTHCHECK_FILE_PATH" ]]; then
    # Create the parent directory, if necessary
    mkdir -p "$(dirname "$WORKER_HEALTHCHECK_FILE_PATH")"
    # Create the file; its age is then measured from this moment
    touch "$WORKER_HEALTHCHECK_FILE_PATH"
fi

# Check the file's modification time (both values are seconds since the epoch)
MOD_TIME=$(stat -c %Y "$WORKER_HEALTHCHECK_FILE_PATH")
CURRENT_TIME=$(date +%s)

if (( CURRENT_TIME - MOD_TIME > WORKER_HEALTHCHECK_TIMEOUT )); then
    # Liveness check failed: file not modified within $WORKER_HEALTHCHECK_TIMEOUT seconds
    echo "worker heartbeat file $WORKER_HEALTHCHECK_FILE_PATH is older than ${WORKER_HEALTHCHECK_TIMEOUT}s" >&2
    exit 1
fi
11 changes: 11 additions & 0 deletions validator/app/envs/prod/celery_beat_healthcheck.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import os

import kombu

# The broker location is read straight from the environment (mirroring
# settings.py) so that the healthcheck does not have to set up Django.
REDIS_HOST = os.environ.get("REDIS_HOST", "redis")
REDIS_PORT = os.environ.get("REDIS_PORT", "6379")
CELERY_BROKER_URL = os.environ.get(
    "CELERY_BROKER_URL",
    "redis://{}:{}/0".format(REDIS_HOST, REDIS_PORT),
)

# Open a connection to the broker right away. Any exception raised here is
# left uncaught, so the interpreter exits with a non-zero status.
with kombu.Connection(CELERY_BROKER_URL) as broker:
    broker.connect()
36 changes: 35 additions & 1 deletion validator/app/src/compute_horde_validator/celery.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import importlib
import logging
import os
from pathlib import Path

from celery import Celery, signals
from celery import Celery, bootsteps, signals
from celery.signals import worker_process_shutdown
from django.conf import settings
from prometheus_client import multiprocess
Expand Down Expand Up @@ -41,6 +42,8 @@

CELERY_TASK_QUEUES = list(set(TASK_QUEUE_MAP.values()))

WORKER_HEALTHCHECK_FILE = Path(settings.WORKER_HEALTHCHECK_FILE_PATH)


def route_task(name, args, kwargs, options, task=None, **kw):
if name not in TASK_QUEUE_MAP:
Expand Down Expand Up @@ -70,3 +73,34 @@ def get_num_tasks_in_queue(queue_name: str) -> int:
return int(conn.default_channel.client.llen(queue_name))
except (TypeError, ValueError, ConnectionError):
return 0


# Worker healthcheck
# Taken from https://github.com/celery/celery/issues/4079#issuecomment-1128954283
class LivenessProbe(bootsteps.StartStopStep):
    """Worker bootstep that keeps a heartbeat file's mtime fresh.

    While the worker runs, a repeating timer touches
    ``WORKER_HEALTHCHECK_FILE`` every 10 seconds. An external probe can
    compare the file's modification time with the current time to decide
    whether the worker is still alive.
    """

    # The step uses ``worker.timer``, so the Timer component must be set up first.
    requires = ("celery.worker.components:Timer",)

    def __init__(self, worker, **kwargs):
        super().__init__(worker, **kwargs)
        # Handle of the repeating timer entry; set in start(), cleared in stop().
        self.tref = None

    def start(self, worker):
        """Create the heartbeat file and schedule its periodic refresh."""
        # Create the parent directory if it doesn't exist
        WORKER_HEALTHCHECK_FILE.parent.mkdir(parents=True, exist_ok=True)
        # Ensure the file exists
        WORKER_HEALTHCHECK_FILE.touch()
        self.tref = worker.timer.call_repeatedly(
            10.0,
            self.update_heartbeat_file,
            (worker,),
            priority=10,
        )

    def stop(self, worker):
        """Cancel the periodic refresh and remove the heartbeat file."""
        # Cancel the timer first; otherwise a late tick could re-create the
        # file after it has been unlinked.
        if self.tref is not None:
            self.tref.cancel()
            self.tref = None
        WORKER_HEALTHCHECK_FILE.unlink(missing_ok=True)

    def update_heartbeat_file(self, worker):
        """Timer callback: bump the heartbeat file's modification time."""
        WORKER_HEALTHCHECK_FILE.touch()


app.steps["worker"].add(LivenessProbe)
4 changes: 4 additions & 0 deletions validator/app/src/compute_horde_validator/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,10 @@ def wrapped(*args, **kwargs):
CELERY_WORKER_PREFETCH_MULTIPLIER = env.int("CELERY_WORKER_PREFETCH_MULTIPLIER", default=10)
CELERY_BROKER_POOL_LIMIT = env.int("CELERY_BROKER_POOL_LIMIT", default=50)

WORKER_HEALTHCHECK_FILE_PATH = env(
"WORKER_HEALTHCHECK_FILE_PATH", default="/tmp/worker-healthcheck"
)

EMAIL_BACKEND = env("EMAIL_BACKEND", default="django.core.mail.backends.filebased.EmailBackend")
EMAIL_FILE_PATH = env("EMAIL_FILE_PATH", default="/tmp/email")
EMAIL_HOST = env("EMAIL_HOST", default="smtp.sendgrid.net")
Expand Down
43 changes: 43 additions & 0 deletions validator/envs/runner/data/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,44 +57,64 @@ services:
environment:
- DEBUG=off
- PROMETHEUS_MULTIPROC_DIR=/prometheus-multiproc-dir/celery-worker
- WORKER_HEALTHCHECK_FILE_PATH=/tmp/worker-healthcheck
- WORKER_HEALTHCHECK_TIMEOUT=30
command: ./celery-entrypoint.sh -Q default
volumes:
- ./prometheus-metrics:/prometheus-multiproc-dir
- ${HOST_WALLET_DIR}:/root/.bittensor/wallets
tmpfs: /run
depends_on:
- redis
links:
- redis
- db
logging:
<<: *logging
labels:
- "com.centurylinklabs.watchtower.enable=true"
- "autoheal=true"
healthcheck:
test: ["CMD", "./celery-worker-healthcheck.sh"]
interval: 30s
timeout: 5s
retries: 3
start_period: 2m

celery-worker-weights:
<<: *celery-worker
environment:
- DEBUG=off
- PROMETHEUS_MULTIPROC_DIR=/prometheus-multiproc-dir/celery-worker-weights
- WORKER_HEALTHCHECK_FILE_PATH=/tmp/worker-healthcheck
- WORKER_HEALTHCHECK_TIMEOUT=30
command: ./celery-entrypoint.sh -Q weights

celery-worker-jobs:
<<: *celery-worker
environment:
- DEBUG=off
- PROMETHEUS_MULTIPROC_DIR=/prometheus-multiproc-dir/celery-worker-jobs
- WORKER_HEALTHCHECK_FILE_PATH=/tmp/worker-healthcheck
- WORKER_HEALTHCHECK_TIMEOUT=30
command: ./celery-entrypoint.sh -Q jobs

celery-worker-llm:
<<: *celery-worker
environment:
- DEBUG=off
- PROMETHEUS_MULTIPROC_DIR=/prometheus-multiproc-dir/celery-worker-llm
- WORKER_HEALTHCHECK_FILE_PATH=/tmp/worker-healthcheck
- WORKER_HEALTHCHECK_TIMEOUT=30
command: ./celery-entrypoint.sh -Q llm

celery-worker-receipts:
<<: *celery-worker
environment:
- DEBUG=off
- PROMETHEUS_MULTIPROC_DIR=/prometheus-multiproc-dir/celery-worker-receipts
- WORKER_HEALTHCHECK_FILE_PATH=/tmp/worker-healthcheck
- WORKER_HEALTHCHECK_TIMEOUT=30
command: ./celery-entrypoint.sh -Q receipts

cadvisor:
Expand Down Expand Up @@ -185,6 +205,8 @@ services:
command: celery --app=compute_horde_validator --broker="redis://redis:6379/0" flower
depends_on:
- redis
links:
- redis
logging:
<<: *logging

Expand All @@ -200,10 +222,19 @@ services:
depends_on:
- redis
- db
links:
- redis
logging:
<<: *logging
labels:
- "com.centurylinklabs.watchtower.enable=true"
- "autoheal=true"
healthcheck:
test: [ "CMD", "python", "celery_beat_healthcheck.py" ]
interval: 30s
timeout: 5s
retries: 3
start_period: 2m

connect-facilitator:
image: backenddevelopersltd/${VALIDATOR_IMAGE_REPO}:v0-latest
Expand Down Expand Up @@ -243,6 +274,18 @@ services:
labels:
- "com.centurylinklabs.watchtower.enable=true"

# Auto restart unhealthy containers with label autoheal=true.
# Warning! This container has access to the Docker socket.
# See https://github.com/willfarrell/docker-autoheal
autoheal:
# The image is pinned by digest because this container has access to the Docker socket.
image: willfarrell/autoheal@sha256:794dfad2bd562984a6baaa7e4baa3ba1d211fc452f036dc2079a801a9a71a053
restart: unless-stopped
environment:
AUTOHEAL_CONTAINER_LABEL: autoheal
network_mode: none
volumes:
- /var/run/docker.sock:/var/run/docker.sock

volumes:
redis:
Expand Down
1 change: 1 addition & 0 deletions validator/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies = [
"boto3>=1.35.11",
"httpx~=0.26.0",
"class-registry>=2.1.2",
"kombu~=5.3",
]

[tool.uv.sources]
Expand Down
9 changes: 5 additions & 4 deletions validator/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit cfcc724

Please sign in to comment.