Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'origin/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
knakazawa99 committed Jan 24, 2024
2 parents 41367dd + 12299d4 commit 0d04479
Show file tree
Hide file tree
Showing 20 changed files with 1,128 additions and 34 deletions.
64 changes: 60 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,68 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

### Changed
- Added 'SecretBrinary' suport to `AwsSecret` block - [#274](https://github.com/PrefectHQ/prefect-aws/pull/274)

### Fixed

### Deprecated

### Removed

## 0.4.8
Released January 23rd, 2024;

### Added

- Support MinIO Credentials as `credentials` dict for `push_to_s3` and `pull_from_s3` - [#366](https://github.com/PrefectHQ/prefect-aws/pull/366)

### Changed

- Handle `boto3` clients more efficiently with `lru_cache` - [#361](https://github.com/PrefectHQ/prefect-aws/pull/361)

## 0.4.7

Released January 3rd, 2024.

### Added

- `LambdaFunction` block to invoke lambda functions - [#355](https://github.com/PrefectHQ/prefect-aws/pull/355)

### Fixed

- Bug where `S3Bucket.load()` constructed `AwsCredentials` instead of `MinIOCredentials` - [#359](https://github.com/PrefectHQ/prefect-aws/pull/359)

## 0.4.6

Released December 11th, 2023.

### Added

Ability to publish `ECSTask`` block as an ecs work pool - [#353](https://github.com/PrefectHQ/prefect-aws/pull/353)

## 0.4.5

Released November 30th, 2023.

### Fixed

- Bug where Prefect API key provided to ECS tasks was masked - [#347](https://github.com/PrefectHQ/prefect-aws/pull/347)

## 0.4.4

Released November 29th, 2023.

### Changed

- Mask Prefect API key in logs - [#341](https://github.com/PrefectHQ/prefect-aws/pull/341)

## 0.4.3

Released November 13th, 2023.

### Added

- `SecretBrinary` suport to `AwsSecret` block - [#274](https://github.com/PrefectHQ/prefect-aws/pull/274)

## 0.4.2

Released November 6th, 2023.
Expand Down Expand Up @@ -62,6 +116,7 @@ Released August 31st, 2023.
Released July 20th, 2023.

### Changed

- Promoted workers to GA, removed beta disclaimers

## 0.3.5
Expand Down Expand Up @@ -95,7 +150,7 @@ Released on June 13th, 2023.

### Fixed

- Change prefect.docker import to prefect.utilities.dockerutils to fix a crash when using custom blocks based on S3Bucket - [#273](https://github.com/PrefectHQ/prefect-aws/pull/273)
- Change prefect.docker import to prefect.utilities.dockerutils to fix a crash when using custom blocks based on S3Bucket - [#273](https://github.com/PrefectHQ/prefect-aws/pull/273)

## 0.3.2

Expand Down Expand Up @@ -184,7 +239,7 @@ Released on January 4th, 2023.

- `list_objects`, `download_object_to_path`, `download_object_to_file_object`, `download_folder_to_path`, `upload_from_path`, `upload_from_file_object`, `upload_from_folder` methods in `S3Bucket` - [#85](https://github.com/PrefectHQ/prefect-aws/pull/175)
- `aws_client_parameters` as a field in `AwsCredentials` and `MinioCredentials` blocks - [#175](https://github.com/PrefectHQ/prefect-aws/pull/175)
- `get_client` and `get_s3_client` methods to `AwsCredentials` and `MinioCredentials` blocks - [#175](https://github.com/PrefectHQ/prefect-aws/pull/175)
- `get_client` and `get_s3_client` methods to `AwsCredentials` and `MinioCredentials` blocks - [#175](https://github.com/PrefectHQ/prefect-aws/pull/175)

### Changed

Expand All @@ -196,7 +251,7 @@ Released on January 4th, 2023.

- `endpoint_url` field in S3Bucket; specify `aws_client_parameters` in `AwsCredentials` or `MinIOCredentials` instead - [#175](https://github.com/PrefectHQ/prefect-aws/pull/175)
- `basepath` field in S3Bucket; specify `bucket_folder` instead - [#175](https://github.com/PrefectHQ/prefect-aws/pull/175)
- `minio_credentials` and `aws_credentials` field in S3Bucket; use the `credentials` field instead - [#175](https://github.com/PrefectHQ/prefect-aws/pull/175)
- `minio_credentials` and `aws_credentials` field in S3Bucket; use the `credentials` field instead - [#175](https://github.com/PrefectHQ/prefect-aws/pull/175)

## 0.2.1

Expand Down Expand Up @@ -250,6 +305,7 @@ Released on October 28th, 2022.
- `ECSTask` is no longer experimental — [#137](https://github.com/PrefectHQ/prefect-aws/pull/137)

### Fixed

- Fix ignore_file option in `S3Bucket` skipping files which should be included — [#139](https://github.com/PrefectHQ/prefect-aws/pull/139)
- Fixed bug where `basepath` is used twice in the path when using `S3Bucket.put_directory` - [#143](https://github.com/PrefectHQ/prefect-aws/pull/143)

Expand Down
6 changes: 3 additions & 3 deletions docs/ecs_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ ECS (Elastic Container Service) tasks are a good option for executing Prefect 2

## ECS Flow Run Execution

Prefect enables remote flow execution via [workers](https://docs.prefect.io/2.11.1/concepts/work-pools/#worker-overview) and [work pools](https://docs.prefect.io/2.11.1/concepts/work-pools/#work-pool-overview). To learn more about these concepts please see our [deployment tutorial](https://docs.prefect.io/2.11.1/tutorial/deployments/).
Prefect enables remote flow execution via [workers](https://docs.prefect.io/concepts/work-pools/#worker-overview) and [work pools](https://docs.prefect.io/concepts/work-pools/#work-pool-overview). To learn more about these concepts please see our [deployment tutorial](https://docs.prefect.io/tutorial/deployments/).

For details on how workers and work pools are implemented for ECS, see the diagram below:
#### Architecture Diagram
Expand Down Expand Up @@ -288,10 +288,10 @@ To create an [IAM role](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_role
- Do your flow runs require higher `CPU`?
- Would an EC2 `Launch Type` speed up your flow run execution?

These infrastructure configuration values can be set on your ECS work pool or they can be overridden on the deployment level through [job_variables](https://docs.prefect.io/2.11.0/concepts/infrastructure/#kubernetesjob-overrides-and-customizations) if desired.
These infrastructure configuration values can be set on your ECS work pool or they can be overridden on the deployment level through [job_variables](https://docs.prefect.io/concepts/infrastructure/#kubernetesjob-overrides-and-customizations) if desired.


2. Consider adding a [build action](https://docs.prefect.io/2.11.0/concepts/deployments-ux/#the-build-action) to your Prefect Project [`prefect.yaml`](https://docs.prefect.io/2.11.0/concepts/deployments-ux/#the-prefect-yaml-file) if you want to automatically build a Docker image and push it to an image registry `prefect deploy` is run.
2. Consider adding a [build action](https://docs.prefect.io/concepts/deployments-ux/#the-build-action) to your Prefect Project [`prefect.yaml`](https://docs.prefect.io/concepts/deployments-ux/#the-prefect-yaml-file) if you want to automatically build a Docker image and push it to an image registry `prefect deploy` is run.

Here is an example build action for ECR:
```yaml
Expand Down
6 changes: 6 additions & 0 deletions docs/lambda_function.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
description: Module handling AWS Lambda functions
notes: This documentation page is generated from source file docstrings.
---

::: prefect_aws.lambda_function
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ nav:
- Credentials: credentials.md
- ECS Worker: ecs_worker.md
- ECS: ecs.md
- Lambda: lambda_function.md
- Deployments:
- Steps: deployments/steps.md
- Glue Job: glue_job.md
Expand Down
2 changes: 2 additions & 0 deletions prefect_aws/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from . import _version
from .credentials import AwsCredentials, MinIOCredentials
from .client_parameters import AwsClientParameters
from .lambda_function import LambdaFunction
from .s3 import S3Bucket
from .ecs import ECSTask
from .secrets_manager import AwsSecret
Expand All @@ -17,6 +18,7 @@
__all__ = [
"AwsCredentials",
"AwsClientParameters",
"LambdaFunction",
"MinIOCredentials",
"S3Bucket",
"ECSTask",
Expand Down
12 changes: 12 additions & 0 deletions prefect_aws/client_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ class AwsClientParameters(BaseModel):
title="Botocore Config",
)

def __hash__(self):
return hash(
(
self.api_version,
self.use_ssl,
self.verify,
self.verify_cert_path,
self.endpoint_url,
self.config,
)
)

@validator("config", pre=True)
def instantiate_config(cls, value: Union[Config, Dict[str, Any]]) -> Dict[str, Any]:
"""
Expand Down
76 changes: 66 additions & 10 deletions prefect_aws/credentials.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Module handling AWS credentials"""

from enum import Enum
from functools import lru_cache
from threading import Lock
from typing import Any, Optional, Union

import boto3
Expand All @@ -16,14 +18,43 @@

from prefect_aws.client_parameters import AwsClientParameters

_LOCK = Lock()


class ClientType(Enum):
"""The supported boto3 clients."""

S3 = "s3"
ECS = "ecs"
BATCH = "batch"
SECRETS_MANAGER = "secretsmanager"


@lru_cache(maxsize=8, typed=True)
def _get_client_cached(ctx, client_type: Union[str, ClientType]) -> Any:
"""
Helper method to cache and dynamically get a client type.
Args:
client_type: The client's service name.
Returns:
An authenticated client.
Raises:
ValueError: if the client is not supported.
"""
with _LOCK:
if isinstance(client_type, ClientType):
client_type = client_type.value

client = ctx.get_boto3_session().client(
service_name=client_type,
**ctx.aws_client_parameters.get_params_override(),
)
return client


class AwsCredentials(CredentialsBlock):
"""
Block used to manage authentication with AWS. AWS authentication is
Expand Down Expand Up @@ -75,6 +106,22 @@ class AwsCredentials(CredentialsBlock):
title="AWS Client Parameters",
)

class Config:
"""Config class for pydantic model."""

arbitrary_types_allowed = True

def __hash__(self):
field_hashes = (
hash(self.aws_access_key_id),
hash(self.aws_secret_access_key),
hash(self.aws_session_token),
hash(self.profile_name),
hash(self.region_name),
hash(frozenset(self.aws_client_parameters.dict().items())),
)
return hash(field_hashes)

def get_boto3_session(self) -> boto3.Session:
"""
Returns an authenticated boto3 session that can be used to create clients
Expand Down Expand Up @@ -104,7 +151,7 @@ def get_boto3_session(self) -> boto3.Session:
region_name=self.region_name,
)

def get_client(self, client_type: Union[str, ClientType]) -> Any:
def get_client(self, client_type: Union[str, ClientType]):
"""
Helper method to dynamically get a client type.
Expand All @@ -120,10 +167,7 @@ def get_client(self, client_type: Union[str, ClientType]) -> Any:
if isinstance(client_type, ClientType):
client_type = client_type.value

client = self.get_boto3_session().client(
service_name=client_type, **self.aws_client_parameters.get_params_override()
)
return client
return _get_client_cached(ctx=self, client_type=client_type)

def get_s3_client(self) -> S3Client:
"""
Expand Down Expand Up @@ -186,6 +230,21 @@ class MinIOCredentials(CredentialsBlock):
description="Extra parameters to initialize the Client.",
)

class Config:
"""Config class for pydantic model."""

arbitrary_types_allowed = True

def __hash__(self):
return hash(
(
hash(self.minio_root_user),
hash(self.minio_root_password),
hash(self.region_name),
hash(frozenset(self.aws_client_parameters.dict().items())),
)
)

def get_boto3_session(self) -> boto3.Session:
"""
Returns an authenticated boto3 session that can be used to create clients
Expand Down Expand Up @@ -218,7 +277,7 @@ def get_boto3_session(self) -> boto3.Session:
region_name=self.region_name,
)

def get_client(self, client_type: Union[str, ClientType]) -> Any:
def get_client(self, client_type: Union[str, ClientType]):
"""
Helper method to dynamically get a client type.
Expand All @@ -234,10 +293,7 @@ def get_client(self, client_type: Union[str, ClientType]) -> Any:
if isinstance(client_type, ClientType):
client_type = client_type.value

client = self.get_boto3_session().client(
service_name=client_type, **self.aws_client_parameters.get_params_override()
)
return client
return _get_client_cached(ctx=self, client_type=client_type)

def get_s3_client(self) -> S3Client:
"""
Expand Down
14 changes: 10 additions & 4 deletions prefect_aws/deployments/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ def push_to_s3(
bucket: The name of the S3 bucket where files will be uploaded.
folder: The folder in the S3 bucket where files will be uploaded.
credentials: A dictionary of AWS credentials (aws_access_key_id,
aws_secret_access_key, aws_session_token).
aws_secret_access_key, aws_session_token) or MinIO credentials
(minio_root_user, minio_root_password).
client_parameters: A dictionary of additional parameters to pass to the boto3
client.
ignore_file: The name of the file containing ignore patterns.
Expand Down Expand Up @@ -139,7 +140,8 @@ def pull_from_s3(
bucket: The name of the S3 bucket where files are stored.
folder: The folder in the S3 bucket where files are stored.
credentials: A dictionary of AWS credentials (aws_access_key_id,
aws_secret_access_key, aws_session_token).
aws_secret_access_key, aws_session_token) or MinIO credentials
(minio_root_user, minio_root_password).
client_parameters: A dictionary of additional parameters to pass to the
boto3 client.
Expand Down Expand Up @@ -204,8 +206,12 @@ def get_s3_client(
client_parameters = {}

# Get credentials from credentials (regardless if block or not)
aws_access_key_id = credentials.get("aws_access_key_id", None)
aws_secret_access_key = credentials.get("aws_secret_access_key", None)
aws_access_key_id = credentials.get(
"aws_access_key_id", credentials.get("minio_root_user", None)
)
aws_secret_access_key = credentials.get(
"aws_secret_access_key", credentials.get("minio_root_password", None)
)
aws_session_token = credentials.get("aws_session_token", None)

# Get remaining session info from credentials, or client_parameters
Expand Down
Loading

0 comments on commit 0d04479

Please sign in to comment.