Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
AWS S3 copy and move tasks and S3Bucket methods (#316)
Browse files Browse the repository at this point in the history
  • Loading branch information
dominictarro authored Oct 10, 2023
1 parent 8650dbf commit 9ba5424
Show file tree
Hide file tree
Showing 3 changed files with 561 additions and 4 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- AWS S3 copy and move tasks and `S3Bucket` methods - [#316](https://github.com/PrefectHQ/prefect-aws/pull/316)

### Added

- Added retries to ECS task run creation for ECS worker - [#303](https://github.com/PrefectHQ/prefect-aws/pull/303)
Expand Down
325 changes: 324 additions & 1 deletion prefect_aws/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,160 @@ async def example_s3_upload_flow():
return key


@task
async def s3_copy(
source_path: str,
target_path: str,
source_bucket_name: str,
aws_credentials: AwsCredentials,
target_bucket_name: Optional[str] = None,
**copy_kwargs,
) -> str:
"""Uses S3's internal
[CopyObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html)
to copy objects within or between buckets. To copy objects between buckets, the
credentials must have permission to read the source object and write to the target
object. If the credentials do not have those permissions, try using
`S3Bucket.stream_from`.
Args:
source_path: The path to the object to copy. Can be a string or `Path`.
target_path: The path to copy the object to. Can be a string or `Path`.
source_bucket_name: The bucket to copy the object from.
aws_credentials: Credentials to use for authentication with AWS.
target_bucket_name: The bucket to copy the object to. If not provided, defaults
to `source_bucket`.
**copy_kwargs: Additional keyword arguments to pass to `S3Client.copy_object`.
Returns:
The path that the object was copied to. Excludes the bucket name.
Examples:
Copy notes.txt from s3://my-bucket/my_folder/notes.txt to
s3://my-bucket/my_folder/notes_copy.txt.
```python
from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.s3 import s3_copy
aws_credentials = AwsCredentials.load("my-creds")
@flow
async def example_copy_flow():
await s3_copy(
source_path="my_folder/notes.txt",
target_path="my_folder/notes_copy.txt",
source_bucket_name="my-bucket",
aws_credentials=aws_credentials,
)
example_copy_flow()
```
Copy notes.txt from s3://my-bucket/my_folder/notes.txt to
s3://other-bucket/notes_copy.txt.
```python
from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.s3 import s3_copy
aws_credentials = AwsCredentials.load("shared-creds")
@flow
async def example_copy_flow():
await s3_copy(
source_path="my_folder/notes.txt",
target_path="notes_copy.txt",
source_bucket_name="my-bucket",
aws_credentials=aws_credentials,
target_bucket_name="other-bucket",
)
example_copy_flow()
```
"""
logger = get_run_logger()

s3_client = aws_credentials.get_s3_client()

target_bucket_name = target_bucket_name or source_bucket_name

logger.info(
"Copying object from bucket %s with key %s to bucket %s with key %s",
source_bucket_name,
source_path,
target_bucket_name,
target_path,
)

s3_client.copy_object(
CopySource={"Bucket": source_bucket_name, "Key": source_path},
Bucket=target_bucket_name,
Key=target_path,
**copy_kwargs,
)

return target_path


@task
async def s3_move(
source_path: str,
target_path: str,
source_bucket_name: str,
aws_credentials: AwsCredentials,
target_bucket_name: Optional[str] = None,
) -> str:
"""
Move an object from one S3 location to another. To move objects between buckets,
the credentials must have permission to read and delete the source object and write
to the target object. If the credentials do not have those permissions, this method
will raise an error. If the credentials have permission to read the source object
but not delete it, the object will be copied but not deleted.
Args:
source_path: The path of the object to move
target_path: The path to move the object to
source_bucket_name: The name of the bucket containing the source object
aws_credentials: Credentials to use for authentication with AWS.
target_bucket_name: The bucket to copy the object to. If not provided, defaults
to `source_bucket`.
Returns:
The path that the object was moved to. Excludes the bucket name.
"""
logger = get_run_logger()

s3_client = aws_credentials.get_s3_client()

# If target bucket is not provided, assume it's the same as the source bucket
target_bucket_name = target_bucket_name or source_bucket_name

logger.info(
"Moving object from s3://%s/%s s3://%s/%s",
source_bucket_name,
source_path,
target_bucket_name,
target_path,
)

# Copy the object to the new location
s3_client.copy_object(
Bucket=target_bucket_name,
CopySource={"Bucket": source_bucket_name, "Key": source_path},
Key=target_path,
)

# Delete the original object
s3_client.delete_object(Bucket=source_bucket_name, Key=source_path)

return target_path


def _list_objects_sync(page_iterator: PageIterator):
"""
Synchronous method to collect S3 objects into a list
Expand Down Expand Up @@ -800,7 +954,9 @@ async def stream_from(
to_path: Optional[str] = None,
**upload_kwargs: Dict[str, Any],
) -> str:
"""Streams an object from another bucket to this bucket.
"""Streams an object from another bucket to this bucket. Requires the
object to be downloaded and uploaded in chunks. If `self`'s credentials
allow for writes to the other bucket, try using `S3Bucket.copy_object`.
Args:
bucket: The bucket to stream from.
Expand Down Expand Up @@ -1028,3 +1184,170 @@ async def upload_from_folder(
)

return to_folder

@sync_compatible
async def copy_object(
self,
from_path: Union[str, Path],
to_path: Union[str, Path],
to_bucket: Optional[Union["S3Bucket", str]] = None,
**copy_kwargs,
) -> str:
"""Uses S3's internal
[CopyObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html)
to copy objects within or between buckets. To copy objects between buckets,
`self`'s credentials must have permission to read the source object and write
to the target object. If the credentials do not have those permissions, try
using `S3Bucket.stream_from`.
Args:
from_path: The path of the object to copy.
to_path: The path to copy the object to.
to_bucket: The bucket to copy to. Defaults to the current bucket.
**copy_kwargs: Additional keyword arguments to pass to
`S3Client.copy_object`.
Returns:
The path that the object was copied to. Excludes the bucket name.
Examples:
Copy notes.txt from my_folder/notes.txt to my_folder/notes_copy.txt.
```python
from prefect_aws.s3 import S3Bucket
s3_bucket = S3Bucket.load("my-bucket")
s3_bucket.copy_object("my_folder/notes.txt", "my_folder/notes_copy.txt")
```
Copy notes.txt from my_folder/notes.txt to my_folder/notes_copy.txt in
another bucket.
```python
from prefect_aws.s3 import S3Bucket
s3_bucket = S3Bucket.load("my-bucket")
s3_bucket.copy_object(
"my_folder/notes.txt",
"my_folder/notes_copy.txt",
to_bucket="other-bucket"
)
```
"""
s3_client = self.credentials.get_s3_client()

source_path = self._resolve_path(Path(from_path).as_posix())
target_path = self._resolve_path(Path(to_path).as_posix())

source_bucket_name = self.bucket_name
target_bucket_name = self.bucket_name
if isinstance(to_bucket, S3Bucket):
target_bucket_name = to_bucket.bucket_name
target_path = to_bucket._resolve_path(target_path)
elif isinstance(to_bucket, str):
target_bucket_name = to_bucket
elif to_bucket is not None:
raise TypeError(
"to_bucket must be a string or S3Bucket, not"
f" {type(target_bucket_name)}"
)

self.logger.info(
"Copying object from bucket %s with key %s to bucket %s with key %s",
source_bucket_name,
source_path,
target_bucket_name,
target_path,
)

s3_client.copy_object(
CopySource={"Bucket": source_bucket_name, "Key": source_path},
Bucket=target_bucket_name,
Key=target_path,
**copy_kwargs,
)

return target_path

@sync_compatible
async def move_object(
self,
from_path: Union[str, Path],
to_path: Union[str, Path],
to_bucket: Optional[Union["S3Bucket", str]] = None,
) -> str:
"""Uses S3's internal CopyObject and DeleteObject to move objects within or
between buckets. To move objects between buckets, `self`'s credentials must
have permission to read and delete the source object and write to the target
object. If the credentials do not have those permissions, this method will
raise an error. If the credentials have permission to read the source object
but not delete it, the object will be copied but not deleted.
Args:
from_path: The path of the object to move.
to_path: The path to move the object to.
to_bucket: The bucket to move to. Defaults to the current bucket.
Returns:
The path that the object was moved to. Excludes the bucket name.
Examples:
Move notes.txt from my_folder/notes.txt to my_folder/notes_copy.txt.
```python
from prefect_aws.s3 import S3Bucket
s3_bucket = S3Bucket.load("my-bucket")
s3_bucket.move_object("my_folder/notes.txt", "my_folder/notes_copy.txt")
```
Move notes.txt from my_folder/notes.txt to my_folder/notes_copy.txt in
another bucket.
```python
from prefect_aws.s3 import S3Bucket
s3_bucket = S3Bucket.load("my-bucket")
s3_bucket.move_object(
"my_folder/notes.txt",
"my_folder/notes_copy.txt",
to_bucket="other-bucket"
)
```
"""
s3_client = self.credentials.get_s3_client()

source_path = self._resolve_path(Path(from_path).as_posix())
target_path = self._resolve_path(Path(to_path).as_posix())

source_bucket_name = self.bucket_name
target_bucket_name = self.bucket_name
if isinstance(to_bucket, S3Bucket):
target_bucket_name = to_bucket.bucket_name
target_path = to_bucket._resolve_path(target_path)
elif isinstance(to_bucket, str):
target_bucket_name = to_bucket
elif to_bucket is not None:
raise TypeError(
"to_bucket must be a string or S3Bucket, not"
f" {type(target_bucket_name)}"
)

self.logger.info(
"Moving object from s3://%s/%s to s3://%s/%s",
source_bucket_name,
source_path,
target_bucket_name,
target_path,
)

# If invalid, should error and prevent next operation
s3_client.copy(
CopySource={"Bucket": source_bucket_name, "Key": source_path},
Bucket=target_bucket_name,
Key=target_path,
)
s3_client.delete_object(Bucket=source_bucket_name, Key=source_path)
return target_path
Loading

0 comments on commit 9ba5424

Please sign in to comment.