diff --git a/src/async_s3/main.py b/src/async_s3/main.py index 045ca84..6f8cd8e 100644 --- a/src/async_s3/main.py +++ b/src/async_s3/main.py @@ -2,7 +2,7 @@ import asyncio import time -from typing import Iterable, Dict, Any, Optional, Callable +from typing import Iterable, Dict, Any, Optional, Callable, Tuple import rich_click as click from rich.progress import Progress, TextColumn, BarColumn, TaskProgressColumn import botocore.exceptions @@ -27,7 +27,7 @@ def print_summary(objects: Iterable[Dict[str, Any]]) -> None: total_size = sum(obj["Size"] for obj in objects) message = ( f"{click.style('Total objects: ', fg='yellow')}" - f"{click.style(str(len(list(objects))), fg='yellow', bold=True)}, " + f"{click.style(f'{len(list(objects)):,}', fg='yellow', bold=True)}, " f"{click.style('size: ', fg='yellow')}" f"{click.style(human_readable_size(total_size), fg='yellow', bold=True)}" ) @@ -75,12 +75,20 @@ def list_objects_options(func: Callable[[Any], None]) -> Callable[[Any], None]: "--delimiter", "-d", type=str, + callback=validate_delimiter, default="/", help="Delimiter for 'folders'. Default is '/'.", )(func) return func +def validate_delimiter(ctx: click.Context, param: click.Parameter, value: str) -> str: # pylint: disable=unused-argument + """Validate the `Delimiter` option.""" + if len(value) != 1: + raise click.BadParameter("Delimiter must be exactly one character.") + return value + + @list_objects_options @as3.command() def ls( # pylint: disable=too-many-arguments @@ -172,7 +180,7 @@ def list_objects( # pylint: disable=too-many-arguments ) -async def list_objects_async( # pylint: disable=too-many-arguments, too-many-locals +async def list_objects_async( # pylint: disable=too-many-arguments s3_url: str, max_level: Optional[int], max_folders: Optional[int], @@ -182,6 +190,35 @@ async def list_objects_async( # pylint: disable=too-many-arguments, too-many-lo ) -> Iterable[Dict[str, Any]]: """List objects in an S3 bucket.""" assert repeat > 0 + print_start_info(s3_url, max_level, max_folders, delimiter, parallelism, repeat) + + bucket, key = split_s3_url(s3_url) + s3_list = S3BucketObjects(bucket, parallelism=parallelism) + total_time = 0.0 + result: Iterable[Dict[str, Any]] = [] + + for attempt in range(repeat): + result, duration = await list_objects_with_progress( + s3_list, key, max_level, max_folders, delimiter + ) + total_time += duration + print_attempt_info(attempt, duration) + + if repeat > 1: + print_average_time(total_time, repeat) + + return result + + +def print_start_info( # pylint: disable=too-many-arguments + s3_url: str, + max_level: Optional[int], + max_folders: Optional[int], + delimiter: str, + parallelism: int, + repeat: int, +) -> None: + """Print the command parameters.""" click.echo( f"{click.style('Listing objects in ', fg='yellow')}" f"{click.style(s3_url, fg='yellow', bold=True)}" @@ -198,54 +235,75 @@ async def list_objects_async( # pylint: disable=too-many-arguments, too-many-lo f"{click.style(str(repeat), fg='yellow', bold=True)}" f"{click.style(' times.', fg='yellow')}" ) - bucket, key = s3_url[len(S3PROTO) :].split("/", 1) - s3_list = S3BucketObjects(bucket, parallelism=parallelism) - total_time = 0.0 - for attempt in range(repeat): - start_time = time.time() - try: - result = [] - total_size = 0 - last_update_time = start_time - PROGRESS_REFRESH_INTERVAL - with Progress( - TextColumn("[progress.description]{task.description}{task.completed:>,}"), - BarColumn(), - TaskProgressColumn(), - transient=True, - ) as progress: - objects_bar = progress.add_task("[green]Objects: ", total=None) - size_bar = progress.add_task("[green]Size: ", total=None) - async for objects_page in s3_list.iter( - key, max_level=max_level, max_folders=max_folders, delimiter=delimiter - ): - result.extend(objects_page) - page_size = sum(obj["Size"] for obj in objects_page) - total_size += page_size - current_time = time.time() - if current_time - last_update_time >= PROGRESS_REFRESH_INTERVAL: - progress.update(objects_bar, advance=len(objects_page)) - progress.update(size_bar, advance=page_size) - last_update_time = current_time - progress.remove_task(objects_bar) - progress.remove_task(size_bar) - except botocore.exceptions.ClientError as exc: - error(f"Error: {exc}") - end_time = time.time() - duration = end_time - start_time - click.echo( - f"{click.style(f'({attempt + 1}) Elapsed time: ', fg='green')}" - f"{click.style(f'{duration:.2f}', fg='green', bold=True)} " - f"{click.style('seconds', fg='green')}" - ) - total_time += duration - if repeat > 1: - click.echo( - f"{click.style('Average time: ', fg='yellow')}" - f"{click.style(f'{total_time / repeat:.2f}', fg='yellow', bold=True)} " - f"{click.style('seconds', fg='yellow')}" - ) - return result + +def split_s3_url(s3_url: str) -> Iterable[str]: + """Split an S3 URL into bucket and key.""" + return s3_url[len(S3PROTO) :].split("/", 1) + + +async def list_objects_with_progress( # pylint: disable=too-many-locals + s3_list: S3BucketObjects, + key: str, + max_level: Optional[int], + max_folders: Optional[int], + delimiter: str, +) -> Tuple[Iterable[Dict[str, Any]], float]: + """List objects in an S3 bucket with a progress bar. + + Returns: + (The objects, the elapsed time) + """ + start_time = time.time() + result = [] + total_size = 0 + last_update_time = start_time - PROGRESS_REFRESH_INTERVAL + + try: + with Progress( + TextColumn("[progress.description]{task.description}{task.completed:>,}"), + BarColumn(), + TaskProgressColumn(), + transient=True, + ) as progress: + objects_bar = progress.add_task("[green]Objects: ", total=None) + size_bar = progress.add_task("[green]Size: ", total=None) + async for objects_page in s3_list.iter( + key, max_level=max_level, max_folders=max_folders, delimiter=delimiter + ): + result.extend(objects_page) + total_size += sum(obj["Size"] for obj in objects_page) + current_time = time.time() + if current_time - last_update_time >= PROGRESS_REFRESH_INTERVAL: + progress.update(objects_bar, advance=len(objects_page)) + progress.update(size_bar, advance=total_size) + last_update_time = current_time + progress.remove_task(objects_bar) + progress.remove_task(size_bar) + except botocore.exceptions.ClientError as exc: + error(f"Error: {exc}") + + end_time = time.time() + duration = end_time - start_time + return result, duration + + +def print_attempt_info(attempt: int, duration: float) -> None: + """Print the elapsed time for an attempt.""" + click.echo( + f"{click.style(f'({attempt + 1}) Elapsed time: ', fg='green')}" + f"{click.style(f'{duration:.2f}', fg='green', bold=True)} " + f"{click.style('seconds', fg='green')}" + ) + + +def print_average_time(total_time: float, repeat: int) -> None: + """Print the average elapsed time.""" + click.echo( + f"{click.style('Average time: ', fg='yellow')}" + f"{click.style(f'{total_time / repeat:.2f}', fg='yellow', bold=True)} " + f"{click.style('seconds', fg='yellow')}" + ) if __name__ == "__main__": # pragma: no cover