diff --git a/docs/includes/list.py b/docs/includes/list.py
index 67095b9..1e9c783 100644
--- a/docs/includes/list.py
+++ b/docs/includes/list.py
@@ -3,7 +3,7 @@
 async def main():
-    objects = await S3BucketObjects("my-bucket").list("my-prefix/", max_depth=2, max_folders=20)
+    objects = await S3BucketObjects("my-bucket").list("my-prefix/", max_level=2, max_folders=20)
     for obj in objects:
         print(obj["Key"])
diff --git a/docs/src/en/as3.md b/docs/src/en/as3.md
index 428ba4f..4c0f574 100644
--- a/docs/src/en/as3.md
+++ b/docs/src/en/as3.md
@@ -5,7 +5,7 @@
 Simple utility to debug classes from the package.

 For example

 ```bash
-as3 du s3://my-bucket/my-key -d 1 -f 20 -r 3
+as3 du s3://my-bucket/my-key -l 1 -f 20 -r 3
 ```

 Show the size and number of objects in `s3://my-bucket/my-key`.
diff --git a/docs/src/en/index.md b/docs/src/en/index.md
index 3830926..82d6bd3 100644
--- a/docs/src/en/index.md
+++ b/docs/src/en/index.md
@@ -16,7 +16,7 @@ Read detailed explanation in [the blog post](https://sorokin.engineer/posts/en/a
 ```python
 --8<-- "list.py"
 ```
-You can control the depth of recursion by specifying the `max_depth` parameter,
+You can control the depth of recursion by specifying the `max_level` parameter,
 by default depth is not limited.

 `max_folders` parameter allows you to group folders by prefix to reduce the number of API calls.

@@ -25,7 +25,7 @@
 Process objects asynchronously while gathering the objects list from AWS.

 ```python
-async for page in S3BucketObjects(bucket='my-bucket').iter("my-prefix/", max_depth=2, max_folders=10):
+async for page in S3BucketObjects(bucket='my-bucket').iter("my-prefix/", max_level=2, max_folders=10):
     for obj in page:
         print(obj['Key'])
 ```
diff --git a/docs/src/ru/as3.md b/docs/src/ru/as3.md
index 97e486e..b1676a5 100644
--- a/docs/src/ru/as3.md
+++ b/docs/src/ru/as3.md
@@ -5,7 +5,7 @@
 Например

 ```bash
-as3 du s3://my-bucket/my-key -d 1 -f 20 -r 3
+as3 du s3://my-bucket/my-key -l 1 -f 20 -r 3
 ```

 Покажет размер и количество объектов в `s3://my-bucket/my-key`.
diff --git a/docs/src/ru/index.md b/docs/src/ru/index.md
index 3d1d46e..2855b66 100644
--- a/docs/src/ru/index.md
+++ b/docs/src/ru/index.md
@@ -18,7 +18,7 @@
 ```python
 --8<-- "list.py"
 ```
-Вы можете контролировать глубину рекурсии, указывая параметр max_depth, по умолчанию глубина не ограничена.
+Вы можете контролировать глубину рекурсии, указывая параметр max_level, по умолчанию глубина не ограничена.
 С помощью `max_folders` вы можете группировать каталоги по префиксам для уменьшения количества API вызовов.

@@ -27,7 +27,7 @@
 Обрабатывайте объекты асинхронно, одновременно с получением списка объектов из AWS.

 ```python
-async for page in S3BucketObjects(bucket='my-bucket').iter("my-prefix/", max_depth=2, max_folders=10):
+async for page in S3BucketObjects(bucket='my-bucket').iter("my-prefix/", max_level=2, max_folders=10):
     for obj in page:
         print(obj['Key'])
 ```
diff --git a/src/async_s3/main.py b/src/async_s3/main.py
index 7bc6bd5..23eff90 100644
--- a/src/async_s3/main.py
+++ b/src/async_s3/main.py
@@ -42,11 +42,11 @@ def list_objects_options(func: Callable[[Any], None]) -> Callable[[Any], None]:
     """Add common options to commands using list_objects."""
     func = click.argument("s3_url")(func)
     func = click.option(
-        "--max-depth",
-        "-d",
+        "--max-level",
+        "-l",
         type=int,
         default=None,
-        help="The maximum folders depth to traverse in separate requests. By default traverse all levels.",
+        help="The maximum folders level to traverse in separate requests. By default traverse all levels.",
     )(func)
     func = click.option(
         "--max-folders",
@@ -60,14 +60,35 @@
         "-r",
         type=int,
         default=1,
-        help="Repeat the operation multiple times to average elapsed time.",
+        help="Repeat the operation multiple times to average elapsed time. By default repeat once.",
+    )(func)
+    func = click.option(
+        "--parallelism",
+        "-p",
+        type=int,
+        default=100,
+        help="The maximum number of concurrent requests to AWS S3. By default 100.",
+    )(func)
+    func = click.option(
+        "--delimiter",
+        "-d",
+        type=str,
+        default="/",
+        help="Delimiter for 'folders'. Default is '/'.",
     )(func)
     return func


 @list_objects_options
 @as3.command()
-def ls(s3_url: str, max_depth: Optional[int], max_folders: Optional[int], repeat: int) -> None:
+def ls(  # pylint: disable=too-many-arguments
+    s3_url: str,
+    max_level: Optional[int],
+    max_folders: Optional[int],
+    repeat: int,
+    parallelism: int,
+    delimiter: str,
+) -> None:
     """
     List objects in an S3 bucket.

@@ -77,14 +98,28 @@
     if not s3_url.startswith(S3PROTO):
         error("Invalid S3 URL. It should start with s3://")

-    objects = list_objects(s3_url, max_depth=max_depth, max_folders=max_folders, repeat=repeat)
+    objects = list_objects(
+        s3_url,
+        max_level=max_level,
+        max_folders=max_folders,
+        repeat=repeat,
+        parallelism=parallelism,
+        delimiter=delimiter,
+    )
     click.echo("\n".join([obj["Key"] for obj in objects]))
     print_summary(objects)


 @list_objects_options
 @as3.command()
-def du(s3_url: str, max_depth: Optional[int], max_folders: Optional[int], repeat: int) -> None:
+def du(  # pylint: disable=too-many-arguments
+    s3_url: str,
+    max_level: Optional[int],
+    max_folders: Optional[int],
+    repeat: int,
+    parallelism: int,
+    delimiter: str,
+) -> None:
     """
     Show count and size for objects in an S3 bucket.

@@ -94,7 +129,14 @@
     if not s3_url.startswith(S3PROTO):
         error("Invalid S3 URL. It should start with s3://")

-    objects = list_objects(s3_url, max_depth=max_depth, max_folders=max_folders, repeat=repeat)
+    objects = list_objects(
+        s3_url,
+        max_level=max_level,
+        max_folders=max_folders,
+        repeat=repeat,
+        parallelism=parallelism,
+        delimiter=delimiter,
+    )
     print_summary(objects)


@@ -107,20 +149,34 @@ def human_readable_size(size: float, decimal_places: int = 2) -> str:
     return f"{size:.{decimal_places}f} {unit}"


-def list_objects(
+def list_objects(  # pylint: disable=too-many-arguments
     s3_url: str,
-    max_depth: Optional[int] = None,
+    max_level: Optional[int] = None,
     max_folders: Optional[int] = None,
     repeat: int = 1,
+    parallelism: int = 100,
+    delimiter: str = "/",
 ) -> Iterable[Dict[str, Any]]:
     """List objects in an S3 bucket."""
     return asyncio.run(
-        list_objects_async(s3_url, max_depth=max_depth, max_folders=max_folders, repeat=repeat)
+        list_objects_async(
+            s3_url,
+            max_level=max_level,
+            max_folders=max_folders,
+            repeat=repeat,
+            parallelism=parallelism,
+            delimiter=delimiter,
+        )
     )


-async def list_objects_async(
-    s3_url: str, max_depth: Optional[int], max_folders: Optional[int], repeat: int
+async def list_objects_async(  # pylint: disable=too-many-arguments
+    s3_url: str,
+    max_level: Optional[int],
+    max_folders: Optional[int],
+    repeat: int,
+    parallelism: int,
+    delimiter: str,
 ) -> Iterable[Dict[str, Any]]:
     """List objects in an S3 bucket."""
     assert repeat > 0
@@ -129,21 +185,23 @@ async def list_objects_async(
         f"{click.style(s3_url, fg='green', bold=True)}"
     )
     click.echo(
-        f"{click.style('max_depth: ', fg='green')}"
-        f"{click.style(str(max_depth), fg='green', bold=True)}, "
+        f"{click.style('max_level: ', fg='green')}"
+        f"{click.style(str(max_level), fg='green', bold=True)}, "
         f"{click.style('max_folders: ', fg='green')}"
         f"{click.style(str(max_folders), fg='green', bold=True)}, "
         f"{click.style(str(repeat), fg='green', bold=True)}"
         f"{click.style(' times.', fg='green')}"
     )
     bucket, key = s3_url[len(S3PROTO) :].split("/", 1)
-    s3_list = S3BucketObjects(bucket)
+    s3_list = S3BucketObjects(bucket, parallelism=parallelism)
     total_time = 0.0
     for _ in range(repeat):
         start_time = time.time()
         try:
-            result = await s3_list.list(key, max_depth=max_depth, max_folders=max_folders)
+            result = await s3_list.list(
+                key, max_level=max_level, max_folders=max_folders, delimiter=delimiter
+            )
         except botocore.exceptions.ClientError as exc:
             error(f"Error: {exc}")
         end_time = time.time()
diff --git a/src/async_s3/s3_bucket_objects.py b/src/async_s3/s3_bucket_objects.py
index 4fcafdd..3b41a2d 100644
--- a/src/async_s3/s3_bucket_objects.py
+++ b/src/async_s3/s3_bucket_objects.py
@@ -9,7 +9,7 @@
 from async_s3.group_by_prefix import group_by_prefix


-MAX_CONCURRENT_TASKS = 100
+DEFAULT_PARALLELISM = 100


 @functools.lru_cache()
@@ -28,17 +28,23 @@ def get_s3_client() -> aiobotocore.client.AioBaseClient:


 class S3BucketObjects:
-    def __init__(self, bucket: str) -> None:
+    def __init__(self, bucket: str, *, parallelism: int = DEFAULT_PARALLELISM) -> None:
+        """Initialize the S3BucketObjects object.
+
+        bucket: The name of the S3 bucket.
+        parallelism: The maximum number of concurrent requests to AWS S3.
+        """
         self._bucket = bucket
-        self.semaphore = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
+        self.semaphore = asyncio.Semaphore(parallelism)

     async def _list_objects(  # pylint: disable=too-many-arguments, too-many-locals
         self,
         s3_client: aiobotocore.client.AioBaseClient,
         prefix: str,
         current_depth: int,
-        max_depth: Optional[int],
+        max_level: Optional[int],
         max_folders: Optional[int],
+        delimiter: str,
         objects_keys: Set[str],
         queue: asyncio.Queue[List[Dict[str, Any]]],
         active_tasks: Set[asyncio.Task[None]],
@@ -48,15 +54,15 @@ async def _list_objects(  # pylint: disable=too-many-arguments, too-many-locals
         prefixes = []
         params = {"Bucket": self._bucket, "Prefix": prefix}

-        if (current_depth != -1) and (max_depth is None or current_depth < max_depth):
-            params["Delimiter"] = "/"
+        if (current_depth != -1) and (max_level is None or current_depth < max_level):
+            params["Delimiter"] = delimiter

         async for page in paginator.paginate(**params):
             objects = page.get("Contents", [])
             new_keys = {
                 obj["Key"]
                 for obj in objects
-                if not obj["Key"].endswith("/") and obj["Key"] not in objects_keys
+                if not obj["Key"].endswith(delimiter) and obj["Key"] not in objects_keys
             }
             cleared_objects = [obj for obj in objects if obj["Key"] in new_keys]
             objects_keys.update(new_keys)
@@ -78,8 +84,9 @@ async def _list_objects(  # pylint: disable=too-many-arguments, too-many-locals
                         s3_client,
                         folder,
                         level,
-                        max_depth,
+                        max_level,
                         max_folders,
+                        delimiter,
                         objects_keys,
                         queue,
                         active_tasks,
@@ -95,17 +102,19 @@ async def iter(
         self,
         prefix: str = "/",
         *,
-        max_depth: Optional[int] = None,
+        max_level: Optional[int] = None,
         max_folders: Optional[int] = None,
+        delimiter: str = "/",
     ) -> AsyncIterator[List[Dict[str, Any]]]:
         """Generator that yields objects in the bucket with the given prefix.

         Yield objects by partial chunks (list of AWS S3 object dicts) as they
         are collected from AWS asynchronously.

-        max_depth: The maximum folders depth to traverse in separate requests. If None, traverse all levels.
+        max_level: The maximum folders level to traverse in separate requests. If None, traverse all levels.
         max_folders: The maximum number of folders to load in separate requests. If None, requests all folders.
             Otherwise, the folders are grouped by prefixes before loading in separate requests.
             Try to group in the given number of folders if possible.
+        delimiter: The delimiter for "folders".
         """
         # if we group by prefixes, some objects may be listed multiple times
         # to avoid this, we store the keys of the objects already listed
@@ -125,8 +134,9 @@ async def iter(
             s3_client,
             prefix,
             0,
-            max_depth,
+            max_level,
             max_folders,
+            delimiter,
             objects_keys,
             queue,
             active_tasks,
@@ -165,17 +175,21 @@ async def list(
         self,
         prefix: str = "/",
         *,
-        max_depth: Optional[int] = None,
+        max_level: Optional[int] = None,
         max_folders: Optional[int] = None,
+        delimiter: str = "/",
     ) -> List[Dict[str, Any]]:
         """List all objects in the bucket with the given prefix.

-        max_depth: The maximum folders depth to traverse in separate requests. If None, traverse all levels.
+        max_level: The maximum folders level to traverse in separate requests. If None, traverse all levels.
         max_folders: The maximum number of folders to load in separate requests. If None, requests all folders.
             Otherwise, the folders are grouped by prefixes before loading in separate requests.
             Try to group to the given `max_folders` if possible.
+        delimiter: The delimiter for "folders".
""" objects = [] - async for objects_page in self.iter(prefix, max_depth=max_depth, max_folders=max_folders): + async for objects_page in self.iter( + prefix, max_level=max_level, max_folders=max_folders, delimiter=delimiter + ): objects.extend(objects_page) return objects diff --git a/tests/test_as3.py b/tests/test_as3.py index 3c2884f..83d15bb 100644 --- a/tests/test_as3.py +++ b/tests/test_as3.py @@ -80,14 +80,16 @@ async def test_list_objects_async(): instance.list = AsyncMock(return_value=mock_result) s3_url = "s3://bucket/key" - max_depth = 1 + max_level = 1 max_folders = 1 repeat = 1 + parallelism = 100 + delimiter = '/' - result = await list_objects_async(s3_url, max_depth, max_folders, repeat) + result = await list_objects_async(s3_url, max_level, max_folders, repeat, parallelism, delimiter) assert result == mock_result - instance.list.assert_awaited_once_with('key', max_depth=max_depth, max_folders=max_folders) + instance.list.assert_awaited_once_with('key', max_level=max_level, max_folders=max_folders, delimiter=delimiter) @pytest.mark.asyncio async def test_list_objects_async_repeat(): @@ -101,12 +103,14 @@ async def test_list_objects_async_repeat(): instance.list = AsyncMock(return_value=mock_result) s3_url = "s3://bucket/key" - max_depth = 1 + max_level = 1 max_folders = 1 repeat = 3 + parallelism = 100 + delimiter = '/' - result = await list_objects_async(s3_url, max_depth, max_folders, repeat) + result = await list_objects_async(s3_url, max_level, max_folders, repeat, parallelism, delimiter) assert result == mock_result assert instance.list.call_count == repeat - instance.list.assert_awaited_with('key', max_depth=max_depth, max_folders=max_folders) + instance.list.assert_awaited_with('key', max_level=max_level, max_folders=max_folders, delimiter=delimiter) diff --git a/tests/test_s3_bucket_objects.py b/tests/test_s3_bucket_objects.py index deeff69..70a7ad4 100644 --- a/tests/test_s3_bucket_objects.py +++ b/tests/test_s3_bucket_objects.py @@ -41,10 +41,10 @@ async def test_s3_bucket_objects_functional(mock_s3_structure): "bucket_name": "mock-bucket" } ], indirect=True) -async def test_s3_bucket_objects_with_max_depth(s3_client_proxy, mock_s3_structure): +async def test_s3_bucket_objects_with_max_level(s3_client_proxy, mock_s3_structure): walker = S3BucketObjects("mock-bucket") - objects = await walker.list(prefix="root/", max_depth=2) + objects = await walker.list(prefix="root/", max_level=2) expected_keys = { 'root/data01/image01.png', 'root/data01/images/img11.jpg', @@ -140,4 +140,59 @@ async def test_s3_bucket_objects_with_max_folders(s3_client_proxy, mock_s3_struc call.get_paginator('list_objects_v2'), call.get_paginator().paginate(Bucket='mock-bucket', Prefix='root/data04'), ] + assert s3_client_proxy.calls == expected_calls + + +@pytest.mark.asyncio +@pytest.mark.parametrize("mock_s3_structure", [ + { + "bucket_structure_file": "bucket_keys.yml", + "get_s3_client_function": "async_s3.s3_bucket_objects.get_s3_client", + "bucket_name": "mock-bucket" + } +], indirect=True) +async def test_s3_bucket_objects_delimiter(s3_client_proxy, mock_s3_structure): + walker = S3BucketObjects("mock-bucket") + + objects = await walker.list(prefix="root/", max_folders=2, delimiter="a") + keys = [obj["Key"] for obj in objects] + from pprint import pprint + pprint(keys) + + expected_keys = { + 'root/data01/image01.png', + 'root/data01/images/img11.jpg', + 'root/data01/temp/', + 'root/data01/docs/doc12.pdf', + 'root/data01/archives/archive13a.zip', + 'root/data01/archives/archive13b.zip', + 
'root/data02/report02.docx', + 'root/data02/reports/report21.docx', + 'root/data02/logs/log22.txt', + 'root/data02/scripts/script23.py', + 'root/data03/video03a.mp4', + 'root/data03/video03b.mp4', + 'root/data03/video03c.mp4', + 'root/data04/' + } + pprint(expected_keys) + + assert set(keys) == expected_keys + seen_keys = set() + duplicates = [key for key in keys if key in seen_keys or seen_keys.add(key)] + assert not duplicates, f"Found duplicate keys: {duplicates}" + + # Check calls to the S3 client + expected_calls = [ + call.get_paginator("list_objects_v2"), + call.get_paginator().paginate(Bucket="mock-bucket", Prefix="root/", Delimiter="a"), + call.get_paginator('list_objects_v2'), + call.get_paginator().paginate(Bucket='mock-bucket', Prefix='root/da', Delimiter="a"), + call.get_paginator('list_objects_v2'), + call.get_paginator().paginate(Bucket='mock-bucket', Prefix='root/data', Delimiter="a"), + call.get_paginator('list_objects_v2'), + call.get_paginator().paginate(Bucket='mock-bucket', Prefix='root/data01'), + call.get_paginator('list_objects_v2'), + call.get_paginator().paginate(Bucket='mock-bucket', Prefix='root/data03'), + ] assert s3_client_proxy.calls == expected_calls \ No newline at end of file
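
Taken together, the patch renames `max_depth` to `max_level` and threads two new knobs, `parallelism` and `delimiter`, from the CLI down to `S3BucketObjects`. A minimal sketch of the resulting API, with bucket and prefix names as placeholders and the import path assumed from the package layout:

```python
import asyncio

from async_s3.s3_bucket_objects import S3BucketObjects  # import path assumed


async def main() -> None:
    # parallelism bounds concurrent S3 requests via the internal semaphore (default 100)
    walker = S3BucketObjects("my-bucket", parallelism=50)
    # iter() yields pages as they are collected; max_level caps how deep separate
    # per-"folder" requests go, and delimiter defines the "folder" boundary (default "/")
    async for page in walker.iter("my-prefix/", max_level=2, max_folders=10, delimiter="/"):
        for obj in page:
            print(obj["Key"])


asyncio.run(main())
```

The equivalent CLI call after this change would be `as3 ls s3://my-bucket/my-prefix/ -l 2 -f 10 -p 50`. Note that the short flag `-d` is reassigned: it used to mean `--max-depth` and now selects `--delimiter`, so existing `-d` invocations change meaning.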