From 02848eb8f6a80dab9baeea6e4ff74f7314b36708 Mon Sep 17 00:00:00 2001 From: Arham-Nasir Date: Thu, 26 Dec 2024 12:23:49 +0500 Subject: [PATCH] Add show commands for memory statistics and configuration Signed-off-by: Arham-Nasir --- show/memory_statistics.py | 432 ++++++++++++++++ tests/show_memory_statistics_test.py | 732 +++++++++++++++++++++++++++ 2 files changed, 1164 insertions(+) create mode 100644 show/memory_statistics.py create mode 100644 tests/show_memory_statistics_test.py diff --git a/show/memory_statistics.py b/show/memory_statistics.py new file mode 100644 index 0000000000..aff83f5a58 --- /dev/null +++ b/show/memory_statistics.py @@ -0,0 +1,432 @@ +# Standard library imports +import json +import os +import signal +import socket +import sys +import syslog +import time +from typing import Any, Dict, Union + +# Third-party library imports +import click +from dataclasses import dataclass + +# Local application/library imports +from swsscommon.swsscommon import ConfigDBConnector + + +@dataclass +class Config: + SOCKET_PATH: str = '/var/run/dbus/memstats.socket' + SOCKET_TIMEOUT: int = 30 + BUFFER_SIZE: int = 8192 + MAX_RETRIES: int = 3 + RETRY_DELAY: float = 1.0 + + DEFAULT_CONFIG = { + "enabled": "false", + "sampling_interval": "5", + "retention_period": "15" + } + + +class ConnectionError(Exception): + """Custom exception for connection-related errors.""" + pass + + +class Dict2Obj: + """Converts dictionaries or lists into objects with attribute-style access.""" + def __init__(self, d: Union[Dict[str, Any], list]) -> None: + if not isinstance(d, (dict, list)): + raise ValueError("Input should be a dictionary or a list") + + if isinstance(d, dict): + for key, value in d.items(): + if isinstance(value, (list, tuple)): + setattr( + self, + key, + [Dict2Obj(x) if isinstance(x, dict) else x for x in value], + ) + else: + setattr( + self, key, Dict2Obj(value) if isinstance(value, dict) else value + ) + elif isinstance(d, list): + self.items = [Dict2Obj(x) if isinstance(x, dict) else x for x in d] + + def to_dict(self) -> Dict[str, Any]: + """Converts the object back to a dictionary format.""" + result = {} + if hasattr(self, "items"): + return [x.to_dict() if isinstance(x, Dict2Obj) else x for x in self.items] + + for key in self.__dict__: + value = getattr(self, key) + if isinstance(value, Dict2Obj): + result[key] = value.to_dict() + elif isinstance(value, list): + result[key] = [v.to_dict() if isinstance(v, Dict2Obj) else v for v in value] + else: + result[key] = value + return result + + def __repr__(self) -> str: + """Provides a string representation of the object for debugging.""" + return f"<{self.__class__.__name__} {self.to_dict()}>" + + +class SonicDBConnector: + """Handles interactions with SONiC's configuration database with improved connection handling.""" + def __init__(self) -> None: + """Initialize the database connector with retry mechanism.""" + self.config_db = ConfigDBConnector() + self.connect_with_retry() + + def connect_with_retry(self, max_retries: int = 3, retry_delay: float = 1.0) -> None: + """ + Attempts to connect to the database with a retry mechanism. + """ + retries = 0 + last_error = None + + while retries < max_retries: + try: + self.config_db.connect() + syslog.syslog(syslog.LOG_INFO, "Successfully connected to SONiC config database") + return + except Exception as e: + last_error = e + retries += 1 + if retries < max_retries: + syslog.syslog(syslog.LOG_WARNING, + f"Failed to connect to database" + f"(attempt {retries}/{max_retries}): {str(e)}") + time.sleep(retry_delay) + + error_msg = ( + f"Failed to connect to SONiC config database after {max_retries} attempts. " + f"Last error: {str(last_error)}" + ) + syslog.syslog(syslog.LOG_ERR, error_msg) + raise ConnectionError(error_msg) + + def get_memory_statistics_config(self) -> Dict[str, str]: + """ + Retrieves memory statistics configuration with error handling. + """ + try: + config = self.config_db.get_table('MEMORY_STATISTICS') + if not isinstance(config, dict) or 'memory_statistics' not in config: + return Config.DEFAULT_CONFIG.copy() + + current_config = config.get('memory_statistics', {}) + if not isinstance(current_config, dict): + return Config.DEFAULT_CONFIG.copy() + + result_config = Config.DEFAULT_CONFIG.copy() + for key, value in current_config.items(): + if value is not None and value != "": + result_config[key] = value + + return result_config + + except Exception as e: + error_msg = f"Error retrieving memory statistics configuration: {str(e)}" + syslog.syslog(syslog.LOG_ERR, error_msg) + raise RuntimeError(error_msg) + + +class SocketManager: + """Manages Unix domain socket connections with improved reliability.""" + def __init__(self, socket_path: str = Config.SOCKET_PATH): + self.socket_path = socket_path + self.sock = None + self._validate_socket_path() + + def _validate_socket_path(self) -> None: + """Validates the socket path exists or can be created.""" + socket_dir = os.path.dirname(self.socket_path) + if not os.path.exists(socket_dir): + error_msg = f"Socket directory {socket_dir} does not exist" + syslog.syslog(syslog.LOG_ERR, error_msg) + raise ConnectionError(error_msg) + + def connect(self) -> None: + """Establishes socket connection with improved error handling.""" + retries = 0 + last_error = None + + while retries < Config.MAX_RETRIES: + try: + if self.sock: + self.close() + + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.sock.settimeout(Config.SOCKET_TIMEOUT) + self.sock.connect(self.socket_path) + syslog.syslog(syslog.LOG_INFO, "Successfully connected to memory statistics service") + return + except socket.error as e: + last_error = e + retries += 1 + if retries < Config.MAX_RETRIES: + syslog.syslog(syslog.LOG_WARNING, + f"Failed to connect to socket (attempt {retries}/{Config.MAX_RETRIES}): {str(e)}") + time.sleep(Config.RETRY_DELAY) + self.close() + + error_msg = ( + f"Failed to connect to memory statistics service after {Config.MAX_RETRIES} " + f"attempts. Last error: {str(last_error)}. " + f"Please verify that the service is running and socket file exists at {self.socket_path}" + ) + syslog.syslog(syslog.LOG_ERR, error_msg) + raise ConnectionError(error_msg) + + def receive_all(self) -> str: + """Receives all data with improved error handling.""" + if not self.sock: + raise ConnectionError("No active socket connection") + + chunks = [] + while True: + try: + chunk = self.sock.recv(Config.BUFFER_SIZE) + if not chunk: + break + chunks.append(chunk) + except socket.timeout: + error_msg = f"Socket operation timed out after {Config.SOCKET_TIMEOUT} seconds" + syslog.syslog(syslog.LOG_ERR, error_msg) + raise ConnectionError(error_msg) + except socket.error as e: + error_msg = f"Socket error during receive: {str(e)}" + syslog.syslog(syslog.LOG_ERR, error_msg) + raise ConnectionError(error_msg) + + return b''.join(chunks).decode('utf-8') + + def send(self, data: str) -> None: + """Sends data with improved error handling.""" + if not self.sock: + raise ConnectionError("No active socket connection") + + try: + self.sock.sendall(data.encode('utf-8')) + except socket.error as e: + error_msg = f"Failed to send data: {str(e)}" + syslog.syslog(syslog.LOG_ERR, error_msg) + raise ConnectionError(error_msg) + + def close(self) -> None: + """Closes the socket connection safely.""" + if self.sock: + try: + self.sock.close() + except Exception as e: + syslog.syslog(syslog.LOG_WARNING, f"Error closing socket: {str(e)}") + finally: + self.sock = None + + +def send_data(command: str, data: Dict[str, Any], quiet: bool = False) -> Dict2Obj: + """ + Sends a command and data to the memory statistics service. + + Time format for statistics retrieval are given below. + + - Relative time formats: + - 'X days ago', 'X hours ago', 'X minutes ago' + - 'yesterday', 'today' + - Specific times and dates: + - 'now' + - 'July 23', 'July 23, 2024', '2 November 2024' + - '7/24', '1/2' + - Time expressions: + - '2 am', '3:15 pm' + - 'Aug 01 06:43:40', 'July 1 3:00:00' + - Named months: + - 'jan', 'feb', 'march', 'september', etc. + - Full month names: 'January', 'February', 'March', etc. + - ISO 8601 format (e.g., '2024-07-01T15:00:00') + """ + socket_manager = SocketManager() + + try: + socket_manager.connect() + request = {"command": command, "data": data} + socket_manager.sock.sendall(json.dumps(request).encode('utf-8')) + + response = socket_manager.receive_all() + if not response: + raise ConnectionError("No response received from memory statistics service") + + try: + jdata = json.loads(response) + except json.JSONDecodeError as e: + error_msg = f"Failed to parse server response: {str(e)}" + syslog.syslog(syslog.LOG_ERR, error_msg) + raise ValueError(error_msg) + + if not isinstance(jdata, dict): + raise ValueError("Invalid response format from server") + + response_obj = Dict2Obj(jdata) + if not getattr(response_obj, 'status', True): + error_msg = getattr(response_obj, 'msg', 'Unknown error occurred') + raise RuntimeError(error_msg) + + return response_obj + + except Exception as e: + if not quiet: + click.echo(f"Error: {str(e)}", err=True) + raise + finally: + socket_manager.close() + + +def format_field_value(field_name: str, value: str) -> str: + """Formats configuration field values for display.""" + if field_name == "enabled": + return "True" if value.lower() == "true" else "False" + return value if value != "Unknown" else "Not configured" + + +def display_config(db_connector: SonicDBConnector) -> None: + """Displays memory statistics configuration.""" + try: + config = db_connector.get_memory_statistics_config() + enabled = format_field_value("enabled", config.get("enabled", "Unknown")) + retention = format_field_value("retention_period", config.get("retention_period", "Unknown")) + sampling = format_field_value("sampling_interval", config.get("sampling_interval", "Unknown")) + + click.echo(f"{'Configuration Field':<30}{'Value'}") + click.echo("-" * 50) + click.echo(f"{'Enabled':<30}{enabled}") + click.echo(f"{'Retention Time (days)':<30}{retention}") + click.echo(f"{'Sampling Interval (minutes)':<30}{sampling}") + except Exception as e: + error_msg = f"Failed to retrieve configuration: {str(e)}" + syslog.syslog(syslog.LOG_ERR, error_msg) + raise click.ClickException(error_msg) + + +@click.group() +def cli(): + """Main entry point for the SONiC Memory Statistics CLI.""" + pass + + +@cli.group() +def show(): + """Show commands for memory statistics.""" + pass + + +@show.command(name="memory-stats") +@click.option( + '--from', 'from_time', + help='Start time for memory statistics (e.g., "15 hours ago", "7 days ago", "ISO Format")' +) +@click.option( + '--to', 'to_time', + help='End time for memory statistics (e.g., "now", "ISO Format")' +) +@click.option( + '--select', 'select_metric', + help='Show statistics for specific metric (e.g., total_memory, used_memory)' +) +def show_statistics(from_time: str, to_time: str, select_metric: str): + """Display memory statistics.""" + try: + request_data = { + "type": "system", + "metric_name": select_metric, + "from": from_time, + "to": to_time + } + + response = send_data("memory_statistics_command_request_handler", request_data) + stats_data = response.to_dict() + + if isinstance(stats_data, dict): + memory_stats = stats_data.get("data", "") + if memory_stats: + cleaned_output = memory_stats.replace("\n", "\n").strip() + click.echo(f"Memory Statistics:\n{cleaned_output}") + else: + click.echo("No memory statistics data available.") + else: + click.echo("Error: Invalid data format received") + + except Exception as e: + click.echo(f"Error: {str(e)}", err=True) + sys.exit(1) + + +@show.command(name="memory-stats-config") +def show_configuration(): + """Display memory statistics configuration.""" + try: + db_connector = SonicDBConnector() + display_config(db_connector) + except Exception as e: + click.echo(f"Error: {str(e)}", err=True) + sys.exit(1) + + +def cleanup_resources(): + """Performs cleanup of resources before shutdown.""" + try: + if hasattr(cleanup_resources, 'db_connector'): + del cleanup_resources.db_connector + + if hasattr(cleanup_resources, 'socket_manager'): + cleanup_resources.socket_manager.close() + + syslog.syslog(syslog.LOG_INFO, "Successfully cleaned up resources during shutdown") + except Exception as e: + syslog.syslog(syslog.LOG_ERR, f"Error during cleanup: {str(e)}") + + +def shutdown_handler(signum: int, frame) -> None: + """ + Signal handler for graceful shutdown. + Handles SIGTERM signal with proper resource cleanup. + + Args: + signum: Signal number + frame: Current stack frame + """ + try: + syslog.syslog(syslog.LOG_INFO, "Received SIGTERM signal, initiating graceful shutdown...") + cleanup_resources() + click.echo("\nShutting down gracefully...") + sys.exit(0) + except Exception as e: + syslog.syslog(syslog.LOG_ERR, f"Error during shutdown: {str(e)}") + sys.exit(1) + + +def main(): + """Main entry point with enhanced error handling and shutdown management.""" + try: + signal.signal(signal.SIGTERM, shutdown_handler) + + cleanup_resources.db_connector = None + cleanup_resources.socket_manager = None + + cli() + except Exception as e: + syslog.syslog(syslog.LOG_ERR, f"Fatal error in main: {str(e)}") + click.echo(f"Error: {str(e)}", err=True) + cleanup_resources() + sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/tests/show_memory_statistics_test.py b/tests/show_memory_statistics_test.py new file mode 100644 index 0000000000..621854556d --- /dev/null +++ b/tests/show_memory_statistics_test.py @@ -0,0 +1,732 @@ +import json +import os +import signal +import socket +import syslog +import unittest +from unittest.mock import MagicMock, Mock, patch + +import click +from click.testing import CliRunner +import pytest + +from show.memory_statistics import ( + Config, + ConnectionError, + Dict2Obj, + SonicDBConnector, + SocketManager, + cleanup_resources, + display_config, + format_field_value, + main, + send_data, + show_configuration, + show_statistics, + shutdown_handler, +) + + +class TestConfig(unittest.TestCase): + """Tests for Config class""" + + def test_default_config_values(self): + """Test that Config class has correct default values""" + self.assertEqual(Config.SOCKET_PATH, '/var/run/dbus/memstats.socket') + self.assertEqual(Config.SOCKET_TIMEOUT, 30) + self.assertEqual(Config.BUFFER_SIZE, 8192) + self.assertEqual(Config.MAX_RETRIES, 3) + self.assertEqual(Config.RETRY_DELAY, 1.0) + + def test_default_config_dictionary(self): + """Test the DEFAULT_CONFIG dictionary has correct values""" + expected = { + "enabled": "false", + "sampling_interval": "5", + "retention_period": "15" + } + self.assertEqual(Config.DEFAULT_CONFIG, expected) + + +class TestDict2Obj(unittest.TestCase): + """Tests for Dict2Obj class""" + + def test_dict_conversion(self): + """Test basic dictionary conversion""" + test_dict = {"name": "test", "value": 123} + obj = Dict2Obj(test_dict) + self.assertEqual(obj.name, "test") + self.assertEqual(obj.value, 123) + + def test_nested_dict_conversion(self): + """Test nested dictionary conversion""" + test_dict = { + "outer": { + "inner": "value", + "number": 42 + } + } + obj = Dict2Obj(test_dict) + self.assertEqual(obj.outer.inner, "value") + self.assertEqual(obj.outer.number, 42) + + def test_list_conversion(self): + """Test list conversion""" + test_list = [{"name": "item1"}, {"name": "item2"}] + obj = Dict2Obj(test_list) + self.assertEqual(obj.items[0].name, "item1") + self.assertEqual(obj.items[1].name, "item2") + + def test_invalid_input(self): + """Test invalid input handling""" + with self.assertRaises(ValueError): + Dict2Obj("invalid") + + def test_to_dict_conversion(self): + """Test conversion back to dictionary""" + original = {"name": "test", "nested": {"value": 123}} + obj = Dict2Obj(original) + result = obj.to_dict() + self.assertEqual(result, original) + + def test_nested_list_conversion(self): + """Test conversion of nested lists with dictionaries""" + test_dict = { + "items": [ + {"id": 1, "subitems": [{"name": "sub1"}, {"name": "sub2"}]}, + {"id": 2, "subitems": [{"name": "sub3"}, {"name": "sub4"}]} + ] + } + obj = Dict2Obj(test_dict) + self.assertEqual(obj.items[0].subitems[0].name, "sub1") + self.assertEqual(obj.items[1].subitems[1].name, "sub4") + + def test_empty_structures(self): + """Test conversion of empty structures""" + self.assertEqual(Dict2Obj({}).to_dict(), {}) + self.assertEqual(Dict2Obj([]).to_dict(), []) + + def test_complex_nested_structure(self): + """Test conversion of complex nested structures""" + test_dict = { + "level1": { + "level2": { + "level3": { + "value": 42, + "list": [1, 2, {"nested": "value"}] + } + } + } + } + obj = Dict2Obj(test_dict) + self.assertEqual(obj.level1.level2.level3.value, 42) + self.assertEqual(obj.level1.level2.level3.list[2].nested, "value") + + +class TestSonicDBConnector(unittest.TestCase): + """Tests for SonicDBConnector class""" + + @patch('show.memory_statistics.ConfigDBConnector') + def setUp(self, mock_config_db): + self.mock_config_db = mock_config_db + self.connector = SonicDBConnector() + self.mock_config_db.reset_mock() + + def test_successful_connection(self): + """Test successful database connection""" + self.mock_config_db.return_value.connect.return_value = None + self.connector.connect_with_retry() + self.mock_config_db.return_value.connect.assert_called_once() + + @patch('time.sleep') + def test_connection_retry(self, mock_sleep): + """Test connection retry mechanism""" + self.mock_config_db.return_value.connect.side_effect = [ + Exception("Connection failed"), + None + ] + self.connector.connect_with_retry(max_retries=2, retry_delay=0.1) + self.assertEqual(mock_sleep.call_count, 1) + self.assertEqual(self.mock_config_db.return_value.connect.call_count, 2) + + def test_connection_failure(self): + """Test connection failure after max retries""" + self.mock_config_db.return_value.connect.side_effect = Exception("Connection failed") + with self.assertRaises(ConnectionError): + self.connector.connect_with_retry(max_retries=1) + + def test_get_memory_statistics_config_success(self): + """Test successful config retrieval""" + expected_config = { + "memory_statistics": { + "enabled": "true", + "sampling_interval": "10", + "retention_period": "30" + } + } + self.mock_config_db.return_value.get_table.return_value = expected_config + result = self.connector.get_memory_statistics_config() + self.assertEqual(result["enabled"], "true") + self.assertEqual(result["sampling_interval"], "10") + self.assertEqual(result["retention_period"], "30") + + def test_get_memory_statistics_config_default(self): + """Test default config when table is empty""" + self.mock_config_db.return_value.get_table.return_value = {} + result = self.connector.get_memory_statistics_config() + self.assertEqual(result, Config.DEFAULT_CONFIG) + + def test_invalid_config_format(self): + """Test handling of invalid configuration format""" + self.mock_config_db.return_value.get_table.return_value = { + "memory_statistics": "invalid_string_instead_of_dict" + } + result = self.connector.get_memory_statistics_config() + self.assertEqual(result, Config.DEFAULT_CONFIG) + + def test_partial_config(self): + """Test handling of partial configuration""" + self.mock_config_db.return_value.get_table.return_value = { + "memory_statistics": { + "enabled": "true" + # missing other fields + } + } + result = self.connector.get_memory_statistics_config() + self.assertEqual(result["enabled"], "true") + self.assertEqual(result["sampling_interval"], "5") # default value + self.assertEqual(result["retention_period"], "15") # default value + + +class TestSocketManager(unittest.TestCase): + """Tests for SocketManager class""" + + def setUp(self): + self.test_socket_path = "/tmp/test_socket" + os.makedirs(os.path.dirname(self.test_socket_path), exist_ok=True) + self.socket_manager = SocketManager(self.test_socket_path) + + @patch('socket.socket') + def test_successful_connection(self, mock_socket): + """Test successful socket connection""" + mock_sock = Mock() + mock_socket.return_value = mock_sock + self.socket_manager.connect() + mock_sock.connect.assert_called_once_with(self.test_socket_path) + + @patch('socket.socket') + @patch('time.sleep') + def test_connection_retry(self, mock_sleep, mock_socket): + """Test socket connection retry mechanism""" + mock_sock = Mock() + mock_sock.connect.side_effect = [socket.error, None] + mock_socket.return_value = mock_sock + self.socket_manager.connect() + self.assertEqual(mock_sock.connect.call_count, 2) + + @patch('socket.socket') + def test_receive_all(self, mock_socket): + """Test receiving data from socket""" + mock_sock = Mock() + mock_sock.recv.side_effect = [b'test', b''] + mock_socket.return_value = mock_sock + self.socket_manager.sock = mock_sock + result = self.socket_manager.receive_all() + self.assertEqual(result, 'test') + + @patch('socket.socket') + def test_send_data(self, mock_socket): + """Test sending data through socket""" + mock_sock = Mock() + mock_socket.return_value = mock_sock + self.socket_manager.sock = mock_sock + self.socket_manager.send("test_data") + mock_sock.sendall.assert_called_once_with(b'test_data') + + def test_close_connection(self): + """Test closing socket connection""" + mock_sock = Mock() + self.socket_manager.sock = mock_sock + self.socket_manager.close() + mock_sock.close.assert_called_once() + self.assertIsNone(self.socket_manager.sock) + + def test_invalid_socket_path(self): + """Test socket creation with invalid path""" + with self.assertRaises(ConnectionError): + SocketManager("/nonexistent/path/socket") + + @patch('socket.socket') + def test_connection_max_retries_exceeded(self, mock_socket): + """Test connection failure after max retries""" + mock_sock = Mock() + mock_sock.connect.side_effect = socket.error("Connection failed") + mock_socket.return_value = mock_sock + + with self.assertRaises(ConnectionError) as ctx: + self.socket_manager.connect() + self.assertIn("Failed to connect to memory statistics service", str(ctx.exception)) + + @patch('socket.socket') + def test_receive_timeout(self, mock_socket): + """Test socket timeout during receive""" + mock_sock = Mock() + mock_sock.recv.side_effect = socket.timeout + self.socket_manager.sock = mock_sock + + with self.assertRaises(ConnectionError) as context: + self.socket_manager.receive_all() + self.assertIn("timed out", str(context.exception)) + + @patch('socket.socket') + def test_receive_with_socket_error(self, mock_socket): + """Test receive with socket error""" + mock_sock = Mock() + mock_sock.recv.side_effect = socket.error("Receive error") + self.socket_manager.sock = mock_sock + + with self.assertRaises(ConnectionError) as ctx: + self.socket_manager.receive_all() + self.assertIn("Socket error during receive", str(ctx.exception)) + + @patch('socket.socket') + def test_send_without_connection(self, mock_socket): + """Test sending data without an active connection""" + self.socket_manager.sock = None + with self.assertRaises(ConnectionError) as context: + self.socket_manager.send("test") + self.assertIn("No active socket connection", str(context.exception)) + + @patch('socket.socket') + def test_multiple_chunk_receive(self, mock_socket): + """Test receiving multiple chunks of data""" + mock_sock = Mock() + mock_sock.recv.side_effect = [b'chunk1', b'chunk2', b'chunk3', b''] + self.socket_manager.sock = mock_sock + result = self.socket_manager.receive_all() + self.assertEqual(result, 'chunk1chunk2chunk3') + + +class TestCLICommands(unittest.TestCase): + """Tests for CLI commands""" + + def setUp(self): + self.runner = CliRunner() + + @patch('show.memory_statistics.send_data') + def test_show_statistics(self, mock_send_data): + """Test show statistics command""" + mock_response = Dict2Obj({ + "status": True, + "data": "Memory Statistics Data" + }) + mock_send_data.return_value = mock_response + + result = self.runner.invoke(show_statistics, ['--from', '1h', '--to', 'now']) + self.assertEqual(result.exit_code, 0) + self.assertIn("Memory Statistics", result.output) + + @patch('show.memory_statistics.send_data') + def test_show_statistics_with_metric(self, mock_send_data): + """Test show statistics with specific metric""" + mock_response = Dict2Obj({ + "status": True, + "data": "Memory Usage: 75%" + }) + mock_send_data.return_value = mock_response + + result = self.runner.invoke(show_statistics, + ['--select', 'used_memory']) + self.assertEqual(result.exit_code, 0) + self.assertIn("Memory Usage", result.output) + + @patch('show.memory_statistics.send_data') + def test_show_statistics_error_handling(self, mock_send_data): + """Test error handling in show statistics""" + mock_send_data.side_effect = ConnectionError("Failed to connect") + + result = self.runner.invoke(show_statistics) + self.assertEqual(result.exit_code, 1) + self.assertIn("Error", result.output) + + @patch('show.memory_statistics.send_data') + def test_show_statistics_empty_data(self, mock_send): + """Test show_statistics with empty data""" + mock_send.return_value = Dict2Obj({"data": ""}) + result = self.runner.invoke(show_statistics) + self.assertIn("No memory statistics data available", result.output) + + +class TestShowConfiguration(unittest.TestCase): + """Tests for show_configuration command""" + + def setUp(self): + self.runner = CliRunner() + + @patch('show.memory_statistics.SonicDBConnector') + def test_show_config_error(self, mock_db): + """Test show_configuration error handling""" + mock_db.side_effect = Exception("DB Connection Error") + result = self.runner.invoke(show_configuration) + self.assertEqual(result.exit_code, 1) + self.assertIn("Error", result.output) + + +class TestErrorHandling(unittest.TestCase): + """Tests for error handling""" + + def test_cleanup_resources(self): + """Test resource cleanup""" + mock_db = Mock() + mock_socket = Mock() + cleanup_resources.db_connector = mock_db + cleanup_resources.socket_manager = mock_socket + cleanup_resources() + self.assertFalse(hasattr(cleanup_resources, 'db_connector')) + mock_socket.close.assert_called_once() + + @patch('sys.exit') + def test_shutdown_handler(self, mock_exit): + """Test shutdown handler""" + shutdown_handler(None, None) + mock_exit.assert_called_once_with(0) + + @patch('syslog.syslog') + def test_cleanup_with_exceptions(self, mock_syslog): + """Test cleanup with exceptions""" + mock_socket = Mock() + mock_socket.close.side_effect = Exception("Cleanup failed") + cleanup_resources.socket_manager = mock_socket + + cleanup_resources() + mock_syslog.assert_any_call(syslog.LOG_ERR, "Error during cleanup: Cleanup failed") + + @patch('syslog.syslog') + def test_cleanup_with_missing_attributes(self, mock_syslog): + """Test cleanup when attributes don't exist""" + # Ensure attributes don't exist + if hasattr(cleanup_resources, 'db_connector'): + delattr(cleanup_resources, 'db_connector') + if hasattr(cleanup_resources, 'socket_manager'): + delattr(cleanup_resources, 'socket_manager') + + cleanup_resources() + mock_syslog.assert_any_call(syslog.LOG_INFO, "Successfully cleaned up resources during shutdown") + + @patch('sys.exit') + @patch('syslog.syslog') + def test_shutdown_handler_cleanup_error(self, mock_syslog, mock_exit): + """Test shutdown handler with cleanup error""" + @patch('show.memory_statistics.cleanup_resources', side_effect=Exception("Cleanup Error")) + def test(mock_cleanup): + shutdown_handler(signal.SIGTERM, None) + mock_syslog.assert_any_call(syslog.LOG_ERR, "Error during shutdown: Cleanup Error") + mock_exit.assert_called_once_with(1) + test() + + +class TestHelperFunctions(unittest.TestCase): + """Tests for helper functions""" + + def test_format_field_value(self): + """Test field value formatting""" + self.assertEqual(format_field_value("enabled", "true"), "True") + self.assertEqual(format_field_value("enabled", "false"), "False") + self.assertEqual(format_field_value("retention_period", "15"), "15") + self.assertEqual(format_field_value("sampling_interval", "Unknown"), "Not configured") + + +class TestSendData(unittest.TestCase): + """Tests for send_data function""" + + @patch('show.memory_statistics.SocketManager') + def test_send_data_invalid_response(self, mock_socket_manager): + """Test send_data with invalid JSON response""" + mock_instance = Mock() + mock_socket_manager.return_value = mock_instance + mock_instance.receive_all.return_value = "invalid json" + + with self.assertRaises(ValueError): + send_data("test_command", {}) + + @patch('show.memory_statistics.SocketManager') + def test_send_data_non_dict_response(self, mock_socket_manager): + """Test send_data with non-dict response""" + mock_instance = Mock() + mock_socket_manager.return_value = mock_instance + mock_instance.receive_all.return_value = json.dumps(["not a dict"]) + + with self.assertRaises(ValueError): + send_data("test_command", {}) + + @patch('show.memory_statistics.SocketManager') + def test_successful_response_with_status(self, mock_socket_manager): + """Test successful response with status field""" + mock_instance = Mock() + mock_socket_manager.return_value = mock_instance + response_data = { + "status": True, + "data": "test data" + } + mock_instance.receive_all.return_value = json.dumps(response_data) + + result = send_data("test_command", {}) + self.assertTrue(result.status) + self.assertEqual(result.data, "test data") + + @patch('show.memory_statistics.SocketManager') + def test_response_without_status_field(self, mock_socket_manager): + """Test response without status field (should default to True)""" + mock_instance = Mock() + mock_socket_manager.return_value = mock_instance + response_data = { + "data": "test data" + } + mock_instance.receive_all.return_value = json.dumps(response_data) + + result = send_data("test_command", {}) + self.assertTrue(getattr(result, 'status', True)) + self.assertEqual(result.data, "test data") + + @patch('show.memory_statistics.SocketManager') + def test_failed_response_with_error_message(self, mock_socket_manager): + """Test response with status False and error message""" + mock_instance = Mock() + mock_socket_manager.return_value = mock_instance + response_data = { + "status": False, + "msg": "Operation failed" + } + mock_instance.receive_all.return_value = json.dumps(response_data) + + with self.assertRaises(RuntimeError) as context: + send_data("test_command", {}) + self.assertEqual(str(context.exception), "Operation failed") + + @patch('show.memory_statistics.SocketManager') + def test_failed_response_without_message(self, mock_socket_manager): + """Test response with status False but no error message""" + mock_instance = Mock() + mock_socket_manager.return_value = mock_instance + response_data = { + "status": False + } + mock_instance.receive_all.return_value = json.dumps(response_data) + + with self.assertRaises(RuntimeError) as context: + send_data("test_command", {}) + self.assertEqual(str(context.exception), "Unknown error occurred") + + @patch('show.memory_statistics.SocketManager') + def test_complex_response_object_conversion(self, mock_socket_manager): + """Test conversion of complex response object""" + mock_instance = Mock() + mock_socket_manager.return_value = mock_instance + response_data = { + "status": True, + "data": { + "metrics": [ + {"name": "memory", "value": 100}, + {"name": "cpu", "value": 50} + ], + "timestamp": "2024-01-01" + } + } + mock_instance.receive_all.return_value = json.dumps(response_data) + + result = send_data("test_command", {}) + self.assertTrue(result.status) + self.assertEqual(result.data.metrics[0].name, "memory") + self.assertEqual(result.data.metrics[1].value, 50) + self.assertEqual(result.data.timestamp, "2024-01-01") + + +class TestDisplayConfig(unittest.TestCase): + """Tests for display_config function""" + + def test_display_config_success(self): + """Test successful config display""" + mock_connector = MagicMock() + mock_connector.get_memory_statistics_config.return_value = { + "enabled": "true", + "retention_period": "15", + "sampling_interval": "5" + } + + runner = CliRunner() + with runner.isolation(): + display_config(mock_connector) + + def test_display_config_error(self): + """Test error handling in display config""" + mock_connector = MagicMock() + mock_connector.get_memory_statistics_config.side_effect = RuntimeError("Config error") + + with pytest.raises(click.ClickException): + display_config(mock_connector) + + +class TestMainFunction(unittest.TestCase): + """Tests for main function""" + + @patch('signal.signal') + @patch('show.memory_statistics.cli') + def test_successful_execution(self, mock_cli, mock_signal): + """Test successful execution of main function""" + main() + mock_signal.assert_called_once_with(signal.SIGTERM, shutdown_handler) + mock_cli.assert_called_once() + + @patch('signal.signal') + @patch('show.memory_statistics.cli') + @patch('show.memory_statistics.cleanup_resources') + def test_main_with_exception(self, mock_cleanup, mock_cli, mock_signal): + """Test main function with exception""" + mock_cli.side_effect = Exception("CLI error") + + with self.assertRaises(SystemExit): + main() + mock_cleanup.assert_called_once() + + +class TestFormatFieldValue: + """Tests for format_field_value function using pytest""" + + @pytest.mark.parametrize("field_name,value,expected", [ + ("enabled", "true", "True"), + ("enabled", "false", "False"), + ("enabled", "TRUE", "True"), + ("enabled", "FALSE", "False"), + ("retention_period", "15", "15"), + ("sampling_interval", "5", "5"), + ("any_field", "Unknown", "Not configured"), + ]) + def test_format_field_value(self, field_name, value, expected): + assert format_field_value(field_name, value) == expected + + +class TestMemoryStatistics(unittest.TestCase): + def setUp(self): + self.cli_runner = CliRunner() + + def test_dict2obj_invalid_input(self): + """Test Dict2Obj with invalid input (line 71)""" + with self.assertRaises(ValueError): + Dict2Obj("invalid input") + + def test_dict2obj_empty_list(self): + """Test Dict2Obj with empty list (line 78)""" + obj = Dict2Obj([]) + self.assertEqual(obj.to_dict(), []) + + @patch('socket.socket') + def test_socket_receive_timeout(self, mock_socket): + """Test socket timeout during receive (lines 137-140)""" + manager = SocketManager() + mock_socket.return_value.recv.side_effect = socket.timeout + manager.sock = mock_socket.return_value + + with self.assertRaises(ConnectionError): + manager.receive_all() + + @patch('socket.socket') + def test_socket_send_error(self, mock_socket): + """Test socket send error (line 166)""" + manager = SocketManager() + mock_socket.return_value.sendall.side_effect = socket.error("Send failed") + manager.sock = mock_socket.return_value + + with self.assertRaises(ConnectionError): + manager.send("test data") + + @patch('syslog.syslog') + def test_cleanup_resources_error(self, mock_syslog): + """Test cleanup resources error handling (lines 220-223)""" + cleanup_resources.socket_manager = MagicMock() + cleanup_resources.socket_manager.close.side_effect = Exception("Cleanup failed") + + cleanup_resources() + mock_syslog.assert_called_with(syslog.LOG_ERR, "Error during cleanup: Cleanup failed") + + @patch('show.memory_statistics.send_data') + def test_show_statistics_invalid_data(self, mock_send): + """Test show statistics with invalid data format (line 247)""" + mock_send.return_value = Dict2Obj(["invalid"]) + result = self.cli_runner.invoke(show_statistics, []) + self.assertIn("Error: Invalid data format received", result.output) + + @patch('show.memory_statistics.SonicDBConnector') + def test_show_configuration_error(self, mock_db): + """Test show configuration error (line 302)""" + mock_db.side_effect = Exception("DB connection failed") + result = self.cli_runner.invoke(show_configuration) + self.assertIn("Error: DB connection failed", result.output) + + @patch('show.memory_statistics.signal.signal') + def test_main_error(self, mock_signal): + """Test main function error handling (lines 344, 355)""" + mock_signal.side_effect = Exception("Signal registration failed") + + with self.assertRaises(SystemExit): + main() + + def test_socket_manager_validation(self): + """Test socket path validation (line 409)""" + with self.assertRaises(ConnectionError): + SocketManager("/nonexistent/path/socket") + + +class TestAdditionalMemoryStatisticsCLI(unittest.TestCase): + + def setUp(self): + self.runner = CliRunner() + + def test_dict2obj_with_nested_data(self): + """Test Dict2Obj with deeply nested dictionaries""" + data = {'a': {'b': {'c': 1}}, 'list': [1, {'d': 2}]} + obj = Dict2Obj(data) + self.assertEqual(obj.a.b.c, 1) + self.assertEqual(obj.list[1].d, 2) + self.assertEqual(obj.to_dict(), data) + + @patch('show.memory_statistics.SocketManager') + def test_socket_manager_close_exception(self, mock_socket_manager): + """Test SocketManager close handles exceptions gracefully""" + mock_socket_instance = mock_socket_manager.return_value + mock_socket_instance.close.side_effect = Exception("Close error") + + manager = SocketManager() + manager.sock = mock_socket_instance + + with patch('syslog.syslog') as mock_syslog: + manager.close() + mock_syslog.assert_any_call(4, "Error closing socket: Close error") + + def test_dict2obj_repr(self): + """Test the __repr__ method of Dict2Obj""" + data = {'a': 1, 'b': {'c': 2}} + obj = Dict2Obj(data) + repr_str = repr(obj) + self.assertTrue(repr_str.startswith('