diff --git a/CHANGELOG.md b/CHANGELOG.md
index a13add7cc0..50f124969d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@ All notable changes to this project will be documented in this file.

 ### Added

+- Add the capability to plot indexed alerts and vulnerabilities. ([#5518](https://github.com/wazuh/wazuh-qa/pull/5518)) \- (Framework)
 - Add functionality to unify data of the binary processes with their subprocesses to plot ([#5500](https://github.com/wazuh/wazuh-qa/pull/5500)) \- (Framework)

 ### Changed

diff --git a/deps/wazuh_testing/wazuh_testing/data/data_visualizer/analysis_csv_headers.json b/deps/wazuh_testing/wazuh_testing/data/data_visualizer/analysis_csv_headers.json
index 1bb8bea5ed..208bbc1f3e 100644
--- a/deps/wazuh_testing/wazuh_testing/data/data_visualizer/analysis_csv_headers.json
+++ b/deps/wazuh_testing/wazuh_testing/data/data_visualizer/analysis_csv_headers.json
@@ -32,9 +32,10 @@
         ]
     },
     "alerts_info": {
-        "title": "Alerts and events info.",
+        "title": "Alerts and events info",
         "columns": [
-            "Events processed", "Events received", "Written alerts", "Written firewall", "Written fts"
+            "Events processed", "Events received", "Written alerts", "Written firewall", "Written fts",
+            "Written archives", "Written stats"
         ]
     }
 }
diff --git a/deps/wazuh_testing/wazuh_testing/scripts/data_visualizations.py b/deps/wazuh_testing/wazuh_testing/scripts/data_visualizations.py
index 8ba897af7b..1b44453732 100644
--- a/deps/wazuh_testing/wazuh_testing/scripts/data_visualizations.py
+++ b/deps/wazuh_testing/wazuh_testing/scripts/data_visualizations.py
@@ -3,16 +3,46 @@
 from os.path import exists
 from tempfile import gettempdir

-from wazuh_testing.tools.performance.visualization import DataVisualizer
+from wazuh_testing.tools.performance.visualization import (
+    BinaryDatavisualizer,
+    ClusterStatisticsVisualizer,
+    DaemonStatisticsVisualizer,
+    IndexerAlerts,
+    IndexerVulnerabilities,
+    LogcollectorStatisticsVisualizer,
+)
+
+supported_targets = ['binary', 'analysis', 'remote', 'wazuhdb',
+                     'logcollector', 'cluster', 'indexer-alerts',
+                     'indexer-vulnerabilities']
+daemon_supported_statistics = ['analysis', 'remote', 'wazuhdb']
+strategy_plot_by_target = {
+    'binary': BinaryDatavisualizer,
+    'daemon-statistics': DaemonStatisticsVisualizer,
+    'cluster': ClusterStatisticsVisualizer,
+    'logcollector': LogcollectorStatisticsVisualizer,
+    'indexer-alerts': IndexerAlerts,
+    'indexer-vulnerabilities': IndexerVulnerabilities
+}
+
+
+def create_destination_directory(destination_directory):
+    if not exists(destination_directory):
+        makedirs(destination_directory)
+
+
+def validate_arguments(options):
+    if options.visualization_target != 'binary' and options.unify:
+        raise ValueError('The unify option is only allowed when plotting binary data.')


 def get_script_arguments():
-    parser = argparse.ArgumentParser(usage="%(prog)s [options]", description="Script to generate data visualizations",
+    parser = argparse.ArgumentParser(usage='%(prog)s [options]', description='Script to generate data visualizations',
                                      formatter_class=argparse.RawTextHelpFormatter)
     parser.add_argument('-s', '--sources', dest='csv_list', required=True, type=str, nargs='+', action='store',
                         help='Paths to the CSV files separated by whitespace.')
     parser.add_argument('-t', '--target', dest='visualization_target', default='binary',
-                        choices=['binary', 'analysis', 'remote', 'agent', 'logcollector', 'cluster', 'api', 'wazuhdb'],
+                        choices=supported_targets,
                         help='Generate data visualizations for a specific target. Default binary.')
     parser.add_argument('-d', '--destination', dest='destination', default=gettempdir(),
                         help=f'Directory to store the images. Default {gettempdir()}')
@@ -21,20 +51,33 @@ def get_script_arguments():
     parser.add_argument('-c', '--columns', dest='columns', default=None,
                         help=f'Path to Json with Columns to Plot. Default {None}.')
     parser.add_argument('-u', '--unify', dest='unify', action='store_true',
-                        help=f'Unify data of the binary processes with their subprocesses to plot.')
+                        help='Unify data of the binary processes with their subprocesses to plot.')

     return parser.parse_args()


 def main():
     options = get_script_arguments()
-    destination = options.destination
+    validate_arguments(options)
+    create_destination_directory(options.destination)
+
+    target = options.visualization_target
+
+    visualization_options = {
+        'dataframes': options.csv_list,
+        'store_path': options.destination,
+        'base_name': options.name
+    }
+
+    strategy = target
+    if target in daemon_supported_statistics:
+        visualization_options['daemon'] = target
+        strategy = 'daemon-statistics'
+    elif target == 'binary':
+        visualization_options['unify_child_daemon_metrics'] = options.unify
+
+    dv = strategy_plot_by_target[strategy](**visualization_options)

-    if not exists(destination):
-        makedirs(destination)
-    dv = DataVisualizer(dataframes=options.csv_list, target=options.visualization_target,
-                        compare=False, store_path=options.destination, base_name=options.name,
-                        columns_path=options.columns, unify_child_daemon_metrics=options.unify)
     dv.plot()
diff --git a/deps/wazuh_testing/wazuh_testing/tools/performance/visualization.py b/deps/wazuh_testing/wazuh_testing/tools/performance/visualization.py
index 31ead4be57..222cb8793e 100644
--- a/deps/wazuh_testing/wazuh_testing/tools/performance/visualization.py
+++ b/deps/wazuh_testing/wazuh_testing/tools/performance/visualization.py
@@ -1,118 +1,121 @@
+import json
+import logging
+from abc import ABC, abstractmethod
 from os.path import dirname, join, realpath
 from re import sub
 from tempfile import gettempdir

-from matplotlib.ticker import LinearLocator
-import json
-import logging
 import matplotlib.dates as mdates
 import matplotlib.pyplot as plt
 import pandas as pd
 import seaborn as sns
+from matplotlib.ticker import LinearLocator


-class DataVisualizer:
+class DataVisualizer(ABC):
     """Class that allows to visualize the data collected using the wazuh_metrics tool.

-    Args:
-        dataframes (list): list containing the paths.
-        target (str): string to set the visualization type.
-        compare (bool): boolean to compare the different datasets.
-        store_path (str): path to store the CSV images. Defaults to the temp directory.
-        x_ticks_granularity (string): granularity of the Timestamp. It is set by default to minutes.
-        x_ticks_interval (int): interval of the x-label.
-        base_name (str, optional): base name used to store the images.
     Attributes:
         dataframes_paths (list): paths of the CSVs.
         dataframe (pandas.Dataframe): dataframe containing the info from all the CSVs.
-        compare (bool): boolean to compare the different datasets.
-        target (str): string to set the visualization type.
         store_path (str): path to store the CSV images. Defaults to the temp directory.
-        x_ticks_granularity (string): granularity of the Timestamp. It is set by default to minutes.
-        x_ticks_interval (int): interval of the x-label.
         base_name (str, optional): base name used to store the images.
""" - def __init__(self, dataframes, target, compare=False, store_path=gettempdir(), x_ticks_granularity='minutes', - x_ticks_interval=1, base_name=None, columns_path=None, unify_child_daemon_metrics=False): - self.dataframes_paths = dataframes - self.dataframe = None - self.compare = compare - self.target = target - self.store_path = store_path - self._load_dataframes() - self.x_ticks_granularity = x_ticks_granularity - self.x_ticks_interval = x_ticks_interval - self.base_name = base_name - sns.set(rc={'figure.figsize': (26, 9)}) - self.columns_to_plot = None - if target in ['binary', 'analysis', 'remote', 'agent', 'logcollector', 'wazuhdb']: - self.columns_to_plot = self._load_columns_to_plot(columns_path) + def __init__(self, dataframes_paths, store_path=gettempdir(), base_name=None): + """Initializes the DataVisualizer. - if unify_child_daemon_metrics: - if target == 'binary': - self.dataframe = self.dataframe.reset_index(drop=False) - self._unify_dataframes() - else: - logging.warning("Enabled unify is only available for binary data. Ignoring") + Args: + dataframes_paths (list): List of paths to CSV files. + store_path (str, optional): Path to store the CSV images. Defaults to the temp directory. + base_name (str, optional): Base name used to store the images. + """ + self.dataframes_paths = dataframes_paths + self.store_path = store_path + self.base_name = base_name + self.dataframe = pd.DataFrame() - @staticmethod - def _color_palette(size): - """Create a list of different colors. + self._load_dataframes() + sns.set_theme(rc={'figure.figsize': (26, 9)}) - Args: - size (int): number of elements. + @abstractmethod + def _get_expected_fields(self): + """Abstract method to define expected fields in the data. Returns: - list: list of colors. The colors are represented as a tuple of float values. + list: List of expected field names. """ - return sns.hls_palette(size if size > 1 else 1, h=.5) + pass - def _load_columns_to_plot(self, columns_path): - full_path = columns_path + @abstractmethod + def plot(self): + """Abstract method to create data visualizations.""" + pass - if full_path is None: - filename = None + def _validate_dataframe(self) -> None: + """Validates the loaded dataframe. - if self.target != 'binary': - filename = self.target + '_csv_headers.json' - else: - filename = self.target + '_non_printable_headers.json' + Raises: + ValueError: If there are missing mandatory fields or duplicated column names. + """ + self._check_missing_mandatory_fields() + self._check_no_duplicated() + self._check_unexpected_values() - full_path = join(dirname(realpath(__file__)), '..', '..', 'data', 'data_visualizer', filename) + def _check_no_duplicated(self): + """Checks for duplicated column names in the dataframe. - with open(full_path, 'r') as columns_file: - full_data = json.load(columns_file) + Raises: + ValueError: If duplicate column names are found. + """ + if self.dataframe.columns.duplicated().any(): + raise ValueError('Duplicate column names found in the CSV file.') - return full_data + def _check_missing_mandatory_fields(self): + """Checks if mandatory fields are present in the dataframe. 
-    def _load_dataframes(self):
-        """Load the dataframes from dataframes_paths."""
-        for df_path in self.dataframes_paths:
-            if self.dataframe is None and self.target != 'cluster':
-                self.dataframe = pd.read_csv(df_path, index_col="Timestamp", parse_dates=True)
-            else:
-                new_csv = pd.read_csv(df_path, index_col="Timestamp", parse_dates=True)
-                self.dataframe = pd.concat([self.dataframe, new_csv])
+        Raises:
+            ValueError: If mandatory fields are missing.
+        """
+        if not (set(self._get_expected_fields()).issubset(set(self._get_data_columns()))):
+            missing_fields = (set(self._get_expected_fields()) - set(self._get_data_columns()))
+            raise ValueError(f"Missing mandatory fields: {missing_fields}")

-    def _unify_dataframes(self):
-        """Unify the data of each process with their respective sub-processes.
+    def _check_unexpected_values(self):
+        """Checks for unexpected column names in the dataframe.
+
+        Unexpected columns are not fatal: a warning is logged and the extra
+        columns are excluded from the plots.
         """
-        pids = self.dataframe[['Daemon', 'PID']].drop_duplicates()
-        versions = self.dataframe[['Daemon', 'Version']].drop_duplicates()
+        if not (set(self._get_data_columns()).issubset(set(self._get_expected_fields()))):
+            unexpected_fields = (set(self._get_data_columns()) - set(self._get_expected_fields()))
+            logging.warning(f"Unexpected fields provided. These will not be plotted: {unexpected_fields}")

-        daemons_list = [daemon_name for daemon_name in self._get_daemons() if "child" not in daemon_name]
+    def _get_data_columns(self) -> list:
+        """Retrieves the list of column names from the loaded dataframe.

-        for daemon_name in daemons_list:
-            self.dataframe.loc[self.dataframe['Daemon'].str.contains(daemon_name, na=False), 'Daemon'] = daemon_name
+        Returns:
+            list: List of column names.
+        """
+        return list(self.dataframe.columns)

-        columns_to_drop = ['Timestamp', 'Daemon', 'Version', 'PID']
-        columns_to_sum = self.dataframe.columns.drop(columns_to_drop)
+    @staticmethod
+    def _color_palette(size):
+        """Create a list of different colors.

-        self.dataframe = self.dataframe.groupby(['Timestamp', 'Daemon'])[columns_to_sum].sum().reset_index(drop=False)
+        Args:
+            size (int): number of elements.

-        self.dataframe = self.dataframe.merge(pids[['Daemon', 'PID']], on='Daemon', how='left')
-        self.dataframe = self.dataframe.merge(versions[['Daemon', 'Version']], on='Daemon', how='left')
+        Returns:
+            list: list of colors. The colors are represented as a tuple of float values.
+        """
+        return sns.hls_palette(size if size > 1 else 1, h=.5)
+
+    def _load_dataframes(self):
+        """Load the dataframes from dataframes_paths."""
+        for df_path in self.dataframes_paths:
+            new_csv = pd.read_csv(df_path, index_col="Timestamp", parse_dates=True)
+            self.dataframe = pd.concat([self.dataframe, new_csv])

     def _set_x_ticks_interval(self, ax):
         """Set the number of labels that will appear in the X axis and their format.
@@ -120,32 +123,30 @@ def _set_x_ticks_interval(self, ax):
         Args:
             ax (axes.SubplotBase): subplot base where the data will be printed.
         """
-        if self.x_ticks_granularity == 'seconds':
-            ax.xaxis.set_major_locator(LinearLocator(30))
-        elif self.x_ticks_granularity == 'minutes':
-            ax.xaxis.set_major_locator(LinearLocator(30))
+        ax.xaxis.set_major_locator(LinearLocator(30))
         ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))

     @staticmethod
     def _get_statistics(df, calculate_mean=True, calculate_median=False):
