Skip to content

Commit

Permalink
Merge pull request #52 from backend-developers-ltd/fix-schema
Browse files Browse the repository at this point in the history
Fix schema differences
  • Loading branch information
mzukowski-reef authored Mar 21, 2024
2 parents 7f54157 + 75dc9ca commit 12a12a5
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class ExecutorFinished(pydantic.BaseModel):

class ExecutorFailed(pydantic.BaseModel):
job_uuid: str
docker_process_exit_status: int
docker_process_exit_status: int | None
docker_process_stdout: str
docker_process_stderr: str

Expand Down Expand Up @@ -183,7 +183,7 @@ async def send_executor_finished(self, job_uuid: str, executor_token: str, stdou
)

async def send_executor_failed(self, job_uuid: str, executor_token: str, stdout: str, stderr: str,
exit_status: int):
exit_status: int | None):
group_name = ValidatorInterfaceMixin.group_name(executor_token)
await self.channel_layer.group_send(
group_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,26 @@ def validate(cls, values: dict[str, Any]) -> dict[str, Any]:
return values


class MinerResponse(BaseModel, extra=Extra.allow):
job_uuid: str
message_type: str
docker_process_stderr: str
docker_process_stdout: str


class JobStatusMetadata(BaseModel, extra=Extra.allow):
comment: str
miner_response: MinerResponse | None = None


class JobStatusUpdate(BaseModel, extra=Extra.forbid):
"""
Message sent from validator to facilitator in response to NewJobRequest.
"""

uuid: str
status: Literal['failed', 'rejected', 'accepted', 'completed']
metadata: dict = {}
metadata: JobStatusMetadata | None = None


@cache
Expand Down Expand Up @@ -272,9 +284,9 @@ async def miner_driver(self, job_request: JobRequest):
await self.send_model(JobStatusUpdate(
uuid=job_request.uuid,
status='failed',
metadata={
'comment': f'Miner timed out while preparing executor after {PREPARE_WAIT_TIMEOUT} seconds',
},
metadata=JobStatusMetadata(
comment=f'Miner timed out while preparing executor after {PREPARE_WAIT_TIMEOUT} seconds',
),
))
job.status = OrganicJob.Status.FAILED
job.comment = 'Miner timed out while preparing executor'
Expand All @@ -286,10 +298,7 @@ async def miner_driver(self, job_request: JobRequest):
await self.send_model(JobStatusUpdate(
uuid=job_request.uuid,
status='rejected',
metadata={
'comment': "Miner didn't accept the job",
'miner_response': msg.dict(),
},
metadata=JobStatusMetadata(comment=f"Miner didn't accept the job. Miner sent {msg.message_type}"),
))
job.status = OrganicJob.Status.FAILED
job.comment = f"Miner didn't accept the job saying: {msg.json()}"
Expand All @@ -300,7 +309,7 @@ async def miner_driver(self, job_request: JobRequest):
await self.send_model(JobStatusUpdate(
uuid=job_request.uuid,
status='accepted',
metadata={'comment': "Miner accepted job"},
metadata=JobStatusMetadata(comment="Miner accepted job"),
))
else:
raise ValueError(f'Unexpected msg: {msg}')
Expand Down Expand Up @@ -337,7 +346,7 @@ async def miner_driver(self, job_request: JobRequest):
await self.send_model(JobStatusUpdate(
uuid=job_request.uuid,
status='failed',
metadata={'comment': f'Miner timed out after {TOTAL_JOB_TIMEOUT} seconds'},
metadata=JobStatusMetadata(comment=f'Miner timed out after {TOTAL_JOB_TIMEOUT} seconds'),
))
job.status = OrganicJob.Status.FAILED
job.comment = 'Miner timed out'
Expand All @@ -348,7 +357,15 @@ async def miner_driver(self, job_request: JobRequest):
await self.send_model(JobStatusUpdate(
uuid=job_request.uuid,
status='failed',
metadata={'comment': 'Miner failed', 'miner_response': msg.dict()},
metadata=JobStatusMetadata(
comment='Miner failed',
miner_response=MinerResponse(
job_uuid=msg.job_uuid,
message_type=msg.message_type.value,
docker_process_stderr=msg.docker_process_stderr,
docker_process_stdout=msg.docker_process_stdout,
),
),
))
job.status = OrganicJob.Status.FAILED
job.comment = f'Miner failed: {msg.json()}'
Expand All @@ -359,7 +376,15 @@ async def miner_driver(self, job_request: JobRequest):
await self.send_model(JobStatusUpdate(
uuid=job_request.uuid,
status='completed',
metadata={'comment': 'Miner finished', 'miner_response': msg.dict()},
metadata=JobStatusMetadata(
comment='Miner finished',
miner_response=MinerResponse(
job_uuid=msg.job_uuid,
message_type=msg.message_type.value,
docker_process_stderr=msg.docker_process_stderr,
docker_process_stdout=msg.docker_process_stdout,
),
),
))
job.status = OrganicJob.Status.COMPLETED
job.comment = f'Miner finished: {msg.json()}'
Expand Down

0 comments on commit 12a12a5

Please sign in to comment.