diff --git a/telematic_system/scripts/log_analysis/README.md b/telematic_system/scripts/log_analysis/README.md
new file mode 100644
index 00000000..0048c9bb
--- /dev/null
+++ b/telematic_system/scripts/log_analysis/README.md
@@ -0,0 +1,136 @@
+# Prerequisite
+- Preferred operating system: Ubuntu 20.04 or newer
+- Python environment setup
+  1. Install Python
+     ```
+     sudo apt update
+     sudo apt install python3
+     ```
+  2. Check the Python version
+     ```
+     python3 --version
+     ```
+     The recommended version is `3.10`.
+  3. Create a virtual environment. Navigate to the `cda-telematics/telematic_system/scripts/log_analysis` directory and run the command below:
+     ```
+     python3 -m venv .venv
+     ```
+  4. Activate the virtual environment.
+     ```
+     source .venv/bin/activate
+     ```
+     Note: The virtual environment must be activated again every time a new terminal is opened.
+- Install dependencies:
+  - Install Debian packages
+    ```
+    sudo apt install libcairo2-dev libxt-dev libgirepository1.0-dev
+    ```
+  - Install Python packages
+    ```
+    pip install -r requirements.txt
+    ```
+- Clone repos:
+  - Clone the cda-telematics GitHub repo
+    ```
+    git clone https://github.com/usdot-fhwa-stol/cda-telematics.git
+    cd cda-telematics
+    ```
+- Download `log_timesheet.csv`
+Most of the Python analysis scripts refer to `log_timesheet.csv` for the test runs and their durations. Since `log_timesheet.csv` is generated during verification/validation testing, make sure it is downloaded into this `log_analysis` folder before executing any of the Python scripts (an illustrative layout of this file is sketched in the note at the end of this patch).
+
+
+# Process V2xHub bridge log
+1. Navigate to the `cda-telematics/telematic_system/scripts/log_analysis` directory.
+2. Download the V2X Hub logs to the current folder.
+3. Run the command below to generate data publishing metrics.
+   ```
+   python3 parse_v2xhub_telematic_plugin_logs.py --log_file_path <log file path>
+
+   e.g.:
+   python3 parse_v2xhub_telematic_plugin_logs.py --log_file_path T20_R6-13_V2XHub.log
+   ```
+   It will generate the parsed bridge log in CSV files.
+
+# Process Streets bridge log
+1. Navigate to the `cda-telematics/telematic_system/scripts/log_analysis` directory.
+2. Download the Streets bridge logs to the current folder.
+3. Run the command below to generate data publishing metrics.
+   ```
+   python3 parse_streets_bridge_logs.py
+   ```
+   It will generate the parsed bridge log in CSV files.
+
+# Process Cloud bridge log
+1. Navigate to the `cda-telematics/telematic_system/scripts/log_analysis` directory.
+2. Download the cloud bridge logs to the current folder.
+3. Run the command below to generate data publishing metrics.
+   ```
+   python3 parse_cloud_bridge_logs.py <log file path>
+
+   e.g.:
+   python3 parse_cloud_bridge_logs.py T20_R6-9_carma_cloud.log
+   python3 parse_cloud_bridge_logs.py T20_R10-13_carma_cloud.log
+   ```
+   It will generate the parsed bridge log in CSV files.
+
+# Process Vehicle bridge log
+1. Navigate to the `cda-telematics/telematic_system/scripts/log_analysis` directory.
+2. Download the vehicle bridge logs to the current folder.
+3. Run the command below to generate data publishing metrics.
+   ```
+   python3 parse_vehicle_bridge_logs.py <log file path>
+
+   e.g.:
+   python3 parse_vehicle_bridge_logs.py T20_R6_R13_fusion/T20_R6_fusion.log
+   ```
+   It will generate the parsed bridge log in CSV files.
+
+# Process Messaging Server log
+1. Navigate to the `cda-telematics/telematic_system/scripts/log_analysis` directory.
+2. Download the messaging server logs to the current folder.
+3. Run the command below to generate data publishing metrics.
+   ```
+   python3 parse_messaging_server_logs.py <log file path>
+
+   e.g.:
+   python3 parse_messaging_server_logs.py T20_R6-13_messaging_server.log
+   ```
+   It will generate the parsed messaging server delay and message drop logs in CSV files.
+
+# Metric analysis
+## Latency
+1. Create a folder with the test case name in the current `log_analysis` folder.
+For example, for test case 20:
+   ```
+   mkdir T20
+   ```
+2. Copy all the generated `T20_*_messaging_server_*_delay_parsed.csv` files to this new folder `T20`.
+3. Run the latency plotting script to generate plots for the CSV files with delay metrics in folder `T20`.
+   ```
+   python3 latencyPlotter.py <test case folder>
+
+   e.g.:
+   python3 latencyPlotter.py T20
+   ```
+   The generated plots are saved into the `output` folder.
+## Message loss
+1. Create a folder named after the test case with a `_message_drop` suffix in the current `log_analysis` folder.
+For example, for test case 20:
+   ```
+   mkdir T20_message_drop
+   ```
+2. Copy all the generated `<test case>_*_messaging_server_*_message_drop_parsed.csv` files to this new folder `<test case>_message_drop`.
+3. Copy all the generated bridge CSV files into the same folder.
+4. Run the message drop analysis script to analyze all files in the `<test case>_message_drop` folder.
+   ```
+   python3 get_message_drop.py <test case>_message_drop
+
+   e.g.:
+   python3 get_message_drop.py T20_message_drop
+   ```
+The generated result is similar to the output below:
+
+![Message_loss_result](https://github.com/user-attachments/assets/15fefacb-e929-4340-a0e3-6d7f6441ba8e) + diff --git a/telematic_system/scripts/log_analysis/get_message_drop.py b/telematic_system/scripts/log_analysis/get_message_drop.py index ada64cc6..20ad6628 100644 --- a/telematic_system/scripts/log_analysis/get_message_drop.py +++ b/telematic_system/scripts/log_analysis/get_message_drop.py @@ -9,7 +9,7 @@ import matplotlib.dates as mdates import matplotlib.pyplot as plt - +import os warnings.filterwarnings("ignore") ''' @@ -38,12 +38,12 @@ def combineFiles(log_dir): bridges_csv = [] messaging_server_csv_exist = False - messaging_server_csv = "" + messaging_server_csv = [] for filename in filenames: - if "Messaging" in filename: + if "messaging" in filename.lower(): messaging_server_csv_exist = True - messaging_server_csv = log_dir + "/" + filename + messaging_server_csv.append(log_dir + "/" + filename) matched = re.match(bridge_csv_regex, filename, re.IGNORECASE) if matched: @@ -55,9 +55,8 @@ def combineFiles(log_dir): if not messaging_server_csv_exist: sys.exit("Did not find any Messaging server csv logs in directory: "+log_dir+ "") - - messaging_server_df = pd.read_csv(messaging_server_csv) + messaging_server_df = pd.concat(map(pd.read_csv, messaging_server_csv), ignore_index=True) infrastructure_units = ['streets_id', 'cloud_id'] ############# Load messaging server logs and get a list of dataframes for all unit ids @@ -71,25 +70,21 @@ def combineFiles(log_dir): # value = value.drop('Metadata',axis =1) - #Get dataframes from bridge logs - bridge_dfs = dict() - for bridge_csv in bridges_csv: - bridge_df = pd.read_csv(bridge_csv) - bridge_dfs.update(dict(tuple(bridge_df.groupby('Unit Id')))) - - print(bridge_dfs.keys()) + bridge_df = pd.concat(map(pd.read_csv, bridges_csv), ignore_index=True) + bridge_dfs = dict(tuple(bridge_df.groupby('Unit Id'))) # Create combined dataframes from for key in bridge_dfs: if key in messaging_server_dfs: - bridge_df_combined = pd.merge(bridge_dfs[key], messaging_server_dfs[key], how='left', left_on=['Topic','Payload Timestamp'], right_on = ['Topic','Message Time']) - bridge_df_combined.to_csv(log_dir + key + "_combined.csv") + if not os.path.exists("output"): + os.mkdir("output") + bridge_df_combined.to_csv("output/"+log_dir+"_"+ key + "_combined.csv") bridge_missing_message_count = bridge_df_combined['Log_Timestamp(s)'].isnull().sum() bridge_total_message_count = len(bridge_df_combined['Payload Timestamp']) - print("Message drop for unit: ", key) + print("\nMessage drop for unit: ", key) print("Missing count: ", bridge_missing_message_count) print("Total count: ", bridge_total_message_count) print("Percentage of messages received",(1 - (bridge_missing_message_count/bridge_total_message_count))*100) @@ -101,29 +96,6 @@ def combineFiles(log_dir): print("{} missed messages: ".format(key)) print(topics_with_empty_count) - # Plot vehicle data - bridge_df_combined = bridge_df_combined[bridge_df_combined['Message Time'].isnull()] - bridge_df_combined['Payload Timestamp'] = pd.to_datetime(bridge_df_combined['Payload Timestamp'], infer_datetime_format=True) - bridge_df_combined['Message Time'] = pd.to_datetime(bridge_df_combined['Message Time'], infer_datetime_format=True) - - - ax1 = plt.plot(bridge_df_combined['Topic'], bridge_df_combined['Payload Timestamp'], '|') - - #Plot start and end lines - start_time = pd.to_datetime(messaging_server_dfs[key]['Log_Timestamp(s)'].iloc[0]) - end_time = 
pd.to_datetime(messaging_server_dfs[key]['Log_Timestamp(s)'].iloc[-1]) - - plt.axhline(y = start_time, color = 'r', linestyle = '-', label = 'Test Start Time') - plt.axhline(y = end_time, color = 'r', linestyle = '-', label = 'Test End Time') - - plt.title('{} : Topics against time of dropped message'.format(key)) - plt.xlabel('Topics with dropped messages hours:mins:seconds') - plt.ylabel('Time of message drop') - xfmt = mdates.DateFormatter('%H:%M:%S') - plt.gcf().autofmt_xdate() - plt.show() - # plt.savefig('{}_Message_drop.png'.format(key)) - diff --git a/telematic_system/scripts/log_analysis/latencyPlotter.py b/telematic_system/scripts/log_analysis/latencyPlotter.py index b2cd827d..a2930b44 100644 --- a/telematic_system/scripts/log_analysis/latencyPlotter.py +++ b/telematic_system/scripts/log_analysis/latencyPlotter.py @@ -23,10 +23,12 @@ def concatRuns(folderName): allFiles.append(df) concatOutput = pd.concat(allFiles, axis=0, ignore_index=True) - concatOutput.to_csv(f'{folderName}_allruns.csv', index=False) + if not os.path.exists("output"): + os.mkdir("output") + concatOutput.to_csv(f'output/{folderName}_allruns.csv', index=False) def plotter(folderName): - allRuns = folderName + "_allruns.csv" + allRuns = "output/"+str(folderName) + "_allruns.csv" #read in the combined csv data data = pd.read_csv(allRuns) @@ -47,51 +49,21 @@ def plotter(folderName): print("95th Latency: " + str(trimmed_data["Delay(s)"].quantile(0.95))) #plot vehicle, streets, and cloud data histograms if they were part of the test - streets_data = trimmed_data[trimmed_data['Unit Id'] == "streets_id"] - - if len(streets_data) > 0: - fig, ax1 = plt.subplots() - fig.set_size_inches(10, 10) - sns.histplot(streets_data['Delay(s)'], kde=False) - plt.xlim(0, 0.75) - plt.xlabel('Latency(s)', fontsize=18) - plt.ylabel('Count', fontsize=18) - plt.xticks(fontsize=15) - plt.yticks(fontsize=15) - plt.title(folderName + " Streets Bridge Latency Histogram", fontsize=18) - plt.savefig(f'{folderName}_streets_latency_hist.png') - - - cloud_data = trimmed_data[trimmed_data['Unit Id'] == "cloud_id"] - - if len(cloud_data) > 0: - fig, ax1 = plt.subplots() - fig.set_size_inches(10, 10) - sns.histplot(cloud_data['Delay(s)'], kde=False) - plt.xlim(0, 0.75) - plt.xlabel('Latency(s)', fontsize=18) - plt.ylabel('Count', fontsize=18) - plt.xticks(fontsize=15) - plt.yticks(fontsize=15) - plt.title(folderName + " Cloud Bridge Latency Histogram", fontsize=18) - plt.savefig(f'{folderName}_cloud_latency_hist.png') - - vehicles = ["DOT-45244", "DOT-45254"] - for vehicle in vehicles: - vehicle_data = trimmed_data[trimmed_data['Unit Id'] == vehicle] + units = ["DOT-45244", "DOT-45254","DOT_45254","vehicle_id","rsu_1234","streets_id","cloud_id"] + for unit in units: + unit_data = trimmed_data[trimmed_data['Unit Id'] == unit] - if len(vehicle_data) > 0: + if len(unit_data) > 0: fig, ax1 = plt.subplots() fig.set_size_inches(10, 10) - sns.histplot(vehicle_data['Delay(s)'], kde=False) + sns.histplot(unit_data['Delay(s)'], kde=False) plt.xlim(0, 0.75) plt.xlabel('Latency(s)', fontsize=18) plt.ylabel('Count', fontsize=18) plt.xticks(fontsize=15) plt.yticks(fontsize=15) - plt.title(folderName + " " + vehicle + " Vehicle Bridge Latency Histogram", fontsize=18) - plt.savefig(f'{folderName}_{vehicle}_latency_hist.png') - + plt.title(folderName + " " + unit + " Latency Histogram", fontsize=18) + plt.savefig(f'output/{folderName}_{unit}_latency_hist.png') def main(): if len(sys.argv) < 2: diff --git 
a/telematic_system/scripts/log_analysis/parse_messaging_server_logs.py b/telematic_system/scripts/log_analysis/parse_messaging_server_logs.py index 02ccd7a4..8b9b0ca7 100644 --- a/telematic_system/scripts/log_analysis/parse_messaging_server_logs.py +++ b/telematic_system/scripts/log_analysis/parse_messaging_server_logs.py @@ -69,7 +69,7 @@ def parseInfluxfile(logname, start_time_epoch, end_time_epoch, run_num): # Get write time write_time_split = log_line.split("INFO") write_time_string = write_time_split[0][:len(write_time_split[0]) - 2] - log_time_in_datetime = datetime.datetime.strptime(write_time_string, '%Y-%m-%d %H:%M:%S.%f') + log_time_in_datetime = datetime.datetime.strptime(write_time_string, '%Y-%m-%dT%H:%M:%S.%fZ') payload_index = log_line.index(search_string) + 1 @@ -197,6 +197,8 @@ def main(): for index in range(0, len(test_df)): start_time_epoch = test_df['Start Time'].values[index] end_time_epoch = test_df['End Time'].values[index] + + local = pytz.timezone("America/New_York") @@ -204,7 +206,11 @@ def main(): run_num = test_df['Run'].values[index].split('R')[1] if int(run_num) in runs_range: - + print("start time epoch: " + str(start_time_epoch)) + print("end time epoch: " + str(end_time_epoch)) + print("test case: "+ test_case) + print("runs_string: "+ runs_string) + print(runs_range) print("Run num: ", run_num) parseInfluxfile(logname, start_time_epoch, end_time_epoch, run_num) diff --git a/telematic_system/scripts/log_analysis/parse_v2xhub_telematic_plugin_logs.py b/telematic_system/scripts/log_analysis/parse_v2xhub_telematic_plugin_logs.py index f64be0bd..53e283ad 100644 --- a/telematic_system/scripts/log_analysis/parse_v2xhub_telematic_plugin_logs.py +++ b/telematic_system/scripts/log_analysis/parse_v2xhub_telematic_plugin_logs.py @@ -4,13 +4,16 @@ import sys import json.decoder as decoder from enum import Enum +import datetime +import pytz +import re class Column(Enum): ''' Column The column name of the output csv file. ''' - UNIT_ID= "Unit ID" + UNIT_ID= "Unit Id" TOPIC= "Topic" PAYLOAD_TS= "Payload Timestamp" PAYLOAD_SRC= "Payload Source" @@ -30,6 +33,25 @@ class LogKey(Enum): EVENT_NAME= "event_name" UNIT_TYPE= "unit_type" +class LogTimeSheet: + ''' Read the log_timesheet.csv file to store the list of test cases and their start and end time of all runs within each test case.''' + def __init__(self, log_timesheet_path: str) -> None: + self.log_df = pd.read_csv(log_timesheet_path) + self.log_df = self.log_df.dropna() + self.log_df_dict = dict(tuple(self.log_df.groupby('Test Case'))) + + def get_run_duration(self, test_case_name: str, target_run_num: int): + '''Return start and end time for a given test case and run number.''' + test_case_log_table = self.log_df_dict[test_case_name] + for index, row in test_case_log_table.iterrows(): + run_num = re.sub(r"[a-z|A-Z]*", "", row['Run']) + if int(run_num) == target_run_num: + return (row['Start Time'], row['End Time']) + return tuple() + + def to_utc_datetime(self, timestamp): + return datetime.datetime.fromtimestamp(timestamp).astimezone(pytz.utc).replace(tzinfo=None) + class TelematicMessageConvertor: """ The `TelematicMessageConvertor` class is responsible for parsing and converting v2xhub telematic bridge log files into a CSV format. 
@@ -53,18 +75,41 @@ def convert_to_json(self, msg: str): except (decoder.JSONDecodeError , TypeError) as e: return sys.exit("Could not decode JSON message " + str(e)) - def to_csv(self, output_path: str): - df = pd.DataFrame.from_dict(self.published_msg, orient='columns') - df.to_csv(output_path, index=False) + def to_csv(self, data: pd.DataFrame, output_path: str): + data.to_csv(output_path, index=False, date_format='%Y-%m-%d %H:%M:%S.%f') + + def get_filter_published_msg_frame(self, start_time_utc, end_time_utc): + listmsg = [] + published_msg_frame = pd.DataFrame.from_dict(self.published_msg) + for index, row in published_msg_frame.iterrows(): + if row['Payload Timestamp']>=start_time_utc and row['Payload Timestamp']<= end_time_utc: + listmsg.append(row) + filtered_published_msg=pd.DataFrame(listmsg) + return filtered_published_msg + def append_published_msg(self, msg_json: dict): - self.published_msg[Column.EVENT_NAME.value].append(msg_json[LogKey.EVENT_NAME.value]) - self.published_msg[Column.PAYLOAD_TS.value].append(str(msg_json[LogKey.PAYLOAD_TS.value])) + '''Append the msg in JSON format as a row in the published msg dictionary.''' + self.published_msg[Column.EVENT_NAME.value].append(msg_json[LogKey.EVENT_NAME.value]) + payload_time_str = str(msg_json[LogKey.PAYLOAD_TS.value]) + payload_timestamp_utc = self.convert_timestamp(payload_time_str) + self.published_msg[Column.PAYLOAD_TS.value].append(payload_timestamp_utc) self.published_msg[Column.TOPIC.value].append(msg_json[LogKey.TOPIC.value]) self.published_msg[Column.PAYLOAD_SRC.value].append(msg_json[LogKey.PAYLOAD.value][LogKey.SOURCE.value]) self.published_msg[Column.UNIT_ID.value].append(msg_json[LogKey.UNIT_ID.value]) self.published_msg[Column.UNIT_TYPE.value].append(msg_json[LogKey.UNIT_TYPE.value]) + def convert_timestamp(self, timestamp_str: str): + payload_timestamp = float(timestamp_str) + time_in_s = payload_timestamp/1e6 + time_in_s = float("{:.6f}".format(time_in_s)) + payload_timestamp_utc = self.to_utc_datetime(time_in_s) + return payload_timestamp_utc + + def to_utc_datetime(self, timestamp): + '''Convert timestamp to datetime and UTC timezone.''' + return datetime.datetime.fromtimestamp(timestamp).astimezone(pytz.utc).replace(tzinfo=None) + def split_lines(self, chunk: str)-> tuple[list[str], str]: lines = chunk.split('\n') remaining_part = lines.pop(-1) @@ -72,7 +117,7 @@ def split_lines(self, chunk: str)-> tuple[list[str], str]: def is_published_msg(self, line: str)->bool: return 'Published:' in line and 'TelematicUnit' in line - + def extract_published_msg(self, line: str)->str: msg_idx = line.find("Published:") msg_end_idx = line.find('n","stream"') @@ -102,13 +147,36 @@ def parse_log_file(self, log_file_path: str): except FileNotFoundError as e: sys.exit("Could not find file " + str(e)) + def get_test_case_run_nums(self, file_path: str): + '''Get the test case and run numbers for each test case given the filepath.''' + test_case = (file_path.split("/")[-1]).split("_")[0] + runs_string = ((file_path.split("/")[-1]).split("_")[1].split(".")[0])[1:] + runs_range_split = runs_string.split('-') + if len(runs_range_split) == 1: + runs_range = range(int(runs_range_split[0]),int(runs_range_split[0]) + 1) + else: + runs_range = range(int(runs_range_split[0]),int(runs_range_split[1]) + 1) + return test_case, runs_range + if __name__ == "__main__": parser = argparse.ArgumentParser(prog="v2xhub_telematic_bridge_log_parser", description="Parse v2xhub telematic bridge log file") parser.add_argument("--log_file_path", 
required=True, help="Path to v2xhub telematic bridge log file") - parser.add_argument("--output_path",required=False, default="v2xhub_telematic_published_msgs.csv",help="Path to output csv file") args = parser.parse_args() - + print("Parsing "+ args.log_file_path+" ...") converter = TelematicMessageConvertor() converter.parse_log_file(args.log_file_path) - converter.to_csv(args.output_path) \ No newline at end of file + test_case, target_run_range = converter.get_test_case_run_nums(args.log_file_path) + + log_timesheet=LogTimeSheet("log_timesheet.csv") + + for target_run_num in target_run_range: + start_time, end_time = log_timesheet.get_run_duration(test_case, target_run_num) + start_time_utc = log_timesheet.to_utc_datetime(start_time) + end_time_utc = log_timesheet.to_utc_datetime(end_time) + print("\n>>> Searching Test case: "+ test_case + ", run number: " + str(target_run_num)+ ", start time: " + str(start_time_utc) + ", end time: " + str(end_time_utc)) + filter_published_msg_frame = converter.get_filter_published_msg_frame(start_time_utc, end_time_utc) + print(filter_published_msg_frame) + if not filter_published_msg_frame.empty: + converter.to_csv(filter_published_msg_frame, args.log_file_path.split("/")[-1]+"_"+ str(target_run_num)+"_parsed.csv") + diff --git a/telematic_system/scripts/log_analysis/parse_vehicle_bridge_logs.py b/telematic_system/scripts/log_analysis/parse_vehicle_bridge_logs.py index 426447dd..6ba4a5f4 100644 --- a/telematic_system/scripts/log_analysis/parse_vehicle_bridge_logs.py +++ b/telematic_system/scripts/log_analysis/parse_vehicle_bridge_logs.py @@ -11,6 +11,7 @@ def parseVehicleBridgeLogs(logname,start_time_epoch, end_time_epoch, run_num): filename = logname.split(".")[0] + print(filename) with open(logname, 'r') as vehicle_bridge_log: results_file = open(f'{filename}_{run_num}_parsed.csv', 'w') @@ -89,6 +90,8 @@ def main(): test_case = (logname.split("/")[-1]).split("_")[0] runs_string = ((logname.split("/")[-1]).split("_")[1].split(".")[0])[1:] runs_range_split = runs_string.split('-') + print("test case: "+ test_case) + print("runs_string: "+ runs_string) if len(runs_range_split) == 1: runs_range = range(int(runs_range_split[0]),int(runs_range_split[0]) + 1) else: @@ -109,6 +112,9 @@ def main(): if int(run_num) in runs_range: + print("start time epoch: " + str(start_time_epoch)) + print("end time epoch: " + str(end_time_epoch)) + print(runs_range) print("Run num: ", run_num) parseVehicleBridgeLogs(logname,start_time_epoch, end_time_epoch, run_num) diff --git a/telematic_system/scripts/log_analysis/requirements.txt b/telematic_system/scripts/log_analysis/requirements.txt new file mode 100644 index 00000000..45685b92 --- /dev/null +++ b/telematic_system/scripts/log_analysis/requirements.txt @@ -0,0 +1,5 @@ +pandas +seaborn +matplotlib +pycairo +PyGObject
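
Note on the expected `log_timesheet.csv` layout: the parsing scripts in this change look up each run's window from `log_timesheet.csv` using at least the columns `Test Case`, `Run`, `Start Time`, and `End Time` (see `LogTimeSheet.get_run_duration()` and the `main()` routines above). The `Run` column holds a label such as `R6`, and the start/end times are interpreted as epoch timestamps in seconds by `to_utc_datetime()`. A minimal illustrative layout, with placeholder values only, might look like:

```
Test Case,Run,Start Time,End Time
T20,R6,1718037000,1718037300
T20,R7,1718037600,1718037900
```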
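
For orientation, the README steps above can be chained together roughly as follows for a single test case. This is only an illustrative sketch: the log file names are the placeholder examples already used in the README, and the exact set of parsed bridge CSVs to copy into the message-drop folder depends on which bridges were part of the test and on the file-name pattern `get_message_drop.py` matches.

```
source .venv/bin/activate

# Parse the raw logs into *_parsed.csv files in the current folder.
python3 parse_messaging_server_logs.py T20_R6-13_messaging_server.log
python3 parse_v2xhub_telematic_plugin_logs.py --log_file_path T20_R6-13_V2XHub.log

# Latency: gather the delay CSVs into a per-test-case folder and plot them.
mkdir -p T20
cp T20_*_messaging_server_*_delay_parsed.csv T20/
python3 latencyPlotter.py T20    # plots are written to the output folder

# Message loss: gather the message-drop CSVs plus the parsed bridge CSVs.
mkdir -p T20_message_drop
cp T20_*_messaging_server_*_message_drop_parsed.csv T20_message_drop/
cp T20_R6-13_V2XHub*_parsed.csv T20_message_drop/
python3 get_message_drop.py T20_message_drop
```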