Skip to content

Commit

Permalink
Merge pull request #121 from backend-developers-ltd/validatorfixes
Browse files Browse the repository at this point in the history
Validator organic job fixes
  • Loading branch information
andreea-popescu-reef authored Jun 6, 2024
2 parents a99564e + 983e8a5 commit 788ed48
Show file tree
Hide file tree
Showing 13 changed files with 334 additions and 46 deletions.
36 changes: 35 additions & 1 deletion validator/app/src/compute_horde_validator/validator/admin.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from django.contrib import admin # noqa
from django.contrib import messages # noqa
from django.shortcuts import redirect # noqa
from django.utils.safestring import mark_safe # noqa
from django import forms

from compute_horde_validator.validator.models import (
Expand Down Expand Up @@ -35,23 +38,44 @@ def has_add_permission(self, *args, **kwargs):
class AdminJobRequestForm(forms.ModelForm):
class Meta:
model = AdminJobRequest
fields = "__all__"
fields = [
"uuid",
"miner",
"docker_image",
"timeout",
"raw_script",
"args",
"use_gpu",
"input_url",
"output_url",
"status_message",
]

def __init__(self, *args, **kwargs):
super(__class__, self).__init__(*args, **kwargs)
if self.fields:
# exclude blacklisted miners from valid results
self.fields["miner"].queryset = Miner.objects.exclude(minerblacklist__isnull=False)


class AdminJobRequestAddOnlyAdmin(AddOnlyAdmin):
form = AdminJobRequestForm
exclude = ["env"] # not used ?
list_display = ["uuid", "docker_image", "use_gpu", "miner", "created_at"]
readonly_fields = ["uuid", "status_message"]
ordering = ["-created_at"]
autocomplete_fields = ["miner"]

def save_model(self, request, obj, form, change):
super().save_model(request, obj, form, change)
trigger_run_admin_job_request.delay(obj.id)
organic_job = OrganicJob.objects.filter(job_uuid=obj.uuid).first()
msg = (
f"Please see <a href='/admin/validator/organicjob/{organic_job.pk}/change/'>ORGANIC JOB</a> for further details"
if organic_job
else f"Job {obj.uuid} failed to initialize"
)
messages.add_message(request, messages.INFO, mark_safe(msg))


class JobReadOnlyAdmin(ReadOnlyAdmin):
Expand All @@ -67,6 +91,16 @@ class MinerReadOnlyAdmin(ReadOnlyAdmin):
def has_add_permission(self, *args, **kwargs):
return False

# exclude blacklisted miners from autocomplete results
def get_search_results(self, request, queryset, search_term):
queryset, use_distinct = super().get_search_results(
request,
queryset,
search_term,
)
queryset = queryset.exclude(minerblacklist__isnull=False)
return queryset, use_distinct


class JobReceiptsReadOnlyAdmin(ReadOnlyAdmin):
list_display = [
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import sys

from asgiref.sync import async_to_sync
from django.core.management.base import BaseCommand
from django.utils import timezone

Expand All @@ -16,7 +17,14 @@
async def notify_job_status_update(msg: JobStatusUpdate):
comment = msg.metadata.comment if msg.metadata else ""
print(f"\njob status: {msg.status} {comment}")
if msg.metadata and msg.metadata.miner_response:
if (
msg.metadata
and msg.metadata.miner_response
and (
msg.metadata.miner_response.docker_process_stderr != ""
or msg.metadata.miner_response.docker_process_stdout != ""
)
):
print(f"stderr: {msg.metadata.miner_response.docker_process_stderr}")
print(f"stdout: {msg.metadata.miner_response.docker_process_stdout}")

Expand All @@ -36,7 +44,7 @@ def add_arguments(self, parser):
parser.add_argument(
"--cmd_args",
type=str,
default=None,
default="",
help="arguments passed to the script or docker image",
)
parser.add_argument(
Expand Down Expand Up @@ -67,7 +75,7 @@ def handle(self, *args, **options):
raise ValueError(f"miner with hotkey {hotkey} is blacklisted")
else:
miner = Miner.objects.exclude(minerblacklist__isnull=False).first()
print(f"Picked miner: {miner} to run the job")
print(f"\nPicked miner: {miner} to run the job")

job_request = AdminJobRequest.objects.create(
miner=miner,
Expand All @@ -80,8 +88,11 @@ def handle(self, *args, **options):
output_url=options["output_url"],
created_at=timezone.now(),
)
print(f"Processing job request: {job_request}")

asyncio.run(run_admin_job_request(job_request.pk, callback=notify_job_status_update))
job = OrganicJob.objects.get(job_uuid=job_request.uuid)
print(f"\nJob {job.job_uuid} done processing\nstatus: {job.status}\ncomment: {job.comment}")
async_to_sync(run_admin_job_request)(job_request.pk, callback=notify_job_status_update)
try:
job_request.refresh_from_db()
job = OrganicJob.objects.get(job_uuid=job_request.uuid)
print(f"\nJob {job.job_uuid} done processing")
except OrganicJob.DoesNotExist:
print(f"\nJob {job_request.uuid} not found")
sys.exit(1)
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
import logging

from django.conf import settings
from django.core.management.base import BaseCommand

from compute_horde_validator.validator.synthetic_jobs.utils import (
create_and_run_sythethic_job_batch,
)

logger = logging.getLogger(__name__)


class Command(BaseCommand):
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Generated by Django 4.2.11 on 2024-06-03 07:46

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("validator", "0010_jobreceipt_jobreceipt_unique_job_receipt_job_uuid"),
]

operations = [
migrations.AddField(
model_name="organicjob",
name="stderr",
field=models.TextField(blank=True, default=""),
),
migrations.AddField(
model_name="organicjob",
name="stdout",
field=models.TextField(blank=True, default=""),
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Generated by Django 4.2.11 on 2024-06-04 12:18

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("validator", "0011_organicjob_stderr_organicjob_stdout"),
]

operations = [
migrations.AddField(
model_name="adminjobrequest",
name="status_message",
field=models.TextField(blank=True, default=""),
),
]
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ async def run_miner_job(
),
)
)
job.stdout = msg.docker_process_stdout
job.stderr = msg.docker_process_stderr
job.status = OrganicJob.Status.FAILED
job.comment = f"Miner failed: {msg.json()}"
await job.asave()
Expand All @@ -235,6 +237,8 @@ async def run_miner_job(
),
)
)
job.stdout = msg.docker_process_stdout
job.stderr = msg.docker_process_stderr
job.status = OrganicJob.Status.COMPLETED
job.comment = f"Miner finished: {msg.json()}"
await job.asave()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ class SyntheticJob(JobBase):


