Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

AWS S3 copy and move tasks and S3Bucket methods #316

Merged
merged 10 commits into from
Oct 10, 2023
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- AWS S3 copy and move tasks and `S3Bucket` methods - [#316](https://github.com/PrefectHQ/prefect-aws/pull/316)

### Added

- Added retries to ECS task run creation for ECS worker - [#303](https://github.com/PrefectHQ/prefect-aws/pull/303)
Expand Down
317 changes: 317 additions & 0 deletions prefect_aws/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,163 @@ async def example_s3_upload_flow():
return key


@task
async def s3_copy(
    source_path: str,
    target_path: str,
    source_bucket_name: str,
    aws_credentials: AwsCredentials,
    target_bucket_name: Optional[str] = None,
    aws_client_parameters: AwsClientParameters = AwsClientParameters(),
    **copy_kwargs,
) -> str:
    """Uses S3's internal
    [CopyObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html)
    to copy objects within or between buckets. To copy objects between buckets, the
    credentials must have permission to read the source object and write to the target
    object.

    Args:
        source_path: The path to the object to copy. Can be a string or `Path`.
        target_path: The path to copy the object to. Can be a string or `Path`.
        source_bucket_name: The bucket to copy the object from.
        aws_credentials: Credentials to use for authentication with AWS.
        target_bucket_name: The bucket to copy the object to. If not provided, defaults
            to `source_bucket_name`.
        aws_client_parameters: Custom parameter for the boto3 client initialization.
        **copy_kwargs: Additional keyword arguments to pass to `S3Client.copy_object`.

    Returns:
        The path that the object was copied to. Excludes the bucket name.

    Examples:

        Copy notes.txt from s3://my-bucket/my_folder/notes.txt to
        s3://my-bucket/my_folder/notes_copy.txt.

        ```python
        from prefect import flow
        from prefect_aws import AwsCredentials
        from prefect_aws.s3 import s3_copy

        aws_credentials = AwsCredentials.load("my-creds")

        @flow
        async def example_copy_flow():
            await s3_copy(
                source_path="my_folder/notes.txt",
                target_path="my_folder/notes_copy.txt",
                source_bucket_name="my-bucket",
                aws_credentials=aws_credentials,
            )

        example_copy_flow()
        ```

        Copy notes.txt from s3://my-bucket/my_folder/notes.txt to
        s3://other-bucket/notes_copy.txt.

        ```python
        from prefect import flow
        from prefect_aws import AwsCredentials
        from prefect_aws.s3 import s3_copy

        aws_credentials = AwsCredentials.load("shared-creds")

        @flow
        async def example_copy_flow():
            await s3_copy(
                source_path="my_folder/notes.txt",
                target_path="notes_copy.txt",
                source_bucket_name="my-bucket",
                aws_credentials=aws_credentials,
                target_bucket_name="other-bucket",
            )

        example_copy_flow()
        ```

    """
    logger = get_run_logger()

    s3_client = aws_credentials.get_boto3_session().client(
        "s3", **aws_client_parameters.get_params_override()
    )

    # An omitted target bucket means an in-place copy within the source bucket.
    target_bucket_name = target_bucket_name or source_bucket_name

    logger.info(
        "Copying object from bucket %s with key %s to bucket %s with key %s",
        source_bucket_name,
        source_path,
        target_bucket_name,
        target_path,
    )

    s3_client.copy_object(
        CopySource={"Bucket": source_bucket_name, "Key": source_path},
        Bucket=target_bucket_name,
        Key=target_path,
        **copy_kwargs,
    )

    return target_path


@task
async def s3_move(
    source_path: str,
    target_path: str,
    source_bucket_name: str,
    aws_credentials: AwsCredentials,
    target_bucket_name: Optional[str] = None,
    aws_client_parameters: AwsClientParameters = AwsClientParameters(),
) -> str:
    """
    Move an object from one S3 location to another. The move is implemented as a
    server-side copy followed by a delete of the source object, so the credentials
    must be able to read and delete the source and write the target.

    Args:
        source_path: The path of the object to move
        target_path: The path to move the object to
        source_bucket_name: The name of the bucket containing the source object
        aws_credentials: Credentials to use for authentication with AWS.
        target_bucket_name: The bucket to copy the object to. If not provided, defaults
            to `source_bucket_name`.
        aws_client_parameters: Custom parameter for the boto3 client initialization.

    Returns:
        The path that the object was moved to. Excludes the bucket name.
    """
    logger = get_run_logger()

    s3_client = aws_credentials.get_boto3_session().client(
        "s3", **aws_client_parameters.get_params_override()
    )

    # If target bucket is not provided, assume it's the same as the source bucket
    target_bucket_name = target_bucket_name or source_bucket_name

    logger.info(
        "Moving object from s3://%s/%s to s3://%s/%s",
        source_bucket_name,
        source_path,
        target_bucket_name,
        target_path,
    )

    # Copy the object to the new location; an error here aborts before the delete,
    # so a failed copy never drops the original object.
    s3_client.copy_object(
        Bucket=target_bucket_name,
        CopySource={"Bucket": source_bucket_name, "Key": source_path},
        Key=target_path,
    )

    # Delete the original object
    s3_client.delete_object(Bucket=source_bucket_name, Key=source_path)

    return target_path


def _list_objects_sync(page_iterator: PageIterator):
"""
Synchronous method to collect S3 objects into a list
Expand Down Expand Up @@ -1023,3 +1180,163 @@ async def upload_from_folder(
)

return to_folder

@sync_compatible
async def copy_object(
    self,
    from_path: Union[str, Path],
    to_path: Union[str, Path],
    to_bucket: Optional[Union["S3Bucket", str]] = None,
    **copy_kwargs,
) -> str:
    """Uses S3's internal
    [CopyObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html)
    to copy objects within or between buckets.

    Unlike `stream_from`, the object is copied server-side by S3 and is never
    downloaded into the Python process, so a single set of credentials must be
    able to both read the source and write the target. Use `stream_from` when
    the source must be read with different (e.g. unsigned/public) credentials.

    Args:
        from_path: The path of the object to copy.
        to_path: The path to copy the object to.
        to_bucket: The bucket to copy to. Defaults to the current bucket.
        **copy_kwargs: Additional keyword arguments to pass to
            `S3Client.copy_object`.

    Returns:
        The path that the object was copied to. Excludes the bucket name.

    Examples:

        Copy notes.txt from my_folder/notes.txt to my_folder/notes_copy.txt.

        ```python
        from prefect_aws.s3 import S3Bucket

        s3_bucket = S3Bucket.load("my-bucket")
        s3_bucket.copy_object("my_folder/notes.txt", "my_folder/notes_copy.txt")
        ```

        Copy notes.txt from my_folder/notes.txt to my_folder/notes_copy.txt in
        another bucket.

        ```python
        from prefect_aws.s3 import S3Bucket

        s3_bucket = S3Bucket.load("my-bucket")
        s3_bucket.copy_object(
            "my_folder/notes.txt",
            "my_folder/notes_copy.txt",
            to_bucket="other-bucket"
        )
        ```
    """
    s3_client = self.credentials.get_s3_client()

    source_path = self._resolve_path(Path(from_path).as_posix())
    target_path = self._resolve_path(Path(to_path).as_posix())

    source_bucket_name = self.bucket_name
    target_bucket_name = self.bucket_name
    if isinstance(to_bucket, S3Bucket):
        target_bucket_name = to_bucket.bucket_name
        # Resolve against the target bucket's base folder, not this bucket's.
        target_path = to_bucket._resolve_path(target_path)
    elif isinstance(to_bucket, str):
        target_bucket_name = to_bucket
    elif to_bucket is not None:
        # BUGFIX: report the type of the offending argument (`to_bucket`), not
        # `target_bucket_name`, which was already reassigned to a str above.
        raise TypeError(
            f"to_bucket must be a string or S3Bucket, not {type(to_bucket)}"
        )

    self.logger.info(
        "Copying object from bucket %s with key %s to bucket %s with key %s",
        source_bucket_name,
        source_path,
        target_bucket_name,
        target_path,
    )

    s3_client.copy_object(
        CopySource={"Bucket": source_bucket_name, "Key": source_path},
        Bucket=target_bucket_name,
        Key=target_path,
        **copy_kwargs,
    )

    return target_path

@sync_compatible
async def move_object(
    self,
    from_path: Union[str, Path],
    to_path: Union[str, Path],
    to_bucket: Optional[Union["S3Bucket", str]] = None,
) -> str:
    """Uses S3's internal CopyObject and DeleteObject to move objects within or
    between buckets.

    Args:
        from_path: The path of the object to move.
        to_path: The path to move the object to.
        to_bucket: The bucket to move to. Defaults to the current bucket.

    Returns:
        The path that the object was moved to. Excludes the bucket name.

    Examples:

        Move notes.txt from my_folder/notes.txt to my_folder/notes_copy.txt.

        ```python
        from prefect_aws.s3 import S3Bucket

        s3_bucket = S3Bucket.load("my-bucket")
        s3_bucket.move_object("my_folder/notes.txt", "my_folder/notes_copy.txt")
        ```

        Move notes.txt from my_folder/notes.txt to my_folder/notes_copy.txt in
        another bucket.

        ```python
        from prefect_aws.s3 import S3Bucket

        s3_bucket = S3Bucket.load("my-bucket")
        s3_bucket.move_object(
            "my_folder/notes.txt",
            "my_folder/notes_copy.txt",
            to_bucket="other-bucket"
        )
        ```
    """
    s3_client = self.credentials.get_s3_client()

    source_path = self._resolve_path(Path(from_path).as_posix())
    target_path = self._resolve_path(Path(to_path).as_posix())

    source_bucket_name = self.bucket_name
    target_bucket_name = self.bucket_name
    if isinstance(to_bucket, S3Bucket):
        target_bucket_name = to_bucket.bucket_name
        # Resolve against the target bucket's base folder, not this bucket's.
        target_path = to_bucket._resolve_path(target_path)
    elif isinstance(to_bucket, str):
        target_bucket_name = to_bucket
    elif to_bucket is not None:
        # BUGFIX: report the type of the offending argument (`to_bucket`), not
        # `target_bucket_name`, which was already reassigned to a str above.
        raise TypeError(
            f"to_bucket must be a string or S3Bucket, not {type(to_bucket)}"
        )

    self.logger.info(
        "Moving object from s3://%s/%s to s3://%s/%s",
        source_bucket_name,
        source_path,
        target_bucket_name,
        target_path,
    )

    # If invalid, should error and prevent next operation: the delete only runs
    # after the copy succeeds, so a failed copy never loses the source object.
    s3_client.copy(
        CopySource={"Bucket": source_bucket_name, "Key": source_path},
        Bucket=target_bucket_name,
        Key=target_path,
    )
    s3_client.delete_object(Bucket=source_bucket_name, Key=source_path)
    return target_path
Loading