-
Notifications
You must be signed in to change notification settings - Fork 40
AWS S3 copy and move tasks and S3Bucket
methods
#316
Changes from 7 commits
9bdab92
bd9a2ba
c86ff5e
21f9f09
d98a102
dc3c3d2
b540fa3
f553adc
910471a
15ccec3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -147,6 +147,163 @@ async def example_s3_upload_flow(): | |||||||||
return key | ||||||||||
|
||||||||||
|
||||||||||
@task | ||||||||||
async def s3_copy( | ||||||||||
source_path: str, | ||||||||||
target_path: str, | ||||||||||
source_bucket_name: str, | ||||||||||
aws_credentials: AwsCredentials, | ||||||||||
target_bucket_name: Optional[str] = None, | ||||||||||
aws_client_parameters: AwsClientParameters = AwsClientParameters(), | ||||||||||
**copy_kwargs, | ||||||||||
) -> str: | ||||||||||
"""Uses S3's internal | ||||||||||
[CopyObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html) | ||||||||||
to copy objects within or between buckets. To copy objects between buckets, the | ||||||||||
credentials must have permission to read the source object and write to the target | ||||||||||
object. | ||||||||||
|
||||||||||
Args: | ||||||||||
source_path: The path to the object to copy. Can be a string or `Path`. | ||||||||||
target_path: The path to copy the object to. Can be a string or `Path`. | ||||||||||
source_bucket_name: The bucket to copy the object from. | ||||||||||
aws_credentials: Credentials to use for authentication with AWS. | ||||||||||
target_bucket_name: The bucket to copy the object to. If not provided, defaults | ||||||||||
to `source_bucket`. | ||||||||||
aws_client_parameters: Custom parameter for the boto3 client initialization. | ||||||||||
**copy_kwargs: Additional keyword arguments to pass to `S3Client.copy_object`. | ||||||||||
|
||||||||||
Returns: | ||||||||||
The path that the object was copied to. Excludes the bucket name. | ||||||||||
|
||||||||||
Examples: | ||||||||||
|
||||||||||
Copy notes.txt from s3://my-bucket/my_folder/notes.txt to | ||||||||||
s3://my-bucket/my_folder/notes_copy.txt. | ||||||||||
|
||||||||||
```python | ||||||||||
from prefect import flow | ||||||||||
from prefect_aws import AwsCredentials | ||||||||||
from prefect_aws.s3 import s3_copy | ||||||||||
|
||||||||||
aws_credentials = AwsCredentials.load("my-creds") | ||||||||||
|
||||||||||
@flow | ||||||||||
async def example_copy_flow(): | ||||||||||
await s3_copy( | ||||||||||
source_path="my_folder/notes.txt", | ||||||||||
target_path="my_folder/notes_copy.txt", | ||||||||||
source_bucket_name="my-bucket", | ||||||||||
aws_credentials=aws_credentials, | ||||||||||
) | ||||||||||
|
||||||||||
example_copy_flow() | ||||||||||
``` | ||||||||||
|
||||||||||
Copy notes.txt from s3://my-bucket/my_folder/notes.txt to | ||||||||||
s3://other-bucket/notes_copy.txt. | ||||||||||
|
||||||||||
```python | ||||||||||
from prefect import flow | ||||||||||
from prefect_aws import AwsCredentials | ||||||||||
from prefect_aws.s3 import s3_copy | ||||||||||
|
||||||||||
aws_credentials = AwsCredentials.load("shared-creds") | ||||||||||
|
||||||||||
@flow | ||||||||||
async def example_copy_flow(): | ||||||||||
await s3_copy( | ||||||||||
source_path="my_folder/notes.txt", | ||||||||||
target_path="notes_copy.txt", | ||||||||||
source_bucket_name="my-bucket", | ||||||||||
aws_credentials=aws_credentials, | ||||||||||
target_bucket_name="other-bucket", | ||||||||||
) | ||||||||||
|
||||||||||
example_copy_flow() | ||||||||||
``` | ||||||||||
|
||||||||||
""" | ||||||||||
logger = get_run_logger() | ||||||||||
|
||||||||||
s3_client = aws_credentials.get_boto3_session().client( | ||||||||||
"s3", **aws_client_parameters.get_params_override() | ||||||||||
) | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd recommend using
Suggested change
|
||||||||||
|
||||||||||
target_bucket_name = target_bucket_name or source_bucket_name | ||||||||||
|
||||||||||
logger.info( | ||||||||||
"Copying object from bucket %s with key %s to bucket %s with key %s", | ||||||||||
source_bucket_name, | ||||||||||
source_path, | ||||||||||
target_bucket_name, | ||||||||||
target_path, | ||||||||||
) | ||||||||||
|
||||||||||
s3_client.copy_object( | ||||||||||
CopySource={"Bucket": source_bucket_name, "Key": source_path}, | ||||||||||
Bucket=target_bucket_name, | ||||||||||
Key=target_path, | ||||||||||
**copy_kwargs, | ||||||||||
) | ||||||||||
|
||||||||||
return target_path | ||||||||||
|
||||||||||
|
||||||||||
@task | ||||||||||
async def s3_move( | ||||||||||
source_path: str, | ||||||||||
target_path: str, | ||||||||||
source_bucket_name: str, | ||||||||||
aws_credentials: AwsCredentials, | ||||||||||
target_bucket_name: Optional[str] = None, | ||||||||||
aws_client_parameters: AwsClientParameters = AwsClientParameters(), | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can set client parameters on a |
||||||||||
) -> str: | ||||||||||
""" | ||||||||||
Move an object from one S3 location to another | ||||||||||
|
||||||||||
Args: | ||||||||||
source_path: The path of the object to move | ||||||||||
target_path: The path to move the object to | ||||||||||
source_bucket_name: The name of the bucket containing the source object | ||||||||||
aws_credentials: Credentials to use for authentication with AWS. | ||||||||||
target_bucket_name: The bucket to copy the object to. If not provided, defaults | ||||||||||
to `source_bucket`. | ||||||||||
aws_client_parameters: Custom parameter for the boto3 client initialization. | ||||||||||
|
||||||||||
Returns: | ||||||||||
The path that the object was moved to. Excludes the bucket name. | ||||||||||
""" | ||||||||||
logger = get_run_logger() | ||||||||||
|
||||||||||
s3_client = aws_credentials.get_boto3_session().client( | ||||||||||
"s3", **aws_client_parameters.get_params_override() | ||||||||||
) | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd recommend using
Suggested change
|
||||||||||
|
||||||||||
# If target bucket is not provided, assume it's the same as the source bucket | ||||||||||
target_bucket_name = target_bucket_name or source_bucket_name | ||||||||||
|
||||||||||
logger.info( | ||||||||||
"Moving object from s3://%s/%s s3://%s/%s", | ||||||||||
source_bucket_name, | ||||||||||
source_path, | ||||||||||
target_bucket_name, | ||||||||||
target_path, | ||||||||||
) | ||||||||||
|
||||||||||
# Copy the object to the new location | ||||||||||
s3_client.copy_object( | ||||||||||
Bucket=target_bucket_name, | ||||||||||
CopySource={"Bucket": source_bucket_name, "Key": source_path}, | ||||||||||
Key=target_path, | ||||||||||
) | ||||||||||
|
||||||||||
# Delete the original object | ||||||||||
s3_client.delete_object(Bucket=source_bucket_name, Key=source_path) | ||||||||||
|
||||||||||
return target_path | ||||||||||
|
||||||||||
|
||||||||||
def _list_objects_sync(page_iterator: PageIterator): | ||||||||||
""" | ||||||||||
Synchronous method to collect S3 objects into a list | ||||||||||
|
@@ -1023,3 +1180,163 @@ async def upload_from_folder( | |||||||||
) | ||||||||||
|
||||||||||
return to_folder | ||||||||||
|
||||||||||
@sync_compatible | ||||||||||
async def copy_object( | ||||||||||
self, | ||||||||||
from_path: Union[str, Path], | ||||||||||
to_path: Union[str, Path], | ||||||||||
to_bucket: Optional[Union["S3Bucket", str]] = None, | ||||||||||
**copy_kwargs, | ||||||||||
) -> str: | ||||||||||
"""Uses S3's internal | ||||||||||
[CopyObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html) | ||||||||||
to copy objects within or between buckets. | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We also have a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah so the Will add context to the docstrings of both methods. |
||||||||||
|
||||||||||
Args: | ||||||||||
from_path: The path of the object to copy. | ||||||||||
to_path: The path to copy the object to. | ||||||||||
to_bucket: The bucket to copy to. Defaults to the current bucket. | ||||||||||
**copy_kwargs: Additional keyword arguments to pass to | ||||||||||
`S3Client.copy_object`. | ||||||||||
|
||||||||||
Returns: | ||||||||||
The path that the object was copied to. Excludes the bucket name. | ||||||||||
|
||||||||||
Examples: | ||||||||||
|
||||||||||
Copy notes.txt from my_folder/notes.txt to my_folder/notes_copy.txt. | ||||||||||
|
||||||||||
```python | ||||||||||
from prefect_aws.s3 import S3Bucket | ||||||||||
|
||||||||||
s3_bucket = S3Bucket.load("my-bucket") | ||||||||||
s3_bucket.copy_object("my_folder/notes.txt", "my_folder/notes_copy.txt") | ||||||||||
``` | ||||||||||
|
||||||||||
Copy notes.txt from my_folder/notes.txt to my_folder/notes_copy.txt in | ||||||||||
another bucket. | ||||||||||
|
||||||||||
```python | ||||||||||
from prefect_aws.s3 import S3Bucket | ||||||||||
|
||||||||||
s3_bucket = S3Bucket.load("my-bucket") | ||||||||||
s3_bucket.copy_object( | ||||||||||
"my_folder/notes.txt", | ||||||||||
"my_folder/notes_copy.txt", | ||||||||||
to_bucket="other-bucket" | ||||||||||
) | ||||||||||
``` | ||||||||||
""" | ||||||||||
s3_client = self.credentials.get_s3_client() | ||||||||||
|
||||||||||
source_path = self._resolve_path(Path(from_path).as_posix()) | ||||||||||
target_path = self._resolve_path(Path(to_path).as_posix()) | ||||||||||
|
||||||||||
source_bucket_name = self.bucket_name | ||||||||||
target_bucket_name = self.bucket_name | ||||||||||
if isinstance(to_bucket, S3Bucket): | ||||||||||
target_bucket_name = to_bucket.bucket_name | ||||||||||
target_path = to_bucket._resolve_path(target_path) | ||||||||||
elif isinstance(to_bucket, str): | ||||||||||
target_bucket_name = to_bucket | ||||||||||
elif to_bucket is not None: | ||||||||||
raise TypeError( | ||||||||||
"to_bucket must be a string or S3Bucket, not" | ||||||||||
f" {type(target_bucket_name)}" | ||||||||||
) | ||||||||||
|
||||||||||
self.logger.info( | ||||||||||
"Copying object from bucket %s with key %s to bucket %s with key %s", | ||||||||||
source_bucket_name, | ||||||||||
source_path, | ||||||||||
target_bucket_name, | ||||||||||
target_path, | ||||||||||
) | ||||||||||
|
||||||||||
s3_client.copy_object( | ||||||||||
CopySource={"Bucket": source_bucket_name, "Key": source_path}, | ||||||||||
Bucket=target_bucket_name, | ||||||||||
Key=target_path, | ||||||||||
**copy_kwargs, | ||||||||||
) | ||||||||||
|
||||||||||
return target_path | ||||||||||
|
||||||||||
@sync_compatible | ||||||||||
async def move_object( | ||||||||||
self, | ||||||||||
from_path: Union[str, Path], | ||||||||||
to_path: Union[str, Path], | ||||||||||
to_bucket: Optional[Union["S3Bucket", str]] = None, | ||||||||||
) -> str: | ||||||||||
"""Uses S3's internal CopyObject and DeleteObject to move objects within or | ||||||||||
between buckets. | ||||||||||
|
||||||||||
Args: | ||||||||||
from_path: The path of the object to move. | ||||||||||
to_path: The path to move the object to. | ||||||||||
to_bucket: The bucket to move to. Defaults to the current bucket. | ||||||||||
|
||||||||||
Returns: | ||||||||||
The path that the object was moved to. Excludes the bucket name. | ||||||||||
|
||||||||||
Examples: | ||||||||||
|
||||||||||
Move notes.txt from my_folder/notes.txt to my_folder/notes_copy.txt. | ||||||||||
|
||||||||||
```python | ||||||||||
from prefect_aws.s3 import S3Bucket | ||||||||||
|
||||||||||
s3_bucket = S3Bucket.load("my-bucket") | ||||||||||
s3_bucket.move_object("my_folder/notes.txt", "my_folder/notes_copy.txt") | ||||||||||
``` | ||||||||||
|
||||||||||
Move notes.txt from my_folder/notes.txt to my_folder/notes_copy.txt in | ||||||||||
another bucket. | ||||||||||
|
||||||||||
```python | ||||||||||
from prefect_aws.s3 import S3Bucket | ||||||||||
|
||||||||||
s3_bucket = S3Bucket.load("my-bucket") | ||||||||||
s3_bucket.move_object( | ||||||||||
"my_folder/notes.txt", | ||||||||||
"my_folder/notes_copy.txt", | ||||||||||
to_bucket="other-bucket" | ||||||||||
) | ||||||||||
``` | ||||||||||
""" | ||||||||||
s3_client = self.credentials.get_s3_client() | ||||||||||
|
||||||||||
source_path = self._resolve_path(Path(from_path).as_posix()) | ||||||||||
target_path = self._resolve_path(Path(to_path).as_posix()) | ||||||||||
|
||||||||||
source_bucket_name = self.bucket_name | ||||||||||
target_bucket_name = self.bucket_name | ||||||||||
if isinstance(to_bucket, S3Bucket): | ||||||||||
target_bucket_name = to_bucket.bucket_name | ||||||||||
target_path = to_bucket._resolve_path(target_path) | ||||||||||
elif isinstance(to_bucket, str): | ||||||||||
target_bucket_name = to_bucket | ||||||||||
elif to_bucket is not None: | ||||||||||
raise TypeError( | ||||||||||
"to_bucket must be a string or S3Bucket, not" | ||||||||||
f" {type(target_bucket_name)}" | ||||||||||
) | ||||||||||
|
||||||||||
self.logger.info( | ||||||||||
"Moving object from s3://%s/%s to s3://%s/%s", | ||||||||||
source_bucket_name, | ||||||||||
source_path, | ||||||||||
target_bucket_name, | ||||||||||
target_path, | ||||||||||
) | ||||||||||
|
||||||||||
# If invalid, should error and prevent next operation | ||||||||||
s3_client.copy( | ||||||||||
CopySource={"Bucket": source_bucket_name, "Key": source_path}, | ||||||||||
Bucket=target_bucket_name, | ||||||||||
Key=target_path, | ||||||||||
) | ||||||||||
s3_client.delete_object(Bucket=source_bucket_name, Key=source_path) | ||||||||||
return target_path |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can set client parameters on a
AwsCredntials
object, so I think we can remove this parameter from the task.