Skip to content

Commit

Permalink
Telematic bridge nats (#567)
Browse files Browse the repository at this point in the history
<!-- Thanks for the contribution, this is awesome. -->

# PR Details
## Description
This is the implementation story for the telematic bridge to stream
J2735 data from V2xHub into the cloud at near real time. The module
design refers to
https://usdot-carma.atlassian.net/wiki/spaces/WFD2/pages/2640379977/V2xHub+WFD+Bridge+Design
The following functionalities should be implemented for this module:

Able to forward the JSON message to telematic cloud when the message
type is requested by end users through telematic UI.

Able to register itself with WFD cloud services.

Able to map each TMX message type to each individual topic.  

Able to provide available topics to telematic cloud.

Able to stream the requested TMX message in JSON format to telematic
cloud.
<!--- Describe your changes in detail -->

## Related Issue
NA
<!--- This project only accepts pull requests related to open issues -->
<!--- If suggesting a new feature or change, please discuss it in an
issue first -->
<!--- If fixing a bug, there should be an issue describing it with steps
to reproduce -->
<!--- Please link to the issue here: -->

## Motivation and Context
Telematic tool data collection
<!--- Why is this change required? What problem does it solve? -->

## How Has This Been Tested?
Unit test and local integration testing
<!--- Please describe in detail how you tested your changes. -->
<!--- Include details of your testing environment, and the tests you ran
to -->
<!--- see how your change affects other areas of the code, etc. -->

## Types of changes

<!--- What types of changes does your code introduce? Put an `x` in all
the boxes that apply: -->

- [ ] Defect fix (non-breaking change that fixes an issue)
- [x] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that cause existing functionality
to change)

## Checklist:

<!--- Go over all the following points, and put an `x` in all the boxes
that apply. -->
<!--- If you're unsure about any of these, don't hesitate to ask. We're
here to help! -->

