Skip to content

Commit

Permalink
Data type conflict on message fields with intermittent alphanumeric v…
Browse files Browse the repository at this point in the history
…alues prevents writing to influx (#140)

<!-- Thanks for the contribution, this is awesome. -->

# PR Details
## Description
Data type conflict on message fields with intermittent alphanumeric
values prevents writing to influx
<!--- Describe your changes in detail -->

## Related Issue
#138
<!--- This project only accepts pull requests related to open issues -->
<!--- If suggesting a new feature or change, please discuss it in an
issue first -->
<!--- If fixing a bug, there should be an issue describing it with steps
to reproduce -->
<!--- Please link to the issue here: -->

## Motivation and Context

<!--- Why is this change required? What problem does it solve? -->

## How Has This Been Tested?

<!--- Please describe in detail how you tested your changes. -->
<!--- Include details of your testing environment, and the tests you ran
to -->
<!--- see how your change affects other areas of the code, etc. -->

## Types of changes

<!--- What types of changes does your code introduce? Put an `x` in all
the boxes that apply: -->

- [x] Defect fix (non-breaking change that fixes an issue)
- [ ] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that cause existing functionality
to change)

## Checklist:

<!--- Go over all the following points, and put an `x` in all the boxes
that apply. -->
<!--- If you're unsure about any of these, don't hesitate to ask. We're
here to help! -->

