Skip to content

Commit

Permalink
https://github.com/andgineer/async-s3/issues/16
Browse files Browse the repository at this point in the history
refactored spaghetti code
  • Loading branch information
andgineer committed Jun 16, 2024
1 parent 739782e commit 935da90
Showing 1 changed file with 108 additions and 50 deletions.
158 changes: 108 additions & 50 deletions src/async_s3/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import asyncio
import time
from typing import Iterable, Dict, Any, Optional, Callable
from typing import Iterable, Dict, Any, Optional, Callable, Tuple
import rich_click as click
from rich.progress import Progress, TextColumn, BarColumn, TaskProgressColumn
import botocore.exceptions
Expand All @@ -27,7 +27,7 @@ def print_summary(objects: Iterable[Dict[str, Any]]) -> None:
total_size = sum(obj["Size"] for obj in objects)
message = (
f"{click.style('Total objects: ', fg='yellow')}"
f"{click.style(str(len(list(objects))), fg='yellow', bold=True)}, "
f"{click.style(f'{len(list(objects)):,}', fg='yellow', bold=True)}, "
f"{click.style('size: ', fg='yellow')}"
f"{click.style(human_readable_size(total_size), fg='yellow', bold=True)}"
)
Expand Down Expand Up @@ -75,12 +75,20 @@ def list_objects_options(func: Callable[[Any], None]) -> Callable[[Any], None]:
"--delimiter",
"-d",
type=str,
callback=validate_delimiter,
default="/",
help="Delimiter for 'folders'. Default is '/'.",
)(func)
return func


def validate_delimiter(ctx: click.Context, param: click.Parameter, value: str) -> str:  # pylint: disable=unused-argument
    """Click callback that checks the `--delimiter` option value.

    `ctx` and `param` are accepted only because the callback signature
    requires them; they are not used.

    Returns:
        `value` unchanged when it is exactly one character long.

    Raises:
        click.BadParameter: if `value` is empty or longer than one character.
    """
    if len(value) == 1:
        return value
    raise click.BadParameter("Delimiter must be exactly one character.")


