diff --git a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py index 8710be99f..4c4a0f159 100644 --- a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py +++ b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py @@ -1478,10 +1478,11 @@ async def _trigger_streaming_job( # wait to trigger all streaming jobs at the same time await streaming_start_barrier.wait() - url = f"https://{response.ip}:{response.port}/execute-job" async with httpx.AsyncClient( verify=str(executor_cert_path), cert=ctx.own_certs ) as client: + # send the seed to the executor to start the streaming job + url = f"https://{response.ip}:{response.port}/execute-job" try: r = await client.post(url, json={"seed": seed}, headers={"Host": response.ip}) if r.status_code == 200: @@ -1497,6 +1498,21 @@ async def _trigger_streaming_job( f"Error triggering streaming job {job_uuid} execution {url}: {e}" ) + # shedule the job to terminate + url = f"https://{response.ip}:{response.port}/terminate" + try: + r = await client.get(url, headers={"Host": response.ip}) + if r.status_code == 200: + logger.debug(f"Successfully sheduled streaming job {job_uuid} termination") + else: + raise Exception( + f"Bad response sheduling streaming job {job_uuid} termination on {url}: {r.status_code}, {r.text}" + ) + except Exception as e: + raise Exception( + f"Error sheduling streaming job {job_uuid} termination on {url}: {e}" + ) + async def _multi_send_job_request(ctx: BatchContext) -> None: streaming_classes = await get_streaming_job_executor_classes()