From c0955406f81c2e87d6b27296cce871a2c6839587 Mon Sep 17 00:00:00 2001 From: Saikrishna Bairamoni <84093461+SaikrishnaBairamoni@users.noreply.github.com> Date: Tue, 28 Nov 2023 15:26:49 -0500 Subject: [PATCH] Merge Master branch changes to develop (#159) * initial queue implementation cc bridge * add some comments * fix unsub logic (#126) * init * cc bridge simplify logic * Cloud bridge investigation (#131) * initial queue implementation cc bridge * add some comments * cc bridge simplify logic * back to queue, add in file listener * Fix unit status and UI notification and event live status (#129) * init * add comments * fix warning * update text * Remove credentials (#134) * updating log levels, docker compose log size parameter * revert local changes * init * remove hardcoded values * add analysis scripts * messaging server log size parameter * init * add nan * Fix json key value convertor logic (#146) fix comparison * Fix/drop nan value fields (#148) drop fields with NaN values * clean up readme.md --------- Co-authored-by: Abey Co-authored-by: Anish_deva <51463994+adev4a@users.noreply.github.com> Co-authored-by: dan-du-car Co-authored-by: abey-yoseph <59704440+abey-yoseph@users.noreply.github.com> Co-authored-by: dan-du-car <62157949+dan-du-car@users.noreply.github.com> Co-authored-by: Anish Co-authored-by: Cody Garver --- README.md | 19 +-- .../docker-compose.cloud.servers.yml | 6 +- telematic_system/docker-compose.units.yml | 6 +- telematic_system/telematic.env | 6 +- .../web_app/client/src/api/api-topics.js | 2 +- .../client/src/api/user_topic_request.js | 6 +- .../web_app/client/src/pages/TopicPage.js | 15 +- .../user_topic_request.controller.js | 10 +- .../JSON2KeyValuePairsConverter.java | 50 ++++-- .../nats_influx_connection/Config.java | 7 +- .../InfluxDataWriter.java | 12 +- .../nats_influx_connection/NatsConsumer.java | 7 - .../NatsInfluxPush.java | 3 + .../repository/EventRepository.java | 8 +- .../repository/EventsService.java | 3 + .../web_services/UnitsStatusService.java | 7 + .../src/main/resources/application.properties | 12 +- .../JSON2KeyValuePairsConverterTests.java | 27 +++- .../JSONFlattenerHelperTests.java | 39 +++++ .../cloud_nats_bridge/cloud_nats_bridge.py | 146 ++++++++---------- .../src/cloud_nats_bridge/main.py | 2 +- .../ros2_nats_bridge/ros2_nats_bridge/api.py | 21 ++- 22 files changed, 245 insertions(+), 169 deletions(-) diff --git a/README.md b/README.md index 727eef86..e92267cb 100644 --- a/README.md +++ b/README.md @@ -1,22 +1,9 @@ -| Sonar Code Quality | -|--------------------| -[![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=usdot-fhwa-stol_cda-telematics&metric=alert_status)](https://sonarcloud.io/dashboard?id=usdot-fhwa-stol_cda-telematics) | - -# GitHub Actions Build status +## GitHub Actions Build status | Carma-streets-bridge | Carma-vehicle-bridge | telematics-cloud-messaging | |-----|-----|-----| [![carma-streets-bridge](https://github.com/usdot-fhwa-stol/cda-telematics/actions/workflows/carma-streets-bridge.yml/badge.svg?branch=feature_gha)](https://github.com/usdot-fhwa-stol/cda-telematics/actions/workflows/carma-streets-bridge.yml) | [![carma-vehicle-bridge](https://github.com/usdot-fhwa-stol/cda-telematics/actions/workflows/carma-vehicle-bridge.yml/badge.svg)](https://github.com/usdot-fhwa-stol/cda-telematics/actions/workflows/carma-vehicle-bridge.yml) | [![telematic-cloud-messaging](https://github.com/usdot-fhwa-stol/cda-telematics/actions/workflows/telematic-cloud-messaging.yml/badge.svg)](https://github.com/usdot-fhwa-stol/cda-telematics/actions/workflows/telematic-cloud-messaging.yml) -# DockerHub Release Builds -| Carma streets nats bridge | Carma streets vehicle nats bridge | telematic cloud messaging | -|-----|-----|-----| -[![Docker Cloud Build Status](https://img.shields.io/docker/cloud/build/usdotfhwastoldev/carma_street_nats_bridge?label=carma_street_nats_bridge)](https://hub.docker.com/repository/docker/usdotfhwastoldev/carma_street_nats_bridge) | [![Docker Cloud Build Status](https://img.shields.io/docker/cloud/build/usdotfhwastoldev/carma_vehicle_nats_bridge?label=carma_vehicle_nats_bridge)](https://hub.docker.com/repository/docker/usdotfhwastoldev/carma_vehicle_nats_bridge) | [![Docker Cloud Build Status](https://img.shields.io/docker/cloud/build/usdotfhwastoldev/telematic_cloud_messaging?label=telematic_cloud_messaging)](https://hub.docker.com/repository/docker/usdotfhwastoldev/telematic_cloud_messaging) -# DockerHub Release Candidate Builds -Need to add the build status checks for release candidate builds during first demo release. -# DockerHub Develop Builds -Need to add the build status checks for official release builds during first demo release. - -# cda-telematics +# CDA-Telematics This project will create an open-source Module that can be installed on any vehicle (e.g. a CARMA Platform and/or Messenger vehicle, an L0 or L1 production vehicle, etc.) that will collect data about the vehicle and wirelessly send it out in real time for data analysis. The same Module, with any modifications, if necessary, will also be compatible with CARMA Streets and CARMA Cloud. On the receiving end of this data, a user will have a Data Processing & Visualization Tool available to visualize and/or plot the data that was sent using the Module(s). This Module can be thought of as a Fleet Management tool with extra capabilities to support CDA research and education. ## Architecture Diagram @@ -24,6 +11,8 @@ This project will create an open-source Module that can be installed on any vehi ![architecture](https://user-images.githubusercontent.com/34483068/171265484-67177ebb-69f7-4286-9602-016043079958.png) +## Release Notes +The current version of CDA-Telematics tool and release history of the CARMA software platform: [CARMA Release Notes]() ## Documentation Documentation of the setup, operation, and design of the CDA Telematics can be found on the project [Confluence](https://usdot-carma.atlassian.net/wiki/spaces/WFD2/overview) pages. diff --git a/telematic_system/docker-compose.cloud.servers.yml b/telematic_system/docker-compose.cloud.servers.yml index 5eec9267..c7144f3b 100644 --- a/telematic_system/docker-compose.cloud.servers.yml +++ b/telematic_system/docker-compose.cloud.servers.yml @@ -5,7 +5,7 @@ services: restart: always logging: options: - max-size: "10m" + max-size: "2g" max-file: "1" network_mode: host @@ -17,8 +17,8 @@ services: restart: always logging: options: - max-size: "10m" + max-size: "2g" max-file: "1" network_mode: host environment: - - NATS_URI=nats://localhost:4222 \ No newline at end of file + - NATS_URI=nats://localhost:4222 diff --git a/telematic_system/docker-compose.units.yml b/telematic_system/docker-compose.units.yml index 1b71cb20..a8bbcd4b 100755 --- a/telematic_system/docker-compose.units.yml +++ b/telematic_system/docker-compose.units.yml @@ -7,7 +7,7 @@ services: image: usdotfhwastoldev/carma_vehicle_nats_bridge:develop logging: options: - max-size: "10m" + max-size: "2g" max-file: "1" container_name: carma_vehicle_bridge network_mode: host @@ -35,7 +35,7 @@ services: image: usdotfhwastoldev/carma_street_nats_bridge:develop logging: options: - max-size: "10m" + max-size: "2g" max-file: "1" container_name: carma_street_bridge network_mode: host @@ -63,7 +63,7 @@ services: image: usdotfhwastoldev/carma_cloud_nats_bridge:develop logging: options: - max-size: "10m" + max-size: "2g" max-file: "1" container_name: carma_cloud_bridge network_mode: host diff --git a/telematic_system/telematic.env b/telematic_system/telematic.env index cf19b00f..a3574f2d 100644 --- a/telematic_system/telematic.env +++ b/telematic_system/telematic.env @@ -1,10 +1,10 @@ INFLUXDB_DEV_TAG=latest #Required: The tag for influxDB image version. Current latest version is 2.4 -INFLUXDB_DEV_INIT_USERNAME=admin #Required: Create a credential username on container initial startup. -INFLUXDB_DEV_INIT_PASSWORD=adminpwd #Required: Create a credential password on container initial startup. +INFLUXDB_DEV_INIT_USERNAME= #Required: Create a credential username on container initial startup. +INFLUXDB_DEV_INIT_PASSWORD= #Required: Create a credential password on container initial startup. INFLUXDB_DEV_INIT_BUCKET=platform-dev #Required: Create an bucket on container initial startup. You can create more buckets inside the influxDB container. INFLUXDB_DEV_ORG=my-org #Required: Create an organization on container startup. You can create more orgs inside the same influxDB container. INFLUXDB_DEV_RETENTION=7d #Optional: If not set, default is 0s meaning the initial bucket will retain data forever. -NATS_SERVER_IP_PORT=44.206.13.7:4222 #change to amazon ec2 ipv4 address +NATS_SERVER_IP_PORT=: #change to amazon ec2 ipv4 address:nats_port ##Cloud Bridge params CARMA_CLOUD_LOG=/opt/tomcat/logs/carmacloud.log #Required for carma cloud bridge, containing the directory where carma cloud log is located CARMA_CLOUD_BRIDGE_UNIT_ID=cloud_id diff --git a/telematic_system/telematic_apps/web_app/client/src/api/api-topics.js b/telematic_system/telematic_apps/web_app/client/src/api/api-topics.js index d835a9e6..769ebe6c 100644 --- a/telematic_system/telematic_apps/web_app/client/src/api/api-topics.js +++ b/telematic_system/telematic_apps/web_app/client/src/api/api-topics.js @@ -54,7 +54,7 @@ const requestSelectedLiveUnitsTopics = async (seletedUnitTopicListToConfirm) => } catch (err) { sentStatus.push({ errCode: err.response !== undefined ? err.response.status : "404", - errMsg: err.response !== undefined ? unitStatus + err.response.statusText + err.response.data : unitStatus + "No reponse from server." + errMsg: err.response !== undefined ? unitStatus + err.response.statusText +". " + err.response.data : unitStatus + "No reponse from server." }); } return sentStatus; diff --git a/telematic_system/telematic_apps/web_app/client/src/api/user_topic_request.js b/telematic_system/telematic_apps/web_app/client/src/api/user_topic_request.js index e608a679..45134b4b 100644 --- a/telematic_system/telematic_apps/web_app/client/src/api/user_topic_request.js +++ b/telematic_system/telematic_apps/web_app/client/src/api/user_topic_request.js @@ -30,10 +30,11 @@ const upsertUserTopicRequestForEventUnits = async (seletedUnitsTopics, user_id) /** *@brief Load or find the user topic request for the given event and list of units * @Params event id used to uniquely identifer each event + * @Params current user id to exclude * @Params selectedUnitIdentifiers: A list of unit identifiers. Each unit identifier is a string and is used to uniquely identify each unit. * @Return Response status and load a bulk of topics for each event and list of units for the event */ -const findAllUserTopicRequestByEventUnits = async (event_id, selectedUnitIdentifiers) => { +const findUsersTopicRequestByEventUnits = async (event_id, selectedUnitIdentifiers, exclude_user_id) => { const URL = `${process.env.REACT_APP_WEB_SERVER_URI}/api/user_topic_request/all` if (selectedUnitIdentifiers.length === 0 || event_id === 0 || event_id === undefined) { return { errCode: CanceledError.ERR_BAD_REQUEST, errMsg: "Event id or units cannot be empty" }; @@ -44,6 +45,7 @@ const findAllUserTopicRequestByEventUnits = async (event_id, selectedUnitIdentif withCredentials: true, params: { event_id: event_id, + exclude_user_id: exclude_user_id, unit_identifiers: selectedUnitIdentifiers, }, },); @@ -83,4 +85,4 @@ const findAllUserTopicRequestByEventUnits = async (event_id, selectedUnitIdentif } } -export { upsertUserTopicRequestForEventUnits,findUserTopicRequestByUserEventUnits, findAllUserTopicRequestByEventUnits, } +export { upsertUserTopicRequestForEventUnits,findUserTopicRequestByUserEventUnits, findUsersTopicRequestByEventUnits, } diff --git a/telematic_system/telematic_apps/web_app/client/src/pages/TopicPage.js b/telematic_system/telematic_apps/web_app/client/src/pages/TopicPage.js index 2a92c28d..caffce0d 100644 --- a/telematic_system/telematic_apps/web_app/client/src/pages/TopicPage.js +++ b/telematic_system/telematic_apps/web_app/client/src/pages/TopicPage.js @@ -20,7 +20,7 @@ import { findAllEvents } from '../api/api-events'; import { findAllLocations } from '../api/api-locations'; import { findAllTestingTypes } from '../api/api-testing-types'; import { requestSelectedLiveUnitsTopics } from '../api/api-topics'; -import { findAllUserTopicRequestByEventUnits, upsertUserTopicRequestForEventUnits } from '../api/user_topic_request'; +import { findUsersTopicRequestByEventUnits, upsertUserTopicRequestForEventUnits } from '../api/user_topic_request'; import { VALID_UNIT_TYPES } from '../components/events/EventMetadata'; import InfrastructureTopicList from '../components/topics/InfrastructureTopicList'; import { NOTIFICATION_STATUS } from '../components/topics/TopicMetadata'; @@ -117,12 +117,11 @@ const TopicPage = React.memo(() => { event_id = item.event_id; }); - //Find all existing users's topic request before sending the current user's topic request - findAllUserTopicRequestByEventUnits(event_id, unit_identifiers).then(data => { + //Find all existing users's topic request except current user topic request before sending the current user's topic request + findUsersTopicRequestByEventUnits(event_id, unit_identifiers, authCtx.user_id).then(data => { let updatedSeletedUnitTopicListToConfirm = []; if (data !== undefined && Array.isArray(data) - && data.length > 0 && seletedUnitTopicListToConfirm !== undefined && Array.isArray(seletedUnitTopicListToConfirm)) { seletedUnitTopicListToConfirm.forEach(item => { @@ -145,7 +144,6 @@ const TopicPage = React.memo(() => { }); }); } - //Loop through existing all users' topic request data.forEach(dataItem => { //Check the same unit for current user topics request and all users' topics request @@ -187,9 +185,12 @@ const TopicPage = React.memo(() => { } }); } - + if(num_failed === 0 && num_success === 0) + { + messageList.push("Failed to send request. Please click the confirm selected topics button again.") + } //Notification - let severity = num_failed === 0 ? NOTIFICATION_STATUS.SUCCESS : (num_success === 0 ? NOTIFICATION_STATUS.ERROR : NOTIFICATION_STATUS.WARNING); + let severity = num_failed === 0 && num_success === 0 ? NOTIFICATION_STATUS.ERROR : (num_failed === 0 && num_success !== 0 ? NOTIFICATION_STATUS.SUCCESS: (num_failed !==0 && num_success===0? NOTIFICATION_STATUS.ERROR : NOTIFICATION_STATUS.WARNING)); setAlertStatus({ open: true, severity: severity, diff --git a/telematic_system/telematic_apps/web_app/server/controllers/user_topic_request.controller.js b/telematic_system/telematic_apps/web_app/server/controllers/user_topic_request.controller.js index d065e551..34c5c7e3 100644 --- a/telematic_system/telematic_apps/web_app/server/controllers/user_topic_request.controller.js +++ b/telematic_system/telematic_apps/web_app/server/controllers/user_topic_request.controller.js @@ -115,10 +115,12 @@ exports.findUserRequestByUserEventUnit = (req, res) => { exports.findAllUserRequestByEventUnit = (req, res) => { const event_id = req.query.event_id; const unit_identifiers = req.query.unit_identifiers; - var condition = []; - condition.push({ event_id: event_id }); - condition.push({ unit_identifier: unit_identifiers }); - user_topic_request.findAll({ where: condition }) + const exclude_user_id = req.query.exclude_user_id; + user_topic_request.findAll({ where: { + event_id: event_id, + unit_identifier: unit_identifiers, + updated_by :{ [Op.ne]: exclude_user_id} + } }) .then(data => { res.status(200).send(data); }).catch(err => { diff --git a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/message_converters/JSON2KeyValuePairsConverter.java b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/message_converters/JSON2KeyValuePairsConverter.java index 7fd309ba..3dca5e93 100644 --- a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/message_converters/JSON2KeyValuePairsConverter.java +++ b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/message_converters/JSON2KeyValuePairsConverter.java @@ -1,5 +1,6 @@ package com.telematic.telematic_cloud_messaging.message_converters; +import java.util.List; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; @@ -9,9 +10,10 @@ public class JSON2KeyValuePairsConverter { /** * @param json_str String JSON format consumed from NATS subject + * @param to_str_values Array of String to match in the JSON and convert the match to String data type value * @return String of key value pairs separated by commas. */ - public String convertJson2KeyValuePairs(String json_str) { + public String convertJson2KeyValuePairs(String json_str, List to_str_values) { String pairs = ""; JSONParser parser = new JSONParser(); try { @@ -26,22 +28,38 @@ public String convertJson2KeyValuePairs(String json_str) { pairs += key + "=\"NA\""; } else { - // Regex matching integers - if (value.toString().matches("[-+]?\\d*")) { - pairs += key + "=" + value; - } - // Regex matching decimals - else if (value.toString().matches("[-+]?\\d*\\.?\\d+")) { - pairs += key + "=" + value; - } - //Regex matching scientific notation. InfluxDB does not support scientific notation float syntax, temporarily set this kind of value = 0.0 - else if (value.toString().matches("^[+-]?\\d+(?:\\.\\d*(?:[eE][+-]?\\d+)?)?$")) { - pairs += key + "=" + 0.0; - } - // If none of the above Regex matches, considering it as string - else { - pairs += key + "=\"" + value.toString().replaceAll("\\s", "") + "\""; + boolean is_skip = false; + for (String value_item: to_str_values) + { + if (key.toString().strip().equals(value_item)){ + pairs += key + "=\"" + value.toString().replaceAll("\\s", "") + "\""; + is_skip = true; + } } + if(!is_skip) + { + // Regex matching integers + if (value.toString().matches("[-+]?\\d*")) { + pairs += key + "=" + value; + } + // Regex matching decimals + else if (value.toString().matches("[-+]?\\d*\\.?\\d+")) { + pairs += key + "=" + value; + } + //Regex matching scientific notation. InfluxDB does not support scientific notation float syntax, temporarily set this kind of value = 0.0 + else if (value.toString().matches("^[+-]?\\d+(?:\\.\\d*(?:[eE][+-]?\\d+)?)?$")) { + pairs += key + "=" + 0.0; + } + //Leave NaN value as float (Assuming influx treats NaN value as float type) and do not convert to string + else if (value.toString().toLowerCase().strip().equals("nan")){ + // Drop fields with NaN values + continue; + } + // If none of the above Regex matches, considering it as string + else { + pairs += key + "=\"" + value.toString().replaceAll("\\s", "") + "\""; + } + } } if (json.keySet().size() != key_count) { diff --git a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/Config.java b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/Config.java index 9d441a8f..0d2bec3f 100644 --- a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/Config.java +++ b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/Config.java @@ -1,5 +1,7 @@ package com.telematic.telematic_cloud_messaging.nats_influx_connection; +import java.util.*; + /** * The Config object instantiates a configuration object which stores information to create a connection to the telematic nats server * and influxdb bucket. @@ -61,6 +63,8 @@ public enum BucketType{ String streets_unit_id_list; // List of cloud unit ids String cloud_unit_id_list; + //List of values in the stream that should only be set to string data type + List to_str_values; public Config(){} @@ -88,7 +92,8 @@ public String ToString(){ "\nnats_topic_per_dispatcher: " + topics_per_dispatcher+ "\nvehicle_unit_id_list: " + vehicle_unit_id_list + "\nstreets_unit_id_list: " + streets_unit_id_list + - "\ncloud_unit_id_list: " + cloud_unit_id_list); + "\ncloud_unit_id_list: " + cloud_unit_id_list + + "\nto_str_values:" + to_str_values.toString()); return config_str; diff --git a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/InfluxDataWriter.java b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/InfluxDataWriter.java index f5bfd06b..a59e3375 100644 --- a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/InfluxDataWriter.java +++ b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/InfluxDataWriter.java @@ -1,18 +1,12 @@ package com.telematic.telematic_cloud_messaging.nats_influx_connection; -import java.io.*; -import java.util.Properties; + import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.Arrays; import java.util.List; import java.util.ArrayList; -import com.fasterxml.jackson.annotation.JsonValue; -import com.google.gson.JsonObject; import com.influxdb.client.*; import com.influxdb.client.InfluxDBClientOptions; -import com.influxdb.client.domain.Authorization; import com.influxdb.client.domain.WritePrecision; import com.telematic.telematic_cloud_messaging.message_converters.JSONFlattenerHelper; @@ -20,8 +14,6 @@ import com.telematic.telematic_cloud_messaging.message_converters.JSON2KeyValuePairsConverter; -import org.json.simple.JSONValue; -import org.json.simple.parser.JSONParser; import org.json.*; import org.slf4j.LoggerFactory; import org.slf4j.Logger; @@ -202,7 +194,7 @@ public String influxStringConverter(String publishData) { JSONObject payloadJson = publishDataJson.getJSONObject("payload"); String flattenedPayloadJson = jsonFlattener.flattenJsonStr(payloadJson.toString()); - String keyValuePairs = keyValueConverter.convertJson2KeyValuePairs(flattenedPayloadJson); + String keyValuePairs = keyValueConverter.convertJson2KeyValuePairs(flattenedPayloadJson, config_.to_str_values); String unit_id = publishDataJson.getString("unit_id").replaceAll("\\s", "_"); String unit_type = publishDataJson.getString("unit_type").replaceAll("\\s", "_"); diff --git a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/NatsConsumer.java b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/NatsConsumer.java index e9b67f4c..df19d0a7 100644 --- a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/NatsConsumer.java +++ b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/NatsConsumer.java @@ -1,15 +1,9 @@ package com.telematic.telematic_cloud_messaging.nats_influx_connection; import io.nats.client.*; -import java.io.*; -import java.util.Properties; import java.util.List; import java.util.ArrayList; import java.nio.charset.StandardCharsets; -import org.springframework.boot.CommandLineRunner; -import com.telematic.telematic_cloud_messaging.nats_influx_connection.InfluxDataWriter; -import com.telematic.telematic_cloud_messaging.message_converters.JSONFlattenerHelper; -import com.telematic.telematic_cloud_messaging.message_converters.JSON2KeyValuePairsConverter; import org.slf4j.LoggerFactory; import org.slf4j.Logger; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -102,7 +96,6 @@ public void nats_connect() { * the topic list variable */ public void updateAvailableTopicList() { - String error_msg = ""; for (String unit_id: unit_id_list) { try diff --git a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/NatsInfluxPush.java b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/NatsInfluxPush.java index 634494ca..bb95cff0 100644 --- a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/NatsInfluxPush.java +++ b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/nats_influx_connection/NatsInfluxPush.java @@ -5,6 +5,8 @@ import org.springframework.context.annotation.Profile; import java.io.*; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Properties; import com.telematic.telematic_cloud_messaging.nats_influx_connection.Config; import com.telematic.telematic_cloud_messaging.nats_influx_connection.InfluxDataWriter; @@ -71,6 +73,7 @@ static Config getConfigValues() { config.vehicle_unit_id_list = prop.getProperty("VEHICLE_UNIT_ID_LIST"); config.streets_unit_id_list = prop.getProperty("STREETS_UNIT_ID_LIST"); config.cloud_unit_id_list = prop.getProperty("CLOUD_UNIT_ID_LIST"); + config.to_str_values = Arrays.asList(prop.getProperty("TO_STR_VALUES").split(",")); try{ config.influx_bucket_type = BucketType.valueOf(prop.getProperty("INFLUX_BUCKET_TYPE")); diff --git a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/repository/EventRepository.java b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/repository/EventRepository.java index 1f4cd787..99f107c2 100644 --- a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/repository/EventRepository.java +++ b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/repository/EventRepository.java @@ -1,9 +1,15 @@ package com.telematic.telematic_cloud_messaging.repository; - +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.data.jpa.repository.Query; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Repository; +import org.springframework.transaction.annotation.Transactional; import com.telematic.telematic_cloud_messaging.models.Events; @Repository public interface EventRepository extends JpaRepository { + @Transactional + @Modifying + @Query("UPDATE Events e SET e.status = ''") + void resetEventStatus(); } \ No newline at end of file diff --git a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/repository/EventsService.java b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/repository/EventsService.java index 3fac48a6..22dc69d6 100644 --- a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/repository/EventsService.java +++ b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/repository/EventsService.java @@ -24,4 +24,7 @@ public void updateEventStatus(String event_status, Integer event_id) { eventRepository.save(event.get()); } } + public void resetEventStatus(){ + eventRepository.resetEventStatus(); + } } \ No newline at end of file diff --git a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/web_services/UnitsStatusService.java b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/web_services/UnitsStatusService.java index 226cc677..16b10209 100644 --- a/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/web_services/UnitsStatusService.java +++ b/telematic_system/telematic_cloud_messaging/src/main/java/com/telematic/telematic_cloud_messaging/web_services/UnitsStatusService.java @@ -151,6 +151,13 @@ public void checkUnitsStatus() throws IOException, InterruptedException { */ @Override public void run(String... args) throws Exception { + //on startup: reset all event status in case some live event status is not cleared + try { + eventsService.resetEventStatus(); + logger.info("Events status is reset!"); + } catch (Exception e) { + logger.error("Cannot reset events status! ERROR: " + e.getMessage()); + } Connection conn = natsConn.getConnection(); if (conn != null) { logger.debug("register units subscribe to subject: " + registerUnit); diff --git a/telematic_system/telematic_cloud_messaging/src/main/resources/application.properties b/telematic_system/telematic_cloud_messaging/src/main/resources/application.properties index 881e5967..f1cc9c74 100644 --- a/telematic_system/telematic_cloud_messaging/src/main/resources/application.properties +++ b/telematic_system/telematic_cloud_messaging/src/main/resources/application.properties @@ -10,13 +10,13 @@ INFLUX_PORT= INFLUX_USERNAME= INFLUX_PWD= INFLUX_BUCKET_STREETS=infrastructure-dev -INFLUX_BUCKET_ID_STREETS=30c930a3f306cfd6 +INFLUX_BUCKET_ID_STREETS= STREETS_SUBSCRIPTION_TOPIC=streets.*.data. INFLUX_BUCKET_PLATFORM=platform-dev -INFLUX_BUCKET_ID_PLATFORM=835df46d76b5e7bb +INFLUX_BUCKET_ID_PLATFORM= PLATFORM_SUBSCRIPTION_TOPIC=platform.*.data. INFLUX_BUCKET_CLOUD=infrastructure-dev -INFLUX_BUCKET_ID_CLOUD=30c930a3f306cfd6 +INFLUX_BUCKET_ID_CLOUD= CLOUD_SUBSCRIPTION_TOPIC=cloud.*.data. #The maximum number of topics that should be assigned to an individual dispatcher NUMBER_TOPICS_PER_DISPATCHER=3 @@ -24,8 +24,10 @@ VEHICLE_UNIT_ID_LIST=DOT-45244,DOT-45254,DOT-45241 STREETS_UNIT_ID_LIST=streets_id,v2xhub_id CLOUD_UNIT_ID_LIST=cloud_id INFLUX_ORG=my-org -INFLUX_ORG_ID=12bdc4164c2e8141 -INFLUX_TOKEN=iJwbytxMMQ1PpecjvVSRgbK1xUaDeZvU6DLHfXkoezqUfZfVYc8Q1nTIISceFmWvjcJA8NCPX_FMAm2Zw0Q5UA== +INFLUX_ORG_ID= +INFLUX_TOKEN= +#Edge case for hostBSMId, sender_bsm_id, core_data.id and TCR/TCM ID where the Ids can be all digits or alpha characters +TO_STR_VALUES=hostBSMId,TrafficControlRequest.reqid,tcmV01.reqid,m_header.sender_bsm_id,core_data.id # Connection timeout to influx bucket. Unit: milliseconds INFLUX_CONNECT_TIMEOUT=1000 # Timeout while writing data to influx. Unit: milliseconds diff --git a/telematic_system/telematic_cloud_messaging/src/test/java/com/telematic/telematic_cloud_messaging/message_converters/JSON2KeyValuePairsConverterTests.java b/telematic_system/telematic_cloud_messaging/src/test/java/com/telematic/telematic_cloud_messaging/message_converters/JSON2KeyValuePairsConverterTests.java index d5db7c5f..17d0aaf1 100644 --- a/telematic_system/telematic_cloud_messaging/src/test/java/com/telematic/telematic_cloud_messaging/message_converters/JSON2KeyValuePairsConverterTests.java +++ b/telematic_system/telematic_cloud_messaging/src/test/java/com/telematic/telematic_cloud_messaging/message_converters/JSON2KeyValuePairsConverterTests.java @@ -2,6 +2,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; +import java.util.Arrays; +import java.util.List; + import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @@ -15,9 +18,31 @@ public class JSON2KeyValuePairsConverterTests { @Test public void convertJson2KeyValuePairs() { + List to_str_values = Arrays.asList("hostBSMId,TrafficControlRequest.reqid,tcmV01.reqid".split(",")); String json_str = "{\"metadata.unit_name\":\"BlackPacifica\",\"metadata.event_name\":\"UC3\",\"metadata.location\":\"TFHRC\",\"metadata.unit_type\":\"Platform\",\"metadata.unit_id\":\"DOT-508\",\"metadata.testing_type\":\"Integration\",\"payload.core_data.sec_mark\":\"40328\",\"payload.core_data.heading\":\"6160\",\"payload.core_data.brakes.scs\":\"0\",\"payload.core_data.brakes.abs\":\"0\",\"payload.core_data.brakes.aux_brakes\":\"0\",\"payload.core_data.brakes.brake_boost\":\"0\",\"payload.core_data.brakes.traction\":\"0\",\"payload.core_data.accel_set.vert\":\"-127\",\"payload.core_data.accel_set.lat\":\"2\",\"payload.core_data.accel_set.long\":\"-79\",\"payload.core_data.accel_set.yaw\":\"12\",\"payload.core_data.accuracy.orientation\":\"65535\",\"payload.core_data.accuracy.semi_minor\":\"255\",\"payload.core_data.accuracy.semi_major\":\"255\",\"payload.core_data.long\":\"-77.1476267\",\"payload.core_data.speed\":\"282\",\"payload.core_data.transmission\":\"9.890579531202093e-05\",\"payload.core_data.size.length\":\"500\",\"payload.core_data.size.width\":\"200\",\"payload.core_data.elev\":\"384\",\"payload.core_data.angle\":\"127\",\"payload.core_data.id\":\"11111111\",\"payload.core_data.lat\":\"38.9549740\",\"payload.core_data.msg_count\":\"70\"}"; - String k_v_pairs = converter.convertJson2KeyValuePairs(json_str); + String k_v_pairs = converter.convertJson2KeyValuePairs(json_str, to_str_values); String expected = "metadata.unit_id=\"DOT-508\",payload.core_data.accuracy.semi_major=255,payload.core_data.brakes.abs=0,payload.core_data.transmission=0.0,payload.core_data.brakes.brake_boost=0,payload.core_data.accel_set.long=-79,payload.core_data.accuracy.semi_minor=255,metadata.unit_type=\"Platform\",payload.core_data.brakes.scs=0,payload.core_data.brakes.traction=0,payload.core_data.lat=38.9549740,payload.core_data.heading=6160,payload.core_data.accel_set.yaw=12,payload.core_data.size.length=500,payload.core_data.size.width=200,metadata.testing_type=\"Integration\",payload.core_data.speed=282,payload.core_data.angle=127,metadata.event_name=\"UC3\",payload.core_data.long=-77.1476267,payload.core_data.brakes.aux_brakes=0,payload.core_data.msg_count=70,payload.core_data.accel_set.lat=2,metadata.location=\"TFHRC\",payload.core_data.sec_mark=40328,payload.core_data.id=11111111,metadata.unit_name=\"BlackPacifica\",payload.core_data.accel_set.vert=-127,payload.core_data.elev=384,payload.core_data.accuracy.orientation=65535"; assertEquals(expected, k_v_pairs); + + String mo_json_str = "{\"unit_name\":\"WestIntersection\",\"payload.metadata.hostStaticId\":\"DOT-45244\",\"payload.metadata.targetStaticId\":\"UNSET\",\"payload.metadata.hostBSMId\":\"1234562\",\"payload.metadata.planId\":\"a95fe925-1c09-43eb-b40e-5abe7611b187\",\"payload.metadata.timestamp\":\"0000001681146252589\",\"payload.trajectory.offsets[0].offsetX\":51,\"payload.trajectory.offsets[0].offsetZ\":-18,\"payload.trajectory.offsets[0].offsetY\":-3,\"payload.trajectory.offsets[1].offsetX\":96,\"payload.trajectory.offsets[1].offsetZ\":-28,\"payload.trajectory.offsets[1].offsetY\":-1,\"payload.trajectory.offsets[2].offsetX\":96,\"payload.trajectory.offsets[2].offsetZ\":-29,\"payload.trajectory.offsets[2].offsetY\":-2,\"payload.trajectory.offsets[3].offsetX\":96,\"payload.trajectory.offsets[3].offsetZ\":-28,\"payload.trajectory.offsets[3].offsetY\":-2,\"payload.trajectory.offsets[4].offsetX\":96,\"payload.trajectory.offsets[4].offsetZ\":-28,\"payload.trajectory.offsets[4].offsetY\":-1,\"payload.trajectory.offsets[5].offsetX\":96,\"payload.trajectory.offsets[5].offsetZ\":-29,\"payload.trajectory.offsets[5].offsetY\":-2,\"payload.trajectory.offsets[6].offsetX\":96,\"payload.trajectory.offsets[6].offsetZ\":-28,\"payload.trajectory.offsets[6].offsetY\":-1,\"payload.trajectory.offsets[7].offsetX\":96,\"payload.trajectory.offsets[7].offsetZ\":-29,\"payload.trajectory.offsets[7].offsetY\":-2,\"payload.trajectory.offsets[8].offsetX\":97,\"payload.trajectory.offsets[8].offsetZ\":-28,\"payload.trajectory.offsets[8].offsetY\":-1,\"payload.trajectory.offsets[9].offsetX\":96,\"payload.trajectory.offsets[9].offsetZ\":-28,\"payload.trajectory.offsets[9].offsetY\":-2,\"payload.trajectory.offsets[10].offsetX\":96,\"payload.trajectory.offsets[10].offsetZ\":-27,\"payload.trajectory.offsets[10].offsetY\":0,\"payload.trajectory.offsets[11].offsetX\":97,\"payload.trajectory.offsets[11].offsetZ\":-27,\"payload.trajectory.offsets[11].offsetY\":-1,\"payload.trajectory.offsets[12].offsetX\":96,\"payload.trajectory.offsets[12].offsetZ\":-26,\"payload.trajectory.offsets[12].offsetY\":1,\"payload.trajectory.offsets[13].offsetX\":97,\"payload.trajectory.offsets[13].offsetZ\":-26,\"payload.trajectory.offsets[13].offsetY\":1,\"payload.trajectory.offsets[14].offsetX\":97,\"payload.trajectory.offsets[14].offsetZ\":-25,\"payload.trajectory.offsets[14].offsetY\":1,\"payload.trajectory.offsets[15].offsetX\":98,\"payload.trajectory.offsets[15].offsetZ\":-24,\"payload.trajectory.offsets[15].offsetY\":2,\"payload.trajectory.offsets[16].offsetX\":97,\"payload.trajectory.offsets[16].offsetZ\":-24,\"payload.trajectory.offsets[16].offsetY\":3,\"payload.trajectory.offsets[17].offsetX\":97,\"payload.trajectory.offsets[17].offsetZ\":-23,\"payload.trajectory.offsets[17].offsetY\":3,\"payload.trajectory.offsets[18].offsetX\":98,\"payload.trajectory.offsets[18].offsetZ\":-22,\"payload.trajectory.offsets[18].offsetY\":3,\"payload.trajectory.offsets[19].offsetX\":98,\"payload.trajectory.offsets[19].offsetZ\":-22,\"payload.trajectory.offsets[19].offsetY\":5,\"payload.trajectory.offsets[20].offsetX\":97,\"payload.trajectory.offsets[20].offsetZ\":-21,\"payload.trajectory.offsets[20].offsetY\":4,\"payload.trajectory.offsets[21].offsetX\":77,\"payload.trajectory.offsets[21].offsetZ\":-16,\"payload.trajectory.offsets[21].offsetY\":4,\"payload.trajectory.offsets[22].offsetX\":100,\"payload.trajectory.offsets[22].offsetZ\":-21,\"payload.trajectory.offsets[22].offsetY\":6,\"payload.trajectory.offsets[23].offsetX\":100,\"payload.trajectory.offsets[23].offsetZ\":-20,\"payload.trajectory.offsets[23].offsetY\":7,\"payload.trajectory.offsets[24].offsetX\":100,\"payload.trajectory.offsets[24].offsetZ\":-19,\"payload.trajectory.offsets[24].offsetY\":6,\"payload.trajectory.offsets[25].offsetX\":100,\"payload.trajectory.offsets[25].offsetZ\":-18,\"payload.trajectory.offsets[25].offsetY\":8,\"payload.trajectory.offsets[26].offsetX\":100,\"payload.trajectory.offsets[26].offsetZ\":-18,\"payload.trajectory.offsets[26].offsetY\":8,\"payload.trajectory.offsets[27].offsetX\":100,\"payload.trajectory.offsets[27].offsetZ\":-17,\"payload.trajectory.offsets[27].offsetY\":9,\"payload.trajectory.offsets[28].offsetX\":100,\"payload.trajectory.offsets[28].offsetZ\":-16,\"payload.trajectory.offsets[28].offsetY\":9,\"payload.trajectory.offsets[29].offsetX\":101,\"payload.trajectory.offsets[29].offsetZ\":-16,\"payload.trajectory.offsets[29].offsetY\":10,\"payload.trajectory.offsets[30].offsetX\":100,\"payload.trajectory.offsets[30].offsetZ\":-15,\"payload.trajectory.offsets[30].offsetY\":10,\"payload.trajectory.offsets[31].offsetX\":100,\"payload.trajectory.offsets[31].offsetZ\":-16,\"payload.trajectory.offsets[31].offsetY\":10,\"payload.trajectory.offsets[32].offsetX\":101,\"payload.trajectory.offsets[32].offsetZ\":-14,\"payload.trajectory.offsets[32].offsetY\":11,\"payload.trajectory.offsets[33].offsetX\":100,\"payload.trajectory.offsets[33].offsetZ\":-15,\"payload.trajectory.offsets[33].offsetY\":11,\"payload.trajectory.offsets[34].offsetX\":100,\"payload.trajectory.offsets[34].offsetZ\":-15,\"payload.trajectory.offsets[34].offsetY\":11,\"payload.trajectory.offsets[35].offsetX\":101,\"payload.trajectory.offsets[35].offsetZ\":-14,\"payload.trajectory.offsets[35].offsetY\":10,\"payload.trajectory.offsets[36].offsetX\":100,\"payload.trajectory.offsets[36].offsetZ\":-14,\"payload.trajectory.offsets[36].offsetY\":11,\"payload.trajectory.offsets[37].offsetX\":100,\"payload.trajectory.offsets[37].offsetZ\":-15,\"payload.trajectory.offsets[37].offsetY\":12,\"payload.trajectory.offsets[38].offsetX\":101,\"payload.trajectory.offsets[38].offsetZ\":-14,\"payload.trajectory.offsets[38].offsetY\":11,\"payload.trajectory.offsets[39].offsetX\":100,\"payload.trajectory.offsets[39].offsetZ\":-14,\"payload.trajectory.offsets[39].offsetY\":11,\"payload.trajectory.offsets[40].offsetX\":100,\"payload.trajectory.offsets[40].offsetZ\":-14,\"payload.trajectory.offsets[40].offsetY\":12,\"payload.trajectory.offsets[41].offsetX\":101,\"payload.trajectory.offsets[41].offsetZ\":-13,\"payload.trajectory.offsets[41].offsetY\":11,\"payload.trajectory.offsets[42].offsetX\":100,\"payload.trajectory.offsets[42].offsetZ\":-14,\"payload.trajectory.offsets[42].offsetY\":12,\"payload.trajectory.offsets[43].offsetX\":101,\"payload.trajectory.offsets[43].offsetZ\":-13,\"payload.trajectory.offsets[43].offsetY\":11,\"payload.trajectory.location.ecefX\":110457775,\"payload.trajectory.location.ecefZ\":398843563,\"payload.trajectory.location.ecefY\":-484207718,\"payload.trajectory.location.timestamp\":\"0000000000000000000\",\"topic_name\":\"v2xhub_mobility_path_in\",\"msg_type\":\"v2xhub_mobility_path_in\",\"event_name\":\"WFD_Verification_2\",\"location\":\"TFHRC\",\"unit_type\":\"infrastructure\",\"unit_id\":\"streets_id\",\"testing_type\":\"Verification\",\"timestamp\":1681146252589000}"; + String mo_k_v_pairs = converter.convertJson2KeyValuePairs(mo_json_str, to_str_values); + expected = "payload.metadata.hostStaticId=\"DOT-45244\",payload.trajectory.offsets[41].offsetZ=-13,payload.trajectory.offsets[41].offsetY=11,payload.trajectory.offsets[41].offsetX=101,payload.trajectory.offsets[32].offsetX=101,payload.trajectory.offsets[32].offsetZ=-14,payload.trajectory.offsets[32].offsetY=11,payload.trajectory.offsets[3].offsetZ=-28,payload.trajectory.offsets[3].offsetY=-2,payload.trajectory.offsets[29].offsetX=101,payload.trajectory.offsets[29].offsetZ=-16,payload.trajectory.offsets[29].offsetY=10,payload.trajectory.offsets[3].offsetX=96,payload.trajectory.offsets[38].offsetX=101,payload.trajectory.offsets[8].offsetY=-1,payload.trajectory.offsets[38].offsetY=11,payload.trajectory.offsets[8].offsetX=97,payload.trajectory.offsets[38].offsetZ=-14,payload.trajectory.offsets[31].offsetZ=-16,payload.trajectory.offsets[37].offsetY=12,payload.trajectory.offsets[8].offsetZ=-28,payload.trajectory.offsets[37].offsetX=100,payload.trajectory.offsets[31].offsetX=100,payload.trajectory.offsets[31].offsetY=10,payload.trajectory.offsets[2].offsetX=96,payload.trajectory.offsets[2].offsetY=-2,payload.trajectory.offsets[13].offsetZ=-26,payload.trajectory.offsets[2].offsetZ=-29,payload.trajectory.offsets[13].offsetY=1,payload.trajectory.offsets[14].offsetX=97,payload.trajectory.offsets[13].offsetX=97,payload.trajectory.offsets[14].offsetZ=-25,payload.trajectory.offsets[14].offsetY=1,payload.trajectory.offsets[19].offsetZ=-22,payload.trajectory.offsets[37].offsetZ=-15,payload.metadata.planId=\"a95fe925-1c09-43eb-b40e-5abe7611b187\",payload.trajectory.offsets[19].offsetX=98,payload.trajectory.offsets[19].offsetY=5,testing_type=\"Verification\",payload.trajectory.offsets[18].offsetX=98,payload.trajectory.offsets[1].offsetZ=-28,payload.trajectory.offsets[7].offsetX=96,payload.trajectory.offsets[1].offsetY=-1,payload.trajectory.offsets[1].offsetX=96,payload.trajectory.offsets[18].offsetZ=-22,payload.trajectory.offsets[18].offsetY=3,payload.trajectory.offsets[27].offsetY=9,payload.trajectory.offsets[27].offsetZ=-17,payload.trajectory.offsets[27].offsetX=100,payload.trajectory.offsets[33].offsetZ=-15,payload.trajectory.offsets[33].offsetY=11,payload.trajectory.offsets[33].offsetX=100,payload.trajectory.offsets[24].offsetZ=-19,payload.trajectory.offsets[24].offsetX=100,payload.trajectory.offsets[24].offsetY=6,payload.trajectory.offsets[30].offsetX=100,unit_id=\"streets_id\",payload.trajectory.offsets[30].offsetZ=-15,payload.trajectory.offsets[30].offsetY=10,timestamp=1681146252589000,payload.trajectory.offsets[22].offsetZ=-21,payload.trajectory.offsets[22].offsetY=6,payload.trajectory.offsets[22].offsetX=100,payload.metadata.timestamp=0000001681146252589,payload.trajectory.offsets[9].offsetY=-2,payload.trajectory.offsets[9].offsetZ=-28,payload.trajectory.offsets[9].offsetX=96,payload.trajectory.offsets[16].offsetX=97,payload.trajectory.offsets[16].offsetY=3,payload.metadata.hostBSMId=\"1234562\",payload.trajectory.offsets[16].offsetZ=-24,payload.trajectory.offsets[11].offsetY=-1,payload.trajectory.offsets[4].offsetZ=-28,payload.trajectory.offsets[11].offsetX=97,payload.trajectory.offsets[4].offsetY=-1,payload.trajectory.offsets[4].offsetX=96,payload.trajectory.offsets[11].offsetZ=-27,payload.trajectory.offsets[35].offsetX=101,payload.trajectory.offsets[35].offsetY=10,payload.trajectory.offsets[35].offsetZ=-14,payload.trajectory.offsets[26].offsetZ=-18,payload.trajectory.offsets[6].offsetZ=-28,payload.trajectory.offsets[0].offsetZ=-18,payload.metadata.targetStaticId=\"UNSET\",payload.trajectory.offsets[6].offsetX=96,payload.trajectory.offsets[26].offsetY=8,payload.trajectory.offsets[6].offsetY=-1,payload.trajectory.offsets[26].offsetX=100,payload.trajectory.location.timestamp=0000000000000000000,payload.trajectory.offsets[0].offsetY=-3,payload.trajectory.offsets[0].offsetX=51,payload.trajectory.offsets[25].offsetZ=-18,payload.trajectory.offsets[25].offsetY=8,unit_name=\"WestIntersection\",payload.trajectory.offsets[20].offsetX=97,payload.trajectory.offsets[20].offsetY=4,payload.trajectory.offsets[20].offsetZ=-21,payload.trajectory.offsets[43].offsetX=101,msg_type=\"v2xhub_mobility_path_in\",event_name=\"WFD_Verification_2\",payload.trajectory.offsets[7].offsetZ=-29,payload.trajectory.offsets[7].offsetY=-2,payload.trajectory.offsets[25].offsetX=100,payload.trajectory.offsets[43].offsetY=11,payload.trajectory.offsets[43].offsetZ=-13,payload.trajectory.offsets[12].offsetY=1,payload.trajectory.offsets[15].offsetX=98,payload.trajectory.offsets[12].offsetZ=-26,payload.trajectory.offsets[15].offsetY=2,payload.trajectory.offsets[12].offsetX=96,payload.trajectory.location.ecefX=110457775,payload.trajectory.location.ecefY=-484207718,payload.trajectory.location.ecefZ=398843563,unit_type=\"infrastructure\",payload.trajectory.offsets[36].offsetX=100,payload.trajectory.offsets[36].offsetZ=-14,payload.trajectory.offsets[36].offsetY=11,payload.trajectory.offsets[42].offsetX=100,payload.trajectory.offsets[42].offsetY=12,payload.trajectory.offsets[42].offsetZ=-14,payload.trajectory.offsets[21].offsetZ=-16,topic_name=\"v2xhub_mobility_path_in\",payload.trajectory.offsets[21].offsetY=4,payload.trajectory.offsets[15].offsetZ=-24,payload.trajectory.offsets[21].offsetX=77,payload.trajectory.offsets[28].offsetZ=-16,payload.trajectory.offsets[28].offsetY=9,payload.trajectory.offsets[28].offsetX=100,payload.trajectory.offsets[5].offsetX=96,payload.trajectory.offsets[5].offsetY=-2,payload.trajectory.offsets[10].offsetZ=-27,payload.trajectory.offsets[5].offsetZ=-29,payload.trajectory.offsets[10].offsetY=0,payload.trajectory.offsets[10].offsetX=96,payload.trajectory.offsets[17].offsetY=3,payload.trajectory.offsets[17].offsetX=97,payload.trajectory.offsets[17].offsetZ=-23,payload.trajectory.offsets[39].offsetZ=-14,payload.trajectory.offsets[23].offsetX=100,payload.trajectory.offsets[23].offsetY=7,payload.trajectory.offsets[39].offsetX=100,payload.trajectory.offsets[23].offsetZ=-20,payload.trajectory.offsets[39].offsetY=11,payload.trajectory.offsets[40].offsetZ=-14,payload.trajectory.offsets[40].offsetY=12,location=\"TFHRC\",payload.trajectory.offsets[40].offsetX=100,payload.trajectory.offsets[34].offsetY=11,payload.trajectory.offsets[34].offsetZ=-15,payload.trajectory.offsets[34].offsetX=100"; + assertEquals(expected, mo_k_v_pairs); + + String tcr_json_str = "{\"unit_name\":\"DevCC\",\"log_timestamp\":1681309089425000,\"payload.TrafficControlRequest.@list\":\"true\",\"payload.TrafficControlRequest.bounds.oldest\":\"28000218\",\"payload.TrafficControlRequest.bounds.offsets[0].deltax\":\"3232\",\"payload.TrafficControlRequest.bounds.offsets[0].deltay\":\"0\",\"payload.TrafficControlRequest.bounds.offsets[1].deltax\":\"3232\",\"payload.TrafficControlRequest.bounds.offsets[1].deltay\":\"1577\",\"payload.TrafficControlRequest.bounds.offsets[2].deltax\":\"0\",\"payload.TrafficControlRequest.bounds.offsets[2].deltay\":\"1577\",\"payload.TrafficControlRequest.bounds.reflat\":\"389543898\",\"payload.TrafficControlRequest.bounds.reflon\":\"-771510185\",\"payload.TrafficControlRequest.scale\":\"-1\",\"payload.TrafficControlRequest.@port\":\"22222\",\"payload.TrafficControlRequest.reqseq\":\"0\",\"payload.TrafficControlRequest.reqid\":\"4947918446524149\",\"topic_name\":\"TCR\",\"msg_type\":\"TCR\",\"event_name\":\"WFD_Verification_2\",\"location\":\"TFHRC\",\"unit_type\":\"infrastructure\",\"unit_id\":\"cloud_id\",\"testing_type\":\"Verification\",\"timestamp\":1681309089575201}"; + String tcr_k_v_pairs = converter.convertJson2KeyValuePairs(tcr_json_str, to_str_values); + expected = "log_timestamp=1681309089425000,payload.TrafficControlRequest.reqseq=0,payload.TrafficControlRequest.bounds.oldest=28000218,payload.TrafficControlRequest.bounds.reflon=-771510185,unit_type=\"infrastructure\",payload.TrafficControlRequest.@list=\"true\",unit_name=\"DevCC\",payload.TrafficControlRequest.scale=-1,payload.TrafficControlRequest.bounds.offsets[2].deltay=1577,payload.TrafficControlRequest.bounds.offsets[2].deltax=0,payload.TrafficControlRequest.@port=22222,payload.TrafficControlRequest.bounds.offsets[1].deltax=3232,payload.TrafficControlRequest.bounds.offsets[1].deltay=1577,payload.TrafficControlRequest.reqid=\"4947918446524149\",payload.TrafficControlRequest.bounds.offsets[0].deltax=3232,topic_name=\"TCR\",payload.TrafficControlRequest.bounds.offsets[0].deltay=0,msg_type=\"TCR\",event_name=\"WFD_Verification_2\",location=\"TFHRC\",payload.TrafficControlRequest.bounds.reflat=389543898,unit_id=\"cloud_id\",testing_type=\"Verification\",timestamp=1681309089575201"; + assertEquals(expected, tcr_k_v_pairs); + + String tcm_json_str = "{\"payload.TrafficControlMessage.tcmV01.msgnum\":\"1\",\"payload.TrafficControlMessage.tcmV01.geometry.datum\":\"WGS84\",\"payload.TrafficControlMessage.tcmV01.geometry.reftime\":\"28004757\",\"payload.TrafficControlMessage.tcmV01.geometry.refwidth\":\"405\",\"payload.TrafficControlMessage.tcmV01.geometry.heading\":\"3312\",\"payload.TrafficControlMessage.tcmV01.geometry.proj\":\"epsg:3785\",\"payload.TrafficControlMessage.tcmV01.geometry.reflat\":\"389549140\",\"payload.TrafficControlMessage.tcmV01.geometry.refelv\":\"0\",\"payload.TrafficControlMessage.tcmV01.geometry.reflon\":\"-771490031\",\"payload.TrafficControlMessage.tcmV01.id\":\"007d1d1c5ea3f134ab2e9d868a033372\",\"payload.TrafficControlMessage.tcmV01.params.schedule.start\":\"28004757\",\"payload.TrafficControlMessage.tcmV01.params.schedule.end\":\"153722867280912\",\"payload.TrafficControlMessage.tcmV01.params.schedule.dow\":\"1111111\",\"payload.TrafficControlMessage.tcmV01.params.regulatory.true\":\"None\",\"payload.TrafficControlMessage.tcmV01.params.vclasses.bus\":\"None\",\"payload.TrafficControlMessage.tcmV01.params.vclasses.four-or-fewer-axle-single-trailer-truck\":\"None\",\"payload.TrafficControlMessage.tcmV01.params.vclasses.two-axle-six-tire-single-unit-truck\":\"None\",\"payload.TrafficControlMessage.tcmV01.params.vclasses.five-or-fewer-axle-multi-trailer-truck\":\"None\",\"payload.TrafficControlMessage.tcmV01.params.vclasses.five-axle-single-trailer-truck\":\"None\",\"payload.TrafficControlMessage.tcmV01.params.vclasses.six-or-more-axle-single-trailer-truck\":\"None\",\"payload.TrafficControlMessage.tcmV01.params.vclasses.motorcycle\":\"None\",\"payload.TrafficControlMessage.tcmV01.params.vclasses.passenger-car\":\"None\",\"payload.TrafficControlMessage.tcmV01.params.vclasses.six-axle-multi-trailer-truck\":\"None\",\"payload.TrafficControlMessage.tcmV01.params.vclasses.seven-or-more-axle-multi-trailer-truck\":\"None\",\"payload.TrafficControlMessage.tcmV01.params.vclasses.micromobile\":\"None\",\"payload.TrafficControlMessage.tcmV01.params.vclasses.light-truck-van\":\"None\",\"payload.TrafficControlMessage.tcmV01.params.vclasses.three-axle-single-unit-truck\":\"None\",\"payload.TrafficControlMessage.tcmV01.params.vclasses.four-or-more-axle-single-unit-truck\":\"None\",\"payload.TrafficControlMessage.tcmV01.params.detail.maxspeed\":\"45\",\"payload.TrafficControlMessage.tcmV01.reqseq\":\"0\",\"payload.TrafficControlMessage.tcmV01.msgtot\":\"1\",\"payload.TrafficControlMessage.tcmV01.updated\":\"0\",\"payload.TrafficControlMessage.tcmV01.reqid\":\"4947918446524142\"}"; + String tcm_k_v_pairs = converter.convertJson2KeyValuePairs(tcm_json_str, to_str_values); + expected = "payload.TrafficControlMessage.tcmV01.id=\"007d1d1c5ea3f134ab2e9d868a033372\",payload.TrafficControlMessage.tcmV01.params.vclasses.seven-or-more-axle-multi-trailer-truck=\"None\",payload.TrafficControlMessage.tcmV01.reqid=\"4947918446524142\",payload.TrafficControlMessage.tcmV01.geometry.reflat=389549140,payload.TrafficControlMessage.tcmV01.params.vclasses.light-truck-van=\"None\",payload.TrafficControlMessage.tcmV01.params.vclasses.four-or-more-axle-single-unit-truck=\"None\",payload.TrafficControlMessage.tcmV01.geometry.refwidth=405,payload.TrafficControlMessage.tcmV01.params.vclasses.bus=\"None\",payload.TrafficControlMessage.tcmV01.geometry.heading=3312,payload.TrafficControlMessage.tcmV01.params.regulatory.true=\"None\",payload.TrafficControlMessage.tcmV01.params.vclasses.four-or-fewer-axle-single-trailer-truck=\"None\",payload.TrafficControlMessage.tcmV01.params.vclasses.motorcycle=\"None\",payload.TrafficControlMessage.tcmV01.geometry.reftime=28004757,payload.TrafficControlMessage.tcmV01.params.vclasses.six-axle-multi-trailer-truck=\"None\",payload.TrafficControlMessage.tcmV01.params.vclasses.three-axle-single-unit-truck=\"None\",payload.TrafficControlMessage.tcmV01.params.vclasses.passenger-car=\"None\",payload.TrafficControlMessage.tcmV01.geometry.reflon=-771490031,payload.TrafficControlMessage.tcmV01.msgtot=1,payload.TrafficControlMessage.tcmV01.params.detail.maxspeed=45,payload.TrafficControlMessage.tcmV01.params.schedule.start=28004757,payload.TrafficControlMessage.tcmV01.params.schedule.dow=1111111,payload.TrafficControlMessage.tcmV01.geometry.datum=\"WGS84\",payload.TrafficControlMessage.tcmV01.params.vclasses.two-axle-six-tire-single-unit-truck=\"None\",payload.TrafficControlMessage.tcmV01.msgnum=1,payload.TrafficControlMessage.tcmV01.reqseq=0,payload.TrafficControlMessage.tcmV01.params.schedule.end=153722867280912,payload.TrafficControlMessage.tcmV01.params.vclasses.micromobile=\"None\",payload.TrafficControlMessage.tcmV01.params.vclasses.six-or-more-axle-single-trailer-truck=\"None\",payload.TrafficControlMessage.tcmV01.updated=0,payload.TrafficControlMessage.tcmV01.params.vclasses.five-or-fewer-axle-multi-trailer-truck=\"None\",payload.TrafficControlMessage.tcmV01.params.vclasses.five-axle-single-trailer-truck=\"None\",payload.TrafficControlMessage.tcmV01.geometry.proj=\"epsg:3785\",payload.TrafficControlMessage.tcmV01.geometry.refelv=0"; + assertEquals(expected, tcm_k_v_pairs); + + //Message that has NaN value + String nan_json_str = "{\"metadata.unit_name\":\"BlackPacifica\",\"metadata.event_name\":\"UC3\",\"metadata.location\":\"TFHRC\",\"metadata.unit_type\":\"Platform\",\"metadata.unit_id\":\"DOT-508\",\"metadata.testing_type\":\"Integration\",\"payload.core_data.sec_mark\":\"40328\",\"payload.core_data.heading\":\"6160\",\"payload.core_data.brakes.scs\":\"0\",\"payload.core_data.brakes.abs\":\"NaN\",\"payload.core_data.brakes.aux_brakes\":\"0\",\"payload.core_data.brakes.brake_boost\":\"0\",\"payload.core_data.brakes.traction\":\"0\",\"payload.core_data.accel_set.vert\":\"-127\",\"payload.core_data.accel_set.lat\":\"2\",\"payload.core_data.accel_set.long\":\"-79\",\"payload.core_data.accel_set.yaw\":\"12\",\"payload.core_data.accuracy.orientation\":\"65535\",\"payload.core_data.accuracy.semi_minor\":\"255\",\"payload.core_data.accuracy.semi_major\":\"255\",\"payload.core_data.long\":\"-77.1476267\",\"payload.core_data.speed\":\"282\",\"payload.core_data.transmission\":\"9.890579531202093e-05\",\"payload.core_data.size.length\":\"500\",\"payload.core_data.size.width\":\"200\",\"payload.core_data.elev\":\"384\",\"payload.core_data.angle\":\"127\",\"payload.core_data.id\":\"11111111\",\"payload.core_data.lat\":\"38.9549740\",\"payload.core_data.msg_count\":\"70\"}"; + String nan_k_v_pairs = converter.convertJson2KeyValuePairs(nan_json_str, to_str_values); + expected = "metadata.unit_id=\"DOT-508\",payload.core_data.accuracy.semi_major=255,payload.core_data.brakes.abs=NaN,payload.core_data.transmission=0.0,payload.core_data.brakes.brake_boost=0,payload.core_data.accel_set.long=-79,payload.core_data.accuracy.semi_minor=255,metadata.unit_type=\"Platform\",payload.core_data.brakes.scs=0,payload.core_data.brakes.traction=0,payload.core_data.lat=38.9549740,payload.core_data.heading=6160,payload.core_data.accel_set.yaw=12,payload.core_data.size.length=500,payload.core_data.size.width=200,metadata.testing_type=\"Integration\",payload.core_data.speed=282,payload.core_data.angle=127,metadata.event_name=\"UC3\",payload.core_data.long=-77.1476267,payload.core_data.brakes.aux_brakes=0,payload.core_data.msg_count=70,payload.core_data.accel_set.lat=2,metadata.location=\"TFHRC\",payload.core_data.sec_mark=40328,payload.core_data.id=11111111,metadata.unit_name=\"BlackPacifica\",payload.core_data.accel_set.vert=-127,payload.core_data.elev=384,payload.core_data.accuracy.orientation=65535"; + assertEquals(expected, nan_k_v_pairs); } } diff --git a/telematic_system/telematic_cloud_messaging/src/test/java/com/telematic/telematic_cloud_messaging/message_converters/JSONFlattenerHelperTests.java b/telematic_system/telematic_cloud_messaging/src/test/java/com/telematic/telematic_cloud_messaging/message_converters/JSONFlattenerHelperTests.java index 44e10307..e450794c 100644 --- a/telematic_system/telematic_cloud_messaging/src/test/java/com/telematic/telematic_cloud_messaging/message_converters/JSONFlattenerHelperTests.java +++ b/telematic_system/telematic_cloud_messaging/src/test/java/com/telematic/telematic_cloud_messaging/message_converters/JSONFlattenerHelperTests.java @@ -34,5 +34,44 @@ public void flattenJson() { } catch (ParseException e) { e.printStackTrace(); } + + String mo_json_str = "{\"payload\":{\"metadata\":{\"hostBSMId\":\"bb906b4e\",\"hostStaticId\":\"DOT-45244\",\"planId\":\"a95fe925-1c09-43eb-b40e-5abe7611b187\",\"targetStaticId\":\"UNSET\",\"timestamp\":\"0000001681146252589\"},\"trajectory\":{\"location\":{\"ecefX\":110457775,\"ecefY\":-484207718,\"ecefZ\":398843563,\"timestamp\":\"0000000000000000000\"},\"offsets\":[{\"offsetX\":51,\"offsetY\":-3,\"offsetZ\":-18},{\"offsetX\":96,\"offsetY\":-1,\"offsetZ\":-28},{\"offsetX\":96,\"offsetY\":-2,\"offsetZ\":-29},{\"offsetX\":96,\"offsetY\":-2,\"offsetZ\":-28},{\"offsetX\":96,\"offsetY\":-1,\"offsetZ\":-28},{\"offsetX\":96,\"offsetY\":-2,\"offsetZ\":-29},{\"offsetX\":96,\"offsetY\":-1,\"offsetZ\":-28},{\"offsetX\":96,\"offsetY\":-2,\"offsetZ\":-29},{\"offsetX\":97,\"offsetY\":-1,\"offsetZ\":-28},{\"offsetX\":96,\"offsetY\":-2,\"offsetZ\":-28},{\"offsetX\":96,\"offsetY\":0,\"offsetZ\":-27},{\"offsetX\":97,\"offsetY\":-1,\"offsetZ\":-27},{\"offsetX\":96,\"offsetY\":1,\"offsetZ\":-26},{\"offsetX\":97,\"offsetY\":1,\"offsetZ\":-26},{\"offsetX\":97,\"offsetY\":1,\"offsetZ\":-25},{\"offsetX\":98,\"offsetY\":2,\"offsetZ\":-24},{\"offsetX\":97,\"offsetY\":3,\"offsetZ\":-24},{\"offsetX\":97,\"offsetY\":3,\"offsetZ\":-23},{\"offsetX\":98,\"offsetY\":3,\"offsetZ\":-22},{\"offsetX\":98,\"offsetY\":5,\"offsetZ\":-22},{\"offsetX\":97,\"offsetY\":4,\"offsetZ\":-21},{\"offsetX\":77,\"offsetY\":4,\"offsetZ\":-16},{\"offsetX\":100,\"offsetY\":6,\"offsetZ\":-21},{\"offsetX\":100,\"offsetY\":7,\"offsetZ\":-20},{\"offsetX\":100,\"offsetY\":6,\"offsetZ\":-19},{\"offsetX\":100,\"offsetY\":8,\"offsetZ\":-18},{\"offsetX\":100,\"offsetY\":8,\"offsetZ\":-18},{\"offsetX\":100,\"offsetY\":9,\"offsetZ\":-17},{\"offsetX\":100,\"offsetY\":9,\"offsetZ\":-16},{\"offsetX\":101,\"offsetY\":10,\"offsetZ\":-16},{\"offsetX\":100,\"offsetY\":10,\"offsetZ\":-15},{\"offsetX\":100,\"offsetY\":10,\"offsetZ\":-16},{\"offsetX\":101,\"offsetY\":11,\"offsetZ\":-14},{\"offsetX\":100,\"offsetY\":11,\"offsetZ\":-15},{\"offsetX\":100,\"offsetY\":11,\"offsetZ\":-15},{\"offsetX\":101,\"offsetY\":10,\"offsetZ\":-14},{\"offsetX\":100,\"offsetY\":11,\"offsetZ\":-14},{\"offsetX\":100,\"offsetY\":12,\"offsetZ\":-15},{\"offsetX\":101,\"offsetY\":11,\"offsetZ\":-14},{\"offsetX\":100,\"offsetY\":11,\"offsetZ\":-14},{\"offsetX\":100,\"offsetY\":12,\"offsetZ\":-14},{\"offsetX\":101,\"offsetY\":11,\"offsetZ\":-13},{\"offsetX\":100,\"offsetY\":12,\"offsetZ\":-14},{\"offsetX\":101,\"offsetY\":11,\"offsetZ\":-13}]}},\"unit_id\":\"streets_id\",\"unit_type\":\"infrastructure\",\"unit_name\":\"WestIntersection\",\"msg_type\":\"v2xhub_mobility_path_in\",\"event_name\":\"WFD_Verification_2\",\"testing_type\":\"Verification\",\"location\":\"TFHRC\",\"topic_name\":\"v2xhub_mobility_path_in\",\"timestamp\":1681146252589000}"; + String flattened_mo_json_str = helper.flattenJsonStr(mo_json_str); + try { + JSONObject flattened_json = (JSONObject) parser.parse(flattened_mo_json_str); + assertEquals("bb906b4e", flattened_json.get("payload.metadata.hostBSMId")); + } catch (ParseException e) { + e.printStackTrace(); + } + + String tcr_json_str = "{\"payload\":{\"TrafficControlRequest\":{\"@port\":\"22222\",\"@list\":\"true\",\"reqid\":\"4947918446524149\",\"reqseq\":\"0\",\"scale\":\"-1\",\"bounds\":{\"oldest\":\"28000218\",\"reflon\":\"-771510185\",\"reflat\":\"389543898\",\"offsets\":[{\"deltax\":\"3232\",\"deltay\":\"0\"},{\"deltax\":\"3232\",\"deltay\":\"1577\"},{\"deltax\":\"0\",\"deltay\":\"1577\"}]}}},\"unit_id\":\"cloud_id\",\"unit_type\":\"infrastructure\",\"unit_name\":\"DevCC\",\"msg_type\":\"TCR\",\"event_name\":\"WFD_Verification_2\",\"testing_type\":\"Verification\",\"location\":\"TFHRC\",\"topic_name\":\"TCR\",\"timestamp\":1681309089575201.0,\"log_timestamp\":1681309089425000.0}"; + String flattened_tcr_json_str = helper.flattenJsonStr(tcr_json_str); + try { + JSONObject flattened_json = (JSONObject) parser.parse(flattened_tcr_json_str); + assertEquals("4947918446524149", flattened_json.get("payload.TrafficControlRequest.reqid")); + } catch (ParseException e) { + e.printStackTrace(); + } + + String tcm_json_str ="{\"payload\":{\"TrafficControlMessage\":{\"tcmV01\":{\"reqid\":\"4947918446524142\",\"reqseq\":\"0\",\"msgtot\":\"1\",\"msgnum\":\"1\",\"id\":\"007d1d1c5ea3f134ab2e9d868a033372\",\"updated\":\"0\",\"params\":{\"vclasses\":{\"micromobile\":\"None\",\"motorcycle\":\"None\",\"passenger-car\":\"None\",\"light-truck-van\":\"None\",\"bus\":\"None\",\"two-axle-six-tire-single-unit-truck\":\"None\",\"three-axle-single-unit-truck\":\"None\",\"four-or-more-axle-single-unit-truck\":\"None\",\"four-or-fewer-axle-single-trailer-truck\":\"None\",\"five-axle-single-trailer-truck\":\"None\",\"six-or-more-axle-single-trailer-truck\":\"None\",\"five-or-fewer-axle-multi-trailer-truck\":\"None\",\"six-axle-multi-trailer-truck\":\"None\",\"seven-or-more-axle-multi-trailer-truck\":\"None\"},\"schedule\":{\"start\":\"28004757\",\"end\":\"153722867280912\",\"dow\":\"1111111\"},\"regulatory\":{\"true\":\"None\"},\"detail\":{\"maxspeed\":\"45\"}},\"geometry\":{\"proj\":\"epsg:3785\",\"datum\":\"WGS84\",\"reftime\":\"28004757\",\"reflon\":\"-771490031\",\"reflat\":\"389549140\",\"refelv\":\"0\",\"refwidth\":\"405\",\"heading\":\"3312\"}}}}}"; + String flattened_tcm_json_str = helper.flattenJsonStr(tcm_json_str); + System.out.println(flattened_tcm_json_str); + try { + JSONObject flattened_json = (JSONObject) parser.parse(flattened_tcm_json_str); + assertEquals("4947918446524142", flattened_json.get("payload.TrafficControlMessage.tcmV01.reqid")); + } catch (ParseException e) { + e.printStackTrace(); + } + + String nan_json_metadata_str = "{\"unit_name\":\"BlackPacifica\",\"event_name\":\"UC3\",\"location\":\"TFHRC\",\"unit_type\":\"Platform\",\"unit_id\":\"DOT-508\",\"testing_type\":\"Integration\"}"; + String nan_BSM_json_payload_str = "{\"core_data\":{\"accel_set\":{\"lat\":\"2\",\"long\":\"-79\",\"vert\":\"-127\",\"yaw\":\"12\"},\"accuracy\":{\"orientation\":\"65535\",\"semi_major\":\"255\",\"semi_minor\":\"255\"},\"angle\":\"127\",\"brakes\":{\"abs\":\"NaN\",\"aux_brakes\":\"0\",\"brake_boost\":\"0\",\"scs\":\"0\",\"traction\":\"0\"},\"elev\":\"384\",\"heading\":\"6160\",\"id\":\"11111111\",\"lat\":\"38.9549740\",\"long\":\"-77.1476267\",\"msg_count\":\"70\",\"sec_mark\":\"40328\",\"size\":{\"length\":\"500\",\"width\":\"200\"},\"speed\":\"282\",\"transmission\":\"0\"}}"; + String nan_json_str = "{ \"metadata\":" + nan_json_metadata_str + ",\"payload\":" + nan_BSM_json_payload_str + "}"; + String nan_flattened_json_str = helper.flattenJsonStr(nan_json_str); + try { + JSONObject flattened_json = (JSONObject) parser.parse(nan_flattened_json_str); + assertEquals("NaN", flattened_json.get("payload.core_data.brakes.abs")); + } catch (ParseException e) { + e.printStackTrace(); + } } } diff --git a/telematic_system/telematic_units/carma_cloud_bridge/cloud_nats_bridge/src/cloud_nats_bridge/cloud_nats_bridge.py b/telematic_system/telematic_units/carma_cloud_bridge/cloud_nats_bridge/src/cloud_nats_bridge/cloud_nats_bridge.py index f02973c1..1728aabf 100755 --- a/telematic_system/telematic_units/carma_cloud_bridge/cloud_nats_bridge/src/cloud_nats_bridge/cloud_nats_bridge.py +++ b/telematic_system/telematic_units/carma_cloud_bridge/cloud_nats_bridge/src/cloud_nats_bridge/cloud_nats_bridge.py @@ -15,12 +15,12 @@ from watchdog.events import FileSystemEventHandler, LoggingEventHandler import xmltodict import pandas as pd +from multiprocessing import Queue +from queue import Empty -#global variables used to store TCR/TCM strings for publishing to nats -new_carma_cloud_message_type = "" -new_carma_cloud_message = "" -last_carma_cloud_message = "" -epoch_time = "" +#global variable for current list of topics subscribed to +subscriber_list = [] +message_queue = Queue() class EventKeys(Enum): EVENT_NAME = "event_name" @@ -58,14 +58,13 @@ def __init__(self, cc_logpath, bridge_logname, tcr_search_string, tcm_search_str self.current_lines = len(f.readlines()) f.close() - self.logger.info("Monitoring this carma cloud file: " + str(self.cc_log_path)) + self.logger.info("FileListener created for: " + str(self.cc_log_path)) def findNewCarmaCloudMessage(self): """This method will parse the newly generated line in the carma cloud log file and assign the xml and message type to the appropriate global variables. It also assigns the epoch_time variable which will be used to create a bridge timestamp that will be added to the message sent to nats """ - global new_carma_cloud_message, new_carma_cloud_message_type, epoch_time with open(f'{self.cc_log_path}', 'r', encoding="utf-8") as f: line_count = 1 @@ -78,24 +77,30 @@ def findNewCarmaCloudMessage(self): continue timestamp = newLine.split(" ")[1] date = self.today.strftime("%m/%d/%y") - datetime = date + " " + timestamp - datetime_converted = pd.to_datetime(datetime) - epoch_time = datetime_converted.timestamp() * 1000000 #convert to microseconds - - messageType = "" - #find beginning of TCR/TCM - if self.tcm_search_string in newLine: - messageType = "TCM" + dt = date + " " + timestamp + dt_converted = pd.to_datetime(dt) + epoch_time = dt_converted.timestamp() * 1000000 #convert to microseconds + + topic = "" + #find beginning of TCR/TCM and send to NATS if the topic is in the subscriber list + if self.tcm_search_string in newLine and "TCM" in subscriber_list: + topic = "TCM" startingIndex = newLine.find("<") - new_carma_cloud_message_type = messageType new_carma_cloud_message = newLine[startingIndex:] - self.logger.info("Carma Cloud generated new " + str(messageType) + " message with payload: " + str(new_carma_cloud_message)) - elif self.tcr_search_string in newLine: - messageType = "TCR" + self.logger.info("Carma Cloud generated new " + str(topic) + " message with payload: " + str(new_carma_cloud_message)) + + message_queue.put([topic, new_carma_cloud_message, epoch_time]) + self.logger.info("Current queue size: " + str(message_queue.qsize())) + + elif self.tcr_search_string in newLine and "TCR" in subscriber_list: + topic = "TCR" startingIndex = newLine.find("<") - new_carma_cloud_message_type = messageType new_carma_cloud_message = newLine[startingIndex:] - self.logger.info("Carma Cloud generated new " + str(messageType) + " message with payload: " + str(new_carma_cloud_message)) + self.logger.info("Carma Cloud generated new " + str(topic) + " message with payload: " + str(new_carma_cloud_message)) + + message_queue.put([topic, new_carma_cloud_message, epoch_time]) + self.logger.info("Current queue size: " + str(message_queue.qsize())) + self.current_lines = line_count line_count += 1 @@ -109,11 +114,6 @@ def on_modified(self, event): with self.lock: self.findNewCarmaCloudMessage() - #Getter method for testing - def getNewCarmaCloudMessageType(self): - global new_carma_cloud_message_type - return new_carma_cloud_message_type - class CloudNatsBridge(): """ The CloudNatsBridge is capable of listening to the carma cloud log file and streaming @@ -142,8 +142,7 @@ def __init__(self): self.unit_name = "Dev CC" self.nc = NATS() self.cloud_topics = ["TCR","TCM"] # list of available carma-cloud topics - self.subscribers_list = [] # list of topics the user has requested to publish - self.async_sleep_rate = 0.0001 # asyncio sleep rate + self.async_sleep_rate = 0.00001 # asyncio sleep rate self.registered = False #Member variables to store the exclusion list self.exclusion_list = [] @@ -170,10 +169,9 @@ def __init__(self): self.exclusion_list.append(excluded.strip()) self.logger.info("Exclusion list: " + str(self.exclusion_list)) + self.file_listener_start() self.logger.info(" Created Cloud-NATS bridge object") - # Start listening to the carma cloud log file - self.file_listener_start() def createLogger(self, log_type): """Creates log file for the Carma cloud bridge with configuration items based on the settings input in the params.yaml file""" @@ -210,7 +208,7 @@ def file_listener_start(self): """ Creates a FileListener object and monitors the carma cloud log assigned in the params yaml file. """ - self.logger.info(" Creating file listener for " + str(self.carma_cloud_log)) + self.logger.info(" Creating file listener for " + str(self.carma_cloud_log) + " with topics: " + str(subscriber_list)) splitter = self.carma_cloud_log.split("/") directory = "/" + splitter[1] + "/" + splitter[2] + "/" + splitter[3] @@ -232,54 +230,39 @@ def xmlToJson(self, xmlString): return json_data - async def nats_send(self): - """ - Sends newly generated TCR/TCMs to nats based on the current subscriber list. - """ - self.logger.info(" In nats_send: Ready to send to nats") - - global new_carma_cloud_message_type, new_carma_cloud_message, last_carma_cloud_message, epoch_time - - try: - while(True): - topic = new_carma_cloud_message_type - #check if topic in subscriber list and compare the new cc message with the last - if topic in self.subscribers_list and (new_carma_cloud_message != last_carma_cloud_message): - #convert the new TCR/TCM to a json - json_data = self.xmlToJson(new_carma_cloud_message) - - #publish to nats if a valid xml was received - if json_data != "": - #add required metadata to TCR/TCM payload - message = {} - message["payload"] = json.loads(json_data) - message[UnitKeys.UNIT_ID.value] = self.unit_id - message[UnitKeys.UNIT_TYPE.value] = self.unit_type - message[UnitKeys.UNIT_NAME.value] = self.unit_name - message[TopicKeys.MSG_TYPE.value] = new_carma_cloud_message_type - message[EventKeys.EVENT_NAME.value] = self.cloud_info[EventKeys.EVENT_NAME.value] - message[EventKeys.TESTING_TYPE.value] = self.cloud_info[EventKeys.TESTING_TYPE.value] - message[EventKeys.LOCATION.value] = self.cloud_info[EventKeys.LOCATION.value] - message[TopicKeys.TOPIC_NAME.value] = topic - message["timestamp"] = datetime.now(timezone.utc).timestamp()*1000000 # utc timestamp in microseconds - message["log_timestamp"] = epoch_time - - # telematic cloud server will look for topic names with the pattern ".data." - self.topic_name = "cloud." + self.unit_id + ".data." + topic - - # publish the encoded data to the nats server - self.logger.info(" In nats_send: Publishing to nats: " + str(message)) - - last_carma_cloud_message = new_carma_cloud_message - - await self.nc.publish(self.topic_name, json.dumps(message).encode('utf-8')) - else: - last_carma_cloud_message = new_carma_cloud_message + async def queue_send(self): + self.logger.info("In queue send") + while(True): + #Try to get a message from the queue, sleep if the queue is empty + try: + cc_message = message_queue.get(block=False) + topic = cc_message[0] + payload = cc_message[1] + log_timestamp = cc_message[2] + json_data = self.xmlToJson(payload) + + #add required metadata to TCR/TCM payload + message = {} + message["payload"] = json.loads(json_data) + message[UnitKeys.UNIT_ID.value] = self.unit_id + message[UnitKeys.UNIT_TYPE.value] = self.unit_type + message[UnitKeys.UNIT_NAME.value] = self.unit_name + message[TopicKeys.MSG_TYPE.value] = topic + message[EventKeys.EVENT_NAME.value] = self.cloud_info[EventKeys.EVENT_NAME.value] + message[EventKeys.TESTING_TYPE.value] = self.cloud_info[EventKeys.TESTING_TYPE.value] + message[EventKeys.LOCATION.value] = self.cloud_info[EventKeys.LOCATION.value] + message[TopicKeys.TOPIC_NAME.value] = topic + message["timestamp"] = datetime.now(timezone.utc).timestamp()*1000000 # utc timestamp in microseconds + message["log_timestamp"] = log_timestamp + + # telematic cloud server will look for topic names with the pattern ".data." + self.topic_name = "cloud." + self.unit_id + ".data." + topic + + self.logger.info(" In queue_send: Publishing to nats: " + str(message)) + await self.nc.publish(self.topic_name, json.dumps(message).encode('utf-8')) + except Empty: await asyncio.sleep(self.async_sleep_rate) - except: - self.logger.error("Error publishing message") - pass async def nats_connect(self): """ @@ -380,7 +363,7 @@ async def send_status(msg): async def publish_topics(self): """ Waits for request from telematic server to create subscriber to selected topics and receive data. When a request - has been received, the topic name is then added to the CloudNatsBridge subscribers_list variable, which will + has been received, the topic name is then added to the CloudNatsBridge subscriber_list variable, which will trigger publishing of that data. """ async def topic_request(msg): @@ -392,10 +375,11 @@ async def topic_request(msg): requested_topics = data['topics'] self.logger.info(" In topic_request: Received a request to publish/remove the following topics: " + str(requested_topics)) - # Update subscriber list with the latest topic request - self.subscribers_list = requested_topics + # Update subscriber list global variable with the latest topic request + global subscriber_list + subscriber_list = requested_topics - self.logger.info(" In topic_request: UPDATED subscriber list: " + str(self.subscribers_list)) + self.logger.info(" In topic_request: UPDATED subscriber list: " + str(subscriber_list)) # Wait for request to publish specific topic and call topic_request callback function try: diff --git a/telematic_system/telematic_units/carma_cloud_bridge/cloud_nats_bridge/src/cloud_nats_bridge/main.py b/telematic_system/telematic_units/carma_cloud_bridge/cloud_nats_bridge/src/cloud_nats_bridge/main.py index e7287b3d..8b5463a7 100755 --- a/telematic_system/telematic_units/carma_cloud_bridge/cloud_nats_bridge/src/cloud_nats_bridge/main.py +++ b/telematic_system/telematic_units/carma_cloud_bridge/cloud_nats_bridge/src/cloud_nats_bridge/main.py @@ -31,8 +31,8 @@ def main(): tasks = [ loop.create_task(cloud_nats_bridge.nats_connect()), loop.create_task(cloud_nats_bridge.register_unit()), - loop.create_task(cloud_nats_bridge.nats_send()), loop.create_task(cloud_nats_bridge.check_status()), + loop.create_task(cloud_nats_bridge.queue_send()), loop.create_task(cloud_nats_bridge.available_topics()), loop.create_task(cloud_nats_bridge.publish_topics()) ] diff --git a/telematic_system/telematic_units/carma_vehicle_bridge/ros2_nats_bridge/ros2_nats_bridge/api.py b/telematic_system/telematic_units/carma_vehicle_bridge/ros2_nats_bridge/ros2_nats_bridge/api.py index 62ade00f..93e786c2 100755 --- a/telematic_system/telematic_units/carma_vehicle_bridge/ros2_nats_bridge/ros2_nats_bridge/api.py +++ b/telematic_system/telematic_units/carma_vehicle_bridge/ros2_nats_bridge/ros2_nats_bridge/api.py @@ -171,7 +171,7 @@ async def register_unit(self): try: response = await self.nc.request(self.vehicle_info[UnitKeys.UNIT_ID.value] + ".register_unit", vehicle_info_message, timeout=5) message = response.data.decode('utf-8') - self.logger.warn( + self.logger.info( "Registering unit received response: {message}".format(message=message)) message_json = json.loads(message) self.vehicle_info[EventKeys.EVENT_NAME.value] = message_json[EventKeys.EVENT_NAME.value] @@ -203,7 +203,7 @@ async def available_topics(self): receives request from server and responds with available topics """ async def send_list_of_topics(msg): - self.logger.warn( + self.logger.info( f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}") self.vehicle_info["timestamp"] = str( @@ -215,7 +215,7 @@ async def send_list_of_topics(msg): await self.nc.publish(msg.reply, message) try: - self.logger.error("Awaiting for available_topics") + self.logger.info("Awaiting for available_topics") await self.nc.subscribe(self.vehicle_info[UnitKeys.UNIT_ID.value] + ".available_topics", self.vehicle_info[UnitKeys.UNIT_ID.value], send_list_of_topics) except: self.logger.error("Error for available_topics") @@ -236,7 +236,7 @@ async def topic_unsubscribe_request(topic): self.destroy_subscription(self.subscribers_list[topic]) # Remove iteration with "topic" del self.subscribers_list[topic] - self.logger.warn('Unsubscribed from "%s"' % topic) + self.logger.info('Unsubscribed from "%s"' % topic) except: self.logger.error("Unable to remove subscription to topic") @@ -246,7 +246,7 @@ async def topic_request(msg): import message type to scope create subscriber for every topic in request message """ - self.logger.warn( + self.logger.info( f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}") await self.nc.publish(msg.reply, b"request received!") data = json.loads(msg.data.decode("utf-8")) @@ -264,10 +264,15 @@ async def topic_request(msg): # Remove topics from subscribers list that weren't called in new request for existing_topic in list(self.subscribers_list.keys()): + is_topic_in_list = False for topics in incoming_topics: - if (existing_topic != topics[0]): - self.logger.info('Trying to unsubscribe from topic: "%s"' % existing_topic) - await topic_unsubscribe_request(existing_topic) + if (existing_topic == topics[0]): + is_topic_in_list = True + break + + if not is_topic_in_list: + self.logger.info('Trying to unsubscribe from topic: "%s"' % existing_topic) + await topic_unsubscribe_request(existing_topic) # Subscribe to topics not in subscriber list for i in incoming_topics: