Skip to content

Commit

Permalink
Merge pull request #258 from backend-developers-ltd/initial-job-request-volume
Browse files Browse the repository at this point in the history

Initial job request volume
  • Loading branch information
mzukowski-reef authored Oct 2, 2024
2 parents c97c660 + 3e6074b commit ee51e3f
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 23 deletions.
7 changes: 7 additions & 0 deletions compute_horde/compute_horde/em_protocol/miner_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,15 @@ class V0InitialJobRequest(BaseMinerRequest, JobMixin):
message_type: RequestType = RequestType.V0PrepareJobRequest
base_docker_image_name: str | None = None
timeout_seconds: int | None = None
volume: Volume | None = None
volume_type: VolumeType | None = None

@model_validator(mode="after")
def validate_volume_or_volume_type(self) -> Self:
    """Reject a request that supplies both `volume` and the legacy `volume_type`.

    At most one of the two fields may be set; carrying both is ambiguous.
    """
    if self.volume and self.volume_type:
        raise ValueError("Expected either `volume` or `volume_type`, got both")
    return self


class V0JobRequest(BaseMinerRequest, JobMixin):
message_type: RequestType = RequestType.V0RunJobRequest
Expand Down
7 changes: 7 additions & 0 deletions compute_horde/compute_horde/mv_protocol/validator_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,15 @@ class V0InitialJobRequest(BaseValidatorRequest, JobMixin):
executor_class: ExecutorClass | None = None
base_docker_image_name: str | None = None
timeout_seconds: int | None = None
volume: Volume | None = None
volume_type: VolumeType | None = None

@model_validator(mode="after")
def validate_volume_or_volume_type(self) -> Self:
    """Ensure the request does not carry both `volume` and `volume_type`."""
    # Both fields truthy at once is an error; one or neither is fine.
    if all((self.volume, self.volume_type)):
        raise ValueError("Expected either `volume` or `volume_type`, got both")
    return self


