From 9bdab92af1523e885a08248b8aa0c057faf7c638 Mon Sep 17 00:00:00 2001 From: Dominic Tarro Date: Wed, 27 Sep 2023 11:48:21 -0400 Subject: [PATCH 01/10] methods for S3 native copy operations --- prefect_aws/s3.py | 181 ++++++++++++++++++++++++++++++++++++++++++++++ tests/test_s3.py | 77 +++++++++++++++++++- 2 files changed, 255 insertions(+), 3 deletions(-) diff --git a/prefect_aws/s3.py b/prefect_aws/s3.py index 17fe8870..07fbd8ce 100644 --- a/prefect_aws/s3.py +++ b/prefect_aws/s3.py @@ -147,6 +147,107 @@ async def example_s3_upload_flow(): return key +@task +async def s3_copy( + source_object, + target_object, + source_bucket, + aws_credentials: AwsCredentials, + target_bucket=None, + aws_client_parameters: AwsClientParameters = AwsClientParameters(), + **copy_kwargs, +) -> str: + """Uses S3's internal + [CopyObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html) + to copy objects within or between buckets. To copy objects between buckets, the + credentials must have permission to read the source object and write to the target + object. + + Args: + source_object: The path to the object to copy. Can be a string or `Path`. + target_object: The path to copy the object to. Can be a string or `Path`. + source_bucket: The bucket to copy the object from. + target_bucket: The bucket to copy the object to. If not provided, defaults to + `source_bucket`. + **copy_kwargs: Additional keyword arguments to pass to `S3Client.copy`. + + Returns: + The path that the object was copied to. Excludes the bucket name. + + Examples: + + Copy notes.txt from s3://my-bucket/my_folder/notes.txt to + s3://my-bucket/my_folder/notes_copy.txt. + + ```python + from prefect import flow + from prefect_aws import AwsCredentials + from prefect_aws.s3 import s3_copy + + aws_credentials = AwsCredentials.load("my-creds") + + @flow + async def example_copy_flow(): + await s3_copy( + source_object="my_folder/notes.txt", + target_object="my_folder/notes_copy.txt", + source_bucket="my-bucket", + aws_credentials=aws_credentials, + ) + + example_copy_flow() + ``` + + Copy notes.txt from s3://my-bucket/my_folder/notes.txt to + s3://other-bucket/notes_copy.txt. 
+ + ```python + from prefect import flow + from prefect_aws import AwsCredentials + from prefect_aws.s3 import s3_copy + + aws_credentials = AwsCredentials.load("shared-creds") + + @flow + async def example_copy_flow(): + await s3_copy( + source_object="my_folder/notes.txt", + target_object="notes_copy.txt", + source_bucket="my-bucket", + aws_credentials=aws_credentials, + target_bucket="other-bucket", + ) + + example_copy_flow() + ``` + + """ + logger = get_run_logger() + + s3_client = aws_credentials.get_boto3_session().client( + "s3", **aws_client_parameters.get_params_override() + ) + + target_bucket = target_bucket or source_bucket + + logger.info( + "Copying object from bucket %s with key %s to bucket %s with key %s", + source_bucket, + source_object, + target_bucket, + target_object, + ) + + s3_client.copy( + CopySource={"Bucket": source_bucket, "Key": source_object}, + Bucket=target_bucket, + Key=target_object, + **copy_kwargs, + ) + + return target_object + + def _list_objects_sync(page_iterator: PageIterator): """ Synchronous method to collect S3 objects into a list @@ -1023,3 +1124,83 @@ async def upload_from_folder( ) return to_folder + + @sync_compatible + async def copy_object( + self, + from_path: Union[str, Path], + to_path: Union[str, Path], + to_bucket: Optional[Union["S3Bucket", str]] = None, + **copy_kwargs, + ) -> str: + """Uses S3's internal + [CopyObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html) + to copy objects within or between buckets. + + Args: + from_path: The path of the object to copy. + to_path: The path to copy the object to. + to_bucket: The bucket to copy to. Defaults to the current bucket. + **copy_kwargs: Additional keyword arguments to pass to `S3Client.copy`. + + Returns: + The path that the object was copied to. Excludes the bucket name. + + Examples: + + Copy notes.txt from my_folder/notes.txt to my_folder/notes_copy.txt. + + ```python + from prefect_aws.s3 import S3Bucket + + s3_bucket = S3Bucket.load("my-bucket") + s3_bucket.copy_object("my_folder/notes.txt", "my_folder/notes_copy.txt") + ``` + + Copy notes.txt from my_folder/notes.txt to my_folder/notes_copy.txt in + another bucket. 
+ + ```python + from prefect_aws.s3 import S3Bucket + + s3_bucket = S3Bucket.load("my-bucket") + s3_bucket.copy_object( + "my_folder/notes.txt", + "my_folder/notes_copy.txt", + to_bucket="other-bucket" + ) + ``` + """ + s3_client = self.credentials.get_s3_client() + + source_object = self._resolve_path(Path(from_path).as_posix()) + target_object = self._resolve_path(Path(to_path).as_posix()) + + source_bucket = self.bucket_name + target_bucket = self.bucket_name + if isinstance(to_bucket, S3Bucket): + target_bucket = to_bucket.bucket_name + target_object = to_bucket._resolve_path(target_object) + elif isinstance(to_bucket, str): + target_bucket = to_bucket + elif to_bucket is not None: + raise TypeError( + f"to_bucket must be a string or S3Bucket, not {type(target_bucket)}" + ) + + self.logger.info( + "Copying object from bucket %s with key %s to bucket %s with key %s", + source_bucket, + source_object, + target_bucket, + target_object, + ) + + s3_client.copy( + CopySource={"Bucket": source_bucket, "Key": source_object}, + Bucket=target_bucket, + Key=target_object, + **copy_kwargs, + ) + + return target_object diff --git a/tests/test_s3.py b/tests/test_s3.py index c0159e25..9dbc3ca0 100644 --- a/tests/test_s3.py +++ b/tests/test_s3.py @@ -12,7 +12,7 @@ from prefect_aws import AwsCredentials, MinIOCredentials from prefect_aws.client_parameters import AwsClientParameters -from prefect_aws.s3 import S3Bucket, s3_download, s3_list_objects, s3_upload +from prefect_aws.s3 import S3Bucket, s3_copy, s3_download, s3_list_objects, s3_upload aws_clients = [ (lazy_fixture("aws_client_parameters_custom_endpoint")), @@ -47,6 +47,18 @@ def bucket(s3_mock, request): return bucket +@pytest.fixture +def bucket_2(s3_mock, request): + s3 = boto3.resource("s3") + bucket = s3.Bucket("bucket_2") + marker = request.node.get_closest_marker("is_public", None) + if marker and marker.args[0]: + bucket.create(ACL="public-read") + else: + bucket.create() + return bucket + + @pytest.fixture def object(bucket, tmp_path): file = tmp_path / "object.txt" @@ -205,6 +217,41 @@ async def test_flow(): assert output == b"NEW OBJECT" +# Ignore public bucket client parameters +@pytest.mark.parametrize("client_parameters", aws_clients[:-1], indirect=True) +async def test_s3_copy(object, bucket, bucket_2, client_parameters, aws_credentials): + def read(bucket, key): + stream = io.BytesIO() + bucket.download_fileobj(key, stream) + stream.seek(0) + return stream.read() + + @flow + async def test_flow(): + # Test cross-bucket copy + await s3_copy( + source_object="object", + source_bucket="bucket", + target_object="subfolder/new_object", + target_bucket="bucket_2", + aws_credentials=aws_credentials, + aws_client_parameters=client_parameters, + ) + + # Test within-bucket copy + await s3_copy( + source_object="object", + source_bucket="bucket", + target_object="subfolder/new_object", + aws_credentials=aws_credentials, + aws_client_parameters=client_parameters, + ) + + await test_flow() + assert read(bucket_2, "subfolder/new_object") == b"TEST" + assert read(bucket, "subfolder/new_object") == b"TEST" + + @pytest.mark.parametrize("client_parameters", aws_clients, indirect=True) async def test_s3_list_objects( object, client_parameters, object_in_folder, aws_credentials @@ -623,9 +670,9 @@ def s3_bucket_empty(self, credentials, bucket): return _s3_bucket @pytest.fixture - def s3_bucket_2_empty(self, credentials, bucket): + def s3_bucket_2_empty(self, credentials, bucket_2): _s3_bucket = S3Bucket( - bucket_name="bucket", + 
bucket_name="bucket_2", credentials=credentials, bucket_folder="subfolder", ) @@ -811,3 +858,27 @@ def test_upload_from_folder( break else: raise AssertionError("Files did upload") + + @pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) + def test_copy_object( + self, + s3_bucket_with_object: S3Bucket, + s3_bucket_2_empty: S3Bucket, + client_parameters, + ): + s3_bucket_with_object.copy_object("object", "object_copy_1") + assert s3_bucket_with_object.read_path("object_copy_1") == b"TEST" + + s3_bucket_with_object.copy_object("object", "folder/object_copy_2") + assert s3_bucket_with_object.read_path("folder/object_copy_2") == b"TEST" + + # S3Bucket for second bucket has a basepath + s3_bucket_with_object.copy_object( + "object", + s3_bucket_2_empty._resolve_path("object_copy_3"), + to_bucket="bucket_2", + ) + assert s3_bucket_2_empty.read_path("object_copy_3") == b"TEST" + + s3_bucket_with_object.copy_object("object", "object_copy_4", s3_bucket_2_empty) + assert s3_bucket_2_empty.read_path("object_copy_4") == b"TEST" From bd9a2ba17ea2f373a90e572a528b6fc4732f3e2a Mon Sep 17 00:00:00 2001 From: Dominic Tarro Date: Wed, 27 Sep 2023 12:13:35 -0400 Subject: [PATCH 02/10] better naming and docs --- prefect_aws/s3.py | 85 ++++++++++++++++++++++++----------------------- tests/test_s3.py | 14 ++++---- 2 files changed, 51 insertions(+), 48 deletions(-) diff --git a/prefect_aws/s3.py b/prefect_aws/s3.py index 07fbd8ce..b7256ba2 100644 --- a/prefect_aws/s3.py +++ b/prefect_aws/s3.py @@ -149,11 +149,11 @@ async def example_s3_upload_flow(): @task async def s3_copy( - source_object, - target_object, - source_bucket, + source_path: str, + target_path: str, + source_bucket_name: str, aws_credentials: AwsCredentials, - target_bucket=None, + target_bucket_name: Optional[str] = None, aws_client_parameters: AwsClientParameters = AwsClientParameters(), **copy_kwargs, ) -> str: @@ -164,11 +164,13 @@ async def s3_copy( object. Args: - source_object: The path to the object to copy. Can be a string or `Path`. - target_object: The path to copy the object to. Can be a string or `Path`. - source_bucket: The bucket to copy the object from. - target_bucket: The bucket to copy the object to. If not provided, defaults to - `source_bucket`. + source_path: The path to the object to copy. Can be a string or `Path`. + target_path: The path to copy the object to. Can be a string or `Path`. + source_bucket_name: The bucket to copy the object from. + aws_credentials: Credentials to use for authentication with AWS. + target_bucket_name: The bucket to copy the object to. If not provided, defaults + to `source_bucket`. + aws_client_parameters: Custom parameter for the boto3 client initialization. **copy_kwargs: Additional keyword arguments to pass to `S3Client.copy`. 
Returns: @@ -189,9 +191,9 @@ async def s3_copy( @flow async def example_copy_flow(): await s3_copy( - source_object="my_folder/notes.txt", - target_object="my_folder/notes_copy.txt", - source_bucket="my-bucket", + source_path="my_folder/notes.txt", + target_path="my_folder/notes_copy.txt", + source_bucket_name="my-bucket", aws_credentials=aws_credentials, ) @@ -211,11 +213,11 @@ async def example_copy_flow(): @flow async def example_copy_flow(): await s3_copy( - source_object="my_folder/notes.txt", - target_object="notes_copy.txt", - source_bucket="my-bucket", + source_path="my_folder/notes.txt", + target_path="notes_copy.txt", + source_bucket_name="my-bucket", aws_credentials=aws_credentials, - target_bucket="other-bucket", + target_bucket_name="other-bucket", ) example_copy_flow() @@ -228,24 +230,24 @@ async def example_copy_flow(): "s3", **aws_client_parameters.get_params_override() ) - target_bucket = target_bucket or source_bucket + target_bucket_name = target_bucket_name or source_bucket_name logger.info( "Copying object from bucket %s with key %s to bucket %s with key %s", - source_bucket, - source_object, - target_bucket, - target_object, + source_bucket_name, + source_path, + target_bucket_name, + target_path, ) s3_client.copy( - CopySource={"Bucket": source_bucket, "Key": source_object}, - Bucket=target_bucket, - Key=target_object, + CopySource={"Bucket": source_bucket_name, "Key": source_path}, + Bucket=target_bucket_name, + Key=target_path, **copy_kwargs, ) - return target_object + return target_path def _list_objects_sync(page_iterator: PageIterator): @@ -1173,34 +1175,35 @@ async def copy_object( """ s3_client = self.credentials.get_s3_client() - source_object = self._resolve_path(Path(from_path).as_posix()) - target_object = self._resolve_path(Path(to_path).as_posix()) + source_path = self._resolve_path(Path(from_path).as_posix()) + target_path = self._resolve_path(Path(to_path).as_posix()) - source_bucket = self.bucket_name - target_bucket = self.bucket_name + source_bucket_name = self.bucket_name + target_bucket_name = self.bucket_name if isinstance(to_bucket, S3Bucket): - target_bucket = to_bucket.bucket_name - target_object = to_bucket._resolve_path(target_object) + target_bucket_name = to_bucket.bucket_name + target_path = to_bucket._resolve_path(target_path) elif isinstance(to_bucket, str): - target_bucket = to_bucket + target_bucket_name = to_bucket elif to_bucket is not None: raise TypeError( - f"to_bucket must be a string or S3Bucket, not {type(target_bucket)}" + "to_bucket must be a string or S3Bucket, not" + f" {type(target_bucket_name)}" ) self.logger.info( "Copying object from bucket %s with key %s to bucket %s with key %s", - source_bucket, - source_object, - target_bucket, - target_object, + source_bucket_name, + source_path, + target_bucket_name, + target_path, ) s3_client.copy( - CopySource={"Bucket": source_bucket, "Key": source_object}, - Bucket=target_bucket, - Key=target_object, + CopySource={"Bucket": source_bucket_name, "Key": source_path}, + Bucket=target_bucket_name, + Key=target_path, **copy_kwargs, ) - return target_object + return target_path diff --git a/tests/test_s3.py b/tests/test_s3.py index 9dbc3ca0..2b07ce10 100644 --- a/tests/test_s3.py +++ b/tests/test_s3.py @@ -230,19 +230,19 @@ def read(bucket, key): async def test_flow(): # Test cross-bucket copy await s3_copy( - source_object="object", - source_bucket="bucket", - target_object="subfolder/new_object", - target_bucket="bucket_2", + source_path="object", + 
target_path="subfolder/new_object", + source_bucket_name="bucket", aws_credentials=aws_credentials, + target_bucket_name="bucket_2", aws_client_parameters=client_parameters, ) # Test within-bucket copy await s3_copy( - source_object="object", - source_bucket="bucket", - target_object="subfolder/new_object", + source_path="object", + target_path="subfolder/new_object", + source_bucket_name="bucket", aws_credentials=aws_credentials, aws_client_parameters=client_parameters, ) From c86ff5ed14160d63082638635e1a6664465b3231 Mon Sep 17 00:00:00 2001 From: Dominic Tarro Date: Wed, 27 Sep 2023 12:18:15 -0400 Subject: [PATCH 03/10] use correct client copy method --- prefect_aws/s3.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/prefect_aws/s3.py b/prefect_aws/s3.py index b7256ba2..ea5224b4 100644 --- a/prefect_aws/s3.py +++ b/prefect_aws/s3.py @@ -171,7 +171,7 @@ async def s3_copy( target_bucket_name: The bucket to copy the object to. If not provided, defaults to `source_bucket`. aws_client_parameters: Custom parameter for the boto3 client initialization. - **copy_kwargs: Additional keyword arguments to pass to `S3Client.copy`. + **copy_kwargs: Additional keyword arguments to pass to `S3Client.copy_object`. Returns: The path that the object was copied to. Excludes the bucket name. @@ -240,7 +240,7 @@ async def example_copy_flow(): target_path, ) - s3_client.copy( + s3_client.copy_object( CopySource={"Bucket": source_bucket_name, "Key": source_path}, Bucket=target_bucket_name, Key=target_path, @@ -1143,7 +1143,8 @@ async def copy_object( from_path: The path of the object to copy. to_path: The path to copy the object to. to_bucket: The bucket to copy to. Defaults to the current bucket. - **copy_kwargs: Additional keyword arguments to pass to `S3Client.copy`. + **copy_kwargs: Additional keyword arguments to pass to + `S3Client.copy_object`. Returns: The path that the object was copied to. Excludes the bucket name. @@ -1199,7 +1200,7 @@ async def copy_object( target_path, ) - s3_client.copy( + s3_client.copy_object( CopySource={"Bucket": source_bucket_name, "Key": source_path}, Bucket=target_bucket_name, Key=target_path, From 21f9f09ec20b4446d38efc9f70338c55731cbedf Mon Sep 17 00:00:00 2001 From: Dominic Tarro Date: Wed, 27 Sep 2023 13:51:46 -0400 Subject: [PATCH 04/10] `S3Bucket.move_object` and tests --- prefect_aws/s3.py | 78 +++++++++++++++++++++++++++++++++++++++++++++++ tests/test_s3.py | 49 +++++++++++++++++++++++++++++ 2 files changed, 127 insertions(+) diff --git a/prefect_aws/s3.py b/prefect_aws/s3.py index ea5224b4..ec282e33 100644 --- a/prefect_aws/s3.py +++ b/prefect_aws/s3.py @@ -1208,3 +1208,81 @@ async def copy_object( ) return target_path + + @sync_compatible + async def move_object( + self, + from_path: Union[str, Path], + to_path: Union[str, Path], + to_bucket: Optional[Union["S3Bucket", str]] = None, + ) -> str: + """Uses S3's internal CopyObject and DeleteObject to move objects within or + between buckets. + + Args: + from_path: The path of the object to move. + to_path: The path to move the object to. + to_bucket: The bucket to move to. Defaults to the current bucket. + + Returns: + The path that the object was moved to. Excludes the bucket name. + + Examples: + + Move notes.txt from my_folder/notes.txt to my_folder/notes_copy.txt. 
+ + ```python + from prefect_aws.s3 import S3Bucket + + s3_bucket = S3Bucket.load("my-bucket") + s3_bucket.move_object("my_folder/notes.txt", "my_folder/notes_copy.txt") + ``` + + Move notes.txt from my_folder/notes.txt to my_folder/notes_copy.txt in + another bucket. + + ```python + from prefect_aws.s3 import S3Bucket + + s3_bucket = S3Bucket.load("my-bucket") + s3_bucket.move_object( + "my_folder/notes.txt", + "my_folder/notes_copy.txt", + to_bucket="other-bucket" + ) + ``` + """ + s3_client = self.credentials.get_s3_client() + + source_path = self._resolve_path(Path(from_path).as_posix()) + target_path = self._resolve_path(Path(to_path).as_posix()) + + source_bucket_name = self.bucket_name + target_bucket_name = self.bucket_name + if isinstance(to_bucket, S3Bucket): + target_bucket_name = to_bucket.bucket_name + target_path = to_bucket._resolve_path(target_path) + elif isinstance(to_bucket, str): + target_bucket_name = to_bucket + elif to_bucket is not None: + raise TypeError( + "to_bucket must be a string or S3Bucket, not" + f" {type(target_bucket_name)}" + ) + + self.logger.info( + "Moving object from s3://%s/%s to s3://%s/%s", + source_bucket_name, + source_path, + target_bucket_name, + target_path, + ) + + # If invalid, should error and prevent next operation + s3_client.copy( + CopySource={"Bucket": source_bucket_name, "Key": source_path}, + Bucket=target_bucket_name, + Key=target_path, + ) + s3_client.delete_object(Bucket=source_bucket_name, Key=source_path) + return target_path diff --git a/tests/test_s3.py b/tests/test_s3.py index 2b07ce10..0507372f 100644 --- a/tests/test_s3.py +++ b/tests/test_s3.py @@ -882,3 +882,52 @@ def test_copy_object( s3_bucket_with_object.copy_object("object", "object_copy_4", s3_bucket_2_empty) assert s3_bucket_2_empty.read_path("object_copy_4") == b"TEST" + + @pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) + def test_move_object_within_bucket( + self, + s3_bucket_with_object: S3Bucket, + client_parameters, + ): + s3_bucket_with_object.move_object("object", "object_copy_1") + assert s3_bucket_with_object.read_path("object_copy_1") == b"TEST" + + with pytest.raises(ClientError): + assert s3_bucket_with_object.read_path("object") == b"TEST" + + @pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) + def test_move_object_to_nonexistent_bucket_fails( + self, + s3_bucket_with_object: S3Bucket, + client_parameters, + ): + with pytest.raises(ClientError): + s3_bucket_with_object.move_object( + "object", "object_copy_1", to_bucket="nonexistent-bucket" + ) + assert s3_bucket_with_object.read_path("object") == b"TEST" + + @pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) + def test_move_object_onto_itself_fails( + self, + s3_bucket_with_object: S3Bucket, + client_parameters, + ): + with pytest.raises(ClientError): + s3_bucket_with_object.move_object("object", "object") + assert s3_bucket_with_object.read_path("object") == b"TEST" + + @pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) + def test_move_object_between_buckets( + self, + s3_bucket_with_object: S3Bucket, + s3_bucket_2_empty: S3Bucket, + client_parameters, + ): + s3_bucket_with_object.move_object( + "object", "object_copy_1", to_bucket=s3_bucket_2_empty + ) + assert s3_bucket_with_object.read_path("object_copy_1") == b"TEST" + + with pytest.raises(ClientError): + assert s3_bucket_with_object.read_path("object") == b"TEST" From d98a10238eb304ee1ff4dfa5c054dfb6299a825d Mon Sep 17 00:00:00 
2001 From: Dominic Tarro Date: Wed, 27 Sep 2023 14:46:33 -0400 Subject: [PATCH 05/10] move object task --- prefect_aws/s3.py | 54 +++++++++++++++++++ tests/test_s3.py | 129 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 182 insertions(+), 1 deletion(-) diff --git a/prefect_aws/s3.py b/prefect_aws/s3.py index ec282e33..64212927 100644 --- a/prefect_aws/s3.py +++ b/prefect_aws/s3.py @@ -250,6 +250,60 @@ async def example_copy_flow(): return target_path +@task +async def s3_move( + source_path: str, + target_path: str, + source_bucket_name: str, + aws_credentials: AwsCredentials, + target_bucket_name: Optional[str] = None, + aws_client_parameters: AwsClientParameters = AwsClientParameters(), +) -> str: + """ + Move an object from one S3 location to another + + Args: + source_path: The path of the object to move + target_path: The path to move the object to + source_bucket_name: The name of the bucket containing the source object + aws_credentials: Credentials to use for authentication with AWS. + target_bucket_name: The bucket to copy the object to. If not provided, defaults + to `source_bucket`. + aws_client_parameters: Custom parameter for the boto3 client initialization. + + Returns: + The path that the object was moved to. Excludes the bucket name. + """ + logger = get_run_logger() + + s3_client = aws_credentials.get_boto3_session().client( + "s3", **aws_client_parameters.get_params_override() + ) + + # If target bucket is not provided, assume it's the same as the source bucket + target_bucket_name = target_bucket_name or source_bucket_name + + logger.info( + "Moving object from s3://%s/%s s3://%s/%s", + source_bucket_name, + source_path, + target_bucket_name, + target_path, + ) + + # Copy the object to the new location + s3_client.copy_object( + Bucket=target_bucket_name, + CopySource={"Bucket": source_bucket_name, "Key": source_path}, + Key=target_path, + ) + + # Delete the original object + s3_client.delete_object(Bucket=source_bucket_name, Key=source_path) + + return target_path + + def _list_objects_sync(page_iterator: PageIterator): """ Synchronous method to collect S3 objects into a list diff --git a/tests/test_s3.py b/tests/test_s3.py index 0507372f..4abbd9cd 100644 --- a/tests/test_s3.py +++ b/tests/test_s3.py @@ -12,7 +12,14 @@ from prefect_aws import AwsCredentials, MinIOCredentials from prefect_aws.client_parameters import AwsClientParameters -from prefect_aws.s3 import S3Bucket, s3_copy, s3_download, s3_list_objects, s3_upload +from prefect_aws.s3 import ( + S3Bucket, + s3_copy, + s3_download, + s3_list_objects, + s3_move, + s3_upload, +) aws_clients = [ (lazy_fixture("aws_client_parameters_custom_endpoint")), @@ -252,6 +259,126 @@ async def test_flow(): assert read(bucket, "subfolder/new_object") == b"TEST" +# Ignore public bucket client parameters +@pytest.mark.parametrize("client_parameters", aws_clients[:-1], indirect=True) +async def test_s3_move(object, bucket, bucket_2, client_parameters, aws_credentials): + def read(bucket, key): + stream = io.BytesIO() + bucket.download_fileobj(key, stream) + stream.seek(0) + return stream.read() + + @flow + async def test_flow(): + # Test within-bucket move + await s3_move( + source_path="object", + target_path="subfolder/object_copy", + source_bucket_name="bucket", + aws_credentials=aws_credentials, + aws_client_parameters=client_parameters, + ) + + # Test cross-bucket move + await s3_move( + source_path="subfolder/object_copy", + target_path="object_copy_2", + source_bucket_name="bucket", + 
target_bucket_name="bucket_2", + aws_credentials=aws_credentials, + aws_client_parameters=client_parameters, + ) + + await test_flow() + + assert read(bucket_2, "object_copy_2") == b"TEST" + + with pytest.raises(ClientError): + read(bucket, "object") + + with pytest.raises(ClientError): + read(bucket, "subfolder/object_copy") + + +@pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) +async def test_move_object_to_nonexistent_bucket_fails( + object, + bucket, + client_parameters, + aws_credentials, +): + def read(bucket, key): + stream = io.BytesIO() + bucket.download_fileobj(key, stream) + stream.seek(0) + return stream.read() + + @flow + async def test_flow(): + # Test cross-bucket move + await s3_move( + source_path="object", + target_path="subfolder/new_object", + source_bucket_name="bucket", + aws_credentials=aws_credentials, + target_bucket_name="nonexistent-bucket", + aws_client_parameters=client_parameters, + ) + + with pytest.raises(ClientError): + await test_flow() + + assert read(bucket, "object") == b"TEST" + + +@pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) +async def test_move_object_fail_cases( + object, + bucket, + client_parameters, + aws_credentials, +): + def read(bucket, key): + stream = io.BytesIO() + bucket.download_fileobj(key, stream) + stream.seek(0) + return stream.read() + + @flow + async def test_flow( + source_path, target_path, source_bucket_name, target_bucket_name + ): + # Test cross-bucket move + await s3_move( + source_path=source_path, + target_path=target_path, + source_bucket_name=source_bucket_name, + aws_credentials=aws_credentials, + target_bucket_name=target_bucket_name, + aws_client_parameters=client_parameters, + ) + + # Move to non-existent bucket + with pytest.raises(ClientError): + await test_flow( + source_path="object", + target_path="subfolder/new_object", + source_bucket_name="bucket", + target_bucket_name="nonexistent-bucket", + ) + assert read(bucket, "object") == b"TEST" + + # Move onto self + with pytest.raises(ClientError): + await test_flow( + source_path="object", + target_path="object", + source_bucket_name="bucket", + target_bucket_name="bucket", + ) + assert read(bucket, "object") == b"TEST" + + @pytest.mark.parametrize("client_parameters", aws_clients, indirect=True) async def test_s3_list_objects( object, client_parameters, object_in_folder, aws_credentials From dc3c3d2b9b5eb5761b8555824e9c111335487c2a Mon Sep 17 00:00:00 2001 From: Dominic Tarro Date: Wed, 27 Sep 2023 15:08:18 -0400 Subject: [PATCH 06/10] update CHANGELOG.md --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e0a447c8..22282b5c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- AWS S3 copy and move tasks and `S3Bucket` methods - [#316](https://github.com/PrefectHQ/prefect-aws/pull/316) + ### Added - Added retries to ECS task run creation for ECS worker - [#303](https://github.com/PrefectHQ/prefect-aws/pull/303) From b540fa37a4c9fa3a6010b2460910e14913e3b502 Mon Sep 17 00:00:00 2001 From: Dominic Tarro Date: Thu, 28 Sep 2023 08:55:12 -0400 Subject: [PATCH 07/10] fix move test --- tests/test_s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_s3.py b/tests/test_s3.py index 4abbd9cd..92fd6976 100644 --- a/tests/test_s3.py +++ b/tests/test_s3.py @@ -1054,7 +1054,7 @@ def test_move_object_between_buckets( 
s3_bucket_with_object.move_object( "object", "object_copy_1", to_bucket=s3_bucket_2_empty ) - assert s3_bucket_with_object.read_path("object_copy_1") == b"TEST" + assert s3_bucket_2_empty.read_path("object_copy_1") == b"TEST" with pytest.raises(ClientError): assert s3_bucket_with_object.read_path("object") == b"TEST" From f553adc5435572d30c07563a832b145d8d551efa Mon Sep 17 00:00:00 2001 From: Dominic Tarro Date: Thu, 28 Sep 2023 14:38:50 -0400 Subject: [PATCH 08/10] method docstring update --- prefect_aws/s3.py | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/prefect_aws/s3.py b/prefect_aws/s3.py index 64212927..a42e6ad9 100644 --- a/prefect_aws/s3.py +++ b/prefect_aws/s3.py @@ -161,7 +161,8 @@ async def s3_copy( [CopyObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html) to copy objects within or between buckets. To copy objects between buckets, the credentials must have permission to read the source object and write to the target - object. + object. If the credentials do not have those permissions, try using + `S3Bucket.stream_from`. Args: source_path: The path to the object to copy. Can be a string or `Path`. @@ -260,7 +261,11 @@ async def s3_move( aws_client_parameters: AwsClientParameters = AwsClientParameters(), ) -> str: """ - Move an object from one S3 location to another + Move an object from one S3 location to another. To move objects between buckets, + the credentials must have permission to read and delete the source object and write + to the target object. If the credentials do not have those permissions, this method + will raise an error. If the credentials have permission to read the source object + but not delete it, the object will be copied but not deleted. Args: source_path: The path of the object to move @@ -952,7 +957,9 @@ async def stream_from( to_path: Optional[str] = None, **upload_kwargs: Dict[str, Any], ) -> str: - """Streams an object from another bucket to this bucket. + """Streams an object from another bucket to this bucket. Requires the + object to be downloaded and uploaded in chunks. If `self`'s credentials + allow for writes to the other bucket, try using `S3Bucket.copy_object`. Args: bucket: The bucket to stream from. @@ -1191,7 +1198,10 @@ async def copy_object( ) -> str: """Uses S3's internal [CopyObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html) - to copy objects within or between buckets. + to copy objects within or between buckets. To copy objects between buckets, + `self`'s credentials must have permission to read the source object and write + to the target object. If the credentials do not have those permissions, try + using `S3Bucket.stream_from`. Args: from_path: The path of the object to copy. @@ -1271,7 +1281,11 @@ async def move_object( to_bucket: Optional[Union["S3Bucket", str]] = None, ) -> str: """Uses S3's internal CopyObject and DeleteObject to move objects within or - between buckets. + between buckets. To move objects between buckets, `self`'s credentials must + have permission to read and delete the source object and write to the target + object. If the credentials do not have those permissions, this method will + raise an error. If the credentials have permission to read the source object + but not delete it, the object will be copied but not deleted. Args: from_path: The path of the object to move. 
From 910471a0a5d4c7ca6e9c98736790bf8396dc994a Mon Sep 17 00:00:00 2001 From: Dominic Tarro Date: Thu, 28 Sep 2023 14:48:57 -0400 Subject: [PATCH 09/10] remove client parameters --- prefect_aws/s3.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/prefect_aws/s3.py b/prefect_aws/s3.py index a42e6ad9..c0a22de0 100644 --- a/prefect_aws/s3.py +++ b/prefect_aws/s3.py @@ -154,7 +154,6 @@ async def s3_copy( source_bucket_name: str, aws_credentials: AwsCredentials, target_bucket_name: Optional[str] = None, - aws_client_parameters: AwsClientParameters = AwsClientParameters(), **copy_kwargs, ) -> str: """Uses S3's internal @@ -171,7 +170,6 @@ async def s3_copy( aws_credentials: Credentials to use for authentication with AWS. target_bucket_name: The bucket to copy the object to. If not provided, defaults to `source_bucket`. - aws_client_parameters: Custom parameter for the boto3 client initialization. **copy_kwargs: Additional keyword arguments to pass to `S3Client.copy_object`. Returns: @@ -227,9 +225,7 @@ async def example_copy_flow(): """ logger = get_run_logger() - s3_client = aws_credentials.get_boto3_session().client( - "s3", **aws_client_parameters.get_params_override() - ) + s3_client = aws_credentials.get_s3_client() target_bucket_name = target_bucket_name or source_bucket_name @@ -258,7 +254,6 @@ async def s3_move( source_bucket_name: str, aws_credentials: AwsCredentials, target_bucket_name: Optional[str] = None, - aws_client_parameters: AwsClientParameters = AwsClientParameters(), ) -> str: """ Move an object from one S3 location to another. To move objects between buckets, @@ -274,16 +269,13 @@ async def s3_move( aws_credentials: Credentials to use for authentication with AWS. target_bucket_name: The bucket to copy the object to. If not provided, defaults to `source_bucket`. - aws_client_parameters: Custom parameter for the boto3 client initialization. Returns: The path that the object was moved to. Excludes the bucket name. 
""" logger = get_run_logger() - s3_client = aws_credentials.get_boto3_session().client( - "s3", **aws_client_parameters.get_params_override() - ) + s3_client = aws_credentials.get_s3_client() # If target bucket is not provided, assume it's the same as the source bucket target_bucket_name = target_bucket_name or source_bucket_name From 15ccec3d3154b1e99660c6d208e38ab51c4ed628 Mon Sep 17 00:00:00 2001 From: Dominic Tarro Date: Thu, 28 Sep 2023 15:00:36 -0400 Subject: [PATCH 10/10] correct client_parameter fixture uses --- tests/test_s3.py | 27 ++++++--------------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/tests/test_s3.py b/tests/test_s3.py index 92fd6976..89a39f7d 100644 --- a/tests/test_s3.py +++ b/tests/test_s3.py @@ -224,9 +224,8 @@ async def test_flow(): assert output == b"NEW OBJECT" -# Ignore public bucket client parameters -@pytest.mark.parametrize("client_parameters", aws_clients[:-1], indirect=True) -async def test_s3_copy(object, bucket, bucket_2, client_parameters, aws_credentials): +@pytest.mark.parametrize("client_parameters", aws_clients, indirect=True) +async def test_s3_copy(object, bucket, bucket_2, aws_credentials): def read(bucket, key): stream = io.BytesIO() bucket.download_fileobj(key, stream) @@ -242,7 +241,6 @@ async def test_flow(): source_bucket_name="bucket", aws_credentials=aws_credentials, target_bucket_name="bucket_2", - aws_client_parameters=client_parameters, ) # Test within-bucket copy @@ -251,7 +249,6 @@ async def test_flow(): target_path="subfolder/new_object", source_bucket_name="bucket", aws_credentials=aws_credentials, - aws_client_parameters=client_parameters, ) await test_flow() @@ -259,9 +256,8 @@ async def test_flow(): assert read(bucket, "subfolder/new_object") == b"TEST" -# Ignore public bucket client parameters -@pytest.mark.parametrize("client_parameters", aws_clients[:-1], indirect=True) -async def test_s3_move(object, bucket, bucket_2, client_parameters, aws_credentials): +@pytest.mark.parametrize("client_parameters", aws_clients, indirect=True) +async def test_s3_move(object, bucket, bucket_2, aws_credentials): def read(bucket, key): stream = io.BytesIO() bucket.download_fileobj(key, stream) @@ -276,7 +272,6 @@ async def test_flow(): target_path="subfolder/object_copy", source_bucket_name="bucket", aws_credentials=aws_credentials, - aws_client_parameters=client_parameters, ) # Test cross-bucket move @@ -286,7 +281,6 @@ async def test_flow(): source_bucket_name="bucket", target_bucket_name="bucket_2", aws_credentials=aws_credentials, - aws_client_parameters=client_parameters, ) await test_flow() @@ -300,11 +294,10 @@ async def test_flow(): read(bucket, "subfolder/object_copy") -@pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) +@pytest.mark.parametrize("client_parameters", aws_clients, indirect=True) async def test_move_object_to_nonexistent_bucket_fails( object, bucket, - client_parameters, aws_credentials, ): def read(bucket, key): @@ -322,7 +315,6 @@ async def test_flow(): source_bucket_name="bucket", aws_credentials=aws_credentials, target_bucket_name="nonexistent-bucket", - aws_client_parameters=client_parameters, ) with pytest.raises(ClientError): @@ -331,11 +323,10 @@ async def test_flow(): assert read(bucket, "object") == b"TEST" -@pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) +@pytest.mark.parametrize("client_parameters", aws_clients, indirect=True) async def test_move_object_fail_cases( object, bucket, - client_parameters, aws_credentials, ): def 
read(bucket, key): @@ -355,7 +346,6 @@ async def test_flow( source_bucket_name=source_bucket_name, aws_credentials=aws_credentials, target_bucket_name=target_bucket_name, - aws_client_parameters=client_parameters, ) # Move to non-existent bucket @@ -991,7 +981,6 @@ def test_copy_object( self, s3_bucket_with_object: S3Bucket, s3_bucket_2_empty: S3Bucket, - client_parameters, ): s3_bucket_with_object.copy_object("object", "object_copy_1") assert s3_bucket_with_object.read_path("object_copy_1") == b"TEST" @@ -1014,7 +1003,6 @@ def test_copy_object( def test_move_object_within_bucket( self, s3_bucket_with_object: S3Bucket, - client_parameters, ): s3_bucket_with_object.move_object("object", "object_copy_1") assert s3_bucket_with_object.read_path("object_copy_1") == b"TEST" @@ -1026,7 +1014,6 @@ def test_move_object_within_bucket( def test_move_object_to_nonexistent_bucket_fails( self, s3_bucket_with_object: S3Bucket, - client_parameters, ): with pytest.raises(ClientError): s3_bucket_with_object.move_object( @@ -1038,7 +1025,6 @@ def test_move_object_to_nonexistent_bucket_fails( def test_move_object_onto_itself_fails( self, s3_bucket_with_object: S3Bucket, - client_parameters, ): with pytest.raises(ClientError): s3_bucket_with_object.move_object("object", "object") @@ -1049,7 +1035,6 @@ def test_move_object_between_buckets( self, s3_bucket_with_object: S3Bucket, s3_bucket_2_empty: S3Bucket, - client_parameters, ): s3_bucket_with_object.move_object( "object", "object_copy_1", to_bucket=s3_bucket_2_empty
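
The `s3_move` task introduced in PATCH 05 mirrors `s3_copy` but deletes the source object after a successful copy; unlike `s3_copy`, its docstring carries no usage example. Below is a minimal sketch of how it could be called from a flow, following the final signature after PATCH 09 (no `aws_client_parameters` argument). The bucket names, object keys, and the `"my-creds"` block name are placeholders, and the credentials are assumed to allow reading and deleting the source object and writing the target object, as the task's docstring requires.

```python
from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.s3 import s3_move

# Placeholder credentials block name; load whatever block holds your AWS keys.
aws_credentials = AwsCredentials.load("my-creds")

@flow
async def example_move_flow():
    # Copy my_folder/notes.txt into the archive bucket, then delete the original.
    new_key = await s3_move(
        source_path="my_folder/notes.txt",
        target_path="archive/notes.txt",
        source_bucket_name="my-bucket",
        aws_credentials=aws_credentials,
        target_bucket_name="archive-bucket",
    )
    return new_key

example_move_flow()
```

Because the task performs a CopyObject followed by a DeleteObject rather than an atomic move, credentials that can copy but not delete will leave the object in both locations, as noted in the PATCH 08 docstring update.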