diff --git a/CHANGELOG.md b/CHANGELOG.md
index e0a447c8..22282b5c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## Unreleased
 
 ### Added
 
+- AWS S3 copy and move tasks and `S3Bucket` methods - [#316](https://github.com/PrefectHQ/prefect-aws/pull/316)
+
 - Added retries to ECS task run creation for ECS worker - [#303](https://github.com/PrefectHQ/prefect-aws/pull/303)
diff --git a/prefect_aws/s3.py b/prefect_aws/s3.py
index 17fe8870..c0a22de0 100644
--- a/prefect_aws/s3.py
+++ b/prefect_aws/s3.py
@@ -147,6 +147,160 @@ async def example_s3_upload_flow():
     return key
 
 
+@task
+async def s3_copy(
+    source_path: str,
+    target_path: str,
+    source_bucket_name: str,
+    aws_credentials: AwsCredentials,
+    target_bucket_name: Optional[str] = None,
+    **copy_kwargs,
+) -> str:
+    """Uses S3's internal
+    [CopyObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html)
+    to copy objects within or between buckets. To copy objects between buckets, the
+    credentials must have permission to read the source object and write to the target
+    object. If the credentials do not have those permissions, try using
+    `S3Bucket.stream_from`.
+
+    Args:
+        source_path: The path to the object to copy.
+        target_path: The path to copy the object to.
+        source_bucket_name: The bucket to copy the object from.
+        aws_credentials: Credentials to use for authentication with AWS.
+        target_bucket_name: The bucket to copy the object to. If not provided, defaults
+            to `source_bucket_name`.
+        **copy_kwargs: Additional keyword arguments to pass to `S3Client.copy_object`.
+
+    Returns:
+        The path that the object was copied to. Excludes the bucket name.
+
+    Examples:
+
+        Copy notes.txt from s3://my-bucket/my_folder/notes.txt to
+        s3://my-bucket/my_folder/notes_copy.txt. 
+ + ```python + from prefect import flow + from prefect_aws import AwsCredentials + from prefect_aws.s3 import s3_copy + + aws_credentials = AwsCredentials.load("my-creds") + + @flow + async def example_copy_flow(): + await s3_copy( + source_path="my_folder/notes.txt", + target_path="my_folder/notes_copy.txt", + source_bucket_name="my-bucket", + aws_credentials=aws_credentials, + ) + + example_copy_flow() + ``` + + Copy notes.txt from s3://my-bucket/my_folder/notes.txt to + s3://other-bucket/notes_copy.txt. + + ```python + from prefect import flow + from prefect_aws import AwsCredentials + from prefect_aws.s3 import s3_copy + + aws_credentials = AwsCredentials.load("shared-creds") + + @flow + async def example_copy_flow(): + await s3_copy( + source_path="my_folder/notes.txt", + target_path="notes_copy.txt", + source_bucket_name="my-bucket", + aws_credentials=aws_credentials, + target_bucket_name="other-bucket", + ) + + example_copy_flow() + ``` + + """ + logger = get_run_logger() + + s3_client = aws_credentials.get_s3_client() + + target_bucket_name = target_bucket_name or source_bucket_name + + logger.info( + "Copying object from bucket %s with key %s to bucket %s with key %s", + source_bucket_name, + source_path, + target_bucket_name, + target_path, + ) + + s3_client.copy_object( + CopySource={"Bucket": source_bucket_name, "Key": source_path}, + Bucket=target_bucket_name, + Key=target_path, + **copy_kwargs, + ) + + return target_path + + +@task +async def s3_move( + source_path: str, + target_path: str, + source_bucket_name: str, + aws_credentials: AwsCredentials, + target_bucket_name: Optional[str] = None, +) -> str: + """ + Move an object from one S3 location to another. To move objects between buckets, + the credentials must have permission to read and delete the source object and write + to the target object. If the credentials do not have those permissions, this method + will raise an error. 
If the credentials have permission to read the source object
+    but not delete it, the object will be copied but not deleted.
+
+    Args:
+        source_path: The path of the object to move.
+        target_path: The path to move the object to.
+        source_bucket_name: The name of the bucket containing the source object.
+        aws_credentials: Credentials to use for authentication with AWS.
+        target_bucket_name: The bucket to move the object to. If not provided, defaults
+            to `source_bucket_name`.
+
+    Returns:
+        The path that the object was moved to. Excludes the bucket name.
+    """
+    logger = get_run_logger()
+
+    s3_client = aws_credentials.get_s3_client()
+
+    # If target bucket is not provided, assume it's the same as the source bucket
+    target_bucket_name = target_bucket_name or source_bucket_name
+
+    logger.info(
+        "Moving object from s3://%s/%s to s3://%s/%s",
+        source_bucket_name,
+        source_path,
+        target_bucket_name,
+        target_path,
+    )
+
+    # Copy the object to the new location
+    s3_client.copy_object(
+        Bucket=target_bucket_name,
+        CopySource={"Bucket": source_bucket_name, "Key": source_path},
+        Key=target_path,
+    )
+
+    # Delete the original object
+    s3_client.delete_object(Bucket=source_bucket_name, Key=source_path)
+
+    return target_path
+
+
 def _list_objects_sync(page_iterator: PageIterator):
     """
     Synchronous method to collect S3 objects into a list
@@ -795,7 +949,9 @@ async def stream_from(
         to_path: Optional[str] = None,
         **upload_kwargs: Dict[str, Any],
     ) -> str:
-        """Streams an object from another bucket to this bucket.
+        """Streams an object from another bucket to this bucket. Requires the
+        object to be downloaded and uploaded in chunks. If `self`'s credentials
+        allow for writes to the other bucket, try using `S3Bucket.copy_object`.
 
         Args:
             bucket: The bucket to stream from. 
@@ -1023,3 +1179,170 @@ async def upload_from_folder( ) return to_folder + + @sync_compatible + async def copy_object( + self, + from_path: Union[str, Path], + to_path: Union[str, Path], + to_bucket: Optional[Union["S3Bucket", str]] = None, + **copy_kwargs, + ) -> str: + """Uses S3's internal + [CopyObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html) + to copy objects within or between buckets. To copy objects between buckets, + `self`'s credentials must have permission to read the source object and write + to the target object. If the credentials do not have those permissions, try + using `S3Bucket.stream_from`. + + Args: + from_path: The path of the object to copy. + to_path: The path to copy the object to. + to_bucket: The bucket to copy to. Defaults to the current bucket. + **copy_kwargs: Additional keyword arguments to pass to + `S3Client.copy_object`. + + Returns: + The path that the object was copied to. Excludes the bucket name. + + Examples: + + Copy notes.txt from my_folder/notes.txt to my_folder/notes_copy.txt. + + ```python + from prefect_aws.s3 import S3Bucket + + s3_bucket = S3Bucket.load("my-bucket") + s3_bucket.copy_object("my_folder/notes.txt", "my_folder/notes_copy.txt") + ``` + + Copy notes.txt from my_folder/notes.txt to my_folder/notes_copy.txt in + another bucket. 
+
+            ```python
+            from prefect_aws.s3 import S3Bucket
+
+            s3_bucket = S3Bucket.load("my-bucket")
+            s3_bucket.copy_object(
+                "my_folder/notes.txt",
+                "my_folder/notes_copy.txt",
+                to_bucket="other-bucket"
+            )
+            ```
+        """
+        s3_client = self.credentials.get_s3_client()
+
+        source_path = self._resolve_path(Path(from_path).as_posix())
+        target_path = self._resolve_path(Path(to_path).as_posix())
+
+        source_bucket_name = self.bucket_name
+        target_bucket_name = self.bucket_name
+        if isinstance(to_bucket, S3Bucket):
+            target_bucket_name = to_bucket.bucket_name
+            target_path = to_bucket._resolve_path(target_path)
+        elif isinstance(to_bucket, str):
+            target_bucket_name = to_bucket
+        elif to_bucket is not None:
+            raise TypeError(
+                "to_bucket must be a string or S3Bucket, not"
+                f" {type(to_bucket)}"
+            )
+
+        self.logger.info(
+            "Copying object from bucket %s with key %s to bucket %s with key %s",
+            source_bucket_name,
+            source_path,
+            target_bucket_name,
+            target_path,
+        )
+
+        s3_client.copy_object(
+            CopySource={"Bucket": source_bucket_name, "Key": source_path},
+            Bucket=target_bucket_name,
+            Key=target_path,
+            **copy_kwargs,
+        )
+
+        return target_path
+
+    @sync_compatible
+    async def move_object(
+        self,
+        from_path: Union[str, Path],
+        to_path: Union[str, Path],
+        to_bucket: Optional[Union["S3Bucket", str]] = None,
+    ) -> str:
+        """Uses S3's internal CopyObject and DeleteObject to move objects within or
+        between buckets. To move objects between buckets, `self`'s credentials must
+        have permission to read and delete the source object and write to the target
+        object. If the credentials do not have those permissions, this method will
+        raise an error. If the credentials have permission to read the source object
+        but not delete it, the object will be copied but not deleted.
+
+        Args:
+            from_path: The path of the object to move.
+            to_path: The path to move the object to.
+            to_bucket: The bucket to move to. Defaults to the current bucket. 
+
+        Returns:
+            The path that the object was moved to. Excludes the bucket name.
+
+        Examples:
+
+            Move notes.txt from my_folder/notes.txt to my_folder/notes_copy.txt.
+
+            ```python
+            from prefect_aws.s3 import S3Bucket
+
+            s3_bucket = S3Bucket.load("my-bucket")
+            s3_bucket.move_object("my_folder/notes.txt", "my_folder/notes_copy.txt")
+            ```
+
+            Move notes.txt from my_folder/notes.txt to my_folder/notes_copy.txt in
+            another bucket.
+
+            ```python
+            from prefect_aws.s3 import S3Bucket
+
+            s3_bucket = S3Bucket.load("my-bucket")
+            s3_bucket.move_object(
+                "my_folder/notes.txt",
+                "my_folder/notes_copy.txt",
+                to_bucket="other-bucket"
+            )
+            ```
+        """
+        s3_client = self.credentials.get_s3_client()
+
+        source_path = self._resolve_path(Path(from_path).as_posix())
+        target_path = self._resolve_path(Path(to_path).as_posix())
+
+        source_bucket_name = self.bucket_name
+        target_bucket_name = self.bucket_name
+        if isinstance(to_bucket, S3Bucket):
+            target_bucket_name = to_bucket.bucket_name
+            target_path = to_bucket._resolve_path(target_path)
+        elif isinstance(to_bucket, str):
+            target_bucket_name = to_bucket
+        elif to_bucket is not None:
+            raise TypeError(
+                "to_bucket must be a string or S3Bucket, not"
+                f" {type(to_bucket)}"
+            )
+
+        self.logger.info(
+            "Moving object from s3://%s/%s to s3://%s/%s",
+            source_bucket_name,
+            source_path,
+            target_bucket_name,
+            target_path,
+        )
+
+        # If invalid, should error and prevent next operation
+        s3_client.copy(
+            CopySource={"Bucket": source_bucket_name, "Key": source_path},
+            Bucket=target_bucket_name,
+            Key=target_path,
+        )
+        s3_client.delete_object(Bucket=source_bucket_name, Key=source_path)
+        return target_path
diff --git a/tests/test_s3.py b/tests/test_s3.py
index c0159e25..89a39f7d 100644
--- a/tests/test_s3.py
+++ b/tests/test_s3.py
@@ -12,7 +12,14 @@
 from prefect_aws import AwsCredentials, MinIOCredentials
 from prefect_aws.client_parameters import AwsClientParameters
-from prefect_aws.s3 import S3Bucket, s3_download, 
s3_list_objects, s3_upload +from prefect_aws.s3 import ( + S3Bucket, + s3_copy, + s3_download, + s3_list_objects, + s3_move, + s3_upload, +) aws_clients = [ (lazy_fixture("aws_client_parameters_custom_endpoint")), @@ -47,6 +54,18 @@ def bucket(s3_mock, request): return bucket +@pytest.fixture +def bucket_2(s3_mock, request): + s3 = boto3.resource("s3") + bucket = s3.Bucket("bucket_2") + marker = request.node.get_closest_marker("is_public", None) + if marker and marker.args[0]: + bucket.create(ACL="public-read") + else: + bucket.create() + return bucket + + @pytest.fixture def object(bucket, tmp_path): file = tmp_path / "object.txt" @@ -205,6 +224,151 @@ async def test_flow(): assert output == b"NEW OBJECT" +@pytest.mark.parametrize("client_parameters", aws_clients, indirect=True) +async def test_s3_copy(object, bucket, bucket_2, aws_credentials): + def read(bucket, key): + stream = io.BytesIO() + bucket.download_fileobj(key, stream) + stream.seek(0) + return stream.read() + + @flow + async def test_flow(): + # Test cross-bucket copy + await s3_copy( + source_path="object", + target_path="subfolder/new_object", + source_bucket_name="bucket", + aws_credentials=aws_credentials, + target_bucket_name="bucket_2", + ) + + # Test within-bucket copy + await s3_copy( + source_path="object", + target_path="subfolder/new_object", + source_bucket_name="bucket", + aws_credentials=aws_credentials, + ) + + await test_flow() + assert read(bucket_2, "subfolder/new_object") == b"TEST" + assert read(bucket, "subfolder/new_object") == b"TEST" + + +@pytest.mark.parametrize("client_parameters", aws_clients, indirect=True) +async def test_s3_move(object, bucket, bucket_2, aws_credentials): + def read(bucket, key): + stream = io.BytesIO() + bucket.download_fileobj(key, stream) + stream.seek(0) + return stream.read() + + @flow + async def test_flow(): + # Test within-bucket move + await s3_move( + source_path="object", + target_path="subfolder/object_copy", + source_bucket_name="bucket", + 
aws_credentials=aws_credentials, + ) + + # Test cross-bucket move + await s3_move( + source_path="subfolder/object_copy", + target_path="object_copy_2", + source_bucket_name="bucket", + target_bucket_name="bucket_2", + aws_credentials=aws_credentials, + ) + + await test_flow() + + assert read(bucket_2, "object_copy_2") == b"TEST" + + with pytest.raises(ClientError): + read(bucket, "object") + + with pytest.raises(ClientError): + read(bucket, "subfolder/object_copy") + + +@pytest.mark.parametrize("client_parameters", aws_clients, indirect=True) +async def test_move_object_to_nonexistent_bucket_fails( + object, + bucket, + aws_credentials, +): + def read(bucket, key): + stream = io.BytesIO() + bucket.download_fileobj(key, stream) + stream.seek(0) + return stream.read() + + @flow + async def test_flow(): + # Test cross-bucket move + await s3_move( + source_path="object", + target_path="subfolder/new_object", + source_bucket_name="bucket", + aws_credentials=aws_credentials, + target_bucket_name="nonexistent-bucket", + ) + + with pytest.raises(ClientError): + await test_flow() + + assert read(bucket, "object") == b"TEST" + + +@pytest.mark.parametrize("client_parameters", aws_clients, indirect=True) +async def test_move_object_fail_cases( + object, + bucket, + aws_credentials, +): + def read(bucket, key): + stream = io.BytesIO() + bucket.download_fileobj(key, stream) + stream.seek(0) + return stream.read() + + @flow + async def test_flow( + source_path, target_path, source_bucket_name, target_bucket_name + ): + # Test cross-bucket move + await s3_move( + source_path=source_path, + target_path=target_path, + source_bucket_name=source_bucket_name, + aws_credentials=aws_credentials, + target_bucket_name=target_bucket_name, + ) + + # Move to non-existent bucket + with pytest.raises(ClientError): + await test_flow( + source_path="object", + target_path="subfolder/new_object", + source_bucket_name="bucket", + target_bucket_name="nonexistent-bucket", + ) + assert read(bucket, 
"object") == b"TEST" + + # Move onto self + with pytest.raises(ClientError): + await test_flow( + source_path="object", + target_path="object", + source_bucket_name="bucket", + target_bucket_name="bucket", + ) + assert read(bucket, "object") == b"TEST" + + @pytest.mark.parametrize("client_parameters", aws_clients, indirect=True) async def test_s3_list_objects( object, client_parameters, object_in_folder, aws_credentials @@ -623,9 +787,9 @@ def s3_bucket_empty(self, credentials, bucket): return _s3_bucket @pytest.fixture - def s3_bucket_2_empty(self, credentials, bucket): + def s3_bucket_2_empty(self, credentials, bucket_2): _s3_bucket = S3Bucket( - bucket_name="bucket", + bucket_name="bucket_2", credentials=credentials, bucket_folder="subfolder", ) @@ -811,3 +975,71 @@ def test_upload_from_folder( break else: raise AssertionError("Files did upload") + + @pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) + def test_copy_object( + self, + s3_bucket_with_object: S3Bucket, + s3_bucket_2_empty: S3Bucket, + ): + s3_bucket_with_object.copy_object("object", "object_copy_1") + assert s3_bucket_with_object.read_path("object_copy_1") == b"TEST" + + s3_bucket_with_object.copy_object("object", "folder/object_copy_2") + assert s3_bucket_with_object.read_path("folder/object_copy_2") == b"TEST" + + # S3Bucket for second bucket has a basepath + s3_bucket_with_object.copy_object( + "object", + s3_bucket_2_empty._resolve_path("object_copy_3"), + to_bucket="bucket_2", + ) + assert s3_bucket_2_empty.read_path("object_copy_3") == b"TEST" + + s3_bucket_with_object.copy_object("object", "object_copy_4", s3_bucket_2_empty) + assert s3_bucket_2_empty.read_path("object_copy_4") == b"TEST" + + @pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) + def test_move_object_within_bucket( + self, + s3_bucket_with_object: S3Bucket, + ): + s3_bucket_with_object.move_object("object", "object_copy_1") + assert 
s3_bucket_with_object.read_path("object_copy_1") == b"TEST" + + with pytest.raises(ClientError): + assert s3_bucket_with_object.read_path("object") == b"TEST" + + @pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) + def test_move_object_to_nonexistent_bucket_fails( + self, + s3_bucket_with_object: S3Bucket, + ): + with pytest.raises(ClientError): + s3_bucket_with_object.move_object( + "object", "object_copy_1", to_bucket="nonexistent-bucket" + ) + assert s3_bucket_with_object.read_path("object") == b"TEST" + + @pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) + def test_move_object_onto_itself_fails( + self, + s3_bucket_with_object: S3Bucket, + ): + with pytest.raises(ClientError): + s3_bucket_with_object.move_object("object", "object") + assert s3_bucket_with_object.read_path("object") == b"TEST" + + @pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) + def test_move_object_between_buckets( + self, + s3_bucket_with_object: S3Bucket, + s3_bucket_2_empty: S3Bucket, + ): + s3_bucket_with_object.move_object( + "object", "object_copy_1", to_bucket=s3_bucket_2_empty + ) + assert s3_bucket_2_empty.read_path("object_copy_1") == b"TEST" + + with pytest.raises(ClientError): + assert s3_bucket_with_object.read_path("object") == b"TEST"