From b6eb321bd65ffb7b1cff1a357540a58de7a66d88 Mon Sep 17 00:00:00 2001 From: urimandujano Date: Tue, 31 Oct 2023 09:34:03 -0500 Subject: [PATCH 01/13] 'Update the pin on `prefect` version' (#331) --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 40b6007d..919ce567 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,5 @@ boto3>=1.24.53 botocore>=1.27.53 mypy_boto3_s3>=1.24.94 mypy_boto3_secretsmanager>=1.26.49 -prefect>=2.10.11 +prefect>=2.13.5 tenacity>=8.0.0 \ No newline at end of file From 3a5124fc383d94da6342af2b86588a1766c63591 Mon Sep 17 00:00:00 2001 From: urimandujano Date: Thu, 2 Nov 2023 11:56:12 -0500 Subject: [PATCH 02/13] Bumps build's python version (#334) --- .github/workflows/release.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index cb1465e9..4b31753b 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -15,7 +15,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v4 with: - python-version: 3.7 + python-version: 3.8 - name: Install packages run: | From f8a26eb0edc0603077da5ddd365b1e131ff6888b Mon Sep 17 00:00:00 2001 From: Justin T Date: Mon, 6 Nov 2023 20:22:20 +0100 Subject: [PATCH 03/13] fix ssl default (#328) Co-authored-by: Alexander Streed --- CHANGELOG.md | 8 ++++++++ prefect_aws/deployments/steps.py | 2 +- tests/deploments/test_steps.py | 4 ++-- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ec431ba7..d6d86da1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Removed +## 0.4.2 + +Released November 6th, 2023. + +### Fixed + +- Fixed use_ssl default for s3 client. + ## 0.4.1 Released October 13th, 2023. diff --git a/prefect_aws/deployments/steps.py b/prefect_aws/deployments/steps.py index 77930465..7525a5e2 100644 --- a/prefect_aws/deployments/steps.py +++ b/prefect_aws/deployments/steps.py @@ -220,7 +220,7 @@ def get_s3_client( aws_client_parameters = credentials.get("aws_client_parameters", client_parameters) api_version = aws_client_parameters.get("api_version", None) endpoint_url = aws_client_parameters.get("endpoint_url", None) - use_ssl = aws_client_parameters.get("use_ssl", None) + use_ssl = aws_client_parameters.get("use_ssl", True) verify = aws_client_parameters.get("verify", None) config_params = aws_client_parameters.get("config", {}) config = Config(**config_params) diff --git a/tests/deploments/test_steps.py b/tests/deploments/test_steps.py index c78c3578..22608bd7 100644 --- a/tests/deploments/test_steps.py +++ b/tests/deploments/test_steps.py @@ -228,7 +228,7 @@ def test_s3_session_with_params(): assert { "api_version": "v1", "endpoint_url": None, - "use_ssl": None, + "use_ssl": True, "verify": None, }.items() <= all_calls[1].kwargs.items() assert all_calls[1].kwargs.get("config").connect_timeout == 300 @@ -244,7 +244,7 @@ def test_s3_session_with_params(): assert { "api_version": None, "endpoint_url": None, - "use_ssl": None, + "use_ssl": True, "verify": None, }.items() <= all_calls[3].kwargs.items() assert all_calls[3].kwargs.get("config").connect_timeout == 60 From 679fad1c6a55a30240212302d0f579d42dbab445 Mon Sep 17 00:00:00 2001 From: nick-amplify <131465708+nick-amplify@users.noreply.github.com> Date: Mon, 13 Nov 2023 10:37:02 -0500 Subject: [PATCH 04/13] Read AWS Secrets with 'SecretString' (#274) Co-authored-by: ntorba <32570754+ntorba@users.noreply.github.com> Co-authored-by: Alexander Streed Co-authored-by: Alexander Streed --- CHANGELOG.md | 1 + prefect_aws/secrets_manager.py | 5 ++++- tests/test_secrets_manager.py | 7 +++++++ 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d6d86da1..d4e01351 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added ### Changed +- Added 'SecretBrinary' suport to `AwsSecret` block - [#274](https://github.com/PrefectHQ/prefect-aws/pull/274) ### Fixed diff --git a/prefect_aws/secrets_manager.py b/prefect_aws/secrets_manager.py index 9b93e867..c043f28c 100644 --- a/prefect_aws/secrets_manager.py +++ b/prefect_aws/secrets_manager.py @@ -411,7 +411,10 @@ async def read_secret( response = await run_sync_in_worker_thread( client.get_secret_value, SecretId=self.secret_name, **read_kwargs ) - secret = response["SecretBinary"] + if "SecretBinary" in response: + secret = response["SecretBinary"] + elif "SecretString" in response: + secret = response["SecretString"] arn = response["ARN"] self.logger.info(f"The secret {arn!r} data was successfully read.") return secret diff --git a/tests/test_secrets_manager.py b/tests/test_secrets_manager.py index 03d0a08b..654f0576 100644 --- a/tests/test_secrets_manager.py +++ b/tests/test_secrets_manager.py @@ -199,3 +199,10 @@ def test_delete_secret_recovery_window(self, aws_secret: AwsSecret): ValueError, match="Recovery window must be between 7 and 30 days" ): aws_secret.delete_secret(recovery_window_in_days=42) + + async def test_read_secret(self, secret_under_test, aws_credentials): + secret = AwsSecret( + aws_credentials=aws_credentials, + secret_name=secret_under_test["secret_name"], + ) + assert await secret.read_secret() == secret_under_test["expected_value"] From 30e358fb9e0f617e13fd4eda5ccbc7b5572cb6a8 Mon Sep 17 00:00:00 2001 From: kevingrismore <146098880+kevingrismore@users.noreply.github.com> Date: Wed, 15 Nov 2023 11:55:43 -0500 Subject: [PATCH 05/13] ecs guide fixes and clarifications (#329) Co-authored-by: nate nowack --- docs/ecs_guide.md | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/docs/ecs_guide.md b/docs/ecs_guide.md index 33bcfccb..5ac31ae2 100644 --- a/docs/ecs_guide.md +++ b/docs/ecs_guide.md @@ -182,22 +182,14 @@ To create an [IAM role](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_role "containerDefinitions": [ { "name": "prefect-worker", - "image": "prefecthq/prefect", + "image": "prefecthq/prefect:2-latest", "cpu": 512, "memory": 1024, "essential": true, "command": [ - "pip", - "install", - "prefect-aws", - "&&", - "prefect", - "worker", - "start", - "--pool", - "my-ecs-pool", - "--type", - "ecs" + "/bin/sh", + "-c", + "pip install prefect-aws && prefect worker start --pool my-ecs-pool --type ecs" ], "environment": [ { @@ -218,7 +210,7 @@ To create an [IAM role](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_role - For the `PREFECT_API_KEY`, individuals on the organization tier can create a [service account](https://docs.prefect.io/latest/cloud/users/service-accounts/) for the worker. If on a personal tier, you can pass a user’s API key. - - Replace `` with the ARN of the IAM role you created in Step 1. + - Replace both instances of `` with the ARN of the IAM role you created in Step 2. - Notice that the CPU and Memory allocations are relatively small. The worker's main responsibility is to submit work through API calls to AWS, _not_ to execute your Prefect flow code. From 36b203dff4b04b988251a6c1bbdbba1c28b210c6 Mon Sep 17 00:00:00 2001 From: Jeff Hale Date: Thu, 16 Nov 2023 15:08:06 -0500 Subject: [PATCH 06/13] Update ecs guide to remove pinned Prefect docs references (#338) --- docs/ecs_guide.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/ecs_guide.md b/docs/ecs_guide.md index 5ac31ae2..d188a89a 100644 --- a/docs/ecs_guide.md +++ b/docs/ecs_guide.md @@ -11,7 +11,7 @@ ECS (Elastic Container Service) tasks are a good option for executing Prefect 2 ## ECS Flow Run Execution -Prefect enables remote flow execution via [workers](https://docs.prefect.io/2.11.1/concepts/work-pools/#worker-overview) and [work pools](https://docs.prefect.io/2.11.1/concepts/work-pools/#work-pool-overview). To learn more about these concepts please see our [deployment tutorial](https://docs.prefect.io/2.11.1/tutorial/deployments/). +Prefect enables remote flow execution via [workers](https://docs.prefect.io/concepts/work-pools/#worker-overview) and [work pools](https://docs.prefect.io/concepts/work-pools/#work-pool-overview). To learn more about these concepts please see our [deployment tutorial](https://docs.prefect.io/tutorial/deployments/). For details on how workers and work pools are implemented for ECS, see the diagram below: #### Architecture Diagram @@ -288,10 +288,10 @@ To create an [IAM role](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_role - Do your flow runs require higher `CPU`? - Would an EC2 `Launch Type` speed up your flow run execution? - These infrastructure configuration values can be set on your ECS work pool or they can be overridden on the deployment level through [job_variables](https://docs.prefect.io/2.11.0/concepts/infrastructure/#kubernetesjob-overrides-and-customizations) if desired. + These infrastructure configuration values can be set on your ECS work pool or they can be overridden on the deployment level through [job_variables](https://docs.prefect.io/concepts/infrastructure/#kubernetesjob-overrides-and-customizations) if desired. -2. Consider adding a [build action](https://docs.prefect.io/2.11.0/concepts/deployments-ux/#the-build-action) to your Prefect Project [`prefect.yaml`](https://docs.prefect.io/2.11.0/concepts/deployments-ux/#the-prefect-yaml-file) if you want to automatically build a Docker image and push it to an image registry `prefect deploy` is run. +2. Consider adding a [build action](https://docs.prefect.io/concepts/deployments-ux/#the-build-action) to your Prefect Project [`prefect.yaml`](https://docs.prefect.io/concepts/deployments-ux/#the-prefect-yaml-file) if you want to automatically build a Docker image and push it to an image registry `prefect deploy` is run. Here is an example build action for ECR: ```yaml From 5735a492a421c924c0541f35463869a32b434131 Mon Sep 17 00:00:00 2001 From: jakekaplan <40362401+jakekaplan@users.noreply.github.com> Date: Mon, 27 Nov 2023 15:40:10 -0500 Subject: [PATCH 07/13] mask prefect api key (#341) --- prefect_aws/workers/ecs_worker.py | 29 ++++++++++++++++++++++++++++- tests/workers/test_ecs_worker.py | 25 +++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/prefect_aws/workers/ecs_worker.py b/prefect_aws/workers/ecs_worker.py index 02c4117c..afbe631b 100644 --- a/prefect_aws/workers/ecs_worker.py +++ b/prefect_aws/workers/ecs_worker.py @@ -221,6 +221,31 @@ def parse_identifier(identifier: str) -> ECSIdentifier: return ECSIdentifier(cluster, task) +def mask_sensitive_env_values( + task_run_request: dict, values: List[str], keep_length=3, replace_with="***" +): + for container in task_run_request.get("overrides", {}).get( + "containerOverrides", [] + ): + for env_var in container.get("environment", []): + if ( + "name" not in env_var + or "value" not in env_var + or env_var["name"] not in values + ): + continue + if len(env_var["value"]) > keep_length: + # Replace characters beyond the keep length + env_var["value"] = env_var["value"][:keep_length] + replace_with + return task_run_request + + +def mask_api_key(task_run_request): + return mask_sensitive_env_values( + task_run_request, ["PREFECT_API_KEY"], keep_length=6 + ) + + class ECSJobConfiguration(BaseJobConfiguration): """ Job configuration for an ECS worker. @@ -724,8 +749,10 @@ def _create_task_and_wait_for_start( logger.info("Creating ECS task run...") logger.debug( - f"Task run request {json.dumps(task_run_request, indent=2, default=str)}" + "Task run request" + f"{json.dumps(mask_api_key(task_run_request), indent=2, default=str)}" ) + try: task = self._create_task_run(ecs_client, task_run_request) task_arn = task["taskArn"] diff --git a/tests/workers/test_ecs_worker.py b/tests/workers/test_ecs_worker.py index b6a39b35..077a178a 100644 --- a/tests/workers/test_ecs_worker.py +++ b/tests/workers/test_ecs_worker.py @@ -34,6 +34,7 @@ InfrastructureNotFound, _get_container, get_prefect_image_name, + mask_sensitive_env_values, parse_identifier, ) @@ -2180,3 +2181,27 @@ async def test_retry_on_failed_task_start( await run_then_stop_task(worker, configuration, flow_run) assert run_task_mock.call_count == 3 + + +async def test_mask_sensitive_env_values(): + task_run_request = { + "overrides": { + "containerOverrides": [ + { + "environment": [ + {"name": "PREFECT_API_KEY", "value": "SeNsItiVe VaLuE"}, + {"name": "PREFECT_API_URL", "value": "NORMAL_VALUE"}, + ] + } + ] + } + } + + res = mask_sensitive_env_values(task_run_request, ["PREFECT_API_KEY"], 3, "***") + assert ( + res["overrides"]["containerOverrides"][0]["environment"][0]["value"] == "SeN***" + ) + assert ( + res["overrides"]["containerOverrides"][0]["environment"][1]["value"] + == "NORMAL_VALUE" + ) From 6ba701f4208c33f5c40e035f08a956deb7285432 Mon Sep 17 00:00:00 2001 From: jakekaplan <40362401+jakekaplan@users.noreply.github.com> Date: Thu, 30 Nov 2023 10:34:15 -0500 Subject: [PATCH 08/13] dont modify task run request (#347) --- prefect_aws/workers/ecs_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prefect_aws/workers/ecs_worker.py b/prefect_aws/workers/ecs_worker.py index afbe631b..c55e0f30 100644 --- a/prefect_aws/workers/ecs_worker.py +++ b/prefect_aws/workers/ecs_worker.py @@ -242,7 +242,7 @@ def mask_sensitive_env_values( def mask_api_key(task_run_request): return mask_sensitive_env_values( - task_run_request, ["PREFECT_API_KEY"], keep_length=6 + deepcopy(task_run_request), ["PREFECT_API_KEY"], keep_length=6 ) From 6865af76b7f0a0555eefc63fce41721fcf827407 Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Thu, 30 Nov 2023 12:15:19 -0600 Subject: [PATCH 09/13] Update CHANGELOG.md for v0.4.5 (#348) --- CHANGELOG.md | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d4e01351..514f2ffa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added ### Changed -- Added 'SecretBrinary' suport to `AwsSecret` block - [#274](https://github.com/PrefectHQ/prefect-aws/pull/274) ### Fixed @@ -18,6 +17,30 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Removed +## 0.4.5 + +Released November 30th, 2023. + +### Fixed + +- Bug where Prefect API key provided to ECS tasks was masked - [#347](https://github.com/PrefectHQ/prefect-aws/pull/347) + +## 0.4.4 + +Released November 29th, 2023. + +### Changed + +- Mask Prefect API key in logs - [#341](https://github.com/PrefectHQ/prefect-aws/pull/341) + +## 0.4.3 + +Released November 13th, 2023. + +### Added + +- `SecretBrinary` suport to `AwsSecret` block - [#274](https://github.com/PrefectHQ/prefect-aws/pull/274) + ## 0.4.2 Released November 6th, 2023. From 6aca613bf901d04541f53dc61874b732051448eb Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Mon, 11 Dec 2023 12:45:13 -0600 Subject: [PATCH 10/13] Adds ability to publish `ECSTask` block as a `ecs` work pool (#353) --- CHANGELOG.md | 8 ++ prefect_aws/ecs.py | 74 +++++++++++++++++- requirements.txt | 2 +- tests/conftest.py | 4 +- tests/test_ecs.py | 189 +++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 274 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 514f2ffa..653b1d94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Removed +## 0.4.6 + +Released December 11th, 2023. + +### Added + +Ability to publish `ECSTask`` block as an ecs work pool - [#353](https://github.com/PrefectHQ/prefect-aws/pull/353) + ## 0.4.5 Released November 30th, 2023. diff --git a/prefect_aws/ecs.py b/prefect_aws/ecs.py index a6ebe206..8e7052f2 100644 --- a/prefect_aws/ecs.py +++ b/prefect_aws/ecs.py @@ -108,6 +108,7 @@ import json import logging import pprint +import shlex import sys import time import warnings @@ -116,6 +117,8 @@ import boto3 import yaml from anyio.abc import TaskStatus +from jsonpointer import JsonPointerException +from prefect.blocks.core import BlockNotSavedError from prefect.exceptions import InfrastructureNotAvailable, InfrastructureNotFound from prefect.infrastructure.base import Infrastructure, InfrastructureResult from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible @@ -132,7 +135,7 @@ from typing_extensions import Literal, Self from prefect_aws import AwsCredentials -from prefect_aws.workers.ecs_worker import _TAG_REGEX +from prefect_aws.workers.ecs_worker import _TAG_REGEX, ECSWorker # Internal type alias for ECS clients which are generated dynamically in botocore _ECSClient = Any @@ -681,6 +684,75 @@ async def kill(self, identifier: str, grace_seconds: int = 30) -> None: cluster, task = parse_task_identifier(identifier) await run_sync_in_worker_thread(self._stop_task, cluster, task) + @staticmethod + def get_corresponding_worker_type() -> str: + """Return the corresponding worker type for this infrastructure block.""" + return ECSWorker.type + + async def generate_work_pool_base_job_template(self) -> dict: + """ + Generate a base job template for a cloud-run work pool with the same + configuration as this block. + + Returns: + - dict: a base job template for a cloud-run work pool + """ + base_job_template = copy.deepcopy(ECSWorker.get_default_base_job_template()) + for key, value in self.dict(exclude_unset=True, exclude_defaults=True).items(): + if key == "command": + base_job_template["variables"]["properties"]["command"]["default"] = ( + shlex.join(value) + ) + elif key in [ + "type", + "block_type_slug", + "_block_document_id", + "_block_document_name", + "_is_anonymous", + "task_customizations", + ]: + continue + elif key == "aws_credentials": + if not self.aws_credentials._block_document_id: + raise BlockNotSavedError( + "It looks like you are trying to use a block that" + " has not been saved. Please call `.save` on your block" + " before publishing it as a work pool." + ) + base_job_template["variables"]["properties"]["aws_credentials"][ + "default" + ] = { + "$ref": { + "block_document_id": str( + self.aws_credentials._block_document_id + ) + } + } + elif key == "task_definition": + base_job_template["job_configuration"]["task_definition"] = value + elif key in base_job_template["variables"]["properties"]: + base_job_template["variables"]["properties"][key]["default"] = value + else: + self.logger.warning( + f"Variable {key!r} is not supported by Cloud Run work pools." + " Skipping." + ) + + if self.task_customizations: + try: + base_job_template["job_configuration"]["task_run_request"] = ( + self.task_customizations.apply( + base_job_template["job_configuration"]["task_run_request"] + ) + ) + except JsonPointerException: + self.logger.warning( + "Unable to apply task customizations to the base job template." + "You may need to update the template manually." + ) + + return base_job_template + def _stop_task(self, cluster: str, task: str) -> None: """ Stop a running ECS task. diff --git a/requirements.txt b/requirements.txt index 919ce567..e5cfb0b0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,5 @@ boto3>=1.24.53 botocore>=1.27.53 mypy_boto3_s3>=1.24.94 mypy_boto3_secretsmanager>=1.26.49 -prefect>=2.13.5 +prefect>=2.14.10 tenacity>=8.0.0 \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py index ea17328f..9d2da154 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -22,11 +22,13 @@ def prefect_db(): @pytest.fixture def aws_credentials(): - return AwsCredentials( + block = AwsCredentials( aws_access_key_id="access_key_id", aws_secret_access_key="secret_access_key", region_name="us-east-1", ) + block.save("test-creds-block", overwrite=True) + return block @pytest.fixture diff --git a/tests/test_ecs.py b/tests/test_ecs.py index cf18bfe4..2f970116 100644 --- a/tests/test_ecs.py +++ b/tests/test_ecs.py @@ -1,6 +1,7 @@ import json import logging import textwrap +from copy import deepcopy from functools import partial from typing import Any, Awaitable, Callable, Dict, List, Optional from unittest.mock import MagicMock @@ -18,6 +19,8 @@ from prefect.utilities.dockerutils import get_prefect_image_name from pydantic import VERSION as PYDANTIC_VERSION +from prefect_aws.workers.ecs_worker import ECSWorker + if PYDANTIC_VERSION.startswith("2."): from pydantic.v1 import ValidationError else: @@ -2047,3 +2050,189 @@ async def test_kill_with_grace_period(aws_credentials, caplog): # Logs warning assert "grace period of 60s requested, but AWS does not support" in caplog.text + + +@pytest.fixture +def default_base_job_template(): + return deepcopy(ECSWorker.get_default_base_job_template()) + + +@pytest.fixture +def base_job_template_with_defaults(default_base_job_template, aws_credentials): + base_job_template_with_defaults = deepcopy(default_base_job_template) + base_job_template_with_defaults["variables"]["properties"]["command"][ + "default" + ] = "python my_script.py" + base_job_template_with_defaults["variables"]["properties"]["env"]["default"] = { + "VAR1": "value1", + "VAR2": "value2", + } + base_job_template_with_defaults["variables"]["properties"]["labels"]["default"] = { + "label1": "value1", + "label2": "value2", + } + base_job_template_with_defaults["variables"]["properties"]["name"][ + "default" + ] = "prefect-job" + base_job_template_with_defaults["variables"]["properties"]["image"][ + "default" + ] = "docker.io/my_image:latest" + base_job_template_with_defaults["variables"]["properties"]["aws_credentials"][ + "default" + ] = {"$ref": {"block_document_id": str(aws_credentials._block_document_id)}} + base_job_template_with_defaults["variables"]["properties"]["launch_type"][ + "default" + ] = "FARGATE_SPOT" + base_job_template_with_defaults["variables"]["properties"]["vpc_id"][ + "default" + ] = "vpc-123456" + base_job_template_with_defaults["variables"]["properties"]["task_role_arn"][ + "default" + ] = "arn:aws:iam::123456789012:role/ecsTaskExecutionRole" + base_job_template_with_defaults["variables"]["properties"]["execution_role_arn"][ + "default" + ] = "arn:aws:iam::123456789012:role/ecsTaskExecutionRole" + base_job_template_with_defaults["variables"]["properties"]["cluster"][ + "default" + ] = "test-cluster" + base_job_template_with_defaults["variables"]["properties"]["cpu"]["default"] = 2048 + base_job_template_with_defaults["variables"]["properties"]["memory"][ + "default" + ] = 4096 + + base_job_template_with_defaults["variables"]["properties"]["family"][ + "default" + ] = "test-family" + base_job_template_with_defaults["variables"]["properties"]["task_definition_arn"][ + "default" + ] = "arn:aws:ecs:us-east-1:123456789012:task-definition/test-family:1" + base_job_template_with_defaults["variables"]["properties"][ + "cloudwatch_logs_options" + ]["default"] = { + "awslogs-group": "prefect", + "awslogs-region": "us-east-1", + "awslogs-stream-prefix": "prefect", + } + base_job_template_with_defaults["variables"]["properties"][ + "configure_cloudwatch_logs" + ]["default"] = True + base_job_template_with_defaults["variables"]["properties"]["stream_output"][ + "default" + ] = True + base_job_template_with_defaults["variables"]["properties"][ + "task_watch_poll_interval" + ]["default"] = 5.1 + base_job_template_with_defaults["variables"]["properties"][ + "task_start_timeout_seconds" + ]["default"] = 60 + base_job_template_with_defaults["variables"]["properties"][ + "auto_deregister_task_definition" + ]["default"] = False + return base_job_template_with_defaults + + +@pytest.fixture +def base_job_template_with_task_arn(default_base_job_template, aws_credentials): + base_job_template_with_task_arn = deepcopy(default_base_job_template) + base_job_template_with_task_arn["variables"]["properties"]["image"][ + "default" + ] = "docker.io/my_image:latest" + + base_job_template_with_task_arn["job_configuration"]["task_definition"] = { + "containerDefinitions": [ + {"image": "docker.io/my_image:latest", "name": "prefect-job"} + ], + "cpu": "2048", + "family": "test-family", + "memory": "2024", + "executionRoleArn": "arn:aws:iam::123456789012:role/ecsTaskExecutionRole", + } + return base_job_template_with_task_arn + + +@pytest.mark.parametrize( + "job_config", + [ + "default", + "custom", + "task_definition_arn", + ], +) +async def test_generate_work_pool_base_job_template( + job_config, + base_job_template_with_defaults, + aws_credentials, + default_base_job_template, + base_job_template_with_task_arn, + caplog, +): + job = ECSTask() + expected_template = default_base_job_template + expected_template["variables"]["properties"]["image"][ + "default" + ] = get_prefect_image_name() + if job_config == "custom": + expected_template = base_job_template_with_defaults + job = ECSTask( + command=["python", "my_script.py"], + env={"VAR1": "value1", "VAR2": "value2"}, + labels={"label1": "value1", "label2": "value2"}, + name="prefect-job", + image="docker.io/my_image:latest", + aws_credentials=aws_credentials, + launch_type="FARGATE_SPOT", + vpc_id="vpc-123456", + task_role_arn="arn:aws:iam::123456789012:role/ecsTaskExecutionRole", + execution_role_arn="arn:aws:iam::123456789012:role/ecsTaskExecutionRole", + cluster="test-cluster", + cpu=2048, + memory=4096, + task_customizations=[ + { + "op": "add", + "path": "/networkConfiguration/awsvpcConfiguration/securityGroups", + "value": ["sg-d72e9599956a084f5"], + }, + ], + family="test-family", + task_definition_arn=( + "arn:aws:ecs:us-east-1:123456789012:task-definition/test-family:1" + ), + cloudwatch_logs_options={ + "awslogs-group": "prefect", + "awslogs-region": "us-east-1", + "awslogs-stream-prefix": "prefect", + }, + configure_cloudwatch_logs=True, + stream_output=True, + task_watch_poll_interval=5.1, + task_start_timeout_seconds=60, + auto_deregister_task_definition=False, + ) + elif job_config == "task_definition_arn": + expected_template = base_job_template_with_task_arn + job = ECSTask( + image="docker.io/my_image:latest", + task_definition={ + "containerDefinitions": [ + {"image": "docker.io/my_image:latest", "name": "prefect-job"} + ], + "cpu": "2048", + "family": "test-family", + "memory": "2024", + "executionRoleArn": ( + "arn:aws:iam::123456789012:role/ecsTaskExecutionRole" + ), + }, + ) + + template = await job.generate_work_pool_base_job_template() + + assert template == expected_template + + if job_config == "custom": + assert ( + "Unable to apply task customizations to the base job template." + "You may need to update the template manually." + in caplog.text + ) From 2d6dc288827ebaf08850e37e42750bc84723faf6 Mon Sep 17 00:00:00 2001 From: kevingrismore <146098880+kevingrismore@users.noreply.github.com> Date: Mon, 18 Dec 2023 08:56:58 -0600 Subject: [PATCH 11/13] Fix `S3Bucket.load()` for nested MinIO Credentials block (#359) --- CHANGELOG.md | 2 ++ prefect_aws/s3.py | 5 +---- tests/test_s3.py | 4 ++++ 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 653b1d94..cefe0b6d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Bug where `S3Bucket.load()` constructed `AwsCredentials` instead of `MinIOCredentials` - [#359](https://github.com/PrefectHQ/prefect-aws/pull/359) + ### Deprecated ### Removed diff --git a/prefect_aws/s3.py b/prefect_aws/s3.py index 11ca0438..643d78ac 100644 --- a/prefect_aws/s3.py +++ b/prefect_aws/s3.py @@ -412,7 +412,7 @@ class S3Bucket(WritableFileSystem, WritableDeploymentStorage, ObjectStorageBlock bucket_name: str = Field(default=..., description="Name of your bucket.") - credentials: Union[AwsCredentials, MinIOCredentials] = Field( + credentials: Union[MinIOCredentials, AwsCredentials] = Field( default_factory=AwsCredentials, description="A block containing your credentials to AWS or MinIO.", ) @@ -425,9 +425,6 @@ class S3Bucket(WritableFileSystem, WritableDeploymentStorage, ObjectStorageBlock ), ) - class Config: - smart_union = True - # Property to maintain compatibility with storage block based deployments @property def basepath(self) -> str: diff --git a/tests/test_s3.py b/tests/test_s3.py index 89a39f7d..93d11cc1 100644 --- a/tests/test_s3.py +++ b/tests/test_s3.py @@ -822,7 +822,11 @@ def s3_bucket_with_similar_objects(self, s3_bucket_with_objects, objects_in_fold def test_credentials_are_correct_type(self, credentials): s3_bucket = S3Bucket(bucket_name="bucket", credentials=credentials) + s3_bucket_parsed = S3Bucket.parse_obj( + {"bucket_name": "bucket", "credentials": dict(credentials)} + ) assert isinstance(s3_bucket.credentials, type(credentials)) + assert isinstance(s3_bucket_parsed.credentials, type(credentials)) @pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) def test_list_objects_empty(self, s3_bucket_empty, client_parameters): From c0a4f9c5f3695ee482aa7d0f9f307148f0d453a0 Mon Sep 17 00:00:00 2001 From: Dominic Tarro <57306102+dominictarro@users.noreply.github.com> Date: Wed, 3 Jan 2024 08:32:02 -0500 Subject: [PATCH 12/13] Added Lambda function block (#355) Co-authored-by: nate nowack Co-authored-by: Alexander Streed --- docs/lambda_function.md | 6 + mkdocs.yml | 1 + prefect_aws/__init__.py | 2 + prefect_aws/lambda_function.py | 194 +++++++++++++++++++++++++ tests/test_lambda_function.py | 253 +++++++++++++++++++++++++++++++++ 5 files changed, 456 insertions(+) create mode 100644 docs/lambda_function.md create mode 100644 prefect_aws/lambda_function.py create mode 100644 tests/test_lambda_function.py diff --git a/docs/lambda_function.md b/docs/lambda_function.md new file mode 100644 index 00000000..3f5c52e8 --- /dev/null +++ b/docs/lambda_function.md @@ -0,0 +1,6 @@ +--- +description: Module handling AWS Lambda functions +notes: This documentation page is generated from source file docstrings. +--- + +::: prefect_aws.lambda_function \ No newline at end of file diff --git a/mkdocs.yml b/mkdocs.yml index 465f6407..d5084099 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -87,6 +87,7 @@ nav: - Credentials: credentials.md - ECS Worker: ecs_worker.md - ECS: ecs.md + - Lambda: lambda_function.md - Deployments: - Steps: deployments/steps.md - S3: s3.md diff --git a/prefect_aws/__init__.py b/prefect_aws/__init__.py index 9e39166b..b1de3fc5 100644 --- a/prefect_aws/__init__.py +++ b/prefect_aws/__init__.py @@ -1,6 +1,7 @@ from . import _version from .credentials import AwsCredentials, MinIOCredentials from .client_parameters import AwsClientParameters +from .lambda_function import LambdaFunction from .s3 import S3Bucket from .ecs import ECSTask from .secrets_manager import AwsSecret @@ -17,6 +18,7 @@ __all__ = [ "AwsCredentials", "AwsClientParameters", + "LambdaFunction", "MinIOCredentials", "S3Bucket", "ECSTask", diff --git a/prefect_aws/lambda_function.py b/prefect_aws/lambda_function.py new file mode 100644 index 00000000..b2cc631c --- /dev/null +++ b/prefect_aws/lambda_function.py @@ -0,0 +1,194 @@ +"""Integrations with AWS Lambda. + +Examples: + + Run a lambda function with a payload + + ```python + LambdaFunction( + function_name="test-function", + aws_credentials=aws_credentials, + ).invoke(payload={"foo": "bar"}) + ``` + + Specify a version of a lambda function + + ```python + LambdaFunction( + function_name="test-function", + qualifier="1", + aws_credentials=aws_credentials, + ).invoke() + ``` + + Invoke a lambda function asynchronously + + ```python + LambdaFunction( + function_name="test-function", + aws_credentials=aws_credentials, + ).invoke(invocation_type="Event") + ``` + + Invoke a lambda function and return the last 4 KB of logs + + ```python + LambdaFunction( + function_name="test-function", + aws_credentials=aws_credentials, + ).invoke(tail=True) + ``` + + Invoke a lambda function with a client context + + ```python + LambdaFunction( + function_name="test-function", + aws_credentials=aws_credentials, + ).invoke(client_context={"bar": "foo"}) + ``` + +""" +import json +from typing import Literal, Optional + +from prefect.blocks.core import Block +from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible +from pydantic import VERSION as PYDANTIC_VERSION + +if PYDANTIC_VERSION.startswith("2."): + from pydantic.v1 import Field +else: + from pydantic import Field + +from prefect_aws.credentials import AwsCredentials + + +class LambdaFunction(Block): + """Invoke a Lambda function. This block is part of the prefect-aws + collection. Install prefect-aws with `pip install prefect-aws` to use this + block. + + Attributes: + function_name: The name, ARN, or partial ARN of the Lambda function to + run. This must be the name of a function that is already deployed + to AWS Lambda. + qualifier: The version or alias of the Lambda function to use when + invoked. If not specified, the latest (unqualified) version of the + Lambda function will be used. + aws_credentials: The AWS credentials to use to connect to AWS Lambda + with a default factory of AwsCredentials. + + """ + + _block_type_name = "Lambda Function" + _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/d74b16fe84ce626345adf235a47008fea2869a60-225x225.png" # noqa + _documentation_url = "https://prefecthq.github.io/prefect-aws/s3/#prefect_aws.lambda_function.LambdaFunction" # noqa + + function_name: str = Field( + title="Function Name", + description=( + "The name, ARN, or partial ARN of the Lambda function to run. This" + " must be the name of a function that is already deployed to AWS" + " Lambda." + ), + ) + qualifier: Optional[str] = Field( + default=None, + title="Qualifier", + description=( + "The version or alias of the Lambda function to use when invoked. " + "If not specified, the latest (unqualified) version of the Lambda " + "function will be used." + ), + ) + aws_credentials: AwsCredentials = Field( + title="AWS Credentials", + default_factory=AwsCredentials, + description="The AWS credentials to invoke the Lambda with.", + ) + + class Config: + """Lambda's pydantic configuration.""" + + smart_union = True + + def _get_lambda_client(self): + """ + Retrieve a boto3 session and Lambda client + """ + boto_session = self.aws_credentials.get_boto3_session() + lambda_client = boto_session.client("lambda") + return lambda_client + + @sync_compatible + async def invoke( + self, + payload: dict = None, + invocation_type: Literal[ + "RequestResponse", "Event", "DryRun" + ] = "RequestResponse", + tail: bool = False, + client_context: Optional[dict] = None, + ) -> dict: + """ + [Invoke](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda/client/invoke.html) + the Lambda function with the given payload. + + Args: + payload: The payload to send to the Lambda function. + invocation_type: The invocation type of the Lambda function. This + can be one of "RequestResponse", "Event", or "DryRun". Uses + "RequestResponse" by default. + tail: If True, the response will include the base64-encoded last 4 + KB of log data produced by the Lambda function. + client_context: The client context to send to the Lambda function. + Limited to 3583 bytes. + + Returns: + The response from the Lambda function. + + Examples: + + ```python + from prefect_aws.lambda_function import LambdaFunction + from prefect_aws.credentials import AwsCredentials + + credentials = AwsCredentials() + lambda_function = LambdaFunction( + function_name="test_lambda_function", + aws_credentials=credentials, + ) + response = lambda_function.invoke( + payload={"foo": "bar"}, + invocation_type="RequestResponse", + ) + response["Payload"].read() + ``` + ```txt + b'{"foo": "bar"}' + ``` + + """ + # Add invocation arguments + kwargs = dict(FunctionName=self.function_name) + + if payload: + kwargs["Payload"] = json.dumps(payload).encode() + + # Let boto handle invalid invocation types + kwargs["InvocationType"] = invocation_type + + if self.qualifier is not None: + kwargs["Qualifier"] = self.qualifier + + if tail: + kwargs["LogType"] = "Tail" + + if client_context is not None: + # For some reason this is string, but payload is bytes + kwargs["ClientContext"] = json.dumps(client_context) + + # Get client and invoke + lambda_client = await run_sync_in_worker_thread(self._get_lambda_client) + return await run_sync_in_worker_thread(lambda_client.invoke, **kwargs) diff --git a/tests/test_lambda_function.py b/tests/test_lambda_function.py new file mode 100644 index 00000000..32210629 --- /dev/null +++ b/tests/test_lambda_function.py @@ -0,0 +1,253 @@ +import inspect +import io +import json +import zipfile +from typing import Optional + +import boto3 +import pytest +from botocore.response import StreamingBody +from moto import mock_iam, mock_lambda +from pytest_lazyfixture import lazy_fixture + +from prefect_aws.credentials import AwsCredentials +from prefect_aws.lambda_function import LambdaFunction + + +@pytest.fixture +def lambda_mock(aws_credentials: AwsCredentials): + with mock_lambda(): + yield boto3.client( + "lambda", + region_name=aws_credentials.region_name, + ) + + +@pytest.fixture +def iam_mock(aws_credentials: AwsCredentials): + with mock_iam(): + yield boto3.client( + "iam", + region_name=aws_credentials.region_name, + ) + + +@pytest.fixture +def mock_iam_rule(iam_mock): + yield iam_mock.create_role( + RoleName="test-role", + AssumeRolePolicyDocument=json.dumps( + { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": {"Service": "lambda.amazonaws.com"}, + "Action": "sts:AssumeRole", + } + ], + } + ), + ) + + +def handler_a(event, context): + if isinstance(event, dict): + if "error" in event: + raise Exception(event["error"]) + event["foo"] = "bar" + else: + event = {"foo": "bar"} + return event + + +LAMBDA_TEST_CODE = inspect.getsource(handler_a) + + +@pytest.fixture +def mock_lambda_code(): + with io.BytesIO() as f: + with zipfile.ZipFile(f, mode="w") as z: + z.writestr("foo.py", LAMBDA_TEST_CODE) + f.seek(0) + yield f.read() + + +@pytest.fixture +def mock_lambda_function(lambda_mock, mock_iam_rule, mock_lambda_code): + r = lambda_mock.create_function( + FunctionName="test-function", + Runtime="python3.10", + Role=mock_iam_rule["Role"]["Arn"], + Handler="foo.handler", + Code={"ZipFile": mock_lambda_code}, + ) + r2 = lambda_mock.publish_version( + FunctionName="test-function", + ) + r["Version"] = r2["Version"] + yield r + + +def handler_b(event, context): + event = {"data": [1, 2, 3]} + return event + + +LAMBDA_TEST_CODE_V2 = inspect.getsource(handler_b) + + +@pytest.fixture +def mock_lambda_code_v2(): + with io.BytesIO() as f: + with zipfile.ZipFile(f, mode="w") as z: + z.writestr("foo.py", LAMBDA_TEST_CODE_V2) + f.seek(0) + yield f.read() + + +@pytest.fixture +def add_lambda_version(mock_lambda_function, lambda_mock, mock_lambda_code_v2): + r = mock_lambda_function.copy() + lambda_mock.update_function_code( + FunctionName="test-function", + ZipFile=mock_lambda_code_v2, + ) + r2 = lambda_mock.publish_version( + FunctionName="test-function", + ) + r["Version"] = r2["Version"] + yield r + + +@pytest.fixture +def lambda_function(aws_credentials): + return LambdaFunction( + function_name="test-function", + aws_credentials=aws_credentials, + ) + + +def make_patched_invocation(client, handler): + """Creates a patched invoke method for moto lambda. The method replaces + the response 'Payload' with the result of the handler function. + """ + true_invoke = client.invoke + + def invoke(*args, **kwargs): + """Calls the true invoke and replaces the Payload with its result.""" + result = true_invoke(*args, **kwargs) + blob = json.dumps( + handler( + event=kwargs.get("Payload"), + context=kwargs.get("ClientContext"), + ) + ).encode() + result["Payload"] = StreamingBody(io.BytesIO(blob), len(blob)) + return result + + return invoke + + +@pytest.fixture +def mock_invoke( + lambda_function: LambdaFunction, handler, monkeypatch: pytest.MonkeyPatch +): + """Fixture to patch the invocation response's 'Payload' field. + + When `result["Payload"].read` is called, moto attempts to run the function + in a Docker container and return the result. This is total overkill, so + we actually call the handler with the given arguments. + """ + client = lambda_function._get_lambda_client() + + monkeypatch.setattr( + client, + "invoke", + make_patched_invocation(client, handler), + ) + + def _get_lambda_client(): + return client + + monkeypatch.setattr( + lambda_function, + "_get_lambda_client", + _get_lambda_client, + ) + + yield + + +class TestLambdaFunction: + def test_init(self, aws_credentials): + function = LambdaFunction( + function_name="test-function", + aws_credentials=aws_credentials, + ) + assert function.function_name == "test-function" + assert function.qualifier is None + + @pytest.mark.parametrize( + "payload,expected,handler", + [ + ({"foo": "baz"}, {"foo": "bar"}, handler_a), + (None, {"foo": "bar"}, handler_a), + ], + ) + def test_invoke_lambda_payloads( + self, + payload: Optional[dict], + expected: dict, + handler, + mock_lambda_function, + lambda_function: LambdaFunction, + mock_invoke, + ): + result = lambda_function.invoke(payload) + assert result["StatusCode"] == 200 + response_payload = json.loads(result["Payload"].read()) + assert response_payload == expected + + @pytest.mark.parametrize("handler", [handler_a]) + def test_invoke_lambda_tail( + self, lambda_function: LambdaFunction, mock_lambda_function, mock_invoke + ): + result = lambda_function.invoke(tail=True) + assert result["StatusCode"] == 200 + response_payload = json.loads(result["Payload"].read()) + assert response_payload == {"foo": "bar"} + assert "LogResult" in result + + @pytest.mark.parametrize("handler", [handler_a]) + def test_invoke_lambda_client_context( + self, lambda_function: LambdaFunction, mock_lambda_function, mock_invoke + ): + # Just making sure boto doesn't throw an error + result = lambda_function.invoke(client_context={"bar": "foo"}) + assert result["StatusCode"] == 200 + response_payload = json.loads(result["Payload"].read()) + assert response_payload == {"foo": "bar"} + + @pytest.mark.parametrize( + "func_fixture,expected,handler", + [ + (lazy_fixture("mock_lambda_function"), {"foo": "bar"}, handler_a), + (lazy_fixture("add_lambda_version"), {"data": [1, 2, 3]}, handler_b), + ], + ) + def test_invoke_lambda_qualifier( + self, + func_fixture, + expected, + lambda_function: LambdaFunction, + mock_invoke, + ): + try: + lambda_function.qualifier = func_fixture["Version"] + result = lambda_function.invoke() + assert result["StatusCode"] == 200 + response_payload = json.loads(result["Payload"].read()) + assert response_payload == expected + finally: + lambda_function.qualifier = None From 50d5f6395e860d2c00c5f4377399d84232954069 Mon Sep 17 00:00:00 2001 From: nate nowack Date: Wed, 3 Jan 2024 09:57:17 -0600 Subject: [PATCH 13/13] update changelog (#363) --- CHANGELOG.md | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cefe0b6d..64454b60 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,12 +13,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed -- Bug where `S3Bucket.load()` constructed `AwsCredentials` instead of `MinIOCredentials` - [#359](https://github.com/PrefectHQ/prefect-aws/pull/359) - ### Deprecated ### Removed +## 0.4.7 + +Released January 3rd, 2024. + +### Added + +- `LambdaFunction` block to invoke lambda functions - [#355](https://github.com/PrefectHQ/prefect-aws/pull/355) + +### Fixed + +- Bug where `S3Bucket.load()` constructed `AwsCredentials` instead of `MinIOCredentials` - [#359](https://github.com/PrefectHQ/prefect-aws/pull/359) + ## 0.4.6 Released December 11th, 2023.