diff --git a/CHANGELOG.md b/CHANGELOG.md index cefe0b6d..70227627 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,16 +9,30 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Support MinIO Credentials as `credentials` dict for `push_to_s3` and `pull_from_s3` - [#366](https://github.com/PrefectHQ/prefect-aws/pull/366) + ### Changed -### Fixed +- Handle `boto3` clients more efficiently with `lru_cache` - [#361](https://github.com/PrefectHQ/prefect-aws/pull/361) -- Bug where `S3Bucket.load()` constructed `AwsCredentials` instead of `MinIOCredentials` - [#359](https://github.com/PrefectHQ/prefect-aws/pull/359) +### Fixed ### Deprecated ### Removed +## 0.4.7 + +Released January 3rd, 2024. + +### Added + +- `LambdaFunction` block to invoke lambda functions - [#355](https://github.com/PrefectHQ/prefect-aws/pull/355) + +### Fixed + +- Bug where `S3Bucket.load()` constructed `AwsCredentials` instead of `MinIOCredentials` - [#359](https://github.com/PrefectHQ/prefect-aws/pull/359) + ## 0.4.6 Released December 11th, 2023. @@ -95,6 +109,7 @@ Released August 31st, 2023. Released July 20th, 2023. ### Changed + - Promoted workers to GA, removed beta disclaimers ## 0.3.5 @@ -128,7 +143,7 @@ Released on June 13th, 2023. ### Fixed -- Change prefect.docker import to prefect.utilities.dockerutils to fix a crash when using custom blocks based on S3Bucket - [#273](https://github.com/PrefectHQ/prefect-aws/pull/273) +- Change prefect.docker import to prefect.utilities.dockerutils to fix a crash when using custom blocks based on S3Bucket - [#273](https://github.com/PrefectHQ/prefect-aws/pull/273) ## 0.3.2 @@ -217,7 +232,7 @@ Released on January 4th, 2023. 
- `list_objects`, `download_object_to_path`, `download_object_to_file_object`, `download_folder_to_path`, `upload_from_path`, `upload_from_file_object`, `upload_from_folder` methods in `S3Bucket` - [#85](https://github.com/PrefectHQ/prefect-aws/pull/175) - `aws_client_parameters` as a field in `AwsCredentials` and `MinioCredentials` blocks - [#175](https://github.com/PrefectHQ/prefect-aws/pull/175) -- `get_client` and `get_s3_client` methods to `AwsCredentials` and `MinioCredentials` blocks - [#175](https://github.com/PrefectHQ/prefect-aws/pull/175) +- `get_client` and `get_s3_client` methods to `AwsCredentials` and `MinioCredentials` blocks - [#175](https://github.com/PrefectHQ/prefect-aws/pull/175) ### Changed @@ -229,7 +244,7 @@ Released on January 4th, 2023. - `endpoint_url` field in S3Bucket; specify `aws_client_parameters` in `AwsCredentials` or `MinIOCredentials` instead - [#175](https://github.com/PrefectHQ/prefect-aws/pull/175) - `basepath` field in S3Bucket; specify `bucket_folder` instead - [#175](https://github.com/PrefectHQ/prefect-aws/pull/175) -- `minio_credentials` and `aws_credentials` field in S3Bucket; use the `credentials` field instead - [#175](https://github.com/PrefectHQ/prefect-aws/pull/175) +- `minio_credentials` and `aws_credentials` field in S3Bucket; use the `credentials` field instead - [#175](https://github.com/PrefectHQ/prefect-aws/pull/175) ## 0.2.1 @@ -283,6 +298,7 @@ Released on October 28th, 2022. 
- `ECSTask` is no longer experimental — [#137](https://github.com/PrefectHQ/prefect-aws/pull/137) ### Fixed + - Fix ignore_file option in `S3Bucket` skipping files which should be included — [#139](https://github.com/PrefectHQ/prefect-aws/pull/139) - Fixed bug where `basepath` is used twice in the path when using `S3Bucket.put_directory` - [#143](https://github.com/PrefectHQ/prefect-aws/pull/143) diff --git a/prefect_aws/client_parameters.py b/prefect_aws/client_parameters.py index bf030590..eb3be09b 100644 --- a/prefect_aws/client_parameters.py +++ b/prefect_aws/client_parameters.py @@ -70,6 +70,18 @@ class AwsClientParameters(BaseModel): title="Botocore Config", ) + def __hash__(self): + return hash( + ( + self.api_version, + self.use_ssl, + self.verify, + self.verify_cert_path, + self.endpoint_url, + self.config, + ) + ) + @validator("config", pre=True) def instantiate_config(cls, value: Union[Config, Dict[str, Any]]) -> Dict[str, Any]: """ diff --git a/prefect_aws/credentials.py b/prefect_aws/credentials.py index 64f49efe..5aeddaa6 100644 --- a/prefect_aws/credentials.py +++ b/prefect_aws/credentials.py @@ -1,6 +1,8 @@ """Module handling AWS credentials""" from enum import Enum +from functools import lru_cache +from threading import Lock from typing import Any, Optional, Union import boto3 @@ -16,14 +18,43 @@ from prefect_aws.client_parameters import AwsClientParameters +_LOCK = Lock() + class ClientType(Enum): + """The supported boto3 clients.""" + S3 = "s3" ECS = "ecs" BATCH = "batch" SECRETS_MANAGER = "secretsmanager" +@lru_cache(maxsize=8, typed=True) +def _get_client_cached(ctx, client_type: Union[str, ClientType]) -> Any: + """ + Helper method to cache and dynamically get a client type. + + Args: + client_type: The client's service name. + + Returns: + An authenticated client. + + Raises: + ValueError: if the client is not supported. 
+ """ + with _LOCK: + if isinstance(client_type, ClientType): + client_type = client_type.value + + client = ctx.get_boto3_session().client( + service_name=client_type, + **ctx.aws_client_parameters.get_params_override(), + ) + return client + + class AwsCredentials(CredentialsBlock): """ Block used to manage authentication with AWS. AWS authentication is @@ -75,6 +106,22 @@ class AwsCredentials(CredentialsBlock): title="AWS Client Parameters", ) + class Config: + """Config class for pydantic model.""" + + arbitrary_types_allowed = True + + def __hash__(self): + field_hashes = ( + hash(self.aws_access_key_id), + hash(self.aws_secret_access_key), + hash(self.aws_session_token), + hash(self.profile_name), + hash(self.region_name), + hash(frozenset(self.aws_client_parameters.dict().items())), + ) + return hash(field_hashes) + def get_boto3_session(self) -> boto3.Session: """ Returns an authenticated boto3 session that can be used to create clients @@ -104,7 +151,7 @@ def get_boto3_session(self) -> boto3.Session: region_name=self.region_name, ) - def get_client(self, client_type: Union[str, ClientType]) -> Any: + def get_client(self, client_type: Union[str, ClientType]): """ Helper method to dynamically get a client type. 
@@ -120,10 +167,7 @@ def get_client(self, client_type: Union[str, ClientType]) -> Any: if isinstance(client_type, ClientType): client_type = client_type.value - client = self.get_boto3_session().client( - service_name=client_type, **self.aws_client_parameters.get_params_override() - ) - return client + return _get_client_cached(ctx=self, client_type=client_type) def get_s3_client(self) -> S3Client: """ @@ -186,6 +230,21 @@ class MinIOCredentials(CredentialsBlock): description="Extra parameters to initialize the Client.", ) + class Config: + """Config class for pydantic model.""" + + arbitrary_types_allowed = True + + def __hash__(self): + return hash( + ( + hash(self.minio_root_user), + hash(self.minio_root_password), + hash(self.region_name), + hash(frozenset(self.aws_client_parameters.dict().items())), + ) + ) + def get_boto3_session(self) -> boto3.Session: """ Returns an authenticated boto3 session that can be used to create clients @@ -218,7 +277,7 @@ def get_boto3_session(self) -> boto3.Session: region_name=self.region_name, ) - def get_client(self, client_type: Union[str, ClientType]) -> Any: + def get_client(self, client_type: Union[str, ClientType]): """ Helper method to dynamically get a client type. @@ -234,10 +293,7 @@ def get_client(self, client_type: Union[str, ClientType]) -> Any: if isinstance(client_type, ClientType): client_type = client_type.value - client = self.get_boto3_session().client( - service_name=client_type, **self.aws_client_parameters.get_params_override() - ) - return client + return _get_client_cached(ctx=self, client_type=client_type) def get_s3_client(self) -> S3Client: """ diff --git a/prefect_aws/deployments/steps.py b/prefect_aws/deployments/steps.py index 7525a5e2..6161dfb8 100644 --- a/prefect_aws/deployments/steps.py +++ b/prefect_aws/deployments/steps.py @@ -62,7 +62,8 @@ def push_to_s3( bucket: The name of the S3 bucket where files will be uploaded. folder: The folder in the S3 bucket where files will be uploaded. 
credentials: A dictionary of AWS credentials (aws_access_key_id, - aws_secret_access_key, aws_session_token). + aws_secret_access_key, aws_session_token) or MinIO credentials + (minio_root_user, minio_root_password). client_parameters: A dictionary of additional parameters to pass to the boto3 client. ignore_file: The name of the file containing ignore patterns. @@ -139,7 +140,8 @@ def pull_from_s3( bucket: The name of the S3 bucket where files are stored. folder: The folder in the S3 bucket where files are stored. credentials: A dictionary of AWS credentials (aws_access_key_id, - aws_secret_access_key, aws_session_token). + aws_secret_access_key, aws_session_token) or MinIO credentials + (minio_root_user, minio_root_password). client_parameters: A dictionary of additional parameters to pass to the boto3 client. @@ -204,8 +206,12 @@ def get_s3_client( client_parameters = {} # Get credentials from credentials (regardless if block or not) - aws_access_key_id = credentials.get("aws_access_key_id", None) - aws_secret_access_key = credentials.get("aws_secret_access_key", None) + aws_access_key_id = credentials.get( + "aws_access_key_id", credentials.get("minio_root_user", None) + ) + aws_secret_access_key = credentials.get( + "aws_secret_access_key", credentials.get("minio_root_password", None) + ) aws_session_token = credentials.get("aws_session_token", None) # Get remaining session info from credentials, or client_parameters diff --git a/prefect_aws/ecs.py b/prefect_aws/ecs.py index 8e7052f2..ac5dd333 100644 --- a/prefect_aws/ecs.py +++ b/prefect_aws/ecs.py @@ -407,7 +407,7 @@ class ECSTask(Infrastructure): description=( "The type of ECS task run infrastructure that should be used. Note that" " 'FARGATE_SPOT' is not a formal ECS launch type, but we will configure" - " the proper capacity provider stategy if set here." + " the proper capacity provider strategy if set here." 
), ) ) diff --git a/prefect_aws/s3.py b/prefect_aws/s3.py index 643d78ac..a10e2171 100644 --- a/prefect_aws/s3.py +++ b/prefect_aws/s3.py @@ -466,7 +466,7 @@ def _get_s3_client(self) -> boto3.client: Authenticate MinIO credentials or AWS credentials and return an S3 client. This is a helper function called by read_path() or write_path(). """ - return self.credentials.get_s3_client() + return self.credentials.get_client("s3") def _get_bucket_resource(self) -> boto3.resource: """ diff --git a/prefect_aws/workers/ecs_worker.py b/prefect_aws/workers/ecs_worker.py index c55e0f30..6d3c8ec7 100644 --- a/prefect_aws/workers/ecs_worker.py +++ b/prefect_aws/workers/ecs_worker.py @@ -420,7 +420,7 @@ class ECSVariables(BaseVariables): description=( "The type of ECS task run infrastructure that should be used. Note that" " 'FARGATE_SPOT' is not a formal ECS launch type, but we will configure" - " the proper capacity provider stategy if set here." + " the proper capacity provider strategy if set here." 
), ) ) diff --git a/tests/deploments/test_steps.py b/tests/deployments/test_steps.py similarity index 82% rename from tests/deploments/test_steps.py rename to tests/deployments/test_steps.py index 22608bd7..15c4fc25 100644 --- a/tests/deploments/test_steps.py +++ b/tests/deployments/test_steps.py @@ -11,6 +11,16 @@ from prefect_aws.deployments.steps import get_s3_client, pull_from_s3, push_to_s3 +@pytest.fixture(scope="module", autouse=True) +def set_custom_endpoint(): + original = os.environ.get("MOTO_S3_CUSTOM_ENDPOINTS") + os.environ["MOTO_S3_CUSTOM_ENDPOINTS"] = "http://custom.minio.endpoint:9000" + yield + os.environ.pop("MOTO_S3_CUSTOM_ENDPOINTS") + if original is not None: + os.environ["MOTO_S3_CUSTOM_ENDPOINTS"] = original + + @pytest.fixture def s3_setup(): with mock_s3(): @@ -215,8 +225,15 @@ def test_s3_session_with_params(): }, ) get_s3_client(credentials=creds_block.dict()) + get_s3_client( + credentials={ + "minio_root_user": "MY_USER", + "minio_root_password": "MY_PASSWORD", + }, + client_parameters={"endpoint_url": "http://custom.minio.endpoint:9000"}, + ) all_calls = mock_session.mock_calls - assert len(all_calls) == 6 + assert len(all_calls) == 8 assert all_calls[0].kwargs == { "aws_access_key_id": "THE_KEY", "aws_secret_access_key": "SHHH!", @@ -265,6 +282,20 @@ def test_s3_session_with_params(): }.items() <= all_calls[5].kwargs.items() assert all_calls[5].kwargs.get("config").connect_timeout == 123 assert all_calls[5].kwargs.get("config").signature_version is None + assert all_calls[6].kwargs == { + "aws_access_key_id": "MY_USER", + "aws_secret_access_key": "MY_PASSWORD", + "aws_session_token": None, + "profile_name": None, + "region_name": None, + } + assert all_calls[7].args[0] == "s3" + assert { + "api_version": None, + "use_ssl": True, + "verify": None, + "endpoint_url": "http://custom.minio.endpoint:9000", + }.items() <= all_calls[7].kwargs.items() def test_custom_credentials_and_client_parameters(s3_setup, tmp_files): @@ -309,6 +340,47 @@ 
def test_custom_credentials_and_client_parameters(s3_setup, tmp_files): assert (tmp_path / file.name).exists() +def test_custom_credentials_and_client_parameters_minio(s3_setup, tmp_files): + s3, bucket_name = s3_setup + folder = "my-project" + + # Custom credentials and client parameters + custom_credentials = { + "minio_root_user": "fake_user", + "minio_root_password": "fake_password", + } + + custom_client_parameters = { + "endpoint_url": "http://custom.minio.endpoint:9000", + } + + os.chdir(tmp_files) + + # Test push_to_s3 with custom credentials and client parameters + push_to_s3( + bucket_name, + folder, + credentials=custom_credentials, + client_parameters=custom_client_parameters, + ) + + # Test pull_from_s3 with custom credentials and client parameters + tmp_path = tmp_files / "test_pull" + tmp_path.mkdir(parents=True, exist_ok=True) + os.chdir(tmp_path) + + pull_from_s3( + bucket_name, + folder, + credentials=custom_credentials, + client_parameters=custom_client_parameters, + ) + + for file in tmp_files.iterdir(): + if file.is_file() and file.name != ".prefectignore": + assert (tmp_path / file.name).exists() + + def test_without_prefectignore_file(s3_setup, tmp_files: Path, mock_aws_credentials): s3, bucket_name = s3_setup folder = "my-project" diff --git a/tests/test_credentials.py b/tests/test_credentials.py index 6e0a1ff8..96ecbd22 100644 --- a/tests/test_credentials.py +++ b/tests/test_credentials.py @@ -3,7 +3,12 @@ from botocore.client import BaseClient from moto import mock_s3 -from prefect_aws.credentials import AwsCredentials, ClientType, MinIOCredentials +from prefect_aws.credentials import ( + AwsCredentials, + ClientType, + MinIOCredentials, + _get_client_cached, +) def test_aws_credentials_get_boto3_session(): @@ -44,3 +49,118 @@ def test_minio_credentials_get_boto3_session(): def test_credentials_get_client(credentials, client_type): with mock_s3(): assert isinstance(credentials.get_client(client_type), BaseClient) + + 
+@pytest.mark.parametrize( + "credentials", + [ + AwsCredentials(region_name="us-east-1"), + MinIOCredentials( + minio_root_user="root_user", + minio_root_password="root_password", + region_name="us-east-1", + ), + ], +) +@pytest.mark.parametrize("client_type", [member.value for member in ClientType]) +def test_get_client_cached(credentials, client_type): + """ + Test to ensure that _get_client_cached function returns the same instance + for multiple calls with the same parameters and properly utilizes lru_cache. + """ + + _get_client_cached.cache_clear() + + assert _get_client_cached.cache_info().hits == 0, "Initial call count should be 0" + + credentials.get_client(client_type) + credentials.get_client(client_type) + credentials.get_client(client_type) + + assert _get_client_cached.cache_info().misses == 1 + assert _get_client_cached.cache_info().hits == 2 + + +@pytest.mark.parametrize("client_type", [member.value for member in ClientType]) +def test_aws_credentials_change_causes_cache_miss(client_type): + """ + Test to ensure that changing configuration on an AwsCredentials instance + after fetching a client causes a cache miss. + """ + + _get_client_cached.cache_clear() + + credentials = AwsCredentials(region_name="us-east-1") + + initial_client = credentials.get_client(client_type) + + credentials.region_name = "us-west-2" + + new_client = credentials.get_client(client_type) + + assert ( + initial_client is not new_client + ), "Client should be different after configuration change" + + assert _get_client_cached.cache_info().misses == 2, "Cache should miss twice" + + +@pytest.mark.parametrize("client_type", [member.value for member in ClientType]) +def test_minio_credentials_change_causes_cache_miss(client_type): + """ + Test to ensure that changing configuration on a MinIOCredentials instance + after fetching a client causes a cache miss.
+ """ + + _get_client_cached.cache_clear() + + credentials = MinIOCredentials( + minio_root_user="root_user", + minio_root_password="root_password", + region_name="us-east-1", + ) + + initial_client = credentials.get_client(client_type) + + credentials.region_name = "us-west-2" + + new_client = credentials.get_client(client_type) + + assert ( + initial_client is not new_client + ), "Client should be different after configuration change" + + assert _get_client_cached.cache_info().misses == 2, "Cache should miss twice" + + +@pytest.mark.parametrize( + "credentials_type, initial_field, new_field", + [ + ( + AwsCredentials, + {"region_name": "us-east-1"}, + {"region_name": "us-east-2"}, + ), + ( + MinIOCredentials, + { + "region_name": "us-east-1", + "minio_root_user": "root_user", + "minio_root_password": "root_password", + }, + { + "region_name": "us-east-2", + "minio_root_user": "root_user", + "minio_root_password": "root_password", + }, + ), + ], +) +def test_aws_credentials_hash_changes(credentials_type, initial_field, new_field): + credentials = credentials_type(**initial_field) + initial_hash = hash(credentials) + + setattr(credentials, list(new_field.keys())[0], list(new_field.values())[0]) + new_hash = hash(credentials) + + assert initial_hash != new_hash, "Hash should change when region_name changes"