Merge pull request #119 from backend-developers-ltd/valiorgjob
fix debug_run_organic_job command
mpnowacki-reef authored May 31, 2024
2 parents 96b7c44 + 375e0ab commit 542eacb
Showing 3 changed files with 62 additions and 86 deletions.
@@ -1,34 +1,24 @@
-import argparse
 import asyncio
-import json
-import logging

-import bittensor
-from compute_horde.mv_protocol.miner_requests import (
-    V0DeclineJobRequest,
-    V0ExecutorFailedRequest,
-    V0JobFailedRequest,
-    V0JobFinishedRequest,
-)
-from django.conf import settings
 from django.core.management.base import BaseCommand
+from django.utils import timezone

-from compute_horde_validator.validator.models import Miner, MinerBlacklist, OrganicJob
-from compute_horde_validator.validator.synthetic_jobs.generator.cli import CLIJobGenerator
-from compute_horde_validator.validator.synthetic_jobs.utils import (
-    _execute_job,
+from compute_horde_validator.validator.miner_driver import JobStatusUpdate
+from compute_horde_validator.validator.models import (
+    AdminJobRequest,
+    Miner,
+    MinerBlacklist,
+    OrganicJob,
 )
-
-logger = logging.getLogger(__name__)
+from compute_horde_validator.validator.tasks import run_admin_job_request


-def string_list(value):
-    values = json.loads(value)
-    if not isinstance(values, list):
-        raise argparse.ArgumentTypeError(f"{value} is not a list of strings")
-    if not all(isinstance(v, str) for v in values):
-        raise argparse.ArgumentTypeError(f"{value} is not a list of strings")
-    return values
+async def notify_job_status_update(msg: JobStatusUpdate):
+    comment = msg.metadata.comment if msg.metadata else ""
+    print(f"\njob status: {msg.status} {comment}")
+    if msg.metadata and msg.metadata.miner_response:
+        print(f"stderr: {msg.metadata.miner_response.docker_process_stderr}")
+        print(f"stdout: {msg.metadata.miner_response.docker_process_stdout}")


 class Command(BaseCommand):
@@ -37,78 +27,61 @@ class Command(BaseCommand):
     """

     def add_arguments(self, parser):
-        parser.add_argument("--miner_uid", type=int, help="Miner uid", required=True)
+        parser.add_argument("--miner_hotkey", default=None, type=str, help="Miner Hotkey")
         parser.add_argument("--timeout", type=int, help="Timeout value", required=True)
         parser.add_argument(
-            "--base_docker_image_name", type=str, help="First string argument", required=True
+            "--docker_image", type=str, help="docker image for job execution", required=True
         )
+        parser.add_argument("--raw_script", type=str, default="", help="raw script to be executed")
         parser.add_argument(
-            "--docker_image_name", type=str, help="Second string argument", required=True
+            "--cmd_args",
+            type=str,
+            default=None,
+            help="arguments passed to the script or docker image",
         )
-
         parser.add_argument(
-            "--docker_run_options_preset",
+            "--use_gpu",
+            action="store_true",
+            default=True,
+            help="use gpu for job execution",
+        )
+        parser.add_argument(
+            "--input_url",
             type=str,
-            help="executors translate this to the 'RUN_OPTS' part of 'docker run *RUN_OPTS image_name *RUN_CMD'",
-            required=True,
+            default="",
+            help="input url for job execution",
         )
         parser.add_argument(
-            "--docker_run_cmd",
-            type=string_list,
-            help="the 'RUN_CMD' part of 'docker run *RUN_OPTS image_name *RUN_CMD'",
-            required=True,
+            "--output_url",
+            type=str,
+            default="",
+            help="output url for job execution",
         )

     def handle(self, *args, **options):
-        miner_uid = options["miner_uid"]
-        timeout = options["timeout"]
-        base_docker_image_name = options["base_docker_image_name"]
-        docker_image_name = options["docker_image_name"]
-        docker_run_options_preset = options["docker_run_options_preset"]
-        docker_run_cmd = options["docker_run_cmd"]
-        CLIJobGenerator.set_parameters(
-            timeout=timeout,
-            base_docker_image_name=base_docker_image_name,
-            docker_image_name=docker_image_name,
-            docker_run_options_preset=docker_run_options_preset,
-            docker_run_cmd=docker_run_cmd,
-        )
-        metagraph = bittensor.metagraph(
-            settings.BITTENSOR_NETUID, network=settings.BITTENSOR_NETWORK
-        )
-        neurons = [n for n in metagraph.neurons if n.uid == miner_uid]
-        if not neurons:
-            raise ValueError(f"{miner_uid=} not present in this subnetowrk")
-        neuron = neurons[0]
-        if not neuron.axon_info.is_serving:
-            raise ValueError(f"{miner_uid=} did not announce it's ip address")
-
-        miner = Miner.objects.get_or_create(hotkey=neuron.hotkey)[0]
-        miner_blacklisted = MinerBlacklist.objects.filter(miner=miner).exists()
-        if miner_blacklisted:
-            raise ValueError(f"{miner_uid=} with hotkey {neuron.hotkey} is blacklisted")
+        hotkey = options["miner_hotkey"]
+        if hotkey:
+            miner = Miner.objects.filter(hotkey=hotkey).first()
+            miner_blacklisted = MinerBlacklist.objects.filter(miner=miner).exists()
+            if miner_blacklisted:
+                raise ValueError(f"miner with hotkey {hotkey} is blacklisted")
+        else:
+            miner = Miner.objects.exclude(minerblacklist__isnull=False).first()
+        print(f"Picked miner: {miner} to run the job")

-        job = OrganicJob.objects.create(
+        job_request = AdminJobRequest.objects.create(
             miner=miner,
-            miner_address=neuron.axon_info.ip,
-            miner_address_ip_version=neuron.axon_info.ip_type,
-            miner_port=neuron.axon_info.port,
+            timeout=options["timeout"],
+            docker_image=options["docker_image"],
+            raw_script=options["raw_script"],
+            args=options["cmd_args"],
+            use_gpu=options["use_gpu"],
+            input_url=options["input_url"],
+            output_url=options["output_url"],
+            created_at=timezone.now(),
         )
-        _, msg = asyncio.run(_execute_job(job))
-        if isinstance(msg, V0DeclineJobRequest):
-            print("Miner declined")
-            raise SystemExit(1)
-        elif isinstance(msg, V0ExecutorFailedRequest):
-            print("Miner accepted but executor failed to prepare")
-            raise SystemExit(1)
-        elif isinstance(msg, V0JobFailedRequest):
-            print("Executor started the job but failed")
-            exit_status = 1
-        elif isinstance(msg, V0JobFinishedRequest):
-            print("Executor finished the job successfully")
-            exit_status = 0
-        else:
-            raise ValueError(f"Unexpected message: {msg}")
-        print(f"stderr: {msg.docker_process_stderr}")
-        print(f"\nstdout: {msg.docker_process_stdout}")
-        raise SystemExit(exit_status)
+        print(f"Processing job request: {job_request}")
+
+        asyncio.run(run_admin_job_request(job_request.pk, callback=notify_job_status_update))
+        job = OrganicJob.objects.get(job_uuid=job_request.uuid)
+        print(f"\nJob {job.job_uuid} done processing\nstatus: {job.status}\ncomment: {job.comment}")
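For reference, the rewritten command could be exercised from Python roughly as follows. This is a sketch only: the management command name is assumed from the commit title, and the timeout, image, and hotkey values are placeholders, not taken from this PR.

# Hedged sketch: drive the debug command via Django's call_command.
# Per the new add_arguments, only --timeout and --docker_image are required;
# --miner_hotkey is optional (without it the command picks a non-blacklisted miner).
# Assumed CLI equivalent: python manage.py debug_run_organic_job --timeout 300 --docker_image example/image:latest
from django.core.management import call_command

call_command(
    "debug_run_organic_job",  # assumed command name, taken from the commit title
    timeout=300,  # placeholder timeout in seconds
    docker_image="example/image:latest",  # placeholder docker image
    miner_hotkey="5Fexample...",  # placeholder hotkey; omit to let the command pick a miner
)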
validator/app/src/compute_horde_validator/validator/models.py (3 changes: 3 additions & 0 deletions)
@@ -93,6 +93,9 @@ class AdminJobRequest(models.Model):
     def get_args(self):
         return shlex.split(self.args)

+    def __str__(self):
+        return f"uuid: {self.uuid} - miner hotkey: {self.miner.hotkey}"
+

 class JobReceipt(models.Model):
     job_uuid = models.UUIDField()
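The new __str__ is what the command's "Processing job request: ..." print renders. A minimal illustration with made-up, unsaved instances (all field values below are placeholders):

# Hedged sketch: unsaved model instances, only to show the __str__ format.
from compute_horde_validator.validator.models import AdminJobRequest, Miner

miner = Miner(hotkey="5Fexample...")  # placeholder hotkey
job_request = AdminJobRequest(miner=miner)  # uuid is assumed to be defaulted by the real model
print(job_request)  # expected form: uuid: <uuid> - miner hotkey: 5Fexample...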
validator/app/src/compute_horde_validator/validator/tasks.py (4 changes: 2 additions & 2 deletions)
@@ -114,7 +114,7 @@ def trigger_run_admin_job_request(job_request_id: int):
     async_to_sync(run_admin_job_request)(job_request_id)


-async def run_admin_job_request(job_request_id: int):
+async def run_admin_job_request(job_request_id: int, callback=None):
     job_request: AdminJobRequest = await AdminJobRequest.objects.prefetch_related("miner").aget(
         id=job_request_id
     )
@@ -145,7 +145,7 @@ async def run_admin_job_request(job_request_id: int):
         job_request,
         total_job_timeout=job_request.timeout,
         wait_timeout=job_request.timeout,
-        notify_callback=None,
+        notify_callback=callback,
     )


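With callback now threaded through to notify_callback, callers other than this management command can observe job progress as well. A minimal sketch, assuming Django is already configured, that callbacks are coroutines (as in the command above), and that job_request_pk refers to an existing AdminJobRequest row:

# Hedged sketch: any coroutine accepting a JobStatusUpdate can be passed as the new callback.
import asyncio

from compute_horde_validator.validator.miner_driver import JobStatusUpdate
from compute_horde_validator.validator.tasks import run_admin_job_request


async def log_status(msg: JobStatusUpdate):
    # print each status update as the miner driver reports it
    print(f"job status: {msg.status}")


job_request_pk = 1  # hypothetical: primary key of an existing AdminJobRequest
asyncio.run(run_admin_job_request(job_request_pk, callback=log_status))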
