Commit 580ca3d

Permalink: https://github.com/andgineer/async-s3/issues/15
Optimize busy loop
renamed objects
andgineer committed Jun 15, 2024
1 parent e8e79e3 commit 580ca3d
Showing 9 changed files with 109 additions and 59 deletions.
7 changes: 2 additions & 5 deletions docs/includes/list.py
@@ -1,12 +1,9 @@
import asyncio
-from async_s3 import ListObjectsAsync
+from async_s3 import S3BucketObjects


async def main():
-    bucket_name = "your-bucket-name"
-    prefix = "your-prefix/"
-    list_objects = ListObjectsAsync(bucket_name)
-    objects = await list_objects.list_objects(prefix=prefix, max_depth=2, max_folders=20)
+    objects = await S3BucketObjects("my-bucket").list("my-prefix/", max_depth=2, max_folders=20)

    for obj in objects:
        print(obj["Key"])
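For reference, a complete version of the updated example; the `asyncio.run` entry point is an assumption, since the diff truncates the rest of the file:

```python
import asyncio

from async_s3 import S3BucketObjects


async def main():
    # "my-bucket" and "my-prefix/" are placeholder names from the example above
    objects = await S3BucketObjects("my-bucket").list("my-prefix/", max_depth=2, max_folders=20)

    for obj in objects:
        print(obj["Key"])


asyncio.run(main())  # assumed entry point, not shown in the truncated diff
```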
15 changes: 13 additions & 2 deletions docs/src/en/index.md
@@ -1,16 +1,18 @@
-# ListObjectsAsync
+# S3BucketObjects

## Features
- Utilizes aiobotocore for non-blocking IO operations.
- Supports recursive listing of "folders" with controllable depth.
- Groups "folders" by prefixes to reduce the number of API calls.
- Handles S3 pagination to provide efficient traversal of long S3 object lists.
- Utilizes AWS retry strategies.
+- Allows processing objects as they are collected from AWS.

Read a detailed explanation in [the blog post](https://sorokin.engineer/posts/en/aws_s3_async_list.html).

## Usage

+### Simple
```python
--8<-- "list.py"
```
@@ -19,6 +21,15 @@ by default depth is not limited.

The `max_folders` parameter allows you to group folders by prefix to reduce the number of API calls.

+### Full async potential
+Process objects asynchronously while gathering the objects list from AWS.
+
+```python
+async for page in S3BucketObjects(bucket='my-bucket').iter("my-prefix/", max_depth=2, max_folders=10):
+    for obj in page:
+        print(obj['Key'])
+```

## Docstrings
-[ListObjectsAsync][async_s3]
+[S3BucketObjects][async_s3]
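One caveat for the new docs snippet: `async for` is only valid inside a coroutine. A minimal sketch of a driver for it, with placeholder bucket and prefix names:

```python
import asyncio

from async_s3 import S3BucketObjects


async def main():
    # pages arrive as they are collected from AWS, before the full listing completes
    async for page in S3BucketObjects("my-bucket").iter("my-prefix/", max_depth=2, max_folders=10):
        for obj in page:
            print(obj["Key"])


asyncio.run(main())
```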

17 changes: 15 additions & 2 deletions docs/src/ru/index.md
@@ -1,4 +1,4 @@
-# ListObjectsAsync
+# S3BucketObjects

## Features

@@ -7,17 +7,30 @@
- Groups "folders" by prefixes to reduce the number of API calls.
- Downloads results in pages for efficient traversal of long object lists.
- Uses AWS retry strategies.
+- Allows processing objects as they arrive from AWS.

The logic is described in more detail in [the article](https://sorokin.engineer/posts/ru/aws_s3_async_list.html).

## Usage

+### Simple example
+
```python
--8<-- "list.py"
```
You can control the recursion depth with the `max_depth` parameter; by default the depth is not limited.

With `max_folders` you can group folders by prefixes to reduce the number of API calls.

+### Full async potential
+
+Process objects asynchronously while the list of objects is being fetched from AWS.
+
+```python
+async for page in S3BucketObjects(bucket='my-bucket').iter("my-prefix/", max_depth=2, max_folders=10):
+    for obj in page:
+        print(obj['Key'])
+```
+
## Documentation generated from the sources
-[ListObjectsAsync][async_s3]
+[S3BucketObjects][async_s3]
4 changes: 2 additions & 2 deletions src/async_s3/__init__.py
@@ -4,6 +4,6 @@
"""

from async_s3.__about__ import __version__
from async_s3.list_objects_async import ListObjectsAsync
from async_s3.s3_bucket_objects import S3BucketObjects

__all__ = ["__version__", "ListObjectsAsync"]
__all__ = ["__version__", "S3BucketObjects"]
6 changes: 3 additions & 3 deletions src/async_s3/main.py
@@ -5,7 +5,7 @@
from typing import Iterable, Dict, Any, Optional, Callable
import rich_click as click
import botocore.exceptions
-from async_s3.list_objects_async import ListObjectsAsync
+from async_s3.s3_bucket_objects import S3BucketObjects
from async_s3 import __version__

@@ -137,13 +137,13 @@ async def list_objects_async(
f"{click.style(' times.', fg='green')}"
)
bucket, key = s3_url[len(S3PROTO) :].split("/", 1)
s3_list = ListObjectsAsync(bucket)
s3_list = S3BucketObjects(bucket)

total_time = 0.0
for _ in range(repeat):
start_time = time.time()
try:
result = await s3_list.list_objects(key, max_depth=max_depth, max_folders=max_folders)
result = await s3_list.list(key, max_depth=max_depth, max_folders=max_folders)
except botocore.exceptions.ClientError as exc:
error(f"Error: {exc}")
end_time = time.time()
src/async_s3/{list_objects_async.py → s3_bucket_objects.py}
@@ -27,7 +27,7 @@ def get_s3_client() -> aiobotocore.client.AioBaseClient:
    return session.create_client("s3", config=config)


-class ListObjectsAsync:
+class S3BucketObjects:
    def __init__(self, bucket: str) -> None:
        self._bucket = bucket
        self.semaphore = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
@@ -85,18 +85,22 @@ async def _list_objects(  # pylint: disable=too-many-arguments, too-many-locals
                        active_tasks,
                    )
                )
-                active_tasks.add(task)
-                task.add_done_callback(lambda t: self._task_done(t, active_tasks))
            except Exception as e:
                self.semaphore.release()
                raise e
+            active_tasks.add(task)
+            task.add_done_callback(lambda t: self._task_done(t, active_tasks, queue))

-    async def pages(
-        self, prefix: str = "/", max_depth: Optional[int] = None, max_folders: Optional[int] = None
+    async def iter(
+        self,
+        prefix: str = "/",
+        *,
+        max_depth: Optional[int] = None,
+        max_folders: Optional[int] = None,
    ) -> AsyncIterator[List[Dict[str, Any]]]:
        """Generator that yields objects in the bucket with the given prefix.

-        Yield objects by separate pages (list of AWS S3 object dicts).
+        Yield objects in partial chunks (lists of AWS S3 object dicts) as they are collected from AWS asynchronously.

        max_depth: The maximum folders depth to traverse in separate requests. If None, traverse all levels.
        max_folders: The maximum number of folders to load in separate requests. If None, requests all folders.
@@ -114,30 +118,55 @@ async def pages(
        active_tasks: Set[asyncio.Task[None]] = set()

        async with get_s3_client() as s3_client:
-            root_task = asyncio.create_task(
-                self._list_objects(
-                    s3_client, prefix, 0, max_depth, max_folders, objects_keys, queue, active_tasks
-                )
-            )
+            await self.semaphore.acquire()
+            try:
+                root_task = asyncio.create_task(
+                    self._list_objects(
+                        s3_client,
+                        prefix,
+                        0,
+                        max_depth,
+                        max_folders,
+                        objects_keys,
+                        queue,
+                        active_tasks,
+                    )
+                )
+            except Exception as e:
+                self.semaphore.release()
+                raise e
            active_tasks.add(root_task)
-            root_task.add_done_callback(lambda t: self._task_done(t, active_tasks))
+            root_task.add_done_callback(lambda t: self._task_done(t, active_tasks, queue))

            while active_tasks:
-                try:
-                    yield queue.get_nowait()
-                except asyncio.QueueEmpty:
-                    await asyncio.sleep(0)
+                page = await queue.get()
+                if page:
+                    yield page

            if active_tasks:
                await asyncio.gather(*active_tasks)

-    def _task_done(self, task: asyncio.Task[None], active_tasks: Set[asyncio.Task[None]]) -> None:
+    def _task_done(
+        self,
+        task: asyncio.Task[None],
+        active_tasks: Set[asyncio.Task[None]],
+        queue: asyncio.Queue[List[Dict[str, Any]]],
+    ) -> None:
        """Callback for when a task is done."""
-        active_tasks.discard(task)
-        self.semaphore.release()

-    async def list_objects(
-        self, prefix: str = "/", max_depth: Optional[int] = None, max_folders: Optional[int] = None
+        async def async_task_done() -> None:
+            active_tasks.discard(task)
+            self.semaphore.release()
+            await queue.put([])  # signal that the task is done
+
+        asyncio.create_task(async_task_done())
+
+    async def list(
+        self,
+        prefix: str = "/",
+        *,
+        max_depth: Optional[int] = None,
+        max_folders: Optional[int] = None,
    ) -> List[Dict[str, Any]]:
        """List all objects in the bucket with the given prefix.
@@ -147,6 +176,6 @@ async def list_objects(
        Try to group to the given `max_folders` if possible.
        """
        objects = []
-        async for objects_page in self.pages(prefix, max_depth, max_folders):
+        async for objects_page in self.iter(prefix, max_depth=max_depth, max_folders=max_folders):
            objects.extend(objects_page)
        return objects
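The core of the commit is visible in `iter` and `_task_done`: the old loop polled with `queue.get_nowait()` and `asyncio.sleep(0)`, spinning the event loop while the queue was empty; the new code blocks on `await queue.get()`, and every finished task enqueues an empty page so a blocked consumer wakes up and re-checks `active_tasks`. A standalone sketch of that pattern, with hypothetical producers standing in for the S3 listing tasks:

```python
import asyncio
from typing import List, Set


async def produce(n: int, queue: "asyncio.Queue[List[str]]") -> None:
    """Stand-in for one S3 listing task: work a bit, then emit a page of keys."""
    await asyncio.sleep(0.01 * n)
    await queue.put([f"key-{n}-a", f"key-{n}-b"])


async def main() -> None:
    queue: "asyncio.Queue[List[str]]" = asyncio.Queue()
    active: Set["asyncio.Task[None]"] = set()

    def on_done(task: "asyncio.Task[None]") -> None:
        # Mirror of the new _task_done: drop the finished task and enqueue
        # an empty page so a consumer blocked on queue.get() wakes up
        # and re-checks `active`.
        active.discard(task)
        queue.put_nowait([])

    for n in range(3):
        task = asyncio.create_task(produce(n, queue))
        active.add(task)
        task.add_done_callback(on_done)

    # No busy loop: block until a real page or a wake-up signal arrives.
    while active:
        page = await queue.get()
        if page:  # empty pages are just "task done" signals
            print(page)


asyncio.run(main())
```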
4 changes: 2 additions & 2 deletions tests/conftest.py
@@ -12,7 +12,7 @@
import boto3
import aiobotocore
import aiobotocore.session
-from async_s3 import ListObjectsAsync
+from async_s3 import S3BucketObjects


@pytest.fixture(scope="session", autouse=True)
@@ -161,6 +161,6 @@ def s3_client_proxy(fake_s3_server, monkeypatch):
    real_client = asyncio.run(s3_async_client_factory().__aenter__())
    mock_client = MockS3Client(real_client)

-    monkeypatch.setattr("async_s3.list_objects_async.get_s3_client", lambda: mock_client)
+    monkeypatch.setattr("async_s3.s3_bucket_objects.get_s3_client", lambda: mock_client)
    yield mock_client
    asyncio.run(real_client.__aexit__(None, None, None))
18 changes: 9 additions & 9 deletions tests/test_as3.py
@@ -75,9 +75,9 @@ async def test_list_objects_async():
{"Key": "file2.txt", "Size": 5678},
]

with patch('async_s3.main.ListObjectsAsync') as MockListObjectsAsync:
instance = MockListObjectsAsync.return_value
instance.list_objects = AsyncMock(return_value=mock_result)
with patch('async_s3.main.S3BucketObjects') as MockS3BucketObjects:
instance = MockS3BucketObjects.return_value
instance.list = AsyncMock(return_value=mock_result)

s3_url = "s3://bucket/key"
max_depth = 1
@@ -87,7 +87,7 @@ async def test_list_objects_async():
        result = await list_objects_async(s3_url, max_depth, max_folders, repeat)

    assert result == mock_result
-    instance.list_objects.assert_awaited_once_with('key', max_depth=max_depth, max_folders=max_folders)
+    instance.list.assert_awaited_once_with('key', max_depth=max_depth, max_folders=max_folders)

@pytest.mark.asyncio
async def test_list_objects_async_repeat():
@@ -96,9 +96,9 @@ async def test_list_objects_async_repeat():
{"Key": "file2.txt", "Size": 5678},
]

with patch('async_s3.main.ListObjectsAsync') as MockListObjectsAsync:
instance = MockListObjectsAsync.return_value
instance.list_objects = AsyncMock(return_value=mock_result)
with patch('async_s3.main.S3BucketObjects') as MockS3BucketObjects:
instance = MockS3BucketObjects.return_value
instance.list = AsyncMock(return_value=mock_result)

s3_url = "s3://bucket/key"
max_depth = 1
@@ -108,5 +108,5 @@
        result = await list_objects_async(s3_url, max_depth, max_folders, repeat)

    assert result == mock_result
-    assert instance.list_objects.call_count == repeat
-    instance.list_objects.assert_awaited_with('key', max_depth=max_depth, max_folders=max_folders)
+    assert instance.list.call_count == repeat
+    instance.list.assert_awaited_with('key', max_depth=max_depth, max_folders=max_folders)
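A detail the test changes illustrate: `unittest.mock.patch` targets a name where it is looked up, not where it is defined. Because `async_s3.main` imports the class directly, the patch string has to track the rename. A minimal sketch, assuming `async_s3` is importable:

```python
from unittest.mock import AsyncMock, patch

# async_s3.main does `from async_s3.s3_bucket_objects import S3BucketObjects`,
# so the name must be patched in async_s3.main, not in s3_bucket_objects.
with patch("async_s3.main.S3BucketObjects") as mock_cls:
    mock_cls.return_value.list = AsyncMock(return_value=[])
    # ...exercise code that instantiates S3BucketObjects via async_s3.main...
```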
@@ -1,19 +1,19 @@
from unittest.mock import call

import pytest
-from async_s3 import ListObjectsAsync
+from async_s3 import S3BucketObjects


@pytest.mark.asyncio
@pytest.mark.parametrize("mock_s3_structure", [
    {
        "bucket_structure_file": "bucket_keys.yml",
-        "get_s3_client_function": "async_s3.list_objects_async.get_s3_client"
+        "get_s3_client_function": "async_s3.s3_bucket_objects.get_s3_client"
    }
], indirect=True)
-async def test_list_objects_functional(mock_s3_structure):
-    walker = ListObjectsAsync("mock-bucket")
-    keys = sorted([object["Key"] for object in await walker.list_objects(prefix="root/")])
+async def test_s3_bucket_objects_functional(mock_s3_structure):
+    walker = S3BucketObjects("mock-bucket")
+    keys = sorted([object["Key"] for object in await walker.list(prefix="root/")])

    expected_keys = sorted([
        'root/data01/image01.png',
@@ -37,14 +37,14 @@ async def test_list_objects_functional(mock_s3_structure):
@pytest.mark.parametrize("mock_s3_structure", [
    {
        "bucket_structure_file": "bucket_keys.yml",
-        "get_s3_client_function": "async_s3.list_objects_async.get_s3_client",
+        "get_s3_client_function": "async_s3.s3_bucket_objects.get_s3_client",
        "bucket_name": "mock-bucket"
    }
], indirect=True)
-async def test_list_objects_with_max_depth(s3_client_proxy, mock_s3_structure):
-    walker = ListObjectsAsync("mock-bucket")
+async def test_s3_bucket_objects_with_max_depth(s3_client_proxy, mock_s3_structure):
+    walker = S3BucketObjects("mock-bucket")

-    objects = await walker.list_objects(prefix="root/", max_depth=2)
+    objects = await walker.list(prefix="root/", max_depth=2)
    expected_keys = {
        'root/data01/image01.png',
        'root/data01/images/img11.jpg',
@@ -99,14 +99,14 @@ async def test_list_objects_with_max_depth(s3_client_proxy, mock_s3_structure):
@pytest.mark.parametrize("mock_s3_structure", [
    {
        "bucket_structure_file": "bucket_keys.yml",
-        "get_s3_client_function": "async_s3.list_objects_async.get_s3_client",
+        "get_s3_client_function": "async_s3.s3_bucket_objects.get_s3_client",
        "bucket_name": "mock-bucket"
    }
], indirect=True)
-async def test_list_objects_with_max_folders(s3_client_proxy, mock_s3_structure):
-    walker = ListObjectsAsync("mock-bucket")
+async def test_s3_bucket_objects_with_max_folders(s3_client_proxy, mock_s3_structure):
+    walker = S3BucketObjects("mock-bucket")

-    objects = await walker.list_objects(prefix="root/", max_folders=2)
+    objects = await walker.list(prefix="root/", max_folders=2)
    expected_keys = {
        'root/data01/image01.png',
        'root/data01/images/img11.jpg',