class OrganicJob(JobBase):
pass
stdout = models.TextField(blank=True, default="")
stderr = models.TextField(blank=True, default="")

def get_absolute_url(self):
return f"/admin/validator/organicjob/{self.pk}/change/"


class AdminJobRequest(models.Model):
Expand All @@ -90,6 +94,8 @@ class AdminJobRequest(models.Model):

created_at = models.DateTimeField(auto_now_add=True)

status_message = models.TextField(blank=True, default="")

def get_args(self):
return shlex.split(self.args)

Expand Down
56 changes: 35 additions & 21 deletions validator/app/src/compute_horde_validator/validator/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,37 +114,51 @@ def trigger_run_admin_job_request(job_request_id: int):
async_to_sync(run_admin_job_request)(job_request_id)


def get_keypair():
return settings.BITTENSOR_WALLET().get_hotkey()


async def run_admin_job_request(job_request_id: int, callback=None):
job_request: AdminJobRequest = await AdminJobRequest.objects.prefetch_related("miner").aget(
id=job_request_id
)
miner = job_request.miner
miner_axon_info = await get_miner_axon_info(miner.hotkey)
job = await OrganicJob.objects.acreate(
job_uuid=str(job_request.uuid),
miner=miner,
miner_address=miner_axon_info.ip,
miner_address_ip_version=miner_axon_info.ip_type,
miner_port=miner_axon_info.port,
job_description="Validator Job from Admin Panel",
)
try:
miner = job_request.miner
miner_axon_info = await get_miner_axon_info(miner.hotkey)
job = await OrganicJob.objects.acreate(
job_uuid=str(job_request.uuid),
miner=miner,
miner_address=miner_axon_info.ip,
miner_address_ip_version=miner_axon_info.ip_type,
miner_port=miner_axon_info.port,
job_description="Validator Job from Admin Panel",
)

keypair = settings.BITTENSOR_WALLET().get_hotkey()
miner_client = MinerClient(
loop=asyncio.get_event_loop(),
miner_address=miner_axon_info.ip,
miner_port=miner_axon_info.port,
miner_hotkey=miner.hotkey,
my_hotkey=keypair.ss58_address,
job_uuid=job.job_uuid,
keypair=keypair,
)
keypair = get_keypair()
miner_client = MinerClient(
loop=asyncio.get_event_loop(),
miner_address=miner_axon_info.ip,
miner_port=miner_axon_info.port,
miner_hotkey=miner.hotkey,
my_hotkey=keypair.ss58_address,
job_uuid=job.job_uuid,
keypair=keypair,
)

job_request.status_message = "Job successfully triggered"
except Exception as e:
job_request.status_message = f"Job failed to trigger due to: {e}"
return
finally:
print(job_request.status_message)
await job_request.asave()

print(f"\nProcessing job request: {job_request}")
await run_miner_job(
miner_client,
job,
job_request,
total_job_timeout=job_request.timeout,
wait_timeout=job_request.timeout,
notify_callback=callback,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@
"""

import asyncio
from typing import NamedTuple

import bittensor
import pytest


async def throw_error(*args):
raise Exception("Error thrown for testing")


def mock_keypair():
Expand All @@ -28,3 +32,14 @@ def get_miner_client(MINER_CLIENT, job_uuid: str):
job_uuid=job_uuid,
keypair=mock_keypair(),
)


class MockedAxonInfo(NamedTuple):
is_serving: bool
ip: str
ip_type: int
port: int


async def mock_get_miner_axon_info(hotkey: str):
return MockedAxonInfo(is_serving=True, ip_type=4, ip="0000", port=8000)
Loading

0 comments on commit 788ed48

Please sign in to comment.