Skip to content

Commit

Permalink
Adding missing methods to python client (#154)
Browse files Browse the repository at this point in the history
  • Loading branch information
mustafai-gr authored and GitHub Enterprise committed May 29, 2024
1 parent ae456f5 commit 02b3d2e
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 0 deletions.
27 changes: 27 additions & 0 deletions client/python/armada_client/asyncio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def __init__(
) -> None:
self.submit_stub = submit_pb2_grpc.SubmitStub(channel)
self.event_stub = event_pb2_grpc.EventStub(channel)
self.job_stub = job_pb2_grpc.JobsStub(channel)
self.event_timeout = event_timeout

async def get_job_events_stream(
Expand Down Expand Up @@ -185,6 +186,32 @@ async def submit_jobs(
response = await self.submit_stub.SubmitJobs(request)
return response

async def get_job_status(self, job_id: str) -> JobState:
"""
Asynchronously retrieves the status of a job from Armada.
:param job_id: The unique job identifier.
:type job_id: str
:returns: The status of the job.
"""
req = job_pb2.JobStatusRequest(job_ids=[job_id])
resp = await self.job_stub.GetJobStatus(req)
return resp.job_states[job_id]

async def get_job_details(self, job_id: str) -> JobDetails:
"""
Asynchronously retrieves the details of a job from Armada.
:param job_id: The unique job identifier.
:type job_id: str
:returns: The Armada job detail.
"""
req = job_pb2.JobDetailsRequest(job_ids=[job_id], expand_job_run=True)
resp = await self.job_stub.GetJobDetails(req)
return resp.job_details[job_id]

async def cancel_jobs(
self,
queue: str,
Expand Down
63 changes: 63 additions & 0 deletions client/python/armada_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
submit_pb2,
submit_pb2_grpc,
health_pb2,
job_pb2,
job_pb2_grpc
)
from armada_client.event import Event
from armada_client.k8s.io.api.core.v1 import generated_pb2 as core_v1
Expand All @@ -28,6 +30,41 @@
)


def is_terminal(state: JobState) -> bool:
"""
Determines if a job state is terminal.
Terminal states indicate that a job has completed its lifecycle, whether successfully or due to failure.
:param state: The current state of the job.
:type state: JobState
:returns: True if the job state is terminal, False if it is active.
:rtype: bool
"""
terminal_states = {
JobState.SUCCEEDED,
JobState.FAILED,
JobState.CANCELLED,
JobState.PREEMPTED,
}
return state in terminal_states


def is_active(state: JobState) -> bool:
"""
Determines if a job state is active.
Active states indicate that a job is still running or in a non-terminal state.
:param state: The current state of the job.
:type state: JobState
:returns: True if the job state is active, False if it is terminal.
:rtype: bool
"""
return not is_terminal(state)

class _ResilientArmadaEventStream(Iterator[event_pb2.EventStreamMessage]):
def __init__(
self,
Expand Down Expand Up @@ -102,6 +139,7 @@ def __init__(self, channel, event_timeout: timedelta = timedelta(minutes=15)):
self.submit_stub = submit_pb2_grpc.SubmitStub(channel)
self.event_stub = event_pb2_grpc.EventStub(channel)
self.event_timeout = event_timeout
self.job_stub = job_pb2_grpc.JobsStub(channel)

def get_job_events_stream(
self,
Expand Down Expand Up @@ -161,6 +199,31 @@ def event_health(self) -> health_pb2.HealthCheckResponse:
"""
return self.event_stub.Health(request=empty_pb2.Empty())

def get_job_status(self, job_id: str) -> JobState:
"""
Retrieves the status of a job from Armada.
:param job_id: The unique job identifier.
:type job_id: str
:returns: The response from the server containing the job status.
:rtype: JobStatusResponse
"""
req = job_pb2.JobStatusRequest(job_ids=[job_id])
return self.job_stub.GetJobStatus(req).job_states[job_id]

def get_job_details(self, job_id: str) -> JobDetails:
"""
Retrieves the details of a job from Armada.
:param job_id: The unique job identifier.
:type job_id: str
:returns: The Armada job detail.
"""
req = job_pb2.JobDetailsRequest(job_ids=[job_id], expand_job_run=True)
return self.job_stub.GetJobDetails(req).job_details[job_id]

def submit_jobs(
self, queue: str, job_set_id: str, job_request_items
) -> submit_pb2.JobSubmitResponse:
Expand Down

0 comments on commit 02b3d2e

Please sign in to comment.