Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resiliency for ArmadaOperator services calls #2591

Closed
wants to merge 14 commits into from
25 changes: 20 additions & 5 deletions client/python/armada_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from armada_client.k8s.io.api.core.v1 import generated_pb2 as core_v1
from armada_client.permissions import Permissions
from armada_client.typings import JobState
import tenacity
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you will need to add tenacity to our pyproject.toml file as this is a new dependency.

You will also need to run the python code formatter (Black via tox -e format-code) to get your code to pass CI.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you remove tenacity from the client? Not sure I like this change being enforced on all users of the client.

I’d leave the changes in jobservice. Can you also experiment with deploying jobservice with multiple replicas?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't quite follow, but in the comments of the issue it was mentioned that we need to include backoff for all the clients (ArmadaClient & JobServiceClient).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for removing the client side. The main user of this is mostly for jobservice and I'm not sure we want to add retries to all users of the client.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @kannon92, I added the test for tenacity.



class ArmadaClient:
Expand All @@ -38,6 +39,7 @@ def __init__(self, channel):
self.event_stub = event_pb2_grpc.EventStub(channel)
self.usage_stub = usage_pb2_grpc.UsageStub(channel)

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def get_job_events_stream(
self,
queue: str,
Expand Down Expand Up @@ -86,21 +88,24 @@ def unmarshal_event_response(event: event_pb2.EventStreamMessage) -> Event:
"""

return Event(event)


@tenacity.retry(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_exponential(),
    reraise=True,
)
def submit_health(self) -> health_pb2.HealthCheckResponse:
    """
    Query the health endpoint of the Submit Service.

    The call is wrapped by tenacity: it stops after three attempts, waits
    exponentially between them, and (``reraise=True``) raises the last
    exception once the attempts are used up.

    :return: A HealthCheckResponse object.
    """
    health_request = empty_pb2.Empty()
    return self.submit_stub.Health(request=health_request)


@tenacity.retry(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_exponential(),
    reraise=True,
)
def event_health(self) -> health_pb2.HealthCheckResponse:
    """
    Query the health endpoint of the Event Service.

    The call is wrapped by tenacity: it stops after three attempts, waits
    exponentially between them, and (``reraise=True``) raises the last
    exception once the attempts are used up.

    :return: A HealthCheckResponse object.
    """
    health_request = empty_pb2.Empty()
    return self.event_stub.Health(request=health_request)


@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def submit_jobs(
self, queue: str, job_set_id: str, job_request_items
) -> submit_pb2.JobSubmitResponse:
Expand All @@ -119,7 +124,8 @@ def submit_jobs(
)
response = self.submit_stub.SubmitJobs(request)
return response


@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def cancel_jobs(
self,
queue: Optional[str] = None,
Expand Down Expand Up @@ -152,7 +158,8 @@ def cancel_jobs(

response = self.submit_stub.CancelJobs(request)
return response


@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def cancel_jobset(
self,
queue: str,
Expand Down Expand Up @@ -181,6 +188,7 @@ def cancel_jobset(
response = self.submit_stub.CancelJobSet(request)
return response

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def reprioritize_jobs(
self,
new_priority: float,
Expand Down Expand Up @@ -222,6 +230,7 @@ def reprioritize_jobs(
response = self.submit_stub.ReprioritizeJobs(request)
return response

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def create_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty:
"""
Uses the CreateQueue RPC to create a queue.
Expand All @@ -232,6 +241,7 @@ def create_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty:
response = self.submit_stub.CreateQueue(queue)
return response

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def update_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty:
"""
Uses the UpdateQueue RPC to update a queue.
Expand All @@ -242,6 +252,7 @@ def update_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty:
response = self.submit_stub.UpdateQueue(queue)
return response

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def create_queues(
self, queues: List[submit_pb2.Queue]
) -> submit_pb2.BatchQueueCreateResponse:
Expand All @@ -255,6 +266,7 @@ def create_queues(
response = self.submit_stub.CreateQueues(queue_list)
return response

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def update_queues(
self, queues: List[submit_pb2.Queue]
) -> submit_pb2.BatchQueueUpdateResponse:
Expand All @@ -268,6 +280,7 @@ def update_queues(
response = self.submit_stub.UpdateQueues(queue_list)
return response

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def delete_queue(self, name: str) -> None:
"""Delete an empty queue by name.

Expand All @@ -279,6 +292,7 @@ def delete_queue(self, name: str) -> None:
request = submit_pb2.QueueDeleteRequest(name=name)
self.submit_stub.DeleteQueue(request)

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def get_queue(self, name: str) -> submit_pb2.Queue:
"""Get the queue by name.

Expand All @@ -291,6 +305,7 @@ def get_queue(self, name: str) -> submit_pb2.Queue:
response = self.submit_stub.GetQueue(request)
return response

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def get_queue_info(self, name: str) -> submit_pb2.QueueInfo:
"""Get the queue info by name.

Expand Down
6 changes: 5 additions & 1 deletion third_party/airflow/armada/operators/jobservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from google.protobuf import empty_pb2

import tenacity

class JobServiceClient:
"""
Expand All @@ -18,6 +19,7 @@ class JobServiceClient:
def __init__(self, channel):
self.job_stub = jobservice_pb2_grpc.JobServiceStub(channel)

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def get_job_status(
self, queue: str, job_set_id: str, job_id: str
) -> jobservice_pb2.JobServiceResponse:
Expand All @@ -30,11 +32,13 @@ def get_job_status(
:param job_id: The id of the job
:return: A Job Service Request (State, Error)
"""
print("Hello world")
kannon92 marked this conversation as resolved.
Show resolved Hide resolved
job_service_request = jobservice_pb2.JobServiceRequest(
queue=queue, job_set_id=job_set_id, job_id=job_id
)
return self.job_stub.GetJobStatus(job_service_request)


@tenacity.retry(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_exponential(),
    reraise=True,
)
def health(self) -> jobservice_pb2.HealthCheckResponse:
    """
    Health check for the job service over gRPC.

    The call is wrapped by tenacity: it stops after three attempts, waits
    exponentially between them, and (``reraise=True``) raises the last
    exception once the attempts are used up.

    :return: A HealthCheckResponse object.
    """
    health_request = empty_pb2.Empty()
    return self.job_stub.Health(request=health_request)