diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..789b2e3 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,36 @@ +name: Memoraith CI + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + build: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [3.7, 3.8, 3.9, '3.10'] + + steps: + - uses: actions/checkout@v2 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + - name: Run tests + run: | + pytest tests/ + - name: Lint with flake8 + run: | + pip install flake8 + flake8 . + - name: Check type hints with mypy + run: | + pip install mypy + mypy memoraith/ \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9610b9a --- /dev/null +++ b/.gitignore @@ -0,0 +1,63 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# Environment +.env +.venv +env/ +venv/ +ENV/ + +# IDE +.vscode/ +.idea/ + +# Memoraith specific +memoraith_reports/ +*profiling_results/ \ No newline at end of file diff --git a/.pypirc b/.pypirc new file mode 100644 index 0000000..fc8e279 --- /dev/null +++ b/.pypirc @@ -0,0 +1,8 @@ +[distutils] +index-servers = + pypi + +[pypi] +repository: https://upload.pypi.org/legacy/ +username: __token__ +password: AgEIcHlwaS5vcmcCJGM1MWFhNzNiLTEwYWMtNGY2NS05Y2IzLTY0ZWEwYWNiYmNjNAACKlszLCI5MjdjMzIwZC1mMzE3LTQ3N2YtYmQ2MC02OTQ4NjMyMWYwZTYiXQAABiDPAHUyjQvh2o5rbrII2gDJu6clMbqadbt3r0oCDy62Mw diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..3537b16 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,45 @@ +# Contributing to Memoraith + +We welcome contributions to Memoraith! This document provides guidelines for contributing to the project. + +## Setting up the development environment + +1. Fork the repository on GitHub. +2. Clone your fork locally: + ``` + git clone https://github.com/your-username/memoraith.git + cd memoraith + ``` +3. Create a virtual environment and install dependencies: + ``` + python -m venv venv + source venv/bin/activate # On Windows, use `venv\Scripts\activate` + pip install -r requirements.txt + ``` + +## Making Changes + +1. Create a new branch for your changes: + ``` + git checkout -b your-feature-branch + ``` +2. Make your changes and commit them with a clear commit message. +3. Push your changes to your fork on GitHub: + ``` + git push origin your-feature-branch + ``` +4. Open a pull request from your fork to the main Memoraith repository. + +## Coding Standards + +- Follow PEP 8 guidelines for Python code. +- Use type hints for all function arguments and return values. +- Write clear, concise docstrings for all classes and functions. +- Add unit tests for new functionality. 
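+ +For example, a new helper function written to these standards might look like the following sketch (illustrative only; count_parameters is a hypothetical example rather than part of the Memoraith API, and it assumes PyTorch is installed): + +```python +# Hypothetical illustration of the coding standards above; not part of Memoraith. +import torch.nn as nn + +def count_parameters(model: nn.Module) -> int: +    """Return the number of trainable parameters in the given model.""" +    return sum(p.numel() for p in model.parameters() if p.requires_grad) +```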
+ +## Running Tests + +Run the test suite using pytest: + +``` +pytest \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..96ea4c4 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2023 Your Name + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/Manifest.in b/Manifest.in new file mode 100644 index 0000000..60b2268 --- /dev/null +++ b/Manifest.in @@ -0,0 +1,4 @@ +include README.md +include LICENSE +recursive-include memoraith/templates *.html +recursive-include examples *.py \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..bc6cf50 --- /dev/null +++ b/README.md @@ -0,0 +1,92 @@ +# Memoraith + +Memoraith is a lightweight model profiler for deep learning frameworks, designed to help you optimize your neural network models by providing detailed insights into their performance characteristics. + +## Features + +- Supports PyTorch and TensorFlow models +- Profiles memory usage (CPU and GPU) +- Measures computation time for each layer +- Detects bottlenecks and anomalies +- Generates comprehensive reports with visualizations +- Provides real-time visualization capabilities +- Offers both programmatic and command-line interfaces + +## Installation + +You can install Memoraith using pip: + +```bash +pip install memoraith +``` + +For GPU support, install with: + +```bash +pip install memoraith[gpu] +``` + +## Quick Start + +Here's a simple example of how to use Memoraith with a PyTorch model: + +```python +from memoraith import profile_model, set_output_path +import torch +import torch.nn as nn + +set_output_path('profiling_results/') + +class SimpleNet(nn.Module): + def __init__(self): + super(SimpleNet, self).__init__() + self.fc = nn.Linear(10, 5) + + def forward(self, x): + return self.fc(x) + +@profile_model(memory=True, computation=True, gpu=True) +def train_model(model): + optimizer = torch.optim.Adam(model.parameters()) + for _ in range(100): + input_data = torch.randn(32, 10) + output = model(input_data) + loss = output.sum() + loss.backward() + optimizer.step() + +if __name__ == "__main__": + model = SimpleNet() + train_model(model) +``` + +This will generate a profiling report in the 'profiling_results/' directory. + +## Documentation + +For more detailed information on how to use Memoraith, please refer to our [documentation](https://memoraith.readthedocs.io). + +## Contributing + +We welcome contributions! 
Please see our [Contributing Guide](CONTRIBUTING.md) for more details. + +## License + +Memoraith is released under the MIT License. See the [LICENSE](LICENSE) file for more details. + +## Support + +If you encounter any issues or have questions, please file an issue on the [GitHub issue tracker](https://github.com/yourusername/memoraith/issues). + +## Citing Memoraith + +If you use Memoraith in your research, please cite it as follows: + +``` +@software{memoraith, + author = {Your Name}, + title = {Memoraith: A Lightweight Model Profiler for Deep Learning}, + year = {2023}, + url = {https://github.com/yourusername/memoraith} +} +``` \ No newline at end of file diff --git a/blabla.py b/blabla.py new file mode 100644 index 0000000..37fe8bc --- /dev/null +++ b/blabla.py @@ -0,0 +1,42 @@ +import os +import re + +def create_or_update_file(file_path, content): + os.makedirs(os.path.dirname(file_path), exist_ok=True) + with open(file_path, 'w', encoding='utf-8') as file: + file.write(content) + print(f"Created/Updated: {file_path}") + +def extract_classes(content): + class_pattern = r'(class\s+\w+[\s\S]*?(?=\n\n|$))' + return re.findall(class_pattern, content) + +def extract_functions(content): + function_pattern = r'(def\s+\w+[\s\S]*?(?=\n\n|$))' + return re.findall(function_pattern, content) + +def update_memoraith(base_path, content): + # Extract file contents + file_pattern = r'# (memoraith/.*?\.py)\n\n([\s\S]*?)(?=\n# memoraith/|\Z)' + matches = re.findall(file_pattern, content) + + for file_path, file_content in matches: + full_path = os.path.join(base_path, file_path) + + # Extract classes and functions + classes = extract_classes(file_content) + functions = extract_functions(file_content) + + # Combine classes and functions + components = classes + functions + + # Create or update the file + create_or_update_file(full_path, "\n\n".join(components)) + +if __name__ == "__main__": + base_path = r"C:\Users\PC\Desktop\Leo-Major\Memoraith" + with open(r"C:\Users\PC\Desktop\Leo-Major\alop.txt", 'r', encoding='utf-8') as f: + content = f.read() + + update_memoraith(base_path, content) + print("Memoraith project has been updated successfully!") \ No newline at end of file diff --git a/examples/example_pytorch.py b/examples/example_pytorch.py new file mode 100644 index 0000000..8ca719b --- /dev/null +++ b/examples/example_pytorch.py @@ -0,0 +1 @@ +# PyTorch example diff --git a/examples/example_tensorflow.py b/examples/example_tensorflow.py new file mode 100644 index 0000000..e9d8e4e --- /dev/null +++ b/examples/example_tensorflow.py @@ -0,0 +1,25 @@ +import tensorflow as tf +from memoraith import profile_model, set_output_path + +def create_model(): + model = tf.keras.Sequential([ + tf.keras.layers.Dense(64, activation='relu', input_shape=(10,)), + tf.keras.layers.Dense(64, activation='relu'), + tf.keras.layers.Dense(1) + ]) + return model + +@profile_model(memory=True, computation=True, gpu=True) +def train_model(model, epochs=5): + model.compile(optimizer='adam', loss='mse') + + # Generate some dummy data + x_train = tf.random.normal((1000, 10)) + y_train = tf.random.normal((1000, 1)) + + model.fit(x_train, y_train, epochs=epochs, verbose=1) + +if __name__ == "__main__": + set_output_path('tensorflow_profiling_results/') + model = create_model() + train_model(model) \ No newline at end of file diff --git a/memoraith/__init__.py b/memoraith/__init__.py new file mode 100644 index 0000000..a0823ed --- /dev/null +++ b/memoraith/__init__.py @@ -0,0 +1,10 @@ +""" +Memoraith: Lightweight Model 
Profiler +""" + +__version__ = '0.3.0' + +from .profiler import profile_model, set_output_path +from .config import Config +from .exceptions import MemoraithError +from .visualization.real_time_visualizer import RealTimeVisualizer \ No newline at end of file diff --git a/memoraith/analysis/__init__.py b/memoraith/analysis/__init__.py new file mode 100644 index 0000000..532aec4 --- /dev/null +++ b/memoraith/analysis/__init__.py @@ -0,0 +1,5 @@ +from .analyzer import Analyzer +from .metrics import MetricsCalculator +from .bottleneck import BottleneckDetector +from .recommendations import RecommendationEngine +from .anomaly_detection import AnomalyDetector diff --git a/memoraith/analysis/analyzer.py b/memoraith/analysis/analyzer.py new file mode 100644 index 0000000..e88cd4b --- /dev/null +++ b/memoraith/analysis/analyzer.py @@ -0,0 +1,192 @@ +from typing import Dict, Any, List, Optional +import asyncio +import logging +from .metrics import MetricsCalculator +from .bottleneck import BottleneckDetector +from .recommendations import RecommendationEngine +from .anomaly_detection import AnomalyDetector + +class Analyzer: + """Complete analyzer for profiling data to identify bottlenecks and suggest optimizations.""" + + def __init__(self, data: Dict[str, Any]): + self.data = data + self.metrics = MetricsCalculator(data) + self.bottleneck_detector = BottleneckDetector() + self.recommendation_engine = RecommendationEngine() + self.anomaly_detector = AnomalyDetector() + self.logger = logging.getLogger(__name__) + + async def run_analysis(self) -> Dict[str, Any]: + """Run the complete analysis pipeline asynchronously.""" + try: + metrics = await self.metrics.calculate() + + bottlenecks, anomalies, recommendations = await asyncio.gather( + self.bottleneck_detector.detect(metrics), + self.anomaly_detector.detect(metrics), + self.recommendation_engine.generate(metrics) + ) + + performance_score = await self.get_overall_performance_score(metrics) + + return { + 'metrics': metrics, + 'bottlenecks': bottlenecks, + 'anomalies': anomalies, + 'recommendations': recommendations, + 'performance_score': performance_score + } + except Exception as e: + self.logger.error(f"Error during analysis: {str(e)}") + raise + + async def get_layer_analysis(self, layer_name: str) -> Dict[str, Any]: + """Get detailed analysis for a specific layer.""" + metrics = await self.metrics.calculate() + layer_metrics = metrics.get(layer_name, {}) + + if not layer_metrics: + return {'error': f"Layer '{layer_name}' not found in metrics."} + + bottlenecks = await self.bottleneck_detector.detect_for_layer(layer_metrics) + anomalies = await self.anomaly_detector.detect_for_layer(layer_metrics) + recommendations = await self.recommendation_engine.generate_for_layer(layer_metrics) + + return { + 'metrics': layer_metrics, + 'bottlenecks': bottlenecks, + 'anomalies': anomalies, + 'recommendations': recommendations + } + + async def get_top_bottlenecks(self, n: int = 5) -> List[Dict[str, Any]]: + """Get the top N bottlenecks across all layers.""" + metrics = await self.metrics.calculate() + all_bottlenecks = await self.bottleneck_detector.detect(metrics) + sorted_bottlenecks = sorted(all_bottlenecks, key=lambda x: x.get('severity', 0), reverse=True) + return sorted_bottlenecks[:n] + + async def get_overall_performance_score(self, metrics: Optional[Dict[str, Any]] = None) -> float: + """Calculate an overall performance score based on various metrics.""" + if metrics is None: + metrics = await self.metrics.calculate() + + total_time = 
sum(layer['total_time'] for layer in metrics.values() if isinstance(layer, dict)) + total_memory = sum(layer['total_cpu_memory'] for layer in metrics.values() if isinstance(layer, dict)) + num_bottlenecks = len(await self.bottleneck_detector.detect(metrics)) + num_anomalies = len(await self.anomaly_detector.detect(metrics)) + + # Lower times, memory usage, and fewer bottlenecks/anomalies result in a higher score + score = 100 - (total_time * 10 + total_memory / 100 + num_bottlenecks * 5 + num_anomalies * 3) + return max(0, min(score, 100)) # Ensure the score is between 0 and 100 + + async def compare_runs(self, other_data: Dict[str, Any]) -> Dict[str, Any]: + """Compare the current run with another run.""" + current_metrics = await self.metrics.calculate() + other_analyzer = Analyzer(other_data) + other_metrics = await other_analyzer.metrics.calculate() + + comparison = {} + for layer in set(current_metrics.keys()) | set(other_metrics.keys()): + comparison[layer] = { + 'current': current_metrics.get(layer, {}), + 'other': other_metrics.get(layer, {}), + 'time_diff': current_metrics.get(layer, {}).get('total_time', 0) - other_metrics.get(layer, {}).get('total_time', 0), + 'memory_diff': current_metrics.get(layer, {}).get('total_cpu_memory', 0) - other_metrics.get(layer, {}).get('total_cpu_memory', 0) + } + return comparison + + async def get_memory_profile(self) -> Dict[str, Any]: + """Get a detailed memory profile of the model.""" + metrics = await self.metrics.calculate() + total_memory = sum(layer['total_cpu_memory'] for layer in metrics.values() if isinstance(layer, dict)) + peak_memory = max(layer['total_cpu_memory'] for layer in metrics.values() if isinstance(layer, dict)) + + memory_intensive_layers = sorted( + [(layer, metrics[layer]['total_cpu_memory']) for layer in metrics if isinstance(metrics[layer], dict)], + key=lambda x: x[1], + reverse=True + )[:5] + + return { + 'total_memory': total_memory, + 'peak_memory': peak_memory, + 'memory_intensive_layers': memory_intensive_layers + } + + async def get_time_profile(self) -> Dict[str, Any]: + """Get a detailed time profile of the model.""" + metrics = await self.metrics.calculate() + total_time = sum(layer['total_time'] for layer in metrics.values() if isinstance(layer, dict)) + + time_intensive_layers = sorted( + [(layer, metrics[layer]['total_time']) for layer in metrics if isinstance(metrics[layer], dict)], + key=lambda x: x[1], + reverse=True + )[:5] + + return { + 'total_time': total_time, + 'time_intensive_layers': time_intensive_layers + } + + async def generate_optimization_plan(self) -> List[Dict[str, Any]]: + """Generate a step-by-step optimization plan based on the analysis.""" + analysis_results = await self.run_analysis() + optimization_plan = [] + for bottleneck in analysis_results['bottlenecks']: + recommendation = next((r for r in analysis_results['recommendations'] if r['layer'] == bottleneck['layer']), None) + if recommendation: + optimization_plan.append({ + 'step': len(optimization_plan) + 1, + 'layer': bottleneck['layer'], + 'issue': bottleneck['type'], + 'recommendation': recommendation['recommendation'], + 'expected_improvement': 'Medium to High' # This could be more sophisticated + }) + return optimization_plan + + async def get_layer_efficiency(self) -> Dict[str, float]: + """Calculate and return the efficiency of each layer.""" + metrics = await self.metrics.calculate() + efficiency = {} + for layer, layer_metrics in metrics.items(): + if isinstance(layer_metrics, dict): + time = 
layer_metrics.get('total_time', 1e-6) # Avoid division by zero + params = layer_metrics.get('parameters', 0) + efficiency[layer] = params / time if time > 0 else 0 + return efficiency + + async def get_model_complexity(self) -> Dict[str, Any]: + """Analyze and return the complexity of the model.""" + metrics = await self.metrics.calculate() + total_params = sum(layer['parameters'] for layer in metrics.values() if isinstance(layer, dict)) + total_flops = sum(layer.get('flops', 0) for layer in metrics.values() if isinstance(layer, dict)) + + return { + 'total_parameters': total_params, + 'total_flops': total_flops, + 'complexity_score': total_flops / total_params if total_params > 0 else 0 + } + + async def get_training_efficiency(self, training_data: Dict[str, Any]) -> Dict[str, Any]: + """Analyze the efficiency of the training process.""" + total_epochs = len(training_data) + total_time = sum(epoch['epoch_time'] for epoch in training_data) + avg_time_per_epoch = total_time / total_epochs if total_epochs > 0 else 0 + + return { + 'total_epochs': total_epochs, + 'total_training_time': total_time, + 'avg_time_per_epoch': avg_time_per_epoch, + 'training_speed': 1 / avg_time_per_epoch if avg_time_per_epoch > 0 else 0 + } + + def __str__(self) -> str: + """String representation of the Analyzer.""" + return f"Analyzer(data_size={len(self.data)})" + + def __repr__(self) -> str: + """Representation of the Analyzer.""" + return self.__str__() \ No newline at end of file diff --git a/memoraith/analysis/anomaly_detection.py b/memoraith/analysis/anomaly_detection.py new file mode 100644 index 0000000..a9ae2d8 --- /dev/null +++ b/memoraith/analysis/anomaly_detection.py @@ -0,0 +1,61 @@ +import numpy as np +from typing import Dict, Any, List +import logging + +class AnomalyDetector: + """Detects anomalies in the performance metrics.""" + + def __init__(self, z_threshold: float = 3.0): + self.z_threshold = z_threshold + self.logger = logging.getLogger(__name__) + + async def detect(self, metrics: Dict[str, Any]) -> List[Dict[str, Any]]: + """ + Detect anomalies such as sudden spikes in memory or time. 
+ + Args: + metrics (Dict[str, Any]): Calculated metrics for each layer + + Returns: + List[Dict[str, Any]]: List of detected anomalies + """ + anomalies = [] + layers = list(metrics.keys()) + + try: + cpu_memory = [metrics[layer].get('total_cpu_memory', 0) for layer in layers] + gpu_memory = [metrics[layer].get('total_gpu_memory', 0) for layer in layers] + time = [metrics[layer].get('total_time', 0) for layer in layers] + + anomalies.extend(self._detect_anomalies(cpu_memory, 'CPU Memory', layers)) + anomalies.extend(self._detect_anomalies(gpu_memory, 'GPU Memory', layers)) + anomalies.extend(self._detect_anomalies(time, 'Computation Time', layers)) + except Exception as e: + self.logger.error(f"Error during anomaly detection: {str(e)}") + + return anomalies + + def _detect_anomalies(self, data: List[float], data_type: str, layers: List[str]) -> List[Dict[str, Any]]: + """Helper method to detect anomalies using z-score.""" + anomalies = [] + if not data: + return anomalies + + mean = np.mean(data) + std = np.std(data) + + if std == 0: + return anomalies # No variation, so no anomalies + + z_scores = [(x - mean) / std for x in data] + + for layer, z_score, value in zip(layers, z_scores, data): + if abs(z_score) > self.z_threshold: + anomalies.append({ + 'layer': layer, + 'type': data_type, + 'value': value, + 'z_score': z_score + }) + + return anomalies \ No newline at end of file diff --git a/memoraith/analysis/bottleneck.py b/memoraith/analysis/bottleneck.py new file mode 100644 index 0000000..9f50b14 --- /dev/null +++ b/memoraith/analysis/bottleneck.py @@ -0,0 +1,83 @@ +from typing import Dict, Any, List +import logging + +class BottleneckDetector: + """Identifies layers or operations that are bottlenecks.""" + + def __init__(self, time_threshold: float = 0.1, memory_threshold: float = 0.2): + self.time_threshold = time_threshold + self.memory_threshold = memory_threshold + self.logger = logging.getLogger(__name__) + + async def detect(self, metrics: Dict[str, Any]) -> List[Dict[str, Any]]: + """ + Detect bottlenecks in the model based on time and memory usage. + + Args: + metrics (Dict[str, Any]): Calculated metrics for each layer + + Returns: + List[Dict[str, Any]]: List of detected bottlenecks + """ + bottlenecks = [] + total_time = sum(layer['total_time'] for layer in metrics.values() if isinstance(layer, dict)) + total_memory = sum(layer['total_cpu_memory'] for layer in metrics.values() if isinstance(layer, dict)) + + try: + for layer, layer_metrics in metrics.items(): + if not isinstance(layer_metrics, dict): + continue + + time_ratio = layer_metrics['total_time'] / total_time if total_time else 0 + memory_ratio = layer_metrics['total_cpu_memory'] / total_memory if total_memory else 0 + + if time_ratio > self.time_threshold: + bottlenecks.append({ + 'layer': layer, + 'type': 'time', + 'value': layer_metrics['total_time'], + 'ratio': time_ratio + }) + + if memory_ratio > self.memory_threshold: + bottlenecks.append({ + 'layer': layer, + 'type': 'memory', + 'value': layer_metrics['total_cpu_memory'], + 'ratio': memory_ratio + }) + except Exception as e: + self.logger.error(f"Error during bottleneck detection: {str(e)}") + + return bottlenecks + + async def detect_for_layer(self, layer_metrics: Dict[str, Any]) -> List[Dict[str, Any]]: + """ + Detect bottlenecks for a specific layer. 
+ + Args: + layer_metrics (Dict[str, Any]): Metrics for a specific layer + + Returns: + List[Dict[str, Any]]: List of detected bottlenecks for the layer + """ + bottlenecks = [] + + try: + if layer_metrics['total_time'] > self.time_threshold: + bottlenecks.append({ + 'type': 'time', + 'value': layer_metrics['total_time'] + }) + + if layer_metrics['total_cpu_memory'] > self.memory_threshold: + bottlenecks.append({ + 'type': 'memory', + 'value': layer_metrics['total_cpu_memory'] + }) + except KeyError as e: + self.logger.error(f"Missing key in layer metrics: {str(e)}") + except Exception as e: + self.logger.error(f"Error during layer bottleneck detection: {str(e)}") + + return bottlenecks \ No newline at end of file diff --git a/memoraith/analysis/metrics.py b/memoraith/analysis/metrics.py new file mode 100644 index 0000000..f758392 --- /dev/null +++ b/memoraith/analysis/metrics.py @@ -0,0 +1,82 @@ +from typing import Dict, Any +import logging +import asyncio + +class MetricsCalculator: + """Calculates various performance metrics from the collected data.""" + + def __init__(self, data: Dict[str, Any]): + self.data = data + self.logger = logging.getLogger(__name__) + + async def calculate(self) -> Dict[str, Any]: + """ + Compute metrics such as total memory and time per layer. + + Returns: + Dict[str, Any]: Calculated metrics for each layer and global metrics + """ + metrics = {} + try: + for layer_name, stats in self.data.items(): + metrics[layer_name] = await self._calculate_layer_metrics(stats) + + metrics['global'] = await self._calculate_global_metrics(metrics) + except Exception as e: + self.logger.error(f"Error calculating metrics: {str(e)}") + + return metrics + + async def _calculate_layer_metrics(self, stats: Dict[str, Any]) -> Dict[str, Any]: + """Calculate metrics for a single layer.""" + return { + 'total_cpu_memory': stats.get('cpu_memory', 0), + 'total_gpu_memory': stats.get('gpu_memory', 0), + 'total_time': stats.get('time', 0), + 'parameters': stats.get('parameters', 0), + 'output_shape': stats.get('output_shape', []), + 'flops': await self._estimate_flops(stats) + } + + async def _calculate_global_metrics(self, layer_metrics: Dict[str, Any]) -> Dict[str, Any]: + """Calculate global metrics across all layers.""" + total_time = sum(layer['total_time'] for layer in layer_metrics.values() if isinstance(layer, dict)) + total_parameters = sum(layer['parameters'] for layer in layer_metrics.values() if isinstance(layer, dict)) + total_flops = sum(layer['flops'] for layer in layer_metrics.values() if isinstance(layer, dict)) + + return { + 'total_time': total_time, + 'total_parameters': total_parameters, + 'peak_cpu_memory': max((layer['total_cpu_memory'] for layer in layer_metrics.values() if isinstance(layer, dict)), default=0), + 'peak_gpu_memory': max((layer['total_gpu_memory'] for layer in layer_metrics.values() if isinstance(layer, dict)), default=0), + 'total_flops': total_flops + } + + async def _estimate_flops(self, stats: Dict[str, Any]) -> int: + """Estimate FLOPs for a layer based on its type and parameters.""" + layer_type = stats.get('type', '') + input_shape = stats.get('input_shape', []) + output_shape = stats.get('output_shape', []) + parameters = stats.get('parameters', 0) + + # This is a simplified estimation and should be expanded for different layer types + if 'conv' in layer_type.lower(): + return parameters * output_shape[1] * output_shape[2] # Assuming NCHW format + elif 'linear' in layer_type.lower(): + return 2 * parameters # multiply-add operations + else: 
+ return parameters # fallback estimation + + async def get_layer_efficiency(self, layer_name: str) -> float: + """Calculate the efficiency of a layer (FLOPs per second).""" + layer_metrics = self.data.get(layer_name, {}) + flops = await self._estimate_flops(layer_metrics) + time = layer_metrics.get('time', 1e-6) # Avoid division by zero + return flops / time if time > 0 else 0 + + async def get_model_efficiency(self) -> float: + """Calculate the overall model efficiency (FLOPs per second).""" + metrics = await self.calculate() + total_flops = metrics['global']['total_flops'] + total_time = metrics['global']['total_time'] + return total_flops / total_time if total_time > 0 else 0 \ No newline at end of file diff --git a/memoraith/analysis/recommendations.py b/memoraith/analysis/recommendations.py new file mode 100644 index 0000000..e12c2be --- /dev/null +++ b/memoraith/analysis/recommendations.py @@ -0,0 +1,69 @@ +from typing import Dict, Any, List +import logging + +class RecommendationEngine: + """Provides optimization suggestions based on detected bottlenecks.""" + + def __init__(self): + self.logger = logging.getLogger(__name__) + + async def generate(self, metrics: Dict[str, Any]) -> List[Dict[str, str]]: + """ + Generate recommendations for each bottleneck. + + Args: + metrics (Dict[str, Any]): Calculated metrics for each layer + + Returns: + List[Dict[str, str]]: List of recommendations + """ + recommendations = [] + try: + for layer, layer_metrics in metrics.items(): + if not isinstance(layer_metrics, dict): + continue + + layer_recommendations = await self.generate_for_layer(layer, layer_metrics) + recommendations.extend(layer_recommendations) + except Exception as e: + self.logger.error(f"Error generating recommendations: {str(e)}") + + return recommendations + + async def generate_for_layer(self, layer: str, layer_metrics: Dict[str, Any]) -> List[Dict[str, str]]: + """ + Generate recommendations for a specific layer. + + Args: + layer (str): Name of the layer + layer_metrics (Dict[str, Any]): Metrics for the specific layer + + Returns: + List[Dict[str, str]]: List of recommendations for the layer + """ + recommendations = [] + + try: + if layer_metrics['total_time'] > 0.1: # Arbitrary threshold + recommendations.append({ + 'layer': layer, + 'recommendation': f"Consider optimizing the {layer} for speed. It's taking {layer_metrics['total_time']:.4f} seconds." + }) + + if layer_metrics['total_cpu_memory'] > 1000: # Arbitrary threshold (1000 MB) + recommendations.append({ + 'layer': layer, + 'recommendation': f"The {layer} is using a lot of memory ({layer_metrics['total_cpu_memory']:.2f} MB). Consider reducing its size or using more memory-efficient operations." + }) + + if 'parameters' in layer_metrics and layer_metrics['parameters'] > 1000000: # Arbitrary threshold (1M parameters) + recommendations.append({ + 'layer': layer, + 'recommendation': f"The {layer} has a large number of parameters ({layer_metrics['parameters']:,}). Consider using techniques like pruning or quantization to reduce model size." 
+ }) + except KeyError as e: + self.logger.error(f"Missing key in layer metrics for {layer}: {str(e)}") + except Exception as e: + self.logger.error(f"Error generating recommendations for {layer}: {str(e)}") + + return recommendations \ No newline at end of file diff --git a/memoraith/cli.py b/memoraith/cli.py new file mode 100644 index 0000000..d7003ba --- /dev/null +++ b/memoraith/cli.py @@ -0,0 +1,51 @@ +import argparse +import asyncio +from typing import Any +from memoraith import profile_model, set_output_path +from memoraith.config import config +from memoraith.exceptions import MemoraithError + +async def main() -> None: + parser = argparse.ArgumentParser(description="Memoraith: Lightweight Model Profiler") + parser.add_argument("module", help="Python module containing the model and training function") + parser.add_argument("function", help="Name of the function to profile") + parser.add_argument("--output", default="memoraith_reports", help="Output directory for profiling results") + parser.add_argument("--memory", action="store_true", help="Enable memory profiling") + parser.add_argument("--computation", action="store_true", help="Enable computation time profiling") + parser.add_argument("--gpu", action="store_true", help="Enable GPU profiling") + parser.add_argument("--real-time", action="store_true", help="Enable real-time visualization") + parser.add_argument("--config", help="Path to configuration file") + parser.add_argument("--report-format", choices=['html', 'pdf'], default='html', help="Report format") + + args = parser.parse_args() + + try: + if args.config: + config.load_from_file(args.config) + + set_output_path(args.output) + + module = __import__(args.module) + func = getattr(module, args.function) + + @profile_model(memory=args.memory, computation=args.computation, gpu=args.gpu, + real_time_viz=args.real_time, report_format=args.report_format) + async def wrapped_func(*args: Any, **kwargs: Any) -> Any: + if asyncio.iscoroutinefunction(func): + return await func(*args, **kwargs) + else: + return await asyncio.to_thread(func, *args, **kwargs) + + await wrapped_func() + + except ImportError as e: + print(f"Error: Could not import module '{args.module}'. {str(e)}") + except AttributeError as e: + print(f"Error: Function '{args.function}' not found in module '{args.module}'. {str(e)}") + except MemoraithError as e: + print(f"Memoraith Error: {str(e)}") + except Exception as e: + print(f"An unexpected error occurred: {str(e)}") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/memoraith/config.py b/memoraith/config.py new file mode 100644 index 0000000..28c4c5c --- /dev/null +++ b/memoraith/config.py @@ -0,0 +1,434 @@ +import os +import logging +from pathlib import Path +from typing import Dict, Any, Optional +from dotenv import load_dotenv +import yaml +import json +import torch.optim as optim +import torch.nn as nn + +class Config: + """ + Comprehensive configuration management for Memoraith. + Includes all existing functionality plus enhancements. 
+ """ + + def __init__(self): + self.output_path = Path('memoraith_reports/') + self.enable_gpu = False + self.enable_memory = True + self.enable_time = True + self.log_level = logging.INFO + self.report_format = 'html' + self.real_time_viz = False + self.profiling_interval = 0.1 + self.max_memory_samples = 1000 + self.bottleneck_threshold = 0.1 + self.anomaly_threshold = 3.0 + self.batch_size = 32 + self.max_epochs = 100 + self.learning_rate = 0.001 + self.optimizer = 'adam' + self.loss_function = 'cross_entropy' + + # Load environment variables + load_dotenv() + + # Load config from environment variables + self.load_from_env() + + def __getitem__(self, key: str) -> Any: + return getattr(self, key, None) + + def __setitem__(self, key: str, value: Any) -> None: + setattr(self, key, value) + + def set_output_path(self, path: str) -> None: + """Set the output path for profiling reports.""" + self.output_path = Path(path) + self.output_path.mkdir(parents=True, exist_ok=True) + + def enable_gpu_profiling(self, enable: bool) -> None: + """Enable or disable GPU profiling.""" + self.enable_gpu = enable + + def set_log_level(self, level: int) -> None: + """Set the logging level.""" + self.log_level = level + + def set_batch_size(self, size: int) -> None: + """Set the batch size for training.""" + self.batch_size = size + + def set_max_epochs(self, epochs: int) -> None: + """Set the maximum number of epochs for training.""" + self.max_epochs = epochs + + def set_learning_rate(self, lr: float) -> None: + """Set the learning rate for training.""" + self.learning_rate = lr + + def set_optimizer(self, optimizer: str) -> None: + """Set the optimizer for training.""" + self.optimizer = optimizer + + def set_loss_function(self, loss: str) -> None: + """Set the loss function for training.""" + self.loss_function = loss + + def load_from_env(self) -> None: + """Load configuration from environment variables.""" + self.output_path = Path(os.getenv('MEMORAITH_OUTPUT_PATH', str(self.output_path))) + self.enable_gpu = os.getenv('MEMORAITH_ENABLE_GPU', str(self.enable_gpu)).lower() == 'true' + self.enable_memory = os.getenv('MEMORAITH_ENABLE_MEMORY', str(self.enable_memory)).lower() == 'true' + self.enable_time = os.getenv('MEMORAITH_ENABLE_TIME', str(self.enable_time)).lower() == 'true' + self.log_level = getattr(logging, os.getenv('MEMORAITH_LOG_LEVEL', 'INFO')) + self.batch_size = int(os.getenv('MEMORAITH_BATCH_SIZE', str(self.batch_size))) + self.max_epochs = int(os.getenv('MEMORAITH_MAX_EPOCHS', str(self.max_epochs))) + self.learning_rate = float(os.getenv('MEMORAITH_LEARNING_RATE', str(self.learning_rate))) + self.optimizer = os.getenv('MEMORAITH_OPTIMIZER', self.optimizer) + self.loss_function = os.getenv('MEMORAITH_LOSS_FUNCTION', self.loss_function) + + def load_from_file(self, config_file: str) -> None: + """Load configuration from a YAML file.""" + with open(config_file, 'r') as f: + config_data = yaml.safe_load(f) + + for key, value in config_data.items(): + if hasattr(self, key): + setattr(self, key, value) + + # Update main attributes if they're in the loaded config + if 'output_path' in config_data: + self.set_output_path(config_data['output_path']) + if 'enable_gpu' in config_data: + self.enable_gpu_profiling(config_data['enable_gpu']) + if 'log_level' in config_data: + self.set_log_level(getattr(logging, config_data['log_level'])) + if 'batch_size' in config_data: + self.set_batch_size(config_data['batch_size']) + if 'max_epochs' in config_data: + self.set_max_epochs(config_data['max_epochs']) + 
if 'learning_rate' in config_data: + self.set_learning_rate(config_data['learning_rate']) + if 'optimizer' in config_data: + self.set_optimizer(config_data['optimizer']) + if 'loss_function' in config_data: + self.set_loss_function(config_data['loss_function']) + + def to_dict(self) -> Dict[str, Any]: + """Convert configuration to a dictionary.""" + return {k: v for k, v in self.__dict__.items() if not k.startswith('_')} + + def get_optimizer(self, parameters: Any) -> Optional[Any]: + """Get the optimizer instance based on the configuration.""" + optimizer_map = { + 'adam': optim.Adam, + 'sgd': optim.SGD, + 'rmsprop': optim.RMSprop, + # Add more optimizers as needed + } + optimizer_class = optimizer_map.get(self.optimizer.lower()) + if optimizer_class: + return optimizer_class(parameters, lr=self.learning_rate) + else: + logging.error(f"Optimizer '{self.optimizer}' not supported.") + return None + + def get_loss_function(self) -> Optional[Any]: + """Get the loss function based on the configuration.""" + loss_map = { + 'cross_entropy': nn.CrossEntropyLoss, + 'mse': nn.MSELoss, + 'bce': nn.BCELoss, + # Add more loss functions as needed + } + loss_class = loss_map.get(self.loss_function.lower()) + if loss_class: + return loss_class() + else: + logging.error(f"Loss function '{self.loss_function}' not supported.") + return None + + def save_to_file(self, filename: str) -> None: + """Save the current configuration to a JSON file.""" + with open(filename, 'w') as f: + json.dump(self.to_dict(), f, indent=2) + logging.info(f"Configuration saved to {filename}") + + def validate(self) -> bool: + """Validate the current configuration.""" + # Add validation logic here + valid = True + if not isinstance(self.output_path, Path): + logging.error("output_path must be a Path object") + valid = False + if not isinstance(self.enable_gpu, bool): + logging.error("enable_gpu must be a boolean") + valid = False + # Add more validation checks as needed + return valid + + def set_profiling_interval(self, interval: float) -> None: + """Set the profiling interval.""" + self.profiling_interval = interval + + def set_max_memory_samples(self, samples: int) -> None: + """Set the maximum number of memory samples to collect.""" + self.max_memory_samples = samples + + def set_bottleneck_threshold(self, threshold: float) -> None: + """Set the threshold for detecting bottlenecks.""" + self.bottleneck_threshold = threshold + + def set_anomaly_threshold(self, threshold: float) -> None: + """Set the threshold for detecting anomalies.""" + self.anomaly_threshold = threshold + + def enable_real_time_visualization(self, enable: bool) -> None: + """Enable or disable real-time visualization.""" + self.real_time_viz = enable + + def set_report_format(self, format: str) -> None: + """Set the report format (html or pdf).""" + if format.lower() in ['html', 'pdf']: + self.report_format = format.lower() + else: + logging.error(f"Unsupported report format: {format}. 
Using default (html).") + + def get_full_config(self) -> Dict[str, Any]: + """Get the full configuration as a dictionary.""" + return self.to_dict() + + def reset_to_defaults(self) -> None: + """Reset all configuration options to their default values.""" + self.__init__() + + def __str__(self) -> str: + """String representation of the Config object.""" + return f"Config(output_path={self.output_path}, enable_gpu={self.enable_gpu}, ...)" + + def __repr__(self) -> str: + """Detailed string representation of the Config object.""" + return self.__str__() + +# Global configuration instance +config = Config() \ No newline at end of file diff --git a/memoraith/data_collection/__init__.py b/memoraith/data_collection/__init__.py new file mode 100644 index 0000000..8ea0527 --- /dev/null +++ b/memoraith/data_collection/__init__.py @@ -0,0 +1,4 @@ +from .cpu_memory import CPUMemoryTracker +from .gpu_memory import GPUMemoryTracker +from .time_tracking import TimeTracker +from .resource_lock import ResourceLock diff --git a/memoraith/data_collection/cpu_memory.py b/memoraith/data_collection/cpu_memory.py new file mode 100644 index 0000000..dea4c9f --- /dev/null +++ b/memoraith/data_collection/cpu_memory.py @@ -0,0 +1,85 @@ +import psutil +import threading +import time +from typing import List, Dict, Any +import logging + +class CPUMemoryTracker: + """ + Advanced CPU memory usage tracker with detailed memory breakdown. + """ + + def __init__(self, interval: float = 0.1, detailed: bool = True): + self.interval = interval + self.detailed = detailed + self.memory_usage: List[Dict[str, Any]] = [] + self._stop_event = threading.Event() + self._lock = threading.Lock() + self._thread: threading.Thread = None + self.logger = logging.getLogger(__name__) + + def start(self) -> None: + """Start tracking CPU memory usage.""" + self._thread = threading.Thread(target=self._track_memory, daemon=True) + self._thread.start() + self.logger.info("CPU memory tracking started") + + def stop(self) -> None: + """Stop tracking CPU memory usage.""" + self._stop_event.set() + if self._thread: + self._thread.join() + self.logger.info("CPU memory tracking stopped") + + def _track_memory(self) -> None: + process = psutil.Process() + while not self._stop_event.is_set(): + try: + mem_info = process.memory_info() + mem_data = { + 'timestamp': time.time(), + 'rss': mem_info.rss / (1024 * 1024), # RSS in MB + 'vms': mem_info.vms / (1024 * 1024), # VMS in MB + } + + if self.detailed: + mem_maps = process.memory_maps(grouped=True) + mem_data.update({ + 'shared': sum(m.shared for m in mem_maps) / (1024 * 1024), + 'private': sum(m.private for m in mem_maps) / (1024 * 1024), + 'swap': sum(m.swap for m in mem_maps) / (1024 * 1024), + }) + + with self._lock: + self.memory_usage.append(mem_data) + except Exception as e: + self.logger.error(f"Error tracking CPU memory: {str(e)}") + + time.sleep(self.interval) + + def get_peak_memory(self) -> Dict[str, float]: + """Get the peak memory usage.""" + with self._lock: + if not self.memory_usage: + return {} + return max(self.memory_usage, key=lambda x: x['rss']) + + def get_average_memory(self) -> Dict[str, float]: + """Get the average memory usage.""" + with self._lock:
if not self.memory_usage: + return {} + avg_mem = {key: sum(m[key] for m in self.memory_usage) / len(self.memory_usage) + for key in self.memory_usage[0] if key != 'timestamp'} + return avg_mem + + def get_memory_timeline(self) -> List[Dict[str, Any]]: + """Get the full timeline of memory usage.""" + with self._lock: + return self.memory_usage.copy() + + def reset(self) -> None: + """Reset the memory usage data.""" + with self._lock: + self.memory_usage.clear() + self.logger.info("CPU memory tracking data reset") \ No newline at end of file diff --git a/memoraith/data_collection/gpu_memory.py b/memoraith/data_collection/gpu_memory.py new file mode 100644 index 0000000..6d846dd --- /dev/null +++ b/memoraith/data_collection/gpu_memory.py @@ -0,0 +1,121 @@ +import logging +from typing import List, Optional +import asyncio + +logger = logging.getLogger(__name__) + +try: + from pynvml import * + PYNVML_AVAILABLE = True +except ImportError: + logger.warning("pynvml not available. GPU memory tracking will be disabled.") + PYNVML_AVAILABLE = False + +from ..exceptions import GPUNotAvailableError + +class GPUMemoryTracker: + """Tracks GPU memory usage over time for NVIDIA GPUs.""" + + def __init__(self, device_id: int = 0, interval: float = 0.1): + self.device_id = device_id + self.interval = interval + self.memory_usage: List[float] = [] + self._stop_event: Optional[asyncio.Event] = None + self._tracking_task: Optional[asyncio.Task] = None + self.logger = logging.getLogger(__name__) + + async def start(self) -> None: + """Start tracking GPU memory usage.""" + if not PYNVML_AVAILABLE: + self.logger.warning("GPU memory tracking is not available.") + return + + try: + nvmlInit() + self.device = nvmlDeviceGetHandleByIndex(self.device_id) + self._stop_event = asyncio.Event() + self._tracking_task = asyncio.create_task(self._track_memory()) + self.logger.info(f"Started GPU memory tracking for device {self.device_id}") + except NVMLError as e: + self.logger.error(f"Failed to initialize GPU memory tracking: {str(e)}") + raise GPUNotAvailableError(f"GPU tracking failed: {str(e)}") + + async def stop(self) -> None: + """Stop tracking GPU memory usage.""" + if not PYNVML_AVAILABLE: + return + + if self._stop_event: + self._stop_event.set() + if self._tracking_task: + await self._tracking_task + try: + nvmlShutdown() + self.logger.info("Stopped GPU memory tracking") + except NVMLError as e: + self.logger.error(f"Error during NVML shutdown: {str(e)}") + + async def _track_memory(self) -> None: + """Internal method to track memory usage.""" + while not self._stop_event.is_set(): + try: + mem_info = nvmlDeviceGetMemoryInfo(self.device) + self.memory_usage.append(mem_info.used / 1024**2) # Convert to MB + except NVMLError as e: + self.logger.error(f"Error tracking GPU memory: {str(e)}") + await asyncio.sleep(self.interval) + + async def get_peak_memory(self) -> float: + """Get the peak GPU memory usage.""" + return max(self.memory_usage) if self.memory_usage else 0 + + async def get_average_memory(self) -> float: + """Get the average GPU memory usage.""" + return sum(self.memory_usage) / len(self.memory_usage) if self.memory_usage else 0 + + async def get_current_memory(self) -> float: + """Get the current GPU memory usage.""" + if not PYNVML_AVAILABLE: + return 0 + + try: + mem_info = nvmlDeviceGetMemoryInfo(self.device) + return mem_info.used / 1024**2 # Convert to MB + except NVMLError as e: + self.logger.error(f"Error getting current GPU memory: {str(e)}") + return 0 + + async def get_memory_history(self) -> 
List[float]: + """Get the full history of memory usage.""" + return self.memory_usage + + async def get_device_info(self) -> dict: + """Get information about the GPU device being tracked.""" + if not PYNVML_AVAILABLE: + return {"error": "GPU information not available"} + + try: + device_name = nvmlDeviceGetName(self.device).decode('utf-8') + total_memory = nvmlDeviceGetMemoryInfo(self.device).total / 1024**2 # Convert to MB + cuda_version = nvmlSystemGetCudaDriverVersion() + return { + "device_id": self.device_id, + "device_name": device_name, + "total_memory": total_memory, + "cuda_version": f"{cuda_version // 1000}.{(cuda_version % 1000) // 10}" + } + except NVMLError as e: + self.logger.error(f"Error getting GPU device info: {str(e)}") + return {"error": str(e)} + + async def reset(self) -> None: + """Reset the memory usage history.""" + self.memory_usage = [] + + def __del__(self): + """Ensure NVML is shut down when the object is deleted.""" + if PYNVML_AVAILABLE: + try: + nvmlShutdown() + except: + pass # Ignore errors during shutdown in destructor \ No newline at end of file diff --git a/memoraith/data_collection/resource_lock.py b/memoraith/data_collection/resource_lock.py new file mode 100644 index 0000000..808c5b1 --- /dev/null +++ b/memoraith/data_collection/resource_lock.py @@ -0,0 +1,78 @@ +import threading +from contextlib import contextmanager +import time +import logging +from typing import Optional + +class ResourceLock: + """ + Advanced resource locking mechanism with timeout and deadlock detection. + """ + + def __init__(self, name: str, timeout: float = 5.0): + self._lock = threading.Lock() + self._owner: Optional[int] = None + self.name = name + self.timeout = timeout + self.logger = logging.getLogger(__name__) + + @contextmanager + def __call__(self, timeout: Optional[float] = None): + acquired = self.acquire(timeout) + if not acquired: + raise TimeoutError(f"Failed to acquire lock '{self.name}' within the specified timeout.") + try: + yield + finally: + self.release() + + def acquire(self, timeout: Optional[float] = None) -> bool: + """ + Attempt to acquire the lock with a specified timeout. + + Args: + timeout (Optional[float]): The maximum time to wait for the lock. If None, use the default timeout. + + Returns: + bool: True if the lock was acquired, False otherwise. 
+ """ + start_time = time.time() + timeout = timeout or self.timeout + + while True: + if self._lock.acquire(blocking=False): + self._owner = threading.get_ident() + self.logger.debug(f"Lock '{self.name}' acquired by thread {self._owner}") + return True + + if time.time() - start_time > timeout: + self.logger.warning(f"Timeout while attempting to acquire lock '{self.name}'") + return False + + time.sleep(0.1) + + def release(self) -> None: + """Release the lock.""" + if self._owner != threading.get_ident(): + raise RuntimeError(f"Attempt to release lock '{self.name}' by non-owner thread") + + self._owner = None + self._lock.release() + self.logger.debug(f"Lock '{self.name}' released by thread {threading.get_ident()}") + + def __enter__(self): + if not self.acquire(): + raise TimeoutError(f"Failed to acquire lock '{self.name}' within the specified timeout.") + + def __exit__(self, exc_type, exc_value, traceback): + self.release() + + @property + def locked(self) -> bool: + """Check if the lock is currently held.""" + return self._lock.locked() + + @property + def owner(self) -> Optional[int]: + """Get the ID of the thread that currently holds the lock, if any.""" + return self._owner \ No newline at end of file diff --git a/memoraith/data_collection/time_tracking.py b/memoraith/data_collection/time_tracking.py new file mode 100644 index 0000000..7d884e8 --- /dev/null +++ b/memoraith/data_collection/time_tracking.py @@ -0,0 +1,114 @@ +import time +from typing import Dict, Optional, List +import threading +import logging + +class TimeTracker: + """Advanced time tracking for operations with nested timing support.""" + + def __init__(self): + self.start_times: Dict[str, List[float]] = {} + self.end_times: Dict[str, List[float]] = {} + self.durations: Dict[str, List[float]] = {} + self._lock = threading.Lock() + self.logger = logging.getLogger(__name__) + + def start(self, key: str) -> None: + """ + Start timing an operation. Supports nested timing for the same key. + + Args: + key (str): Unique identifier for the operation + """ + with self._lock: + if key not in self.start_times: + self.start_times[key] = [] + self.end_times[key] = [] + self.durations[key] = [] + self.start_times[key].append(time.perf_counter()) + self.logger.debug(f"Started timing for {key}") + + def stop(self, key: str) -> None: + """ + Stop timing an operation. Matches the most recent start for the key. + + Args: + key (str): Unique identifier for the operation + """ + end_time = time.perf_counter() + with self._lock: + if key not in self.start_times or not self.start_times[key]: + raise ValueError(f"No matching start time found for key: {key}") + start_time = self.start_times[key].pop() + self.end_times[key].append(end_time) + duration = end_time - start_time + self.durations[key].append(duration) + self.logger.debug(f"Stopped timing for {key}. Duration: {duration:.6f} seconds") + + def get_duration(self, key: str) -> Optional[float]: + """ + Get the total duration of all timings for a key. + + Args: + key (str): Unique identifier for the operation + + Returns: + Optional[float]: Total duration in seconds, or None if not available + """ + with self._lock: + if key in self.durations: + return sum(self.durations[key]) + return None + + def get_average_duration(self, key: str) -> Optional[float]: + """ + Get the average duration of all timings for a key. 
+ + Args: + key (str): Unique identifier for the operation + + Returns: + Optional[float]: Average duration in seconds, or None if not available + """ + with self._lock: + if key in self.durations and self.durations[key]: + return sum(self.durations[key]) / len(self.durations[key]) + return None + + def reset(self) -> None: + """Reset all timings.""" + with self._lock: + self.start_times.clear() + self.end_times.clear() + self.durations.clear() + self.logger.info("All timings have been reset") + + def get_all_durations(self) -> Dict[str, List[float]]: + """ + Get durations for all tracked operations. + + Returns: + Dict[str, List[float]]: Dictionary of operation keys and their durations + """ + with self._lock: + return {key: durations.copy() for key, durations in self.durations.items()} + + def get_summary(self) -> Dict[str, Dict[str, float]]: + """ + Get a summary of all tracked operations. + + Returns: + Dict[str, Dict[str, float]]: Dictionary with total, average, min, and max durations for each key + """ + summary = {} + with self._lock: + for key, durations in self.durations.items(): + if durations: + summary[key] = { + 'total': sum(durations), + 'average': sum(durations) / len(durations), + 'min': min(durations), + 'max': max(durations), + 'count': len(durations) + } + return summary \ No newline at end of file diff --git a/memoraith/exceptions.py b/memoraith/exceptions.py new file mode 100644 index 0000000..4933b7e --- /dev/null +++ b/memoraith/exceptions.py @@ -0,0 +1,59 @@ +class MemoraithError(Exception): + """Base exception class for Memoraith.""" + def __init__(self, message: str): + self.message = message + super().__init__(self.message) + +class FrameworkNotSupportedError(MemoraithError): + """Exception raised when an unsupported framework is used.""" + def __init__(self, framework_name: str): + self.framework_name = framework_name + message = f"Framework '{framework_name}' is not supported." 
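+        # The MemoraithError base class stores this formatted message on self.message.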
+ super().__init__(message) + +class ConfigurationError(MemoraithError): + """Exception raised when there's an issue with the configuration.""" + def __init__(self, config_item: str, details: str): + self.config_item = config_item + self.details = details + message = f"Configuration error for {config_item}: {details}" + super().__init__(message) + +class ProfilingError(MemoraithError): + """Exception raised when there's an error during profiling.""" + def __init__(self, component: str, details: str): + self.component = component + self.details = details + message = f"Profiling error in {component}: {details}" + super().__init__(message) + +class DataCollectionError(MemoraithError): + """Exception raised when there's an error collecting profiling data.""" + def __init__(self, data_type: str, details: str): + self.data_type = data_type + self.details = details + message = f"Error collecting {data_type} data: {details}" + super().__init__(message) + +class AnalysisError(MemoraithError): + """Exception raised when there's an error during data analysis.""" + def __init__(self, analysis_type: str, details: str): + self.analysis_type = analysis_type + self.details = details + message = f"Error during {analysis_type} analysis: {details}" + super().__init__(message) + +class ReportGenerationError(MemoraithError): + """Exception raised when there's an error generating reports.""" + def __init__(self, report_type: str, details: str): + self.report_type = report_type + self.details = details + message = f"Error generating {report_type} report: {details}" + super().__init__(message) + +class GPUNotAvailableError(MemoraithError): + """Exception raised when GPU profiling is requested but not available.""" + def __init__(self, details: str): + self.details = details + message = f"GPU profiling not available: {details}" + super().__init__(message) \ No newline at end of file diff --git a/memoraith/integration/__init__.py b/memoraith/integration/__init__.py new file mode 100644 index 0000000..09dc1f2 --- /dev/null +++ b/memoraith/integration/__init__.py @@ -0,0 +1,26 @@ +from .framework_adapter import FrameworkAdapter +from .pytorch_adapter import PyTorchAdapter +from .tensorflow_adapter import TensorFlowAdapter +from .common_utils import identify_framework +from memoraith.exceptions import FrameworkNotSupportedError + +def get_framework_adapter(model): + """ + Returns the appropriate framework adapter based on the model type. + + Args: + model: The machine learning model to be profiled + + Returns: + FrameworkAdapter: An instance of the appropriate adapter + + Raises: + FrameworkNotSupportedError: If the framework is not supported + """ + framework_name = identify_framework(model) + if framework_name == 'pytorch': + return PyTorchAdapter(model) + elif framework_name == 'tensorflow': + return TensorFlowAdapter(model) + else: + raise FrameworkNotSupportedError(framework_name) diff --git a/memoraith/integration/common_utils.py b/memoraith/integration/common_utils.py new file mode 100644 index 0000000..6698195 --- /dev/null +++ b/memoraith/integration/common_utils.py @@ -0,0 +1,140 @@ +from typing import Any, Dict, List +import inspect +import logging + +logger = logging.getLogger(__name__) + +def identify_framework(model: Any) -> str: + """ + Identify the deep learning framework of the given model. 
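+
+    Example (illustrative sketch; assumes torch / tensorflow are importable):
+        identify_framework(torch.nn.Linear(4, 2))   # -> 'pytorch'
+        identify_framework(tf.keras.Sequential())   # -> 'tensorflow'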
+ + Args: + model: The machine learning model to identify + + Returns: + str: The identified framework name ('pytorch', 'tensorflow', 'unknown') + """ + try: + model_type = type(model).__module__.split('.')[0] + if model_type == 'torch': + return 'pytorch' + elif model_type in ['tensorflow', 'keras']: + return 'tensorflow' + else: + logger.warning(f"Unknown framework for model type: {model_type}") + return 'unknown' + except Exception as e: + logger.error(f"Error identifying framework: {str(e)}") + return 'unknown' + +def get_model_structure(model: Any) -> Dict[str, Any]: + """ + Get a structured representation of the model's architecture. + + Args: + model: The machine learning model + + Returns: + Dict[str, Any]: A dictionary representing the model's structure + """ + try: + framework = identify_framework(model) + if framework == 'pytorch': + return _get_pytorch_structure(model) + elif framework == 'tensorflow': + return _get_tensorflow_structure(model) + else: + return {'error': 'Unsupported framework'} + except Exception as e: + logger.error(f"Error getting model structure: {str(e)}") + return {'error': str(e)} + +def _get_pytorch_structure(model: Any) -> Dict[str, Any]: + """Helper function to get PyTorch model structure.""" + structure = {} + for name, module in model.named_modules(): + if list(module.children()): # Skip container modules + continue + structure[name] = { + 'type': type(module).__name__, + 'parameters': sum(p.numel() for p in module.parameters()), + 'trainable': sum(p.numel() for p in module.parameters() if p.requires_grad) + } + return structure + +def _get_tensorflow_structure(model: Any) -> Dict[str, Any]: + """Helper function to get TensorFlow model structure.""" + structure = {} + for layer in model.layers: + structure[layer.name] = { + 'type': type(layer).__name__, + 'parameters': layer.count_params(), + 'trainable': sum(tf.keras.backend.count_params(w) for w in layer.trainable_weights) + } + return structure + +def estimate_model_size(model: Any) -> Dict[str, float]: + """ + Estimate the size of the model in memory. + + Args: + model: The machine learning model + + Returns: + Dict[str, float]: Estimated size in MB for parameters and buffers + """ + try: + framework = identify_framework(model) + if framework == 'pytorch': + return _estimate_pytorch_size(model) + elif framework == 'tensorflow': + return _estimate_tensorflow_size(model) + else: + return {'error': 'Unsupported framework'} + except Exception as e: + logger.error(f"Error estimating model size: {str(e)}") + return {'error': str(e)} + +def _estimate_pytorch_size(model: Any) -> Dict[str, float]: + """Helper function to estimate PyTorch model size.""" + param_size = sum(p.numel() * p.element_size() for p in model.parameters()) + buffer_size = sum(b.numel() * b.element_size() for b in model.buffers()) + return { + 'parameters': param_size / (1024 * 1024), + 'buffers': buffer_size / (1024 * 1024) + } + +def _estimate_tensorflow_size(model: Any) -> Dict[str, float]: + """Helper function to estimate TensorFlow model size.""" + param_size = sum(tf.keras.backend.count_params(w) * w.dtype.size for w in model.weights) + return { + 'parameters': param_size / (1024 * 1024), + 'buffers': 0 # TensorFlow doesn't have a direct equivalent to PyTorch's buffers + } + +def get_function_info(func: callable) -> Dict[str, Any]: + """ + Get detailed information about a function. 
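+
+    Example (illustrative sketch; `add` is an arbitrary local function):
+        def add(a: int, b: int) -> int:
+            return a + b
+        info = get_function_info(add)
+        # info['name'] == 'add'; info['parameters'] lists 'a' and 'b' annotated as 'int'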
+ + Args: + func: The function to inspect + + Returns: + Dict[str, Any]: Information about the function + """ + try: + signature = inspect.signature(func) + return { + 'name': func.__name__, + 'module': func.__module__, + 'docstring': inspect.getdoc(func), + 'parameters': [{'name': name, 'annotation': param.annotation.__name__ if param.annotation != inspect.Parameter.empty else None} + for name, param in signature.parameters.items()], + 'return_annotation': signature.return_annotation.__name__ if signature.return_annotation != inspect.Signature.empty else None, + 'is_coroutine': inspect.iscoroutinefunction(func), + 'is_generator': inspect.isgeneratorfunction(func), + 'source': inspect.getsource(func) + } + except Exception as e: + logger.error(f"Error getting function info: {str(e)}") + return {'error': str(e)} \ No newline at end of file diff --git a/memoraith/integration/framework_adapter.py b/memoraith/integration/framework_adapter.py new file mode 100644 index 0000000..4321cc9 --- /dev/null +++ b/memoraith/integration/framework_adapter.py @@ -0,0 +1,116 @@ +from abc import ABC, abstractmethod +from typing import Dict, Any, List +import logging + +class FrameworkAdapter(ABC): + """Abstract base class for framework-specific adapters with enhanced functionality.""" + + def __init__(self, model: Any): + self.model = model + self.data: Dict[str, Any] = {} + self.logger = logging.getLogger(__name__) + + @abstractmethod + async def start_profiling(self) -> None: + """Start profiling the model.""" + pass + + @abstractmethod + async def stop_profiling(self) -> None: + """Stop profiling the model.""" + pass + + @abstractmethod + async def profile_inference(self, input_data: Any) -> Dict[str, Any]: + """Profile the inference process for a single input.""" + pass + + @abstractmethod + async def profile_training_step(self, input_data: Any, target: Any) -> Dict[str, Any]: + """Profile a single training step.""" + pass + + @abstractmethod + async def get_model_summary(self) -> Dict[str, Any]: + """Get a summary of the model architecture.""" + pass + + @abstractmethod + async def get_layer_info(self, layer_name: str) -> Dict[str, Any]: + """Get detailed information about a specific layer.""" + pass + + @abstractmethod + async def profile_memory_usage(self) -> Dict[str, float]: + """Profile the memory usage of the model.""" + pass + + @abstractmethod + def get_flops(self) -> int: + """Calculate the total number of FLOPs for the model.""" + pass + + async def __aenter__(self): + await self.start_profiling() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.stop_profiling() + + async def profile_full_training(self, train_loader: Any, num_epochs: int) -> List[Dict[str, Any]]: + """Profile the full training process.""" + epoch_data = [] + for epoch in range(num_epochs): + epoch_start_time = await self.get_current_time() + epoch_loss = 0.0 + batch_data = [] + + for batch_idx, (data, target) in enumerate(train_loader): + batch_profile = await self.profile_training_step(data, target) + epoch_loss += batch_profile['loss'] + batch_data.append(batch_profile) + + epoch_end_time = await self.get_current_time() + epoch_time = epoch_end_time - epoch_start_time + + epoch_data.append({ + 'epoch': epoch, + 'epoch_time': epoch_time, + 'epoch_loss': epoch_loss / len(train_loader), + 'batch_data': batch_data + }) + + return epoch_data + + @abstractmethod + async def get_current_time(self) -> float: + """Get the current time in a high-precision format.""" + pass + + @abstractmethod + 
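+    # Concrete adapters are expected to map `format` to framework-native exporters
+    # (e.g. ONNX / TorchScript in the PyTorch adapter, SavedModel / H5 in TensorFlow).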
async def export_model(self, path: str, format: str) -> None: + """Export the model to a specified format.""" + pass + + @abstractmethod + async def visualize_model(self, output_path: str) -> None: + """Generate a visual representation of the model architecture.""" + pass + + async def log_profiling_start(self) -> None: + """Log the start of profiling.""" + self.logger.info(f"Started profiling for {type(self.model).__name__}") + + async def log_profiling_stop(self) -> None: + """Log the end of profiling.""" + self.logger.info(f"Stopped profiling for {type(self.model).__name__}") + + @abstractmethod + async def get_optimizer_info(self) -> Dict[str, Any]: + """Get information about the current optimizer.""" + pass + + @abstractmethod + async def get_loss_function_info(self) -> Dict[str, Any]: + """Get information about the current loss function.""" + pass \ No newline at end of file diff --git a/memoraith/integration/pytorch_adapter.py b/memoraith/integration/pytorch_adapter.py new file mode 100644 index 0000000..85b8d24 --- /dev/null +++ b/memoraith/integration/pytorch_adapter.py @@ -0,0 +1,283 @@ +import torch +import logging +from typing import Dict, Any, List, Optional +import asyncio +import time +from .framework_adapter import FrameworkAdapter +from ..data_collection import TimeTracker, CPUMemoryTracker, GPUMemoryTracker +from ..config import config + +class PyTorchAdapter(FrameworkAdapter): + """Complete adapter for integrating with PyTorch models.""" + + def __init__(self, model: torch.nn.Module): + super().__init__(model) + self.time_tracker = TimeTracker() + self.cpu_tracker = CPUMemoryTracker() + self.gpu_tracker = GPUMemoryTracker() if torch.cuda.is_available() and config.enable_gpu else None + self.handles: List[torch.utils.hooks.RemovableHandle] = [] + self.logger = logging.getLogger(__name__) + self.device = torch.device("cuda" if torch.cuda.is_available() and config.enable_gpu else "cpu") + self.model.to(self.device) + + async def start_profiling(self) -> None: + """Start profiling by registering hooks.""" + for name, module in self.model.named_modules(): + if len(list(module.children())) == 0: # Only leaf modules + pre_handle = module.register_forward_pre_hook(self._pre_forward_hook) + post_handle = module.register_forward_hook(self._forward_hook) + self.handles.extend([pre_handle, post_handle]) + + await self.cpu_tracker.start() + if self.gpu_tracker: + await self.gpu_tracker.start() + + async def stop_profiling(self) -> None: + """Remove hooks after profiling.""" + for handle in self.handles: + handle.remove() + self.handles.clear() + + await self.cpu_tracker.stop() + if self.gpu_tracker: + await self.gpu_tracker.stop() + + async def _forward_hook(self, module: torch.nn.Module, input: Any, output: Any) -> None: + layer_name = f"{module.__class__.__name__}_{id(module)}" + self.time_tracker.stop(layer_name) + + try: + self.data[layer_name] = self.data.get(layer_name, {}) + self.data[layer_name]['time'] = self.time_tracker.get_duration(layer_name) + self.data[layer_name]['parameters'] = sum(p.numel() for p in module.parameters()) + + if config.enable_memory: + self.data[layer_name]['cpu_memory'] = await self.cpu_tracker.get_peak_memory() + if self.gpu_tracker: + self.data[layer_name]['gpu_memory'] = await self.gpu_tracker.get_peak_memory() + + if isinstance(output, torch.Tensor): + self.data[layer_name]['output_shape'] = list(output.shape) + elif isinstance(output, (list, tuple)) and all(isinstance(o, torch.Tensor) for o in output): + self.data[layer_name]['output_shape'] = 
[list(o.shape) for o in output] + except Exception as e: + self.logger.error(f"Error in forward hook for layer {layer_name}: {str(e)}") + + async def _pre_forward_hook(self, module: torch.nn.Module, input: Any) -> None: + layer_name = f"{module.__class__.__name__}_{id(module)}" + self.time_tracker.start(layer_name) + + async def profile_inference(self, input_data: torch.Tensor) -> Dict[str, Any]: + """Profile the inference process for a single input.""" + await self.start_profiling() + + try: + with torch.no_grad(): + start_time = torch.cuda.Event(enable_timing=True) + end_time = torch.cuda.Event(enable_timing=True) + + start_time.record() + output = self.model(input_data.to(self.device)) + end_time.record() + + torch.cuda.synchronize() + inference_time = start_time.elapsed_time(end_time) / 1000 # Convert to seconds + + profiling_data = self.data.copy() + profiling_data['inference_time'] = inference_time + + return profiling_data + finally: + await self.stop_profiling() + + async def profile_training_step(self, input_data: torch.Tensor, target: torch.Tensor) -> Dict[str, Any]: + """Profile a single training step.""" + await self.start_profiling() + + try: + optimizer = config.get_optimizer(self.model.parameters()) + loss_function = config.get_loss_function() + + if optimizer is None or loss_function is None: + raise ValueError("Optimizer or loss function not properly configured") + + start_time = torch.cuda.Event(enable_timing=True) + end_time = torch.cuda.Event(enable_timing=True) + + start_time.record() + optimizer.zero_grad() + output = self.model(input_data.to(self.device)) + loss = loss_function(output, target.to(self.device)) + loss.backward() + optimizer.step() + end_time.record() + + torch.cuda.synchronize() + step_time = start_time.elapsed_time(end_time) / 1000 # Convert to seconds + + profiling_data = self.data.copy() + profiling_data['step_time'] = step_time + profiling_data['loss'] = loss.item() + + return profiling_data + finally: + await self.stop_profiling() + + async def profile_full_training(self, train_loader: torch.utils.data.DataLoader, num_epochs: Optional[int] = None) -> List[Dict[str, Any]]: + """Profile the full training process.""" + epochs_to_run = num_epochs or config.max_epochs + epoch_data = [] + + for epoch in range(epochs_to_run): + epoch_start_time = torch.cuda.Event(enable_timing=True) + epoch_end_time = torch.cuda.Event(enable_timing=True) + + epoch_start_time.record() + epoch_loss = 0.0 + batch_data = [] + + for batch_idx, (data, target) in enumerate(train_loader): + batch_profile = await self.profile_training_step(data, target) + epoch_loss += batch_profile['loss'] + batch_data.append(batch_profile) + + epoch_end_time.record() + torch.cuda.synchronize() + epoch_time = epoch_start_time.elapsed_time(epoch_end_time) / 1000 # Convert to seconds + + epoch_data.append({ + 'epoch': epoch, + 'epoch_time': epoch_time, + 'epoch_loss': epoch_loss / len(train_loader), + 'batch_data': batch_data + }) + + return epoch_data + + async def get_model_summary(self) -> Dict[str, Any]: + """Get a summary of the model architecture.""" + summary = {} + total_params = sum(p.numel() for p in self.model.parameters()) + trainable_params = sum(p.numel() for p in self.model.parameters() if p.requires_grad) + + summary['total_params'] = total_params + summary['trainable_params'] = trainable_params + summary['non_trainable_params'] = total_params - trainable_params + summary['model_size_mb'] = total_params * 4 / (1024 * 1024) # Assuming float32 + + return summary + + async def 
get_layer_info(self, layer_name: str) -> Dict[str, Any]: + """Get detailed information about a specific layer.""" + for name, module in self.model.named_modules(): + if name == layer_name: + return { + 'type': type(module).__name__, + 'parameters': sum(p.numel() for p in module.parameters()), + 'trainable_parameters': sum(p.numel() for p in module.parameters() if p.requires_grad), + 'input_shape': getattr(module, 'in_features', None) or getattr(module, 'in_channels', None), + 'output_shape': getattr(module, 'out_features', None) or getattr(module, 'out_channels', None), + 'activation': getattr(module, 'activation', None), + 'dropout_rate': getattr(module, 'p', None) if isinstance(module, torch.nn.Dropout) else None, + } + return {} + + async def profile_memory_usage(self) -> Dict[str, float]: + """Profile the memory usage of the model.""" + torch.cuda.empty_cache() + + def get_memory_usage(): + if torch.cuda.is_available(): + return torch.cuda.memory_allocated() / 1024**2 # Convert to MB + else: + import psutil + return psutil.Process().memory_info().rss / 1024**2 # Convert to MB + + initial_mem = get_memory_usage() + + # Profile memory usage during forward pass + dummy_input = torch.randn(1, *self.model.input_size).to(self.device) + self.model(dummy_input) + forward_mem = get_memory_usage() + + # Profile memory usage during backward pass + loss = self.model(dummy_input).sum() + loss.backward() + backward_mem = get_memory_usage() + + return { + 'initial_memory': initial_mem, + 'forward_pass_memory': forward_mem - initial_mem, + 'backward_pass_memory': backward_mem - forward_mem, + 'total_memory': backward_mem - initial_mem + } + + def get_flops(self) -> int: + """Calculate the total number of FLOPs for the model.""" + from torch.autograd import Variable + + flops = 0 + for module in self.model.modules(): + if isinstance(module, torch.nn.Conv2d): + flops += (2 * module.in_channels * module.out_channels * module.kernel_size[0] * module.kernel_size[1] - 1) * (module.input_size[2] * module.input_size[3] / (module.stride[0] * module.stride[1])) + elif isinstance(module, torch.nn.Linear): + flops += 2 * module.in_features * module.out_features - 1 + + return int(flops) + + async def visualize_model(self, output_path: str) -> None: + """Generate a visual representation of the model architecture.""" + from torchviz import make_dot + + dummy_input = torch.randn(1, *self.model.input_size).to(self.device) + y = self.model(dummy_input) + + dot = make_dot(y, params=dict(self.model.named_parameters())) + dot.render(output_path, format='png') + self.logger.info(f"Model visualization saved to {output_path}.png") + + async def export_onnx(self, output_path: str) -> None: + """Export the model to ONNX format.""" + dummy_input = torch.randn(1, *self.model.input_size).to(self.device) + torch.onnx.export(self.model, dummy_input, output_path, verbose=True) + self.logger.info(f"Model exported to ONNX format at {output_path}") + + async def export_model(self, path: str, format: str) -> None: + """Export the model to a specified format.""" + if format.lower() == 'onnx': + await self.export_onnx(path) + elif format.lower() == 'torchscript': + script_module = torch.jit.script(self.model) + torch.jit.save(script_module, path) + self.logger.info(f"Model exported to TorchScript format at {path}") + else: + raise ValueError(f"Unsupported export format: {format}") + + async def get_current_time(self) -> float: + """Get the current time in a high-precision format.""" + return time.perf_counter() + + async def 
get_optimizer_info(self) -> Dict[str, Any]: + """Get information about the current optimizer.""" + optimizer = getattr(self.model, 'optimizer', None) + if optimizer: + return { + 'name': optimizer.__class__.__name__, + 'learning_rate': optimizer.param_groups[0]['lr'], + 'parameters': {k: v for k, v in optimizer.defaults.items() if k != 'params'} + } + return {'error': 'No optimizer found'} + + async def get_loss_function_info(self) -> Dict[str, Any]: + """Get information about the current loss function.""" + loss_fn = getattr(self.model, 'loss_fn', None) + if loss_fn: + return { + 'name': loss_fn.__class__.__name__, + 'parameters': {k: v for k, v in loss_fn.__dict__.items() if not k.startswith('_')} + } + return {'error': 'No loss function found'} + + def __del__(self): + """Cleanup method to ensure all profiling is stopped.""" + asyncio.run(self.stop_profiling()) \ No newline at end of file diff --git a/memoraith/integration/tensorflow_adapter.py b/memoraith/integration/tensorflow_adapter.py new file mode 100644 index 0000000..2b3f4ef --- /dev/null +++ b/memoraith/integration/tensorflow_adapter.py @@ -0,0 +1,211 @@ +import tensorflow as tf +import time +from typing import Dict, Any, List +import logging +from .framework_adapter import FrameworkAdapter +from ..data_collection import TimeTracker, CPUMemoryTracker, GPUMemoryTracker +from ..config import config + +class TensorFlowAdapter(FrameworkAdapter): + """Advanced adapter for integrating with TensorFlow models.""" + + def __init__(self, model: tf.keras.Model): + super().__init__(model) + self.time_tracker = TimeTracker() + self.cpu_tracker = CPUMemoryTracker() + self.gpu_tracker = GPUMemoryTracker() if tf.test.is_built_with_cuda() and config.enable_gpu else None + self.original_call = None + self.logger = logging.getLogger(__name__) + + async def start_profiling(self) -> None: + """Start profiling by wrapping the model's call method.""" + self.original_call = self.model.call + self.model.call = self._wrapped_call + + await self.cpu_tracker.start() + if self.gpu_tracker: + await self.gpu_tracker.start() + await self.log_profiling_start() + + async def stop_profiling(self) -> None: + """Restore the original call method after profiling.""" + if self.original_call: + self.model.call = self.original_call + self.original_call = None + + await self.cpu_tracker.stop() + if self.gpu_tracker: + await self.gpu_tracker.stop() + await self.log_profiling_stop() + + async def _wrapped_call(self, *args, **kwargs): + """Wrapped call method for profiling each layer.""" + for layer in self.model.layers: + layer_name = f"{layer.__class__.__name__}_{id(layer)}" + self.time_tracker.start(layer_name) + output = layer(*args, **kwargs) + self.time_tracker.stop(layer_name) + + try: + self.data[layer_name] = self.data.get(layer_name, {}) + self.data[layer_name]['time'] = self.time_tracker.get_duration(layer_name) + self.data[layer_name]['parameters'] = layer.count_params() + + if config.enable_memory: + self.data[layer_name]['cpu_memory'] = await self.cpu_tracker.get_peak_memory() + if self.gpu_tracker: + self.data[layer_name]['gpu_memory'] = await self.gpu_tracker.get_peak_memory() + + if hasattr(output, 'shape'): + self.data[layer_name]['output_shape'] = output.shape.as_list() + except Exception as e: + self.logger.error(f"Error in _wrapped_call for layer {layer_name}: {str(e)}") + + args = (output,) + + return await self.original_call(*args, **kwargs) + + async def profile_inference(self, input_data: tf.Tensor) -> Dict[str, Any]: + """Profile the inference 
process for a single input.""" + await self.start_profiling() + try: + start_time = time.perf_counter() + output = self.model(input_data) + end_time = time.perf_counter() + + inference_time = end_time - start_time + profiling_data = self.data.copy() + profiling_data['inference_time'] = inference_time + + return profiling_data + finally: + await self.stop_profiling() + + async def profile_training_step(self, input_data: tf.Tensor, target: tf.Tensor) -> Dict[str, Any]: + """Profile a single training step.""" + await self.start_profiling() + try: + optimizer = self.model.optimizer + loss_function = self.model.loss + + if optimizer is None or loss_function is None: + raise ValueError("Optimizer or loss function not properly configured") + + start_time = time.perf_counter() + with tf.GradientTape() as tape: + predictions = self.model(input_data, training=True) + loss = loss_function(target, predictions) + gradients = tape.gradient(loss, self.model.trainable_variables) + optimizer.apply_gradients(zip(gradients, self.model.trainable_variables)) + end_time = time.perf_counter() + + step_time = end_time - start_time + profiling_data = self.data.copy() + profiling_data['step_time'] = step_time + profiling_data['loss'] = loss.numpy().item() + + return profiling_data + finally: + await self.stop_profiling() + + async def get_model_summary(self) -> Dict[str, Any]: + """Get a summary of the model architecture.""" + summary = {} + total_params = self.model.count_params() + trainable_params = sum(tf.keras.backend.count_params(w) for w in self.model.trainable_weights) + + summary['total_params'] = total_params + summary['trainable_params'] = trainable_params + summary['non_trainable_params'] = total_params - trainable_params + summary['model_size_mb'] = total_params * 4 / (1024 * 1024) # Assuming float32 + + return summary + + async def get_layer_info(self, layer_name: str) -> Dict[str, Any]: + """Get detailed information about a specific layer.""" + for layer in self.model.layers: + if layer.name == layer_name: + return { + 'type': type(layer).__name__, + 'parameters': layer.count_params(), + 'trainable_parameters': sum(tf.keras.backend.count_params(w) for w in layer.trainable_weights), + 'input_shape': layer.input_shape, + 'output_shape': layer.output_shape, + 'activation': layer.activation.__name__ if hasattr(layer, 'activation') else None, + } + return {} + + async def profile_memory_usage(self) -> Dict[str, float]: + """Profile the memory usage of the model.""" + tf.keras.backend.clear_session() + + initial_mem = tf.config.experimental.get_memory_info('GPU:0')['current'] if tf.test.is_gpu_available() else 0 + + # Profile memory usage during forward pass + dummy_input = tf.random.normal(shape=[1] + self.model.input_shape[1:]) + self.model(dummy_input) + forward_mem = tf.config.experimental.get_memory_info('GPU:0')['current'] if tf.test.is_gpu_available() else 0 + + # Profile memory usage during backward pass + with tf.GradientTape() as tape: + predictions = self.model(dummy_input) + loss = tf.reduce_mean(predictions) + _ = tape.gradient(loss, self.model.trainable_variables) + backward_mem = tf.config.experimental.get_memory_info('GPU:0')['current'] if tf.test.is_gpu_available() else 0 + + return { + 'initial_memory': initial_mem / (1024 * 1024), # Convert to MB + 'forward_pass_memory': (forward_mem - initial_mem) / (1024 * 1024), + 'backward_pass_memory': (backward_mem - forward_mem) / (1024 * 1024), + 'total_memory': (backward_mem - initial_mem) / (1024 * 1024) + } + + def get_flops(self) -> int: + 
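+        # Note: relies on the legacy tf.compat.v1 profiler; under TF2 eager execution
+        # the model generally needs to be traced into a v1 graph first for the FLOP
+        # count to be meaningful.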
"""Calculate the total number of FLOPs for the model.""" + run_meta = tf.compat.v1.RunMetadata() + opts = tf.compat.v1.profiler.ProfileOptionBuilder.float_operation() + flops = tf.compat.v1.profiler.profile(tf.compat.v1.get_default_graph(), run_meta=run_meta, cmd='scope', options=opts) + return flops.total_float_ops + + async def get_current_time(self) -> float: + """Get the current time in a high-precision format.""" + return time.perf_counter() + + async def export_model(self, path: str, format: str) -> None: + """Export the model to a specified format.""" + if format.lower() == 'savedmodel': + tf.saved_model.save(self.model, path) + elif format.lower() == 'h5': + self.model.save(path, save_format='h5') + else: + raise ValueError(f"Unsupported export format: {format}") + self.logger.info(f"Model exported to {format} format at {path}") + + async def visualize_model(self, output_path: str) -> None: + """Generate a visual representation of the model architecture.""" + tf.keras.utils.plot_model(self.model, to_file=output_path, show_shapes=True, show_layer_names=True) + self.logger.info(f"Model visualization saved to {output_path}") + + async def get_optimizer_info(self) -> Dict[str, Any]: + """Get information about the current optimizer.""" + optimizer = self.model.optimizer + return { + 'name': optimizer.__class__.__name__, + 'learning_rate': float(tf.keras.backend.get_value(optimizer.lr)), + 'parameters': {k: v.numpy() for k, v in optimizer.get_config().items() if k != 'name'} + } + + async def get_loss_function_info(self) -> Dict[str, Any]: + """Get information about the current loss function.""" + loss = self.model.loss + if callable(loss): + return {'name': loss.__name__} + elif isinstance(loss, str): + return {'name': loss} + elif isinstance(loss, tf.keras.losses.Loss): + return { + 'name': loss.__class__.__name__, + 'parameters': loss.get_config() + } + else: + return {'error': 'Unknown loss type'} \ No newline at end of file diff --git a/memoraith/logging_config.py b/memoraith/logging_config.py new file mode 100644 index 0000000..049fa32 --- /dev/null +++ b/memoraith/logging_config.py @@ -0,0 +1,100 @@ +import logging +import sys +from typing import Optional +from pathlib import Path + +def setup_logging(log_level: int, log_file: Optional[str] = None, log_format: Optional[str] = None): + """ + Configure logging for Memoraith with enhanced features. + + Args: + log_level (int): The logging level to set + log_file (str, optional): Path to the log file. If None, logs to console only. + log_format (str, optional): Custom log format. If None, uses a default format. 
+ """ + logger = logging.getLogger('memoraith') + logger.setLevel(log_level) + + # Use custom format if provided, otherwise use a default + if log_format is None: + log_format = '[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s' + formatter = logging.Formatter(log_format, datefmt='%Y-%m-%d %H:%M:%S') + + # Console handler + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setFormatter(formatter) + logger.addHandler(console_handler) + + # File handler (if log_file is provided) + if log_file: + file_handler = logging.FileHandler(log_file) + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + + # Suppress logs from other libraries + for lib in ['matplotlib', 'PIL', 'tensorflow', 'torch']: + logging.getLogger(lib).setLevel(logging.WARNING) + + logger.info("Logging initialized") + +def get_logger(name: str) -> logging.Logger: + """ + Get a logger with the given name. + + Args: + name (str): Name of the logger + + Returns: + logging.Logger: Logger instance + """ + return logging.getLogger(f"memoraith.{name}") + +def log_exception(logger: logging.Logger, exc: Exception, level: int = logging.ERROR): + """ + Log an exception with full traceback. + + Args: + logger (logging.Logger): Logger instance + exc (Exception): Exception to log + level (int): Logging level for the exception + """ + logger.log(level, "Exception occurred", exc_info=True) + +def create_log_directory(base_path: str) -> str: + """ + Create a directory for log files if it doesn't exist. + + Args: + base_path (str): Base path for creating the log directory + + Returns: + str: Path to the created log directory + """ + log_dir = Path(base_path) / "logs" + log_dir.mkdir(parents=True, exist_ok=True) + return str(log_dir) + +def set_log_level(logger: logging.Logger, level: int): + """ + Set the logging level for a specific logger. + + Args: + logger (logging.Logger): Logger instance + level (int): Logging level to set + """ + logger.setLevel(level) + +def add_file_handler(logger: logging.Logger, file_path: str, level: int = logging.DEBUG): + """ + Add a file handler to a logger. + + Args: + logger (logging.Logger): Logger instance + file_path (str): Path to the log file + level (int): Logging level for the file handler + """ + handler = logging.FileHandler(file_path) + handler.setLevel(level) + formatter = logging.Formatter('[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s') + handler.setFormatter(formatter) + logger.addHandler(handler) \ No newline at end of file diff --git a/memoraith/profiler.py b/memoraith/profiler.py new file mode 100644 index 0000000..4a6d549 --- /dev/null +++ b/memoraith/profiler.py @@ -0,0 +1,81 @@ +import functools +import logging +import asyncio +from typing import Callable, Any, Optional +from .config import config +from .logging_config import setup_logging +from .integration import get_framework_adapter +from .analysis import Analyzer +from .reporting import ReportGenerator +from .exceptions import MemoraithError +from .visualization.real_time_visualizer import RealTimeVisualizer + +def profile_model( + memory: bool = True, + computation: bool = True, + gpu: bool = False, + save_report: bool = True, + report_format: str = 'html', + real_time_viz: bool = False +) -> Callable: + """ + Decorator to profile a model's training or inference function. 
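+
+    Example (illustrative sketch; `train` and `model` are user-defined):
+        @profile_model(memory=True, computation=True, report_format='html')
+        def train(model):
+            ...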
+ + Args: + memory (bool): Enable memory profiling + computation (bool): Enable computation time profiling + gpu (bool): Enable GPU profiling + save_report (bool): Save the profiling report + report_format (str): Format of the saved report ('html' or 'pdf') + real_time_viz (bool): Enable real-time visualization + + Returns: + Callable: Decorated function + """ + def decorator(func: Callable) -> Callable: + @functools.wraps(func) + async def wrapper(*args: Any, **kwargs: Any) -> Any: + setup_logging(config.log_level) + logger = logging.getLogger('memoraith') + logger.info("Starting Memoraith Profiler...") + + config.enable_memory = memory + config.enable_time = computation + config.enable_gpu = gpu + + try: + model = kwargs.get('model') or args[0] + adapter = get_framework_adapter(model) + + visualizer = RealTimeVisualizer() if real_time_viz else None + + async with adapter: + if asyncio.iscoroutinefunction(func): + result = await func(*args, **kwargs) + else: + result = await asyncio.to_thread(func, *args, **kwargs) + + if visualizer: + await visualizer.update(adapter.data) + + analysis_results = await Analyzer(adapter.data).run_analysis() + + if save_report: + await ReportGenerator(analysis_results).generate(format=report_format) + + logger.info("Memoraith Profiling Completed.") + return result + + except MemoraithError as e: + logger.error(f"MemoraithError: {e}") + raise + except Exception as e: + logger.exception("An unexpected error occurred during profiling.") + raise + + return wrapper + return decorator + +def set_output_path(path: str) -> None: + """Set the output path for profiling reports.""" + config.set_output_path(path) \ No newline at end of file diff --git a/memoraith/reporting/__init__.py b/memoraith/reporting/__init__.py new file mode 100644 index 0000000..93f0bdf --- /dev/null +++ b/memoraith/reporting/__init__.py @@ -0,0 +1,3 @@ +from .report_generator import ReportGenerator +from .console_report import ConsoleReport +from .export_utils import save_report_as_pdf diff --git a/memoraith/reporting/console_report.py b/memoraith/reporting/console_report.py new file mode 100644 index 0000000..370cc5c --- /dev/null +++ b/memoraith/reporting/console_report.py @@ -0,0 +1,100 @@ +from typing import Dict, Any +import logging +from colorama import Fore, Style, init + +init(autoreset=True) # Initialize colorama + +class ConsoleReport: + """Generates a comprehensive console report from the analysis results.""" + + def __init__(self, analysis_results: Dict[str, Any]): + self.analysis_results = analysis_results + self.logger = logging.getLogger(__name__) + + def display(self) -> None: + """Display the report in the console with enhanced formatting and colors.""" + try: + print(f"\n{Fore.CYAN}{Style.BRIGHT}==== Memoraith Profiling Report ===={Style.RESET_ALL}") + + self._display_global_metrics() + self._display_top_consumers('time', 'Time Consumers', 'total_time', 's') + self._display_top_consumers('cpu_memory', 'CPU Memory Consumers', 'total_cpu_memory', 'MB') + self._display_top_consumers('gpu_memory', 'GPU Memory Consumers', 'total_gpu_memory', 'MB') + self._display_bottlenecks() + self._display_recommendations() + self._display_anomalies() + + print(f"\n{Fore.YELLOW}For detailed visualizations and interactive dashboard, please refer to the generated HTML report.") + except Exception as e: + self.logger.error(f"Error displaying console report: {str(e)}") + + def _display_global_metrics(self) -> None: + """Display global metrics.""" + print(f"\n{Fore.GREEN}{Style.BRIGHT}Global 
Metrics:{Style.RESET_ALL}") + global_metrics = self.analysis_results['metrics'].get('global', {}) + print(f"Total Time: {Fore.YELLOW}{global_metrics.get('total_time', 0):.4f} s{Style.RESET_ALL}") + print(f"Peak CPU Memory: {Fore.YELLOW}{global_metrics.get('peak_cpu_memory', 0):.2f} MB{Style.RESET_ALL}") + print(f"Peak GPU Memory: {Fore.YELLOW}{global_metrics.get('peak_gpu_memory', 0):.2f} MB{Style.RESET_ALL}") + print(f"Total Parameters: {Fore.YELLOW}{global_metrics.get('total_parameters', 0):,}{Style.RESET_ALL}") + if 'total_flops' in global_metrics: + print(f"Total FLOPs: {Fore.YELLOW}{global_metrics['total_flops']:,}{Style.RESET_ALL}") + + def _display_top_consumers(self, metric_type: str, title: str, metric_key: str, unit: str, top_n: int = 5) -> None: + """Display top consumers for a specific metric.""" + print(f"\n{Fore.GREEN}{Style.BRIGHT}Top {top_n} {title}:{Style.RESET_ALL}") + sorted_layers = sorted( + [(layer, metrics[metric_key]) for layer, metrics in self.analysis_results['metrics'].items() if isinstance(metrics, dict)], + key=lambda x: x[1], + reverse=True + )[:top_n] + + for layer, value in sorted_layers: + print(f"Layer: {Fore.CYAN}{layer}{Style.RESET_ALL}, {metric_type.capitalize()}: {Fore.YELLOW}{value:.4f} {unit}{Style.RESET_ALL}") + + def _display_bottlenecks(self) -> None: + """Display detected bottlenecks.""" + print(f"\n{Fore.GREEN}{Style.BRIGHT}Detected Bottlenecks:{Style.RESET_ALL}") + for bottleneck in self.analysis_results['bottlenecks']: + print(f"Layer: {Fore.CYAN}{bottleneck['layer']}{Style.RESET_ALL}, " + f"Type: {Fore.MAGENTA}{bottleneck['type']}{Style.RESET_ALL}, " + f"Value: {Fore.YELLOW}{bottleneck['value']:.4f}{Style.RESET_ALL}, " + f"Ratio: {Fore.YELLOW}{bottleneck['ratio']:.2%}{Style.RESET_ALL}") + + def _display_recommendations(self) -> None: + """Display optimization recommendations.""" + print(f"\n{Fore.GREEN}{Style.BRIGHT}Recommendations:{Style.RESET_ALL}") + for rec in self.analysis_results['recommendations']: + print(f"Layer: {Fore.CYAN}{rec['layer']}{Style.RESET_ALL}") + print(f"Recommendation: {Fore.YELLOW}{rec['recommendation']}{Style.RESET_ALL}") + print() + + def _display_anomalies(self) -> None: + """Display detected anomalies.""" + print(f"\n{Fore.GREEN}{Style.BRIGHT}Detected Anomalies:{Style.RESET_ALL}") + for anomaly in self.analysis_results['anomalies']: + print(f"Layer: {Fore.CYAN}{anomaly['layer']}{Style.RESET_ALL}, " + f"Type: {Fore.MAGENTA}{anomaly['type']}{Style.RESET_ALL}, " + f"Value: {Fore.YELLOW}{anomaly['value']:.4f}{Style.RESET_ALL}, " + f"Z-Score: {Fore.YELLOW}{anomaly.get('z_score', 'N/A'):.2f}{Style.RESET_ALL}") + + def _display_performance_score(self) -> None: + """Display the overall performance score.""" + if 'performance_score' in self.analysis_results: + score = self.analysis_results['performance_score'] + color = Fore.GREEN if score > 80 else (Fore.YELLOW if score > 60 else Fore.RED) + print(f"\n{Fore.GREEN}{Style.BRIGHT}Overall Performance Score:{Style.RESET_ALL}") + print(f"{color}{score:.2f}/100{Style.RESET_ALL}") + + def save_to_file(self, file_path: str) -> None: + """Save the console report to a text file.""" + try: + with open(file_path, 'w') as f: + # Redirect print output to the file + import sys + original_stdout = sys.stdout + sys.stdout = f + self.display() + sys.stdout = original_stdout + self.logger.info(f"Console report saved to {file_path}") + except Exception as e: + self.logger.error(f"Error saving console report to file: {str(e)}") \ No newline at end of file diff --git 
a/memoraith/reporting/export_utils.py b/memoraith/reporting/export_utils.py new file mode 100644 index 0000000..0deac91 --- /dev/null +++ b/memoraith/reporting/export_utils.py @@ -0,0 +1,100 @@ +from typing import Optional, Dict, Any +import pdfkit +import csv +import json +import logging +from pathlib import Path + +logger = logging.getLogger(__name__) + +def save_report_as_pdf(html_report_path: str, pdf_output_path: str, config: Optional[Dict[str, Any]] = None) -> None: + """ + Convert HTML report to PDF with enhanced error handling and logging. + + Args: + html_report_path (str): Path to the HTML report file + pdf_output_path (str): Path where the PDF report should be saved + config (Optional[Dict[str, Any]]): Configuration options for pdfkit + """ + options = { + 'page-size': 'A4', + 'margin-top': '0.75in', + 'margin-right': '0.75in', + 'margin-bottom': '0.75in', + 'margin-left': '0.75in', + 'encoding': "UTF-8", + 'no-outline': None, + 'enable-local-file-access': None + } + + if config: + options.update(config) + + try: + pdfkit.from_file(html_report_path, pdf_output_path, options=options) + logger.info(f"PDF report saved successfully at: {pdf_output_path}") + except OSError as e: + if 'wkhtmltopdf' in str(e): + logger.error("wkhtmltopdf is not installed or not found in the system PATH.") + logger.info("Please install wkhtmltopdf: https://wkhtmltopdf.org/downloads.html") + else: + logger.error(f"OS error occurred while generating PDF: {str(e)}") + except Exception as e: + logger.error(f"Unexpected error occurred while generating PDF: {str(e)}") + +def export_metrics_to_csv(metrics: Dict[str, Any], csv_output_path: str) -> None: + """ + Export metrics data to a CSV file. + + Args: + metrics (Dict[str, Any]): Metrics data to be exported + csv_output_path (str): Path where the CSV file should be saved + """ + try: + with open(csv_output_path, 'w', newline='') as csvfile: + fieldnames = ['layer', 'total_time', 'total_cpu_memory', 'total_gpu_memory', 'parameters'] + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + + writer.writeheader() + for layer, data in metrics.items(): + if isinstance(data, dict): + writer.writerow({ + 'layer': layer, + 'total_time': data.get('total_time', 'N/A'), + 'total_cpu_memory': data.get('total_cpu_memory', 'N/A'), + 'total_gpu_memory': data.get('total_gpu_memory', 'N/A'), + 'parameters': data.get('parameters', 'N/A') + }) + logger.info(f"Metrics exported to CSV: {csv_output_path}") + except Exception as e: + logger.error(f"Error exporting metrics to CSV: {str(e)}") + +def export_analysis_to_json(analysis_results: Dict[str, Any], json_output_path: str) -> None: + """ + Export the full analysis results to a JSON file. + + Args: + analysis_results (Dict[str, Any]): Analysis results to be exported + json_output_path (str): Path where the JSON file should be saved + """ + try: + with open(json_output_path, 'w') as jsonfile: + json.dump(analysis_results, jsonfile, indent=2) + logger.info(f"Analysis results exported to JSON: {json_output_path}") + except Exception as e: + logger.error(f"Error exporting analysis results to JSON: {str(e)}") + +def create_export_directory(base_path: str) -> str: + """ + Create a directory for exporting files if it doesn't exist. 
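+
+    Example (illustrative sketch; the base path is arbitrary):
+        export_dir = create_export_directory("profiling_results")
+        # -> "profiling_results/memoraith_exports" on POSIX paths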
+ + Args: + base_path (str): Base path for creating the export directory + + Returns: + str: Path to the created export directory + """ + export_dir = Path(base_path) / "memoraith_exports" + export_dir.mkdir(parents=True, exist_ok=True) + logger.info(f"Export directory created: {export_dir}") + return str(export_dir) \ No newline at end of file diff --git a/memoraith/reporting/report_generator.py b/memoraith/reporting/report_generator.py new file mode 100644 index 0000000..5fadf64 --- /dev/null +++ b/memoraith/reporting/report_generator.py @@ -0,0 +1,114 @@ +import os +import asyncio +from typing import Dict, Any +import aiofiles +import logging +from jinja2 import Environment, FileSystemLoader +from ..visualization import plot_memory_usage, plot_time_usage, generate_heatmap, InteractiveDashboard +from ..config import config +from .export_utils import save_report_as_pdf, export_metrics_to_csv, export_analysis_to_json, create_export_directory + +class ReportGenerator: + """Generates comprehensive reports from the analysis results with enhanced functionality.""" + + def __init__(self, analysis_results: Dict[str, Any]): + self.analysis_results = analysis_results + self.output_path = str(config.output_path) + self.logger = logging.getLogger(__name__) + self.export_dir = create_export_directory(self.output_path) + + async def generate(self, format: str = 'html') -> None: + """ + Generate the report and save it to the output path. + + Args: + format (str): The format of the report ('html' or 'pdf') + """ + try: + # Generate visualizations asynchronously + await asyncio.gather( + plot_memory_usage(self.analysis_results['metrics'], self.export_dir), + plot_time_usage(self.analysis_results['metrics'], self.export_dir), + generate_heatmap(self.analysis_results['metrics'], self.export_dir), + InteractiveDashboard(self.analysis_results['metrics']).generate(self.export_dir) + ) + + # Generate HTML report + html_content = self._generate_html_report() + html_path = os.path.join(self.export_dir, 'memoraith_report.html') + await self._save_html_report(html_content, html_path) + + # Generate PDF if requested + if format.lower() == 'pdf': + pdf_path = os.path.join(self.export_dir, 'memoraith_report.pdf') + await self._save_pdf_report(html_path, pdf_path) + + # Export additional data + await asyncio.gather( + self._export_metrics_csv(), + self._export_analysis_json() + ) + + self.logger.info(f"Report generation completed. 
Files saved in {self.export_dir}") + except Exception as e: + self.logger.error(f"Error during report generation: {str(e)}") + raise + + def _generate_html_report(self) -> str: + """Generate the HTML content for the report.""" + try: + template_dir = os.path.join(os.path.dirname(__file__), '..', 'templates') + env = Environment(loader=FileSystemLoader(template_dir)) + template = env.get_template('report_template.html') + return template.render( + analysis_results=self.analysis_results, + config=config, + memory_plot_path='memory_usage.png', + time_plot_path='time_usage.png', + heatmap_path='metrics_heatmap.png', + dashboard_path='interactive_dashboard.html' + ) + except Exception as e: + self.logger.error(f"Error generating HTML report: {str(e)}") + raise + + async def _save_html_report(self, content: str, file_path: str) -> None: + """Save the HTML report to a file.""" + try: + async with aiofiles.open(file_path, 'w', encoding='utf-8') as f: + await f.write(content) + self.logger.info(f"HTML report saved to {file_path}") + except Exception as e: + self.logger.error(f"Error saving HTML report: {str(e)}") + raise + + async def _save_pdf_report(self, html_path: str, pdf_path: str) -> None: + """Save the report as a PDF file.""" + try: + await asyncio.to_thread(save_report_as_pdf, html_path, pdf_path) + except Exception as e: + self.logger.error(f"Error saving PDF report: {str(e)}") + raise + + async def _export_metrics_csv(self) -> None: + """Export metrics to a CSV file.""" + csv_path = os.path.join(self.export_dir, 'metrics.csv') + await asyncio.to_thread(export_metrics_to_csv, self.analysis_results['metrics'], csv_path) + + async def _export_analysis_json(self) -> None: + """Export the full analysis results to a JSON file.""" + json_path = os.path.join(self.export_dir, 'analysis_results.json') + await asyncio.to_thread(export_analysis_to_json, self.analysis_results, json_path) + + def get_report_files(self) -> Dict[str, str]: + """Get a dictionary of generated report files and their paths.""" + return { + 'html_report': os.path.join(self.export_dir, 'memoraith_report.html'), + 'pdf_report': os.path.join(self.export_dir, 'memoraith_report.pdf'), + 'metrics_csv': os.path.join(self.export_dir, 'metrics.csv'), + 'analysis_json': os.path.join(self.export_dir, 'analysis_results.json'), + 'memory_plot': os.path.join(self.export_dir, 'memory_usage.png'), + 'time_plot': os.path.join(self.export_dir, 'time_usage.png'), + 'heatmap': os.path.join(self.export_dir, 'metrics_heatmap.png'), + 'interactive_dashboard': os.path.join(self.export_dir, 'interactive_dashboard.html') + } \ No newline at end of file diff --git a/memoraith/templates/report_template.html b/memoraith/templates/report_template.html new file mode 100644 index 0000000..77f47a0 --- /dev/null +++ b/memoraith/templates/report_template.html @@ -0,0 +1 @@ + diff --git a/memoraith/visualization/__init__.py b/memoraith/visualization/__init__.py new file mode 100644 index 0000000..60244d0 --- /dev/null +++ b/memoraith/visualization/__init__.py @@ -0,0 +1,8 @@ +from .plot_memory import plot_memory_usage +from .plot_time import plot_time_usage +from .heatmap import generate_heatmap +from .interactive_dashboard import InteractiveDashboard + + +def config(): + return None \ No newline at end of file diff --git a/memoraith/visualization/heatmap.py b/memoraith/visualization/heatmap.py new file mode 100644 index 0000000..1347c7b --- /dev/null +++ b/memoraith/visualization/heatmap.py @@ -0,0 +1,60 @@ +import seaborn as sns +import matplotlib.pyplot as 
plt +import pandas as pd +import numpy as np +from typing import Dict, Any +import logging + +logger = logging.getLogger(__name__) + +def generate_heatmap(metrics: Dict[str, Any], output_path: str, figsize: tuple = (12, 8), cmap: str = 'viridis', annot: bool = True) -> None: + """ + Generate an enhanced heatmap showing memory and time intensity. + + Args: + metrics (Dict[str, Any]): Calculated metrics for each layer + output_path (str): Path to save the generated heatmap + figsize (tuple): Figure size (width, height) in inches + cmap (str): Colormap for the heatmap + annot (bool): Whether to annotate each cell with the value + """ + try: + data = [] + for layer, layer_metrics in metrics.items(): + if isinstance(layer_metrics, dict): + data.append({ + 'Layer': layer, + 'CPU Memory (MB)': layer_metrics.get('total_cpu_memory', 0), + 'GPU Memory (MB)': layer_metrics.get('total_gpu_memory', 0), + 'Time (s)': layer_metrics.get('total_time', 0) + }) + + df = pd.DataFrame(data) + df.set_index('Layer', inplace=True) + + # Normalize data for better visualization + for column in df.columns: + df[column] = (df[column] - df[column].min()) / (df[column].max() - df[column].min()) + + plt.figure(figsize=figsize) + ax = sns.heatmap(df, annot=annot, cmap=cmap, fmt='.2f', + linewidths=0.5, cbar_kws={'label': 'Normalized Intensity'}) + + plt.title('Layer Metrics Heatmap (Normalized)', fontsize=16) + plt.xlabel('Metrics', fontsize=12) + plt.ylabel('Layers', fontsize=12) + + # Rotate x-axis labels for better readability + plt.xticks(rotation=45, ha='right') + + # Adjust layout to prevent clipping of labels + plt.tight_layout() + + # Save the figure + plt.savefig(f"{output_path}/metrics_heatmap.png", dpi=300, bbox_inches='tight') + plt.close() + + logger.info(f"Heatmap generated and saved to {output_path}/metrics_heatmap.png") + except Exception as e: + logger.error(f"Error generating heatmap: {str(e)}") + raise \ No newline at end of file diff --git a/memoraith/visualization/interactive_dashboard.py b/memoraith/visualization/interactive_dashboard.py new file mode 100644 index 0000000..50a43ac --- /dev/null +++ b/memoraith/visualization/interactive_dashboard.py @@ -0,0 +1,110 @@ +import plotly.graph_objects as go +from plotly.subplots import make_subplots +import pandas as pd +from typing import Dict, Any +import logging + +class InteractiveDashboard: + """ + Generates a comprehensive interactive dashboard for exploring profiling data. + """ + + def __init__(self, metrics: Dict[str, Any]): + self.metrics = metrics + self.logger = logging.getLogger(__name__) + + def generate(self, output_path: str) -> None: + """ + Generate an advanced interactive dashboard and save it as an HTML file. 
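+
+        Example (illustrative sketch; the layer name and metric values are placeholders):
+            metrics = {'conv1': {'total_cpu_memory': 12.5, 'total_gpu_memory': 30.1,
+                                 'total_time': 0.004, 'parameters': 9408}}
+            InteractiveDashboard(metrics).generate('reports')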
+ + Args: + output_path (str): Path to save the generated dashboard + """ + try: + df = pd.DataFrame.from_dict(self.metrics, orient='index') + df.reset_index(inplace=True) + df.rename(columns={'index': 'Layer'}, inplace=True) + + fig = make_subplots( + rows=3, cols=2, + subplot_titles=( + "Memory Usage", "Computation Time", + "Parameters Count", "Layer Efficiency", + "Memory vs Time", "Cumulative Metrics" + ), + specs=[[{"type": "bar"}, {"type": "bar"}], + [{"type": "bar"}, {"type": "scatter"}], + [{"type": "scatter"}, {"type": "scatter"}]] + ) + + # Memory Usage + fig.add_trace(go.Bar(x=df['Layer'], y=df['total_cpu_memory'], name='CPU Memory'), + row=1, col=1) + fig.add_trace(go.Bar(x=df['Layer'], y=df['total_gpu_memory'], name='GPU Memory'), + row=1, col=1) + + # Computation Time + fig.add_trace(go.Bar(x=df['Layer'], y=df['total_time'], name='Time'), + row=1, col=2) + + # Parameters Count + fig.add_trace(go.Bar(x=df['Layer'], y=df['parameters'], name='Parameters'), + row=2, col=1) + + # Layer Efficiency + efficiency = df['parameters'] / df['total_time'] + fig.add_trace(go.Scatter(x=df['Layer'], y=efficiency, mode='lines+markers', name='Efficiency'), + row=2, col=2) + + # Memory vs Time Scatter + fig.add_trace(go.Scatter(x=df['total_time'], y=df['total_cpu_memory'], mode='markers', name='CPU Memory vs Time'), + row=3, col=1) + fig.add_trace(go.Scatter(x=df['total_time'], y=df['total_gpu_memory'], mode='markers', name='GPU Memory vs Time'), + row=3, col=1) + + # Cumulative Metrics + df_sorted = df.sort_values('total_time') + df_sorted['cumulative_time'] = df_sorted['total_time'].cumsum() + df_sorted['cumulative_memory'] = df_sorted['total_cpu_memory'].cumsum() + fig.add_trace(go.Scatter(x=df_sorted['Layer'], y=df_sorted['cumulative_time'], mode='lines', name='Cumulative Time'), + row=3, col=2) + fig.add_trace(go.Scatter(x=df_sorted['Layer'], y=df_sorted['cumulative_memory'], mode='lines', name='Cumulative Memory'), + row=3, col=2) + + fig.update_layout(height=1200, width=1600, title_text="Memoraith Advanced Profiling Results") + fig.write_html(f"{output_path}/interactive_dashboard.html") + self.logger.info(f"Interactive dashboard generated and saved to {output_path}/interactive_dashboard.html") + except Exception as e: + self.logger.error(f"Error generating interactive dashboard: {str(e)}") + + def generate_layer_comparison(self, output_path: str) -> None: + """ + Generate a separate dashboard for layer-by-layer comparison. 
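+
+        Uses the same per-layer metrics dict as generate(); all metrics share a
+        log-scale y-axis so memory, time and parameter counts stay comparable.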
+ + Args: + output_path (str): Path to save the generated dashboard + """ + try: + df = pd.DataFrame.from_dict(self.metrics, orient='index') + df.reset_index(inplace=True) + df.rename(columns={'index': 'Layer'}, inplace=True) + + fig = go.Figure() + + for metric in ['total_cpu_memory', 'total_gpu_memory', 'total_time', 'parameters']: + fig.add_trace(go.Bar(x=df['Layer'], y=df[metric], name=metric)) + + fig.update_layout( + barmode='group', + height=600, + width=1200, + title_text="Layer-by-Layer Comparison", + xaxis_title="Layers", + yaxis_title="Metric Value (log scale)", + yaxis_type="log" + ) + + fig.write_html(f"{output_path}/layer_comparison.html") + self.logger.info(f"Layer comparison dashboard generated and saved to {output_path}/layer_comparison.html") + except Exception as e: + self.logger.error(f"Error generating layer comparison dashboard: {str(e)}") \ No newline at end of file diff --git a/memoraith/visualization/plot_memory.py b/memoraith/visualization/plot_memory.py new file mode 100644 index 0000000..6b11ed8 --- /dev/null +++ b/memoraith/visualization/plot_memory.py @@ -0,0 +1,67 @@ +import matplotlib.pyplot as plt +import numpy as np +from typing import Dict, Any +import logging + +logger = logging.getLogger(__name__) + +def plot_memory_usage(metrics: Dict[str, Any], output_path: str, figsize: tuple = (12, 6), colors: tuple = ('skyblue', 'orange')) -> None: + """ + Generate an enhanced bar chart for memory usage per layer. + + Args: + metrics (Dict[str, Any]): Calculated metrics for each layer + output_path (str): Path to save the generated plot + figsize (tuple): Figure size (width, height) in inches + colors (tuple): Colors for CPU and GPU memory bars + """ + try: + layers = [] + cpu_memory = [] + gpu_memory = [] + + for layer, layer_metrics in metrics.items(): + if isinstance(layer_metrics, dict): + layers.append(layer) + cpu_memory.append(layer_metrics.get('total_cpu_memory', 0)) + gpu_memory.append(layer_metrics.get('total_gpu_memory', 0)) + + fig, ax = plt.subplots(figsize=figsize) + + x = np.arange(len(layers)) + bar_width = 0.35 + + cpu_bars = ax.bar(x - bar_width/2, cpu_memory, bar_width, label='CPU Memory', color=colors[0]) + gpu_bars = ax.bar(x + bar_width/2, gpu_memory, bar_width, label='GPU Memory', color=colors[1]) + + ax.set_xlabel('Layers', fontsize=12) + ax.set_ylabel('Memory Usage (MB)', fontsize=12) + ax.set_title('Memory Usage per Layer', fontsize=16) + ax.set_xticks(x) + ax.set_xticklabels(layers, rotation=45, ha='right') + ax.legend() + + # Add value labels on top of bars + def autolabel(rects): + for rect in rects: + height = rect.get_height() + ax.annotate(f'{height:.1f}', + xy=(rect.get_x() + rect.get_width() / 2, height), + xytext=(0, 3), # 3 points vertical offset + textcoords="offset points", + ha='center', va='bottom', rotation=90) + + autolabel(cpu_bars) + autolabel(gpu_bars) + + # Adjust layout to prevent clipping of labels + plt.tight_layout() + + # Save the figure + plt.savefig(f"{output_path}/memory_usage.png", dpi=300, bbox_inches='tight') + plt.close() + + logger.info(f"Memory usage plot generated and saved to {output_path}/memory_usage.png") + except Exception as e: + logger.error(f"Error generating memory usage plot: {str(e)}") + raise \ No newline at end of file diff --git a/memoraith/visualization/plot_time.py b/memoraith/visualization/plot_time.py new file mode 100644 index 0000000..f08c2aa --- /dev/null +++ b/memoraith/visualization/plot_time.py @@ -0,0 +1,63 @@ +import matplotlib.pyplot as plt +import numpy as np +from typing 
import Dict, Any +import logging + +logger = logging.getLogger(__name__) + +def plot_time_usage(metrics: Dict[str, Any], output_path: str, figsize: tuple = (12, 6), color: str = 'salmon') -> None: + """ + Generate an enhanced bar chart for computation time per layer. + + Args: + metrics (Dict[str, Any]): Calculated metrics for each layer + output_path (str): Path to save the generated plot + figsize (tuple): Figure size (width, height) in inches + color (str): Color for the time usage bars + """ + try: + layers = [] + times = [] + + for layer, layer_metrics in metrics.items(): + if isinstance(layer_metrics, dict): + layers.append(layer) + times.append(layer_metrics.get('total_time', 0)) + + fig, ax = plt.subplots(figsize=figsize) + + bars = ax.barh(layers, times, color=color) + + ax.set_xlabel('Computation Time (s)', fontsize=12) + ax.set_ylabel('Layers', fontsize=12) + ax.set_title('Computation Time per Layer', fontsize=16) + + # Add value labels at the end of bars + for i, v in enumerate(times): + ax.text(v, i, f' {v:.4f}s', va='center') + + # Highlight the top 3 time-consuming layers + top_3_indices = np.argsort(times)[-3:] + for i in top_3_indices: + bars[i].set_color('red') + bars[i].set_alpha(0.8) + + # Add a text box with summary statistics + total_time = sum(times) + avg_time = np.mean(times) + textstr = f'Total Time: {total_time:.4f}s\nAvg Time: {avg_time:.4f}s' + props = dict(boxstyle='round', facecolor='wheat', alpha=0.5) + ax.text(0.05, 0.95, textstr, transform=ax.transAxes, fontsize=10, + verticalalignment='top', bbox=props) + + # Adjust layout to prevent clipping of labels + plt.tight_layout() + + # Save the figure + plt.savefig(f"{output_path}/time_usage.png", dpi=300, bbox_inches='tight') + plt.close() + + logger.info(f"Time usage plot generated and saved to {output_path}/time_usage.png") + except Exception as e: + logger.error(f"Error generating time usage plot: {str(e)}") + raise \ No newline at end of file diff --git a/memoraith/visualization/real_time_visualizer.py b/memoraith/visualization/real_time_visualizer.py new file mode 100644 index 0000000..dcbe677 --- /dev/null +++ b/memoraith/visualization/real_time_visualizer.py @@ -0,0 +1,85 @@ +import asyncio +import matplotlib.pyplot as plt +from matplotlib.animation import FuncAnimation +from typing import Dict, Any +import numpy as np + +class RealTimeVisualizer: + """Real-time visualization of profiling data.""" + + def __init__(self): + self.fig, (self.ax1, self.ax2) = plt.subplots(2, 1, figsize=(10, 10)) + self.memory_data = {} + self.time_data = {} + plt.ion() # Turn on interactive mode + self.fig.show() + + async def update(self, data: Dict[str, Any]) -> None: + """Update the visualization with new data.""" + for layer, layer_data in data.items(): + self.memory_data[layer] = layer_data.get('cpu_memory', 0) + self.time_data[layer] = layer_data.get('time', 0) + + await self._draw() + + async def _draw(self) -> None: + """Draw the updated visualization.""" + self.ax1.clear() + self.ax2.clear() + + layers = list(self.memory_data.keys()) + memory_values = list(self.memory_data.values()) + time_values = list(self.time_data.values()) + + self.ax1.barh(layers, memory_values) + self.ax1.set_xlabel('Memory Usage (MB)') + self.ax1.set_title('Real-time Memory Usage by Layer') + + self.ax2.barh(layers, time_values) + self.ax2.set_xlabel('Computation Time (s)') + self.ax2.set_title('Real-time Computation Time by Layer') + + plt.tight_layout() + await asyncio.to_thread(self.fig.canvas.draw) + await 
asyncio.to_thread(self.fig.canvas.flush_events) + + def close(self): + """Close the visualization window.""" + plt.close(self.fig) + + async def add_data(self, memory: float, time: float) -> None: + """Record one ad-hoc sample (keyed by its index) and redraw.""" + key = f"sample_{len(self.memory_data)}" + self.memory_data[key] = memory + self.time_data[key] = time + await self._draw() + + async def stop(self) -> None: + """Stop visualizing and close the figure.""" + plt.close(self.fig) + +# Usage +async def main(): + visualizer = RealTimeVisualizer() + + for _ in range(100): + await visualizer.add_data(np.random.rand() * 100, np.random.rand()) + await asyncio.sleep(0.1) + + await visualizer.stop() + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/problems.py b/problems.py new file mode 100644 index 0000000..ad35e6b --- /dev/null +++ b/problems.py @@ -0,0 +1,98 @@ +import os +import subprocess +import json +from pathlib import Path + +def run_pylint(project_dir): + """ + Runs pylint on the specified project directory and returns the JSON output. + """ + try: + # Run pylint with JSON output + result = subprocess.run( + ['pylint', project_dir, '--output-format=json'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + check=False # Don't raise exception on non-zero exit + ) + + if result.stderr: + print("Pylint encountered an error:") + print(result.stderr) + # Continue processing even if pylint reports errors (like syntax errors) + + # Parse JSON output + pylint_output = json.loads(result.stdout) + return pylint_output + + except FileNotFoundError: + print("Pylint is not installed or not found in the system PATH.") + return None + except json.JSONDecodeError: + print("Failed to parse pylint output. Ensure pylint is producing valid JSON.") + return None + +def extract_errors(pylint_output): + """ + Extracts only error and fatal issues from pylint output. + + Args: + pylint_output (list): The JSON-parsed output from pylint. + + Returns: + list: Filtered list of error issues. 
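+ + Example (illustrative; the input dict mirrors pylint's JSON issue format): + >>> extract_errors([{'type': 'error', 'message-id': 'E1101', 'path': 'foo.py', 'line': 3, 'column': 0, 'symbol': 'no-member', 'message': 'x has no member y'}]) + [{'File': 'foo.py', 'Line': 3, 'Column': 0, 'Symbol': 'no-member', 'Message': 'x has no member y', 'Type': 'error'}]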
+ """ + error_issues = [ + { + 'File': issue.get('path', ''), + 'Line': issue.get('line', ''), + 'Column': issue.get('column', ''), + 'Symbol': issue.get('symbol', ''), + 'Message': issue.get('message', ''), + 'Type': issue.get('type', '') + } + for issue in pylint_output + if issue.get('type', '').lower() in ['error', 'fatal'] and issue.get('message-id', '').startswith(('E', 'F')) + ] + + return error_issues + +def main(): + # Define your project directory + project_dir = Path(r'C:\Users\PC\Desktop\Leo-Major\Memoraith') + + if not project_dir.exists(): + print(f"The directory {project_dir} does not exist.") + return + + print(f"Running pylint on {project_dir}...") + + pylint_output = run_pylint(str(project_dir)) + + if pylint_output is None: + print("No pylint output to process.") + return + + relevant_errors = extract_errors(pylint_output) + + print("\n=== Pylint Errors ===") + if relevant_errors: + for issue in relevant_errors: + print(f"{issue['File']}:{issue['Line']}:{issue['Column']} - {issue['Message']} [{issue['Symbol']}] ({issue['Type'].capitalize()})") + else: + print("No errors found.") + + # Optionally, save the results to a file + save_results = True # Set to False if you don't want to save + if save_results: + errors_file = project_dir / 'pylint_errors.txt' + + with open(errors_file, 'w', encoding='utf-8') as f: + for issue in relevant_errors: + f.write(f"{issue['File']}:{issue['Line']}:{issue['Column']} - {issue['Message']} [{issue['Symbol']}] ({issue['Type'].capitalize()})\n") + + print(f"\nErrors saved to {errors_file}") + +if __name__ == "__main__": + main() diff --git a/pylint_errors.txt b/pylint_errors.txt new file mode 100644 index 0000000..b489e55 --- /dev/null +++ b/pylint_errors.txt @@ -0,0 +1,2 @@ +__init__.py:1:0 - error while code parsing: Unable to load file C:\Users\PC\Desktop\Leo-Major\Memoraith\__init__.py: +[Errno 2] No such file or directory: 'C:\\Users\\PC\\Desktop\\Leo-Major\\Memoraith\\__init__.py' [parse-error] (Fatal) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..627d528 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,18 @@ +torch>=1.7.0 +tensorflow>=2.4.0 +matplotlib>=3.3.0 +seaborn>=0.11.0 +plotly>=4.14.0 +pandas>=1.2.0 +jinja2>=2.11.0 +pdfkit>=0.6.0 +psutil>=5.8.0 +pynvml>=8.0.0 +colorama>=0.4.4 +tqdm>=4.60.0 +pytest>=6.2.0 +black>=20.8b1 +isort>=5.7.0 +flake8>=3.8.0 +mypy>=0.800 +pdfkit==1.0.0 \ No newline at end of file diff --git a/ressource_cleaner.py b/ressource_cleaner.py new file mode 100644 index 0000000..7068147 --- /dev/null +++ b/ressource_cleaner.py @@ -0,0 +1,29 @@ +class ResourceManager: + def __init__(self): + self.resources = [] + + def acquire_resource(self, resource): + self.resources.append(resource) + + def release_resources(self): + for resource in self.resources: + try: + resource.close() + except Exception as e: + print(f"Error releasing resource: {e}") + self.resources.clear() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.release_resources() + +# Usage +with ResourceManager() as manager: + resource1 = open('file1.txt', 'r') + manager.acquire_resource(resource1) + resource2 = open('file2.txt', 'r') + manager.acquire_resource(resource2) + +# Resources are automatically released when exiting the context \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..a0e29ed --- /dev/null +++ b/setup.py @@ -0,0 +1,45 @@ +from setuptools import setup, find_packages + +with open('README.md', encoding='utf-8') 
as f: + long_description = f.read() + +setup( + name="memoraith", + version="0.1.0", + author="Your Name", + author_email="your.email@example.com", + description="A lightweight model profiler for deep learning frameworks", + long_description=long_description, + long_description_content_type="text/markdown", + url="https://github.com/yourusername/memoraith", + packages=find_packages(), + classifiers=[ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + ], + python_requires='>=3.7', + install_requires=[ + 'torch>=1.7.0', + 'tensorflow>=2.4.0', + 'matplotlib>=3.3.0', + 'seaborn>=0.11.0', + 'pdfkit>=0.6.0', + 'aiofiles>=0.6.0', + 'jinja2>=2.11.0', + 'pynvml>=8.0.0', + ], + extras_require={ + 'dev': [ + 'pytest>=6.2.0', + 'pytest-asyncio>=0.14.0', + 'black>=20.8b1', + 'isort>=5.7.0', + 'flake8>=3.8.0', + ], + }, +) \ No newline at end of file diff --git a/testdata.py b/testdata.py new file mode 100644 index 0000000..f8e0466 --- /dev/null +++ b/testdata.py @@ -0,0 +1,91 @@ +import os +import re + +def generate_project_structure(directory, indent_level=0): + structure = "" + for root, dirs, files in os.walk(directory): + if 'venv' in root: + continue + + level = root.replace(directory, '').count(os.sep) + indent = '│ ' * (level - indent_level) + structure += f"{indent}├── {os.path.basename(root)}/\n" + sub_indent = '│ ' * (level + 1 - indent_level) + for file in files: + structure += f"{sub_indent}├── {file}\n" + dirs[:] = [d for d in dirs if d != 'venv'] # Skip venv directory + + return structure + +def extract_classes_and_methods(content): + # Regular expressions to capture class and method definitions in Python files + class_regex = r'class\s+(\w+)\s*(\(.*?\))?:' + method_regex = r'def\s+(\w+)\s*\(.*?\):' + + extracted_content = "" + class_matches = re.findall(class_regex, content) + + for class_match in class_matches: + class_name = class_match[0] + extracted_content += f"\nClass: {class_name}\n" + extracted_content += "-" * 80 + "\n" + + # Now, extract methods inside the class + method_matches = re.findall(method_regex, content) + for method_match in method_matches: + extracted_content += f" Method: {method_match}\n" + + return extracted_content + +def read_files_recursively(directory): + content = "" + for root, dirs, files in os.walk(directory): + if 'venv' in root: + continue + + for file in files: + file_path = os.path.join(root, file) + print(f"Processing file: {file_path}") + content += f"File: {file_path}\n\n" + try: + # Attempt to read every file as a text file + with open(file_path, 'r', encoding='utf-8') as f: + file_content = f.read() + content += file_content + + # If it's a Python file, extract class and method definitions + if file.endswith('.py'): + extracted_classes_methods = extract_classes_and_methods(file_content) + content += extracted_classes_methods + + except UnicodeDecodeError: + try: + with open(file_path, 'r', encoding='ISO-8859-1') as f: + file_content = f.read() + content += file_content + except Exception as e: + content += f"Error reading file: {e}" + content += "\n\n" + "-"*80 + "\n\n" + return content + +def save_content_to_txt(directory, output_file): + print("Starting the process...") + project_structure = generate_project_structure(directory) + file_content = read_files_recursively(directory) + with open(output_file, 'w', 
encoding='utf-8') as f: + f.write("Project Structure:\n\n") + f.write(project_structure) + f.write("\n\n" + "="*80 + "\n\n") + f.write(file_content) + print("Process completed successfully.") + +# Usage +project_directory = r"C:\Users\PC\Desktop\Leo-Major\Memoraith" +output_file = r"C:\Users\PC\Desktop\Leo-Major\projetoo_content.txt" + +try: + save_content_to_txt(project_directory, output_file) +except PermissionError: + print("Permission denied. Please check your write permissions or choose a different output location.") +except Exception as e: + print(f"An error occurred: {e}") diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_analyzer.py b/tests/test_analyzer.py new file mode 100644 index 0000000..dc0503f --- /dev/null +++ b/tests/test_analyzer.py @@ -0,0 +1,27 @@ +import unittest +from memoraith.analysis.analyzer import Analyzer + +class TestAnalyzer(unittest.IsolatedAsyncioTestCase):  # async-aware base class (Python 3.8+) so the async tests below are actually awaited + def setUp(self): + self.sample_data = { + 'layer1': {'total_time': 0.1, 'total_cpu_memory': 100, 'total_gpu_memory': 200}, + 'layer2': {'total_time': 0.2, 'total_cpu_memory': 150, 'total_gpu_memory': 250}, + } + self.analyzer = Analyzer(self.sample_data) + + async def test_run_analysis(self): + results = await self.analyzer.run_analysis() + self.assertIn('metrics', results) + self.assertIn('bottlenecks', results) + self.assertIn('anomalies', results) + self.assertIn('recommendations', results) + + async def test_get_layer_analysis(self): + layer_analysis = await self.analyzer.get_layer_analysis('layer1') + self.assertIn('metrics', layer_analysis) + self.assertIn('bottlenecks', layer_analysis) + self.assertIn('anomalies', layer_analysis) + self.assertIn('recommendations', layer_analysis) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/test_profiler.py b/tests/test_profiler.py new file mode 100644 index 0000000..fffd7d9 --- /dev/null +++ b/tests/test_profiler.py @@ -0,0 +1,57 @@ +import unittest +import asyncio +import torch +import torch.nn as nn +import tensorflow as tf +from memoraith import profile_model, set_output_path +from memoraith.config import Config +from memoraith.data_collection.gpu_memory import PYNVML_AVAILABLE + +class TestProfiler(unittest.IsolatedAsyncioTestCase):  # async-aware base class so the async test methods are actually run + def setUp(self): + set_output_path('test_profiling_results/') + + class SimpleModel(nn.Module): + def __init__(self): + super(SimpleModel, self).__init__() + self.fc = nn.Linear(10, 5) + + def forward(self, x): + return self.fc(x) + + self.model = SimpleModel() + + @profile_model() + async def dummy_train(self, model): + optimizer = torch.optim.SGD(model.parameters(), lr=0.01) + for _ in range(10): + input_data = torch.randn(32, 10) + output = model(input_data) + loss = output.sum() + loss.backward() + optimizer.step() + + @unittest.skipIf(not PYNVML_AVAILABLE, "PYNVML not available") + async def test_profiler_decorator(self): + try: + await self.dummy_train(self.model) + except Exception as e: + self.fail(f"Profiler decorator raised an exception: {e}") + + async def test_output_files(self): + import os + await self.dummy_train(self.model) + self.assertTrue(os.path.exists('test_profiling_results/memoraith_report.html')) + self.assertTrue(os.path.exists('test_profiling_results/memory_usage.png')) + self.assertTrue(os.path.exists('test_profiling_results/time_usage.png')) + self.assertTrue(os.path.exists('test_profiling_results/metrics_heatmap.png')) + self.assertTrue(os.path.exists('test_profiling_results/interactive_dashboard.html')) + + def 
test_config_loading(self): + config = Config() + config.load_from_file('test_config.yaml') + self.assertEqual(config.enable_gpu, True) + self.assertEqual(config.log_level, 'DEBUG') + +if __name__ == '__main__': + unittest.main() diff --git a/testu.py b/testu.py new file mode 100644 index 0000000..74866e0 --- /dev/null +++ b/testu.py @@ -0,0 +1,371 @@ +import unittest +import asyncio +import tempfile +import os +import torch +import torch.nn as nn +import tensorflow as tf +from unittest.mock import patch, MagicMock +import pytest +import logging +import time +from memoraith import profile_model, set_output_path +from memoraith.config import Config +from memoraith.exceptions import MemoraithError, FrameworkNotSupportedError +from memoraith.logging_config import setup_logging, get_logger +from memoraith.analysis.analyzer import Analyzer +from memoraith.analysis.anomaly_detection import AnomalyDetector +from memoraith.analysis.bottleneck import BottleneckDetector +from memoraith.analysis.metrics import MetricsCalculator +from memoraith.analysis.recommendations import RecommendationEngine +from memoraith.data_collection.cpu_memory import CPUMemoryTracker +from memoraith.data_collection.gpu_memory import GPUMemoryTracker, PYNVML_AVAILABLE +from memoraith.data_collection.time_tracking import TimeTracker +from memoraith.data_collection.resource_lock import ResourceLock +from memoraith.integration.common_utils import identify_framework, get_model_structure, estimate_model_size +from memoraith.integration.framework_adapter import FrameworkAdapter +from memoraith.integration.pytorch_adapter import PyTorchAdapter +from memoraith.integration.tensorflow_adapter import TensorFlowAdapter +from memoraith.reporting.console_report import ConsoleReport +from memoraith.reporting.export_utils import save_report_as_pdf, export_metrics_to_csv, export_analysis_to_json +from memoraith.reporting.report_generator import ReportGenerator +from memoraith.visualization.heatmap import generate_heatmap +from memoraith.visualization.interactive_dashboard import InteractiveDashboard +from memoraith.visualization.plot_memory import plot_memory_usage +from memoraith.visualization.plot_time import plot_time_usage +from memoraith.visualization.real_time_visualizer import RealTimeVisualizer + +@pytest.mark.asyncio +class UnifiedMemoraithTests(unittest.TestCase): + async def asyncSetUp(self): + set_output_path('test_profiling_results/') + self.config = Config() + + class SimpleModel(nn.Module): + def __init__(self): + super(SimpleModel, self).__init__() + self.fc = nn.Linear(10, 5) + + def forward(self, x): + return self.fc(x) + + self.model = SimpleModel() + + self.sample_data = { + 'layer1': {'total_time': 0.1, 'total_cpu_memory': 100, 'total_gpu_memory': 200}, + 'layer2': {'total_time': 0.2, 'total_cpu_memory': 150, 'total_gpu_memory': 250}, + } + self.analyzer = Analyzer(self.sample_data) + + self.analysis_results = { + 'metrics': {'global': {'total_time': 1.0, 'peak_cpu_memory': 100, 'peak_gpu_memory': 200}}, + 'bottlenecks': [{'layer': 'layer1', 'type': 'time', 'value': 0.5, 'ratio': 0.5}], + 'anomalies': [{'layer': 'layer2', 'type': 'memory', 'value': 1000, 'z_score': 3.0}], + 'recommendations': [{'layer': 'layer3', 'recommendation': 'Optimize this layer'}] + } + + def setUp(self): + self.loop = asyncio.get_event_loop() + self.loop.run_until_complete(self.asyncSetUp()) + + async def test_setup_logging(self): + with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_file: + setup_logging(logging.DEBUG, 
temp_file.name) + logger = get_logger("test") + logger.debug("Test log message") + temp_file.close() + + await asyncio.sleep(0.1) + + try: + with open(temp_file.name, 'r') as f: + content = f.read() + self.assertIn("Test log message", content) + finally: + try: + os.unlink(temp_file.name) + except PermissionError: + pass + + async def test_load_from_env(self): + with patch.dict('os.environ', {'MEMORAITH_ENABLE_GPU': 'true', 'MEMORAITH_LOG_LEVEL': 'DEBUG'}): + self.config.load_from_env() + self.assertTrue(self.config.enable_gpu) + self.assertEqual(self.config.log_level, 'DEBUG') + + async def test_load_from_file(self): + with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_file: + temp_file.write("enable_gpu: true\nlog_level: DEBUG") + temp_file.flush() + self.config.load_from_file(temp_file.name) + self.assertTrue(self.config.enable_gpu) + self.assertEqual(self.config.log_level, 'DEBUG') + os.unlink(temp_file.name) + + async def test_get_optimizer(self): + mock_params = [torch.nn.Parameter(torch.randn(10, 10))] + optimizer = self.config.get_optimizer(mock_params) + self.assertIsNotNone(optimizer) + self.assertIsInstance(optimizer, torch.optim.Optimizer) + + async def test_get_loss_function(self): + loss_function = self.config.get_loss_function() + self.assertIsNotNone(loss_function) + + async def test_memoraith_error(self): + with self.assertRaises(MemoraithError): + raise MemoraithError("Test error") + + async def test_framework_not_supported_error(self): + with self.assertRaises(FrameworkNotSupportedError): + raise FrameworkNotSupportedError("Test framework") + + async def test_run_analysis(self): + results = await self.analyzer.run_analysis() + self.assertIn('metrics', results) + self.assertIn('bottlenecks', results) + self.assertIn('anomalies', results) + self.assertIn('recommendations', results) + + async def test_get_layer_analysis(self): + layer_analysis = await self.analyzer.get_layer_analysis('layer1') + self.assertIn('metrics', layer_analysis) + self.assertIn('bottlenecks', layer_analysis) + self.assertIn('anomalies', layer_analysis) + self.assertIn('recommendations', layer_analysis) + + async def test_anomaly_detect(self): + detector = AnomalyDetector() + metrics = { + 'layer1': {'total_cpu_memory': 100, 'total_gpu_memory': 200, 'total_time': 0.1}, + 'layer2': {'total_cpu_memory': 1000, 'total_gpu_memory': 2000, 'total_time': 1.0}, + } + anomalies = await detector.detect(metrics) + self.assertGreater(len(anomalies), 0) + + async def test_bottleneck_detect(self): + detector = BottleneckDetector() + metrics = { + 'layer1': {'total_time': 0.1, 'total_cpu_memory': 100}, + 'layer2': {'total_time': 1.0, 'total_cpu_memory': 1000}, + } + bottlenecks = await detector.detect(metrics) + self.assertGreater(len(bottlenecks), 0) + + async def test_metrics_calculate(self): + data = { + 'layer1': {'time': 0.1, 'cpu_memory': 100, 'gpu_memory': 200, 'parameters': 1000}, + 'layer2': {'time': 0.2, 'cpu_memory': 150, 'gpu_memory': 250, 'parameters': 2000}, + } + calculator = MetricsCalculator(data) + metrics = await calculator.calculate() + self.assertIn('layer1', metrics) + self.assertIn('layer2', metrics) + self.assertIn('global', metrics) + + async def test_recommendation_generate(self): + engine = RecommendationEngine() + metrics = { + 'layer1': {'total_time': 0.1, 'total_cpu_memory': 100, 'parameters': 1000}, + 'layer2': {'total_time': 1.0, 'total_cpu_memory': 1000, 'parameters': 1000000}, + } + recommendations = await engine.generate(metrics) + 
self.assertGreater(len(recommendations), 0) + + async def test_cpu_memory_tracker(self): + tracker = CPUMemoryTracker() + tracker.start() + await asyncio.sleep(0.1) + tracker.stop() + peak_memory = tracker.get_peak_memory() + self.assertIsInstance(peak_memory, dict) + + @unittest.skipIf(not PYNVML_AVAILABLE, "PYNVML not available") + async def test_gpu_memory_tracker(self): + tracker = GPUMemoryTracker() + await tracker.start() + await asyncio.sleep(0.1) + await tracker.stop() + peak_memory = await tracker.get_peak_memory() + self.assertIsInstance(peak_memory, float) + + async def test_time_tracker(self): + tracker = TimeTracker() + tracker.start('test_op') + await asyncio.sleep(0.1) + tracker.stop('test_op') + duration = tracker.get_duration('test_op') + self.assertIsInstance(duration, float) + + async def test_resource_lock(self): + lock = ResourceLock('test_lock') + acquired = lock.acquire() + self.assertTrue(acquired) + lock.release() + + async def test_identify_framework(self): + model = nn.Linear(10, 5) + framework = identify_framework(model) + self.assertEqual(framework, 'pytorch') + + async def test_get_model_structure(self): + model = nn.Sequential(nn.Linear(10, 5), nn.ReLU()) + structure = get_model_structure(model) + self.assertIsInstance(structure, dict) + + async def test_estimate_model_size(self): + model = nn.Linear(10, 5) + size = estimate_model_size(model) + self.assertIn('parameters', size) + + async def test_framework_adapter_abstract_methods(self): + with self.assertRaises(TypeError): + FrameworkAdapter() + + async def test_pytorch_adapter(self): + model = nn.Linear(10, 5) + adapter = PyTorchAdapter(model) + await adapter.start_profiling() + await adapter.stop_profiling() + input_data = torch.randn(1, 10) + result = await adapter.profile_inference(input_data) + self.assertIn('inference_time', result) + + async def test_tensorflow_adapter(self): + model = tf.keras.Sequential([tf.keras.layers.Dense(5, input_shape=(10,))]) + adapter = TensorFlowAdapter(model) + await adapter.start_profiling() + await adapter.stop_profiling() + input_data = tf.random.normal((1, 10)) + result = await adapter.profile_inference(input_data) + self.assertIn('inference_time', result) + + async def test_console_report(self): + report = ConsoleReport(self.analysis_results) + with patch('builtins.print') as mock_print: + report.display() + mock_print.assert_called() + + async def test_save_report_as_pdf(self): + with patch('pdfkit.from_file') as mock_from_file: + save_report_as_pdf('input.html', 'output.pdf') + mock_from_file.assert_called_once() + + async def test_export_metrics_to_csv(self): + metrics = {'layer1': {'total_time': 0.1, 'total_cpu_memory': 100}} + with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_file: + export_metrics_to_csv(metrics, temp_file.name) + with open(temp_file.name, 'r') as f: + content = f.read() + self.assertIn('layer1', content) + os.unlink(temp_file.name) + + async def test_export_analysis_to_json(self): + analysis = {'metrics': {'layer1': {'total_time': 0.1}}} + with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_file: + export_analysis_to_json(analysis, temp_file.name) + with open(temp_file.name, 'r') as f: + content = f.read() + self.assertIn('metrics', content) + os.unlink(temp_file.name) + + async def test_report_generator(self): + with patch('memoraith.reporting.report_generator.plot_memory_usage') as mock_memory, \ + patch('memoraith.reporting.report_generator.plot_time_usage') as mock_time, \ + 
patch('memoraith.reporting.report_generator.generate_heatmap') as mock_heatmap, \ + patch('memoraith.reporting.report_generator.InteractiveDashboard') as mock_dashboard: + + generator = ReportGenerator(self.analysis_results) + with tempfile.TemporaryDirectory() as tmpdir: + generator.output_path = tmpdir + await generator.generate() + mock_memory.assert_called_once() + mock_time.assert_called_once() + mock_heatmap.assert_called_once() + mock_dashboard.return_value.generate.assert_called_once() + + async def test_generate_heatmap(self): + metrics = { + 'layer1': {'total_cpu_memory': 100, 'total_gpu_memory': 200, 'total_time': 0.1}, + 'layer2': {'total_cpu_memory': 150, 'total_gpu_memory': 250, 'total_time': 0.2}, + } + with tempfile.TemporaryDirectory() as tmpdir: + generate_heatmap(metrics, tmpdir) + self.assertTrue(os.path.exists(os.path.join(tmpdir, 'metrics_heatmap.png'))) + + async def test_interactive_dashboard(self): + metrics = { + 'layer1': {'total_cpu_memory': 100, 'total_gpu_memory': 200, 'total_time': 0.1, 'parameters': 1000}, + 'layer2': {'total_cpu_memory': 150, 'total_gpu_memory': 250, 'total_time': 0.2, 'parameters': 2000}, + } + dashboard = InteractiveDashboard(metrics) + with tempfile.TemporaryDirectory() as tmpdir: + dashboard.generate(tmpdir) + self.assertTrue(os.path.exists(os.path.join(tmpdir, 'interactive_dashboard.html'))) + + async def test_plot_memory_usage(self): + metrics = { + 'layer1': {'total_cpu_memory': 100, 'total_gpu_memory': 200}, + 'layer2': {'total_cpu_memory': 150, 'total_gpu_memory': 250}, + } + with tempfile.TemporaryDirectory() as tmpdir: + plot_memory_usage(metrics, tmpdir) + self.assertTrue(os.path.exists(os.path.join(tmpdir, 'memory_usage.png'))) + + async def test_plot_time_usage(self): + metrics = { + 'layer1': {'total_time': 0.1}, + 'layer2': {'total_time': 0.2}, + } + with tempfile.TemporaryDirectory() as tmpdir: + plot_time_usage(metrics, tmpdir) + self.assertTrue(os.path.exists(os.path.join(tmpdir, 'time_usage.png'))) + + async def test_real_time_visualizer(self): + visualizer = RealTimeVisualizer() + data = { + 'layer1': {'cpu_memory': 100, 'time': 0.1}, + 'layer2': {'cpu_memory': 150, 'time': 0.2}, + } + with patch('matplotlib.pyplot.show') as mock_show: + await visualizer.update(data) + mock_show.assert_called() + + async def test_real_time_visualizer_close(self): + visualizer = RealTimeVisualizer() + with patch('matplotlib.pyplot.close') as mock_close: + visualizer.close() + mock_close.assert_called_once() + + @profile_model() + async def dummy_train(self, model): + optimizer = torch.optim.SGD(model.parameters(), lr=0.01) + for _ in range(10): + input_data = torch.randn(32, 10) + output = model(input_data) + loss = output.sum() + loss.backward() + optimizer.step() + + async def test_profiler_decorator(self): + try: + await self.dummy_train(self.model) + except Exception as e: + self.fail(f"Profiler decorator raised an exception: {e}") + + async def test_output_files(self): + await self.dummy_train(self.model) + self.assertTrue(os.path.exists('test_profiling_results/memoraith_report.html')) + self.assertTrue(os.path.exists('test_profiling_results/memory_usage.png')) + self.assertTrue(os.path.exists('test_profiling_results/time_usage.png')) + self.assertTrue(os.path.exists('test_profiling_results/metrics_heatmap.png')) + self.assertTrue(os.path.exists('test_profiling_results/interactive_dashboard.html')) + + async def test_config_loading(self): + config = Config() + config.load_from_file('test_config.yaml') + 
self.assertEqual(config.enable_gpu, True) + self.assertEqual(config.log_level, 'DEBUG') + +if __name__ == '__main__': + pytest.main([__file__]) \ No newline at end of file diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..1597174 --- /dev/null +++ b/tox.ini @@ -0,0 +1,25 @@ +[tox] +envlist = py37, py38, py39, flake8, mypy +isolated_build = true + +[testenv] +deps = + pytest>=6.2.0 + -r requirements.txt +commands = + pytest tests + +[testenv:flake8] +deps = flake8 +commands = flake8 memoraith tests + +[testenv:mypy] +deps = mypy +commands = mypy memoraith + +[flake8] +max-line-length = 120 +exclude = .tox,*.egg,build,data + +[mypy] +ignore_missing_imports = True
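+# Local usage (illustrative): "tox" runs the full envlist above; "tox -e flake8" or "tox -e mypy" runs a single environment.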