diff --git a/scripts/log-kubectl.py b/scripts/log-kubectl.py
new file mode 100644
index 00000000..c630fa44
--- /dev/null
+++ b/scripts/log-kubectl.py
@@ -0,0 +1,96 @@
+import subprocess
+import click
+import signal
+import sys
+import os
+import time
+
+# Construct the absolute path to the reana-client directory
+current_dir = os.path.dirname(os.path.abspath(__file__))
+reana_client_path = os.path.abspath(os.path.join(current_dir, '..', '..', '..', 'reana-client'))
+
+# Add the reana-client directory to sys.path
+if reana_client_path not in sys.path:
+    sys.path.insert(0, reana_client_path)
+
+from reana_client.api.client import get_workflow_logs
+
+# Global flag that controls the log-collection loop
+continue_logging = True
+
+def signal_handler(sig, frame):
+    global continue_logging
+    continue_logging = False
+    print("Stopping log collection...")
+
+def run_command_with_retries(command, max_retries=5, delay=2):
+    attempt = 0
+    while attempt < max_retries:
+        try:
+            process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
+            return process
+        except Exception as e:
+            print(f"An error occurred: {e}. Attempt {attempt + 1} of {max_retries}")
+            attempt += 1
+            time.sleep(delay)
+    print("All attempts to run the command have failed.")
+    return None
+
+@click.command()
+@click.option('--log-file', required=True, help='The name of the file to save the logs.')
+@click.option('--workflow', required=True, help='The name of the workflow.')
+def run_command_and_save_logs(log_file, workflow):
+    global continue_logging
+
+    # Get the access token from the environment variable
+    access_token = os.getenv('REANA_ACCESS_TOKEN')
+    if not access_token:
+        print("Error: REANA_ACCESS_TOKEN environment variable is not set.")
+        sys.exit(1)
+
+    # Call get_workflow_logs with the workflow and access_token
+    service = get_workflow_logs(workflow, access_token)
+    # Extract batch_id from the service response
+    batch_id = service.get('workflow_id')
+    if not batch_id:
+        print("Error: 'workflow_id' not found in the service response.")
+        sys.exit(1)
+    print("The batch_id is supposed to be this:", batch_id)
+
+    # Register the signal handler so Ctrl-C stops log collection cleanly
+    signal.signal(signal.SIGINT, signal_handler)
+
+    command = (
+        "kubectl get pod -o='custom-columns=NAME:.metadata.name,PHASE:.status.phase,"
+        "CREATED:.metadata.creationTimestamp,STARTED:.status.startTime,"
+        "STARTED_CONTAINERS:.status.containerStatuses[*].state.*.startedAt,"
+        "FINISHED_CONTAINERS:.status.containerStatuses[*].state.*.finishedAt' -w"
+    )
+
+    with open(log_file, "a") as file:  # 'a' appends to the file without truncating it
+        # Popen runs the command in a new shell process; stdout and stderr are redirected to pipes
+        while continue_logging:
+            process = run_command_with_retries(command)
+            if not process:
+                print("Failed to start the command.")
+                break
+            try:
+                while continue_logging:
+                    line = process.stdout.readline()
+                    if not line:
+                        break
+                    line = line.strip()  # Strip leading/trailing whitespace
+                    print(line)
+                    file.write(line + "\n")
+                    # Stop watching once the batch pod reports a "Failed" phase
+                    # (the line was already written above, so no second write is needed)
+                    if f"reana-run-batch-{batch_id}" in line and "Failed" in line:
+                        continue_logging = False
+                        break
+            except Exception as e:
+                print(f"An error occurred while reading the process output: {e}")
+            finally:
+                process.terminate()
+                process.wait()
+
+if __name__ == "__main__":
+    run_command_and_save_logs()
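Note: the custom-columns watch above prints one space-separated row per pod status change, with <none> where a field is not yet set; these columns (NAME, PHASE, CREATED, STARTED, STARTED_CONTAINERS, FINISHED_CONTAINERS) are what the benchmarking script below parses by column index. An illustrative row (pod name and timestamps are made up) would look like:

    reana-run-job-1a2b3c4d-0e5f-4a6b-8c7d-9e0f1a2b3c4d   Running   2024-05-01T09:00:00Z   2024-05-01T09:00:05Z   2024-05-01T09:00:06Z   <none>
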
diff --git a/scripts/logs_lifetime_benchmarking_test.py b/scripts/logs_lifetime_benchmarking_test.py
index 6b524ff6..a4050162 100644
--- a/scripts/logs_lifetime_benchmarking_test.py
+++ b/scripts/logs_lifetime_benchmarking_test.py
@@ -1,47 +1,78 @@
-import pandas as pd
 import subprocess
+import pandas as pd
 import matplotlib.pyplot as plt
 from matplotlib.dates import DateFormatter, SecondLocator
 import click
+import os
+import re
+import sys
+import logging
+from reana_client.api.client import get_workflow_logs
+
+# Ensure REANA_ACCESS_TOKEN is set
+access_token = os.getenv('REANA_ACCESS_TOKEN')
+if not access_token:
+    print("Error: REANA_ACCESS_TOKEN environment variable is not set.")
+    sys.exit(1)
+
+# Set up logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+# Function to extract job IDs from lines
+def extract_job_ids_from_lines(lines):
+    job_ids = set()
+    for line in lines:
+        match = re.search(r'reana-run-job-\w{8}-\w{4}-\w{4}-\w{4}-\w{12}', line)
+        if match:
+            job_ids.add(match.group(0))
+    return job_ids
 
-"""Run this script to generate the plots of current job status"""
-"""First compare the logs from reana-client logs command and the job pod ID's from statistics file"""
-
-def run_reana_client_logs(command):
-    command = f"reana-client logs -w {workflow}"
-    result = subprocess.run(command, shell=True, capture_output=True, text=True)
-    if result.returncode == 0:
-        return result.stdout
-    else:
-        print("Comamnd failed to run, error:")
-        print(result.stderr)
-        return None
-
+# Function to parse log file
 def parse_log_file(file_path):
     with open(file_path, 'r') as f:
         lines = f.readlines()
     return lines
 
+# Function to write filtered lines to a new file
+def write_filtered_log_file(filtered_lines, filtered_file_path):
+    with open(filtered_file_path, 'w') as f:
+        f.writelines(filtered_lines)
+
+# Function to extract unique jobs from lines
 def extract_unique_jobs(lines):
     unique_jobs = {}
     for line in lines:
-        if line.strip().startswith('reana-run-job-'):
-            job_id = line.strip().split()[0]
+        match = re.match(r'(reana-run-job-\w{8}-\w{4}-\w{4}-\w{4}-\w{12})', line)
+        if match:
+            job_id = match.group(0)
             unique_jobs[job_id] = line.strip()
     return unique_jobs.values()
 
+# Function to extract succeeded timestamps from unique jobs
 def extract_succeeded_timestamps(unique_jobs):
     succeeded_timestamps = [line.split()[5] for line in unique_jobs if line.split()[5] != ""]
     succeeded_timestamps = [ts.split(',')[0] for ts in succeeded_timestamps]
     return pd.to_datetime(succeeded_timestamps, errors='coerce')
 
+# Function to get sorted data by timestamp
 def get_sorted_data(lines):
-    sorted_data = sorted(lines, key=lambda x: x.split()[1])
-    return sorted_data
-
+    sorted_lines = []
+    for line in lines:
+        parts = line.split()
+        try:
+            timestamp = pd.to_datetime(parts[2])
+            sorted_lines.append((timestamp, line))
+        except Exception as e:
+            logger.error(f"Error parsing date from line: {line}. Error: {e}")
+    sorted_lines.sort(key=lambda x: x[0])
+    return [line for _, line in sorted_lines]
+
+# Function to filter jobs based on status
 def filter_jobs(sorted_data, status):
     return [line for line in sorted_data if line.split()[1] == status]
 
+# Function to extract running timestamps
 def extract_running_timestamps(running_jobs):
     timestamps_running = []
     encountered_jobs_running = set()
@@ -64,6 +95,7 @@ def extract_running_timestamps(running_jobs):
     timestamps_running.sort()
     return timestamps_running
 
+# Function to extract pending timestamps
 def extract_pending_timestamps(pending_jobs):
     timestamps_pending = []
     encountered_jobs_pending = set()
@@ -90,6 +122,7 @@ def extract_pending_timestamps(pending_jobs):
     timestamps_pending.sort()
     return timestamps_pending
 
+# Function to calculate cumulative timestamps
 def calculate_cumulative(timestamps):
     x = []
     y = []
@@ -99,19 +132,13 @@ def calculate_cumulative(timestamps):
         x.append(timestamp[0])
         y.append(cumulative_sum)
     return x, y
-
-def plot_data(succeeded_counts, x_running, y_running, x_pending, y_pending):
-    plt.figure(figsize=figsize)
-    # Plot succeeded jobs
+# Function to plot data
+def plot_data(succeeded_counts, x_running, y_running, x_pending, y_pending, title, figsize):
+    plt.figure(figsize=figsize)
     plt.plot(succeeded_counts.index, succeeded_counts.cumsum(), label='Finished', linestyle='-', color='green', alpha=0.5)
-
-    # Plot running jobs
     plt.plot(x_running, y_running, linestyle='-', color='blue', alpha=0.5, linewidth=3, label='Running')
-
-    # Plot pending jobs
     plt.plot(x_pending, y_pending, linestyle='-', color='orange', alpha=0.5, linewidth=3, label='Pending')
-
     plt.xlabel('Processing time')
     plt.ylabel('Number of Jobs')
     plt.title(title)
@@ -127,39 +154,45 @@
 @click.argument('file_path')
 @click.option('--title', default='Analysis Results', help='Title of the analysis results')
 @click.option('--figsize', nargs=2, type=float, default=(12, 8), help='Figure size as two float values')
-@click.option('--workflow', required=False, help='Name of the REANA workflow the same as the processed .txt file')
+@click.option('--workflow', required=True, help='Name of the REANA workflow')
 def main(file_path, title, figsize, workflow):
-    """Compare the reana-client logs and the jobs from the analysis results
-    Run benchmarking tests. Generate matplotlib plot
-
-    The script requires matplotlib and pandas packages
-
-    Steps to run benchmarking workflow lifetime test:
-
-    .. code-block:: console
-
-    \b
-    #To run this script
-    $ kubectl #To save a live logs
-    $ cd reana/scripts
-    $ python lifetime.py logs.txt # insert your .txt file with logs
-
-    """
+    """Plot workflow lifetime statistics.
+
+    By evaluating a log file with pod life-cycle information, this script
+    determines how many jobs were running in parallel over time.
+
+    Steps to run the benchmarking workflow lifetime test:
+
+    .. code-block:: console
+
+    \b
+    # First save the live pod logs, then run this script
+    $ cd reana/scripts
+    $ python log-kubectl.py --log-file logs.txt --workflow myworkflow
+    $ python logs_lifetime_benchmarking_test.py logs.txt --workflow myworkflow  # insert your .txt log file and workflow name
+    """
-    reana_logs = run_reana_client_logs(workflow)
-    reana_job_ids = set()
-    for line in reana_logs.splitlines():
-        if line.strip().startswith('reana-run-job'):
-            job_id = line.strip().split()[0]
-            reana_job_ids.add(job_id)
+    service = get_workflow_logs(workflow, access_token)
+    log_string = service['logs']
+    reana_run_job_ids = re.findall(r'reana-run-job-\w{8}-\w{4}-\w{4}-\w{4}-\w{12}', log_string)
+    lines = parse_log_file(file_path)
+    file_job_ids = extract_job_ids_from_lines(lines)
+
+    diff_job_ids = set(reana_run_job_ids).symmetric_difference(file_job_ids)
+    if diff_job_ids:
+        print("Differing Job IDs:")
+        for job_id in diff_job_ids:
+            print(job_id)
+    else:
+        print("No differing Job IDs found.")
 
-    unique_jobs = extract_unique_jobs(lines)
+    # Drop lines belonging to jobs seen in only one of the two sources
+    filtered_lines = [line for line in lines if not any(job_id in line for job_id in diff_job_ids)]
+    filtered_file_path = 'filtered_' + file_path
+    write_filtered_log_file(filtered_lines, filtered_file_path)
+
+    unique_jobs = extract_unique_jobs(filtered_lines)
     succeeded_timestamps = extract_succeeded_timestamps(unique_jobs)
-    first_succeeded_timestamp = succeeded_timestamps.dropna().min()
     succeeded_counts = succeeded_timestamps.value_counts().sort_index()
-    sorted_data = get_sorted_data(lines)
-
+    sorted_data = get_sorted_data(filtered_lines)
     running_jobs = filter_jobs(sorted_data, 'Running')
     timestamps_running = extract_running_timestamps(running_jobs)
     x_running, y_running = calculate_cumulative(timestamps_running)
@@ -168,7 +201,7 @@ def main(file_path, title, figsize, workflow):
     timestamps_pending = extract_pending_timestamps(pending_jobs)
     x_pending, y_pending = calculate_cumulative(timestamps_pending)
 
-    plot_data(succeeded_counts, x_running, y_running, x_pending, y_pending)
+    plot_data(succeeded_counts, x_running, y_running, x_pending, y_pending, title, figsize)
 
 if __name__ == "__main__":
     main()
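The heart of the comparison step in main() is the symmetric difference between the job IDs found in the REANA workflow logs and those recorded by the kubectl watch. A minimal, self-contained sketch of that logic (the job IDs and log lines below are made up for illustration):

    import re

    JOB_ID_RE = r'reana-run-job-\w{8}-\w{4}-\w{4}-\w{4}-\w{12}'

    # Hypothetical inputs: the first ID appears in both sources,
    # the second only in the kubectl log file.
    reana_logs = "step1: reana-run-job-aaaaaaaa-1111-2222-3333-444444444444 succeeded"
    kubectl_lines = [
        "reana-run-job-aaaaaaaa-1111-2222-3333-444444444444   Succeeded",
        "reana-run-job-bbbbbbbb-1111-2222-3333-444444444444   Running",
    ]

    reana_ids = set(re.findall(JOB_ID_RE, reana_logs))
    file_ids = {m.group(0) for line in kubectl_lines for m in re.finditer(JOB_ID_RE, line)}

    # IDs present in only one source; the corresponding lines are filtered
    # out so both sources describe the same set of jobs before plotting.
    print(reana_ids.symmetric_difference(file_ids))
    # -> {'reana-run-job-bbbbbbbb-1111-2222-3333-444444444444'}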
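Taken together, a typical benchmarking session with the two scripts might look like this (the token, workflow name, and file names are placeholders; the options themselves are the click options defined in the diffs above):

    $ export REANA_ACCESS_TOKEN=<your token>
    $ python log-kubectl.py --log-file logs.txt --workflow myworkflow
    # ... press Ctrl-C once the workflow has finished ...
    $ python logs_lifetime_benchmarking_test.py logs.txt --workflow myworkflow --title "Lifetime test" --figsize 12 8

With file_path set to logs.txt, the second script also writes the cross-checked log lines to filtered_logs.txt before plotting.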