@list_objects_options
@as3.command()
def ls( # pylint: disable=too-many-arguments
Expand Down Expand Up @@ -172,7 +180,7 @@ def list_objects( # pylint: disable=too-many-arguments
)


async def list_objects_async( # pylint: disable=too-many-arguments, too-many-locals
async def list_objects_async( # pylint: disable=too-many-arguments
s3_url: str,
max_level: Optional[int],
max_folders: Optional[int],
Expand All @@ -182,6 +190,35 @@ async def list_objects_async( # pylint: disable=too-many-arguments, too-many-lo
) -> Iterable[Dict[str, Any]]:
"""List objects in an S3 bucket."""
assert repeat > 0
print_start_info(s3_url, max_level, max_folders, delimiter, parallelism, repeat)

bucket, key = split_s3_url(s3_url)
s3_list = S3BucketObjects(bucket, parallelism=parallelism)
total_time = 0.0
result: Iterable[Dict[str, Any]] = []

for attempt in range(repeat):
result, duration = await list_objects_with_progress(
s3_list, key, max_level, max_folders, delimiter
)
total_time += duration
print_attempt_info(attempt, duration)

if repeat > 1:
print_average_time(total_time, repeat)

return result


def print_start_info( # pylint: disable=too-many-arguments
s3_url: str,
max_level: Optional[int],
max_folders: Optional[int],
delimiter: str,
parallelism: int,
repeat: int,
) -> None:
"""Print the command parameters."""
click.echo(
f"{click.style('Listing objects in ', fg='yellow')}"
f"{click.style(s3_url, fg='yellow', bold=True)}"
Expand All @@ -198,54 +235,75 @@ async def list_objects_async( # pylint: disable=too-many-arguments, too-many-lo
f"{click.style(str(repeat), fg='yellow', bold=True)}"
f"{click.style(' times.', fg='yellow')}"
)
bucket, key = s3_url[len(S3PROTO) :].split("/", 1)
s3_list = S3BucketObjects(bucket, parallelism=parallelism)

total_time = 0.0
for attempt in range(repeat):
start_time = time.time()
try:
result = []
total_size = 0
last_update_time = start_time - PROGRESS_REFRESH_INTERVAL
with Progress(
TextColumn("[progress.description]{task.description}{task.completed:>,}"),
BarColumn(),
TaskProgressColumn(),
transient=True,
) as progress:
objects_bar = progress.add_task("[green]Objects: ", total=None)
size_bar = progress.add_task("[green]Size: ", total=None)
async for objects_page in s3_list.iter(
key, max_level=max_level, max_folders=max_folders, delimiter=delimiter
):
result.extend(objects_page)
page_size = sum(obj["Size"] for obj in objects_page)
total_size += page_size
current_time = time.time()
if current_time - last_update_time >= PROGRESS_REFRESH_INTERVAL:
progress.update(objects_bar, advance=len(objects_page))
progress.update(size_bar, advance=page_size)
last_update_time = current_time
progress.remove_task(objects_bar)
progress.remove_task(size_bar)
except botocore.exceptions.ClientError as exc:
error(f"Error: {exc}")
end_time = time.time()
duration = end_time - start_time
click.echo(
f"{click.style(f'({attempt + 1}) Elapsed time: ', fg='green')}"
f"{click.style(f'{duration:.2f}', fg='green', bold=True)} "
f"{click.style('seconds', fg='green')}"
)
total_time += duration
if repeat > 1:
click.echo(
f"{click.style('Average time: ', fg='yellow')}"
f"{click.style(f'{total_time / repeat:.2f}', fg='yellow', bold=True)} "
f"{click.style('seconds', fg='yellow')}"
)
return result

def split_s3_url(s3_url: str) -> Tuple[str, str]:
    """Split an S3 URL into bucket and key.

    The leading `len(S3PROTO)` characters are dropped and the remainder is
    split at the first "/".

    Returns:
        (bucket, key). `key` is an empty string when the URL names only a
        bucket (no "/" after the bucket name), so callers can always unpack
        two values instead of hitting a ValueError on a one-element result.
        NOTE(review): an empty key is presumably treated as "whole bucket"
        by the listing code - confirm against S3BucketObjects.
    """
    # partition() always yields three parts, unlike split("/", 1), which
    # returns a single-element list when there is no separator.
    bucket, _, key = s3_url[len(S3PROTO) :].partition("/")
    return bucket, key


async def list_objects_with_progress(  # pylint: disable=too-many-locals
    s3_list: S3BucketObjects,
    key: str,
    max_level: Optional[int],
    max_folders: Optional[int],
    delimiter: str,
) -> Tuple[Iterable[Dict[str, Any]], float]:
    """List objects in an S3 bucket with a progress bar.

    Pages yielded by `s3_list.iter(...)` are accumulated while two transient
    progress tasks show the running object count and the running total size.

    A `botocore.exceptions.ClientError` is reported through `error()` and
    not re-raised here; whatever was collected before the failure is what
    gets returned (assuming `error()` itself returns - TODO confirm).

    Returns:
        (The objects, the elapsed time in seconds)
    """
    start_time = time.time()
    result = []
    total_size = 0
    # Backdate the last refresh so the very first page updates the bars.
    last_update_time = start_time - PROGRESS_REFRESH_INTERVAL

    try:
        with Progress(
            TextColumn("[progress.description]{task.description}{task.completed:>,}"),
            BarColumn(),
            TaskProgressColumn(),
            transient=True,
        ) as progress:
            objects_bar = progress.add_task("[green]Objects: ", total=None)
            size_bar = progress.add_task("[green]Size: ", total=None)
            async for objects_page in s3_list.iter(
                key, max_level=max_level, max_folders=max_folders, delimiter=delimiter
            ):
                result.extend(objects_page)
                total_size += sum(obj["Size"] for obj in objects_page)
                current_time = time.time()
                # Redraw at most once per PROGRESS_REFRESH_INTERVAL.
                if current_time - last_update_time >= PROGRESS_REFRESH_INTERVAL:
                    # Set absolute totals instead of advancing: refreshes are
                    # throttled, so advancing by the current page would drop
                    # the pages skipped in between, and advancing by the
                    # cumulative size would count earlier pages repeatedly.
                    progress.update(objects_bar, completed=len(result))
                    progress.update(size_bar, completed=total_size)
                    last_update_time = current_time
            progress.remove_task(objects_bar)
            progress.remove_task(size_bar)
    except botocore.exceptions.ClientError as exc:
        error(f"Error: {exc}")

    duration = time.time() - start_time
    return result, duration


def print_attempt_info(attempt: int, duration: float) -> None:
    """Echo how long one listing attempt took.

    `attempt` is zero-based and is shown one-based; `duration` is printed
    with two decimal places, in bold, followed by the word "seconds".
    """
    label = click.style(f"({attempt + 1}) Elapsed time: ", fg="green")
    elapsed = click.style(f"{duration:.2f}", fg="green", bold=True)
    unit = click.style("seconds", fg="green")
    click.echo(f"{label}{elapsed} {unit}")


def print_average_time(total_time: float, repeat: int) -> None:
    """Echo the mean elapsed time, i.e. `total_time / repeat`.

    The mean is printed with two decimal places, in bold, followed by the
    word "seconds".
    """
    label = click.style("Average time: ", fg="yellow")
    average = click.style(f"{total_time / repeat:.2f}", fg="yellow", bold=True)
    unit = click.style("seconds", fg="yellow")
    click.echo(f"{label}{average} {unit}")


if __name__ == "__main__": # pragma: no cover
Expand Down

0 comments on commit 935da90

Please sign in to comment.