diff --git a/src/aoc/aoc2024/day_03.py b/src/aoc/aoc2024/day_03.py
index 4131d14..708f57b 100644
--- a/src/aoc/aoc2024/day_03.py
+++ b/src/aoc/aoc2024/day_03.py
@@ -42,4 +42,4 @@ def main(txt: str) -> None:
 if __name__ == "__main__":
     aoc = Aoc(day=get_day(), years=YEAR)
-    aoc.run(main, submit=True, part="both", readme_update=True)
+    aoc.run(main, submit=False, part="both", readme_update=True, profile=True)
diff --git a/src/aoc/aoc_helper.py b/src/aoc/aoc_helper.py
index d72db58..2ddc1c7 100644
--- a/src/aoc/aoc_helper.py
+++ b/src/aoc/aoc_helper.py
@@ -1,8 +1,7 @@
 import cProfile
 import importlib
-import inspect
-import logging
 import os
+import pstats
 from dataclasses import dataclass
 from datetime import datetime
 from pathlib import Path
@@ -14,43 +13,144 @@ import psutil
 from aocd import get_data, submit
 from aocd.models import Puzzle
-from icecream import ic
+from rich import box
 from rich.console import Console
-from rich.live import Live
 from rich.panel import Panel
-from rich.progress import Progress, SpinnerColumn, TextColumn
+from rich.progress import track
 from rich.table import Table
-from rich.text import Text
-from rich.traceback import install
-
-from src.aoc import solve
 
 PROJECT_ROOT = Path(__file__).parent.parent.parent
-
-# Setup rich console and error handling
-install()
 console = Console()
 
 
+def format_memory(bytes_value: float) -> str:
+    """Convert bytes to a human-readable string with an appropriate unit"""
+    for unit in ["B", "KB", "MB", "GB", "TB"]:
+        if bytes_value < 1024.0:
+            return f"{bytes_value:.2f} {unit}"
+        bytes_value /= 1024.0
+    return f"{bytes_value:.2f} PB"
+
+
+@dataclass
+class ComplexityResult:
+    """Results from complexity analysis"""
+
+    time_complexity: str
+    r_squared: float
+    fit_graph: Optional[Any] = None
+
+
 @dataclass
 class PerformanceMetrics:
-    """Stores performance metrics for a solution."""
+    """Performance metrics for a solution"""
 
     execution_time_us: float  # Execution time in microseconds
-    time_std_us: float  # Standard deviation of execution time
-    memory_mb: float  # Memory usage in megabytes
-    cpu_percent: float  # CPU usage percentage
-    complexity: str  # Algorithmic complexity estimation
-    result: Any  # Solution result
+    time_std_us: float  # Standard deviation of execution time
+    memory_bytes: float  # Memory usage in bytes
+    memory_peak: float  # Peak memory usage in bytes
+    cpu_percent: float  # CPU usage percentage
+    complexity: Optional[ComplexityResult] = None  # Complexity analysis results
+    result: Any = None  # Solution result
+    profile_stats: Optional[pstats.Stats] = None  # Optional cProfile stats
+
+
+class ComplexityAnalyzer:
+    """Analyzes time complexity of functions using the big_o library"""
+
+    @staticmethod
+    def create_data_generator(data: str) -> Callable:
+        """Creates a data generator function for big_o analysis"""
+
+        def data_generator(n: int) -> str:
+            if isinstance(data, str):
+                lines = data.split("\n")
+                # Take the first n lines, clamped to the available input
+                scaled_lines = lines[: max(1, min(int(n), len(lines)))]
+                return "\n".join(scaled_lines)
+            return data[:n]
+
+        return data_generator
+
+    @staticmethod
+    def create_aoc_generator(data: str):
+        """Creates an optimized data generator for AoC inputs for big_o analysis"""
+        lines = data.splitlines()
+        base_size = len(lines)
+
+        def generator(n: int) -> str:
+            # Clamp the requested size to the available input
+            size = min(max(1, int(n)), base_size)
+
+            # Rejoin the lines so solutions receive the same str type as the
+            # real puzzle input
+            return "\n".join(lines[:size])
+
+        return generator, base_size
+
+    @staticmethod
+    def analyze(func: Callable, data: str) -> ComplexityResult:
+        """Analyze time complexity using the big_o library"""
+        try:
+            # Create the data generator and get the total input size
+            data_generator, data_size = ComplexityAnalyzer.create_aoc_generator(data)
+
+            # Sample sizes from roughly 10% of the input (but at least 2 lines)
+            # up to the full input
+            min_n = max(2, data_size // 10)
+
+            # return_raw_data=True makes big_o expose the raw measurements
+            # under the "measures" and "times" keys of the fitted dict
+            best_fit, fitted = big_o.big_o(
+                func,
+                data_generator=data_generator,
+                min_n=min_n,
+                max_n=data_size,
+                n_repeats=3,
+                n_timings=3,
+                return_raw_data=True,
+            )
+
+            # Calculate R-squared from the raw measurements if we have them
+            r_squared = 0.0
+            try:
+                ns, times = fitted.get("measures"), fitted.get("times")
+                if ns is not None and len(ns) > 1:
+                    r_squared = float(np.corrcoef(ns, times)[0, 1] ** 2)
+            except Exception:
+                r_squared = 0.0
+
+            return ComplexityResult(time_complexity=str(best_fit), r_squared=r_squared)
+        except Exception as e:
+            console.print(f"[yellow]Warning: Complexity analysis failed: {str(e)}")
+            return ComplexityResult(
+                time_complexity="Unable to determine", r_squared=0.0
+            )
 
 
-class Aoc:
-    """Main class for Advent of Code solution handling and performance analysis."""
+class Aoc:
     def __init__(
         self, day: int = int(datetime.now().day), years: int = int(datetime.now().year)
     ):
-        """Initialize with specific day and year, defaulting to current date."""
         self.day = day
         self.year = years
         self.data = get_data(day=self.day, year=self.year)
@@ -59,80 +159,69 @@ def __init__(
         )
         self.process = psutil.Process()
 
-    def analyze_performance(self, func: Callable, runs: int = 10) -> PerformanceMetrics:
-        """Analyze performance metrics using actual AoC data."""
-        with Progress(
-            SpinnerColumn(),
-            TextColumn("[progress.description]{task.description}"),
-            console=console,
-        ) as progress:
-            task = progress.add_task("[cyan]Running performance analysis...", total=3)
-
-            # Time analysis with multiple runs for statistical significance
-            times = []
-            for _ in range(runs):
-                start = timer()
-                result = func(self.data)
-                times.append((timer() - start) * 1e6)  # Convert to microseconds
-                progress.advance(task)
-
-            # Memory analysis using process resources
-            start_mem = self.process.memory_info().rss
-            func(self.data)
-            end_mem = self.process.memory_info().rss
-            memory_used = (end_mem - start_mem) / (1024 * 1024)  # Convert to MB
-            progress.advance(task)
-
-            # Complexity analysis using input scaling
-            try:
-
-                def data_generator(n):
-                    if isinstance(self.data, str):
-                        lines = self.data.split("\n")
-                        scaled_lines = lines[: max(1, int(len(lines) * n / len(lines)))]
-                        return "\n".join(scaled_lines)
-                    return self.data[:n]
-
-                # data_generator -- Function returning input data of 'length' N.
-                #     Input data for the argument `func` is created as
-                #     `data_generator(N)`. Common data generators are defined
-                #     in the submodule `big_o.datagen`
-                #
-                # min_n, max_n, n_measures -- The execution time of func is measured
-                #     at `n_measures` points between `min_n` and `max_n` (included)
-                #
-                # n_repeats -- Number of times func is called to compute execution time
-                #     (return the cumulative time of execution)
-                #
-                # n_timings -- Number of times the timing measurement is repeated.
-                #     The minimum time for all the measurements is kept.
-                #
-                best_fit, _ = big_o.big_o(
-                    func,
-                    data_generator=data_generator,
-                    min_n=len(self.data) // 10,
-                    max_n=len(self.data),
-                    n_repeats=5,
-                    n_timings=5,
-                )
-                complexity = str(best_fit)
-            except Exception as e:
-                console.print(f"[yellow]Warning: Complexity analysis failed: {str(e)}")
-                complexity = "Unable to determine"
-            progress.advance(task)
+    def analyze_performance(
+        self,
+        func: Callable,
+        runs: int = 10,
+        with_profile: bool = False,
+        analyze_complexity: bool = True,
+    ) -> PerformanceMetrics:
+        """Run comprehensive performance analysis on a solution function"""
+        times = []
+        result = None
+        profile_stats = None
+        complexity_result = None
+
+        # Analyze complexity if requested
+        if analyze_complexity:
+            complexity_result = ComplexityAnalyzer.analyze(func, self.data)
+
+        # Profiling run if requested
+        if with_profile:
+            profiler = cProfile.Profile()
+            profiler.enable()
+            result = func(self.data)
+            profiler.disable()
+            profile_stats = pstats.Stats(profiler)
+
+        # Take the memory baseline after the complexity/profiling runs so they
+        # do not inflate the measured deltas
+        start_mem = self.process.memory_info().rss
+        peak_mem = start_mem
+
+        # Performance measurement runs
+        for _ in track(range(runs), description="Running performance analysis..."):
+            # Clear any functools-style cache so repeated runs stay comparable
+            if hasattr(func, "cache_clear"):
+                func.cache_clear()
+
+            # Track memory on both sides of the run to catch the peak RSS
+            pre_mem = self.process.memory_info().rss
+            peak_mem = max(peak_mem, pre_mem)
+
+            # Time execution
+            start = timer()
+            result = func(self.data)
+            end = timer()
+
+            post_mem = self.process.memory_info().rss
+            peak_mem = max(peak_mem, post_mem)
+
+            times.append((end - start) * 1e6)  # Convert to microseconds
+
+        end_mem = self.process.memory_info().rss
+        memory_used = end_mem - start_mem
+        peak_memory_used = peak_mem - start_mem
 
         return PerformanceMetrics(
             execution_time_us=np.mean(times),
             time_std_us=np.std(times),
-            memory_mb=memory_used,
+            memory_bytes=memory_used,
+            memory_peak=peak_memory_used,
             cpu_percent=self.process.cpu_percent(),
-            complexity=complexity,
+            complexity=complexity_result,
             result=result,
+            profile_stats=profile_stats,
         )
 
     def print_metrics(self, metrics: PerformanceMetrics, part: str):
-        """Display performance metrics in a formatted table."""
+        """Display performance metrics in a nicely formatted table"""
 
         def format_time(t: float) -> str:
             if t < 0.000001:
@@ -143,23 +232,38 @@ def format_time(t: float) -> str:
                 return f"{t*1000:.1f}ms"
             return f"{t:.2f}s"
 
-        table = Table(show_header=False, box=None)
-        table.add_row("[bold cyan]Result:", f"[green]{metrics.result}")
+        # Create the metrics table
+        table = Table(box=box.ROUNDED, title=f"Part {part.upper()} Analysis")
+        table.add_column("Metric", style="cyan")
+        table.add_column("Value", style="green")
+
+        # Add execution time with standard deviation
         table.add_row(
-            "[bold cyan]Time:",
-            f"[green]{format_time(metrics.execution_time_us/1e6)} ± {format_time(metrics.time_std_us/1e6)}",
+            "Execution Time",
+            f"{format_time(metrics.execution_time_us/1e6)} ± {format_time(metrics.time_std_us/1e6)}",
         )
-        table.add_row("[bold cyan]Memory:", f"[green]{metrics.memory_mb:.2f}MB")
-        table.add_row("[bold cyan]CPU Usage:", f"[green]{metrics.cpu_percent:.1f}%")
-        table.add_row("[bold cyan]Complexity:", f"[green]{metrics.complexity}")
-        panel = Panel(
-            table, title=f"[bold blue]Part {part.upper()} Analysis", border_style="blue"
-        )
-        console.print(panel)
+        # Add memory metrics
+        table.add_row("Memory Usage", format_memory(metrics.memory_bytes))
+        table.add_row("Peak Memory", format_memory(metrics.memory_peak))
+        table.add_row("CPU Usage", f"{metrics.cpu_percent:.1f}%")
+
+        # Add complexity information if available
+        if metrics.complexity:
+            table.add_row("Time Complexity", metrics.complexity.time_complexity)
+
+        table.add_row("Result", str(metrics.result))
+
+        console.print(table)
+
+        # Print profile stats if available
+        if metrics.profile_stats:
+            console.print("\n[bold cyan]Profile Details:[/bold cyan]")
+            console.print("[dim]Top 10 functions by cumulative time:[/dim]")
+            metrics.profile_stats.sort_stats("cumulative").print_stats(10)
 
     def run_test(self, part: Literal["a", "b"]) -> bool:
-        """Run a specific test for the given part."""
+        """Run the test for a specific part"""
         test_func = getattr(self.test_module, f"test_{part}", None)
         if not test_func:
             console.print(f"[red]✗ No test found for part {part}")
@@ -173,62 +277,95 @@ def run_test(self, part: Literal["a", "b"]) -> bool:
             console.print(f"[red]✗ Part {part} test failed: {str(e)}")
             return False
         except Exception as e:
-            console.print(f"[red]✗ Error running part {part} test: {str(e)}")
+            console.print(f"[red]✗ Error in part {part} test: {str(e)}")
             return False
 
     def run(
         self,
         func=None,
-        submit: bool = False,
         part: Union[None, str] = None,
         readme_update: bool = False,
-        profile: bool = True,
-        runs: int = 1000,
-    ) -> Dict[str, PerformanceMetrics]:
-        """Main execution method handling tests, profiling, and submissions."""
+        profile: bool = False,
+        profile_runs: int = 10,
+        submit: bool = False,
+    ) -> None:
+        """Main execution method for running solutions"""
+        # Header
         console.rule(f"[bold blue]Advent of Code {self.year} - Day {self.day}")
-        metrics = {}
 
         if func is not None:
             with console.status("[cyan]Running main function..."):
-                func(self.get_data())
+                func(self.data)
 
-        modules = importlib.import_module(f"src.aoc.aoc{self.year}.day_{self.day:02d}")
+        # The solution module is needed whenever a part runs, so import it
+        # unconditionally
+        modules = importlib.import_module(
+            f"src.aoc.aoc{self.year}.day_{self.day:02d}"
+        )
 
-        for part_name in ("a", "b"):
-            if part not in (part_name, "both"):
+        # Run tests first
+        tests = (self.run_test("a"), self.run_test("b"))
+        options = {
+            "a": (0,),
+            "b": (1,),
+            "both": (0, 1),
+        }
+
+        console.print("\n[cyan]Test Results:")
+        console.print(f"Part A: {'[green]PASS' if tests[0] else '[red]FAIL'}")
+        console.print(f"Part B: {'[green]PASS' if tests[1] else '[red]FAIL'}")
+        console.print(f"Selected Part: [cyan]{part}")
+
+        for i in options.get(part, ()):
+            current_part = "a" if i == 0 else "b"
+            part_func = getattr(modules, f"part_{current_part}")
+
+            if not tests[i]:
+                console.print(
+                    f"\n[red]Cannot proceed with part {current_part} - test failed"
+                )
                 continue
 
-            console.rule(f"[yellow]Part {part_name.upper()}")
-            part_func = getattr(modules, f"part_{part_name}")
-            test_passed = self.run_test(part_name)
-
+            # Run performance analysis if requested
             if profile:
-                metrics[part_name] = self.analyze_performance(part_func, runs)
-                self.print_metrics(metrics[part_name], part_name)
+                console.rule(
+                    f"[yellow]Performance Analysis - Part {current_part.upper()}"
+                )
+                metrics = self.analyze_performance(
+                    part_func,
+                    runs=profile_runs,
+                    with_profile=True,
+                )
+                self.print_metrics(metrics, current_part)
+                result = metrics.result
+            else:
+                result = part_func(self.data)
 
-            if test_passed and submit:
-                with console.status(f"[green]Submitting part {part_name}..."):
-                    self.submit(metrics[part_name].result, part=part_name)
-                    console.print(f"[green]✓ Part {part_name} submitted")
+            # Submit if requested and tests passed
+            if submit:
+                with console.status(f"[green]Submitting part {current_part}..."):
+                    self.submit(result, part=current_part)
+                console.print(f"[green]✓ Part {current_part} submitted")
 
         if readme_update:
             with console.status("[blue]Updating README..."):
                 self.update_readme()
             console.print("[green]✓ README updated")
 
-        return metrics
-
     def submit(self, answer, part=None) -> None:
-        """Submit an answer to Advent of Code."""
+        """Submit an answer to Advent of Code"""
         submit(answer, part=part, day=self.day, year=self.year)
 
     def get_data(self) -> str:
-        """Get the input data for the current day."""
+        """Get the input data for the current day"""
        return self.data
 
+    def get_problem_name(self) -> str:
+        """Get the title of the current day's problem"""
+        puzzle = Puzzle(year=self.year, day=self.day)
+        return puzzle.title
+
     def update_readme(self) -> None:
-        """Update the README with current progress."""
+        """Update the README with current progress"""
         readme_dir = os.path.join(PROJECT_ROOT, f"src/aoc/aoc{self.year}")
         readme_path = os.path.join(readme_dir, "readme.md")
         os.makedirs(readme_dir, exist_ok=True)
@@ -270,8 +407,3 @@ def create_row(day: int) -> str:
                     lines[i] = new_line
                     break
             f.writelines(lines)
-
-    def get_problem_name(self) -> str:
-        """Get the title of the current day's problem."""
-        puzzle = Puzzle(year=self.year, day=self.day)
-        return puzzle.title
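
---

Reviewer note: a minimal usage sketch of the revised `Aoc.run` surface, matching the day_03.py change above. The day/year values are illustrative, not taken from the diff; `Aoc`, `profile`, `profile_runs`, and `submit` are the parameters introduced here.

    from src.aoc.aoc_helper import Aoc

    # profile=True routes each part through analyze_performance: a timing loop
    # reporting mean ± stddev, RSS memory deltas, a cProfile pass, and a big_o
    # complexity estimate. submit stays False while iterating on a solution.
    aoc = Aoc(day=3, years=2024)
    aoc.run(part="both", profile=True, profile_runs=10, submit=False)

Passing `func` is optional: when it is omitted, `run()` still imports the day module and resolves `part_a`/`part_b` via `getattr`, so the call above exercises both parts (gated on their tests passing).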