diff --git a/README.md b/README.md index f8a0a726..8d5b409c 100644 --- a/README.md +++ b/README.md @@ -3,30 +3,29 @@ |-----|-----|-----| [![Docker build](https://github.com/usdot-fhwa-stol/cda-telematics/actions/workflows/docker.yml/badge.svg?branch=develop)](https://github.com/usdot-fhwa-stol/cda-telematics/actions/workflows/docker.yml)| [![Docker build](https://github.com/usdot-fhwa-stol/cda-telematics/actions/workflows/docker.yml/badge.svg?branch=master)](https://github.com/usdot-fhwa-stol/cda-telematics/actions/workflows/docker.yml)| [![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=usdot-fhwa-stol_cda-telematics&metric=alert_status)](https://sonarcloud.io/summary/new_code?id=usdot-fhwa-stol_cda-telematics) - # CDA-Telematics This project will create an open-source Module that can be installed on any vehicle (e.g. a CARMA Platform and/or Messenger vehicle, an L0 or L1 production vehicle, etc.) that will collect data about the vehicle and wirelessly send it out in real time for data analysis. The same Module, with any modifications, if necessary, will also be compatible with CARMA Streets and CARMA Cloud. On the receiving end of this data, a user will have a Data Processing & Visualization Tool available to visualize and/or plot the data that was sent using the Module(s). This Module can be thought of as a Fleet Management tool with extra capabilities to support CDA research and education. ## Architecture Diagram [Detailed Design](https://usdot-carma.atlassian.net/wiki/spaces/WFD2/pages/2230321179/Detailed+System+Design) - + ![architecture](https://user-images.githubusercontent.com/34483068/171265484-67177ebb-69f7-4286-9602-016043079958.png) ## Release Notes The current version of CDA-Telematics tool and release history of the CARMA software platform: [CARMA Release Notes]() ## Documentation -Documentation of the setup, operation, and design of the CDA Telematics can be found on the project [Confluence](https://usdot-carma.atlassian.net/wiki/spaces/WFD2/overview) pages. +Documentation of the setup, operation, and design of the CDA Telematics can be found on the project [Confluence](https://usdot-carma.atlassian.net/wiki/spaces/WFD2/overview) pages. ## Contribution -Welcome to the CDA Telematics contributing guide. Please read this guide to learn about our development process, how to propose pull requests and improvements, and how to build and test your changes to this project. [CDA Telematics Contributing Guide](Contributing.md) +Welcome to the CDA Telematics contributing guide. Please read this guide to learn about our development process, how to propose pull requests and improvements, and how to build and test your changes to this project. [CDA Telematics Contributing Guide](Contributing.md) -## Code of Conduct +## Code of Conduct Please read our [CDA Telematics Code of Conduct](Code_of_Conduct.md) which outlines our expectations for participants within the developer community, as well as steps to reporting unacceptable behavior. We are committed to providing a welcoming and inspiring community for all and expect our code of conduct to be honored. Anyone who violates this code of conduct may be banned from the community. ## Attribution -The development team would like to acknowledge the people who have made direct contributions to the design and code in this repository. [CDA Telematics Attribution](ATTRIBUTION.md) +The development team would like to acknowledge the people who have made direct contributions to the design and code in this repository. 
[CDA Telematics Attribution](ATTRIBUTION.md) ## License By contributing to the Federal Highway Administration (FHWA) CDA Telematics repository, you agree that your contributions will be licensed under its Apache License 2.0 license. [CDA Telematics License]() diff --git a/telematic_system/docker-compose.cloud.servers.yml b/telematic_system/docker-compose.cloud.servers.yml index 63ead0bd..a755a309 100644 --- a/telematic_system/docker-compose.cloud.servers.yml +++ b/telematic_system/docker-compose.cloud.servers.yml @@ -27,7 +27,7 @@ services: command: bash -c 'wait-for-it localhost:4222 && java -jar /telematic_cloud_messaging/app.jar' env_file: - .env - + rosbag2_processing_service: build: context: ./telematic_historical_data_processing diff --git a/telematic_system/docker-compose.local.yml b/telematic_system/docker-compose.local.yml index c776dff1..d8c889fc 100644 --- a/telematic_system/docker-compose.local.yml +++ b/telematic_system/docker-compose.local.yml @@ -135,7 +135,7 @@ services: build: context: "./telematic_cloud_messaging" container_name: messaging_server - image: usdotfhwastoldev/telematic_local_messaging:develop + image: usdotfhwastoldev/telematic_cloud_messaging:develop depends_on: - nats - mysqldb diff --git a/telematic_system/docker-compose.webapp.yml b/telematic_system/docker-compose.webapp.yml index b5ca8398..d67b0e7c 100644 --- a/telematic_system/docker-compose.webapp.yml +++ b/telematic_system/docker-compose.webapp.yml @@ -5,7 +5,7 @@ services: context: "./telematic_apps/web_app/server" restart: always container_name: web_server - image: usdotfhwastoldev/telematic_web_server:develop + image: usdotfhwastoldevdev/telematic_web_server:develop logging: options: max-size: "10m" @@ -35,11 +35,11 @@ services: - UPLOAD_HTTP_PORT=9011 - UPLOAD_TIME_OUT=3600000 # Milliseconds - UPLOAD_MAX_FILE_SIZE=21474836480 #20 GB - - CONCURRENT_QUEUE_SIZE=5 - - PART_SIZE=10485760 + - CONCURRENT_QUEUE_SIZE=5 + - PART_SIZE=10485760 - NATS_SERVERS=:4222 - FILE_PROCESSING_SUBJECT=ui.file.processing - - FILE_EXTENSIONS=.mcap + - FILE_EXTENSIONS=.mcap command: bash -c '/app/service.sh' volumes: - /opt/apache2/grafana_htpasswd:/opt/apache2/grafana_htpasswd diff --git a/telematic_system/scripts/log_analysis/README.md b/telematic_system/scripts/log_analysis/README.md new file mode 100644 index 00000000..69d8ba7b --- /dev/null +++ b/telematic_system/scripts/log_analysis/README.md @@ -0,0 +1,147 @@ +# Prerequisite +- Preferred operating system: Ubuntu 20 or above +- Python environment setup + 1. Install Python + ``` + sudo apt update + sudo apt install python3 + ``` + 2. Check the Python version + ``` + python3 --version + ``` + The recommended version is `3.10`. + 3. Create a virtual environment. Navigate to the `cda-telematics/telematic_system/scripts/log_analysis` directory, and run the command below: + ``` + python3 -m venv .venv + ``` + 4. Activate the virtual environment. + ``` + source .venv/bin/activate + ``` + Note: Run this command to activate the virtual environment every time you open a new terminal. +- Install dependencies: + - Install Debian packages + ``` + sudo apt install libcairo2-dev libxt-dev libgirepository1.0-dev + + ``` + - Install Python packages + ``` + pip install -r requirements.txt + ``` +- Clone repos: + - Clone the cda-telematics GitHub repository + ``` + git clone https://github.com/usdot-fhwa-stol/cda-telematics.git + cd cda-telematics + ``` +- Download `log_timesheet.csv` +Most of the Python analysis scripts refer to `log_timesheet.csv` for the test runs and their durations.
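An illustrative sketch of the expected `log_timesheet.csv` layout is shown below. The column names (`Test Case`, `Run`, `Start Time`, `End Time`) are the ones the parsing scripts in this repository read; the rows and epoch-second timestamps are placeholder values, not real test data:

```
Test Case,Run,Start Time,End Time
T20,R6,1717687200,1717687500
T20,R7,1717688100,1717688400
```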
Since this `log_timesheet.csv` is generated during the verification/validation testing, ensure you download the `log_timesheet.csv` file into this `log_analysis` folder before executing any Python scripts. + + +# Process V2xHub bridge log +1. Navigate to the `cda-telematics/telematic_system/scripts/log_analysis` directory. +2. Download V2xHub logs to the current folder. +3. Run the command below to generate data publishing metrics. + ``` + python3 parse_v2xhub_telematic_plugin_logs.py --log_file_path <logname> + + e.g: + python3 parse_v2xhub_telematic_plugin_logs.py --log_file_path T20_R6-13_V2XHub.log + ``` + It will generate the parsed bridge log in CSV files. + +# Process Streets bridge log +1. Navigate to the `cda-telematics/telematic_system/scripts/log_analysis` directory. +2. Download streets bridge logs to the current folder. +3. Run the command below to generate data publishing metrics. + ``` + python3 parse_streets_bridge_logs.py <logname> + ``` + It will generate the parsed bridge log in CSV files. + +# Process Cloud bridge log +1. Navigate to the `cda-telematics/telematic_system/scripts/log_analysis` directory. +2. Download cloud bridge logs to the current folder. +3. Run the command below to generate data publishing metrics. + ``` + python3 parse_cloud_bridge_logs.py <logname> + + e.g: + python3 parse_cloud_bridge_logs.py T20_R6-9_carma_cloud.log + python3 parse_cloud_bridge_logs.py T20_R10-13_carma_cloud.log + ``` + It will generate the parsed bridge log in CSV files. + +# Process Vehicle bridge log +1. Navigate to the `cda-telematics/telematic_system/scripts/log_analysis` directory. +2. Download vehicle bridge logs to the current folder. +3. Run the command below to generate data publishing metrics. + ``` + python3 parse_vehicle_bridge_logs.py <logname> + + e.g: + python3 parse_vehicle_bridge_logs.py T20_R6_R13_fusion/T20_R6_fusion.log + ``` + It will generate the parsed bridge log in CSV files. + +# Process Messaging Server log +1. Navigate to the `cda-telematics/telematic_system/scripts/log_analysis` directory. +2. Download messaging server logs to the current folder. +3. Run the command below to generate data publishing metrics. + ``` + python3 parse_messaging_server_logs.py <logname> + + e.g: + python3 parse_messaging_server_logs.py T20_R6-13_messaging_server.log + ``` + It will generate the parsed messaging server delay and message drop logs in CSV files. + +# Metric analysis +## Latency +1. Create a folder with the test case name in the current `log_analysis` folder. +For example, test case 20: + ``` + mkdir T20 + ``` +2. Copy all the generated T20_*_messaging_server_*_delay_parsed.csv files to this new folder `T20`. +3. Run the latency plot script to generate plots for those CSV files with delay metrics in the folder `T20`. + ``` + python3 latencyPlotter.py <test case> + + e.g: + python3 latencyPlotter.py T20 + ``` + The generated plots are saved into the `output` folder. +## Message loss +1. Create a folder with the test case name and a `_message_drop` suffix in the current `log_analysis` folder. +For example, test case 20: + ``` + mkdir T20_message_drop + + ``` +2. Copy all generated <test case>_*_messaging_server_*_message_drop_parsed.csv files to this new folder `<test case>_message_drop`. +3. Copy all generated bridge CSV files into the same folder. +4. Run the message drop analysis script to analyze all files in the `<test case>_message_drop` folder. + ``` + python3 get_message_drop.py <test case>_message_drop + + e.g: + python3 get_message_drop.py T20_message_drop + ``` +The generated result is similar to the output below: +
+![Message_loss_result](https://github.com/user-attachments/assets/15fefacb-e929-4340-a0e3-6d7f6441ba8e) + +## Rosbag Processing time +1. Navigate to `cda-telematics/telematic_system/scripts/log_analysis` directory +2. Download historical data processing service logs to the current folder. +3. Run command to generate data publishing metrics. + ``` + parse_processing_service_logs.py + + e.g: + python3 parse_processing_service_logs.py T19_R1_R5_rosbag2.log + ``` + It will print the time required to process each rosbag .mcap file and the average time required for all the files in the log. diff --git a/telematic_system/scripts/log_analysis/get_message_drop.py b/telematic_system/scripts/log_analysis/get_message_drop.py index ada64cc6..dc18ae7e 100644 --- a/telematic_system/scripts/log_analysis/get_message_drop.py +++ b/telematic_system/scripts/log_analysis/get_message_drop.py @@ -9,17 +9,17 @@ import matplotlib.dates as mdates import matplotlib.pyplot as plt - +import os warnings.filterwarnings("ignore") ''' This script combines bridge logs with the messaging server logs to give the number of dropped messages from each unit. -Input: The script looks within the the argument directory for csv files from Messaging Server, Vehicle Bridge, Streets Bridge and Cloud Bridge log +Input: The script looks within the the argument directory for csv files from Messaging Server, Vehicle Bridge, Streets Bridge and Cloud Bridge log ,which are parsed log output from the bridges, to calculate the number of dropped messages from each unit. -Required Input File Format: -The csv files to be read currently need to follow a specific format. +Required Input File Format: +The csv files to be read currently need to follow a specific format. The messaging server parsed csv needs to start with the word "Messaging" separated by underscores Streets bridge parsed csv file name needs to start with the word Streets separated by underscores(_) Vehicle bridge parsed csv file name needs to start with the word Vehicle or BlueLexus or Fusion separated by underscores(_) @@ -32,32 +32,31 @@ def combineFiles(log_dir): path_obj = Path(log_dir) print(log_dir) filenames = [ f.name for f in path_obj.glob('*.csv')] - + bridge_csv_exist = False - bridge_csv_regex = r'.*(Streets|Vehicle|BlueLexus|Fusion|V2xHub|Cloud).*' + bridge_csv_regex = r'.*(Streets|Vehicle|BlueLexus|Fusion|V2xHub|Cloud|Ros2).*' bridges_csv = [] messaging_server_csv_exist = False - messaging_server_csv = "" + messaging_server_csv = [] - for filename in filenames: - if "Messaging" in filename: + for filename in filenames: + if "messaging" in filename.lower(): messaging_server_csv_exist = True - messaging_server_csv = log_dir + "/" + filename + messaging_server_csv.append(log_dir + "/" + filename) matched = re.match(bridge_csv_regex, filename, re.IGNORECASE) if matched: bridges_csv.append(log_dir + "/" + filename) bridge_csv_exist = True - + if not bridge_csv_exist: sys.exit("Did not find any Vehicle/Streets/Cloud/BlueLexus/Fusion/V2xHub bridge csv logs in directory: " +log_dir+ "") - + if not messaging_server_csv_exist: sys.exit("Did not find any Messaging server csv logs in directory: "+log_dir+ "") - - messaging_server_df = pd.read_csv(messaging_server_csv) + messaging_server_df = pd.concat(map(pd.read_csv, messaging_server_csv), ignore_index=True) infrastructure_units = ['streets_id', 'cloud_id'] ############# Load messaging server logs and get a list of dataframes for all unit ids @@ -69,27 +68,23 @@ def combineFiles(log_dir): if key not in infrastructure_units: value 
= value[~value['Message Time'].isnull()] # value = value.drop('Metadata',axis =1) - - - #Get dataframes from bridge logs - bridge_dfs = dict() - for bridge_csv in bridges_csv: - bridge_df = pd.read_csv(bridge_csv) - bridge_dfs.update(dict(tuple(bridge_df.groupby('Unit Id')))) - print(bridge_dfs.keys()) + bridge_df = pd.concat(map(pd.read_csv, bridges_csv), ignore_index=True) + bridge_dfs = dict(tuple(bridge_df.groupby('Unit Id'))) - # Create combined dataframes from + + # Create combined dataframes from for key in bridge_dfs: if key in messaging_server_dfs: - bridge_df_combined = pd.merge(bridge_dfs[key], messaging_server_dfs[key], how='left', left_on=['Topic','Payload Timestamp'], right_on = ['Topic','Message Time']) - bridge_df_combined.to_csv(log_dir + key + "_combined.csv") + if not os.path.exists("output"): + os.mkdir("output") + bridge_df_combined.to_csv("output/"+log_dir+"_"+ key + "_combined.csv") bridge_missing_message_count = bridge_df_combined['Log_Timestamp(s)'].isnull().sum() bridge_total_message_count = len(bridge_df_combined['Payload Timestamp']) - print("Message drop for unit: ", key) + print("\nMessage drop for unit: ", key) print("Missing count: ", bridge_missing_message_count) print("Total count: ", bridge_total_message_count) print("Percentage of messages received",(1 - (bridge_missing_message_count/bridge_total_message_count))*100) @@ -97,34 +92,11 @@ def combineFiles(log_dir): topics_with_empty_count = (bridge_df_combined['Message Time'].isnull().groupby([bridge_df_combined['Topic']]).sum().astype(int).reset_index(name='count')) topics_with_empty_count = topics_with_empty_count.loc[~(topics_with_empty_count['count']==0)] - + print("{} missed messages: ".format(key)) print(topics_with_empty_count) - # Plot vehicle data - bridge_df_combined = bridge_df_combined[bridge_df_combined['Message Time'].isnull()] - bridge_df_combined['Payload Timestamp'] = pd.to_datetime(bridge_df_combined['Payload Timestamp'], infer_datetime_format=True) - bridge_df_combined['Message Time'] = pd.to_datetime(bridge_df_combined['Message Time'], infer_datetime_format=True) - - - ax1 = plt.plot(bridge_df_combined['Topic'], bridge_df_combined['Payload Timestamp'], '|') - - #Plot start and end lines - start_time = pd.to_datetime(messaging_server_dfs[key]['Log_Timestamp(s)'].iloc[0]) - end_time = pd.to_datetime(messaging_server_dfs[key]['Log_Timestamp(s)'].iloc[-1]) - - plt.axhline(y = start_time, color = 'r', linestyle = '-', label = 'Test Start Time') - plt.axhline(y = end_time, color = 'r', linestyle = '-', label = 'Test End Time') - - plt.title('{} : Topics against time of dropped message'.format(key)) - plt.xlabel('Topics with dropped messages hours:mins:seconds') - plt.ylabel('Time of message drop') - xfmt = mdates.DateFormatter('%H:%M:%S') - plt.gcf().autofmt_xdate() - plt.show() - # plt.savefig('{}_Message_drop.png'.format(key)) - - + diff --git a/telematic_system/scripts/log_analysis/latencyPlotter.py b/telematic_system/scripts/log_analysis/latencyPlotter.py index b2cd827d..4dcef5ea 100644 --- a/telematic_system/scripts/log_analysis/latencyPlotter.py +++ b/telematic_system/scripts/log_analysis/latencyPlotter.py @@ -23,10 +23,12 @@ def concatRuns(folderName): allFiles.append(df) concatOutput = pd.concat(allFiles, axis=0, ignore_index=True) - concatOutput.to_csv(f'{folderName}_allruns.csv', index=False) + if not os.path.exists("output"): + os.mkdir("output") + concatOutput.to_csv(f'output/{folderName}_allruns.csv', index=False) def plotter(folderName): - allRuns = folderName + "_allruns.csv" + 
allRuns = "output/"+str(folderName) + "_allruns.csv" #read in the combined csv data data = pd.read_csv(allRuns) @@ -47,63 +49,30 @@ def plotter(folderName): print("95th Latency: " + str(trimmed_data["Delay(s)"].quantile(0.95))) #plot vehicle, streets, and cloud data histograms if they were part of the test - streets_data = trimmed_data[trimmed_data['Unit Id'] == "streets_id"] - - if len(streets_data) > 0: - fig, ax1 = plt.subplots() - fig.set_size_inches(10, 10) - sns.histplot(streets_data['Delay(s)'], kde=False) - plt.xlim(0, 0.75) - plt.xlabel('Latency(s)', fontsize=18) - plt.ylabel('Count', fontsize=18) - plt.xticks(fontsize=15) - plt.yticks(fontsize=15) - plt.title(folderName + " Streets Bridge Latency Histogram", fontsize=18) - plt.savefig(f'{folderName}_streets_latency_hist.png') + units = ["DOT-45244", "DOT-45254","DOT_45254","vehicle_id","rsu_1234","streets_id","cloud_id"] + for unit in units: + unit_data = trimmed_data[trimmed_data['Unit Id'] == unit] - - cloud_data = trimmed_data[trimmed_data['Unit Id'] == "cloud_id"] - - if len(cloud_data) > 0: - fig, ax1 = plt.subplots() - fig.set_size_inches(10, 10) - sns.histplot(cloud_data['Delay(s)'], kde=False) - plt.xlim(0, 0.75) - plt.xlabel('Latency(s)', fontsize=18) - plt.ylabel('Count', fontsize=18) - plt.xticks(fontsize=15) - plt.yticks(fontsize=15) - plt.title(folderName + " Cloud Bridge Latency Histogram", fontsize=18) - plt.savefig(f'{folderName}_cloud_latency_hist.png') - - vehicles = ["DOT-45244", "DOT-45254"] - for vehicle in vehicles: - vehicle_data = trimmed_data[trimmed_data['Unit Id'] == vehicle] - - if len(vehicle_data) > 0: + if len(unit_data) > 0: fig, ax1 = plt.subplots() - fig.set_size_inches(10, 10) - sns.histplot(vehicle_data['Delay(s)'], kde=False) + fig.set_size_inches(10, 10) + sns.histplot(unit_data['Delay(s)'], kde=False) plt.xlim(0, 0.75) plt.xlabel('Latency(s)', fontsize=18) plt.ylabel('Count', fontsize=18) plt.xticks(fontsize=15) plt.yticks(fontsize=15) - plt.title(folderName + " " + vehicle + " Vehicle Bridge Latency Histogram", fontsize=18) - plt.savefig(f'{folderName}_{vehicle}_latency_hist.png') - + plt.title(folderName + " " + unit + " Latency Histogram", fontsize=18) + plt.savefig(f'output/{folderName}_{unit}_latency_hist.png') def main(): if len(sys.argv) < 2: print('Run with: "python3 latencyPlotter.py testcase"') - else: + else: test = sys.argv[1] - + concatRuns(test) plotter(test) if __name__ == "__main__": main() - - - diff --git a/telematic_system/scripts/log_analysis/parse_messaging_server_logs.py b/telematic_system/scripts/log_analysis/parse_messaging_server_logs.py index 02ccd7a4..ac870669 100644 --- a/telematic_system/scripts/log_analysis/parse_messaging_server_logs.py +++ b/telematic_system/scripts/log_analysis/parse_messaging_server_logs.py @@ -20,7 +20,6 @@ def parseInfluxfile(logname, start_time_epoch, end_time_epoch, run_num): fileName = logname.split(".")[0] - with open(logname,'r') as influx_log: delay_results_file = open('{}_{}_delay_parsed.csv'.format(fileName,run_num), 'w') @@ -56,7 +55,6 @@ def parseInfluxfile(logname, start_time_epoch, end_time_epoch, run_num): # Since the message is a flattened json, it needs to be split manually and read to extract required info (unit_id, topic name, message/payload timestamp and log timestamp) for line in influx_log: split_line = line.split(":") - if search_string in split_line: # Get log json try: @@ -64,28 +62,28 @@ def parseInfluxfile(logname, start_time_epoch, end_time_epoch, run_num): except: skipped_lines_count += 1 continue - + log_line = 
json_object["log"] - # Get write time + # Get write time write_time_split = log_line.split("INFO") write_time_string = write_time_split[0][:len(write_time_split[0]) - 2] - log_time_in_datetime = datetime.datetime.strptime(write_time_string, '%Y-%m-%d %H:%M:%S.%f') - - + log_time_in_datetime = datetime.datetime.strptime(write_time_string, '%Y-%m-%dT%H:%M:%S.%fZ') + + payload_index = log_line.index(search_string) + 1 payload = log_line[payload_index + len(search_string) + 1:] - + # Convert Payload to a json - + payload = "event=" + payload # Remove timestamp at end of line payload = payload.rsplit(" ", 1)[0] payload_list = payload.split(",") - + unit_type = "" topic_name = "" metadata_field = "" - + for item in payload_list: # Get topic name @@ -102,17 +100,16 @@ def parseInfluxfile(logname, start_time_epoch, end_time_epoch, run_num): if "unit_id" in item_split: unit_id = item_split[1] continue - # If topic is map_msg, get timestamp from metadata.timestamp if topic_name == "v2xhub_map_msg_in": if "metadata.timestamp" in item_split: # Get metadata timestamp message_timestamp = int(item_split[1].split(' ')[0]) time_in_s = message_timestamp/1000 - payload_time_in_datetime = datetime.datetime.fromtimestamp(time_in_s) + payload_time_in_datetime = datetime.datetime.fromtimestamp(time_in_s) payload_time_in_datetime = (payload_time_in_datetime.astimezone(pytz.utc)).replace(tzinfo=None) - + # Convert timestamp to datetime try: @@ -120,15 +117,15 @@ def parseInfluxfile(logname, start_time_epoch, end_time_epoch, run_num): if topic_name != "v2xhub_map_msg_in": payload_timestamp_string = str(log_line.split(" ")[-1])[:-1] payload_timestamp = float(payload_timestamp_string) - payload_time_in_datetime = datetime.datetime.fromtimestamp(payload_timestamp/1e6) + payload_time_in_datetime = datetime.datetime.fromtimestamp(payload_timestamp/1e6) payload_time_in_datetime = (payload_time_in_datetime.astimezone(pytz.utc)).replace(tzinfo=None) except: print("Could not get payload timestamp from topic: {}. 
Skipping".format(topic_name)) continue - - + + # If within test window if log_time_in_datetime > start_time and log_time_in_datetime < end_time : delay = (log_time_in_datetime - payload_time_in_datetime).total_seconds() @@ -137,25 +134,25 @@ def parseInfluxfile(logname, start_time_epoch, end_time_epoch, run_num): else: delay_records.append(delay) delay_writer.writerow([unit_id, topic_name, payload_time_in_datetime, log_time_in_datetime, delay]) - + message_drop_writer.writerow([unit_id, topic_name, payload_time_in_datetime, log_time_in_datetime]) - + if topic_name not in unique_topics: - unique_topics.append(topic_name) - + unique_topics.append(topic_name) + total_rows += 1 - + if log_time_in_datetime > end_time: break - + print("Number of unique topics: ", len(unique_topics)) print(unique_topics) - + ## Calculate required statistics delay_np_array = np.array(delay_records) - if delay_np_array.size > 1: + if delay_np_array.size > 1: delay_max = np.amax(delay_np_array) delay_mean = np.mean(delay_np_array) delay_stdev = np.std(delay_np_array) @@ -172,17 +169,17 @@ def read_log_table(): log_df = pd.read_csv(log_csv) log_df = log_df.dropna() log_df_dict = dict(tuple(log_df.groupby('Test Case'))) - + return log_df_dict - + def main(): if len(sys.argv) < 2: print('Run with: "python3 parse_messaging_server_logs.py logname"') - else: + else: logname = sys.argv[1] log_timesheet_df = read_log_table() - + test_case = (logname.split("/")[-1]).split("_")[0] runs_string = ((logname.split("/")[-1]).split("_")[1].split(".")[0])[1:] runs_range_split = runs_string.split('-') @@ -190,28 +187,31 @@ def main(): runs_range = range(int(runs_range_split[0]),int(runs_range_split[0]) + 1) else: runs_range = range(int(runs_range_split[0]),int(runs_range_split[1]) + 1) - + test_df = log_timesheet_df[test_case] - + for index in range(0, len(test_df)): start_time_epoch = test_df['Start Time'].values[index] end_time_epoch = test_df['End Time'].values[index] - - local = pytz.timezone("America/New_York") - + + + + local = pytz.timezone("America/New_York") + run_num = test_df['Run'].values[index].split('R')[1] - - if int(run_num) in runs_range: + if int(run_num) in runs_range: + print("start time epoch: " + str(start_time_epoch)) + print("end time epoch: " + str(end_time_epoch)) + print("test case: "+ test_case) + print("runs_string: "+ runs_string) + print(runs_range) print("Run num: ", run_num) parseInfluxfile(logname, start_time_epoch, end_time_epoch, run_num) - + if __name__ == "__main__": main() - - - diff --git a/telematic_system/scripts/log_analysis/parse_processing_service_logs.py b/telematic_system/scripts/log_analysis/parse_processing_service_logs.py new file mode 100644 index 00000000..b0f3f348 --- /dev/null +++ b/telematic_system/scripts/log_analysis/parse_processing_service_logs.py @@ -0,0 +1,85 @@ +import re +import sys + +from enum import Enum +import datetime +import pytz +import numpy as np + +class ProcessingStatus(Enum): + IN_PROGRESS= "IN_PROGRESS" + COMPLETED= "COMPLETED" + ERROR= "ERROR" + + +def to_utc_datetime(self, timestamp): + return datetime.datetime.fromtimestamp(timestamp).astimezone(pytz.utc).replace(tzinfo=None) + +def parse_start_end_processing_line(line, search_str): + # Regex match to extract timestamp of the form: "log":"2024-07-23 15:43:45,821 + timestamp_match = re.search(r'"log":"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d+)', line) + if timestamp_match: + timestamp_str = timestamp_match.group(1) + else: + print("Could not find processing time for rosbag") + sys.exit() + + log_timestamp = 
datetime.datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S,%f') + + + # Extract the rosbag name + rosbag_match = re.search(fr'{search_str} (\S+\.mcap)', line) + rosbag_name = rosbag_match.group(1) if rosbag_match else None + + return log_timestamp, rosbag_name + + +def parse_processing_service_logs(logfile): + processing_started_msg_str = "Processing rosbag:" + processing_completed_msg_str = "Completed rosbag processing for:" + processing_status_str = "Updated mysql entry for" + + processing_times = [] + + with open(logfile, 'r') as file: + for line in file: + if processing_started_msg_str in line: + # Start line sample string: "log":"2024-07-23 17:38:16,055 - rosbag2_processing_service - process_rosbag - INFO - Processing rosbag: rosbag2_2024_06_06_201142_0_verification_T19_R1.mcap + start_timestamp, rosbag_name = parse_start_end_processing_line(line, processing_started_msg_str) + continue + + elif processing_completed_msg_str in line: + # {"log":"2024-07-23 17:38:15,044 - rosbag2_processing_service - process_rosbag - INFO - Completed rosbag processing for: rosbag2_2024_06_06_194729_0_verification_T19_R1.mcap\n","stream":"stderr","time":"2024-07-23T17:38:15.044890288Z"} + end_timestamp, rosbag_name = parse_start_end_processing_line(line, processing_completed_msg_str) + processing_time_in_seconds = (end_timestamp - start_timestamp).total_seconds() + continue + + elif processing_status_str in line: + status_pattern = r'\b(COMPLETED|IN_PROGRESS|ERROR)\b' + status_match = re.search(status_pattern, line) + status = status_match.group(1) if status_match else None + + if status == ProcessingStatus.COMPLETED.value: + print(f"rosbag_name: {rosbag_name} processing_time: {processing_time_in_seconds} status: {status}") + processing_times.append(processing_time_in_seconds) + + + arr = np.array(processing_times, dtype='float32') + average_completion_time = np.mean(arr, axis=0) + print(f'average_completion_time in seconds for {len(processing_times)} files: {average_completion_time}') + + + + + +def main(): + if len(sys.argv) < 2: + print('Run with: "python3 parse_processing_service_logs.py logname"') + else: + logname = sys.argv[1] + + parse_processing_service_logs(logname) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/telematic_system/scripts/log_analysis/parse_streets_bridge_logs.py b/telematic_system/scripts/log_analysis/parse_streets_bridge_logs.py index 3a46d091..04811ecd 100644 --- a/telematic_system/scripts/log_analysis/parse_streets_bridge_logs.py +++ b/telematic_system/scripts/log_analysis/parse_streets_bridge_logs.py @@ -7,6 +7,7 @@ import re import numpy as np import pandas as pd +import re def parseVehicleBridgeLogs(logname,start_time_epoch, end_time_epoch, run_num): @@ -18,20 +19,20 @@ def parseVehicleBridgeLogs(logname,start_time_epoch, end_time_epoch, run_num): writer.writerow(["Unit Id","Topic","Payload Timestamp", "Metadata"]) start_time = (datetime.datetime.fromtimestamp(start_time_epoch).astimezone(pytz.utc)).replace(tzinfo=None) - end_time = (datetime.datetime.fromtimestamp(end_time_epoch).astimezone(pytz.utc)).replace(tzinfo=None) + end_time = (datetime.datetime.fromtimestamp(end_time_epoch).astimezone(pytz.utc)).replace(tzinfo=None) print("Start time: ", start_time) print("End Time: ", end_time) # For each published message logged, convert payload to json and extract required info for line in vehicle_bridge_log: - + topic_name = "" unit_id = "" - + time_in_s = 0 payload_timestamp_utc = datetime.datetime.fromtimestamp(1681345530) metadata = 
payload_timestamp_utc - + line = line.replace("}\\n\"","") # Convert True and False to strings @@ -41,49 +42,44 @@ def parseVehicleBridgeLogs(logname,start_time_epoch, end_time_epoch, run_num): split = line.split("Publishing message: ") if len(split) < 2: continue - + p = re.compile('(? 10: - exponent = len(timestamp_string) - 10 - time_in_s = float(timestamp_string)/(10 **exponent) + time_in_s = float(timestamp_string)/1e6 + - else: print("Couldn't find timestamp in payload, exiting") break - + if time_in_s == 0: continue - payload_time_in_datetime = datetime.datetime.fromtimestamp(time_in_s) payload_timestamp_utc = (payload_time_in_datetime.astimezone(pytz.utc)).replace(tzinfo=None) metadata = payload_timestamp_utc if payload_timestamp_utc > start_time and payload_timestamp_utc < end_time: - - writer.writerow([unit_id, topic_name, payload_timestamp_utc, metadata]) + + writer.writerow([unit_id, topic_name, payload_timestamp_utc, metadata]) elif payload_timestamp_utc > end_time: - break - + break + def read_log_table(): log_csv = 'log_timesheet.csv' log_df = pd.read_csv(log_csv) log_df = log_df.dropna() log_df_dict = dict(tuple(log_df.groupby('Test Case'))) - # print(log_df_group) return log_df_dict @@ -91,7 +87,7 @@ def read_log_table(): def main(): if len(sys.argv) < 2: print('Run with: "python3 parse_streets_bridge.py logname"') - else: + else: logname = sys.argv[1] @@ -102,21 +98,21 @@ def main(): runs_range = range(int(runs_range_split[0]),int(runs_range_split[0]) + 1) else: runs_range = range(int(runs_range_split[0]),int(runs_range_split[1]) + 1) - + log_timesheet_df = read_log_table() test_df = log_timesheet_df[test_case] - + for index in range(0, len(test_df)): start_time_epoch = test_df['Start Time'].values[index] end_time_epoch = test_df['End Time'].values[index] - - local = pytz.timezone("America/New_York") - + + local = pytz.timezone("America/New_York") + run_num = test_df['Run'].values[index].split('R')[1] - - if int(run_num) in runs_range: + + if int(run_num) in runs_range: print("Run num: ", run_num) parseVehicleBridgeLogs(logname,start_time_epoch, end_time_epoch, run_num) diff --git a/telematic_system/scripts/log_analysis/parse_v2xhub_telematic_plugin_logs.py b/telematic_system/scripts/log_analysis/parse_v2xhub_telematic_plugin_logs.py index f64be0bd..128fade8 100644 --- a/telematic_system/scripts/log_analysis/parse_v2xhub_telematic_plugin_logs.py +++ b/telematic_system/scripts/log_analysis/parse_v2xhub_telematic_plugin_logs.py @@ -4,13 +4,16 @@ import sys import json.decoder as decoder from enum import Enum +import datetime +import pytz +import re class Column(Enum): ''' Column The column name of the output csv file. 
''' - UNIT_ID= "Unit ID" + UNIT_ID= "Unit Id" TOPIC= "Topic" PAYLOAD_TS= "Payload Timestamp" PAYLOAD_SRC= "Payload Source" @@ -30,6 +33,25 @@ class LogKey(Enum): EVENT_NAME= "event_name" UNIT_TYPE= "unit_type" +class LogTimeSheet: + ''' Read the log_timesheet.csv file to store the list of test cases and their start and end time of all runs within each test case.''' + def __init__(self, log_timesheet_path: str) -> None: + self.log_df = pd.read_csv(log_timesheet_path) + self.log_df = self.log_df.dropna() + self.log_df_dict = dict(tuple(self.log_df.groupby('Test Case'))) + + def get_run_duration(self, test_case_name: str, target_run_num: int): + '''Return start and end time for a given test case and run number.''' + test_case_log_table = self.log_df_dict[test_case_name] + for index, row in test_case_log_table.iterrows(): + run_num = re.sub(r"[a-z|A-Z]*", "", row['Run']) + if int(run_num) == target_run_num: + return (row['Start Time'], row['End Time']) + return tuple() + + def to_utc_datetime(self, timestamp): + return datetime.datetime.fromtimestamp(timestamp).astimezone(pytz.utc).replace(tzinfo=None) + class TelematicMessageConvertor: """ The `TelematicMessageConvertor` class is responsible for parsing and converting v2xhub telematic bridge log files into a CSV format. @@ -53,26 +75,49 @@ def convert_to_json(self, msg: str): except (decoder.JSONDecodeError , TypeError) as e: return sys.exit("Could not decode JSON message " + str(e)) - def to_csv(self, output_path: str): - df = pd.DataFrame.from_dict(self.published_msg, orient='columns') - df.to_csv(output_path, index=False) + def to_csv(self, data: pd.DataFrame, output_path: str): + data.to_csv(output_path, index=False, date_format='%Y-%m-%d %H:%M:%S.%f') + + def get_filter_published_msg_frame(self, start_time_utc, end_time_utc): + listmsg = [] + published_msg_frame = pd.DataFrame.from_dict(self.published_msg) + for index, row in published_msg_frame.iterrows(): + if row['Payload Timestamp']>=start_time_utc and row['Payload Timestamp']<= end_time_utc: + listmsg.append(row) + filtered_published_msg=pd.DataFrame(listmsg) + return filtered_published_msg + def append_published_msg(self, msg_json: dict): - self.published_msg[Column.EVENT_NAME.value].append(msg_json[LogKey.EVENT_NAME.value]) - self.published_msg[Column.PAYLOAD_TS.value].append(str(msg_json[LogKey.PAYLOAD_TS.value])) + '''Append the msg in JSON format as a row in the published msg dictionary.''' + self.published_msg[Column.EVENT_NAME.value].append(msg_json[LogKey.EVENT_NAME.value]) + payload_time_str = str(msg_json[LogKey.PAYLOAD_TS.value]) + payload_timestamp_utc = self.convert_timestamp(payload_time_str) + self.published_msg[Column.PAYLOAD_TS.value].append(payload_timestamp_utc) self.published_msg[Column.TOPIC.value].append(msg_json[LogKey.TOPIC.value]) self.published_msg[Column.PAYLOAD_SRC.value].append(msg_json[LogKey.PAYLOAD.value][LogKey.SOURCE.value]) self.published_msg[Column.UNIT_ID.value].append(msg_json[LogKey.UNIT_ID.value]) self.published_msg[Column.UNIT_TYPE.value].append(msg_json[LogKey.UNIT_TYPE.value]) - def split_lines(self, chunk: str)-> tuple[list[str], str]: + def convert_timestamp(self, timestamp_str: str): + payload_timestamp = float(timestamp_str) + time_in_s = payload_timestamp/1e6 + time_in_s = float("{:.6f}".format(time_in_s)) + payload_timestamp_utc = self.to_utc_datetime(time_in_s) + return payload_timestamp_utc + + def to_utc_datetime(self, timestamp): + '''Convert timestamp to datetime and UTC timezone.''' + return 
datetime.datetime.fromtimestamp(timestamp).astimezone(pytz.utc).replace(tzinfo=None) + + def split_lines(self, chunk: str): lines = chunk.split('\n') remaining_part = lines.pop(-1) return (lines, remaining_part) def is_published_msg(self, line: str)->bool: return 'Published:' in line and 'TelematicUnit' in line - + def extract_published_msg(self, line: str)->str: msg_idx = line.find("Published:") msg_end_idx = line.find('n","stream"') @@ -102,13 +147,36 @@ def parse_log_file(self, log_file_path: str): except FileNotFoundError as e: sys.exit("Could not find file " + str(e)) + def get_test_case_run_nums(self, file_path: str): + '''Get the test case and run numbers for each test case given the filepath.''' + test_case = (file_path.split("/")[-1]).split("_")[0] + runs_string = ((file_path.split("/")[-1]).split("_")[1].split(".")[0])[1:] + runs_range_split = runs_string.split('-') + if len(runs_range_split) == 1: + runs_range = range(int(runs_range_split[0]),int(runs_range_split[0]) + 1) + else: + runs_range = range(int(runs_range_split[0]),int(runs_range_split[1]) + 1) + return test_case, runs_range + if __name__ == "__main__": parser = argparse.ArgumentParser(prog="v2xhub_telematic_bridge_log_parser", description="Parse v2xhub telematic bridge log file") parser.add_argument("--log_file_path", required=True, help="Path to v2xhub telematic bridge log file") - parser.add_argument("--output_path",required=False, default="v2xhub_telematic_published_msgs.csv",help="Path to output csv file") args = parser.parse_args() - + print("Parsing "+ args.log_file_path+" ...") converter = TelematicMessageConvertor() converter.parse_log_file(args.log_file_path) - converter.to_csv(args.output_path) \ No newline at end of file + test_case, target_run_range = converter.get_test_case_run_nums(args.log_file_path) + + log_timesheet=LogTimeSheet("log_timesheet.csv") + + for target_run_num in target_run_range: + start_time, end_time = log_timesheet.get_run_duration(test_case, target_run_num) + start_time_utc = log_timesheet.to_utc_datetime(start_time) + end_time_utc = log_timesheet.to_utc_datetime(end_time) + print("\n>>> Searching Test case: "+ test_case + ", run number: " + str(target_run_num)+ ", start time: " + str(start_time_utc) + ", end time: " + str(end_time_utc)) + filter_published_msg_frame = converter.get_filter_published_msg_frame(start_time_utc, end_time_utc) + print(filter_published_msg_frame) + if not filter_published_msg_frame.empty: + converter.to_csv(filter_published_msg_frame, args.log_file_path.split("/")[-1]+"_"+ str(target_run_num)+"_parsed.csv") + diff --git a/telematic_system/scripts/log_analysis/parse_vehicle_bridge_logs.py b/telematic_system/scripts/log_analysis/parse_vehicle_bridge_logs.py index 426447dd..8fae877b 100644 --- a/telematic_system/scripts/log_analysis/parse_vehicle_bridge_logs.py +++ b/telematic_system/scripts/log_analysis/parse_vehicle_bridge_logs.py @@ -11,23 +11,23 @@ def parseVehicleBridgeLogs(logname,start_time_epoch, end_time_epoch, run_num): filename = logname.split(".")[0] + print(filename) with open(logname, 'r') as vehicle_bridge_log: results_file = open(f'{filename}_{run_num}_parsed.csv', 'w') writer = csv.writer(results_file) writer.writerow(["Unit Id","Topic","Payload Timestamp", "Metadata"]) - + start_time = (datetime.datetime.fromtimestamp(start_time_epoch).astimezone(pytz.utc)).replace(tzinfo=None) - end_time = (datetime.datetime.fromtimestamp(end_time_epoch).astimezone(pytz.utc)).replace(tzinfo=None) - + end_time = 
(datetime.datetime.fromtimestamp(end_time_epoch).astimezone(pytz.utc)).replace(tzinfo=None) + # For each published message logged, convert payload to json and extract required info for line in vehicle_bridge_log: - print(line) topic_name = "" unit_id = "" - + time_in_s = 0 payload_timestamp_utc = datetime.datetime.fromtimestamp(1681345530) metadata = payload_timestamp_utc @@ -35,16 +35,17 @@ def parseVehicleBridgeLogs(logname,start_time_epoch, end_time_epoch, run_num): split = line.split("Publishing message: ") if len(split) < 2: continue - + line = line.split("Publishing message: ")[1] - - + + line_split = line.split("\\r\\n\"")[0] - + print(line_split) + payload = line_split.replace('\\', "") payload_json = json.loads(payload) - + topic_name = payload_json['topic_name'] unit_id = payload_json['unit_id'] @@ -52,29 +53,28 @@ def parseVehicleBridgeLogs(logname,start_time_epoch, end_time_epoch, run_num): if 'timestamp' in payload_json: timestamp_string = str(payload_json['timestamp'])[:-1] time_in_s = float(timestamp_string)/1e6 - + else: print("Couldn't find timestamp in payload, exiting") break - + if time_in_s == 0: continue - + payload_time_in_datetime = datetime.datetime.fromtimestamp(time_in_s) payload_timestamp_utc = (payload_time_in_datetime.astimezone(pytz.utc)).replace(tzinfo=None) metadata = payload_timestamp_utc if payload_timestamp_utc > start_time and payload_timestamp_utc < end_time: - writer.writerow([unit_id, topic_name, payload_timestamp_utc, metadata]) + writer.writerow([unit_id, topic_name, payload_timestamp_utc, metadata]) elif payload_timestamp_utc > end_time: - break - + break + def read_log_table(): log_csv = 'log_timesheet.csv' log_df = pd.read_csv(log_csv) log_df = log_df.dropna() log_df_dict = dict(tuple(log_df.groupby('Test Case'))) - # print(log_df_group) return log_df_dict @@ -82,33 +82,38 @@ def read_log_table(): def main(): if len(sys.argv) < 2: print('Run with: "python3 parse_vehicle_bridge_logs.py logname"') - else: + else: logname = sys.argv[1] test_case = (logname.split("/")[-1]).split("_")[0] runs_string = ((logname.split("/")[-1]).split("_")[1].split(".")[0])[1:] runs_range_split = runs_string.split('-') + print("test case: "+ test_case) + print("runs_string: "+ runs_string) if len(runs_range_split) == 1: runs_range = range(int(runs_range_split[0]),int(runs_range_split[0]) + 1) else: runs_range = range(int(runs_range_split[0]),int(runs_range_split[1]) + 1) - + log_timesheet_df = read_log_table() test_df = log_timesheet_df[test_case] - + for index in range(0, len(test_df)): start_time_epoch = test_df['Start Time'].values[index] end_time_epoch = test_df['End Time'].values[index] - - local = pytz.timezone("America/New_York") - + + local = pytz.timezone("America/New_York") + run_num = test_df['Run'].values[index].split('R')[1] - - if int(run_num) in runs_range: + if int(run_num) in runs_range: + + print("start time epoch: " + str(start_time_epoch)) + print("end time epoch: " + str(end_time_epoch)) + print(runs_range) print("Run num: ", run_num) parseVehicleBridgeLogs(logname,start_time_epoch, end_time_epoch, run_num) diff --git a/telematic_system/scripts/log_analysis/requirements.txt b/telematic_system/scripts/log_analysis/requirements.txt new file mode 100644 index 00000000..16d4d08e --- /dev/null +++ b/telematic_system/scripts/log_analysis/requirements.txt @@ -0,0 +1,7 @@ +pandas +seaborn +matplotlib +pycairo +PyGObject +pytz +numpy diff --git a/telematic_system/telematic.env b/telematic_system/telematic.env index 750631b0..9035287a 100644 --- 
a/telematic_system/telematic.env +++ b/telematic_system/telematic.env @@ -1,14 +1,14 @@ # INFLUX DB DOCKER_INFLUXDB_INIT_CLI_CONFIG_NAME=telematic INFLUXDB_DEV_TAG=latest #Required: The tag for influxDB image version. Current latest version is 2.4 -INFLUXDB_DEV_INIT_USERNAME= #Required: Create a credential username on container initial startup. -INFLUXDB_DEV_INIT_PASSWORD= #Required: Create a credential password on container initial startup. +INFLUXDB_DEV_INIT_USERNAME= #Required: Create a credential username on container initial startup. +INFLUXDB_DEV_INIT_PASSWORD= #Required: Create a credential password on container initial startup. INFLUXDB_DEV_INIT_BUCKET=platform-dev #Required: Create an bucket on container initial startup. You can create more buckets inside the influxDB container. DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=my-super-secret-auth-token # Manually set the token value INFLUXDB_DEV_ORG=my-org #Required: Create an organization on container startup. You can create more orgs inside the same influxDB container. INFLUXDB_DEV_RETENTION=7d #Optional: If not set, default is 0s meaning the initial bucket will retain data forever. -INFLUX_URL= #URL where influxdb is hosted -NATS_SERVER_IP_PORT=: #change to amazon ec2 ipv4 address:nats_port +INFLUX_URL= #URL where influxdb is hosted +NATS_SERVER_IP_PORT= #change to amazon ec2 ipv4 address:nats_port ##Cloud Bridge params CARMA_CLOUD_LOG=/opt/tomcat/logs/carmacloud.log #Required for carma cloud bridge, containing the directory where carma cloud log is located CARMA_CLOUD_BRIDGE_UNIT_ID=cloud_id @@ -88,13 +88,13 @@ FILE_EXTENSIONS=.mcap # Only query a list of objects with supported file extensi # Messaging Server Config ### Log level (INFO, DEBUG, ERROR) MESSAGING_LOGGING_LEVEL=INFO -MESSAGING_NATS_URI=nats://:4222 +MESSAGING_NATS_URI= #change to amazon ec2 ipv4 address:nats_port. Example - 127.0.0.1:4222 MESSAGING_NATS_MAX_RECONNECTS=5 MESSAGING_INFLUX_BUCKET_TYPE=ALL -MESSAGING_INFLUX_URI= -MESSAGING_INFLUX_PORT= -MESSAGING_INFLUX_USERNAME= -MESSAGING_INFLUX_PWD= +MESSAGING_INFLUX_URI= #Required: public IP for machine where influxdb is hosted +MESSAGING_INFLUX_PORT= #Required: Port at which influxdb is available +MESSAGING_INFLUX_USERNAME= #Required: Username for influxdb instance +MESSAGING_INFLUX_PWD= #Required: Password for influxdb instance MESSAGING_INFLUX_BUCKET_STREETS=infrastructure-dev MESSAGING_STREETS_SUBSCRIPTION_TOPIC=streets.*.data. MESSAGING_INFLUX_BUCKET_PLATFORM=platform-dev @@ -109,8 +109,8 @@ MESSAGING_INFLUX_ORG=my-org MESSAGING_INFLUX_TOKEN=my-super-secret-auth-token #Edge case for hostBSMId, sender_bsm_id, core_data.id and TCR/TCM ID where the Ids can be all digits or alpha characters MESSAGING_TO_STR_FIELDS=hostBSMId,TrafficControlRequest.reqid,tcmV01.reqid,m_header.sender_bsm_id,core_data.id -# Ignore data fields from message and do not save it into the influxDB. -# This is to prevent the data type conflicts: Some fields have random values like characters or numbers overtime. +# Ignore data fields from message and do not save it into the influxDB. +# This is to prevent the data type conflicts: Some fields have random values like characters or numbers overtime. # This confuse the influxDB as it does not know what type of field should be created to accommodate different values. MESSAGING_IGNORE_FIELDS=payload.MessageFrame.value.PersonalSafetyMessage.id # Connection timeout to influx bucket. 
Unit: milliseconds @@ -138,8 +138,8 @@ HPS_LOG_HANDLER_TYPE=console HPS_INFLUX_BUCKET=infrastructure-dev HPS_INFLUX_ORG=my-org HPS_INFLUX_TOKEN=my-super-secret-auth-token -HPS_INFLUX_IP= # IP address of the machine where influxdb is hosted -HPS_INFLUX_PORT= +HPS_INFLUX_IP= # IP address of the machine where influxdb is hosted +HPS_INFLUX_PORT= # Port at which influxdb is accessible on the host machine HPS_MYSQL_HOST=127.0.0.1 # Add host IP where mysql database is hosted HPS_MYSQL_PORT=3306 HPS_MYSQL_DB=wfd_grafana # Name of Mysql databse @@ -152,4 +152,3 @@ HPS_FILE_PROCESSING_SUBJECT=ui.file.processing HPS_TO_STR_FIELDS=['status','data','TrafficControlRequest.reqid','m_header.sender_bsm_id','core_data.id'] HPS_IGNORE_FIELDS=['payload.MessageFrame.value.PersonalSafetyMessage.id','tcr_v01.reqid.id'] HPS_TOPIC_EXCLUSION_LIST= /carma_system_version,/environment/map_filtered_points,/environment/points_clustered,/environment/points_in_map,/environment/detected_objects,/environment/roadway_objects,/environment/motion_computation_visualization,/hardware_interface/lidar/points_raw,/environment/points_ground,/environment/points_no_ground,/environment/points_in_base_link,/environment/lidar_detected_objects,/environment/map_update,/environment/semantic_map,/environment/base_map,/diagnostics,/environment/base_map,/environment/bounding_box_converter/transition_event,/environment/carma_wm_broadcaster/transition_event,/environment/detected_objects,/environment/environment_perception_controller/transition_event,/environment/external_object/transition_event,/environment/external_objects,/environment/external_objects_viz,/environment/leftover_clusters,/environment/lidar_bounding_boxes,/environment/lidar_bounding_boxes_viz,/environment/lidar_detected_objects,/environment/lidar_frame_transformer/transition_event,/environment/lidar_to_map_frame_transformer/transition_event,/environment/map_filtered_points,/environment/map_update,/environment/motion_computation_node/transition_event,/environment/motion_computation_visualize,/environment/motion_prediction_visualizer/transition_event,/environment/object_visualizer_node/transition_event,/environment/perception_points_filter_container/transition_event,/environment/points_clustered,/environment/points_ground,/environment/points_in_base_link,/environment/points_in_map,/environment/points_map_filter/transition_event,/environment/points_no_ground,/environment/roadway_objects,/environment/roadway_objects_node/transition_event,/environment/roadway_obstacles,/environment/roadway_obstacles_viz,/environment/semantic_map,/environment/tcm_visualizer,/environment/tracked_objects,/environment/traffic_incident_parser_node/transition_event,/guidance/arbitrator/transition_event,/guidance/cav_marker,/guidance/guidance_controller/transition_event,/guidance/guidance_node/transition_event,/guidance/host_marker,/guidance/label_marker,/guidance/mobilitypath_visualizer_node/transition_event,/guidance/plan_delegator/transition_event,/guidance/plugin_discovery,/guidance/plugins/inlanecruising_plugin/transition_event,/guidance/plugins/light_controlled_intersection_tactical_plugin/transition_event,/guidance/plugins/platooning_tactical_plugin_node/transition_event,/guidance/plugins/pure_pursuit_wrapper/transition_event,/guidance/plugins/route_following_plugin/transition_event,/guidance/plugins/sci_strategic_plugin/transition_event,/guidance/plugins/stop_and_wait_plugin/transition_event,/guidance/plugins/stop_controlled_intersection_tactical_plugin/transition_event,/guidance/plugins/yield_plugin
/transition_event,/guidance/port_drayage_plugin_node/transition_event,/guidance/route_marker,/guidance/route_node/transition_event,/guidance/trajectory_executor_node/transition_event,/guidance/trajectory_visualizer,/guidance/trajectory_visualizer_node/transition_event,/hardware_interface/accel_aux_rpt,/hardware_interface/all_system_statuses,/hardware_interface/as/speed_model/transition_event,/hardware_interface/as/speed_model_config,/hardware_interface/as/steering_model/transition_event,/hardware_interface/as/steering_model_config,/hardware_interface/as/veh_controller/transition_event,/hardware_interface/as/veh_controller_config,/hardware_interface/as/veh_interface/transition_event,/hardware_interface/as/veh_interface_config,/hardware_interface/carma_novatel_driver_wrapper_node/transition_event,/hardware_interface/comms/inbound_binary_msg,/hardware_interface/comms/outbound_binary_msg,/hardware_interface/component_rpt,/hardware_interface/date_time_rpt,/hardware_interface/driver_shutdown_5cff53178b6c4b64891942011c611f9a/transition_event,/hardware_interface/driver_shutdown_8767cd437e204a1491116fbacbe0e0bc/transition_event,/hardware_interface/driver_shutdown_ab3249fe2c1842489af73c22ad5fd775/transition_event,/hardware_interface/driver_shutdown_fd9b1bee52464111a097362c7cfc70f4/transition_event,/hardware_interface/drivers_controller/transition_event,/hardware_interface/dsrc_driver_node/transition_event,/hardware_interface/imu_raw,/hardware_interface/interior_lights_rpt,/hardware_interface/lat_lon_heading_rpt,/hardware_interface/libkvaser_reader_node/transition_event,/hardware_interface/libkvaser_writer_node/transition_event,/hardware_interface/lidar/points_raw,/hardware_interface/novatel/oem7/bestpos,/hardware_interface/novatel/oem7/bestutm,/hardware_interface/novatel/oem7/bestvel,/hardware_interface/novatel/oem7/corrimu,/hardware_interface/novatel/oem7/fix,/hardware_interface/novatel/oem7/gps,/hardware_interface/novatel/oem7/heading2,/hardware_interface/novatel/oem7/imu/data,/hardware_interface/novatel/oem7/insconfig,/hardware_interface/novatel/oem7/inspva,/hardware_interface/novatel/oem7/inspvax,/hardware_interface/novatel/oem7/insstdev,/hardware_interface/novatel/oem7/odom,/hardware_interface/novatel/oem7/oem7raw,/hardware_interface/novatel/oem7/rxstatus,/hardware_interface/novatel/oem7/time,/hardware_interface/pacmod3_node/transition_event,/hardware_interface/ssc_converter_node/transition_event,/hardware_interface/ssc_interface_wrapper_node/transition_event,/hardware_interface/velodyne_lidar_driver_wrapper_node/transition_event,/hardware_interface/velodyne_packets,/joint_states,/localization/gnss_to_map_convertor/transition_event,/localization/localization_controller/transition_event,/localization/localization_manager/transition_event,/localization/map_param_loader/georeference,/message/bsm_generator_node/transition_event,/message/carma_cloud_client_node/transition_event,/message/cpp_message_node/transition_event,/message/incoming_j2735_bsm,/message/incoming_j2735_geofence_control,/message/incoming_j2735_geofence_request,/message/incoming_j2735_map,/message/incoming_j2735_psm,/message/incoming_j2735_spat,/message/incoming_map,/message/incoming_mobility_operation,/message/incoming_mobility_path,/message/incoming_mobility_request,/message/incoming_mobility_response,/message/incoming_psm,/message/incoming_spat,/message/j2735_convertor_node/transition_event,/message/mobilitypath_publisher_node/transition_event,/message/outgoing_j2735_bsm,/message/outgoing_j2735_geofence_control,/message/outgoing_j
2735_geofence_request,/message/outgoing_j2735_psm,/message/v2x_controller/transition_event,/parameter_events,/robot_description,/rosout,/system_alert,/tf,/tf_static,/ui/client_count,/ui/connected_clients,/ui/ui_instructions,/guidance/twist_filter_node/transition_event,/guidance/approaching_emergency_vehicle_plugin/transition_event,/guidance/plugins/cooperative_lanechange/transition_event,/guidance/twist_gate_node/transition_event,/hardware_interface/driver_shutdown_494b616fe26443fca421ea1f2e04d1d4/transition_event,/hardware_interface/driver_shutdown_77e0426440b045b580837068ea149ea1/transition_event,/hardware_interface/driver_shutdown_9219ae46d4084143a883557f6b60bfc3/transition_event,/hardware_interface/driver_shutdown_494b616fe26443fca421ea1f2e04d1d4/transition_event,/hardware_interface/driver_shutdown_77e0426440b045b580837068ea149ea1/transition_event,/hardware_interface/driver_shutdown_9219ae46d4084143a883557f6b60bfc3/transition_event,/hardware_interface/lightbar_manager/transition_event,/hardware_interface/lightbar_manager_container/transition_event,/guidance/plugins/approaching_emergency_vehicle_plugin/transition_event,/environment/detected_objects,/environment/external_object_predictions,/environment/external_objects,/environment/intersection_signal_group_ids,/environment/leftover_clusters,/environment/map_filtered_points,/environment/roadway_objects,/environment/roadway_obstacles,/environment/tcm_visualizer,/environment/tcr_bounding_points,/guidance/final_maneuver_plan,/guidance/plan_trajectory,/guidance/plugins/cooperative_lanechange/transition_event,/guidance/plugins/debug/trajectory_planning,/guidance/plugins/platoon_control/plan_trajectory,/guidance/plugins/pure_pursuit/plan_trajectory,/guidance/route,/guidance/upcoming_lane_change_status,/hardware_interface/accel_aux_rpt,/hardware_interface/all_system_statuses,/hardware_interface/as/brake_command_echo,/hardware_interface/as/brake_feedback,/hardware_interface/as/curvature_feedback,/hardware_interface/as/gear_command_echo,/hardware_interface/as/gear_feedback,/hardware_interface/as/gear_select,/hardware_interface/as/pacmod/as_rx/shift_cmd,/hardware_interface/as/pacmod/parsed_tx/global_rpt,/hardware_interface/as/pacmod/parsed_tx/shift_rpt,/hardware_interface/as/pacmod/parsed_tx/wheel_speed_rpt,/hardware_interface/as/speed_pedals,/hardware_interface/as/steering_command_echo,/hardware_interface/as/steering_feedback,/hardware_interface/as/steering_wheel,/hardware_interface/as/throttle_command_echo,/hardware_interface/as/throttle_feedback,/hardware_interface/as/turn_signal_command,/hardware_interface/as/vehicle_platform,/hardware_interface/as/velocity_accel_cov,/hardware_interface/brake_aux_rpt,/hardware_interface/can/brake_position,/hardware_interface/can/steering_wheel_angle,/hardware_interface/can/transmission_state,/hardware_interface/can_rx,/hardware_interface/can_tx,/hardware_interface/hazard_lights_cmd,/hardware_interface/hazard_lights_rpt,/hardware_interface/headlight_aux_rpt,/hardware_interface/headlight_cmd,/hardware_interface/headlight_rpt,/hardware_interface/horn_cmd,/hardware_interface/horn_rpt,/hardware_interface/lat_lon_heading_rpt,/hardware_interface/pacmod/parsed_tx/brake_rpt,/hardware_interface/pacmod/parsed_tx/shift_rpt,/hardware_interface/pacmod/parsed_tx/steer_rpt,/hardware_interface/shift_aux_rpt,/hardware_interface/state,/hardware_interface/steering_aux_rpt,/hardware_interface/turn_aux_rpt,/hardware_interface/turn_rpt,/hardware_interface/vehicle/engage,/hardware_interface/velocity_accel_cov,/hardware_interface/vin_r
pt,/localization/current_pose_with_covariance,/localization/gnss_pose,/localization/initialpose,/localization/localization_status,/localization/managed_initialpose,/localization/map_param_loader/georeference,/localization/ndt_pose,/localization/ndt_stat,/localization/selected_pose,/robot_description,/message/j2735_convertor_node/transition_event,/hardware_interface/driver_shutdown_81dc0955537246e38b742b0cc74a1a8d/transition_event,/hardware_interface/driver_shutdown_8d5d9db8edc243aca0016a9d9e60e918/transition_event,/hardware_interface/driver_shutdown_98a97ce77cda4cfbb7e8912c56276e62/transition_event,/hardware_interface/driver_shutdown_c19bda6e09c2471bbb762c8d2f0af03d/transition_event,/guidance/twist_filter_node/limitation_debug/ctrl/lateral_accel,/guidance/twist_filter_node/limitation_debug/ctrl/lateral_jerk,/guidance/twist_filter_node/limitation_debug/twist/lateral_accel,/guidance/twist_filter_node/limitation_debug/twist/lateral_jerk,/hardware_interface/as/pacmod/as_rx/enable,/hardware_interface/as/dbw_enabled_feedback,/hardware_interface/as/module_states
-
diff --git a/telematic_system/telematic.local.env b/telematic_system/telematic.local.env
index 292b53e1..174fe063 100644
--- a/telematic_system/telematic.local.env
+++ b/telematic_system/telematic.local.env
@@ -7,7 +7,7 @@ DOCKER_INFLUXDB_INIT_PASSWORD=P@ssword1 #Required: Create a credential password
 DOCKER_INFLUXDB_INIT_BUCKET=infrastructure-dev #Required: Create an bucket on container initial startup. You can create more buckets inside the influxDB container.
 DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=my-super-secret-auth-token #Required: Manually set the token value
 DOCKER_INFLUXDB_INIT_ORG=my-org #Required: Create an organization on container startup. You can create more orgs inside the same influxDB container.
-DOCKER_INFLUXDB_INIT_RETENTION=7d #Optional: If not set, default is 0s meaning the initial bucket will retain data forever.
+DOCKER_INFLUXDB_INIT_RETENTION= #Optional: If not set, the initial bucket will retain data for one year.
 INFLUX_URL=127.0.0.1:8086
 ###################################################
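A note on the retention change above: `DOCKER_INFLUXDB_INIT_RETENTION` only affects the bucket that the InfluxDB container creates on its first startup; buckets created later keep whatever retention period they are given at creation time. As a rough illustration only (not part of this patch), a bucket with an explicit retention period could also be created through the InfluxDB Java client that the messaging service already uses. The URL, token, org ID, and bucket name below are placeholders, not values taken from this repository:

```java
import com.influxdb.client.BucketsApi;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.domain.Bucket;
import com.influxdb.client.domain.BucketRetentionRules;

public class BucketRetentionExample {
    public static void main(String[] args) {
        // Placeholder connection settings; a real deployment would read these from telematic.local.env
        InfluxDBClient client = InfluxDBClientFactory.create(
                "http://127.0.0.1:8086", "my-super-secret-auth-token".toCharArray(), "my-org");

        // Retain data for roughly one year, expressed in seconds
        BucketRetentionRules retention = new BucketRetentionRules();
        retention.setEverySeconds(365 * 24 * 60 * 60);

        // "example-org-id" and the bucket name are hypothetical values for this sketch
        BucketsApi bucketsApi = client.getBucketsApi();
        Bucket bucket = bucketsApi.createBucket("infrastructure-archive", retention, "example-org-id");
        System.out.println("Created bucket " + bucket.getName()
                + " with retention " + retention.getEverySeconds() + "s");

        client.close();
    }
}
```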
diff --git a/telematic_system/telematic_apps/web_app/client/src/components/ros2_rosbag/ROS2RosbagRowItem.js b/telematic_system/telematic_apps/web_app/client/src/components/ros2_rosbag/ROS2RosbagRowItem.js
index ddf17a37..88c88a9b 100644
--- a/telematic_system/telematic_apps/web_app/client/src/components/ros2_rosbag/ROS2RosbagRowItem.js
+++ b/telematic_system/telematic_apps/web_app/client/src/components/ros2_rosbag/ROS2RosbagRowItem.js
@@ -40,6 +40,7 @@ const ROS2RosbagRowItem = (props) => {
     let isGreen = (column.id === "process_status" && props.ROS2RosbagRow.process_status === PROCESSING_STATUS.COMPLETED) || (column.id === "upload_status" && props.ROS2RosbagRow.upload_status === UPLOAD_STATUS.COMPLETED);
     let isRed = (column.id === "process_status" && props.ROS2RosbagRow.process_status === PROCESSING_STATUS.ERROR) || (column.id === "upload_status" && props.ROS2RosbagRow.upload_status === UPLOAD_STATUS.ERROR);
     let createdBy = column.id === "created_by" && props.ROS2RosbagRow?.user?.login !== undefined ? props.ROS2RosbagRow?.user?.login : "NA";
+    let columnsToLowercase = column.id === "process_status" || column.id === "upload_status";
     value = column.id === "size" ? calFilesizes(value) : value;
     value = column.id === "created_by" ? createdBy : value;
     value = column.id === "created_at" ? new Date(value).toLocaleString() : value;
@@ -62,7 +63,7 @@ const ROS2RosbagRowItem = (props) => {
             wordBreak: "break-word",
           }}
         >
-          {value.toLowerCase()}
+          {columnsToLowercase ? value.toLowerCase() : value}
          {column.id === "process_status" && isRed && (
 {
        );
        isValid = false;
      }
+
+      //Check file name
+      const re = /^[A-Za-z0-9_]+\.(mcap)$/g;
+      if (!newFileInfo?.filename || newFileInfo?.filename?.match(re) === null) {
+        messageList.push(
+          "Invalid filename: " + newFileInfo?.filename + ". Filename can only contain alphanumeric characters and underscores, and must use the .mcap extension!"
+        );
+        isValid = false;
+      }
     });
     if (!isValid && messageList.length > 0) {
       setAlertStatus({
diff --git a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/message_converters/JSON2KeyValuePairsConverter.java b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/message_converters/JSON2KeyValuePairsConverter.java
index 614d60ef..b9bfffd0 100644
--- a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/message_converters/JSON2KeyValuePairsConverter.java
+++ b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/message_converters/JSON2KeyValuePairsConverter.java
@@ -31,7 +31,7 @@ public String convertJson2KeyValuePairs(String jsonStr, List toStrFields
             else {
                 for(String field: ignoreFields) {
-                    if(key.toString().strip().equalsIgnoreCase(field)) 
+                    if(key.toString().strip().equalsIgnoreCase(field))
                     {
                         isIgnored = true;
                     }
@@ -54,7 +54,7 @@
             else if (value.toString().matches("[-+]?\\d*\\.?\\d+")) {
                 pairs += key + "=" + value;
             }
-            //Regex matching scientific notation. InfluxDB does not support scientific notation float syntax, temporarily set this kind of value = 0.0 
+            //Regex matching scientific notation. InfluxDB does not support scientific notation float syntax, temporarily set this kind of value = 0.0
             else if (value.toString().matches("^[+-]?\\d+(?:\\.\\d*(?:[eE][+-]?\\d+)?)?$")) {
                 pairs += key + "=" + 0.0;
             }
@@ -67,9 +67,9 @@ else if (value.toString().toLowerCase().strip().equals("nan")){
             else {
                 pairs += key + "=\"" + value.toString().replaceAll("\\s", "") + "\"";
             }
-        } 
-        } 
-        
+        }
+        }
+
         if (!isIgnored && json.keySet().size() != keyCount) {
             pairs += ",";
         }
diff --git a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/Config.java b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/Config.java
index d80f00b4..5d32eeb5 100644
--- a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/Config.java
+++ b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/Config.java
@@ -6,7 +6,7 @@
 import org.springframework.stereotype.Component;
 /**
- * The Config object instantiates a configuration object which stores information to create a connection to the telematic NATS server 
+ * The Config object instantiates a configuration object which stores information to create a connection to the telematic NATS server
  * and influxdb bucket.
*/ @Component @@ -28,83 +28,83 @@ public enum BucketType{ @Value("${M_NATS_URI}") String natsUri; - // URI where the influxdb bucket is hosted - @Value("${M_INFLUX_URI}") + // URI where the influxdb bucket is hosted + @Value("${M_INFLUX_URI}") String influxUri; - + //PORT to which influxDB is connected - @Value("${M_INFLUX_PORT}") + @Value("${M_INFLUX_PORT}") String influxPort; - + // Influxdb bucket type: Can be Platform, Streets or All - @Value("${M_INFLUX_BUCKET_TYPE}") + @Value("${M_INFLUX_BUCKET_TYPE}") String influxBucketTypeStr; BucketType influxBucketType; - + // Influxdb bucket name for CARMA Streets bucket - @Value("${M_INFLUX_BUCKET_STREETS}") + @Value("${M_INFLUX_BUCKET_STREETS}") String influxBucketStreets; - - // NATS topic carma-streets data is published to. - @Value("${M_STREETS_SUBSCRIPTION_TOPIC}") + + // NATS topic carma-streets data is published to. + @Value("${M_STREETS_SUBSCRIPTION_TOPIC}") String streetsSubscriptionTopic; - + // Influxdb bucket name for CARMA Platform bucket - @Value("${M_INFLUX_BUCKET_PLATFORM}") + @Value("${M_INFLUX_BUCKET_PLATFORM}") String influxBucketPlatform; - + // NATS topic carma-platform data is published to - @Value("${M_PLATFORM_SUBSCRIPTION_TOPIC}") + @Value("${M_PLATFORM_SUBSCRIPTION_TOPIC}") String platformSubscriptionTopic; - + // Influxdb bucket name for CARMA Cloud bucket - @Value("${M_INFLUX_BUCKET_CLOUD}") + @Value("${M_INFLUX_BUCKET_CLOUD}") String influxBucketCloud; - - // NATS topic carma-cloud data is published to. - @Value("${M_CLOUD_SUBSCRIPTION_TOPIC}") + + // NATS topic carma-cloud data is published to. + @Value("${M_CLOUD_SUBSCRIPTION_TOPIC}") String cloudSubscriptionTopic; - + // Organization for the influxdb bucket - @Value("${M_INFLUX_ORG}") + @Value("${M_INFLUX_ORG}") String influxOrg; - + // Token to access influxdb bucket - @Value("${M_INFLUX_TOKEN}") + @Value("${M_INFLUX_TOKEN}") String influxToken; - + // Username for influxdb bucket - @Value("${M_INFLUX_USERNAME}") + @Value("${M_INFLUX_USERNAME}") String influxUsername; - + // Password for influxdb bucket - @Value("${M_INFLUX_PWD}") + @Value("${M_INFLUX_PWD}") String influxPwd; - + // Maximum number of times the service tries to establish a NATS connection - @Value("${M_NATS_MAX_RECONNECTS}") + @Value("${M_NATS_MAX_RECONNECTS}") int natsMaxReconnects; - + // Time in milliseconds after which the request to connect to the influxdb bucket times out @Value("${M_INFLUX_CONNECT_TIMEOUT}") int influxConnectTimeout; - + // Time in milliseconds after which the request to write data to the influxdb bucket times out @Value("${M_INFLUX_WRITE_TIMEOUT}") int influxWriteTimeout; - + // Maximum number of topics to assign to dispatcher - @Value("${M_NUMBER_TOPICS_PER_DISPATCHER}") + @Value("${M_NUMBER_TOPICS_PER_DISPATCHER}") int topicsPerDispatcher; - + // List of vehicle unit ids @Value("${M_VEHICLE_UNIT_ID_LIST}") String vehicleUnitIdList; - + // List of streets unit ids @Value("${M_STREETS_UNIT_ID_LIST}") String streetsUnitIdList; - + // List of cloud unit ids @Value("${M_CLOUD_UNIT_ID_LIST}") String cloudUnitIdList; @@ -112,23 +112,23 @@ public enum BucketType{ //List of fields in the stream that should only be set to string data type @Value("${M_TO_STR_FIELDS}") List toStrFields; - + //List of fields in the stream that should be ignored @Value("${M_IGNORE_FIELDS}") - List ignoreFields; + List ignoreFields; // Converts config object parameters to a string - public String toString(){ - return "Configuration: " + - "\nNATS uri: " + natsUri + - "\ninflux uri: " + influxUri + - 
"\ninflux bucket type: " + influxBucketType + - "\ninflux bucket streets: " + influxBucketStreets + + public String toString(){ + return "Configuration: " + + "\nNATS uri: " + natsUri + + "\ninflux uri: " + influxUri + + "\ninflux bucket type: " + influxBucketType + + "\ninflux bucket streets: " + influxBucketStreets + "\nstreets subscription topic: " + streetsSubscriptionTopic + "\ninflux bucket platform: " + influxBucketPlatform + "\nplatform subscription topic: " + platformSubscriptionTopic + - "\ninflux bucket cloud: " + influxBucketCloud + - "\ncloud subscription topic: " + cloudSubscriptionTopic + + "\ninflux bucket cloud: " + influxBucketCloud + + "\ncloud subscription topic: " + cloudSubscriptionTopic + "\ninflux org: " + influxOrg + "\ninflux token: " + influxToken + "\ninflux username:" + influxUsername + @@ -139,7 +139,7 @@ public String toString(){ "\nNATS topic per dispatcher: " + topicsPerDispatcher+ "\nvehicle unit id list: " + vehicleUnitIdList + "\nstreets unit id list: " + streetsUnitIdList + - "\ncloud unit id list: " + cloudUnitIdList + + "\ncloud unit id list: " + cloudUnitIdList + "\nto str fields:" + toStrFields.toString() + "\nignore fields:" + ignoreFields.toString(); } diff --git a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/InfluxDataWriter.java b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/InfluxDataWriter.java index d4711147..491675d2 100644 --- a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/InfluxDataWriter.java +++ b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/InfluxDataWriter.java @@ -26,7 +26,7 @@ * received from the nats server. Required parameters for connection to InfluxDb are included in * the config.properties file. 
*/ -public class InfluxDataWriter { +public class InfluxDataWriter { private static final Logger logger = LoggerFactory.getLogger(InfluxDataWriter.class); Config influxConfig; @@ -51,7 +51,7 @@ else if(bucketType == Config.BucketType.STREETS){ else if(bucketType == Config.BucketType.CLOUD){ this.influxBucket = config.influxBucketCloud; } - + influxConfig = config; influxConnected = false; @@ -59,7 +59,7 @@ else if(bucketType == Config.BucketType.CLOUD){ logger.info("InfluxDb bucket name: {}", influxBucket); logger.info("InfluxDb org name: {}", influxConfig.influxOrg); } - + public List convertCloudDataToString(String incomingCloudData){ // This method returns a list of TCM messages breaking the list into individual components @@ -73,7 +73,7 @@ public List convertCloudDataToString(String incomingCloudData){ JSONObject tcmList = payloadJson.getJSONObject("TrafficControlMessageList"); try{ Object item = tcmList.get("TrafficControlMessage"); - + if(item instanceof JSONArray){ JSONArray tcmArray = tcmList.getJSONArray("TrafficControlMessage"); @@ -90,7 +90,7 @@ public List convertCloudDataToString(String incomingCloudData){ } else{ // If object is not a JSONArray it must be JSONObject - outputTcmMsgs.add(incomingCloudData); + outputTcmMsgs.add(incomingCloudData); } } catch (Exception e) { @@ -100,9 +100,9 @@ public List convertCloudDataToString(String incomingCloudData){ else{ outputTcmMsgs.add(incomingCloudData); } - + return outputTcmMsgs; - + } /** @@ -116,12 +116,12 @@ public boolean getInfluxConnected() { * Create an influxdb client using the configuration parameters in the config.properties and enable * asynchronous writing to the database. */ - public void influxConnect() { + public void influxConnect() { logger.debug("Attempting to create influxdb client"); try { OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient().newBuilder() - .connectTimeout(influxConfig.influxConnectTimeout, TimeUnit.MILLISECONDS) + .connectTimeout(influxConfig.influxConnectTimeout, TimeUnit.MILLISECONDS) .writeTimeout(influxConfig.influxWriteTimeout, TimeUnit.MILLISECONDS); InfluxDBClientOptions options = InfluxDBClientOptions .builder() @@ -130,7 +130,7 @@ public void influxConnect() { .org(influxConfig.influxOrg) .bucket(influxBucket) .okHttpClient(okHttpClientBuilder) - .build(); + .build(); influxDBClient = InfluxDBClientFactory.create(options); logger.info("Successfully created influxdb client"); @@ -142,7 +142,7 @@ public void influxConnect() { //Create a new asynchronous non-blocking Write client. 
writeApi = influxDBClient.makeWriteApi(); - } + } /** * @param publishData The data to publish to influxdb @@ -152,14 +152,14 @@ public void influxConnect() { public void publish(String publishData) { try { String influxRecord = influxStringConverter(publishData); - + logger.info("Sending to influxdb: {}" , influxRecord); writeApi.writeRecord(WritePrecision.US, influxRecord); writeApi.flush(); } catch (Exception e) { logger.error(ExceptionUtils.getStackTrace(e)); - } + } } /** @@ -171,11 +171,11 @@ public void publishCloudData(String publishData) { for(String cloudData : cloudDataList){ publish(cloudData); } - + } catch (Exception e) { logger.error(ExceptionUtils.getStackTrace(e)); - } + } } /** @@ -192,7 +192,7 @@ public String influxStringConverter(String publishData) { JSONObject publishDataJson = new JSONObject(publishData); JSONObject payloadJson = publishDataJson.getJSONObject("payload"); - + String flattenedPayloadJson = jsonFlattener.flattenJsonStr(payloadJson.toString()); String keyValuePairs = keyValueConverter.convertJson2KeyValuePairs(flattenedPayloadJson, influxConfig.toStrFields, influxConfig.ignoreFields); @@ -207,6 +207,6 @@ public String influxStringConverter(String publishData) { return eventName + "," + "unit_id=" + unitId + "," + "unit_type=" + unitType + "," + "location=" + location + "," + "testing_type=" + testingType + "," + "topic_name=" + topicName + " " + keyValuePairs + " " + timestamp; } - - + + } diff --git a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/NatsConsumer.java b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/NatsConsumer.java index 9738c97f..431ea40c 100644 --- a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/NatsConsumer.java +++ b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/NatsConsumer.java @@ -22,8 +22,8 @@ import io.nats.client.Options; /** - * The NatsConsumer object creates a connection to the telematic nats server and subscribes to - * all available subjects with ".data" in the subject name. It instantiates an InfluxDataWriter object + * The NatsConsumer object creates a connection to the telematic nats server and subscribes to + * all available subjects with ".data" in the subject name. It instantiates an InfluxDataWriter object * that is used to publish the received data to the Influx database. 
*/ public class NatsConsumer { @@ -86,7 +86,7 @@ public String getNatsURI() { * Attempt to connect to the nats server using the uri from the application.properties file * @param uri The uri of the nats server to connect to */ - public void natsConnect() { + public void natsConnect() { try { Options options = new Options.Builder().server(natsUri).maxReconnects(natsMaxReconnects).build(); this.nc = Nats.connect(options); @@ -95,19 +95,17 @@ public void natsConnect() { } catch (InterruptedException | IOException e) { logger.error(ExceptionUtils.getStackTrace(e)); - /* Clean up whatever needs to be handled before interrupting */ - Thread.currentThread().interrupt(); } } - + /** * This method is used to get the list of available topics for each unit that is stored in NATS and updatte * the topic list variable */ public void updateAvailableTopicList() { - + for (String unitId: unitIdList) { - try + try { //Use nats request/reply to get available topics for the unit String availableTopicString = unitId + ".available_topics"; @@ -116,14 +114,14 @@ public void updateAvailableTopicList() { String reply = new String(msg.getData(), StandardCharsets.UTF_8); logger.debug("{} NatsConsumer available topics [{}] request. Reply: {}", this.unitType, availableTopicString, reply); - JSONObject jsonObject = new JSONObject(reply); + JSONObject jsonObject = new JSONObject(reply); Object topicsObject = jsonObject.get("topics"); //Add the topics to the topic list if they don't already exist if(topicsObject instanceof JSONArray) { JSONArray topics = (JSONArray)topicsObject; - for(int i=0; i newTo //calculate the number of dispatchers to create based on the topic list size int numberDispatchers = 1; - - //if there is a remainder in division, need to add 1 dispatcher + + //if there is a remainder in division, need to add 1 dispatcher if ((newTopicListSize % topicsPerDispatcher) > 0) { numberDispatchers = (newTopicListSize / topicsPerDispatcher) + 1; } @@ -217,7 +213,7 @@ public void asyncSubscribe(InfluxDataWriter influxDataWriter, List newTo logger.info("{} NatsConsumer dispatcher {} subscribed to {}{}", unitType, i, natsSubscribeStr, topicStr); } - } + } } /** @@ -243,6 +239,6 @@ public void unitStatusCheck(InfluxDataWriter influxDataWriter) { { asyncSubscribe(influxDataWriter, currentListCopy); } - + } } diff --git a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/NatsInfluxPush.java b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/NatsInfluxPush.java index f65aa9a8..fde7ba1f 100644 --- a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/NatsInfluxPush.java +++ b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/NatsInfluxPush.java @@ -10,14 +10,14 @@ import com.telematic.telematic_cloud_messaging.exceptions.ConfigurationException; /** - * The NatsInfluxPush object instantiates a NatsConsumer that creates a connection to the telematic nats server + * The NatsInfluxPush object instantiates a NatsConsumer that creates a connection to the telematic nats server * and subscribes to all available subjects. It also instantiates an InfluxDataWriter object that is used to publish the * received data to the Influx database. 
*/ @Component @Profile("!test") //Skip Unit test on the CommandLineRunner task public class NatsInfluxPush implements CommandLineRunner { - + private static final Logger logger = LoggerFactory.getLogger(NatsInfluxPush.class); @@ -30,7 +30,7 @@ public class NatsInfluxPush implements CommandLineRunner { public NatsInfluxPush() { logger.info("Creating new NatsInfluxPush"); } - + public void initDataPersistentService(Config.BucketType bucketType) { // Create NATS and InfluxWriter @@ -108,7 +108,7 @@ private void adjustConfig() { logger.error("Bucket type cannot be null: {}. Error: {}", config.influxBucketTypeStr, ex.getMessage()); throw new ConfigurationException(String.format("M_INFLUX_BUCKET_TYPE: %s is not found!", config.influxBucketTypeStr)); } - + } /** @@ -116,9 +116,9 @@ private void adjustConfig() { */ @Override public void run(String... args) { - adjustConfig(); + adjustConfig(); if (config.influxBucketType == Config.BucketType.ALL) { - + for (Config.BucketType configType : Config.BucketType.values()) { new Thread() { @Override @@ -130,7 +130,7 @@ public void run(){ }.start(); } } - else if(config.influxBucketType.equals(Config.BucketType.PLATFORM) || config.influxBucketType.equals(Config.BucketType.STREETS) || + else if(config.influxBucketType.equals(Config.BucketType.PLATFORM) || config.influxBucketType.equals(Config.BucketType.STREETS) || config.influxBucketType.equals(Config.BucketType.CLOUD)) { // Create thread for specified type @@ -144,6 +144,6 @@ public void run(){ else{ logger.error("Invalid bucket type requested. Options are PLATFORM, STREETS, CLOUD and ALL"); } - + } } diff --git a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/web_services/UnitsStatusService.java b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/web_services/UnitsStatusService.java index 46cf3f7c..3b0bdad5 100644 --- a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/web_services/UnitsStatusService.java +++ b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/web_services/UnitsStatusService.java @@ -154,7 +154,7 @@ public void run(String... args) throws Exception { logger.info("Events status is reset!"); } catch (Exception e) { logger.error("Cannot reset events status! ERROR: {}" , e.getMessage()); - } + } Connection conn = natsConn.getConnection(); if (conn != null) { logger.debug("register units subscribe to subject: " + registerUnit); diff --git a/telematic_system/telematic_cloud_messaging/src/main/resources/application.properties b/telematic_system/telematic_cloud_messaging/src/main/resources/application.properties index 1596ed1f..176b4418 100644 --- a/telematic_system/telematic_cloud_messaging/src/main/resources/application.properties +++ b/telematic_system/telematic_cloud_messaging/src/main/resources/application.properties @@ -24,8 +24,8 @@ M_INFLUX_ORG=${MESSAGING_INFLUX_ORG} M_INFLUX_TOKEN=${MESSAGING_INFLUX_TOKEN} #Edge case for hostBSMId, sender_bsm_id, core_data.id and TCR/TCM ID where the Ids can be all digits or alpha characters M_TO_STR_FIELDS=${MESSAGING_TO_STR_FIELDS} -# Ignore data fields from message and do not save it into the influxDB. -# This is to prevent the data type conflicts: Some fields have random values like characters or numbers overtime. +# Ignore data fields from message and do not save it into the influxDB. 
+# This is to prevent data type conflicts: some fields have random values, like characters or numbers, over time.
 # This confuses influxDB as it does not know what type of field should be created to accommodate the different values.
 M_IGNORE_FIELDS=${MESSAGING_IGNORE_FIELDS}
 # Connection timeout to influx bucket. Unit: milliseconds
@@ -34,7 +34,7 @@ M_INFLUX_CONNECT_TIMEOUT=${MESSAGING_INFLUX_CONNECT_TIMEOUT}
 M_INFLUX_WRITE_TIMEOUT=${MESSAGING_INFLUX_WRITE_TIMEOUT}
 # MySQL connection properties
 spring.datasource.driver-class-name=${MESSAGING_DB_DRIVER}
-spring.jpa.hibernate.ddl-auto=none 
+spring.jpa.hibernate.ddl-auto=none
 spring.datasource.url=${MESSAGING_DB_URL}
 spring.datasource.username=${MESSAGING_DB_USERNAME}
 spring.datasource.password=${MESSAGING_DB_PASSWORD}
diff --git a/telematic_system/telematic_cloud_messaging/src/test/java/com/telematic/telematic_cloud_messaging/message_converters/JSON2KeyValuePairsConverterTests.java b/telematic_system/telematic_cloud_messaging/src/test/java/com/telematic/telematic_cloud_messaging/message_converters/JSON2KeyValuePairsConverterTests.java
index 0e685a88..7ed59a37 100644
--- a/telematic_system/telematic_cloud_messaging/src/test/java/com/telematic/telematic_cloud_messaging/message_converters/JSON2KeyValuePairsConverterTests.java
+++ b/telematic_system/telematic_cloud_messaging/src/test/java/com/telematic/telematic_cloud_messaging/message_converters/JSON2KeyValuePairsConverterTests.java
@@ -50,7 +50,7 @@ public class JSON2KeyValuePairsConverterTests {
     @Autowired
     JSON2KeyValuePairsConverter converter;
-    
+
     @Test
     public void convertJson2KeyValuePairs() {
         List to_str_fields = Arrays.asList("hostBSMId,TrafficControlRequest.reqid,tcmV01.reqid".split(","));
diff --git a/telematic_system/telematic_units/carma_cloud_bridge/cloud_nats_bridge/src/cloud_nats_bridge/cloud_nats_bridge.py b/telematic_system/telematic_units/carma_cloud_bridge/cloud_nats_bridge/src/cloud_nats_bridge/cloud_nats_bridge.py
index 1728aabf..88c15b80 100755
--- a/telematic_system/telematic_units/carma_cloud_bridge/cloud_nats_bridge/src/cloud_nats_bridge/cloud_nats_bridge.py
+++ b/telematic_system/telematic_units/carma_cloud_bridge/cloud_nats_bridge/src/cloud_nats_bridge/cloud_nats_bridge.py
@@ -59,7 +59,6 @@ def __init__(self, cc_logpath, bridge_logname, tcr_search_string, tcm_search_str
             f.close()
         self.logger.info("FileListener created for: " + str(self.cc_log_path))
-
     def findNewCarmaCloudMessage(self):
         """This method will parse the newly generated line in the carma cloud log file and assign the xml
         and message type to the appropriate global variables. It also assigns the epoch_time
@@ -230,6 +229,7 @@ def xmlToJson(self, xmlString):
         return json_data
+
     async def queue_send(self):
         self.logger.info("In queue send")