This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Merge branch 'main' into dependabot/github_actions/actions/upload-artifact-4
zzstoatzz authored Jan 22, 2024
2 parents 7b5c3a5 + 7a25492 commit adee7ff
Showing 9 changed files with 306 additions and 24 deletions.
26 changes: 21 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -9,16 +9,30 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ### Added

+- Support MinIO Credentials as `credentials` dict for `push_to_s3` and `pull_from_s3` - [#366](https://github.com/PrefectHQ/prefect-aws/pull/366)
+
 ### Changed

-### Fixed
+- Handle `boto3` clients more efficiently with `lru_cache` - [#361](https://github.com/PrefectHQ/prefect-aws/pull/361)

-- Bug where `S3Bucket.load()` constructed `AwsCredentials` instead of `MinIOCredentials` - [#359](https://github.com/PrefectHQ/prefect-aws/pull/359)
+### Fixed

 ### Deprecated

 ### Removed

+## 0.4.7
+
+Released January 3rd, 2024.
+
+### Added
+
+- `LambdaFunction` block to invoke lambda functions - [#355](https://github.com/PrefectHQ/prefect-aws/pull/355)
+
+### Fixed
+
+- Bug where `S3Bucket.load()` constructed `AwsCredentials` instead of `MinIOCredentials` - [#359](https://github.com/PrefectHQ/prefect-aws/pull/359)
+
 ## 0.4.6

 Released December 11th, 2023.
@@ -95,6 +109,7 @@ Released August 31st, 2023.
 Released July 20th, 2023.

 ### Changed
+
 - Promoted workers to GA, removed beta disclaimers

 ## 0.3.5
@@ -128,7 +143,7 @@ Released on June 13th, 2023.

 ### Fixed

-- Change prefect.docker import to prefect.utilities.dockerutils to fix a crash when using custom blocks based on S3Bucket - [#273](https://github.com/PrefectHQ/prefect-aws/pull/273)
+- Change prefect.docker import to prefect.utilities.dockerutils to fix a crash when using custom blocks based on S3Bucket - [#273](https://github.com/PrefectHQ/prefect-aws/pull/273)

 ## 0.3.2

@@ -217,7 +232,7 @@ Released on January 4th, 2023.

 - `list_objects`, `download_object_to_path`, `download_object_to_file_object`, `download_folder_to_path`, `upload_from_path`, `upload_from_file_object`, `upload_from_folder` methods in `S3Bucket` - [#85](https://github.com/PrefectHQ/prefect-aws/pull/175)
 - `aws_client_parameters` as a field in `AwsCredentials` and `MinioCredentials` blocks - [#175](https://github.com/PrefectHQ/prefect-aws/pull/175)
-- `get_client` and `get_s3_client` methods to `AwsCredentials` and `MinioCredentials` blocks - [#175](https://github.com/PrefectHQ/prefect-aws/pull/175)
+- `get_client` and `get_s3_client` methods to `AwsCredentials` and `MinioCredentials` blocks - [#175](https://github.com/PrefectHQ/prefect-aws/pull/175)

 ### Changed

@@ -229,7 +244,7 @@ Released on January 4th, 2023.

 - `endpoint_url` field in S3Bucket; specify `aws_client_parameters` in `AwsCredentials` or `MinIOCredentials` instead - [#175](https://github.com/PrefectHQ/prefect-aws/pull/175)
 - `basepath` field in S3Bucket; specify `bucket_folder` instead - [#175](https://github.com/PrefectHQ/prefect-aws/pull/175)
-- `minio_credentials` and `aws_credentials` field in S3Bucket; use the `credentials` field instead - [#175](https://github.com/PrefectHQ/prefect-aws/pull/175)
+- `minio_credentials` and `aws_credentials` field in S3Bucket; use the `credentials` field instead - [#175](https://github.com/PrefectHQ/prefect-aws/pull/175)

 ## 0.2.1

@@ -283,6 +298,7 @@ Released on October 28th, 2022.
 - `ECSTask` is no longer experimental — [#137](https://github.com/PrefectHQ/prefect-aws/pull/137)

 ### Fixed
+
 - Fix ignore_file option in `S3Bucket` skipping files which should be included — [#139](https://github.com/PrefectHQ/prefect-aws/pull/139)
 - Fixed bug where `basepath` is used twice in the path when using `S3Bucket.put_directory` - [#143](https://github.com/PrefectHQ/prefect-aws/pull/143)

12 changes: 12 additions & 0 deletions prefect_aws/client_parameters.py
@@ -70,6 +70,18 @@ class AwsClientParameters(BaseModel):
         title="Botocore Config",
     )

+    def __hash__(self):
+        return hash(
+            (
+                self.api_version,
+                self.use_ssl,
+                self.verify,
+                self.verify_cert_path,
+                self.endpoint_url,
+                self.config,
+            )
+        )
+
     @validator("config", pre=True)
     def instantiate_config(cls, value: Union[Config, Dict[str, Any]]) -> Dict[str, Any]:
         """
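The `__hash__` method in this hunk matters because `functools.lru_cache` builds its cache key from the hash and equality of each argument. The standalone sketch below illustrates that requirement only. `Params` and `describe` are hypothetical stand-ins written for this note, not the real pydantic model or any prefect-aws function.

```python
from functools import lru_cache


class Params:
    """Hypothetical stand-in for a parameters model (not the real pydantic class)."""

    def __init__(self, endpoint_url=None, use_ssl=True):
        self.endpoint_url = endpoint_url
        self.use_ssl = use_ssl

    def _key(self):
        return (self.endpoint_url, self.use_ssl)

    def __eq__(self, other):
        return isinstance(other, Params) and self._key() == other._key()

    def __hash__(self):
        # Defining __eq__ alone sets __hash__ to None, so it is restored
        # explicitly here, hashing the same fields that __eq__ compares.
        return hash(self._key())


@lru_cache(maxsize=8)
def describe(params: Params) -> str:
    return f"{params.endpoint_url} ssl={params.use_ssl}"


a = Params("http://localhost:9000")
b = Params("http://localhost:9000")
describe(a)  # first call: cache miss
describe(b)  # equal value and equal hash: served from the cache
info = describe.cache_info()
```

Hashing and equality have to agree on the same set of fields. Otherwise two objects that compare equal can land in different cache slots.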
76 changes: 66 additions & 10 deletions prefect_aws/credentials.py
@@ -1,6 +1,8 @@
 """Module handling AWS credentials"""

 from enum import Enum
+from functools import lru_cache
+from threading import Lock
 from typing import Any, Optional, Union

 import boto3
@@ -16,14 +18,43 @@

 from prefect_aws.client_parameters import AwsClientParameters

+_LOCK = Lock()
+
+
 class ClientType(Enum):
     """The supported boto3 clients."""

     S3 = "s3"
     ECS = "ecs"
     BATCH = "batch"
     SECRETS_MANAGER = "secretsmanager"

+
+@lru_cache(maxsize=8, typed=True)
+def _get_client_cached(ctx, client_type: Union[str, ClientType]) -> Any:
+    """
+    Helper method to cache and dynamically get a client type.
+    Args:
+        client_type: The client's service name.
+    Returns:
+        An authenticated client.
+    Raises:
+        ValueError: if the client is not supported.
+    """
+    with _LOCK:
+        if isinstance(client_type, ClientType):
+            client_type = client_type.value
+
+        client = ctx.get_boto3_session().client(
+            service_name=client_type,
+            **ctx.aws_client_parameters.get_params_override(),
+        )
+    return client
+
+
 class AwsCredentials(CredentialsBlock):
     """
     Block used to manage authentication with AWS. AWS authentication is
@@ -75,6 +106,22 @@ class AwsCredentials(CredentialsBlock):
         title="AWS Client Parameters",
     )

+    class Config:
+        """Config class for pydantic model."""
+
+        arbitrary_types_allowed = True
+
+    def __hash__(self):
+        field_hashes = (
+            hash(self.aws_access_key_id),
+            hash(self.aws_secret_access_key),
+            hash(self.aws_session_token),
+            hash(self.profile_name),
+            hash(self.region_name),
+            hash(frozenset(self.aws_client_parameters.dict().items())),
+        )
+        return hash(field_hashes)
+
     def get_boto3_session(self) -> boto3.Session:
         """
         Returns an authenticated boto3 session that can be used to create clients
@@ -104,7 +151,7 @@ def get_boto3_session(self) -> boto3.Session:
             region_name=self.region_name,
         )

-    def get_client(self, client_type: Union[str, ClientType]) -> Any:
+    def get_client(self, client_type: Union[str, ClientType]):
         """
         Helper method to dynamically get a client type.
@@ -120,10 +167,7 @@ def get_client(self, client_type: Union[str, ClientType]) -> Any:
         if isinstance(client_type, ClientType):
             client_type = client_type.value

-        client = self.get_boto3_session().client(
-            service_name=client_type, **self.aws_client_parameters.get_params_override()
-        )
-        return client
+        return _get_client_cached(ctx=self, client_type=client_type)

     def get_s3_client(self) -> S3Client:
         """
@@ -186,6 +230,21 @@ class MinIOCredentials(CredentialsBlock):
         description="Extra parameters to initialize the Client.",
     )

+    class Config:
+        """Config class for pydantic model."""
+
+        arbitrary_types_allowed = True
+
+    def __hash__(self):
+        return hash(
+            (
+                hash(self.minio_root_user),
+                hash(self.minio_root_password),
+                hash(self.region_name),
+                hash(frozenset(self.aws_client_parameters.dict().items())),
+            )
+        )
+
     def get_boto3_session(self) -> boto3.Session:
         """
         Returns an authenticated boto3 session that can be used to create clients
@@ -218,7 +277,7 @@ def get_boto3_session(self) -> boto3.Session:
             region_name=self.region_name,
         )

-    def get_client(self, client_type: Union[str, ClientType]) -> Any:
+    def get_client(self, client_type: Union[str, ClientType]):
         """
         Helper method to dynamically get a client type.
@@ -234,10 +293,7 @@ def get_client(self, client_type: Union[str, ClientType]) -> Any:
         if isinstance(client_type, ClientType):
             client_type = client_type.value

-        client = self.get_boto3_session().client(
-            service_name=client_type, **self.aws_client_parameters.get_params_override()
-        )
-        return client
+        return _get_client_cached(ctx=self, client_type=client_type)

     def get_s3_client(self) -> S3Client:
         """
14 changes: 10 additions & 4 deletions prefect_aws/deployments/steps.py
@@ -62,7 +62,8 @@ def push_to_s3(
         bucket: The name of the S3 bucket where files will be uploaded.
         folder: The folder in the S3 bucket where files will be uploaded.
         credentials: A dictionary of AWS credentials (aws_access_key_id,
-            aws_secret_access_key, aws_session_token).
+            aws_secret_access_key, aws_session_token) or MinIO credentials
+            (minio_root_user, minio_root_password).
         client_parameters: A dictionary of additional parameters to pass to the boto3
             client.
         ignore_file: The name of the file containing ignore patterns.
@@ -139,7 +140,8 @@ def pull_from_s3(
         bucket: The name of the S3 bucket where files are stored.
         folder: The folder in the S3 bucket where files are stored.
         credentials: A dictionary of AWS credentials (aws_access_key_id,
-            aws_secret_access_key, aws_session_token).
+            aws_secret_access_key, aws_session_token) or MinIO credentials
+            (minio_root_user, minio_root_password).
         client_parameters: A dictionary of additional parameters to pass to the
             boto3 client.
@@ -204,8 +206,12 @@ def get_s3_client(
         client_parameters = {}

     # Get credentials from credentials (regardless if block or not)
-    aws_access_key_id = credentials.get("aws_access_key_id", None)
-    aws_secret_access_key = credentials.get("aws_secret_access_key", None)
+    aws_access_key_id = credentials.get(
+        "aws_access_key_id", credentials.get("minio_root_user", None)
+    )
+    aws_secret_access_key = credentials.get(
+        "aws_secret_access_key", credentials.get("minio_root_password", None)
+    )
     aws_session_token = credentials.get("aws_session_token", None)

     # Get remaining session info from credentials, or client_parameters
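The nested `dict.get` lookup in the last hunk can be tried in isolation. The helper below is a hypothetical extraction written for this note (`resolve_access_keys` does not exist in prefect-aws); it reproduces only the two fallback expressions from the diff.

```python
from typing import Any, Dict, Optional, Tuple


def resolve_access_keys(
    credentials: Dict[str, Any],
) -> Tuple[Optional[str], Optional[str]]:
    """Return (key id, secret), preferring AWS-style keys over MinIO-style keys."""
    key_id = credentials.get(
        "aws_access_key_id", credentials.get("minio_root_user", None)
    )
    secret = credentials.get(
        "aws_secret_access_key", credentials.get("minio_root_password", None)
    )
    return key_id, secret


minio = resolve_access_keys(
    {"minio_root_user": "MY_USER", "minio_root_password": "MY_PASSWORD"}
)
aws = resolve_access_keys(
    {"aws_access_key_id": "THE_KEY", "aws_secret_access_key": "SHHH!"}
)
# dict.get falls back only when the key is absent, not when its value is None.
explicit_none = resolve_access_keys(
    {"aws_access_key_id": None, "minio_root_user": "MY_USER"}
)
```

Because the default is used only for a missing key, a dict that carries `aws_access_key_id` with a `None` value does not fall through to the MinIO field.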
2 changes: 1 addition & 1 deletion prefect_aws/ecs.py
@@ -407,7 +407,7 @@ class ECSTask(Infrastructure):
             description=(
                 "The type of ECS task run infrastructure that should be used. Note that"
                 " 'FARGATE_SPOT' is not a formal ECS launch type, but we will configure"
-                " the proper capacity provider stategy if set here."
+                " the proper capacity provider strategy if set here."
             ),
         )
     )
2 changes: 1 addition & 1 deletion prefect_aws/s3.py
@@ -466,7 +466,7 @@ def _get_s3_client(self) -> boto3.client:
         Authenticate MinIO credentials or AWS credentials and return an S3 client.
         This is a helper function called by read_path() or write_path().
         """
-        return self.credentials.get_s3_client()
+        return self.credentials.get_client("s3")

     def _get_bucket_resource(self) -> boto3.resource:
         """
2 changes: 1 addition & 1 deletion prefect_aws/workers/ecs_worker.py
@@ -420,7 +420,7 @@ class ECSVariables(BaseVariables):
             description=(
                 "The type of ECS task run infrastructure that should be used. Note that"
                 " 'FARGATE_SPOT' is not a formal ECS launch type, but we will configure"
-                " the proper capacity provider stategy if set here."
+                " the proper capacity provider strategy if set here."
             ),
         )
     )

@@ -11,6 +11,16 @@
 from prefect_aws.deployments.steps import get_s3_client, pull_from_s3, push_to_s3


+@pytest.fixture(scope="module", autouse=True)
+def set_custom_endpoint():
+    original = os.environ.get("MOTO_S3_CUSTOM_ENDPOINTS")
+    os.environ["MOTO_S3_CUSTOM_ENDPOINTS"] = "http://custom.minio.endpoint:9000"
+    yield
+    os.environ.pop("MOTO_S3_CUSTOM_ENDPOINTS")
+    if original is not None:
+        os.environ["MOTO_S3_CUSTOM_ENDPOINTS"] = original
+
+
 @pytest.fixture
 def s3_setup():
     with mock_s3():
@@ -215,8 +225,15 @@ def test_s3_session_with_params():
             },
         )
         get_s3_client(credentials=creds_block.dict())
+        get_s3_client(
+            credentials={
+                "minio_root_user": "MY_USER",
+                "minio_root_password": "MY_PASSWORD",
+            },
+            client_parameters={"endpoint_url": "http://custom.minio.endpoint:9000"},
+        )
         all_calls = mock_session.mock_calls
-        assert len(all_calls) == 6
+        assert len(all_calls) == 8
         assert all_calls[0].kwargs == {
             "aws_access_key_id": "THE_KEY",
             "aws_secret_access_key": "SHHH!",
@@ -265,6 +282,20 @@ def test_s3_session_with_params():
         }.items() <= all_calls[5].kwargs.items()
         assert all_calls[5].kwargs.get("config").connect_timeout == 123
         assert all_calls[5].kwargs.get("config").signature_version is None
+        assert all_calls[6].kwargs == {
+            "aws_access_key_id": "MY_USER",
+            "aws_secret_access_key": "MY_PASSWORD",
+            "aws_session_token": None,
+            "profile_name": None,
+            "region_name": None,
+        }
+        assert all_calls[7].args[0] == "s3"
+        assert {
+            "api_version": None,
+            "use_ssl": True,
+            "verify": None,
+            "endpoint_url": "http://custom.minio.endpoint:9000",
+        }.items() <= all_calls[7].kwargs.items()


 def test_custom_credentials_and_client_parameters(s3_setup, tmp_files):
@@ -309,6 +340,47 @@ def test_custom_credentials_and_client_parameters(s3_setup, tmp_files):
             assert (tmp_path / file.name).exists()


+def test_custom_credentials_and_client_parameters_minio(s3_setup, tmp_files):
+    s3, bucket_name = s3_setup
+    folder = "my-project"
+
+    # Custom credentials and client parameters
+    custom_credentials = {
+        "minio_root_user": "fake_user",
+        "minio_root_password": "fake_password",
+    }
+
+    custom_client_parameters = {
+        "endpoint_url": "http://custom.minio.endpoint:9000",
+    }
+
+    os.chdir(tmp_files)
+
+    # Test push_to_s3 with custom credentials and client parameters
+    push_to_s3(
+        bucket_name,
+        folder,
+        credentials=custom_credentials,
+        client_parameters=custom_client_parameters,
+    )
+
+    # Test pull_from_s3 with custom credentials and client parameters
+    tmp_path = tmp_files / "test_pull"
+    tmp_path.mkdir(parents=True, exist_ok=True)
+    os.chdir(tmp_path)
+
+    pull_from_s3(
+        bucket_name,
+        folder,
+        credentials=custom_credentials,
+        client_parameters=custom_client_parameters,
+    )
+
+    for file in tmp_files.iterdir():
+        if file.is_file() and file.name != ".prefectignore":
+            assert (tmp_path / file.name).exists()
+
+
 def test_without_prefectignore_file(s3_setup, tmp_files: Path, mock_aws_credentials):
     s3, bucket_name = s3_setup
     folder = "my-project"
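
The `set_custom_endpoint` fixture in this test module saves an environment variable, overrides it, and restores it afterwards. One possible variant of that save-and-restore pattern is sketched below as a plain context manager. `temporary_env` and the variable name `EXAMPLE_ENDPOINT` are hypothetical and not part of the commit. Unlike the fixture, this version restores inside a `finally` block.

```python
import os
from contextlib import contextmanager


@contextmanager
def temporary_env(name: str, value: str):
    """Set an environment variable for the duration of the block, then restore it."""
    original = os.environ.get(name)
    os.environ[name] = value
    try:
        yield
    finally:
        # Restore the previous state even if the body raised.
        if original is None:
            os.environ.pop(name, None)
        else:
            os.environ[name] = original


before = os.environ.get("EXAMPLE_ENDPOINT")
with temporary_env("EXAMPLE_ENDPOINT", "http://localhost:9000"):
    inside = os.environ["EXAMPLE_ENDPOINT"]
after = os.environ.get("EXAMPLE_ENDPOINT")
```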