-        """Function for calculating statistics.
+        """Calculate data statistics.

         Args:
             df (pandas.DataFrame): dataframe on which the operations will be applied.
             calculate_mean (bool, optional): specify whether or not the mean will be calculated.
             calculate_median (bool, optional): specify whether or not the median will be calculated.
         """
-        statistics = str()
+        statistics = ''
+
         if calculate_mean:
-            statistics += f"Mean: {round(pd.DataFrame.mean(df), 3)}\n"
+            statistics += f"Mean: {round(df.mean(), 3)}\n"
         if calculate_median:
-            statistics += f"Median: {round(pd.DataFrame.median(df), 3)}\n"
+            statistics += f"Median: {round(df.median(), 3)}\n"

         return statistics

     @staticmethod
     def _basic_plot(ax, dataframe, label=None, color=None):
-        """Basic function to visualize a dataframe.
+        """Visualize a simple dataframe.

         Args:
             ax (axes.SubplotBase): subplot base where the data will be printed.
@@ -155,15 +156,15 @@ def _basic_plot(ax, dataframe, label=None, color=None):
         """
         ax.plot(dataframe, label=label, color=color)

-    def _save_custom_plot(self, ax, y_label, title, rotation=90, cluster_log=False, statistics=None):
-        """Function to add info to the plot, the legend and save the SVG image.
+    def _save_custom_plot(self, ax, y_label, title, rotation=90, disable_x_labels=False, statistics=None):
+        """Add info to the plot, the legend and save the SVG image.

         Args:
             ax (axes.SubplotBase): subplot base where the data will be printed.
             y_label (str): label for the Y axis.
             title (str): title of the plot.
             rotation (int, optional): optional int to set the rotation of the X-axis labels.
