From c81065a7c82deb2fd45fddde42cc0ae2bc187faa Mon Sep 17 00:00:00 2001 From: Pavan Kongara Date: Wed, 21 Jun 2023 02:01:15 +0530 Subject: [PATCH 01/12] added backoff-retry-algorithm_for_clients --- client/python/armada_client/client.py | 25 +++++++++++++++---- .../airflow/armada/operators/jobservice.py | 6 ++++- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/client/python/armada_client/client.py b/client/python/armada_client/client.py index c9d1e3db703..440ebebd4cb 100644 --- a/client/python/armada_client/client.py +++ b/client/python/armada_client/client.py @@ -21,6 +21,7 @@ from armada_client.k8s.io.api.core.v1 import generated_pb2 as core_v1 from armada_client.permissions import Permissions from armada_client.typings import JobState +import tenacity class ArmadaClient: @@ -38,6 +39,7 @@ def __init__(self, channel): self.event_stub = event_pb2_grpc.EventStub(channel) self.usage_stub = usage_pb2_grpc.UsageStub(channel) + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def get_job_events_stream( self, queue: str, @@ -86,21 +88,24 @@ def unmarshal_event_response(event: event_pb2.EventStreamMessage) -> Event: """ return Event(event) - + + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def submit_health(self) -> health_pb2.HealthCheckResponse: """ Health check for Submit Service. :return: A HealthCheckResponse object. """ return self.submit_stub.Health(request=empty_pb2.Empty()) - + + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def event_health(self) -> health_pb2.HealthCheckResponse: """ Health check for Event Service. :return: A HealthCheckResponse object. """ return self.event_stub.Health(request=empty_pb2.Empty()) - + + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def submit_jobs( self, queue: str, job_set_id: str, job_request_items ) -> submit_pb2.JobSubmitResponse: @@ -119,7 +124,8 @@ def submit_jobs( ) response = self.submit_stub.SubmitJobs(request) return response - + + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def cancel_jobs( self, queue: Optional[str] = None, @@ -152,7 +158,8 @@ def cancel_jobs( response = self.submit_stub.CancelJobs(request) return response - + + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def cancel_jobset( self, queue: str, @@ -181,6 +188,7 @@ def cancel_jobset( response = self.submit_stub.CancelJobSet(request) return response + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def reprioritize_jobs( self, new_priority: float, @@ -222,6 +230,7 @@ def reprioritize_jobs( response = self.submit_stub.ReprioritizeJobs(request) return response + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def create_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty: """ Uses the CreateQueue RPC to create a queue. @@ -232,6 +241,7 @@ def create_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty: response = self.submit_stub.CreateQueue(queue) return response + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def update_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty: """ Uses the UpdateQueue RPC to update a queue. @@ -242,6 +252,7 @@ def update_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty: response = self.submit_stub.UpdateQueue(queue) return response + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def create_queues( self, queues: List[submit_pb2.Queue] ) -> submit_pb2.BatchQueueCreateResponse: @@ -255,6 +266,7 @@ def create_queues( response = self.submit_stub.CreateQueues(queue_list) return response + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def update_queues( self, queues: List[submit_pb2.Queue] ) -> submit_pb2.BatchQueueUpdateResponse: @@ -268,6 +280,7 @@ def update_queues( response = self.submit_stub.UpdateQueues(queue_list) return response + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def delete_queue(self, name: str) -> None: """Delete an empty queue by name. @@ -279,6 +292,7 @@ def delete_queue(self, name: str) -> None: request = submit_pb2.QueueDeleteRequest(name=name) self.submit_stub.DeleteQueue(request) + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def get_queue(self, name: str) -> submit_pb2.Queue: """Get the queue by name. @@ -291,6 +305,7 @@ def get_queue(self, name: str) -> submit_pb2.Queue: response = self.submit_stub.GetQueue(request) return response + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def get_queue_info(self, name: str) -> submit_pb2.QueueInfo: """Get the queue info by name. diff --git a/third_party/airflow/armada/operators/jobservice.py b/third_party/airflow/armada/operators/jobservice.py index 2d5b4248e42..266c6ed1506 100644 --- a/third_party/airflow/armada/operators/jobservice.py +++ b/third_party/airflow/armada/operators/jobservice.py @@ -2,6 +2,7 @@ from google.protobuf import empty_pb2 +import tenacity class JobServiceClient: """ @@ -18,6 +19,7 @@ class JobServiceClient: def __init__(self, channel): self.job_stub = jobservice_pb2_grpc.JobServiceStub(channel) + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def get_job_status( self, queue: str, job_set_id: str, job_id: str ) -> jobservice_pb2.JobServiceResponse: @@ -30,11 +32,13 @@ def get_job_status( :param job_id: The id of the job :return: A Job Service Request (State, Error) """ + print("Hello world") job_service_request = jobservice_pb2.JobServiceRequest( queue=queue, job_set_id=job_set_id, job_id=job_id ) return self.job_stub.GetJobStatus(job_service_request) - + + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def health(self) -> jobservice_pb2.HealthCheckResponse: """Health Check for GRPC Request""" return self.job_stub.Health(request=empty_pb2.Empty()) From 171d99bbcac288d30f94c9f8cdb6c1755b6000f0 Mon Sep 17 00:00:00 2001 From: Pavan Kongara Date: Thu, 22 Jun 2023 12:16:00 +0530 Subject: [PATCH 02/12] deleted print statement --- third_party/airflow/armada/operators/jobservice.py | 1 - 1 file changed, 1 deletion(-) diff --git a/third_party/airflow/armada/operators/jobservice.py b/third_party/airflow/armada/operators/jobservice.py index 266c6ed1506..a2f56acf6c7 100644 --- a/third_party/airflow/armada/operators/jobservice.py +++ b/third_party/airflow/armada/operators/jobservice.py @@ -32,7 +32,6 @@ def get_job_status( :param job_id: The id of the job :return: A Job Service Request (State, Error) """ - print("Hello world") job_service_request = jobservice_pb2.JobServiceRequest( queue=queue, job_set_id=job_set_id, job_id=job_id ) From c9b5a5989180e008bd88c7381d20da9d2b860d1c Mon Sep 17 00:00:00 2001 From: Pavan Kongara Date: Thu, 22 Jun 2023 12:24:50 +0530 Subject: [PATCH 03/12] added tenacity in pyproject.toml --- third_party/airflow/pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/third_party/airflow/pyproject.toml b/third_party/airflow/pyproject.toml index 60adbe46c94..d2ed5d7826f 100644 --- a/third_party/airflow/pyproject.toml +++ b/third_party/airflow/pyproject.toml @@ -12,7 +12,8 @@ dependencies = [ "apache-airflow>=2.3.1", "grpcio>=1.46.3", "grpcio-tools>=1.46.3", - "types-protobuf>=3.19.22" + "types-protobuf>=3.19.22", + "tenacity" ] authors = [{name = "Armada-GROSS", email = "armada@armadaproject.io"}] license = { text = "Apache Software License" } From 019d6fa6ad9b33b2f5dc2307ff8232b7670e62bb Mon Sep 17 00:00:00 2001 From: Pavan Kongara Date: Thu, 22 Jun 2023 12:28:28 +0530 Subject: [PATCH 04/12] formatted --- .../airflow/armada/operators/jobservice.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/third_party/airflow/armada/operators/jobservice.py b/third_party/airflow/armada/operators/jobservice.py index a2f56acf6c7..7a21b87a8cf 100644 --- a/third_party/airflow/armada/operators/jobservice.py +++ b/third_party/airflow/armada/operators/jobservice.py @@ -4,6 +4,7 @@ import tenacity + class JobServiceClient: """ The JobService Client @@ -19,7 +20,11 @@ class JobServiceClient: def __init__(self, channel): self.job_stub = jobservice_pb2_grpc.JobServiceStub(channel) - @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_exponential(), + reraise=True, + ) def get_job_status( self, queue: str, job_set_id: str, job_id: str ) -> jobservice_pb2.JobServiceResponse: @@ -36,8 +41,12 @@ def get_job_status( queue=queue, job_set_id=job_set_id, job_id=job_id ) return self.job_stub.GetJobStatus(job_service_request) - - @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) + + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_exponential(), + reraise=True, + ) def health(self) -> jobservice_pb2.HealthCheckResponse: """Health Check for GRPC Request""" return self.job_stub.Health(request=empty_pb2.Empty()) From d1533980d1f9d701b557cf24f1e0058af5c1aef0 Mon Sep 17 00:00:00 2001 From: Pavan Kongara Date: Thu, 22 Jun 2023 12:39:44 +0530 Subject: [PATCH 05/12] armada_client formatted --- client/python/armada_client/client.py | 94 +++++++++++++++++++++------ 1 file changed, 75 insertions(+), 19 deletions(-) diff --git a/client/python/armada_client/client.py b/client/python/armada_client/client.py index 440ebebd4cb..c3cef51dae8 100644 --- a/client/python/armada_client/client.py +++ b/client/python/armada_client/client.py @@ -39,7 +39,11 @@ def __init__(self, channel): self.event_stub = event_pb2_grpc.EventStub(channel) self.usage_stub = usage_pb2_grpc.UsageStub(channel) - @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_exponential(), + reraise=True, + ) def get_job_events_stream( self, queue: str, @@ -88,24 +92,36 @@ def unmarshal_event_response(event: event_pb2.EventStreamMessage) -> Event: """ return Event(event) - - @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) + + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_exponential(), + reraise=True, + ) def submit_health(self) -> health_pb2.HealthCheckResponse: """ Health check for Submit Service. :return: A HealthCheckResponse object. """ return self.submit_stub.Health(request=empty_pb2.Empty()) - - @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) + + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_exponential(), + reraise=True, + ) def event_health(self) -> health_pb2.HealthCheckResponse: """ Health check for Event Service. :return: A HealthCheckResponse object. """ return self.event_stub.Health(request=empty_pb2.Empty()) - - @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) + + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_exponential(), + reraise=True, + ) def submit_jobs( self, queue: str, job_set_id: str, job_request_items ) -> submit_pb2.JobSubmitResponse: @@ -124,8 +140,12 @@ def submit_jobs( ) response = self.submit_stub.SubmitJobs(request) return response - - @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) + + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_exponential(), + reraise=True, + ) def cancel_jobs( self, queue: Optional[str] = None, @@ -158,8 +178,12 @@ def cancel_jobs( response = self.submit_stub.CancelJobs(request) return response - - @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) + + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_exponential(), + reraise=True, + ) def cancel_jobset( self, queue: str, @@ -188,7 +212,11 @@ def cancel_jobset( response = self.submit_stub.CancelJobSet(request) return response - @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_exponential(), + reraise=True, + ) def reprioritize_jobs( self, new_priority: float, @@ -230,7 +258,11 @@ def reprioritize_jobs( response = self.submit_stub.ReprioritizeJobs(request) return response - @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_exponential(), + reraise=True, + ) def create_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty: """ Uses the CreateQueue RPC to create a queue. @@ -241,7 +273,11 @@ def create_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty: response = self.submit_stub.CreateQueue(queue) return response - @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_exponential(), + reraise=True, + ) def update_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty: """ Uses the UpdateQueue RPC to update a queue. @@ -252,7 +288,11 @@ def update_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty: response = self.submit_stub.UpdateQueue(queue) return response - @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_exponential(), + reraise=True, + ) def create_queues( self, queues: List[submit_pb2.Queue] ) -> submit_pb2.BatchQueueCreateResponse: @@ -266,7 +306,11 @@ def create_queues( response = self.submit_stub.CreateQueues(queue_list) return response - @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_exponential(), + reraise=True, + ) def update_queues( self, queues: List[submit_pb2.Queue] ) -> submit_pb2.BatchQueueUpdateResponse: @@ -280,7 +324,11 @@ def update_queues( response = self.submit_stub.UpdateQueues(queue_list) return response - @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_exponential(), + reraise=True, + ) def delete_queue(self, name: str) -> None: """Delete an empty queue by name. @@ -292,7 +340,11 @@ def delete_queue(self, name: str) -> None: request = submit_pb2.QueueDeleteRequest(name=name) self.submit_stub.DeleteQueue(request) - @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_exponential(), + reraise=True, + ) def get_queue(self, name: str) -> submit_pb2.Queue: """Get the queue by name. @@ -305,7 +357,11 @@ def get_queue(self, name: str) -> submit_pb2.Queue: response = self.submit_stub.GetQueue(request) return response - @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_exponential(), + reraise=True, + ) def get_queue_info(self, name: str) -> submit_pb2.QueueInfo: """Get the queue info by name. From 3b8d965c9155c8d5504f590e2f0b86013bf1fe59 Mon Sep 17 00:00:00 2001 From: Pavan Kongara Date: Thu, 22 Jun 2023 13:59:35 +0530 Subject: [PATCH 06/12] added tenacity for armada-client --- client/python/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/python/pyproject.toml b/client/python/pyproject.toml index aacd4f9bf77..bfa8187f64a 100644 --- a/client/python/pyproject.toml +++ b/client/python/pyproject.toml @@ -4,7 +4,7 @@ version = "0.2.7" description = "Armada gRPC API python client" readme = "README.md" requires-python = ">=3.7" -dependencies = ["grpcio>=1.46.3", "grpcio-tools>=1.46.3", "mypy-protobuf>=3.2.0", "protobuf>=3.20.3"] +dependencies = ["grpcio>=1.46.3", "grpcio-tools>=1.46.3", "mypy-protobuf>=3.2.0", "protobuf>=3.20.3","tenacity"] license = { text = "Apache Software License" } authors = [{ name = "G-Research Open Source Software", email = "armada@armadaproject.io" }] From 32fab4068057479eb7960cd1c9cf87c03e8661da Mon Sep 17 00:00:00 2001 From: Pavan Kongara Date: Fri, 23 Jun 2023 11:40:33 +0530 Subject: [PATCH 07/12] deleted tenacity for armada_client --- client/python/armada_client/client.py | 82 ++++----------------------- client/python/pyproject.toml | 2 +- 2 files changed, 12 insertions(+), 72 deletions(-) diff --git a/client/python/armada_client/client.py b/client/python/armada_client/client.py index c3cef51dae8..dde5b07d8a7 100644 --- a/client/python/armada_client/client.py +++ b/client/python/armada_client/client.py @@ -21,7 +21,6 @@ from armada_client.k8s.io.api.core.v1 import generated_pb2 as core_v1 from armada_client.permissions import Permissions from armada_client.typings import JobState -import tenacity class ArmadaClient: @@ -39,11 +38,6 @@ def __init__(self, channel): self.event_stub = event_pb2_grpc.EventStub(channel) self.usage_stub = usage_pb2_grpc.UsageStub(channel) - @tenacity.retry( - stop=tenacity.stop_after_attempt(3), - wait=tenacity.wait_exponential(), - reraise=True, - ) def get_job_events_stream( self, queue: str, @@ -93,11 +87,6 @@ def unmarshal_event_response(event: event_pb2.EventStreamMessage) -> Event: return Event(event) - @tenacity.retry( - stop=tenacity.stop_after_attempt(3), - wait=tenacity.wait_exponential(), - reraise=True, - ) def submit_health(self) -> health_pb2.HealthCheckResponse: """ Health check for Submit Service. @@ -105,11 +94,7 @@ def submit_health(self) -> health_pb2.HealthCheckResponse: """ return self.submit_stub.Health(request=empty_pb2.Empty()) - @tenacity.retry( - stop=tenacity.stop_after_attempt(3), - wait=tenacity.wait_exponential(), - reraise=True, - ) + def event_health(self) -> health_pb2.HealthCheckResponse: """ Health check for Event Service. @@ -117,11 +102,7 @@ def event_health(self) -> health_pb2.HealthCheckResponse: """ return self.event_stub.Health(request=empty_pb2.Empty()) - @tenacity.retry( - stop=tenacity.stop_after_attempt(3), - wait=tenacity.wait_exponential(), - reraise=True, - ) + def submit_jobs( self, queue: str, job_set_id: str, job_request_items ) -> submit_pb2.JobSubmitResponse: @@ -141,11 +122,6 @@ def submit_jobs( response = self.submit_stub.SubmitJobs(request) return response - @tenacity.retry( - stop=tenacity.stop_after_attempt(3), - wait=tenacity.wait_exponential(), - reraise=True, - ) def cancel_jobs( self, queue: Optional[str] = None, @@ -179,11 +155,7 @@ def cancel_jobs( response = self.submit_stub.CancelJobs(request) return response - @tenacity.retry( - stop=tenacity.stop_after_attempt(3), - wait=tenacity.wait_exponential(), - reraise=True, - ) + def cancel_jobset( self, queue: str, @@ -212,11 +184,7 @@ def cancel_jobset( response = self.submit_stub.CancelJobSet(request) return response - @tenacity.retry( - stop=tenacity.stop_after_attempt(3), - wait=tenacity.wait_exponential(), - reraise=True, - ) + def reprioritize_jobs( self, new_priority: float, @@ -258,11 +226,7 @@ def reprioritize_jobs( response = self.submit_stub.ReprioritizeJobs(request) return response - @tenacity.retry( - stop=tenacity.stop_after_attempt(3), - wait=tenacity.wait_exponential(), - reraise=True, - ) + def create_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty: """ Uses the CreateQueue RPC to create a queue. @@ -273,11 +237,7 @@ def create_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty: response = self.submit_stub.CreateQueue(queue) return response - @tenacity.retry( - stop=tenacity.stop_after_attempt(3), - wait=tenacity.wait_exponential(), - reraise=True, - ) + def update_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty: """ Uses the UpdateQueue RPC to update a queue. @@ -288,11 +248,7 @@ def update_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty: response = self.submit_stub.UpdateQueue(queue) return response - @tenacity.retry( - stop=tenacity.stop_after_attempt(3), - wait=tenacity.wait_exponential(), - reraise=True, - ) + def create_queues( self, queues: List[submit_pb2.Queue] ) -> submit_pb2.BatchQueueCreateResponse: @@ -306,11 +262,7 @@ def create_queues( response = self.submit_stub.CreateQueues(queue_list) return response - @tenacity.retry( - stop=tenacity.stop_after_attempt(3), - wait=tenacity.wait_exponential(), - reraise=True, - ) + def update_queues( self, queues: List[submit_pb2.Queue] ) -> submit_pb2.BatchQueueUpdateResponse: @@ -324,11 +276,7 @@ def update_queues( response = self.submit_stub.UpdateQueues(queue_list) return response - @tenacity.retry( - stop=tenacity.stop_after_attempt(3), - wait=tenacity.wait_exponential(), - reraise=True, - ) + def delete_queue(self, name: str) -> None: """Delete an empty queue by name. @@ -340,11 +288,7 @@ def delete_queue(self, name: str) -> None: request = submit_pb2.QueueDeleteRequest(name=name) self.submit_stub.DeleteQueue(request) - @tenacity.retry( - stop=tenacity.stop_after_attempt(3), - wait=tenacity.wait_exponential(), - reraise=True, - ) + def get_queue(self, name: str) -> submit_pb2.Queue: """Get the queue by name. @@ -357,11 +301,7 @@ def get_queue(self, name: str) -> submit_pb2.Queue: response = self.submit_stub.GetQueue(request) return response - @tenacity.retry( - stop=tenacity.stop_after_attempt(3), - wait=tenacity.wait_exponential(), - reraise=True, - ) + def get_queue_info(self, name: str) -> submit_pb2.QueueInfo: """Get the queue info by name. diff --git a/client/python/pyproject.toml b/client/python/pyproject.toml index bfa8187f64a..aacd4f9bf77 100644 --- a/client/python/pyproject.toml +++ b/client/python/pyproject.toml @@ -4,7 +4,7 @@ version = "0.2.7" description = "Armada gRPC API python client" readme = "README.md" requires-python = ">=3.7" -dependencies = ["grpcio>=1.46.3", "grpcio-tools>=1.46.3", "mypy-protobuf>=3.2.0", "protobuf>=3.20.3","tenacity"] +dependencies = ["grpcio>=1.46.3", "grpcio-tools>=1.46.3", "mypy-protobuf>=3.2.0", "protobuf>=3.20.3"] license = { text = "Apache Software License" } authors = [{ name = "G-Research Open Source Software", email = "armada@armadaproject.io" }] From 5f92446c54e3aec26b30a7fb4840bcd84b01690f Mon Sep 17 00:00:00 2001 From: Pavan Kongara Date: Fri, 23 Jun 2023 11:41:46 +0530 Subject: [PATCH 08/12] formatted code --- client/python/armada_client/client.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/client/python/armada_client/client.py b/client/python/armada_client/client.py index dde5b07d8a7..c9d1e3db703 100644 --- a/client/python/armada_client/client.py +++ b/client/python/armada_client/client.py @@ -94,7 +94,6 @@ def submit_health(self) -> health_pb2.HealthCheckResponse: """ return self.submit_stub.Health(request=empty_pb2.Empty()) - def event_health(self) -> health_pb2.HealthCheckResponse: """ Health check for Event Service. @@ -102,7 +101,6 @@ def event_health(self) -> health_pb2.HealthCheckResponse: """ return self.event_stub.Health(request=empty_pb2.Empty()) - def submit_jobs( self, queue: str, job_set_id: str, job_request_items ) -> submit_pb2.JobSubmitResponse: @@ -155,7 +153,6 @@ def cancel_jobs( response = self.submit_stub.CancelJobs(request) return response - def cancel_jobset( self, queue: str, @@ -184,7 +181,6 @@ def cancel_jobset( response = self.submit_stub.CancelJobSet(request) return response - def reprioritize_jobs( self, new_priority: float, @@ -226,7 +222,6 @@ def reprioritize_jobs( response = self.submit_stub.ReprioritizeJobs(request) return response - def create_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty: """ Uses the CreateQueue RPC to create a queue. @@ -237,7 +232,6 @@ def create_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty: response = self.submit_stub.CreateQueue(queue) return response - def update_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty: """ Uses the UpdateQueue RPC to update a queue. @@ -248,7 +242,6 @@ def update_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty: response = self.submit_stub.UpdateQueue(queue) return response - def create_queues( self, queues: List[submit_pb2.Queue] ) -> submit_pb2.BatchQueueCreateResponse: @@ -262,7 +255,6 @@ def create_queues( response = self.submit_stub.CreateQueues(queue_list) return response - def update_queues( self, queues: List[submit_pb2.Queue] ) -> submit_pb2.BatchQueueUpdateResponse: @@ -276,7 +268,6 @@ def update_queues( response = self.submit_stub.UpdateQueues(queue_list) return response - def delete_queue(self, name: str) -> None: """Delete an empty queue by name. @@ -288,7 +279,6 @@ def delete_queue(self, name: str) -> None: request = submit_pb2.QueueDeleteRequest(name=name) self.submit_stub.DeleteQueue(request) - def get_queue(self, name: str) -> submit_pb2.Queue: """Get the queue by name. @@ -301,7 +291,6 @@ def get_queue(self, name: str) -> submit_pb2.Queue: response = self.submit_stub.GetQueue(request) return response - def get_queue_info(self, name: str) -> submit_pb2.QueueInfo: """Get the queue info by name. From 0e46fe01b88dcaf4d7755e1af5d9f2e90d904add Mon Sep 17 00:00:00 2001 From: Pavan Kongara Date: Thu, 29 Jun 2023 19:41:38 +0530 Subject: [PATCH 09/12] tenacity added for JobServiceAsyncIOClient --- .../airflow/armada/operators/jobservice_asyncio.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/third_party/airflow/armada/operators/jobservice_asyncio.py b/third_party/airflow/armada/operators/jobservice_asyncio.py index bf2de7f1565..aaef8ee9832 100644 --- a/third_party/airflow/armada/operators/jobservice_asyncio.py +++ b/third_party/airflow/armada/operators/jobservice_asyncio.py @@ -4,6 +4,8 @@ from google.protobuf import empty_pb2 +import tenacity + class JobServiceAsyncIOClient: """ @@ -20,6 +22,11 @@ class JobServiceAsyncIOClient: def __init__(self, channel: grpc.aio.Channel) -> None: self.job_stub = jobservice_pb2_grpc.JobServiceStub(channel) + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_exponential(), + reraise=True, + ) async def get_job_status( self, queue: str, job_set_id: str, job_id: str ) -> jobservice_pb2.JobServiceResponse: @@ -38,6 +45,11 @@ async def get_job_status( response = await self.job_stub.GetJobStatus(job_service_request) return response + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_exponential(), + reraise=True, + ) async def health(self) -> jobservice_pb2.HealthCheckResponse: """Health Check for GRPC Request""" response = await self.job_stub.Health(request=empty_pb2.Empty()) From 1c62adc764f8717f7eddc5a882ac799f0b893554 Mon Sep 17 00:00:00 2001 From: Pavan Kongara Date: Fri, 7 Jul 2023 12:12:14 +0530 Subject: [PATCH 10/12] test_added_for_tenacity --- .../airflow/tests/unit/job_service_mock.py | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/third_party/airflow/tests/unit/job_service_mock.py b/third_party/airflow/tests/unit/job_service_mock.py index 6787ede0c71..dd8368c0b1e 100644 --- a/third_party/airflow/tests/unit/job_service_mock.py +++ b/third_party/airflow/tests/unit/job_service_mock.py @@ -1,4 +1,5 @@ from armada.jobservice import jobservice_pb2, jobservice_pb2_grpc +import tenacity # TODO - Make this a bit smarter, so we can hit at least one full @@ -22,10 +23,30 @@ def mock_dummy_mapper_terminal(request): class JobService(jobservice_pb2_grpc.JobServiceServicer): + @tenacity.retry( + stop=tenacity.stop_after_attempt(4), + wait=tenacity.wait_exponential(), + reraise=True, + ) def GetJobStatus(self, request, context): return mock_dummy_mapper_terminal(request) - + @tenacity.retry( + stop=tenacity.stop_after_attempt(4), + wait=tenacity.wait_exponential(), + reraise=True, + ) def Health(self, request, context): return jobservice_pb2.HealthCheckResponse( status=jobservice_pb2.HealthCheckResponse.SERVING ) + @tenacity.retry( + stop=tenacity.stop_after_attempt(4), + wait=tenacity.wait_exponential(), + reraise=True, + ) + def tenacity_example(target_count): + current_count = JobService.tenacity_example.retry.statistics['attempt_number'] + if current_count Date: Fri, 7 Jul 2023 12:22:49 +0530 Subject: [PATCH 11/12] formatted code --- third_party/airflow/tests/unit/job_service_mock.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/third_party/airflow/tests/unit/job_service_mock.py b/third_party/airflow/tests/unit/job_service_mock.py index dd8368c0b1e..1315d21a47a 100644 --- a/third_party/airflow/tests/unit/job_service_mock.py +++ b/third_party/airflow/tests/unit/job_service_mock.py @@ -30,6 +30,7 @@ class JobService(jobservice_pb2_grpc.JobServiceServicer): ) def GetJobStatus(self, request, context): return mock_dummy_mapper_terminal(request) + @tenacity.retry( stop=tenacity.stop_after_attempt(4), wait=tenacity.wait_exponential(), @@ -39,14 +40,15 @@ def Health(self, request, context): return jobservice_pb2.HealthCheckResponse( status=jobservice_pb2.HealthCheckResponse.SERVING ) + @tenacity.retry( stop=tenacity.stop_after_attempt(4), wait=tenacity.wait_exponential(), reraise=True, ) def tenacity_example(target_count): - current_count = JobService.tenacity_example.retry.statistics['attempt_number'] - if current_count Date: Fri, 7 Jul 2023 12:41:21 +0530 Subject: [PATCH 12/12] added test_tenacity_retry.py --- third_party/airflow/tests/unit/test_tenacity_retry.py | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 third_party/airflow/tests/unit/test_tenacity_retry.py diff --git a/third_party/airflow/tests/unit/test_tenacity_retry.py b/third_party/airflow/tests/unit/test_tenacity_retry.py new file mode 100644 index 00000000000..e2f27013321 --- /dev/null +++ b/third_party/airflow/tests/unit/test_tenacity_retry.py @@ -0,0 +1,8 @@ +from job_service_mock import JobService + + +def test_tenacity_retry(): + target_count = 3 + JobService.tenacity_example(target_count) + retry_count = JobService.tenacity_example.retry.statistics["attempt_number"] + assert retry_count == target_count