class V0JobRequest(BaseValidatorRequest, JobMixin):
message_type: RequestType = RequestType.V0JobRequest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
InlineVolume,
MultiVolume,
SingleFileVolume,
Volume,
ZipUrlVolume,
)
from compute_horde.base_requests import BaseRequest
Expand Down Expand Up @@ -399,8 +400,9 @@ async def run_job(self, job_request: V0JobRequest):
docker_run_options = RunConfigManager.preset_to_docker_run_args(
job_request.docker_run_options_preset
)
await self.unpack_volume(job_request)
await self.unpack_volume()
except JobError as ex:
logger.error("Job error: %s", ex.description)
return JobResult(
success=False,
exit_status=None,
Expand Down Expand Up @@ -535,27 +537,25 @@ async def clean(self):
await process.wait()
self.temp_dir.rmdir()

async def _unpack_volume(self, job_request: V0JobRequest):
async def _unpack_volume(self, volume: Volume | None):
assert str(self.volume_mount_dir) not in {"~", "/"}
for path in self.volume_mount_dir.glob("*"):
if path.is_file():
path.unlink()
elif path.is_dir():
shutil.rmtree(path)

if job_request.volume is not None:
if isinstance(job_request.volume, InlineVolume):
await self._unpack_inline_volume(job_request.volume)
elif isinstance(job_request.volume, ZipUrlVolume):
await self._unpack_zip_url_volume(job_request.volume)
elif isinstance(job_request.volume, SingleFileVolume):
await self._unpack_single_file_volume(job_request.volume)
elif isinstance(job_request.volume, MultiVolume):
await self._unpack_multi_volume(job_request.volume)
if volume is not None:
if isinstance(volume, InlineVolume):
await self._unpack_inline_volume(volume)
elif isinstance(volume, ZipUrlVolume):
await self._unpack_zip_url_volume(volume)
elif isinstance(volume, SingleFileVolume):
await self._unpack_single_file_volume(volume)
elif isinstance(volume, MultiVolume):
await self._unpack_multi_volume(volume)
else:
raise NotImplementedError(
f"Unsupported volume_type: {job_request.volume.volume_type}"
)
raise NotImplementedError(f"Unsupported volume_type: {volume.volume_type}")

chmod_proc = await asyncio.create_subprocess_exec(
"chmod", "-R", "777", self.temp_dir.as_posix()
Expand Down Expand Up @@ -598,10 +598,17 @@ async def _unpack_multi_volume(self, volume: MultiVolume):
else:
raise NotImplementedError(f"Unsupported sub-volume type: {type(sub_volume)}")

async def unpack_volume(self, job_request: V0JobRequest):
async def get_job_volume(self) -> Volume | None:
    """Return the single volume supplied for this job, if any.

    The volume may arrive either with the initial job request or with the
    full job request, but never with both.

    Raises:
        JobError: if both requests carry a volume.
    """
    full_volume = self.full_job_request.volume
    initial_volume = self.initial_job_request.volume
    if full_volume and initial_volume:
        raise JobError("Received multiple volumes")
    return full_volume or initial_volume or None

async def unpack_volume(self):
try:
await asyncio.wait_for(
self._unpack_volume(job_request), timeout=INPUT_VOLUME_UNPACK_TIMEOUT_SECONDS
self._unpack_volume(await self.get_job_volume()),
timeout=INPUT_VOLUME_UNPACK_TIMEOUT_SECONDS,
)
except JobError:
raise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ async def connect(self):
job_uuid=initial_job_details.job_uuid,
base_docker_image_name=initial_job_details.base_docker_image_name,
timeout_seconds=initial_job_details.timeout_seconds,
volume=initial_job_details.volume,
volume_type=initial_job_details.volume_type.value
if initial_job_details.volume_type
else None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,21 @@ async def handle(self, msg: BaseValidatorRequest):
if not self.validator_authenticated:
self.msg_queue.append(msg)
return

if isinstance(msg, validator_requests.V0InitialJobRequest) or isinstance(
msg, validator_requests.V0JobRequest
):
# Proactively check volume safety in both requests that may contain a volume
if msg.volume and not msg.volume.is_safe():
error_msg = f"Received JobRequest with unsafe volume: {msg.volume.contents}"
logger.error(error_msg)
await self.send(
miner_requests.GenericError(
details=error_msg,
).model_dump_json()
)
return

if isinstance(msg, validator_requests.V0InitialJobRequest):
validator_blacklisted = await ValidatorBlacklist.objects.filter(
validator=self.validator
Expand Down Expand Up @@ -294,17 +309,18 @@ async def handle(self, msg: BaseValidatorRequest):

if isinstance(msg, validator_requests.V0JobRequest):
job = self.pending_jobs.get(msg.job_uuid)
if msg.volume and not msg.volume.is_safe():
error_msg = f"Received JobRequest with unsafe volume: {msg.volume.contents}"
if job is None:
error_msg = f"Received JobRequest for unknown job_uuid: {msg.job_uuid}"
logger.error(error_msg)
await self.send(
miner_requests.GenericError(
details=error_msg,
).model_dump_json()
)
return
if job is None:
error_msg = f"Received JobRequest for unknown job_uuid: {msg.job_uuid}"
if job.initial_job_details.get("volume") is not None and msg.volume is not None:
# The volume may have been already sent in the initial job request.
error_msg = f"Received job volume twice job_uuid: {msg.job_uuid}"
logger.error(error_msg)
await self.send(
miner_requests.GenericError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ async def fake_executor(token):
"base_docker_image_name": "it's teeeeests",
"timeout_seconds": 60,
"volume_type": "inline",
"volume": None,
}, response
await communicator.send_json_to(
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
V0JobFinishedReceiptRequest,
V0JobRequest,
V0JobStartedReceiptRequest,
VolumeType,
)
from compute_horde.transport import AbstractTransport, WSTransport
from django.conf import settings
Expand Down Expand Up @@ -868,7 +867,7 @@ async def _send_initial_job_request(
executor_class=job.executor_class,
base_docker_image_name=job.job_generator.base_docker_image_name(),
timeout_seconds=job.job_generator.timeout_seconds(),
volume_type=VolumeType.inline,
volume=job.volume if job.job_generator.volume_in_initial_req() else None,
)
request_json = request.model_dump_json()

Expand Down Expand Up @@ -921,7 +920,7 @@ async def _send_job_request(
docker_run_options_preset=job.job_generator.docker_run_options_preset(),
docker_run_cmd=job.job_generator.docker_run_cmd(),
raw_script=job.job_generator.raw_script(),
volume=job.volume,
volume=job.volume if not job.job_generator.volume_in_initial_req() else None,
output_upload=job.output_upload,
)
request_json = request.model_dump_json()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ def verify(self, msg: V0JobFinishedRequest, time_took: float) -> tuple[bool, str
@abc.abstractmethod
def job_description(self) -> str: ...

def volume_in_initial_req(self) -> bool:
    """Whether the job volume should be sent with the initial job request.

    Defaults to False (volume is sent in the full job request); generator
    subclasses override this to opt in.
    """
    return False


class BaseSyntheticJobGeneratorFactory(abc.ABC):
@abc.abstractmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,11 @@ def verify(self, msg: V0JobFinishedRequest, time_took: float) -> tuple[bool, str

def job_description(self) -> str:
return f"Hashcat {self.hash_job}"

def volume_in_initial_req(self) -> bool:
    """Whether the job volume is sent with the initial job request.

    Sending the volume in the initial request was introduced with
    weights_version 4; earlier versions keep sending it in the full
    job request.

    Raises:
        RuntimeError: for an unknown weights_version.
    """
    if self.weights_version in (0, 1, 2, 3):
        return False
    if self.weights_version == 4:
        return True
    # Fixed message: the original said "No score function for
    # weights_version" — a copy-paste slip from the scoring code; this
    # method decides volume placement, not scoring.
    raise RuntimeError(f"Unsupported weights_version: {self.weights_version}")
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ def verify(self, msg: V0JobFinishedRequest, time_took: float) -> tuple[bool, str
def job_description(self) -> str:
return "LLM prompts job"

def volume_in_initial_req(self) -> bool:
    """LLM prompt jobs always ship their volume with the initial job request."""
    return True


class LlmPromptsSyntheticJobGenerator(LlmPromptsJobGenerator):
def __init__(
Expand Down

0 comments on commit ee51e3f

Please sign in to comment.