
Commit df4b69d

Permalink: https://github.com/andgineer/async-s3/issues/13
max-depth -> max-level
add customizable delimiter and parallelism
andgineer committed Jun 15, 2024
1 parent d1d0baa commit df4b69d
Showing 9 changed files with 177 additions and 46 deletions.
2 changes: 1 addition & 1 deletion docs/includes/list.py
@@ -3,7 +3,7 @@


 async def main():
-    objects = await S3BucketObjects("my-bucket").list("my-prefix/", max_depth=2, max_folders=20)
+    objects = await S3BucketObjects("my-bucket").list("my-prefix/", max_level=2, max_folders=20)

     for obj in objects:
         print(obj["Key"])
2 changes: 1 addition & 1 deletion docs/src/en/as3.md
@@ -5,7 +5,7 @@ Simple utility to debug classes from the package.
 For example

 ```bash
-as3 du s3://my-bucket/my-key -d 1 -f 20 -r 3
+as3 du s3://my-bucket/my-key -l 1 -f 20 -r 3
 ```

 Show the size and number of objects in `s3://my-bucket/my-key`.
4 changes: 2 additions & 2 deletions docs/src/en/index.md
@@ -16,7 +16,7 @@ Read detailed explanation in [the blog post](https://sorokin.engineer/posts/en/a
 ```python
 --8<-- "list.py"
 ```
-You can control the depth of recursion by specifying the `max_depth` parameter,
+You can control the depth of recursion by specifying the `max_level` parameter,
 by default depth is not limited.

 `max_folders` parameter allows you to group folders by prefix to reduce the number of API calls.
@@ -25,7 +25,7 @@ by default depth is not limited.
 Process objects asynchronously while gathering the objects list from AWS.

 ```python
-async for page in S3BucketObjects(bucket='my-bucket').iter("my-prefix/", max_depth=2, max_folders=10):
+async for page in S3BucketObjects(bucket='my-bucket').iter("my-prefix/", max_level=2, max_folders=10):
     for obj in page:
         print(obj['Key'])
 ```
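
The renamed `max_level` keeps the unlimited-depth default described above. Below is a minimal sketch of that default in use; the import path and the bucket/prefix names are assumptions for illustration, while `list`, `max_level`, and `max_folders` come from this diff:

```python
import asyncio

from async_s3 import S3BucketObjects  # assumed import path, as in the docs examples


async def main() -> None:
    # No max_level argument: traverse all folder levels, the documented default.
    # max_folders=20: group discovered folders by common prefix so that
    # roughly 20 separate list requests are issued instead of one per folder.
    objects = await S3BucketObjects("my-bucket").list("my-prefix/", max_folders=20)
    print(f"listed {len(objects)} objects")


asyncio.run(main())
```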
2 changes: 1 addition & 1 deletion docs/src/ru/as3.md
@@ -5,7 +5,7 @@
 For example

 ```bash
-as3 du s3://my-bucket/my-key -d 1 -f 20 -r 3
+as3 du s3://my-bucket/my-key -l 1 -f 20 -r 3
 ```

 Shows the size and number of objects in `s3://my-bucket/my-key`.
4 changes: 2 additions & 2 deletions docs/src/ru/index.md
@@ -18,7 +18,7 @@
 ```python
 --8<-- "list.py"
 ```
-You can control the recursion depth by specifying the max_depth parameter; by default the depth is not limited.
+You can control the recursion depth by specifying the max_level parameter; by default the depth is not limited.

 With `max_folders` you can group folders by prefix to reduce the number of API calls.

@@ -27,7 +27,7 @@
 Process objects asynchronously while the list of objects is being retrieved from AWS.

 ```python
-async for page in S3BucketObjects(bucket='my-bucket').iter("my-prefix/", max_depth=2, max_folders=10):
+async for page in S3BucketObjects(bucket='my-bucket').iter("my-prefix/", max_level=2, max_folders=10):
     for obj in page:
         print(obj['Key'])
 ```
92 changes: 75 additions & 17 deletions src/async_s3/main.py
@@ -42,11 +42,11 @@ def list_objects_options(func: Callable[[Any], None]) -> Callable[[Any], None]:
     """Add common options to commands using list_objects."""
     func = click.argument("s3_url")(func)
     func = click.option(
-        "--max-depth",
-        "-d",
+        "--max-level",
+        "-l",
         type=int,
         default=None,
-        help="The maximum folders depth to traverse in separate requests. By default traverse all levels.",
+        help="The maximum folders level to traverse in separate requests. By default traverse all levels.",
     )(func)
     func = click.option(
         "--max-folders",
@@ -60,14 +60,35 @@ def list_objects_options(func: Callable[[Any], None]) -> Callable[[Any], None]:
         "-r",
         type=int,
         default=1,
-        help="Repeat the operation multiple times to average elapsed time.",
+        help="Repeat the operation multiple times to average elapsed time. By default repeat once.",
     )(func)
+    func = click.option(
+        "--parallelism",
+        "-p",
+        type=int,
+        default=100,
+        help="The maximum number of concurrent requests to AWS S3. By default 100.",
+    )(func)
+    func = click.option(
+        "--delimiter",
+        "-d",
+        type=str,
+        default="/",
+        help="Delimiter for 'folders'. Default is '/'.",
+    )(func)
     return func


 @list_objects_options
 @as3.command()
-def ls(s3_url: str, max_depth: Optional[int], max_folders: Optional[int], repeat: int) -> None:
+def ls(  # pylint: disable=too-many-arguments
+    s3_url: str,
+    max_level: Optional[int],
+    max_folders: Optional[int],
+    repeat: int,
+    parallelism: int,
+    delimiter: str,
+) -> None:
     """
     List objects in an S3 bucket.
@@ -77,14 +98,28 @@ def ls(s3_url: str, max_depth: Optional[int], max_folders: Optional[int], repeat
     if not s3_url.startswith(S3PROTO):
         error("Invalid S3 URL. It should start with s3://")

-    objects = list_objects(s3_url, max_depth=max_depth, max_folders=max_folders, repeat=repeat)
+    objects = list_objects(
+        s3_url,
+        max_level=max_level,
+        max_folders=max_folders,
+        repeat=repeat,
+        parallelism=parallelism,
+        delimiter=delimiter,
+    )
     click.echo("\n".join([obj["Key"] for obj in objects]))
     print_summary(objects)


 @list_objects_options
 @as3.command()
-def du(s3_url: str, max_depth: Optional[int], max_folders: Optional[int], repeat: int) -> None:
+def du(  # pylint: disable=too-many-arguments
+    s3_url: str,
+    max_level: Optional[int],
+    max_folders: Optional[int],
+    repeat: int,
+    parallelism: int,
+    delimiter: str,
+) -> None:
     """
     Show count and size for objects in an S3 bucket.
@@ -94,7 +129,14 @@ def du(s3_url: str, max_depth: Optional[int], max_folders: Optional[int], repeat
     if not s3_url.startswith(S3PROTO):
         error("Invalid S3 URL. It should start with s3://")

-    objects = list_objects(s3_url, max_depth=max_depth, max_folders=max_folders, repeat=repeat)
+    objects = list_objects(
+        s3_url,
+        max_level=max_level,
+        max_folders=max_folders,
+        repeat=repeat,
+        parallelism=parallelism,
+        delimiter=delimiter,
+    )
     print_summary(objects)

@@ -107,20 +149,34 @@ def human_readable_size(size: float, decimal_places: int = 2) -> str:
     return f"{size:.{decimal_places}f} {unit}"


-def list_objects(
+def list_objects(  # pylint: disable=too-many-arguments
     s3_url: str,
-    max_depth: Optional[int] = None,
+    max_level: Optional[int] = None,
     max_folders: Optional[int] = None,
     repeat: int = 1,
+    parallelism: int = 100,
+    delimiter: str = "/",
 ) -> Iterable[Dict[str, Any]]:
     """List objects in an S3 bucket."""
     return asyncio.run(
-        list_objects_async(s3_url, max_depth=max_depth, max_folders=max_folders, repeat=repeat)
+        list_objects_async(
+            s3_url,
+            max_level=max_level,
+            max_folders=max_folders,
+            repeat=repeat,
+            parallelism=parallelism,
+            delimiter=delimiter,
+        )
     )


-async def list_objects_async(
-    s3_url: str, max_depth: Optional[int], max_folders: Optional[int], repeat: int
+async def list_objects_async(  # pylint: disable=too-many-arguments
+    s3_url: str,
+    max_level: Optional[int],
+    max_folders: Optional[int],
+    repeat: int,
+    parallelism: int,
+    delimiter: str,
 ) -> Iterable[Dict[str, Any]]:
     """List objects in an S3 bucket."""
     assert repeat > 0
@@ -129,21 +185,23 @@ async def list_objects_async(
         f"{click.style(s3_url, fg='green', bold=True)}"
     )
     click.echo(
-        f"{click.style('max_depth: ', fg='green')}"
-        f"{click.style(str(max_depth), fg='green', bold=True)}, "
+        f"{click.style('max_level: ', fg='green')}"
+        f"{click.style(str(max_level), fg='green', bold=True)}, "
         f"{click.style('max_folders: ', fg='green')}"
         f"{click.style(str(max_folders), fg='green', bold=True)}, "
         f"{click.style(str(repeat), fg='green', bold=True)}"
        f"{click.style(' times.', fg='green')}"
     )
     bucket, key = s3_url[len(S3PROTO) :].split("/", 1)
-    s3_list = S3BucketObjects(bucket)
+    s3_list = S3BucketObjects(bucket, parallelism=parallelism)

     total_time = 0.0
     for _ in range(repeat):
         start_time = time.time()
         try:
-            result = await s3_list.list(key, max_depth=max_depth, max_folders=max_folders)
+            result = await s3_list.list(
+                key, max_level=max_level, max_folders=max_folders, delimiter=delimiter
+            )
         except botocore.exceptions.ClientError as exc:
             error(f"Error: {exc}")
         end_time = time.time()
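
One change worth flagging: `-d` is repurposed in this commit. It previously abbreviated `--max-depth` and now selects `--delimiter`, while the new `-l` stands for `--max-level`. Below is a minimal sketch of the same plumbing driven from Python instead of the CLI; the bucket and prefix are placeholders, and the `list_objects` signature is taken from the diff above:

```python
from async_s3.main import list_objects

# Roughly what `as3 ls s3://my-bucket/my-prefix/ -l 2 -f 20 -p 50` does:
# parallelism sizes the semaphore capping concurrent S3 requests,
# delimiter defines where one "folder" level ends and the next begins.
objects = list_objects(
    "s3://my-bucket/my-prefix/",
    max_level=2,
    max_folders=20,
    repeat=1,
    parallelism=50,
    delimiter="/",
)
for obj in objects:
    print(obj["Key"])
```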
42 changes: 28 additions & 14 deletions src/async_s3/s3_bucket_objects.py
@@ -9,7 +9,7 @@
 from async_s3.group_by_prefix import group_by_prefix


-MAX_CONCURRENT_TASKS = 100
+DEFAULT_PARALLELISM = 100


 @functools.lru_cache()
@@ -28,17 +28,23 @@ def get_s3_client() -> aiobotocore.client.AioBaseClient:


 class S3BucketObjects:
-    def __init__(self, bucket: str) -> None:
+    def __init__(self, bucket: str, *, parallelism: int = DEFAULT_PARALLELISM) -> None:
+        """Initialize the S3BucketObjects object.
+
+        bucket: The name of the S3 bucket.
+        parallelism: The maximum number of concurrent requests to AWS S3.
+        """
         self._bucket = bucket
-        self.semaphore = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
+        self.semaphore = asyncio.Semaphore(parallelism)

     async def _list_objects(  # pylint: disable=too-many-arguments, too-many-locals
         self,
         s3_client: aiobotocore.client.AioBaseClient,
         prefix: str,
         current_depth: int,
-        max_depth: Optional[int],
+        max_level: Optional[int],
         max_folders: Optional[int],
+        delimiter: str,
         objects_keys: Set[str],
         queue: asyncio.Queue[List[Dict[str, Any]]],
         active_tasks: Set[asyncio.Task[None]],
@@ -48,15 +54,15 @@ async def _list_objects(  # pylint: disable=too-many-arguments, too-many-locals
         prefixes = []

         params = {"Bucket": self._bucket, "Prefix": prefix}
-        if (current_depth != -1) and (max_depth is None or current_depth < max_depth):
-            params["Delimiter"] = "/"
+        if (current_depth != -1) and (max_level is None or current_depth < max_level):
+            params["Delimiter"] = delimiter

         async for page in paginator.paginate(**params):
             objects = page.get("Contents", [])
             new_keys = {
                 obj["Key"]
                 for obj in objects
-                if not obj["Key"].endswith("/") and obj["Key"] not in objects_keys
+                if not obj["Key"].endswith(delimiter) and obj["Key"] not in objects_keys
             }
             cleared_objects = [obj for obj in objects if obj["Key"] in new_keys]
             objects_keys.update(new_keys)
@@ -78,8 +84,9 @@ async def _list_objects(  # pylint: disable=too-many-arguments, too-many-locals
                     s3_client,
                     folder,
                     level,
-                    max_depth,
+                    max_level,
                     max_folders,
+                    delimiter,
                     objects_keys,
                     queue,
                     active_tasks,
@@ -95,17 +102,19 @@ async def iter(
         self,
         prefix: str = "/",
         *,
-        max_depth: Optional[int] = None,
+        max_level: Optional[int] = None,
         max_folders: Optional[int] = None,
+        delimiter: str = "/",
     ) -> AsyncIterator[List[Dict[str, Any]]]:
         """Generator that yields objects in the bucket with the given prefix.

         Yield objects by partial chunks (list of AWS S3 object dicts) as they are collected from AWS asynchronously.

-        max_depth: The maximum folders depth to traverse in separate requests. If None, traverse all levels.
+        max_level: The maximum folders depth to traverse in separate requests. If None, traverse all levels.
         max_folders: The maximum number of folders to load in separate requests. If None, requests all folders.
             Otherwise, the folders are grouped by prefixes before loading in separate requests.
             Try to group in the given number of folders if possible.
+        delimiter: The delimiter for "folders".
         """
         # if we group by prefixes, some objects may be listed multiple times
         # to avoid this, we store the keys of the objects already listed
@@ -125,8 +134,9 @@ async def iter(
                     s3_client,
                     prefix,
                     0,
-                    max_depth,
+                    max_level,
                     max_folders,
+                    delimiter,
                     objects_keys,
                     queue,
                     active_tasks,
@@ -165,17 +175,21 @@ async def list(
         self,
         prefix: str = "/",
         *,
-        max_depth: Optional[int] = None,
+        max_level: Optional[int] = None,
         max_folders: Optional[int] = None,
+        delimiter: str = "/",
     ) -> List[Dict[str, Any]]:
         """List all objects in the bucket with the given prefix.

-        max_depth: The maximum folders depth to traverse in separate requests. If None, traverse all levels.
+        max_level: The maximum folders depth to traverse in separate requests. If None, traverse all levels.
         max_folders: The maximum number of folders to load in separate requests. If None, requests all folders.
             Otherwise, the folders are grouped by prefixes before loading in separate requests.
             Try to group to the given `max_folders` if possible.
+        delimiter: The delimiter for "folders".
         """
         objects = []
-        async for objects_page in self.iter(prefix, max_depth=max_depth, max_folders=max_folders):
+        async for objects_page in self.iter(
+            prefix, max_level=max_level, max_folders=max_folders, delimiter=delimiter
+        ):
             objects.extend(objects_page)
         return objects
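
The constructor change above is what feeds the CLI's new `-p` option: each instance's semaphore is now sized by the `parallelism` argument instead of the fixed `MAX_CONCURRENT_TASKS`. A minimal streaming sketch under the same assumptions as before (placeholder bucket and prefix; the import path simply mirrors the file path in this diff):

```python
import asyncio

from async_s3.s3_bucket_objects import S3BucketObjects


async def main() -> None:
    # Cap this instance at 20 concurrent S3 requests.
    s3 = S3BucketObjects("my-bucket", parallelism=20)
    # Pages are yielded as they arrive, so processing overlaps the listing;
    # max_level=2 limits how deep the per-"folder" requests fan out.
    async for page in s3.iter("my-prefix/", max_level=2, delimiter="/"):
        for obj in page:
            print(obj["Key"])


asyncio.run(main())
```

A non-"/" delimiter lets buckets whose keys use another separator still fan out into per-"folder" requests.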
16 changes: 10 additions & 6 deletions tests/test_as3.py
@@ -80,14 +80,16 @@ async def test_list_objects_async():
         instance.list = AsyncMock(return_value=mock_result)

         s3_url = "s3://bucket/key"
-        max_depth = 1
+        max_level = 1
         max_folders = 1
         repeat = 1
+        parallelism = 100
+        delimiter = '/'

-        result = await list_objects_async(s3_url, max_depth, max_folders, repeat)
+        result = await list_objects_async(s3_url, max_level, max_folders, repeat, parallelism, delimiter)

         assert result == mock_result
-        instance.list.assert_awaited_once_with('key', max_depth=max_depth, max_folders=max_folders)
+        instance.list.assert_awaited_once_with('key', max_level=max_level, max_folders=max_folders, delimiter=delimiter)

 @pytest.mark.asyncio
 async def test_list_objects_async_repeat():
@@ -101,12 +103,14 @@ async def test_list_objects_async_repeat():
         instance.list = AsyncMock(return_value=mock_result)

         s3_url = "s3://bucket/key"
-        max_depth = 1
+        max_level = 1
         max_folders = 1
         repeat = 3
+        parallelism = 100
+        delimiter = '/'

-        result = await list_objects_async(s3_url, max_depth, max_folders, repeat)
+        result = await list_objects_async(s3_url, max_level, max_folders, repeat, parallelism, delimiter)

         assert result == mock_result
         assert instance.list.call_count == repeat
-        instance.list.assert_awaited_with('key', max_depth=max_depth, max_folders=max_folders)
+        instance.list.assert_awaited_with('key', max_level=max_level, max_folders=max_folders, delimiter=delimiter)
