From a2070b27c7ca3ff11e2b8e2e7b8ed2db5407ef4e Mon Sep 17 00:00:00 2001 From: Kevin Grismore <146098880+kevingrismore@users.noreply.github.com> Date: Wed, 28 Feb 2024 14:08:30 -0600 Subject: [PATCH 1/5] Add ECS worker option to use most recent revision in task definition family (#370) Co-authored-by: nate nowack --- docs/gen_examples_catalog.py | 2 +- prefect_aws/workers/ecs_worker.py | 141 ++++++++++++++++++------------ tests/workers/test_ecs_worker.py | 91 +++++++++++++++++++ 3 files changed, 177 insertions(+), 57 deletions(-) diff --git a/docs/gen_examples_catalog.py b/docs/gen_examples_catalog.py index ba8b1c7c..9293c427 100644 --- a/docs/gen_examples_catalog.py +++ b/docs/gen_examples_catalog.py @@ -56,7 +56,7 @@ def get_code_examples(obj: Union[ModuleType, Callable]) -> Set[str]: for section in parsed_sections: if section.kind == DocstringSectionKind.examples: code_example = "\n".join( - (part[1] for part in section.as_dict().get("value", [])) + part[1] for part in section.as_dict().get("value", []) ) if not skip_block_load_code_example(code_example): code_examples.add(code_example) diff --git a/prefect_aws/workers/ecs_worker.py b/prefect_aws/workers/ecs_worker.py index 372e6b29..2e63eac4 100644 --- a/prefect_aws/workers/ecs_worker.py +++ b/prefect_aws/workers/ecs_worker.py @@ -267,6 +267,7 @@ class ECSJobConfiguration(BaseJobConfiguration): vpc_id: Optional[str] = Field(default=None) container_name: Optional[str] = Field(default=None) cluster: Optional[str] = Field(default=None) + match_latest_revision_in_family: bool = Field(default=False) @root_validator def task_run_request_requires_arn_if_no_task_definition_given(cls, values) -> dict: @@ -550,6 +551,16 @@ class ECSVariables(BaseVariables): "your AWS account, instead it will be marked as INACTIVE." ), ) + match_latest_revision_in_family: bool = Field( + default=False, + description=( + "If enabled, the most recent active revision in the task definition " + "family will be compared against the desired ECS task configuration. " + "If they are equal, the existing task definition will be used instead " + "of registering a new one. If no family is specified the default family " + f'"{ECS_DEFAULT_FAMILY}" will be used.' + ), + ) class ECSWorkerResult(BaseWorkerResult): @@ -661,55 +672,15 @@ def _create_task_and_wait_for_start( new_task_definition_registered = False if not task_definition_arn: - cached_task_definition_arn = _TASK_DEFINITION_CACHE.get( - flow_run.deployment_id - ) task_definition = self._prepare_task_definition( configuration, region=ecs_client.meta.region_name ) - - if cached_task_definition_arn: - # Read the task definition to see if the cached task definition is valid - try: - cached_task_definition = self._retrieve_task_definition( - logger, ecs_client, cached_task_definition_arn - ) - except Exception as exc: - logger.warning( - "Failed to retrieve cached task definition" - f" {cached_task_definition_arn!r}: {exc!r}" - ) - # Clear from cache - _TASK_DEFINITION_CACHE.pop(flow_run.deployment_id, None) - cached_task_definition_arn = None - else: - if not cached_task_definition["status"] == "ACTIVE": - # Cached task definition is not active - logger.warning( - "Cached task definition" - f" {cached_task_definition_arn!r} is not active" - ) - _TASK_DEFINITION_CACHE.pop(flow_run.deployment_id, None) - cached_task_definition_arn = None - elif not self._task_definitions_equal( - task_definition, cached_task_definition - ): - # Cached task definition is not valid - logger.warning( - "Cached task definition" - f" {cached_task_definition_arn!r} does not meet" - " requirements" - ) - _TASK_DEFINITION_CACHE.pop(flow_run.deployment_id, None) - cached_task_definition_arn = None - - if not cached_task_definition_arn: - task_definition_arn = self._register_task_definition( - logger, ecs_client, task_definition - ) - new_task_definition_registered = True - else: - task_definition_arn = cached_task_definition_arn + ( + task_definition_arn, + new_task_definition_registered, + ) = self._get_or_register_task_definition( + logger, ecs_client, configuration, flow_run, task_definition + ) else: task_definition = self._retrieve_task_definition( logger, ecs_client, task_definition_arn @@ -722,9 +693,6 @@ def _create_task_and_wait_for_start( self._validate_task_definition(task_definition, configuration) - # Update the cached task definition ARN to avoid re-registering the task - # definition on this worker unless necessary; registration is agressively - # rate limited by AWS _TASK_DEFINITION_CACHE[flow_run.deployment_id] = task_definition_arn logger.info(f"Using ECS task definition {task_definition_arn!r}...") @@ -732,7 +700,6 @@ def _create_task_and_wait_for_start( f"Task definition {json.dumps(task_definition, indent=2, default=str)}" ) - # Prepare the task run request task_run_request = self._prepare_task_run_request( configuration, task_definition, @@ -753,7 +720,6 @@ def _create_task_and_wait_for_start( self._report_task_run_creation_failure(configuration, task_run_request, exc) raise - # Raises an exception if the task does not start logger.info("Waiting for ECS task run to start...") self._wait_for_task_start( logger, @@ -766,6 +732,65 @@ def _create_task_and_wait_for_start( return task_arn, cluster_arn, task_definition, new_task_definition_registered + def _get_or_register_task_definition( + self, + logger: logging.Logger, + ecs_client: _ECSClient, + configuration: ECSJobConfiguration, + flow_run: FlowRun, + task_definition: dict, + ) -> Tuple[str, bool]: + """Get or register a task definition for the given flow run. + + Returns a tuple of the task definition ARN and a bool indicating if the task + definition is newly registered. + """ + + cached_task_definition_arn = _TASK_DEFINITION_CACHE.get(flow_run.deployment_id) + new_task_definition_registered = False + + if cached_task_definition_arn: + try: + cached_task_definition = self._retrieve_task_definition( + logger, ecs_client, cached_task_definition_arn + ) + if not cached_task_definition[ + "status" + ] == "ACTIVE" or not self._task_definitions_equal( + task_definition, cached_task_definition + ): + cached_task_definition_arn = None + except Exception: + cached_task_definition_arn = None + + if ( + not cached_task_definition_arn + and configuration.match_latest_revision_in_family + ): + family_name = task_definition.get("family", ECS_DEFAULT_FAMILY) + try: + task_definition_from_family = self._retrieve_task_definition_by_family( + logger, ecs_client, family_name + ) + if task_definition_from_family and self._task_definitions_equal( + task_definition, task_definition_from_family + ): + cached_task_definition_arn = task_definition_from_family[ + "taskDefinitionArn" + ] + except Exception: + pass + + if not cached_task_definition_arn: + task_definition_arn = self._register_task_definition( + logger, ecs_client, task_definition + ) + new_task_definition_registered = True + else: + task_definition_arn = cached_task_definition_arn + + return task_definition_arn, new_task_definition_registered + def _watch_task_and_get_exit_code( self, logger: logging.Logger, @@ -928,15 +953,19 @@ def _retrieve_task_definition( self, logger: logging.Logger, ecs_client: _ECSClient, - task_definition_arn: str, + task_definition: str, ): """ Retrieve an existing task definition from AWS. """ - logger.info(f"Retrieving ECS task definition {task_definition_arn!r}...") - response = ecs_client.describe_task_definition( - taskDefinition=task_definition_arn - ) + if task_definition.startswith("arn:aws:ecs:"): + logger.info(f"Retrieving ECS task definition {task_definition!r}...") + else: + logger.info( + "Retrieving most recent active revision from " + f"ECS task family {task_definition!r}..." + ) + response = ecs_client.describe_task_definition(taskDefinition=task_definition) return response["taskDefinition"] def _wait_for_task_start( diff --git a/tests/workers/test_ecs_worker.py b/tests/workers/test_ecs_worker.py index c9fbaabf..702f2ddd 100644 --- a/tests/workers/test_ecs_worker.py +++ b/tests/workers/test_ecs_worker.py @@ -1415,6 +1415,97 @@ async def test_deregister_task_definition_does_not_apply_to_linked_arn( describe_task_definition(ecs_client, task)["status"] == "ACTIVE" +@pytest.mark.usefixtures("ecs_mocks") +async def test_match_latest_revision_in_family( + aws_credentials: AwsCredentials, flow_run: FlowRun +): + session = aws_credentials.get_boto3_session() + ecs_client = session.client("ecs") + + configuration_1 = await construct_configuration( + aws_credentials=aws_credentials, + ) + + configuration_2 = await construct_configuration( + aws_credentials=aws_credentials, + execution_role_arn="test", + ) + + configuration_3 = await construct_configuration( + aws_credentials=aws_credentials, + match_latest_revision_in_family=True, + execution_role_arn="test", + ) + + # Let the first worker run and register two task definitions + async with ECSWorker(work_pool_name="test") as worker: + await run_then_stop_task(worker, configuration_1, flow_run) + result_1 = await run_then_stop_task(worker, configuration_2, flow_run) + + # Start a new worker with an empty cache + async with ECSWorker(work_pool_name="test") as worker: + result_2 = await run_then_stop_task(worker, configuration_3, flow_run) + + assert result_1.status_code == 0 + _, task_arn_1 = parse_identifier(result_1.identifier) + + assert result_2.status_code == 0 + _, task_arn_2 = parse_identifier(result_2.identifier) + + task_1 = describe_task(ecs_client, task_arn_1) + task_2 = describe_task(ecs_client, task_arn_2) + + assert task_1["taskDefinitionArn"] == task_2["taskDefinitionArn"] + assert task_2["taskDefinitionArn"].endswith(":2") + + +@pytest.mark.usefixtures("ecs_mocks") +async def test_match_latest_revision_in_family_custom_family( + aws_credentials: AwsCredentials, flow_run: FlowRun +): + session = aws_credentials.get_boto3_session() + ecs_client = session.client("ecs") + + configuration_1 = await construct_configuration( + aws_credentials=aws_credentials, + family="test-family", + ) + + configuration_2 = await construct_configuration( + aws_credentials=aws_credentials, + execution_role_arn="test", + family="test-family", + ) + + configuration_3 = await construct_configuration( + aws_credentials=aws_credentials, + match_latest_revision_in_family=True, + execution_role_arn="test", + family="test-family", + ) + + # Let the first worker run and register two task definitions + async with ECSWorker(work_pool_name="test") as worker: + await run_then_stop_task(worker, configuration_1, flow_run) + result_1 = await run_then_stop_task(worker, configuration_2, flow_run) + + # Start a new worker with an empty cache + async with ECSWorker(work_pool_name="test") as worker: + result_2 = await run_then_stop_task(worker, configuration_3, flow_run) + + assert result_1.status_code == 0 + _, task_arn_1 = parse_identifier(result_1.identifier) + + assert result_2.status_code == 0 + _, task_arn_2 = parse_identifier(result_2.identifier) + + task_1 = describe_task(ecs_client, task_arn_1) + task_2 = describe_task(ecs_client, task_arn_2) + + assert task_1["taskDefinitionArn"] == task_2["taskDefinitionArn"] + assert task_2["taskDefinitionArn"].endswith(":2") + + @pytest.mark.usefixtures("ecs_mocks") async def test_worker_caches_registered_task_definitions( aws_credentials: AwsCredentials, flow_run: FlowRun From c24bf1408f86c94b444a5c4737ee4229f7e697aa Mon Sep 17 00:00:00 2001 From: James Martin Date: Tue, 5 Mar 2024 01:37:39 +1100 Subject: [PATCH 2/5] Fixed Batch docs to reference the correct order of arguments for the batch_submit task (#391) --- prefect_aws/batch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/prefect_aws/batch.py b/prefect_aws/batch.py index 0dc57810..8abfb50a 100644 --- a/prefect_aws/batch.py +++ b/prefect_aws/batch.py @@ -21,8 +21,8 @@ async def batch_submit( Args: job_name: The AWS batch job name. - job_definition: The AWS batch job definition. job_queue: Name of the AWS batch job queue. + job_definition: The AWS batch job definition. aws_credentials: Credentials to use for authentication with AWS. **batch_kwargs: Additional keyword arguments to pass to the boto3 `submit_job` function. See the documentation for @@ -49,8 +49,8 @@ def example_batch_submit_flow(): ) job_id = batch_submit( "job_name", - "job_definition", "job_queue", + "job_definition", aws_credentials ) return job_id From 72c106fe6fb0dde5bff9ac321d5be5b84b0eb3db Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Wed, 6 Mar 2024 13:17:52 -0600 Subject: [PATCH 3/5] Adds porting of network configuration to generated base job templates (#392) --- prefect_aws/ecs.py | 19 ++++++++++ prefect_aws/utilities.py | 81 ++++++++++++++++++++++++++++++++++++++++ tests/test_ecs.py | 28 ++++++++++---- tests/test_utilities.py | 59 ++++++++++++++++++++++++++++- 4 files changed, 178 insertions(+), 9 deletions(-) diff --git a/prefect_aws/ecs.py b/prefect_aws/ecs.py index ac5dd333..6368748f 100644 --- a/prefect_aws/ecs.py +++ b/prefect_aws/ecs.py @@ -126,6 +126,8 @@ from prefect.utilities.pydantic import JsonPatch from pydantic import VERSION as PYDANTIC_VERSION +from prefect_aws.utilities import assemble_document_for_patches + if PYDANTIC_VERSION.startswith("2."): from pydantic.v1 import Field, root_validator, validator else: @@ -739,6 +741,23 @@ async def generate_work_pool_base_job_template(self) -> dict: ) if self.task_customizations: + network_config_patches = JsonPatch( + [ + patch + for patch in self.task_customizations + if "networkConfiguration" in patch["path"] + ] + ) + minimal_network_config = assemble_document_for_patches( + network_config_patches + ) + if minimal_network_config: + minimal_network_config_with_patches = network_config_patches.apply( + minimal_network_config + ) + base_job_template["variables"]["properties"]["network_configuration"][ + "default" + ] = minimal_network_config_with_patches["networkConfiguration"] try: base_job_template["job_configuration"]["task_run_request"] = ( self.task_customizations.apply( diff --git a/prefect_aws/utilities.py b/prefect_aws/utilities.py index ad1e6ed2..33b6cdc6 100644 --- a/prefect_aws/utilities.py +++ b/prefect_aws/utilities.py @@ -1,5 +1,7 @@ """Utilities for working with AWS services.""" +from typing import Dict, List, Union + from prefect.utilities.collections import visit_collection @@ -33,3 +35,82 @@ def make_hashable(item): collection, visit_fn=make_hashable, return_data=True ) return hash(hashable_collection) + + +def ensure_path_exists(doc: Union[Dict, List], path: List[str]): + """ + Ensures the path exists in the document, creating empty dictionaries or lists as + needed. + + Args: + doc: The current level of the document or sub-document. + path: The remaining path parts to ensure exist. + """ + if not path: + return + current_path = path.pop(0) + # Check if the next path part exists and is a digit + next_path_is_digit = path and path[0].isdigit() + + # Determine if the current path is for an array or an object + if isinstance(doc, list): # Path is for an array index + current_path = int(current_path) + # Ensure the current level of the document is a list and long enough + + while len(doc) <= current_path: + doc.append({}) + next_level = doc[current_path] + else: # Path is for an object + if current_path not in doc or ( + next_path_is_digit and not isinstance(doc.get(current_path), list) + ): + doc[current_path] = [] if next_path_is_digit else {} + next_level = doc[current_path] + + ensure_path_exists(next_level, path) + + +def assemble_document_for_patches(patches): + """ + Assembles an initial document that can successfully accept the given JSON Patch + operations. + + Args: + patches: A list of JSON Patch operations. + + Returns: + An initial document structured to accept the patches. + + Example: + + ```python + patches = [ + {"op": "replace", "path": "/name", "value": "Jane"}, + {"op": "add", "path": "/contact/address", "value": "123 Main St"}, + {"op": "remove", "path": "/age"} + ] + + initial_document = assemble_document_for_patches(patches) + + #output + { + "name": {}, + "contact": {}, + "age": {} + } + ``` + """ + document = {} + + for patch in patches: + operation = patch["op"] + path = patch["path"].lstrip("/").split("/") + + if operation == "add": + # Ensure all but the last element of the path exists + ensure_path_exists(document, path[:-1]) + elif operation in ["remove", "replace"]: + # For remove adn replace, the entire path should exist + ensure_path_exists(document, path) + + return document diff --git a/tests/test_ecs.py b/tests/test_ecs.py index 2f970116..a81c446b 100644 --- a/tests/test_ecs.py +++ b/tests/test_ecs.py @@ -2128,6 +2128,15 @@ def base_job_template_with_defaults(default_base_job_template, aws_credentials): base_job_template_with_defaults["variables"]["properties"][ "auto_deregister_task_definition" ]["default"] = False + base_job_template_with_defaults["variables"]["properties"]["network_configuration"][ + "default" + ] = { + "awsvpcConfiguration": { + "subnets": ["subnet-***"], + "assignPublicIp": "DISABLED", + "securityGroups": ["sg-***"], + } + } return base_job_template_with_defaults @@ -2188,10 +2197,20 @@ async def test_generate_work_pool_base_job_template( cpu=2048, memory=4096, task_customizations=[ + { + "op": "replace", + "path": "/networkConfiguration/awsvpcConfiguration/assignPublicIp", + "value": "DISABLED", + }, + { + "op": "add", + "path": "/networkConfiguration/awsvpcConfiguration/subnets", + "value": ["subnet-***"], + }, { "op": "add", "path": "/networkConfiguration/awsvpcConfiguration/securityGroups", - "value": ["sg-d72e9599956a084f5"], + "value": ["sg-***"], }, ], family="test-family", @@ -2229,10 +2248,3 @@ async def test_generate_work_pool_base_job_template( template = await job.generate_work_pool_base_job_template() assert template == expected_template - - if job_config == "custom": - assert ( - "Unable to apply task customizations to the base job template." - "You may need to update the template manually." - in caplog.text - ) diff --git a/tests/test_utilities.py b/tests/test_utilities.py index 0e0fdc6f..cecf863f 100644 --- a/tests/test_utilities.py +++ b/tests/test_utilities.py @@ -1,6 +1,10 @@ import pytest -from prefect_aws.utilities import hash_collection +from prefect_aws.utilities import ( + assemble_document_for_patches, + ensure_path_exists, + hash_collection, +) class TestHashCollection: @@ -32,3 +36,56 @@ def test_unhashable_structure(self): assert hash_collection(typically_unhashable_structure) == hash_collection( typically_unhashable_structure ), "Unhashable structure hashing failed after transformation" + + +class TestAssembleDocumentForPatches: + def test_initial_document(self): + patches = [ + {"op": "replace", "path": "/name", "value": "Jane"}, + {"op": "add", "path": "/contact/address", "value": "123 Main St"}, + {"op": "remove", "path": "/age"}, + ] + + initial_document = assemble_document_for_patches(patches) + + expected_document = {"name": {}, "contact": {}, "age": {}} + + assert initial_document == expected_document, "Initial document assembly failed" + + +class TestEnsurePathExists: + def test_existing_path(self): + doc = {"key1": {"subkey1": "value1"}} + path = ["key1", "subkey1"] + ensure_path_exists(doc, path) + assert doc == { + "key1": {"subkey1": "value1"} + }, "Existing path modification failed" + + def test_new_path_object(self): + doc = {} + path = ["key1", "subkey1"] + ensure_path_exists(doc, path) + assert doc == {"key1": {"subkey1": {}}}, "New path creation for object failed" + + def test_new_path_array(self): + doc = {} + path = ["key1", "0"] + ensure_path_exists(doc, path) + assert doc == {"key1": [{}]}, "New path creation for array failed" + + def test_existing_path_array(self): + doc = {"key1": [{"subkey1": "value1"}]} + path = ["key1", "0", "subkey1"] + ensure_path_exists(doc, path) + assert doc == { + "key1": [{"subkey1": "value1"}] + }, "Existing path modification for array failed" + + def test_existing_path_array_index_out_of_range(self): + doc = {"key1": []} + path = ["key1", "0", "subkey1"] + ensure_path_exists(doc, path) + assert doc == { + "key1": [{"subkey1": {}}] + }, "Existing path modification for array index out of range failed" From 1add0b98107307c11240d470cc085293b182f1b6 Mon Sep 17 00:00:00 2001 From: Kevin Grismore <146098880+kevingrismore@users.noreply.github.com> Date: Tue, 12 Mar 2024 13:54:01 -0500 Subject: [PATCH 4/5] Call existing function for getting recent revision from family (#393) --- prefect_aws/workers/ecs_worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/prefect_aws/workers/ecs_worker.py b/prefect_aws/workers/ecs_worker.py index 2e63eac4..edbe843d 100644 --- a/prefect_aws/workers/ecs_worker.py +++ b/prefect_aws/workers/ecs_worker.py @@ -769,7 +769,7 @@ def _get_or_register_task_definition( ): family_name = task_definition.get("family", ECS_DEFAULT_FAMILY) try: - task_definition_from_family = self._retrieve_task_definition_by_family( + task_definition_from_family = self._retrieve_task_definition( logger, ecs_client, family_name ) if task_definition_from_family and self._task_definitions_equal( @@ -779,7 +779,7 @@ def _get_or_register_task_definition( "taskDefinitionArn" ] except Exception: - pass + cached_task_definition_arn = None if not cached_task_definition_arn: task_definition_arn = self._register_task_definition( From 5fc8a3a2864dbbfc15faab3acf6afaa524adf584 Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Fri, 15 Mar 2024 08:14:57 -0500 Subject: [PATCH 5/5] Deprecate `ECSTask` block (#395) --- prefect_aws/ecs.py | 19 ++++++++++++++++++- requirements.txt | 2 +- tests/test_ecs.py | 15 +++++++++++++++ 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/prefect_aws/ecs.py b/prefect_aws/ecs.py index 6368748f..b0227bf6 100644 --- a/prefect_aws/ecs.py +++ b/prefect_aws/ecs.py @@ -1,4 +1,11 @@ """ +DEPRECATION WARNING: + +This module is deprecated as of March 2024 and will not be available after September 2024. +It has been replaced by the ECS worker, which offers enhanced functionality and better performance. + +For upgrade instructions, see https://docs.prefect.io/latest/guides/upgrade-guide-agents-to-workers/. + Integrations with the Amazon Elastic Container Service. Examples: @@ -102,7 +109,8 @@ ], ) ``` -""" +""" # noqa + import copy import difflib import json @@ -118,6 +126,7 @@ import yaml from anyio.abc import TaskStatus from jsonpointer import JsonPointerException +from prefect._internal.compatibility.deprecated import deprecated_class from prefect.blocks.core import BlockNotSavedError from prefect.exceptions import InfrastructureNotAvailable, InfrastructureNotFound from prefect.infrastructure.base import Infrastructure, InfrastructureResult @@ -205,6 +214,14 @@ def _pretty_diff(d1: dict, d2: dict) -> str: ) +@deprecated_class( + start_date="Mar 2024", + help=( + "Use the ECS worker instead." + " Refer to the upgrade guide for more information:" + " https://docs.prefect.io/latest/guides/upgrade-guide-agents-to-workers/." + ), +) class ECSTask(Infrastructure): """ Run a command as an ECS task. diff --git a/requirements.txt b/requirements.txt index e5cfb0b0..4a764f81 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,5 @@ boto3>=1.24.53 botocore>=1.27.53 mypy_boto3_s3>=1.24.94 mypy_boto3_secretsmanager>=1.26.49 -prefect>=2.14.10 +prefect>=2.16.4 tenacity>=8.0.0 \ No newline at end of file diff --git a/tests/test_ecs.py b/tests/test_ecs.py index a81c446b..6c429e9f 100644 --- a/tests/test_ecs.py +++ b/tests/test_ecs.py @@ -12,6 +12,7 @@ from botocore.exceptions import ClientError from moto import mock_ec2, mock_ecs, mock_logs from moto.ec2.utils import generate_instance_identity_document +from prefect._internal.compatibility.deprecated import PrefectDeprecationWarning from prefect.exceptions import InfrastructureNotAvailable, InfrastructureNotFound from prefect.logging.configuration import setup_logging from prefect.server.schemas.core import Deployment, Flow, FlowRun @@ -35,6 +36,20 @@ parse_task_identifier, ) + +def test_ecs_task_emits_deprecation_warning(): + with pytest.warns( + PrefectDeprecationWarning, + match=( + "prefect_aws.ecs.ECSTask has been deprecated." + " It will not be available after Sep 2024." + " Use the ECS worker instead." + " Refer to the upgrade guide for more information" + ), + ): + ECSTask() + + setup_logging()