Resolve merge conflicts on neon (#252)
* initial queue implementation cc bridge

* add some comments

* fix unsub logic (#126)

* init

* cc bridge simplify logic

* Cloud bridge investigation (#131)

* initial queue implementation cc bridge

* add some comments

* cc bridge simplify logic

* back to queue, add in file listener

* Fix unit status and UI notification and event live status (#129)

* init

* add comments

* fix warning

* update text

* Remove credentials (#134)

* updating log levels, docker compose log size parameter

* revert local changes

* init

* remove hardcoded values

* add analysis scripts

* messaging server log size parameter

* init

* add nan

* Fix json key value convertor logic (#146)

fix comparison

* Fix/drop nan value fields (#148)

drop fields with NaN values

* clean up readme.md

* fix merge conflicts

* fix - revert change

* revert imports

---------

Co-authored-by: Abey <abey.yoseph@leidos.com>
Co-authored-by: dan-du-car <dan.du@leidos.com>
Co-authored-by: abey-yoseph <59704440+abey-yoseph@users.noreply.github.com>
Co-authored-by: dan-du-car <62157949+dan-du-car@users.noreply.github.com>
Co-authored-by: Saikrishna Bairamoni <84093461+SaikrishnaBairamoni@users.noreply.github.com>
Co-authored-by: Cody Garver <codygarver@users.noreply.github.com>
7 people authored Aug 27, 2024
1 parent eecec73 commit 0dc84cf
Showing 14 changed files with 189 additions and 196 deletions.
11 changes: 5 additions & 6 deletions README.md
@@ -3,30 +3,29 @@
|-----|-----|-----|
[![Docker build](https://github.com/usdot-fhwa-stol/cda-telematics/actions/workflows/docker.yml/badge.svg?branch=develop)](https://github.com/usdot-fhwa-stol/cda-telematics/actions/workflows/docker.yml)| [![Docker build](https://github.com/usdot-fhwa-stol/cda-telematics/actions/workflows/docker.yml/badge.svg?branch=master)](https://github.com/usdot-fhwa-stol/cda-telematics/actions/workflows/docker.yml)| [![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=usdot-fhwa-stol_cda-telematics&metric=alert_status)](https://sonarcloud.io/summary/new_code?id=usdot-fhwa-stol_cda-telematics)


# CDA-Telematics
This project will create an open-source Module that can be installed on any vehicle (e.g., a CARMA Platform and/or Messenger vehicle, an L0 or L1 production vehicle, etc.) to collect data about the vehicle and wirelessly send it out in real time for data analysis. The same Module, with any necessary modifications, will also be compatible with CARMA Streets and CARMA Cloud. On the receiving end of this data, a user will have a Data Processing & Visualization Tool available to visualize and/or plot the data sent by the Module(s). This Module can be thought of as a Fleet Management tool with extra capabilities to support CDA research and education.

## Architecture Diagram
[Detailed Design](https://usdot-carma.atlassian.net/wiki/spaces/WFD2/pages/2230321179/Detailed+System+Design)

![architecture](https://user-images.githubusercontent.com/34483068/171265484-67177ebb-69f7-4286-9602-016043079958.png)

## Release Notes
The current version of the CDA-Telematics tool and the release history of the CARMA software platform: [CARMA Release Notes](<docs/Release_notes.md>)

## Documentation
Documentation of the setup, operation, and design of the CDA Telematics can be found on the project [Confluence](https://usdot-carma.atlassian.net/wiki/spaces/WFD2/overview) pages.


## Contribution
Welcome to the CDA Telematics contributing guide. Please read this guide to learn about our development process, how to propose pull requests and improvements, and how to build and test your changes to this project. [CDA Telematics Contributing Guide](Contributing.md)

## Code of Conduct
Please read our [CDA Telematics Code of Conduct](Code_of_Conduct.md) which outlines our expectations for participants within the developer community, as well as steps to reporting unacceptable behavior. We are committed to providing a welcoming and inspiring community for all and expect our code of conduct to be honored. Anyone who violates this code of conduct may be banned from the community.

## Attribution
The development team would like to acknowledge the people who have made direct contributions to the design and code in this repository. [CDA Telematics Attribution](ATTRIBUTION.md)

## License
By contributing to the Federal Highway Administration (FHWA) CDA Telematics repository, you agree that your contributions will be licensed under its Apache License 2.0 license. [CDA Telematics License](<docs/License.md>)
24 changes: 12 additions & 12 deletions telematic_system/scripts/log_analysis/get_message_drop.py
@@ -15,11 +15,11 @@
'''
This script combines bridge logs with the messaging server logs to give the number of dropped messages from each unit.
Input: The script looks within the argument directory for csv files from the Messaging Server, Vehicle Bridge, Streets Bridge and Cloud Bridge logs,
which are parsed log output from the bridges, to calculate the number of dropped messages from each unit.
Required Input File Format:
The csv files to be read currently need to follow a specific format.
The messaging server parsed csv file name needs to start with the word "Messaging", separated by underscores (_).
The Streets bridge parsed csv file name needs to start with the word "Streets", separated by underscores (_).
The Vehicle bridge parsed csv file name needs to start with the word "Vehicle", "BlueLexus" or "Fusion", separated by underscores (_).
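
For illustration, a minimal sketch of how these naming conventions drive the file classification in `combineFiles` (the sample filenames below are hypothetical; the regex is the one used by the script):

```python
import re

# Regex from combineFiles that recognizes bridge CSV logs
bridge_csv_regex = r'.*(Streets|Vehicle|BlueLexus|Fusion|V2xHub|Cloud|Ros2).*'

# Hypothetical parsed-log filenames for illustration
filenames = ["Messaging_Server_parsed.csv", "Streets_bridge_parsed.csv", "notes.txt"]

for filename in filenames:
    if "messaging" in filename.lower():
        print(filename, "-> messaging server log")
    elif re.match(bridge_csv_regex, filename):
        print(filename, "-> bridge log")
    else:
        print(filename, "-> ignored")
```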
@@ -32,15 +32,15 @@ def combineFiles(log_dir):
    path_obj = Path(log_dir)
    print(log_dir)
    filenames = [f.name for f in path_obj.glob('*.csv')]

    bridge_csv_exist = False
    bridge_csv_regex = r'.*(Streets|Vehicle|BlueLexus|Fusion|V2xHub|Cloud|Ros2).*'
    bridges_csv = []

    messaging_server_csv_exist = False
    messaging_server_csv = []

    for filename in filenames:
        if "messaging" in filename.lower():
            messaging_server_csv_exist = True
            messaging_server_csv.append(log_dir + "/" + filename)
@@ -49,10 +49,10 @@ def combineFiles(log_dir):
        if matched:
            bridges_csv.append(log_dir + "/" + filename)
            bridge_csv_exist = True

    if not bridge_csv_exist:
        sys.exit("Did not find any Vehicle/Streets/Cloud/BlueLexus/Fusion/V2xHub bridge csv logs in directory: " + log_dir)

    if not messaging_server_csv_exist:
        sys.exit("Did not find any Messaging server csv logs in directory: " + log_dir)

@@ -68,13 +68,13 @@ def combineFiles(log_dir):
        if key not in infrastructure_units:
            value = value[~value['Message Time'].isnull()]
            # value = value.drop('Metadata', axis=1)

    bridge_df = pd.concat(map(pd.read_csv, bridges_csv), ignore_index=True)
    bridge_dfs = dict(tuple(bridge_df.groupby('Unit Id')))

    # Create combined dataframes from the bridge and messaging server logs
    for key in bridge_dfs:
        if key in messaging_server_dfs:
            bridge_df_combined = pd.merge(bridge_dfs[key], messaging_server_dfs[key], how='left', left_on=['Topic','Payload Timestamp'], right_on=['Topic','Message Time'])
@@ -92,11 +92,11 @@ def combineFiles(log_dir):

            topics_with_empty_count = (bridge_df_combined['Message Time'].isnull().groupby([bridge_df_combined['Topic']]).sum().astype(int).reset_index(name='count'))
            topics_with_empty_count = topics_with_empty_count.loc[~(topics_with_empty_count['count'] == 0)]

            print("{} missed messages: ".format(key))
            print(topics_with_empty_count)





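A toy sketch of the drop-counting idea in this hunk: left-merge bridge rows onto messaging-server rows and count null `Message Time` entries as drops (all data values below are invented):

```python
import pandas as pd

# Invented rows: the bridge forwarded two messages, the server recorded one
bridge_df = pd.DataFrame({"Topic": ["bsm", "bsm"],
                          "Payload Timestamp": [100, 101]})
server_df = pd.DataFrame({"Topic": ["bsm"],
                          "Message Time": [100]})

combined = pd.merge(bridge_df, server_df, how="left",
                    left_on=["Topic", "Payload Timestamp"],
                    right_on=["Topic", "Message Time"])

# A null Message Time means the server never recorded the message -> a drop
drops = (combined["Message Time"].isnull()
         .groupby(combined["Topic"]).sum().astype(int)
         .reset_index(name="count"))
print(drops.loc[drops["count"] != 0])  # Topic "bsm" shows 1 dropped message
```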
11 changes: 4 additions & 7 deletions telematic_system/scripts/log_analysis/latencyPlotter.py
@@ -52,10 +52,10 @@ def plotter(folderName):
    units = ["DOT-45244", "DOT-45254", "DOT_45254", "vehicle_id", "rsu_1234", "streets_id", "cloud_id"]
    for unit in units:
        unit_data = trimmed_data[trimmed_data['Unit Id'] == unit]

        if len(unit_data) > 0:
            fig, ax1 = plt.subplots()
            fig.set_size_inches(10, 10)
            sns.histplot(unit_data['Delay(s)'], kde=False)
            plt.xlim(0, 0.75)
            plt.xlabel('Latency(s)', fontsize=18)
@@ -68,14 +68,11 @@ def main():
def main():
    if len(sys.argv) < 2:
        print('Run with: "python3 latencyPlotter.py testcase"')
    else:
        test = sys.argv[1]

        concatRuns(test)
        plotter(test)

if __name__ == "__main__":
    main()



telematic_system/scripts/log_analysis/parse_messaging_server_logs.py
@@ -20,7 +20,7 @@

def parseInfluxfile(logname, start_time_epoch, end_time_epoch, run_num):
    fileName = logname.split(".")[0]

    with open(logname, 'r') as influx_log:

        delay_results_file = open('{}_{}_delay_parsed.csv'.format(fileName, run_num), 'w')
@@ -56,36 +56,36 @@ def parseInfluxfile(logname, start_time_epoch, end_time_epoch, run_num):
        # Since the message is a flattened json, it needs to be split manually and read to extract
        # the required info (unit_id, topic name, message/payload timestamp and log timestamp)
        for line in influx_log:
            split_line = line.split(":")

            if search_string in split_line:
                # Get log json
                try:
                    json_object = json.loads(line)
                except:
                    skipped_lines_count += 1
                    continue

                log_line = json_object["log"]
                # Get write time
                write_time_split = log_line.split("INFO")
                write_time_string = write_time_split[0][:len(write_time_split[0]) - 2]
                log_time_in_datetime = datetime.datetime.strptime(write_time_string, '%Y-%m-%dT%H:%M:%S.%fZ')

                payload_index = log_line.index(search_string) + 1
                payload = log_line[payload_index + len(search_string) + 1:]

                # Convert the payload to a json
                payload = "event=" + payload
                # Remove timestamp at end of line
                payload = payload.rsplit(" ", 1)[0]
                payload_list = payload.split(",")

                unit_type = ""
                topic_name = ""
                metadata_field = ""

                for item in payload_list:

                    # Get topic name
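
A small sketch of the manual key=value splitting described in the comment above (the sample payload line is invented; real payloads carry more fields):

```python
# Invented flattened payload: comma-separated key=value pairs
payload = "event=test,topic_name=v2xhub_bsm_in,unit_id=streets_id,metadata.timestamp=1693000000000"

topic_name = ""
unit_id = ""
for item in payload.split(","):
    key, _, value = item.partition("=")
    if key == "topic_name":
        topic_name = value
    elif key == "unit_id":
        unit_id = value

print(topic_name, unit_id)  # v2xhub_bsm_in streets_id
```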
@@ -102,33 +102,33 @@ def parseInfluxfile(logname, start_time_epoch, end_time_epoch, run_num):
if "unit_id" in item_split:
unit_id = item_split[1]
continue

# If topic is map_msg, get timestamp from metadata.timestamp
if topic_name == "v2xhub_map_msg_in":
if "metadata.timestamp" in item_split:
# Get metadata timestamp
message_timestamp = int(item_split[1].split(' ')[0])
time_in_s = message_timestamp/1000
payload_time_in_datetime = datetime.datetime.fromtimestamp(time_in_s)
payload_time_in_datetime = datetime.datetime.fromtimestamp(time_in_s)
payload_time_in_datetime = (payload_time_in_datetime.astimezone(pytz.utc)).replace(tzinfo=None)



# Convert timestamp to datetime
try:
# Get payload timestamp from bridge
if topic_name != "v2xhub_map_msg_in":
payload_timestamp_string = str(log_line.split(" ")[-1])[:-1]
payload_timestamp = float(payload_timestamp_string)
payload_time_in_datetime = datetime.datetime.fromtimestamp(payload_timestamp/1e6)
payload_time_in_datetime = datetime.datetime.fromtimestamp(payload_timestamp/1e6)
payload_time_in_datetime = (payload_time_in_datetime.astimezone(pytz.utc)).replace(tzinfo=None)

except:
print("Could not get payload timestamp from topic: {}. Skipping".format(topic_name))
continue



# If within test window
if log_time_in_datetime > start_time and log_time_in_datetime < end_time :
delay = (log_time_in_datetime - payload_time_in_datetime).total_seconds()
@@ -137,25 +137,25 @@ def parseInfluxfile(logname, start_time_epoch, end_time_epoch, run_num):
                    else:
                        delay_records.append(delay)
                        delay_writer.writerow([unit_id, topic_name, payload_time_in_datetime, log_time_in_datetime, delay])

                    message_drop_writer.writerow([unit_id, topic_name, payload_time_in_datetime, log_time_in_datetime])

                    if topic_name not in unique_topics:
                        unique_topics.append(topic_name)

                    total_rows += 1

                if log_time_in_datetime > end_time:
                    break

        print("Number of unique topics: ", len(unique_topics))
        print(unique_topics)

        ## Calculate required statistics
        delay_np_array = np.array(delay_records)
        if delay_np_array.size > 1:
            delay_max = np.amax(delay_np_array)
            delay_mean = np.mean(delay_np_array)
            delay_stdev = np.std(delay_np_array)
@@ -172,52 +172,49 @@ def read_log_table():
    log_df = pd.read_csv(log_csv)
    log_df = log_df.dropna()
    log_df_dict = dict(tuple(log_df.groupby('Test Case')))

    return log_df_dict


def main():
    if len(sys.argv) < 2:
        print('Run with: "python3 parse_messaging_server_logs.py logname"')
    else:
        logname = sys.argv[1]
        log_timesheet_df = read_log_table()

        test_case = (logname.split("/")[-1]).split("_")[0]
        runs_string = ((logname.split("/")[-1]).split("_")[1].split(".")[0])[1:]
        runs_range_split = runs_string.split('-')
        if len(runs_range_split) == 1:
            runs_range = range(int(runs_range_split[0]), int(runs_range_split[0]) + 1)
        else:
            runs_range = range(int(runs_range_split[0]), int(runs_range_split[1]) + 1)

        test_df = log_timesheet_df[test_case]

        for index in range(0, len(test_df)):
            start_time_epoch = test_df['Start Time'].values[index]
            end_time_epoch = test_df['End Time'].values[index]

            local = pytz.timezone("America/New_York")

            run_num = test_df['Run'].values[index].split('R')[1]
            if int(run_num) in runs_range:
                print("start time epoch: " + str(start_time_epoch))
                print("end time epoch: " + str(end_time_epoch))
                print("test case: " + test_case)
                print("runs_string: " + runs_string)
                print(runs_range)
                print("Run num: ", run_num)
                parseInfluxfile(logname, start_time_epoch, end_time_epoch, run_num)



if __name__ == "__main__":
main()
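
For clarity, a sketch of the filename convention `main` assumes, using a hypothetical `T1_R1-3.log` that encodes test case `T1` and runs 1 through 3:

```python
# Hypothetical log filename: test case "T1", runs 1 through 3
logname = "/tmp/logs/T1_R1-3.log"

base = logname.split("/")[-1]                       # "T1_R1-3.log"
test_case = base.split("_")[0]                      # "T1"
runs_string = base.split("_")[1].split(".")[0][1:]  # strip leading "R" -> "1-3"

runs_range_split = runs_string.split("-")
if len(runs_range_split) == 1:
    runs_range = range(int(runs_range_split[0]), int(runs_range_split[0]) + 1)
else:
    runs_range = range(int(runs_range_split[0]), int(runs_range_split[1]) + 1)

print(test_case, list(runs_range))  # T1 [1, 2, 3]
```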


