diff --git a/.devcontainer/docker-compose-vscode.yml b/.devcontainer/docker-compose-vscode.yml index cc113ce62..9035d1f9d 100755 --- a/.devcontainer/docker-compose-vscode.yml +++ b/.devcontainer/docker-compose-vscode.yml @@ -24,7 +24,8 @@ services: - SIM_V2X_PORT=5757 - SIM_INTERACTION_PORT=7576 - V2X_PORT=8686 - - INFRASTRUCTURE_ID=1 + - INFRASTRUCTURE_ID=rsu_ + - INFRASTRUCTURE_NAME= - SENSOR_JSON_FILE_PATH=/var/www/plugins/MAP/sensors.json secrets: - mysql_password diff --git a/configuration/amd64/docker-compose.yml b/configuration/amd64/docker-compose.yml index 6b5188297..0c9d5feb2 100755 --- a/configuration/amd64/docker-compose.yml +++ b/configuration/amd64/docker-compose.yml @@ -38,6 +38,8 @@ services: - db environment: - MYSQL_PASSWORD=/run/secrets/mysql_password + - INFRASTRUCTURE_ID=rsu_ + - INFRASTRUCTURE_NAME= secrets: - mysql_password volumes: diff --git a/src/tmx/TmxUtils/src/simulation/SimulationEnvUtils.h b/src/tmx/TmxUtils/src/simulation/SimulationEnvUtils.h index 7498a7922..efffc4ffa 100644 --- a/src/tmx/TmxUtils/src/simulation/SimulationEnvUtils.h +++ b/src/tmx/TmxUtils/src/simulation/SimulationEnvUtils.h @@ -66,6 +66,12 @@ namespace tmx::utils::sim{ * for CDASim connection. */ constexpr inline static const char *INFRASTRUCTURE_ID = "INFRASTRUCTURE_ID"; + + /** + * @brief Name of environment variable for storing infrastructure name of v2xhub. Only necessary in SIMULATION MODE + * for CDASim connection. + */ + constexpr inline static const char *INFRASTRUCTURE_NAME = "INFRASTRUCTURE_NAME"; /** * @brief Function to return bool indicating whether V2X-Hub deployment is in SIMULATION MODE or not. * @return true if SIMULATION_MODE is "true" or "TRUE" and false otherwise. diff --git a/src/v2i-hub/TelematicBridgePlugin/CMakeLists.txt b/src/v2i-hub/TelematicBridgePlugin/CMakeLists.txt index 5134ae38b..df7b1c0f7 100644 --- a/src/v2i-hub/TelematicBridgePlugin/CMakeLists.txt +++ b/src/v2i-hub/TelematicBridgePlugin/CMakeLists.txt @@ -3,13 +3,15 @@ PROJECT (TelematicBridgePlugin VERSION 7.5.1 LANGUAGES CXX) set (TMX_PLUGIN_NAME "Telematic Bridge") BuildTmxPlugin() -TARGET_LINK_LIBRARIES ( ${PROJECT_NAME} tmxutils jsoncpp) +TARGET_LINK_LIBRARIES ( ${PROJECT_NAME} tmxutils jsoncpp nats) #################################################### ################## Testing ####################### #################################################### +add_library(${PROJECT_NAME}_lib src/TelematicUnit.cpp) +target_link_libraries(${PROJECT_NAME}_lib PUBLIC tmxutils jsoncpp nats) enable_testing() include_directories(${PROJECT_SOURCE_DIR}/src) file(GLOB_RECURSE TEST_SOURCES LIST_DIRECTORIES false test/*.h test/*.cpp) add_executable(${PROJECT_NAME}_test ${TEST_SOURCES}) -target_link_libraries(${PROJECT_NAME}_test PRIVATE gtest tmxutils jsoncpp) \ No newline at end of file +target_link_libraries(${PROJECT_NAME}_test PRIVATE gtest ${PROJECT_NAME}_lib) \ No newline at end of file diff --git a/src/v2i-hub/TelematicBridgePlugin/README.md b/src/v2i-hub/TelematicBridgePlugin/README.md new file mode 100644 index 000000000..6ed0a0497 --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/README.md @@ -0,0 +1,96 @@ +## Telematic bridge introduction +Telematic bridge is a V2xHub plugin used to collect data stream generated by V2xHub and send the data stream into [telematic tool](https://github.com/usdot-fhwa-stol/cda-telematics) hosted in the AWS cloud. +## NATS Publisher/Subscriber +### NATS Connections and registration +#### Telematic plugin sends registration request to telematic server +##### Subject +NATS subject: *.register_unit +##### Request +``` +{ + "unit_id": "" +} +``` +##### Response +``` +{ + "unit_id": "", + "unit_type": "infrastructure", + "unit_name": "East Intersection", + "timestamp": "1678998191815233965", + "event_name": "wfd_integration_testing", + "location": "TFHRC", + "testing_type": "Integration" +} +``` + +### Available topics +#### Telematic plugin receives request from telematic UI +##### Subject +NATS subject: .available_topics +##### Request +``` +{ + "unit_id": "" +} +``` +##### Reply + +``` + { + "unit_id": "", + "unit_type": "infrastructure", + "unit_name": "East Intersection", + "timestamp": "1678998191815233965", + "event_name": "wfd_integration_testing", + "location": "TFHRC", + "testing_type": "Integration", + "topics": [ + { + "name": "J2735_TMSG03-P_CARMAStreetsPlugin" + }, + { + "name": "" + } + ] +} +``` + +### Selected topics +#### Telematic plugin receives selected topics from telematic UI +##### Subject +NATS subject: .publish_topics +#### Request +``` +{ + "unit_id": "", + "unit_type": "infrastructure", + "timestamp": 1663084528513000400, + "event_name": "wfd_integration_testing", + "location": "TFHRC", + "testing_type": "Integration", + "topics": [ + "", + "" + ] +} +``` +##### Reply +``` +"request received!" +``` + +## Check status +#### Telematic plugin receives live status request from telematic server +##### Subject +NATS subject: .check_status +##### Request +``` +{ + "unit_id": "" +} +``` +##### Reponse +``` +"OK" +``` \ No newline at end of file diff --git a/src/v2i-hub/TelematicBridgePlugin/manifest.json b/src/v2i-hub/TelematicBridgePlugin/manifest.json index 530dbe0eb..d133ceb0f 100644 --- a/src/v2i-hub/TelematicBridgePlugin/manifest.json +++ b/src/v2i-hub/TelematicBridgePlugin/manifest.json @@ -11,6 +11,16 @@ "key": "LogLevel", "default": "INFO", "description": "The log level for this plugin" + }, + { + "key": "NATSUrl", + "default": "nats://127.0.0.1:4222", + "description": "The NATS connection URL" + }, + { + "key": "MessageExclusionList", + "default": "System_KeepAlive_CommandPlugin,System_KeepAlive_CARMAStreetsPlugin,System_KeepAlive_CDASimAdapter,System_KeepAlive_MessageReceiver", + "description": "The list of messages are excluded from the available message list. Message name is a combination of message type, subtype and source separated by underscore. E.G: type_subtype_source" } ] } \ No newline at end of file diff --git a/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.cpp b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.cpp index a316084f4..44a6738f2 100644 --- a/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.cpp +++ b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.cpp @@ -1,9 +1,16 @@ #include "TelematicBridgePlugin.h" +using namespace tmx::utils; +using namespace std; + namespace TelematicBridge { TelematicBridgePlugin::TelematicBridgePlugin(const string &name) : PluginClient(name) { + _telematicUnitPtr = make_unique(); + _unitId = std::getenv("INFRASTRUCTURE_ID"); + _unitName = std::getenv("INFRASTRUCTURE_NAME"); + UpdateConfigSettings(); AddMessageFilter("*", "*", IvpMsgFlags_None); AddMessageFilter("J2735", "*", IvpMsgFlags_RouteDSRC); SubscribeToMessages(); @@ -20,14 +27,53 @@ namespace TelematicBridge auto messageFm = (MessageFrame_t *)calloc(1, sizeof(MessageFrame_t)); DecodeJ2735Msg(msg->payload->valuestring, messageFm); string xml_payload_str = ConvertJ2735FrameToXML(messageFm); - ASN_STRUCT_FREE(asn_DEF_MessageFrame, messageFm); - json["payload"] = StringToJson(xml2Json(xml_payload_str)); + ASN_STRUCT_FREE(asn_DEF_MessageFrame, messageFm); + string json_payload_str = xml2Json(xml_payload_str.c_str()); + json["payload"] = StringToJson(json_payload_str); } - auto jsonStr = JsonToString(json); - PLOG(logINFO) << jsonStr; + stringstream topic; + topic << (msg->type ? msg->type : "") << "_" << (msg->subtype ? msg->subtype : "") << "_" << (msg->source ? msg->source : ""); + auto topicStr = topic.str(); + _telematicUnitPtr->updateAvailableTopics(topicStr); + if (_telematicUnitPtr->inSelectedTopics(topicStr)) + { + _telematicUnitPtr->publishMessage(topicStr, json); + } + } + } + + void TelematicBridgePlugin::UpdateConfigSettings() + { + lock_guard lock(_configMutex); + GetConfigValue("NATSUrl", _natsURL); + GetConfigValue("MessageExclusionList", _excludedMessages); + unit_st unit = {_unitId, _unitName, UNIT_TYPE_INFRASTRUCTURE}; + if (_telematicUnitPtr) + { + _telematicUnitPtr->setUnit(unit); + _telematicUnitPtr->updateExcludedTopics(_excludedMessages); + } + } + + void TelematicBridgePlugin::OnStateChange(IvpPluginState state) + { + PluginClient::OnStateChange(state); + if (state == IvpPluginState_registered) + { + UpdateConfigSettings(); + if (_telematicUnitPtr) + { + _telematicUnitPtr->connect(_natsURL); + } } } + + void TelematicBridgePlugin::OnConfigChanged(const char *key, const char *value) + { + PluginClient::OnConfigChanged(key, value); + UpdateConfigSettings(); + } } // The main entry point for this application. diff --git a/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.h b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.h index c790b6ca4..a75b21bbe 100644 --- a/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.h +++ b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.h @@ -3,21 +3,30 @@ #include "PluginClient.h" #include "TelematicBridgeMsgWorker.h" +#include "TelematicUnit.h" +#include + -using namespace tmx::utils; -using namespace std; namespace TelematicBridge { - - class TelematicBridgePlugin : public tmx::utils::PluginClient { private: static CONSTEXPR const char *Telematic_MSGTYPE_J2735_STRING = "J2735"; + static CONSTEXPR const char *UNIT_TYPE_INFRASTRUCTURE = "Infrastructure"; + std::unique_ptr _telematicUnitPtr; + std::string _unitId; + std::string _unitName; + std::string _natsURL; + std::string _excludedMessages; + std::mutex _configMutex; void OnMessageReceived(IvpMessage *msg); public: - explicit TelematicBridgePlugin(const string& name); + explicit TelematicBridgePlugin(const std::string &name); + void OnConfigChanged(const char *key, const char *value) override; + void OnStateChange(IvpPluginState state) override; + void UpdateConfigSettings(); ~TelematicBridgePlugin() override = default; }; diff --git a/src/v2i-hub/TelematicBridgePlugin/src/TelematicUnit.cpp b/src/v2i-hub/TelematicBridgePlugin/src/TelematicUnit.cpp new file mode 100644 index 000000000..e99029b53 --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/src/TelematicUnit.cpp @@ -0,0 +1,320 @@ +#include "TelematicUnit.h" + +using namespace std; +using namespace tmx::utils; +using namespace std::chrono; + +namespace TelematicBridge +{ + void TelematicUnit::connect(const string &natsURL) + { + auto s = natsConnection_ConnectTo(&_conn, natsURL.c_str()); + PLOG(logINFO) << "NATS connection returned: " << natsStatus_GetText(s); + if (s == NATS_OK) + { + registerUnitRequestor(); + } + else + { + throw TelematicBridgeException(natsStatus_GetText(s)); + } + } + + void TelematicUnit::registerUnitRequestor() + { + // Reset registration status + bool isRegistered = false; + int attempts_count = 0; + + while (!isRegistered && attempts_count < REGISTRATION_MAX_ATTEMPTS) + { + attempts_count++; + PLOG(logDEBUG2) << "Inside register unit requestor"; + natsMsg *reply = nullptr; + string payload = "{\"unit_id\":\"" + _unit.unitId + "\"}"; + auto s = natsConnection_RequestString(&reply, _conn, REGISTER_UNIT_TOPIC, payload.c_str(), TIME_OUT); + if (s == NATS_OK) + { + auto replyStr = natsMsg_GetData(reply); + PLOG(logINFO) << "Received registered reply: " << replyStr; + + // Unit is registered when server responds with event information (location, testing_type, event_name) + isRegistered = validateRegisterStatus(replyStr); + natsMsg_Destroy(reply); + } + else + { + throw TelematicBridgeException(natsStatus_GetText(s)); + } + sleep(1); + } + + if (isRegistered) + { + // Provide below services when the unit is registered + availableTopicsReplier(); + selectedTopicsReplier(); + checkStatusReplier(); + } + } + + bool TelematicUnit::validateRegisterStatus(const string ®isterReply) + { + auto root = parseJson(registerReply); + if (root.isMember(LOCATION_KEY) && root.isMember(TESTING_TYPE_KEY) && root.isMember(EVENT_NAME_KEY)) + { + _eventLocation = root[LOCATION_KEY].asString(); + _testingType = root[TESTING_TYPE_KEY].asString(); + _eventName = root[EVENT_NAME_KEY].asString(); + return true; + } + PLOG(logERROR) << "Failed to register unit as event information (locatoin, testing type and event name) does not exist."; + return false; + } + + void TelematicUnit::availableTopicsReplier() + { + if (!_subAvailableTopic) + { + PLOG(logDEBUG2) << "Inside available topic replier"; + stringstream topic; + topic << _unit.unitId << AVAILABLE_TOPICS; + natsConnection_Subscribe(&_subAvailableTopic, _conn, topic.str().c_str(), onAvailableTopicsCallback, this); + } + } + + void TelematicUnit::selectedTopicsReplier() + { + if (!_subSelectedTopic) + { + PLOG(logDEBUG2) << "Inside selected topic replier"; + stringstream topic; + topic << _unit.unitId << PUBLISH_TOPICS; + natsConnection_Subscribe(&_subSelectedTopic, _conn, topic.str().c_str(), onSelectedTopicsCallback, this); + } + } + + void TelematicUnit::checkStatusReplier() + { + if (!_subCheckStatus) + { + PLOG(logDEBUG2) << "Inside check status replier"; + stringstream topic; + topic << _unit.unitId << CHECK_STATUS; + natsConnection_Subscribe(&_subCheckStatus, _conn, topic.str().c_str(), onCheckStatusCallback, this); + } + } + + void TelematicUnit::publishMessage(const string &topic, const Json::Value &payload) + { + auto pubMsgTopic = "streets." + _unit.unitId + ".data." + topic; + auto jsonStr = constructPublishedDataString(_unit, _eventLocation, _testingType, _eventName, topic, payload); + auto s = natsConnection_PublishString(_conn, pubMsgTopic.c_str(), jsonStr.c_str()); + if (s == NATS_OK) + { + PLOG(logINFO) << "Topic: " << pubMsgTopic << ". Published: " << jsonStr; + } + else + { + throw TelematicBridgeException(natsStatus_GetText(s)); + } + } + + void TelematicUnit::onAvailableTopicsCallback(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *object) + { + PLOG(logDEBUG3) << "Received available topics: " << natsMsg_GetSubject(msg) << " " << natsMsg_GetData(msg); + // Sends a reply + if (object && natsMsg_GetReply(msg) != nullptr) + { + const auto obj = (TelematicUnit *)object; + auto reply = constructAvailableTopicsReplyString(obj->getUnit(), obj->getEventLocation(), obj->getTestingType(), obj->getEventName(), obj->getAvailableTopics(), obj->getExcludedTopics()); + auto s = natsConnection_PublishString(nc, natsMsg_GetReply(msg), reply.c_str()); + natsMsg_Destroy(msg); + if (s == NATS_OK) + { + PLOG(logDEBUG3) << "Available topics replied: " << reply; + } + } + } + + void TelematicUnit::onSelectedTopicsCallback(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *object) + { + PLOG(logDEBUG3) << "Received selected topics: " << natsMsg_GetSubject(msg) << " " << natsMsg_GetData(msg); + // Sends a reply + if (natsMsg_GetReply(msg) != nullptr) + { + auto msgStr = natsMsg_GetData(msg); + auto root = parseJson(msgStr); + if (object && root.isMember(TOPICS_KEY) && root[TOPICS_KEY].isArray()) + { + auto obj = (TelematicUnit *)object; + // clear old selected topics + obj->clearSelectedTopics(); + + // update selected topics with selected topics from latest request + for (auto itr = root[TOPICS_KEY].begin(); itr != root[TOPICS_KEY].end(); itr++) + { + obj->addSelectedTopic(itr->asString()); + } + } + string reply = "request received!"; + auto s = natsConnection_PublishString(nc, natsMsg_GetReply(msg), reply.c_str()); + natsMsg_Destroy(msg); + if (s == NATS_OK) + { + PLOG(logDEBUG3) << "Selected topics replied: " << reply; + } + } + } + + void TelematicUnit::onCheckStatusCallback(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *object) + { + if (natsMsg_GetReply(msg) != nullptr) + { + auto s = natsConnection_PublishString(nc, natsMsg_GetReply(msg), "OK"); + if (s == NATS_OK) + { + PLOG(logDEBUG3) << "Received check status msg: " << natsMsg_GetSubject(msg) << " " << natsMsg_GetData(msg) << ". Replied: OK"; + } + natsMsg_Destroy(msg); + } + } + + string TelematicUnit::constructPublishedDataString(const unit_st &unit, const string &eventLocation, const string &testingType, const string &eventName, const string &topicName, const Json::Value &payload) const + { + Json::Value message; + message[UNIT_ID_KEY] = unit.unitId; + message[UNIT_NAME_KEY] = unit.unitName; + message[UNIT_TYPE_KEY] = unit.unitType; + message[LOCATION_KEY] = eventLocation; + message[TESTING_TYPE_KEY] = testingType; + message[EVENT_NAME_KEY] = eventName; + message[TOPIC_NAME_KEY] = topicName; + message[TIMESTAMP_KEY] = payload.isMember("timestamp") ? payload["timestamp"].asUInt64() * MILLI_TO_MICRO : duration_cast(system_clock::now().time_since_epoch()).count(); + message[PAYLOAD_KEY] = payload; + Json::FastWriter fasterWirter; + string jsonStr = fasterWirter.write(message); + return jsonStr; + } + + Json::Value TelematicUnit::parseJson(const string &jsonStr) + { + Json::Value root; + Json::Reader reader; + bool parsingSuccessful = reader.parse(jsonStr, root); + if (!parsingSuccessful) + { + throw TelematicBridgeException("Error parsing the reply message"); + } + return root; + } + + string TelematicUnit::constructAvailableTopicsReplyString(const unit_st &unit, const string &eventLocation, const string &testingType, const string &eventName, const vector &availableTopicList, const string &excludedTopics) + { + Json::Value message; + message[UNIT_ID_KEY] = unit.unitId; + message[UNIT_NAME_KEY] = unit.unitName; + message[UNIT_TYPE_KEY] = unit.unitType; + message[LOCATION_KEY] = eventLocation; + message[TESTING_TYPE_KEY] = testingType; + message[EVENT_NAME_KEY] = eventName; + message[TIMESTAMP_KEY] = duration_cast(system_clock::now().time_since_epoch()).count(); + Json::Value topics; + for (const auto &topic : availableTopicList) + { + if (!boost::icontains(excludedTopics, topic)) + { + Json::Value topicJson; + topicJson[NAME_KEY] = topic; + topics.append(topicJson); + } + } + message[TOPICS_KEY] = topics; + Json::FastWriter fasterWirter; + string reply = fasterWirter.write(message); + return reply; + } + + void TelematicUnit::updateAvailableTopics(const string &newTopic) + { + if (find(_availableTopics.begin(), _availableTopics.end(), newTopic) == _availableTopics.end()) + { + lock_guard lock(_availableTopicsMutex); + _availableTopics.push_back(newTopic); + PLOG(logINFO) << "Add topic (= " << newTopic << ") to available topics list. Size: " << _availableTopics.size(); + } + } + + void TelematicUnit::updateExcludedTopics(const string &excludedTopics) + { + lock_guard lock(_excludedTopicsMutex); + _excludedTopics = excludedTopics; + } + + bool TelematicUnit::inSelectedTopics(const string &topic) + { + if (find(_selectedTopics.begin(), _selectedTopics.end(), topic) == _selectedTopics.end()) + { + return false; + } + return true; + } + + void TelematicUnit::setUnit(const unit_st &unit) + { + lock_guard lock(_unitMutex); + _unit = unit; + } + + unit_st TelematicUnit::getUnit() const + { + return _unit; + } + + string TelematicUnit::getEventName() const + { + return _eventName; + } + + string TelematicUnit::getEventLocation() const + { + return _eventLocation; + } + + string TelematicUnit::getTestingType() const + { + return _testingType; + } + + vector TelematicUnit::getAvailableTopics() const + { + return _availableTopics; + } + + string TelematicUnit::getExcludedTopics() const + { + return _excludedTopics; + } + + void TelematicUnit::addSelectedTopic(const string &newSelectedTopic) + { + _selectedTopics.push_back(newSelectedTopic); + } + + void TelematicUnit::clearSelectedTopics() + { + _selectedTopics.clear(); + } + + TelematicUnit::~TelematicUnit() + { + natsSubscription_Destroy(_subAvailableTopic); + natsSubscription_Destroy(_subSelectedTopic); + natsSubscription_Destroy(_subCheckStatus); + natsConnection_Destroy(_conn); + _conn = nullptr; + _subAvailableTopic = nullptr; + _subSelectedTopic = nullptr; + _subCheckStatus = nullptr; + } +} \ No newline at end of file diff --git a/src/v2i-hub/TelematicBridgePlugin/src/TelematicUnit.h b/src/v2i-hub/TelematicBridgePlugin/src/TelematicUnit.h new file mode 100644 index 000000000..3a72f080b --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/src/TelematicUnit.h @@ -0,0 +1,216 @@ +#pragma once +#include +#include +#include +#include +#include +#include "PluginLog.h" +#include "ThreadTimer.h" +#include "TelematicBridgeException.h" +#include +#include + + +namespace TelematicBridge +{ + using unit_st = struct unit + { + std::string unitId; // Unique identifier for each unit + std::string unitName; // Descriptive name for each unit + std::string unitType; // Unit categorized base on unit type: platform or infrastructure + }; + + class TelematicUnit + { + private: + std::mutex _unitMutex; + std::mutex _availableTopicsMutex; + std::mutex _excludedTopicsMutex; + unit_st _unit; // Global variable to store the unit information + std::vector _availableTopics; // Global variable to store available topics + std::string _excludedTopics; // Global variable to store topics that are excluded by the users + std::vector _selectedTopics; // Global variable to store selected topics confirmed by users + static CONSTEXPR const char *AVAILABLE_TOPICS = ".available_topics"; // NATS subject to pub/sub available topics + static CONSTEXPR const char *REGISTER_UNIT_TOPIC = "*.register_unit"; // NATS subject to pub/sub registering unit + static CONSTEXPR const char *PUBLISH_TOPICS = ".publish_topics"; // NATS subject to publish data stream + static CONSTEXPR const char *CHECK_STATUS = ".check_status"; // NATS subject to pub/sub checking unit status + natsConnection *_conn = nullptr; // Global NATS connection object + natsSubscription *_subAvailableTopic = nullptr; // Global NATS subscription object + natsSubscription *_subSelectedTopic = nullptr; // Global NATS subscription object + natsSubscription *_subCheckStatus = nullptr; // Global NATS subscription object + int64_t TIME_OUT = 10000; // NATS Connection time out in milliseconds + std::string _eventName; // Testing event the unit is assigned to + std::string _eventLocation; // Testing event location + std::string _testingType; // Testing type + static CONSTEXPR const char *LOCATION_KEY = "location"; // location key used to find location value from JSON + static CONSTEXPR const char *TESTING_TYPE_KEY = "testing_type"; // testing_type key used to find testing_type value from JSON + static CONSTEXPR const char *EVENT_NAME_KEY = "event_name"; // event_name key used to find event_name value from JSON + static CONSTEXPR const char *UNIT_ID_KEY = "unit_id"; // unit_id key used to find unit_id value from JSON + static CONSTEXPR const char *UNIT_NAME_KEY = "unit_name"; // unit_name key used to find unit_name value from JSON + static CONSTEXPR const char *UNIT_TYPE_KEY = "unit_type"; // unit_type key used to find unit_type value from JSON + static CONSTEXPR const char *TOPIC_NAME_KEY = "topic_name"; // topic_name key used to find topic_name value from JSON + static CONSTEXPR const char *TIMESTAMP_KEY = "timestamp"; // timestamp key used to find timestamp value from JSON + static CONSTEXPR const char *PAYLOAD_KEY = "payload"; // payload key used to find payload value from JSON + static CONSTEXPR const char *TOPICS_KEY = "topics"; // topics key used to find topics value from JSON + static CONSTEXPR const char *NAME_KEY = "name"; // topics key used to find topics value from JSON + static const int MILLI_TO_MICRO = 1000; + static const int REGISTRATION_MAX_ATTEMPTS = 30; //The maximum numbers of attempts allowed to register this unit with server + + public: + /** + *@brief Construct telematic unit + */ + explicit TelematicUnit() = default; + /** + * @brief A function for telematic unit to connect to NATS server. Throw exception is connection failed. * + * @param string string NATS server URL + */ + void connect(const std::string &natsURL); + + /** + * @brief A NATS requestor for telematic unit to send register request to NATS server. + * If receives a response, it will update the isRegistered flag to indicate the unit is registered. + * If no response after the specified time out (unit of second) period, it considered register failed. + * */ + void registerUnitRequestor(); + + /** + * @brief A NATS replier to subscribe to NATS server and receive available topics request. + * Publish list of available topics after receiving/processing the request. + */ + void availableTopicsReplier(); + + /** + * @brief A NATS replier to subscribe to NATS server and receive requested topics from telematic server. + * Process the request and update the selectedTopics global variable. + * Respond the telematic server with acknowledgement. + */ + void selectedTopicsReplier(); + + /** + * @brief A NATS replier to subscribe to NATS server and receive request for status check from telematic server. + * Publish unit status upon receiving a request. + */ + void checkStatusReplier(); + + /** + * @brief A function to publish message stream into NATS server + */ + void publishMessage(const std::string &topic, const Json::Value &payload); + + /** + * @brief A function to parse a JSON string and create a JSON object. + * @param string input json string + * @return Json::Value + */ + static Json::Value parseJson(const std::string &jsonStr); + + /** + * @brief construct available topic response + * @param unit_st struct that contains unit related information + * @param vector of available topics + * @param string Excluded topics separated by commas + */ + static std::string constructAvailableTopicsReplyString(const unit_st &unit, const std::string &eventLocation, const std::string &testingType, const std::string &eventName, const std::vector &availableTopicList, const std::string &excludedTopics); + + /** + * @brief A function to update available topics global variables when discovering a new topic. + */ + void updateAvailableTopics(const std::string &newTopic); + + /** + * @brief Update telematic unit registration status when receiving registration reply from NATS server + * @param string Register reply in Json format + * @return True when reply with event information (location, testing type, event name), otherwise false. + */ + bool validateRegisterStatus(const std::string& registerReply); + + /** + * @brief construct Json data string that will be streamed into the cloud by a publisher + * @param unit_st struct that contains unit related information + * @param string Event location + * @param string Testing type + * @param string Event name + * @param string Topic name is a combination of type_subtype_source from TMX IVPMessage + * @param Json::Value Payload is the actual data generated by V2xHub plugin + */ + std::string constructPublishedDataString(const unit_st &unit, const std::string &eventLocation, const std::string &testingType, const std::string &eventName, const std::string &topicName, const Json::Value &payload) const; + + /** + * @brief A function to update global unit variable + * @param unit_st object that has the unit id, type and name information + */ + void setUnit(const unit_st &unit); + + /** + * @brief Return unit structure + */ + unit_st getUnit() const; + + /** + * @brief Return list of available topics + */ + std::vector getAvailableTopics() const; + + /** + * @brief Return excluded topics string. + */ + std::string getExcludedTopics() const; + + /** + * @brief Return event name + */ + std::string getEventName() const; + + /** + * @brief Return event location + */ + std::string getEventLocation() const; + + /** + * @brief Return testing type + */ + std::string getTestingType() const; + + /** + * @brief Add new selected topic into the selected topics list + */ + void addSelectedTopic(const std::string &newSelectedTopic); + + /** + * @brief Clear selected topics list + */ + void clearSelectedTopics(); + + /** + * @brief A function to update excluded topics. + * @param string Excluded topics separated by commas + */ + void updateExcludedTopics(const std::string &excludedTopics); + + /** + * @brief Check if the given topic is inside the selectedTopics list + * @param string A topic to check for existence + * @return boolean indicator whether the input topic eixst. + */ + bool inSelectedTopics(const std::string &topic); + + /** + * @brief A callback function for available topic replier + */ + static void onAvailableTopicsCallback(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *object); + + /** + * @brief A callback function for selected topic replier + */ + static void onSelectedTopicsCallback(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *object); + + /** + * @brief A callback function for check status replier + */ + static void onCheckStatusCallback(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *object); + + ~TelematicUnit(); + }; + +} // namespace TelematicBridge diff --git a/src/v2i-hub/TelematicBridgePlugin/test/test_TelematicUnit.cpp b/src/v2i-hub/TelematicBridgePlugin/test/test_TelematicUnit.cpp new file mode 100644 index 000000000..9a81427b4 --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/test/test_TelematicUnit.cpp @@ -0,0 +1,156 @@ +#include +#include "TelematicUnit.h" +using namespace std; +using namespace tmx::utils; +using namespace std::chrono; + +namespace TelematicBridge +{ + class test_TelematicUnit : public ::testing::Test + { + public: + shared_ptr _telematicUnitPtr = make_shared(); + unit_st unit = + { + "test_id", + "test_name", + "infrastructure"}; + }; + + TEST_F(test_TelematicUnit, setUnit) + { + ASSERT_NO_THROW(_telematicUnitPtr->setUnit(unit)); + ASSERT_EQ(unit.unitId, _telematicUnitPtr->getUnit().unitId); + } + + TEST_F(test_TelematicUnit, updateExcludedTopics) + { + ASSERT_NO_THROW(_telematicUnitPtr->updateExcludedTopics("test_topic")); + } + + TEST_F(test_TelematicUnit, updateAvailableTopics) + { + ASSERT_NO_THROW(_telematicUnitPtr->updateAvailableTopics("test_topic")); + } + + TEST_F(test_TelematicUnit, constructAvailableTopicsReplyString) + { + vector topics = {"test_topic", "excluded_topic"}; + string excluded_topic = "excluded_topic"; + string eventLocation = "location"; + string testingType = "unit_test"; + string eventName = "testing"; + auto reply = TelematicUnit::constructAvailableTopicsReplyString(unit, eventLocation, testingType, eventName, topics, excluded_topic); + auto json = TelematicUnit::parseJson(reply); + ASSERT_EQ("test_topic", json["topics"][0]["name"].asString()); + } + + TEST_F(test_TelematicUnit, constructPublishedDataString) + { + string eventLocation = "location"; + string testingType = "unit_test"; + string eventName = "testing"; + string topicName = "test_topic"; + Json::Value payload; + payload["timestamp"] = 1701099016033; + + auto reply = _telematicUnitPtr->constructPublishedDataString(unit, eventLocation, testingType, eventName, topicName, payload); + auto json = TelematicUnit::parseJson(reply); + ASSERT_EQ(eventLocation, json["location"].asString()); + ASSERT_EQ(testingType, json["testing_type"].asString()); + ASSERT_EQ(eventName, json["event_name"].asString()); + ASSERT_EQ(1701099016033000, json["timestamp"].asUInt64()); + ASSERT_THROW(TelematicUnit::parseJson("Invalid Json"), TelematicBridgeException); + + Json::Value payload2; + payload2["body"] = "invalid"; + reply = _telematicUnitPtr->constructPublishedDataString(unit, eventLocation, testingType, eventName, topicName, payload2); + json = TelematicUnit::parseJson(reply); + ASSERT_NEAR(duration_cast(system_clock::now().time_since_epoch()).count(), json["timestamp"].asUInt64(), 100); + } + + TEST_F(test_TelematicUnit, onCheckStatusCallback) + { + natsMsg *msg; + string data = "{\"data\":\"test\"}"; + natsMsg_Create(&msg, "test_subject", "Test_reply", data.c_str(), data.size()); + ASSERT_NO_THROW(TelematicUnit::onCheckStatusCallback(nullptr, nullptr, msg, nullptr)); + } + + TEST_F(test_TelematicUnit, onSelectedTopicsCallback) + { + natsMsg *msg; + string data = "{\"topics\":[\"test_topic\"]}"; + natsMsg_Create(&msg, "test_subject", "Test_reply", data.c_str(), data.size()); + ASSERT_NO_THROW(TelematicUnit::onSelectedTopicsCallback(nullptr, nullptr, msg, _telematicUnitPtr.get())); + ASSERT_TRUE(_telematicUnitPtr->inSelectedTopics("test_topic")); + } + + TEST_F(test_TelematicUnit, onAvailableTopicsCallback) + { + natsMsg *msg; + string data = "{\"data\":\"test\"}"; + natsMsg_Create(&msg, "test_subject", "Test_reply", data.c_str(), data.size()); + ASSERT_NO_THROW(TelematicUnit::onAvailableTopicsCallback(nullptr, nullptr, msg, _telematicUnitPtr.get())); + } + + TEST_F(test_TelematicUnit, publishMessage) + { + string topicName = "test_topic"; + Json::Value payload; + payload["body"] = "test_body"; + ASSERT_THROW(_telematicUnitPtr->publishMessage(topicName, payload), TelematicBridgeException); + } + + TEST_F(test_TelematicUnit, checkStatusReplier) + { + _telematicUnitPtr->checkStatusReplier(); + } + + TEST_F(test_TelematicUnit, selectedTopicsReplier) + { + _telematicUnitPtr->selectedTopicsReplier(); + } + + TEST_F(test_TelematicUnit, availableTopicsReplier) + { + _telematicUnitPtr->availableTopicsReplier(); + } + + TEST_F(test_TelematicUnit, registerUnitRequestor) + { + ASSERT_THROW(_telematicUnitPtr->registerUnitRequestor(), TelematicBridgeException); + } + + TEST_F(test_TelematicUnit, connect) + { + ASSERT_THROW(_telematicUnitPtr->connect("nats://127.0.0.1:4222"), TelematicBridgeException); + } + + TEST_F(test_TelematicUnit, getters) + { + ASSERT_EQ(0, _telematicUnitPtr->getAvailableTopics().size()); + ASSERT_EQ("", _telematicUnitPtr->getEventLocation()); + ASSERT_EQ("", _telematicUnitPtr->getEventName()); + ASSERT_EQ("", _telematicUnitPtr->getExcludedTopics()); + ASSERT_EQ("", _telematicUnitPtr->getTestingType()); + } + + TEST_F(test_TelematicUnit, selectedTopics) + { + string selectedTopic = "test_selected_topics"; + _telematicUnitPtr->addSelectedTopic(selectedTopic); + ASSERT_TRUE(_telematicUnitPtr->inSelectedTopics(selectedTopic)); + _telematicUnitPtr->clearSelectedTopics(); + ASSERT_FALSE(_telematicUnitPtr->inSelectedTopics(selectedTopic)); + } + TEST_F(test_TelematicUnit, validateRegisterStatus) + { + string replyStr = "{\"event_name\":\"Test\",\"location\":\"Local\",\"testing_type\":\"Integration\"}"; + _telematicUnitPtr->validateRegisterStatus(replyStr); + ASSERT_EQ("Local", _telematicUnitPtr->getEventLocation()); + ASSERT_EQ("Test", _telematicUnitPtr->getEventName()); + ASSERT_EQ("Integration", _telematicUnitPtr->getTestingType()); + } + +} \ No newline at end of file