-            cluster_log (bool, optional): optional flag used to plot specific graphics for the cluster.
+            disable_x_labels (bool, optional): If True, the plot will not display the x-axis labels (timestamps).
             statistics (str, optional): optional statistics measures.
         """
         if statistics:
@@ -173,7 +174,7 @@ def _save_custom_plot(self, ax, y_label, title, rotation=90, cluster_log=False,
         ax.set_ylabel(y_label)
         ax.set_title(title)

-        if not cluster_log:
+        if not disable_x_labels:
             self._set_x_ticks_interval(ax)
             plt.xticks(rotation=rotation)

         svg_name = sub(pattern=r'\(.*\)', string=y_label, repl='')
@@ -182,125 +183,426 @@ def _save_custom_plot(self, ax, y_label, title, rotation=90, cluster_log=False,

         if self.base_name is not None:
             svg_name = f"{self.base_name}_{svg_name}"
+
         plt.savefig(join(self.store_path, f"{svg_name}.svg"), dpi=1200, format='svg')

-    def _plot_data(self, elements, title=None, generic_label=None):
-        """Function to plot the different types of dataframes.
+
+class BinaryDatavisualizer(DataVisualizer):
+    """A class for visualizing binary metrics data.
+
+    Attributes:
+        dataframes_paths (list): paths of the CSVs.
+        dataframe (pandas.Dataframe): dataframe containing the info from all the CSVs.
+        store_path (str): path to store the CSV images. Defaults to the temp directory.
+        base_name (str, optional): base name used to store the images.
+        binary_metrics_fields_to_plot (list): List of binary metrics fields to plot.
+        binary_metrics_extra_fields (list): List of additional binary metrics fields.
+        binary_metrics_fields (list): Combined list of binary metrics fields.
+ """ + binary_metrics_fields_to_plot = ["CPU", "VMS", "RSS", "USS", + "PSS", "SWAP", "FD", "Read_Ops", + "Write_Ops", "Disk_Read", "Disk_Written", + "Disk_Read_Speed", "Disk_Write_Speed"] + binary_metrics_extra_fields = ["Daemon", "Version", "PID"] + binary_metrics_fields = binary_metrics_fields_to_plot + binary_metrics_extra_fields + + def __init__(self, dataframes, store_path=gettempdir(), base_name=None, unify_child_daemon_metrics=False): + """Initialize the BinaryDatavisualizer. Args: - elements (list, pandas.columns): columns to plot. - title (str, optional): title of the plot. - generic_label (str, optional): set a generic label to plot all the columns. - """ - if self.target == 'binary': - for element in elements: - fig, ax = plt.subplots() - daemons = self._get_daemons() - colors = self._color_palette(len(daemons)) - for daemon, color in zip(daemons, colors): - self._basic_plot(ax, self.dataframe[self.dataframe.Daemon == daemon][element], - label=daemon, color=color) - self._save_custom_plot(ax, element, f"{element} {title}") - - elif self.target == 'logcollector': - for element in elements: - fig, ax = plt.subplots() - targets = self._get_logcollector_targets() - colors = self._color_palette(len(targets)) - for target, color in zip(targets, colors): - self._basic_plot(ax, self.dataframe[self.dataframe.Target == target][element], - label=target, color=color) - self._save_custom_plot(ax, element, title) - - elif self.target == 'cluster': - for element in elements: - fig, ax = plt.subplots() - nodes = self.dataframe[self.dataframe.activity == element]['node_name'].unique() - current_df = self.dataframe[self.dataframe.activity == element] - current_df.reset_index(drop=True, inplace=True) - for node, color in zip(nodes, self._color_palette(len(nodes))): - self._basic_plot(ax=ax, dataframe=current_df[current_df.node_name == node]['time_spent(s)'], - label=node, color=color) - self._save_custom_plot(ax, 'time_spent(s)', element.replace(' ', '_').lower(), cluster_log=True, - statistics=DataVisualizer._get_statistics( - current_df['time_spent(s)'], calculate_mean=True, calculate_median=True)) - - elif self.target == 'api': - for element in elements: - fig, ax = plt.subplots() - queries = self.dataframe.endpoint.unique() - colors = self._color_palette(len(queries)) - for endpoint, color in zip(queries, colors): - self._basic_plot(ax, self.dataframe[self.dataframe.endpoint == endpoint]['time_spent(s)'], - label=endpoint, color=color) - self._save_custom_plot(ax, element, 'API Response time') + dataframes (list): List of dataframes containing binary metrics data. + store_path (str, optional): Path to store visualizations. Defaults to system temp directory. + base_name (str, optional): Base name for saved visualizations. Defaults to None. + unify_child_daemon_metrics (bool, optional): Whether to unify child daemon metrics. Defaults to False. 
+ """ + super().__init__(dataframes, store_path, base_name) + self._validate_dataframe() + if unify_child_daemon_metrics: + self.dataframe = self.dataframe.reset_index(drop=False) + self._unify_dataframes() - else: - fig, ax = plt.subplots() - colors = self._color_palette(len(elements)) - for element, color in zip(elements, colors): - self._basic_plot(ax, self.dataframe[element], label=element, color=color) - self._save_custom_plot(ax, generic_label, title) - - def _plot_binaries_dataset(self): - """Function to plot the hardware data of the binary.""" - for element in self.columns_to_plot: - columns = self.dataframe.columns.drop(self.columns_to_plot[element]['columns']) - title = self.columns_to_plot[element]['title'] - self._plot_data(elements=columns, title=title) - - def _plot_generic_dataset(self): - """Function to plot the statistics from analysisd, remoted, logcollector and wazuhdb.""" - for element in self.columns_to_plot: - columns = self.columns_to_plot[element]['columns'] - title = self.columns_to_plot[element]['title'] - self._plot_data(elements=columns, title=title, generic_label=element) - - def _plot_agentd_dataset(self): - """Function to plot the statistics from wazuh-agentd.""" - if 'diff_seconds' not in self.dataframe.columns: - self.dataframe['diff_seconds'] = abs(pd.to_datetime(self.dataframe['last_keepalive']) - - pd.to_datetime(self.dataframe['last_ack'])) - self.dataframe['diff_seconds'] = self.dataframe.diff_seconds.dt.total_seconds() - - for element in self.columns_to_plot: - columns = self.columns_to_plot[element]['columns'] - title = self.columns_to_plot[element]['title'] - self._plot_data(elements=columns, title=title, generic_label=element) - - def _plot_cluster_dataset(self): - """Function to plot the information from the cluster.log file.""" - self._plot_data(elements=list(self.dataframe['activity'].unique()), generic_label='Managers') - - def _plot_api_dataset(self): - """Function to plot the information from the api.log file.""" - self._plot_data(elements=['endpoint'], generic_label='Queries') + def _get_expected_fields(self): + """Get the list of expected fields for binary metrics. - def plot(self): - """Public function to plot the dataset.""" - if self.target == 'binary': - self._plot_binaries_dataset() - elif self.target == 'analysis': - self._plot_generic_dataset() - elif self.target == 'remote': - self._plot_generic_dataset() - elif self.target == 'agent': - self._plot_agentd_dataset() - elif self.target == 'logcollector': - self._plot_generic_dataset() - elif self.target == 'cluster': - self._plot_cluster_dataset() - elif self.target == 'api': - self._plot_api_dataset() - elif self.target == 'wazuhdb': - self._plot_generic_dataset() - else: - raise AttributeError(f"Invalid target {self.target}") + Returns: + list: List of expected binary metrics fields. + """ + return self.binary_metrics_fields + + def _normalize_column_name(self, column_name: str): + """Normalize column names by removing units within parentheses. + + Args: + column_name (str): The column name to normalize. + + Returns: + str: The normalized column name. + """ + if '(' in column_name: + return column_name.split('(')[0].strip() + return column_name + + def _get_data_columns(self): + """Get the list of data columns in the dataframe after normalization. + + Returns: + list: List of normalized data column names. 
+ """ + column_names = self.dataframe.columns + normalized_columns = [self._normalize_column_name(col) for col in column_names.tolist()] + + return normalized_columns def _get_daemons(self): - """Get the list of Wazuh Daemons in the dataset.""" + """Get the list of unique Wazuh Daemons in the dataset. + + Returns: + list: List of unique Daemon names. + """ return self.dataframe.Daemon.unique() - def _get_logcollector_targets(self): - """Get the list of unique logcollector targets (sockets) in the dataset.""" - return self.dataframe.Target.unique() + def _get_fields_to_plot(self): + """Get the list of fields to plot from the dataframe. + + Returns: + list: List of fields to plot. + """ + column_names = self.dataframe.columns + fields_to_plot = [] + + for field_to_plot in column_names: + if self._normalize_column_name(field_to_plot) in self.binary_metrics_fields_to_plot: + fields_to_plot.append(field_to_plot) + + return fields_to_plot + + def _unify_dataframes(self): + """Unify the data of each process with their respective sub-processes.""" + pids = self.dataframe[['Daemon', 'PID']].drop_duplicates() + versions = self.dataframe[['Daemon', 'Version']].drop_duplicates() + + daemons_list = [daemon_name for daemon_name in self._get_daemons() if "child" not in daemon_name] + + for daemon_name in daemons_list: + self.dataframe.loc[self.dataframe['Daemon'].str.contains(daemon_name, na=False), 'Daemon'] = daemon_name + + columns_to_drop = ['Timestamp', 'Daemon', 'Version', 'PID'] + columns_to_sum = self.dataframe.columns.drop(columns_to_drop) + + self.dataframe = self.dataframe.groupby(['Timestamp', 'Daemon'])[columns_to_sum].sum().reset_index(drop=False) + + self.dataframe = self.dataframe.merge(pids[['Daemon', 'PID']], on='Daemon', how='left') + self.dataframe = self.dataframe.merge(versions[['Daemon', 'Version']], on='Daemon', how='left') + + def plot(self): + """Plot the binary metrics data for each field to be plotted. + + This method creates and saves plots for each binary metric field. + """ + columns_to_plot = self._get_fields_to_plot() + for element in columns_to_plot: + _, ax = plt.subplots() + daemons = self._get_daemons() + colors = self._color_palette(len(daemons)) + for daemon, color in zip(daemons, colors): + self._basic_plot(ax, self.dataframe[self.dataframe.Daemon == daemon][element], + label=daemon, color=color) + + self._save_custom_plot(ax, element, element) + + +class DaemonStatisticsVisualizer(DataVisualizer): + """A class for visualizing daemon statistics data. + + Attributes: + dataframes_paths (list): paths of the CSVs. + dataframe (pandas.Dataframe): dataframe containing the info from all the CSVs. + store_path (str): path to store the CSV images. Defaults to the temp directory. + base_name (str, optional): base name used to store the images. + daemon (str): Name of the daemon for which statistics are visualized. + plots_data (dict): Data required for plotting statistics. + expected_fields (list): List of expected fields for the daemon statistics. + """ + + general_fields = ['API Timestamp', 'Interval (Timestamp-Uptime)'] + statistics_plot_data_directory = join(dirname(realpath(__file__)), '..', '..', 'data', 'data_visualizer') + statistics_filename_suffix = '_csv_headers.json' + + def __init__(self, dataframes, daemon, store_path=gettempdir(), base_name=None): + """Initialize the DaemonStatisticsVisualizer. + + Args: + dataframes (list): List of dataframes containing daemon statistics data. + daemon (str): Name of the daemon for which statistics are visualized. 
+            store_path (str, optional): Path to store visualizations. Defaults to system temp directory.
+            base_name (str, optional): Base name for saved visualizations. Defaults to None.
+        """
+        self.daemon = daemon
+        super().__init__(dataframes, store_path, base_name)
+        self.plots_data = self._load_plot_data()
+        self.expected_fields = []
+        for graph in self.plots_data.values():
+            for column in graph['columns']:
+                self.expected_fields.append(column)
+        self.expected_fields.extend(self.general_fields)
+        self._validate_dataframe()
+
+    def _get_expected_fields(self):
+        """Get the list of expected fields for the daemon statistics.
+
+        Returns:
+            list: List of expected fields.
+        """
+        return self.expected_fields
+
+    def _get_statistic_plot_data_file(self):
+        """Get the file path for the statistics plot data file.
+
+        Returns:
+            str: Path to the statistics plot data file.
+        """
+        return join(self.statistics_plot_data_directory, self.daemon + self.statistics_filename_suffix)
+
+    def _load_plot_data(self):
+        """Load the plot data from the statistics plot data file.
+
+        Returns:
+            dict: Data required for plotting statistics.
+        """
+        statistic_plot_data = self._get_statistic_plot_data_file()
+        with open(statistic_plot_data) as columns_file:
+            full_data = json.load(columns_file)
+
+        return full_data
+
+    def plot(self):
+        """Plot the daemon statistics data for each field to be plotted.
+
+        This method creates and saves plots for each statistic field.
+        """
+        for element in self.plots_data.values():
+            columns = element['columns']
+            title = element['title']
+            colors = self._color_palette(len(columns))
+
+            _, ax = plt.subplots()
+            for column, color in zip(columns, colors):
+                self._basic_plot(ax, self.dataframe[column], label=column, color=color)
+            self._save_custom_plot(ax, title, title)
+
+
+class LogcollectorStatisticsVisualizer(DaemonStatisticsVisualizer):
+    """A class for visualizing logcollector statistics data.
+
+    Attributes:
+        dataframes_paths (list): paths of the CSVs.
+        dataframe (pandas.Dataframe): dataframe containing the info from all the CSVs.
+        store_path (str): path to store the CSV images. Defaults to the temp directory.
+        base_name (str, optional): base name used to store the images.
+        general_fields (list): List of general fields for logcollector statistics.
+    """
+    general_fields = ['Location', 'Target']
+
+    def __init__(self, dataframes, store_path=gettempdir(), base_name=None):
+        """Initialize the LogcollectorStatisticsVisualizer.
+
+        Args:
+            dataframes (list): List of paths to CSV files containing logcollector statistics data.
+            store_path (str, optional): Path to store visualizations. Defaults to system temp directory.
+            base_name (str, optional): Base name for saved visualizations. Defaults to None.
+        """
+        super().__init__(dataframes, 'logcollector', store_path, base_name)
+
+    def _get_expected_fields(self):
+        """Get the list of expected fields for logcollector statistics.
+
+        Returns:
+            list: List of expected fields.
+        """
+        return self.general_fields
+
+    def _get_logcollector_location(self):
+        """Get the list of unique logcollector locations in the dataset.
+
+        Returns:
+            numpy.ndarray: Array of unique logcollector locations.
+        """
+        return self.dataframe.Location.unique()
+
+    def plot(self):
+        """Plot the logcollector statistics data for each location.
+
+        This method creates and saves plots for each logcollector location.
+ """ + for element in self.plots_data.values(): + _, ax = plt.subplots() + targets = self._get_logcollector_location() + colors = self._color_palette(len(targets)) + for target, color in zip(targets, colors): + self._basic_plot(ax, self.dataframe[self.dataframe.Location == target][element['columns']], + label=target, color=color) + + self._save_custom_plot(ax, element['title'], element['title']) + + +class ClusterStatisticsVisualizer(DataVisualizer): + """A class for visualizing cluster statistics data. + + Attributes: + dataframes_paths (list): paths of the CSVs. + dataframe (pandas.Dataframe): dataframe containing the info from all the CSVs. + store_path (str): path to store the CSV images. Defaults to the temp directory. + base_name (str, optional): base name used to store the images. + expected_cluster_fields (list): List of expected fields for cluster statistics. + """ + expected_cluster_fields = ['node_name', 'activity', 'time_spent(s)'] + + def __init__(self, dataframes_paths, store_path=gettempdir(), base_name=None): + """Initialize the ClusterStatisticsVisualizer. + + Args: + dataframes_paths (list): List of paths to dataframes containing cluster statistics data. + store_path (str, optional): Path to store visualizations. Defaults to system temp directory. + base_name (str, optional): Base name for saved visualizations. Defaults to None. + """ + super().__init__(dataframes_paths, store_path, base_name) + self._validate_dataframe() + + def _get_expected_fields(self) -> list: + """Get the list of expected fields for cluster statistics. + + Returns: + list: List of expected cluster fields. + """ + return self.expected_cluster_fields + + def plot(self): + """Plot the cluster statistics data for each activity. + + This method creates and saves plots for each cluster activity. + """ + elements = list(self.dataframe['activity'].unique()) + + for element in elements: + _, ax = plt.subplots() + nodes = self.dataframe[self.dataframe.activity == element]['node_name'].unique() + current_df = self.dataframe[self.dataframe.activity == element] + current_df.reset_index(drop=True, inplace=True) + for node, color in zip(nodes, self._color_palette(len(nodes))): + self._basic_plot(ax=ax, dataframe=current_df[current_df.node_name == node]['time_spent(s)'], + label=node, color=color) + self._save_custom_plot(ax, 'time_spent(s)', element.replace(' ', '_').lower(), disable_x_labels=True, + statistics=DataVisualizer._get_statistics( + current_df['time_spent(s)'], calculate_mean=True, calculate_median=True)) + + +class IndexerAlerts(DataVisualizer): + """A class for visualizing indexer alerts data. + + Attributes: + dataframes_paths (list): paths of the CSVs. + dataframe (pandas.Dataframe): dataframe containing the info from all the CSVs. + store_path (str): path to store the CSV images. Defaults to the temp directory. + expected_fields (list): List of expected fields for indexer alerts. + """ + expected_fields = ['Total alerts'] + + def __init__(self, dataframes_paths, store_path=gettempdir(), base_name=None): + """Initialize the IndexerAlerts visualizer. + + Args: + dataframes_paths (list): List of paths to dataframes containing indexer alerts data. + store_path (str, optional): Path to store visualizations. Defaults to system temp directory. + base_name (str, optional): Base name for saved visualizations. Defaults to None. 
+ """ + super().__init__(dataframes_paths, store_path, base_name) + self._validate_dataframe() + + def _get_expected_fields(self): + """Get the list of expected fields for indexer alerts. + + Returns: + list: List of expected fields. + """ + return self.expected_fields + + def _calculate_timestamp_interval(self): + """Calculate the interval between timestamps in seconds. + + Returns: + float: Interval between timestamps in seconds. + """ + interval = self.dataframe.index[1] - self.dataframe.index[0] + return interval.total_seconds() + + def _plot_agregated_alerts(self): + """Plot the aggregated alerts per timestamp. + + This method creates and saves a plot for the aggregated alerts. + """ + _, ax = plt.subplots() + self.dataframe['Difference'] = self.dataframe['Total alerts'].diff() + self.dataframe['Difference'] = self.dataframe['Difference'] / self._calculate_timestamp_interval() + + self._basic_plot(ax=ax, dataframe=self.dataframe['Difference'], label='Alerts per timestamp', + color=self._color_palette(1)[0]) + self._save_custom_plot(ax, 'Different alerts', 'Difference alerts') + + def _plot_plain_alerts(self): + """Plot the total alerts. + + This method creates and saves a plot for the total alerts. + """ + _, ax = plt.subplots() + self._basic_plot(ax=ax, dataframe=self.dataframe['Total alerts'], label='Total alerts', + color=self._color_palette(1)[0]) + self._save_custom_plot(ax, 'Total alerts', 'Total alerts') + + def plot(self): + """Plot the indexer alerts data. + + This method creates and saves plots for both total alerts and aggregated alerts. + """ + self._plot_plain_alerts() + self._plot_agregated_alerts() + + +class IndexerVulnerabilities(DataVisualizer): + """A class for visualizing indexer vulnerabilities data. + + Attributes: + dataframes_paths (list): paths of the CSVs. + dataframe (pandas.Dataframe): dataframe containing the info from all the CSVs. + store_path (str): path to store the CSV images. Defaults to the temp directory. + expected_fields (list): List of expected fields for indexer vulnerabilities. + """ + expected_fields = ['Total vulnerabilities'] + + def __init__(self, dataframes_paths, store_path=gettempdir(), base_name=None): + """Initialize the IndexerVulnerabilities visualizer. + + Args: + dataframes_paths (list): List of paths to dataframes containing indexer vulnerabilities data. + store_path (str, optional): Path to store visualizations. Defaults to system temp directory. + base_name (str, optional): Base name for saved visualizations. Defaults to None. + """ + super().__init__(dataframes_paths, store_path, base_name) + self._validate_dataframe() + + def _get_expected_fields(self): + """Get the list of expected fields for indexer vulnerabilities. + + Returns: + list: List of expected fields. + """ + return self.expected_fields + + def plot(self): + """Plot the indexer vulnerabilities data. + + This method creates and saves a plot for the total vulnerabilities. + """ + _, ax = plt.subplots() + self._basic_plot(ax=ax, dataframe=self.dataframe['Total vulnerabilities'], label='Indexed Vulnerabilities', + color=self._color_palette(1)[0]) + self._save_custom_plot(ax, 'Total Vulnerabilities', 'Total vulnerabilities')