- [ ] I have added any new packages to the sonar-scanner.properties file
- [ ] My change requires a change to the documentation.
- [ ] I have updated the documentation accordingly.
- [x] I have read the **CONTRIBUTING** document.
[V2XHUB Contributing
Guide](https://github.com/usdot-fhwa-OPS/V2X-Hub/blob/develop/Contributing.md)
- [ ] I have added tests to cover my changes.
- [ ] All new and existing tests passed.
  • Loading branch information
dan-du-car authored Nov 30, 2023
1 parent 774b31f commit 6b604b7
Show file tree
Hide file tree
Showing 11 changed files with 876 additions and 12 deletions.
3 changes: 2 additions & 1 deletion .devcontainer/docker-compose-vscode.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ services:
- SIM_V2X_PORT=5757
- SIM_INTERACTION_PORT=7576
- V2X_PORT=8686
- INFRASTRUCTURE_ID=1
- INFRASTRUCTURE_ID=rsu_<J2735 MAP MESSAGE INTERSECTION ID>
- INFRASTRUCTURE_NAME=<RSU_NAME>
- SENSOR_JSON_FILE_PATH=/var/www/plugins/MAP/sensors.json
secrets:
- mysql_password
Expand Down
2 changes: 2 additions & 0 deletions configuration/amd64/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ services:
- db
environment:
- MYSQL_PASSWORD=/run/secrets/mysql_password
- INFRASTRUCTURE_ID=rsu_<J2735 MAP MESSAGE INTERSECTION ID>
- INFRASTRUCTURE_NAME=<RSU_NAME>
secrets:
- mysql_password
volumes:
Expand Down
6 changes: 6 additions & 0 deletions src/tmx/TmxUtils/src/simulation/SimulationEnvUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ namespace tmx::utils::sim{
* for CDASim connection.
*/
constexpr inline static const char *INFRASTRUCTURE_ID = "INFRASTRUCTURE_ID";

/**
* @brief Name of environment variable for storing infrastructure name of v2xhub. Only necessary in SIMULATION MODE
* for CDASim connection.
*/
constexpr inline static const char *INFRASTRUCTURE_NAME = "INFRASTRUCTURE_NAME";
/**
* @brief Function to return bool indicating whether V2X-Hub deployment is in SIMULATION MODE or not.
* @return true if SIMULATION_MODE is "true" or "TRUE" and false otherwise.
Expand Down
6 changes: 4 additions & 2 deletions src/v2i-hub/TelematicBridgePlugin/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ PROJECT (TelematicBridgePlugin VERSION 7.5.1 LANGUAGES CXX)
set (TMX_PLUGIN_NAME "Telematic Bridge")

BuildTmxPlugin()
TARGET_LINK_LIBRARIES ( ${PROJECT_NAME} tmxutils jsoncpp)
TARGET_LINK_LIBRARIES ( ${PROJECT_NAME} tmxutils jsoncpp nats)

####################################################
################## Testing #######################
####################################################
add_library(${PROJECT_NAME}_lib src/TelematicUnit.cpp)
target_link_libraries(${PROJECT_NAME}_lib PUBLIC tmxutils jsoncpp nats)
enable_testing()
include_directories(${PROJECT_SOURCE_DIR}/src)
file(GLOB_RECURSE TEST_SOURCES LIST_DIRECTORIES false test/*.h test/*.cpp)
add_executable(${PROJECT_NAME}_test ${TEST_SOURCES})
target_link_libraries(${PROJECT_NAME}_test PRIVATE gtest tmxutils jsoncpp)
target_link_libraries(${PROJECT_NAME}_test PRIVATE gtest ${PROJECT_NAME}_lib)
96 changes: 96 additions & 0 deletions src/v2i-hub/TelematicBridgePlugin/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
## Telematic bridge introduction
Telematic bridge is a V2xHub plugin used to collect data stream generated by V2xHub and send the data stream into [telematic tool](https://github.com/usdot-fhwa-stol/cda-telematics) hosted in the AWS cloud.
## NATS Publisher/Subscriber
### NATS Connections and registration
#### Telematic plugin sends registration request to telematic server
##### Subject
NATS subject: *.register_unit
##### Request
```
{
"unit_id": "<unit_id>"
}
```
##### Response
```
{
"unit_id": "<unit_id>",
"unit_type": "infrastructure",
"unit_name": "East Intersection",
"timestamp": "1678998191815233965",
"event_name": "wfd_integration_testing",
"location": "TFHRC",
"testing_type": "Integration"
}
```

### Available topics
#### Telematic plugin receives request from telematic UI
##### Subject
NATS subject: <unit_id>.available_topics
##### Request
```
{
"unit_id": "<unit_id>"
}
```
##### Reply

```
{
"unit_id": "<unit_id>",
"unit_type": "infrastructure",
"unit_name": "East Intersection",
"timestamp": "1678998191815233965",
"event_name": "wfd_integration_testing",
"location": "TFHRC",
"testing_type": "Integration",
"topics": [
{
"name": "J2735_TMSG03-P_CARMAStreetsPlugin"
},
{
"name": "<topic_name_2>"
}
]
}
```

### Selected topics
#### Telematic plugin receives selected topics from telematic UI
##### Subject
NATS subject: <unit_id>.publish_topics
#### Request
```
{
"unit_id": "<unit_id>",
"unit_type": "infrastructure",
"timestamp": 1663084528513000400,
"event_name": "wfd_integration_testing",
"location": "TFHRC",
"testing_type": "Integration",
"topics": [
"<topic_name_1>",
"<topic_name_2>"
]
}
```
##### Reply
```
"request received!"
```

## Check status
#### Telematic plugin receives live status request from telematic server
##### Subject
NATS subject: <unit_id>.check_status
##### Request
```
{
"unit_id": "<unit_id>"
}
```
##### Reponse
```
"OK"
```
10 changes: 10 additions & 0 deletions src/v2i-hub/TelematicBridgePlugin/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@
"key": "LogLevel",
"default": "INFO",
"description": "The log level for this plugin"
},
{
"key": "NATSUrl",
"default": "nats://127.0.0.1:4222",
"description": "The NATS connection URL"
},
{
"key": "MessageExclusionList",
"default": "System_KeepAlive_CommandPlugin,System_KeepAlive_CARMAStreetsPlugin,System_KeepAlive_CDASimAdapter,System_KeepAlive_MessageReceiver",
"description": "The list of messages are excluded from the available message list. Message name is a combination of message type, subtype and source separated by underscore. E.G: type_subtype_source"
}
]
}
54 changes: 50 additions & 4 deletions src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
#include "TelematicBridgePlugin.h"

using namespace tmx::utils;
using namespace std;

namespace TelematicBridge
{
TelematicBridgePlugin::TelematicBridgePlugin(const string &name) : PluginClient(name)
{
_telematicUnitPtr = make_unique<TelematicUnit>();
_unitId = std::getenv("INFRASTRUCTURE_ID");
_unitName = std::getenv("INFRASTRUCTURE_NAME");
UpdateConfigSettings();
AddMessageFilter("*", "*", IvpMsgFlags_None);
AddMessageFilter("J2735", "*", IvpMsgFlags_RouteDSRC);
SubscribeToMessages();
Expand All @@ -20,14 +27,53 @@ namespace TelematicBridge
auto messageFm = (MessageFrame_t *)calloc(1, sizeof(MessageFrame_t));
DecodeJ2735Msg(msg->payload->valuestring, messageFm);
string xml_payload_str = ConvertJ2735FrameToXML(messageFm);
ASN_STRUCT_FREE(asn_DEF_MessageFrame, messageFm);
json["payload"] = StringToJson(xml2Json(xml_payload_str));
ASN_STRUCT_FREE(asn_DEF_MessageFrame, messageFm);
string json_payload_str = xml2Json(xml_payload_str.c_str());
json["payload"] = StringToJson(json_payload_str);
}

auto jsonStr = JsonToString(json);
PLOG(logINFO) << jsonStr;
stringstream topic;
topic << (msg->type ? msg->type : "") << "_" << (msg->subtype ? msg->subtype : "") << "_" << (msg->source ? msg->source : "");
auto topicStr = topic.str();
_telematicUnitPtr->updateAvailableTopics(topicStr);
if (_telematicUnitPtr->inSelectedTopics(topicStr))
{
_telematicUnitPtr->publishMessage(topicStr, json);
}
}
}

void TelematicBridgePlugin::UpdateConfigSettings()
{
lock_guard<mutex> lock(_configMutex);
GetConfigValue<string>("NATSUrl", _natsURL);
GetConfigValue<string>("MessageExclusionList", _excludedMessages);
unit_st unit = {_unitId, _unitName, UNIT_TYPE_INFRASTRUCTURE};
if (_telematicUnitPtr)
{
_telematicUnitPtr->setUnit(unit);
_telematicUnitPtr->updateExcludedTopics(_excludedMessages);
}
}

void TelematicBridgePlugin::OnStateChange(IvpPluginState state)
{
PluginClient::OnStateChange(state);
if (state == IvpPluginState_registered)
{
UpdateConfigSettings();
if (_telematicUnitPtr)
{
_telematicUnitPtr->connect(_natsURL);
}
}
}

void TelematicBridgePlugin::OnConfigChanged(const char *key, const char *value)
{
PluginClient::OnConfigChanged(key, value);
UpdateConfigSettings();
}
}

// The main entry point for this application.
Expand Down
19 changes: 14 additions & 5 deletions src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,30 @@

#include "PluginClient.h"
#include "TelematicBridgeMsgWorker.h"
#include "TelematicUnit.h"
#include <simulation/SimulationEnvUtils.h>


using namespace tmx::utils;
using namespace std;
namespace TelematicBridge
{


class TelematicBridgePlugin : public tmx::utils::PluginClient
{
private:
static CONSTEXPR const char *Telematic_MSGTYPE_J2735_STRING = "J2735";
static CONSTEXPR const char *UNIT_TYPE_INFRASTRUCTURE = "Infrastructure";
std::unique_ptr<TelematicUnit> _telematicUnitPtr;
std::string _unitId;
std::string _unitName;
std::string _natsURL;
std::string _excludedMessages;
std::mutex _configMutex;
void OnMessageReceived(IvpMessage *msg);

public:
explicit TelematicBridgePlugin(const string& name);
explicit TelematicBridgePlugin(const std::string &name);
void OnConfigChanged(const char *key, const char *value) override;
void OnStateChange(IvpPluginState state) override;
void UpdateConfigSettings();
~TelematicBridgePlugin() override = default;
};

Expand Down
Loading

0 comments on commit 6b604b7

Please sign in to comment.