Skip to content

Commit

Permalink
send terminate
Browse files Browse the repository at this point in the history
  • Loading branch information
andreea-popescu-reef committed Jan 7, 2025
1 parent 5acb2a1 commit 2d98b2e
Showing 1 changed file with 17 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1478,10 +1478,11 @@ async def _trigger_streaming_job(
# wait to trigger all streaming jobs at the same time
await streaming_start_barrier.wait()

url = f"https://{response.ip}:{response.port}/execute-job"
async with httpx.AsyncClient(
verify=str(executor_cert_path), cert=ctx.own_certs
) as client:
# send the seed to the executor to start the streaming job
url = f"https://{response.ip}:{response.port}/execute-job"
try:
r = await client.post(url, json={"seed": seed}, headers={"Host": response.ip})
if r.status_code == 200:
Expand All @@ -1497,6 +1498,21 @@ async def _trigger_streaming_job(
f"Error triggering streaming job {job_uuid} execution {url}: {e}"
)

# shedule the job to terminate
url = f"https://{response.ip}:{response.port}/terminate"
try:
r = await client.get(url, headers={"Host": response.ip})
if r.status_code == 200:
logger.debug(f"Successfully sheduled streaming job {job_uuid} termination")
else:
raise Exception(
f"Bad response sheduling streaming job {job_uuid} termination on {url}: {r.status_code}, {r.text}"
)
except Exception as e:
raise Exception(
f"Error sheduling streaming job {job_uuid} termination on {url}: {e}"
)


async def _multi_send_job_request(ctx: BatchContext) -> None:
streaming_classes = await get_streaming_job_executor_classes()
Expand Down

0 comments on commit 2d98b2e

Please sign in to comment.