- [ ] I have added any new packages to the sonar-scanner.properties file
- [ ] My change requires a change to the documentation.
- [ ] I have updated the documentation accordingly.
- [x] I have read the **CONTRIBUTING** document.
[CDA Telematics Contributing
Guide](https://github.com/usdot-fhwa-stol/cda-telematics/blob/main/Contributing.md)
- [ ] I have added tests to cover my changes.
- [ ] All new and existing tests passed.
  • Loading branch information
adev4a authored Apr 25, 2023
2 parents d6d46a2 + c6a113a commit 5e21503
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.telematic.telematic_cloud_messaging.message_converters;

import java.util.List;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
Expand All @@ -9,9 +10,10 @@
public class JSON2KeyValuePairsConverter {
/**
* @param json_str String JSON format consumed from NATS subject
* @param to_str_values Array of String to match in the JSON and convert the match to String data type value
* @return String of key value pairs separated by commas.
*/
public String convertJson2KeyValuePairs(String json_str) {
public String convertJson2KeyValuePairs(String json_str, List<String> to_str_values) {
String pairs = "";
JSONParser parser = new JSONParser();
try {
Expand All @@ -26,22 +28,33 @@ public String convertJson2KeyValuePairs(String json_str) {
pairs += key + "=\"NA\"";
}
else {
// Regex matching integers
if (value.toString().matches("[-+]?\\d*")) {
pairs += key + "=" + value;
}
// Regex matching decimals
else if (value.toString().matches("[-+]?\\d*\\.?\\d+")) {
pairs += key + "=" + value;
}
//Regex matching scientific notation. InfluxDB does not support scientific notation float syntax, temporarily set this kind of value = 0.0
else if (value.toString().matches("^[+-]?\\d+(?:\\.\\d*(?:[eE][+-]?\\d+)?)?$")) {
pairs += key + "=" + 0.0;
}
// If none of the above Regex matches, considering it as string
else {
pairs += key + "=\"" + value.toString().replaceAll("\\s", "") + "\"";
boolean is_skip = false;
for (String value_item: to_str_values)
{
if (key.toString().contains(value_item)){
pairs += key + "=\"" + value.toString().replaceAll("\\s", "") + "\"";
is_skip = true;
}
}
if(!is_skip)
{
// Regex matching integers
if (value.toString().matches("[-+]?\\d*")) {
pairs += key + "=" + value;
}
// Regex matching decimals
else if (value.toString().matches("[-+]?\\d*\\.?\\d+")) {
pairs += key + "=" + value;
}
//Regex matching scientific notation. InfluxDB does not support scientific notation float syntax, temporarily set this kind of value = 0.0
else if (value.toString().matches("^[+-]?\\d+(?:\\.\\d*(?:[eE][+-]?\\d+)?)?$")) {
pairs += key + "=" + 0.0;
}
// If none of the above Regex matches, considering it as string
else {
pairs += key + "=\"" + value.toString().replaceAll("\\s", "") + "\"";
}
}
}

if (json.keySet().size() != key_count) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.telematic.telematic_cloud_messaging.nats_influx_connection;

import java.util.*;

/**
* The Config object instantiates a configuration object which stores information to create a connection to the telematic nats server
* and influxdb bucket.
Expand Down Expand Up @@ -61,6 +63,8 @@ public enum BucketType{
String streets_unit_id_list;
// List of cloud unit ids
String cloud_unit_id_list;
//List of values in the stream that should only be set to string data type
List<String> to_str_values;

public Config(){}

Expand Down Expand Up @@ -88,7 +92,8 @@ public String ToString(){
"\nnats_topic_per_dispatcher: " + topics_per_dispatcher+
"\nvehicle_unit_id_list: " + vehicle_unit_id_list +
"\nstreets_unit_id_list: " + streets_unit_id_list +
"\ncloud_unit_id_list: " + cloud_unit_id_list);
"\ncloud_unit_id_list: " + cloud_unit_id_list +
"\nto_str_values:" + to_str_values.toString());

return config_str;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,19 @@
package com.telematic.telematic_cloud_messaging.nats_influx_connection;

import java.io.*;
import java.util.Properties;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;

import com.fasterxml.jackson.annotation.JsonValue;
import com.google.gson.JsonObject;
import com.influxdb.client.*;
import com.influxdb.client.InfluxDBClientOptions;
import com.influxdb.client.domain.Authorization;
import com.influxdb.client.domain.WritePrecision;
import com.telematic.telematic_cloud_messaging.message_converters.JSONFlattenerHelper;

import okhttp3.OkHttpClient;

import com.telematic.telematic_cloud_messaging.message_converters.JSON2KeyValuePairsConverter;

import org.json.simple.JSONValue;
import org.json.simple.parser.JSONParser;
import org.json.*;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
Expand Down Expand Up @@ -202,7 +194,7 @@ public String influxStringConverter(String publishData) {
JSONObject payloadJson = publishDataJson.getJSONObject("payload");

String flattenedPayloadJson = jsonFlattener.flattenJsonStr(payloadJson.toString());
String keyValuePairs = keyValueConverter.convertJson2KeyValuePairs(flattenedPayloadJson);
String keyValuePairs = keyValueConverter.convertJson2KeyValuePairs(flattenedPayloadJson, config_.to_str_values);

String unit_id = publishDataJson.getString("unit_id").replaceAll("\\s", "_");
String unit_type = publishDataJson.getString("unit_type").replaceAll("\\s", "_");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
package com.telematic.telematic_cloud_messaging.nats_influx_connection;

import io.nats.client.*;
import java.io.*;
import java.util.Properties;
import java.util.List;
import java.util.ArrayList;
import java.nio.charset.StandardCharsets;
import org.springframework.boot.CommandLineRunner;
import com.telematic.telematic_cloud_messaging.nats_influx_connection.InfluxDataWriter;
import com.telematic.telematic_cloud_messaging.message_converters.JSONFlattenerHelper;
import com.telematic.telematic_cloud_messaging.message_converters.JSON2KeyValuePairsConverter;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import org.apache.commons.lang3.exception.ExceptionUtils;
Expand Down Expand Up @@ -102,7 +96,6 @@ public void nats_connect() {
* the topic list variable
*/
public void updateAvailableTopicList() {
String error_msg = "";

for (String unit_id: unit_id_list) {
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import org.springframework.context.annotation.Profile;

import java.io.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
import com.telematic.telematic_cloud_messaging.nats_influx_connection.Config;
import com.telematic.telematic_cloud_messaging.nats_influx_connection.InfluxDataWriter;
Expand Down Expand Up @@ -71,6 +73,7 @@ static Config getConfigValues() {
config.vehicle_unit_id_list = prop.getProperty("VEHICLE_UNIT_ID_LIST");
config.streets_unit_id_list = prop.getProperty("STREETS_UNIT_ID_LIST");
config.cloud_unit_id_list = prop.getProperty("CLOUD_UNIT_ID_LIST");
config.to_str_values = Arrays.asList(prop.getProperty("TO_STR_VALUES").split(","));

try{
config.influx_bucket_type = BucketType.valueOf(prop.getProperty("INFLUX_BUCKET_TYPE"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ CLOUD_UNIT_ID_LIST=cloud_id
INFLUX_ORG=my-org
INFLUX_ORG_ID=<INFLUX_ORG_ID>
INFLUX_TOKEN=<INFLUXDB-TOKEN>
#Edge case for hostBSMId and TCR/TCM ID where the Ids can be all digits or alpha characters
TO_STR_VALUES=hostBSMId,TrafficControlRequest.reqid,tcmV01.reqid
# Connection timeout to influx bucket. Unit: milliseconds
INFLUX_CONNECT_TIMEOUT=1000
# Timeout while writing data to influx. Unit: milliseconds
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,33 @@ public void flattenJson() {
} catch (ParseException e) {
e.printStackTrace();
}

String mo_json_str = "{\"payload\":{\"metadata\":{\"hostBSMId\":\"bb906b4e\",\"hostStaticId\":\"DOT-45244\",\"planId\":\"a95fe925-1c09-43eb-b40e-5abe7611b187\",\"targetStaticId\":\"UNSET\",\"timestamp\":\"0000001681146252589\"},\"trajectory\":{\"location\":{\"ecefX\":110457775,\"ecefY\":-484207718,\"ecefZ\":398843563,\"timestamp\":\"0000000000000000000\"},\"offsets\":[{\"offsetX\":51,\"offsetY\":-3,\"offsetZ\":-18},{\"offsetX\":96,\"offsetY\":-1,\"offsetZ\":-28},{\"offsetX\":96,\"offsetY\":-2,\"offsetZ\":-29},{\"offsetX\":96,\"offsetY\":-2,\"offsetZ\":-28},{\"offsetX\":96,\"offsetY\":-1,\"offsetZ\":-28},{\"offsetX\":96,\"offsetY\":-2,\"offsetZ\":-29},{\"offsetX\":96,\"offsetY\":-1,\"offsetZ\":-28},{\"offsetX\":96,\"offsetY\":-2,\"offsetZ\":-29},{\"offsetX\":97,\"offsetY\":-1,\"offsetZ\":-28},{\"offsetX\":96,\"offsetY\":-2,\"offsetZ\":-28},{\"offsetX\":96,\"offsetY\":0,\"offsetZ\":-27},{\"offsetX\":97,\"offsetY\":-1,\"offsetZ\":-27},{\"offsetX\":96,\"offsetY\":1,\"offsetZ\":-26},{\"offsetX\":97,\"offsetY\":1,\"offsetZ\":-26},{\"offsetX\":97,\"offsetY\":1,\"offsetZ\":-25},{\"offsetX\":98,\"offsetY\":2,\"offsetZ\":-24},{\"offsetX\":97,\"offsetY\":3,\"offsetZ\":-24},{\"offsetX\":97,\"offsetY\":3,\"offsetZ\":-23},{\"offsetX\":98,\"offsetY\":3,\"offsetZ\":-22},{\"offsetX\":98,\"offsetY\":5,\"offsetZ\":-22},{\"offsetX\":97,\"offsetY\":4,\"offsetZ\":-21},{\"offsetX\":77,\"offsetY\":4,\"offsetZ\":-16},{\"offsetX\":100,\"offsetY\":6,\"offsetZ\":-21},{\"offsetX\":100,\"offsetY\":7,\"offsetZ\":-20},{\"offsetX\":100,\"offsetY\":6,\"offsetZ\":-19},{\"offsetX\":100,\"offsetY\":8,\"offsetZ\":-18},{\"offsetX\":100,\"offsetY\":8,\"offsetZ\":-18},{\"offsetX\":100,\"offsetY\":9,\"offsetZ\":-17},{\"offsetX\":100,\"offsetY\":9,\"offsetZ\":-16},{\"offsetX\":101,\"offsetY\":10,\"offsetZ\":-16},{\"offsetX\":100,\"offsetY\":10,\"offsetZ\":-15},{\"offsetX\":100,\"offsetY\":10,\"offsetZ\":-16},{\"offsetX\":101,\"offsetY\":11,\"offsetZ\":-14},{\"offsetX\":100,\"offsetY\":11,\"offsetZ\":-15},{\"offsetX\":100,\"offsetY\":11,\"offsetZ\":-15},{\"offsetX\":101,\"offsetY\":10,\"offsetZ\":-14},{\"offsetX\":100,\"offsetY\":11,\"offsetZ\":-14},{\"offsetX\":100,\"offsetY\":12,\"offsetZ\":-15},{\"offsetX\":101,\"offsetY\":11,\"offsetZ\":-14},{\"offsetX\":100,\"offsetY\":11,\"offsetZ\":-14},{\"offsetX\":100,\"offsetY\":12,\"offsetZ\":-14},{\"offsetX\":101,\"offsetY\":11,\"offsetZ\":-13},{\"offsetX\":100,\"offsetY\":12,\"offsetZ\":-14},{\"offsetX\":101,\"offsetY\":11,\"offsetZ\":-13}]}},\"unit_id\":\"streets_id\",\"unit_type\":\"infrastructure\",\"unit_name\":\"WestIntersection\",\"msg_type\":\"v2xhub_mobility_path_in\",\"event_name\":\"WFD_Verification_2\",\"testing_type\":\"Verification\",\"location\":\"TFHRC\",\"topic_name\":\"v2xhub_mobility_path_in\",\"timestamp\":1681146252589000}";
String flattened_mo_json_str = helper.flattenJsonStr(mo_json_str);
try {
JSONObject flattened_json = (JSONObject) parser.parse(flattened_mo_json_str);
assertEquals("bb906b4e", flattened_json.get("payload.metadata.hostBSMId"));
} catch (ParseException e) {
e.printStackTrace();
}

String tcr_json_str = "{\"payload\":{\"TrafficControlRequest\":{\"@port\":\"22222\",\"@list\":\"true\",\"reqid\":\"4947918446524149\",\"reqseq\":\"0\",\"scale\":\"-1\",\"bounds\":{\"oldest\":\"28000218\",\"reflon\":\"-771510185\",\"reflat\":\"389543898\",\"offsets\":[{\"deltax\":\"3232\",\"deltay\":\"0\"},{\"deltax\":\"3232\",\"deltay\":\"1577\"},{\"deltax\":\"0\",\"deltay\":\"1577\"}]}}},\"unit_id\":\"cloud_id\",\"unit_type\":\"infrastructure\",\"unit_name\":\"DevCC\",\"msg_type\":\"TCR\",\"event_name\":\"WFD_Verification_2\",\"testing_type\":\"Verification\",\"location\":\"TFHRC\",\"topic_name\":\"TCR\",\"timestamp\":1681309089575201.0,\"log_timestamp\":1681309089425000.0}";
String flattened_tcr_json_str = helper.flattenJsonStr(tcr_json_str);
try {
JSONObject flattened_json = (JSONObject) parser.parse(flattened_tcr_json_str);
assertEquals("4947918446524149", flattened_json.get("payload.TrafficControlRequest.reqid"));
} catch (ParseException e) {
e.printStackTrace();
}

String tcm_json_str ="{\"payload\":{\"TrafficControlMessage\":{\"tcmV01\":{\"reqid\":\"4947918446524142\",\"reqseq\":\"0\",\"msgtot\":\"1\",\"msgnum\":\"1\",\"id\":\"007d1d1c5ea3f134ab2e9d868a033372\",\"updated\":\"0\",\"params\":{\"vclasses\":{\"micromobile\":\"None\",\"motorcycle\":\"None\",\"passenger-car\":\"None\",\"light-truck-van\":\"None\",\"bus\":\"None\",\"two-axle-six-tire-single-unit-truck\":\"None\",\"three-axle-single-unit-truck\":\"None\",\"four-or-more-axle-single-unit-truck\":\"None\",\"four-or-fewer-axle-single-trailer-truck\":\"None\",\"five-axle-single-trailer-truck\":\"None\",\"six-or-more-axle-single-trailer-truck\":\"None\",\"five-or-fewer-axle-multi-trailer-truck\":\"None\",\"six-axle-multi-trailer-truck\":\"None\",\"seven-or-more-axle-multi-trailer-truck\":\"None\"},\"schedule\":{\"start\":\"28004757\",\"end\":\"153722867280912\",\"dow\":\"1111111\"},\"regulatory\":{\"true\":\"None\"},\"detail\":{\"maxspeed\":\"45\"}},\"geometry\":{\"proj\":\"epsg:3785\",\"datum\":\"WGS84\",\"reftime\":\"28004757\",\"reflon\":\"-771490031\",\"reflat\":\"389549140\",\"refelv\":\"0\",\"refwidth\":\"405\",\"heading\":\"3312\"}}}}}";
String flattened_tcm_json_str = helper.flattenJsonStr(tcm_json_str);
System.out.println(flattened_tcm_json_str);
try {
JSONObject flattened_json = (JSONObject) parser.parse(flattened_tcm_json_str);
assertEquals("4947918446524142", flattened_json.get("payload.TrafficControlMessage.tcmV01.reqid"));
} catch (ParseException e) {
e.printStackTrace();
}
}
}

0 comments on commit 5e21503

Please sign in to comment.