From d02502a96b100440fb8ab15ec33ccb66fcf4e021 Mon Sep 17 00:00:00 2001 From: Kevin Cameron Grismore Date: Mon, 22 Jan 2024 00:24:51 -0600 Subject: [PATCH] add test --- tests/workers/test_ecs_worker.py | 45 ++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/tests/workers/test_ecs_worker.py b/tests/workers/test_ecs_worker.py index 077a178a..dc775970 100644 --- a/tests/workers/test_ecs_worker.py +++ b/tests/workers/test_ecs_worker.py @@ -1414,6 +1414,51 @@ async def test_deregister_task_definition_does_not_apply_to_linked_arn( describe_task_definition(ecs_client, task)["status"] == "ACTIVE" +@pytest.mark.usefixtures("ecs_mocks") +async def test_match_latest_revision_in_family( + aws_credentials: AwsCredentials, flow_run: FlowRun +): + session = aws_credentials.get_boto3_session() + ecs_client = session.client("ecs") + + configuration_1 = await construct_configuration( + aws_credentials=aws_credentials, + match_latest_revision_in_family=True, + ) + + configuration_2 = await construct_configuration( + aws_credentials=aws_credentials, + execution_role_arn="test", + ) + + configuration_3 = await construct_configuration( + aws_credentials=aws_credentials, + match_latest_revision_in_family=True, + execution_role_arn="test", + ) + + # Let the first worker run and register two task definitions + async with ECSWorker(work_pool_name="test") as worker: + await run_then_stop_task(worker, configuration_1, flow_run) + result_1 = await run_then_stop_task(worker, configuration_2, flow_run) + + # Start a new worker with an empty cache + async with ECSWorker(work_pool_name="test") as worker: + result_2 = await run_then_stop_task(worker, configuration_3, flow_run) + + assert result_1.status_code == 0 + _, task_arn_1 = parse_identifier(result_1.identifier) + + assert result_2.status_code == 0 + _, task_arn_2 = parse_identifier(result_2.identifier) + + task_1 = describe_task(ecs_client, task_arn_1) + task_2 = describe_task(ecs_client, task_arn_2) + + assert task_1["taskDefinitionArn"] == task_2["taskDefinitionArn"] + assert task_2["taskDefinitionArn"].endswith(":2") + + @pytest.mark.usefixtures("ecs_mocks") async def test_worker_caches_registered_task_definitions( aws_credentials: AwsCredentials, flow_run: FlowRun