Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Adds tenacity to retry failed task run creation
Browse files Browse the repository at this point in the history
  • Loading branch information
desertaxle committed Aug 7, 2023
1 parent f1c68b1 commit 37d9d21
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 2 deletions.
2 changes: 2 additions & 0 deletions prefect_aws/workers/ecs_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
)
from pydantic import Field, root_validator
from slugify import slugify
from tenacity import retry, stop_after_attempt, wait_fixed, wait_random
from typing_extensions import Literal

from prefect_aws import AwsCredentials
Expand Down Expand Up @@ -1421,6 +1422,7 @@ def _prepare_task_run_request(

return task_run_request

@retry(stop=stop_after_attempt(3), wait=wait_fixed(1) + wait_random(0, 3))
def _create_task_run(self, ecs_client: _ECSClient, task_run_request: dict) -> str:
"""
Create a run of a task definition.
Expand Down
5 changes: 3 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
boto3>=1.24.53
botocore>=1.27.53
prefect>=2.10.11
mypy_boto3_s3>=1.24.94
mypy_boto3_secretsmanager>=1.26.49
mypy_boto3_secretsmanager>=1.26.49
prefect>=2.10.11
tenacity>=8.0.0
24 changes: 24 additions & 0 deletions tests/workers/test_ecs_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from moto.ec2.utils import generate_instance_identity_document
from prefect.server.schemas.core import FlowRun
from prefect.utilities.asyncutils import run_sync_in_worker_thread
from tenacity import RetryError

from prefect_aws.workers.ecs_worker import (
_TASK_DEFINITION_CACHE,
Expand Down Expand Up @@ -1900,3 +1901,26 @@ async def test_kill_infrastructure_with_grace_period(aws_credentials, caplog, fl

# Logs warning
assert "grace period of 60s requested, but AWS does not support" in caplog.text


async def test_retry_on_failed_task_start(
aws_credentials: AwsCredentials, flow_run, ecs_mocks
):
run_task_mock = MagicMock(return_value=[])

configuration = await construct_configuration(
aws_credentials=aws_credentials, command="echo test"
)

inject_moto_patches(
ecs_mocks,
{
"run_task": [run_task_mock],
},
)

with pytest.raises(RetryError):
async with ECSWorker(work_pool_name="test") as worker:
await run_then_stop_task(worker, configuration, flow_run)

assert run_task_mock.call_count == 3

0 comments on commit 37d9d21

Please sign in to comment.