Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

provide ability to cache boto client instances directly and on S3Bucket #369

Merged
merged 33 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
c584ed7
move s3_client up
mattiamatrix Dec 19, 2023
70ba4b7
get s3 client
mattiamatrix Dec 19, 2023
2389cbe
add print for testing
mattiamatrix Dec 19, 2023
4a74401
revert changes
mattiamatrix Dec 19, 2023
362c97f
update client
mattiamatrix Dec 19, 2023
5feb200
remove print
mattiamatrix Dec 22, 2023
43fad43
add @lru_cache
mattiamatrix Dec 22, 2023
1df6cb3
Add docs
mattiamatrix Dec 22, 2023
e23c67b
update docs
mattiamatrix Dec 22, 2023
7428ae3
revert changes
mattiamatrix Dec 22, 2023
a1a8866
fix docs
mattiamatrix Dec 22, 2023
dd0cca4
Update CHANGELOG.md
mattiamatrix Dec 22, 2023
bcf2bed
Add maxsize and typed=True
mattiamatrix Dec 22, 2023
4ec6a1e
Merge branch 'main' into use-s3-client-more-efficiently
zzstoatzz Jan 3, 2024
ab03c45
add test
mattiamatrix Jan 3, 2024
92e5d72
Merge branch 'main' into use-s3-client-more-efficiently
mattiamatrix Jan 3, 2024
59e38d1
Test with cache_info
mattiamatrix Jan 3, 2024
3cd1eab
Update AwsCredentials
mattiamatrix Jan 3, 2024
e0a927b
Only S3
mattiamatrix Jan 3, 2024
0de61f8
Empty-Commit
mattiamatrix Jan 3, 2024
dd09cb1
Merge branch 'main' into use-s3-client-more-efficiently
zzstoatzz Jan 10, 2024
283050e
Update hash function
mattiamatrix Jan 18, 2024
3200967
Revert changes
mattiamatrix Jan 18, 2024
e8b61e0
Update hash
mattiamatrix Jan 18, 2024
56210f8
Test different hash
mattiamatrix Jan 18, 2024
865975c
avoid modifying default behavior
zzstoatzz Jan 18, 2024
9b6cd9b
run pre-commits
zzstoatzz Jan 18, 2024
1f9c3d0
no way
zzstoatzz Jan 18, 2024
7202545
test caching via s3
zzstoatzz Jan 18, 2024
5aad7ab
Update prefect_aws/s3.py
zzstoatzz Jan 18, 2024
8379d3d
caching by default, remove toggle
zzstoatzz Jan 19, 2024
95df7a0
region
zzstoatzz Jan 19, 2024
1fde31e
check hashing on both creds classes
zzstoatzz Jan 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- Handle `boto3` clients more efficiently with `lru_cache` - [#361](https://github.com/PrefectHQ/prefect-aws/pull/361)

### Fixed

### Deprecated
Expand Down Expand Up @@ -105,6 +107,7 @@ Released August 31st, 2023.
Released July 20th, 2023.

### Changed

- Promoted workers to GA, removed beta disclaimers

## 0.3.5
Expand Down Expand Up @@ -293,6 +296,7 @@ Released on October 28th, 2022.
- `ECSTask` is no longer experimental — [#137](https://github.com/PrefectHQ/prefect-aws/pull/137)

### Fixed

- Fix ignore_file option in `S3Bucket` skipping files which should be included — [#139](https://github.com/PrefectHQ/prefect-aws/pull/139)
- Fixed bug where `basepath` is used twice in the path when using `S3Bucket.put_directory` - [#143](https://github.com/PrefectHQ/prefect-aws/pull/143)

Expand Down
12 changes: 12 additions & 0 deletions prefect_aws/client_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ class AwsClientParameters(BaseModel):
title="Botocore Config",
)

def __hash__(self):
return hash(
(
self.api_version,
self.use_ssl,
self.verify,
self.verify_cert_path,
self.endpoint_url,
self.config,
)
)

@validator("config", pre=True)
def instantiate_config(cls, value: Union[Config, Dict[str, Any]]) -> Dict[str, Any]:
"""
Expand Down
85 changes: 75 additions & 10 deletions prefect_aws/credentials.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Module handling AWS credentials"""

from enum import Enum
from functools import lru_cache
from typing import Any, Optional, Union

import boto3
Expand All @@ -18,12 +19,38 @@


class ClientType(Enum):
"""The supported boto3 clients."""

S3 = "s3"
ECS = "ecs"
BATCH = "batch"
SECRETS_MANAGER = "secretsmanager"


@lru_cache(maxsize=8, typed=True)
def _get_client_cached(ctx, client_type: Union[str, ClientType]) -> Any:
"""
Helper method to cache and dynamically get a client type.

Args:
client_type: The client's service name.

Returns:
An authenticated client.

Raises:
ValueError: if the client is not supported.
"""
if isinstance(client_type, ClientType):
client_type = client_type.value

client = ctx.get_boto3_session().client(
service_name=client_type,
**ctx.aws_client_parameters.get_params_override(),
)
return client


class AwsCredentials(CredentialsBlock):
"""
Block used to manage authentication with AWS. AWS authentication is
Expand Down Expand Up @@ -75,6 +102,23 @@ class AwsCredentials(CredentialsBlock):
title="AWS Client Parameters",
)

class Config:
"""Config class for pydantic model."""

arbitrary_types_allowed = True

def __hash__(self):
return hash(
(
self.aws_access_key_id,
self.aws_secret_access_key,
self.aws_session_token,
self.profile_name,
self.region_name,
self.aws_client_parameters,
)
)

def get_boto3_session(self) -> boto3.Session:
"""
Returns an authenticated boto3 session that can be used to create clients
Expand Down Expand Up @@ -104,7 +148,7 @@ def get_boto3_session(self) -> boto3.Session:
region_name=self.region_name,
)

def get_client(self, client_type: Union[str, ClientType]) -> Any:
def get_client(self, client_type: Union[str, ClientType], use_cache: bool = False):
"""
Helper method to dynamically get a client type.

Expand All @@ -120,10 +164,13 @@ def get_client(self, client_type: Union[str, ClientType]) -> Any:
if isinstance(client_type, ClientType):
client_type = client_type.value

client = self.get_boto3_session().client(
service_name=client_type, **self.aws_client_parameters.get_params_override()
)
return client
if not use_cache:
return self.get_boto3_session().client(
service_name=client_type,
**self.aws_client_parameters.get_params_override(),
)

return _get_client_cached(ctx=self, client_type=client_type)

def get_s3_client(self) -> S3Client:
"""
Expand Down Expand Up @@ -186,6 +233,21 @@ class MinIOCredentials(CredentialsBlock):
description="Extra parameters to initialize the Client.",
)

class Config:
"""Config class for pydantic model."""

arbitrary_types_allowed = True

def __hash__(self):
return hash(
(
self.minio_root_user,
self.minio_root_password,
self.region_name,
self.aws_client_parameters,
)
)

def get_boto3_session(self) -> boto3.Session:
"""
Returns an authenticated boto3 session that can be used to create clients
Expand Down Expand Up @@ -218,7 +280,7 @@ def get_boto3_session(self) -> boto3.Session:
region_name=self.region_name,
)

def get_client(self, client_type: Union[str, ClientType]) -> Any:
def get_client(self, client_type: Union[str, ClientType], use_cache: bool = False):
"""
Helper method to dynamically get a client type.

Expand All @@ -234,10 +296,13 @@ def get_client(self, client_type: Union[str, ClientType]) -> Any:
if isinstance(client_type, ClientType):
client_type = client_type.value

client = self.get_boto3_session().client(
service_name=client_type, **self.aws_client_parameters.get_params_override()
)
return client
if not use_cache:
return self.get_boto3_session().client(
service_name=client_type,
**self.aws_client_parameters.get_params_override(),
)

return _get_client_cached(ctx=self, client_type=client_type)

def get_s3_client(self) -> S3Client:
"""
Expand Down
11 changes: 10 additions & 1 deletion prefect_aws/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,15 @@ class S3Bucket(WritableFileSystem, WritableDeploymentStorage, ObjectStorageBlock
),
)

cache_client: bool = Field(
default=False,
description=(
"If True, the S3 client will be cached. This is useful for "
"performance, but can cause issues if the S3 client is used "
"in multiple threads."
zzstoatzz marked this conversation as resolved.
Show resolved Hide resolved
),
)

# Property to maintain compatibility with storage block based deployments
@property
def basepath(self) -> str:
Expand Down Expand Up @@ -466,7 +475,7 @@ def _get_s3_client(self) -> boto3.client:
Authenticate MinIO credentials or AWS credentials and return an S3 client.
This is a helper function called by read_path() or write_path().
"""
return self.credentials.get_s3_client()
return self.credentials.get_client("s3", use_cache=self.cache_client)

def _get_bucket_resource(self) -> boto3.resource:
"""
Expand Down
36 changes: 35 additions & 1 deletion tests/test_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
from botocore.client import BaseClient
from moto import mock_s3

from prefect_aws.credentials import AwsCredentials, ClientType, MinIOCredentials
from prefect_aws.credentials import (
AwsCredentials,
ClientType,
MinIOCredentials,
_get_client_cached,
)


def test_aws_credentials_get_boto3_session():
Expand Down Expand Up @@ -44,3 +49,32 @@ def test_minio_credentials_get_boto3_session():
def test_credentials_get_client(credentials, client_type):
with mock_s3():
assert isinstance(credentials.get_client(client_type), BaseClient)


@pytest.mark.parametrize("client_type", [member.value for member in ClientType])
def test_get_client_cached(client_type):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test that ensures that changing some configuration on an AwsCredentials instance after instantiation and fetching a client causes a cache miss?

"""
Test to ensure that _get_client_cached function returns the same instance
for multiple calls with the same parameters and properly utilizes lru_cache.
"""

# Create a mock AwsCredentials instance
aws_credentials_block = AwsCredentials(region_name="us-east-1")

# Clear cache
_get_client_cached.cache_clear()

assert _get_client_cached.cache_info().hits == 0, "Initial call count should be 0"

assert aws_credentials_block.get_client(client_type) is not None

assert _get_client_cached.cache_info().hits == 0, "Cache should not yet be used"

# Call get_client multiple times with the same parameters
aws_credentials_block.get_client(client_type, use_cache=True)
aws_credentials_block.get_client(client_type, use_cache=True)
aws_credentials_block.get_client(client_type, use_cache=True)

# Verify that _get_client_cached is called only once due to caching
assert _get_client_cached.cache_info().misses == 1
assert _get_client_cached.cache_info().hits == 2
14 changes: 14 additions & 0 deletions tests/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from prefect_aws import AwsCredentials, MinIOCredentials
from prefect_aws.client_parameters import AwsClientParameters
from prefect_aws.credentials import _get_client_cached
from prefect_aws.s3 import (
S3Bucket,
s3_copy,
Expand Down Expand Up @@ -1047,3 +1048,16 @@ def test_move_object_between_buckets(

with pytest.raises(ClientError):
assert s3_bucket_with_object.read_path("object") == b"TEST"

def test_client_is_cached_when_specified(self, aws_creds_block):
s3_bucket = S3Bucket(
bucket_name="bucket", credentials=aws_creds_block, cache_client=True
)

_get_client_cached.cache_clear()

s3_bucket._get_s3_client()
s3_bucket._get_s3_client()

assert _get_client_cached.cache_info().hits == 1
assert _get_client_cached.cache_info().misses == 1