From a0986356b972c8e4a8c466bc766d33bd4aae0671 Mon Sep 17 00:00:00 2001 From: dan-du-car <62157949+dan-du-car@users.noreply.github.com> Date: Tue, 14 Nov 2023 15:33:44 -0500 Subject: [PATCH 1/3] Rsu health monitor (#564) # PR Details ## Description RSU health monitor plugin is a V2xHub plugin that interface with its connected RSU directly via SNMP protocol. The plugin is responsible for monitoring the connected RSU status by constantly ping RSU device. For detailed design of this plugin, refer to https://usdot-carma.atlassian.net/wiki/spaces/WFD2/pages/2640740360/RSU+Health+Monitor+Plugin+Design . ## Related Issue NA ## Motivation and Context Data visualization ## How Has This Been Tested? Integration test ## Types of changes - [ ] Defect fix (non-breaking change that fixes an issue) - [x] New feature (non-breaking change that adds functionality) - [ ] Breaking change (fix or feature that cause existing functionality to change) ## Checklist: - [ ] I have added any new packages to the sonar-scanner.properties file - [ ] My change requires a change to the documentation. - [ ] I have updated the documentation accordingly. - [x] I have read the **CONTRIBUTING** document. [V2XHUB Contributing Guide](https://github.com/usdot-fhwa-OPS/V2X-Hub/blob/develop/Contributing.md) - [ ] I have added tests to cover my changes. - [ ] All new and existing tests passed. --- .sonarqube/sonar-scanner.properties | 8 +- src/tmx/Messages/include/MessageTypes.h | 1 + src/tmx/Messages/include/RSUStatusMessage.h | 25 ++ src/tmx/TmxUtils/CMakeLists.txt | 12 +- src/tmx/TmxUtils/src/RSU_MIB_4_1.h | 56 +++++ src/tmx/TmxUtils/src/SNMPClient.cpp | 227 ++++++++++++++++++ src/tmx/TmxUtils/src/SNMPClient.h | 113 +++++++++ src/tmx/TmxUtils/src/SNMPClientException.cpp | 8 + src/tmx/TmxUtils/src/SNMPClientException.h | 23 ++ src/tmx/TmxUtils/test/MockSNMPClient.h | 18 ++ src/tmx/TmxUtils/test/test_SNMPClient.cpp | 93 +++++++ .../RSUHealthMonitorPlugin/CMakeLists.txt | 26 ++ .../RSUHealthMonitorPlugin/manifest.json | 51 ++++ .../src/RSUHealthMonitorPlugin.cpp | 90 +++++++ .../src/RSUHealthMonitorPlugin.h | 46 ++++ .../src/RSUHealthMonitorWorker.cpp | 217 +++++++++++++++++ .../src/RSUHealthMonitorWorker.h | 120 +++++++++ .../RSUHealthMonitorPlugin/test/main.cpp | 8 + .../test/test_RSUHealthMonitorWorker.cpp | 108 +++++++++ 19 files changed, 1246 insertions(+), 4 deletions(-) create mode 100644 src/tmx/Messages/include/RSUStatusMessage.h create mode 100644 src/tmx/TmxUtils/src/RSU_MIB_4_1.h create mode 100644 src/tmx/TmxUtils/src/SNMPClient.cpp create mode 100644 src/tmx/TmxUtils/src/SNMPClient.h create mode 100644 src/tmx/TmxUtils/src/SNMPClientException.cpp create mode 100644 src/tmx/TmxUtils/src/SNMPClientException.h create mode 100644 src/tmx/TmxUtils/test/MockSNMPClient.h create mode 100644 src/tmx/TmxUtils/test/test_SNMPClient.cpp create mode 100755 src/v2i-hub/RSUHealthMonitorPlugin/CMakeLists.txt create mode 100755 src/v2i-hub/RSUHealthMonitorPlugin/manifest.json create mode 100755 src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorPlugin.cpp create mode 100755 src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorPlugin.h create mode 100644 src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorWorker.cpp create mode 100644 src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorWorker.h create mode 100644 src/v2i-hub/RSUHealthMonitorPlugin/test/main.cpp create mode 100644 src/v2i-hub/RSUHealthMonitorPlugin/test/test_RSUHealthMonitorWorker.cpp diff --git a/.sonarqube/sonar-scanner.properties b/.sonarqube/sonar-scanner.properties index f6b40e299..d5f118b33 100644 --- a/.sonarqube/sonar-scanner.properties +++ b/.sonarqube/sonar-scanner.properties @@ -55,7 +55,8 @@ sonar.modules= PedestrianPlugin, \ MessageReceiverPlugin, \ CARMAStreetsPlugin, \ ERVCloudForwardingPlugin, \ - CDASimAdapter + CDASimAdapter, \ + RSUHealthMonitorPlugin @@ -82,6 +83,7 @@ TimPlugin.sonar.projectBaseDir =src/v2i-hub/TimPlugin CARMAStreetsPlugin.sonar.projectBaseDir =src/v2i-hub/CARMAStreetsPlugin ERVCloudForwardingPlugin.sonar.projectBaseDir =src/v2i-hub/ERVCloudForwardingPlugin CDASimAdapter.sonar.projectBaseDir =src/v2i-hub/CDASimAdapter +RSUHealthMonitorPlugin.sonar.projectBaseDir =src/v2i-hub/RSUHealthMonitorPlugin @@ -117,7 +119,8 @@ CARMAStreetsPlugin.sonar.exclusions =test/** ERVCloudForwardingPlugin.sonar.sources =src CDASimAdapter.sonar.sources =src CDASimAdapter.sonar.exclusions =test/** - +RSUHealthMonitorPlugin.sonar.sources =src +RSUHealthMonitorPlugin.sonar.exclusions =test/** # Tests # Note: For C++ setting this field does not cause test analysis to occur. It only allows the test source code to be evaluated. @@ -145,3 +148,4 @@ SpatPlugin.sonar.tests=test CARMAStreetsPlugin.sonar.tests=test ERVCloudForwardingPlugin.sonar.tests=test CDASimAdapter.sonar.tests=test +RSUHealthMonitorPlugin.sonar.tests=test diff --git a/src/tmx/Messages/include/MessageTypes.h b/src/tmx/Messages/include/MessageTypes.h index e0fd09c70..3360788fe 100644 --- a/src/tmx/Messages/include/MessageTypes.h +++ b/src/tmx/Messages/include/MessageTypes.h @@ -92,6 +92,7 @@ static CONSTEXPR const char *MSGSUBTYPE_OUTGOING_STRING = "Outgoing"; static CONSTEXPR const char *MSGSUBTYPE_SHUTDOWN_STRING = "Shutdown"; static CONSTEXPR const char *MSGSUBTYPE_TIMESYNC_STRING = "TimeSync"; static CONSTEXPR const char *MSGSUBTYPE_SENSOR_DETECTED_OBJECT_STRING = "SensorDetectedObject"; +static CONSTEXPR const char *MSGSUBTYPE_RSU_STATUS_STRING = "RSUStatus"; } /* End namespace messages */ diff --git a/src/tmx/Messages/include/RSUStatusMessage.h b/src/tmx/Messages/include/RSUStatusMessage.h new file mode 100644 index 000000000..a5e276268 --- /dev/null +++ b/src/tmx/Messages/include/RSUStatusMessage.h @@ -0,0 +1,25 @@ +#pragma once + + +#include +#include "MessageTypes.h" + + +namespace tmx::messages { + + +class RSUStatusMessage : public tmx::message +{ + public: + RSUStatusMessage() {} + + /// Message type for routing this message through TMX core. + static constexpr const char* MessageType = MSGTYPE_APPLICATION_STRING; + + /// Message sub type for routing this message through TMX core. + static constexpr const char* MessageSubType = MSGSUBTYPE_RSU_STATUS_STRING; + }; + +} /* namespace tmx::messages */ + + diff --git a/src/tmx/TmxUtils/CMakeLists.txt b/src/tmx/TmxUtils/CMakeLists.txt index 30c30fb41..ee3c12c5f 100644 --- a/src/tmx/TmxUtils/CMakeLists.txt +++ b/src/tmx/TmxUtils/CMakeLists.txt @@ -1,7 +1,9 @@ PROJECT ( tmxutils CXX ) FILE (GLOB_RECURSE HEADERS "src/" "*.h*") FILE (GLOB_RECURSE SOURCES "src/" "*.c*") - +find_library(NETSNMPAGENT "netsnmpagent") +find_library(NETSNMPMIBS "netsnmpmibs") +find_library(NETSNMP "netsnmp") FIND_PACKAGE (carma-clock) FIND_LIBRARY (UUID_LIBRARY uuid) @@ -16,12 +18,18 @@ TARGET_INCLUDE_DIRECTORIES (${PROJECT_NAME} SYSTEM PUBLIC $ $ ${MYSQL_INCLUDE_DIR} + ${NETSNMP_INCLUDE_DIRS} ${MYSQLCPPCONN_INCLUDE_DIR}) TARGET_LINK_LIBRARIES (${PROJECT_NAME} PUBLIC ${TMXAPI_LIBRARIES} ${MYSQL_LIBRARIES} ${MYSQLCPPCONN_LIBRARIES} ${UUID_LIBRARY} + ${UUID_LIBRARY} + ${NETSNMPAGENT} + ${NETSNMPMIBS} + ${NETSNMP} + ${NETSNMP_LIBRARIES} rdkafka++ ::carma-clock gmock @@ -54,4 +62,4 @@ add_executable(${BINARY} ${TEST_SOURCES}) add_test(NAME ${BINARY} COMMAND ${BINARY}) -target_link_libraries(${BINARY} PUBLIC ${PROJECT_NAME} rdkafka++ gmock ${TMXAPI_LIBRARIES} ${ASN_J2735_LIBRARIES} ${UUID_LIBRARY} gtest) \ No newline at end of file +target_link_libraries(${BINARY} PUBLIC ${PROJECT_NAME} rdkafka++ gmock ${TMXAPI_LIBRARIES} ${ASN_J2735_LIBRARIES} ${UUID_LIBRARY} ${NETSNMPAGENT} ${NETSNMPMIBS} ${NETSNMP} ${NETSNMP_LIBRARIES} gtest) \ No newline at end of file diff --git a/src/tmx/TmxUtils/src/RSU_MIB_4_1.h b/src/tmx/TmxUtils/src/RSU_MIB_4_1.h new file mode 100644 index 000000000..793bf4bea --- /dev/null +++ b/src/tmx/TmxUtils/src/RSU_MIB_4_1.h @@ -0,0 +1,56 @@ +#pragma once +namespace tmx::utils::rsu41::mib::oid +{ + /** + * @brief This header file contains a subset of RSU MIB definition from https://github.com/certificationoperatingcouncil/COC_TestSpecs/blob/master/AppNotes/RSU/RSU-MIB.txt + */ + // Contains the ID given to this RSU. + static constexpr const char *RSU_ID_OID = "1.0.15628.4.1.17.4.0"; + + // Contains the version of this MIB supported by this RSU, e.g. rsuMIB 4.1 rev201812060000Z + static constexpr const char *RSU_MIB_VERSION = "1.0.15628.4.1.17.1.0"; + + // Contains the version of firmware running on this RSU. + static constexpr const char *RSU_FIRMWARE_VERSION = "1.0.15628.4.1.17.2.0"; + + // Contains the name of the manufacturer of this RSU. + static constexpr const char *RSU_MANUFACTURER = "1.0.15628.4.1.17.5.0"; + + // Contains GPS NMEA GPGGA output string. + static constexpr const char *RSU_GPS_OUTPUT_STRING = "1.0.15628.4.1.8.5.0"; + + // Immediate Forward Message Index + static constexpr const char *RSU_IFM_INDEX = "1.0.15628.4.1.5.1.1.0"; + + // Immediate Forward Message PSID. + static constexpr const char *RSU_IFM_PSID = "1.0.15628.4.1.5.1.2.0"; + + // Immediate Forward Message DSRC Message ID + static constexpr const char *RSU_IFM_DSRC_MSG_ID = "1.0.15628.4.1.5.1.3.0"; + + // Immediate Forward Message Transmit Mode + static constexpr const char *RSU_IFM_TX_MODE = "1.0.15628.4.1.5.1.4.0"; + + // DSRC channel set for Immediate Forward Message transmit + static constexpr const char *RSU_IFM_TX_CHANNEL = "1.0.15628.4.1.5.1.5.0"; + + // Set this bit to enable transmission of the message 0=off, 1=on + static constexpr const char *RSU_IFM_ENABLE = "1.0.15628.4.1.5.1.6.0"; + + // Create (4) or Destroy (6) row entry + static constexpr const char *RSU_IFM_STATUS = "1.0.15628.4.1.5.1.7.0"; + + // Specifies the current mode of operation of the RSU and provides capability to transition the device into a new mode, e.g. from the current mode to off, etc + static constexpr const char *RSU_MODE = "1.0.15628.4.1.99.0"; + + /* + SYNTAX INTEGER { + bothOp (0), --both Continuous and Alternating modes are operational + altOp (1), --Alternating mode is operational, + --Continuous mode is not operational + contOp (2), --Continuous mode is operational, + --Alternating mode is not operational + noneOp (3) --neither Continuous nor Alternating mode is operational + */ + static constexpr const char *RSU_CHAN_STATUS = "1.0.15628.4.1.19.1.0"; +} \ No newline at end of file diff --git a/src/tmx/TmxUtils/src/SNMPClient.cpp b/src/tmx/TmxUtils/src/SNMPClient.cpp new file mode 100644 index 000000000..e3398ebbd --- /dev/null +++ b/src/tmx/TmxUtils/src/SNMPClient.cpp @@ -0,0 +1,227 @@ +#include "SNMPClient.h" + +namespace tmx::utils +{ + + // Client defaults to SNMPv3 + snmp_client::snmp_client(const std::string &ip, const int &port, const std::string &community, + const std::string &snmp_user, const std::string &securityLevel, const std::string &authPassPhrase, int snmp_version, int timeout) + + : ip_(ip), port_(port), community_(community), snmp_version_(snmp_version), timeout_(timeout) + { + + PLOG(logDEBUG1) << "Starting SNMP Client. Target device IP address: " << ip_ << ", Target device SNMP port: " << port_; + + // Bring the IP address and port of the target SNMP device in the required form, which is "IPADDRESS:PORT": + std::string ip_port_string = ip_ + ":" + std::to_string(port_); + char *ip_port = &ip_port_string[0]; + + // Initialize SNMP session parameters + init_snmp("carma_snmp"); + snmp_sess_init(&session); + session.peername = ip_port; + session.version = snmp_version_; // SNMP_VERSION_3 + session.securityName = (char *)snmp_user.c_str(); + session.securityNameLen = snmp_user.length(); + + // Fallback behavior to setup a community for SNMP V1/V2 + if (snmp_version_ != SNMP_VERSION_3) + { + session.community = (unsigned char *)community.c_str(); + session.community_len = community_.length(); + } + + // SNMP authorization/privach config + if (securityLevel == "authPriv") + { + session.securityLevel = SNMP_SEC_LEVEL_AUTHPRIV; + } + + else if (securityLevel == "authNoPriv") + { + session.securityLevel = SNMP_SEC_LEVEL_AUTHNOPRIV; + } + + else + session.securityLevel = SNMP_SEC_LEVEL_NOAUTH; + + // Passphrase used for both authentication and privacy + auto phrase_len = authPassPhrase.length(); + auto phrase = (u_char *)authPassPhrase.c_str(); + + // Defining and generating auth config with SHA1 + session.securityAuthProto = snmp_duplicate_objid(usmHMACSHA1AuthProtocol, USM_AUTH_PROTO_SHA_LEN); + session.securityAuthProtoLen = USM_AUTH_PROTO_SHA_LEN; + session.securityAuthKeyLen = USM_AUTH_KU_LEN; + if (session.securityLevel != SNMP_SEC_LEVEL_NOAUTH && generate_Ku(session.securityAuthProto, + session.securityAuthProtoLen, + phrase, phrase_len, + session.securityAuthKey, + &session.securityAuthKeyLen) != SNMPERR_SUCCESS) + { + std::string errMsg = "Error generating Ku from authentication pass phrase. \n"; + throw snmp_client_exception(errMsg); + } + + // Defining and generating priv config with AES (since using SHA1) + session.securityPrivKeyLen = USM_PRIV_KU_LEN; + session.securityPrivProto = + snmp_duplicate_objid(usmAESPrivProtocol, + OID_LENGTH(usmAESPrivProtocol)); + session.securityPrivProtoLen = OID_LENGTH(usmAESPrivProtocol); + + if (session.securityLevel == SNMP_SEC_LEVEL_AUTHPRIV && generate_Ku(session.securityAuthProto, + session.securityAuthProtoLen, + phrase, phrase_len, + session.securityPrivKey, + &session.securityPrivKeyLen) != SNMPERR_SUCCESS) + { + std::string errMsg = "Error generating Ku from privacy pass phrase. \n"; + throw snmp_client_exception(errMsg); + } + + session.timeout = timeout_; + + // Opens the snmp session if it exists + ss = snmp_open(&session); + + if (ss == nullptr) + { + PLOG(logERROR) << "Failed to establish session with target device"; + snmp_sess_perror("snmpget", &session); + throw snmp_client_exception("Failed to establish session with target device"); + } + else + { + PLOG(logINFO) << "Established session with device at " << ip_; + } + } + + snmp_client::~snmp_client() + { + PLOG(logINFO) << "Closing SNMP session"; + snmp_close(ss); + } + + // Original implementation used in Carma Streets https://github.com/usdot-fhwa-stol/snmp-client + bool snmp_client::process_snmp_request(const std::string &input_oid, const request_type &request_type, snmp_response_obj &val) + { + + /*Structure to hold response from the remote host*/ + snmp_pdu *response; + + // Create pdu for the data + if (request_type == request_type::GET) + { + PLOG(logDEBUG1) << "Attempting to GET value for: " << input_oid; + pdu = snmp_pdu_create(SNMP_MSG_GET); + } + else if (request_type == request_type::SET) + { + PLOG(logDEBUG1) << "Attempting to SET value for " << input_oid << " to " << val.val_int; + pdu = snmp_pdu_create(SNMP_MSG_SET); + } + else + { + PLOG(logERROR) << "Invalid request type, method accpets only GET and SET"; + return false; + } + + // Read input OID into an OID variable: + // net-snmp has several methods for creating an OID object + // their documentation suggests using get_node. read_objid seems like a simpler approach + // TO DO: investigate update to get_node + if (!read_objid(input_oid.c_str(), OID, &OID_len)) + { + // If oid cannot be created + PLOG(logERROR) << "OID could not be created from input: " << input_oid; + return false; + } + else + { + + if (request_type == request_type::GET) + { + // Add OID to pdu for get request + snmp_add_null_var(pdu, OID, OID_len); + } + else if (request_type == request_type::SET) + { + if (val.type == snmp_response_obj::response_type::INTEGER) + { + snmp_add_var(pdu, OID, OID_len, 'i', (std::to_string(val.val_int)).c_str()); + } + // Needs to be finalized to support octet string use + else if (val.type == snmp_response_obj::response_type::STRING) + { + PLOG(logERROR) << "Setting string value is currently not supported"; + } + } + + PLOG(logINFO) << "Created OID for input: " << input_oid; + } + // Send the request + int status = snmp_synch_response(ss, pdu, &response); + PLOG(logINFO) << "Response request status: " << status << " (=" << (status == STAT_SUCCESS ? "SUCCESS" : "FAILED") << ")"; + + // Check GET response + if (status == STAT_SUCCESS && response && response->errstat == SNMP_ERR_NOERROR && request_type == request_type::GET) + { + for (auto vars = response->variables; vars; vars = vars->next_variable) + { + // Get value of variable depending on ASN.1 type + // Variable could be a integer, string, bitstring, ojbid, counter : defined here https://github.com/net-snmp/net-snmp/blob/master/include/net-snmp/types.h + // get Integer value + if (vars->type == ASN_INTEGER && vars->val.integer) + { + val.type = snmp_response_obj::response_type::INTEGER; + val.val_int = *vars->val.integer; + } + else if (vars->type == ASN_OCTET_STR && vars->val.string) + { + size_t str_len = vars->val_len; + for (size_t i = 0; i < str_len; ++i) + { + val.val_string.push_back(vars->val.string[i]); + } + val.type = snmp_response_obj::response_type::STRING; + } + } + } + else + { + log_error(status, request_type, response); + return false; + } + + if (response) + { + snmp_free_pdu(response); + OID_len = MAX_OID_LEN; + } + + return true; + } + + int snmp_client::get_port() const + { + return port_; + } + + void snmp_client::log_error(const int &status, const request_type &request_type, const snmp_pdu *response) const + { + + if (status == STAT_SUCCESS) + { + PLOG(logERROR) << "Variable type: " << response->variables->type << ". Error in packet " << static_cast(snmp_errstring(static_cast(response->errstat))); + } + else if (status == STAT_TIMEOUT) + { + PLOG(logERROR) << "Timeout, no response from server"; + } + else + { + PLOG(logERROR) << "Unknown SNMP Error for " << (request_type == request_type::GET ? "GET" : "SET"); + } + } +} // namespace \ No newline at end of file diff --git a/src/tmx/TmxUtils/src/SNMPClient.h b/src/tmx/TmxUtils/src/SNMPClient.h new file mode 100644 index 000000000..75eaa9360 --- /dev/null +++ b/src/tmx/TmxUtils/src/SNMPClient.h @@ -0,0 +1,113 @@ +#pragma once + +#include +#include +#include +#include "PluginLog.h" + +#include "SNMPClientException.h" + +namespace tmx::utils +{ + + enum class request_type + { + GET, + SET, + OTHER // Processing this request type is not a defined behavior, included for testing only + }; + + /** @brief A struct to hold the value being sent to the TSC, can be integer or string. Type needs to be defined*/ + struct snmp_response_obj + { + /** @brief The type of value being requested or set, on the TSC */ + enum class response_type + { + INTEGER, + STRING + }; + + // snmp response values can be any asn.1 supported types. + // Integer and string values can be processed here + int64_t val_int = 0; + std::vector val_string; + response_type type; + + inline bool operator==(const snmp_response_obj &obj2) const + { + return val_int == obj2.val_int && val_string == obj2.val_string && type == obj2.type; + } + }; + + class snmp_client + { + private: + /*variables to store an snmp session*/ + // struct that holds information about who we're going to be talking to + // We need to declare 2 of these, one to fill info with and second which is + // a pointer returned by the library + snmp_session session; + snmp_session *ss; + + /*Structure to hold all of the information that we're going to send to the remote host*/ + snmp_pdu *pdu; + + /*OID is going to hold the location of the information which we want to receive. It will need a size as well*/ + oid OID[MAX_OID_LEN]; + size_t OID_len = MAX_OID_LEN; + + // Values from config + /*Target device IP address*/ + std::string ip_; + /*Target device NTCIP port*/ + int port_ = 0; + /*Target community for establishing snmp communication*/ + std::string community_ = "public"; + /* net-snmp version definition: SNMP_VERSION_1:0 SNMP_VERSION_2c:1 SNMP_VERSION_2u:2 SNMP_VERSION_3:3 + https://github.com/net-snmp/net-snmp/blob/master/include/net-snmp/library/snmp.h */ + int snmp_version_ = 3; // default to 3 since previous versions not compatable currently + /*Time after which the the snmp request times out*/ + int timeout_ = 10000; + + public: + /** @brief Constructor for Traffic Signal Controller Service client. + * Uses the arguments provided to establish an snmp connection + * @param ip The ip ,as a string, for the tsc_client_service to establish an snmp communication with. + * @param port Target port as integer on the host for snmp communication. + * @param community The community id as a string. Defaults to "public" if unassigned. + * @param snmp_version The snmp_version as defined in net-snmp.Default to 0 if unassigned. + * net-snmp version definition: SNMP_VERSION_1:0 SNMP_VERSION_2c:1 SNMP_VERSION_2u:2 SNMP_VERSION_3:3" + * @param timeout The time in microseconds after which an snmp session request expires. Defaults to 100 if unassigned + * **/ + snmp_client(const std::string &ip, const int &port, const std::string &community, const std::string &snmp_user, const std::string &securityLevel, const std::string &authPassPhrase, int snmp_version = 0, int timeout = 100); + + /* Disable default copy constructor*/ + snmp_client(snmp_client &sc) = delete; + + /* Disable default move constructor*/ + snmp_client(snmp_client &&sc) = delete; + + /** @brief Returns true or false depending on whether the request could be processed for given input OID at the Traffic Signal Controller. + * @param input_oid The OID to request information for. + * @param request_type The request type for which the error is being logged. Accepted values are "GET" and "SET" only. + * @param value_int The integer value for the object returned by reference. For "SET" it is the value to be set. + * For "GET", it is the value returned for the returned object by reference. + * This is an optional argument, if not provided, defaults to 0. + * @param value_str String value for the object, returned by reference. Optional argument, if not provided the value is set as an empty string + * @return Integer value at the oid, returns false if value cannot be set/requested or oid doesn't have an integer value to return.*/ + + virtual bool process_snmp_request(const std::string &input_oid, const request_type &request_type, snmp_response_obj &val); + /** @brief Finds error type from status and logs an error. + * @param status The integer value corresponding to net-snmp defined errors. macros considered are STAT_SUCCESS(0) and STAT_TIMEOUT(2) + * @param request_type The request type for which the error is being logged (GET/SET). + * @param response The snmp_pdu struct */ + + virtual int get_port() const; // Returns the current port (should always be 161 or 162) + + void log_error(const int &status, const request_type &request_type, const snmp_pdu *response) const; + + /** @brief Destructor for client. Closes the snmp session**/ + virtual ~snmp_client(); + }; + +} // namespace \ No newline at end of file diff --git a/src/tmx/TmxUtils/src/SNMPClientException.cpp b/src/tmx/TmxUtils/src/SNMPClientException.cpp new file mode 100644 index 000000000..67f82fabe --- /dev/null +++ b/src/tmx/TmxUtils/src/SNMPClientException.cpp @@ -0,0 +1,8 @@ +#include "SNMPClientException.h" + +namespace tmx::utils { + + snmp_client_exception::snmp_client_exception(const std::string &msg): std::runtime_error(msg){}; + + snmp_client_exception::~snmp_client_exception() = default; +} \ No newline at end of file diff --git a/src/tmx/TmxUtils/src/SNMPClientException.h b/src/tmx/TmxUtils/src/SNMPClientException.h new file mode 100644 index 000000000..76020a5b1 --- /dev/null +++ b/src/tmx/TmxUtils/src/SNMPClientException.h @@ -0,0 +1,23 @@ +#pragma once + +#include + +namespace tmx::utils { + /** + * @brief Runtime error related to SNMP client used to communicate with Traffic Signal Controller (NTCIP). + * + * @author Paul Bourelly + */ + class snmp_client_exception : public std::runtime_error{ + public: + /** + * @brief Destructor. + */ + ~snmp_client_exception() override; + /** + * @brief Constructor. + * @param msg String exception message. + */ + explicit snmp_client_exception(const std::string &msg ); + }; +} \ No newline at end of file diff --git a/src/tmx/TmxUtils/test/MockSNMPClient.h b/src/tmx/TmxUtils/test/MockSNMPClient.h new file mode 100644 index 000000000..7e07fe47f --- /dev/null +++ b/src/tmx/TmxUtils/test/MockSNMPClient.h @@ -0,0 +1,18 @@ +#pragma once +#include "SNMPClient.h" +#include +#include + +using namespace tmx::utils; +using namespace std; + +namespace unit_test +{ + class mock_snmp_client : public snmp_client + { + public: + mock_snmp_client(const std::string &ip, const int &port, const std::string &community, const std::string &snmp_user, const std::string &securityLevel, const std::string &authPassPhrase, int snmp_version = 0, int timeout = 100) : snmp_client(ip, port, community, snmp_user, securityLevel, authPassPhrase, snmp_version, timeout){}; + ~mock_snmp_client() = default; + MOCK_METHOD(bool, process_snmp_request, (const std::string &input_oid, const request_type &request_type, snmp_response_obj &val), (override)); + }; +} \ No newline at end of file diff --git a/src/tmx/TmxUtils/test/test_SNMPClient.cpp b/src/tmx/TmxUtils/test/test_SNMPClient.cpp new file mode 100644 index 000000000..3e775945c --- /dev/null +++ b/src/tmx/TmxUtils/test/test_SNMPClient.cpp @@ -0,0 +1,93 @@ + +#include "MockSNMPClient.h" +#include "gtest/gtest.h" +#include "RSU_MIB_4_1.h" + +using namespace tmx::utils; +using namespace std; +using namespace tmx::utils::rsu41::mib::oid; +using testing::_; +using testing::Action; +using testing::DoDefault; +using testing::Return; +using testing::SetArgReferee; +using testing::Throw; + +namespace unit_test +{ + class test_SNMPClient : public ::testing::Test + { + public: + shared_ptr scPtr; + uint16_t port = 161; + test_SNMPClient() + { + scPtr = make_shared("127.0.0.1", port, "public", "test", "authPriv", "testtesttest", SNMP_VERSION_3, 1000); + } + }; + + TEST_F(test_SNMPClient, constructor_error) + { + ASSERT_THROW(snmp_client("127.0.0.1", port, "public", "test", "authPriv", "test", SNMP_VERSION_3, 1000), snmp_client_exception); + ASSERT_NO_THROW(snmp_client("127.0.0.1", port, "public", "test", "authPriv", "testtesttest", SNMP_VERSION_3, 1000)); + ASSERT_NO_THROW(snmp_client("127.0.0.1", port, "public", "test", "authNoPriv", "testtesttest", SNMP_VERSION_3, 1000)); + ASSERT_NO_THROW(snmp_client("127.0.0.1", port, "public", "test", "authNoPriv", "testtesttest", SNMP_VERSION_1, 1000)); + ASSERT_NO_THROW(snmp_client("127.0.0.1", port, "public", "test", "", "testtesttest", SNMP_VERSION_3, 1000)); + ASSERT_THROW(snmp_client("127.0.XX.XX", port, "public", "test", "", "testtesttest", SNMP_VERSION_3, 1000), snmp_client_exception); + } + + TEST_F(test_SNMPClient, get_port) + { + ASSERT_EQ(161, scPtr->get_port()); + } + + TEST_F(test_SNMPClient, log_error) + { + snmp_pdu response; + ASSERT_NO_THROW(scPtr->log_error(STAT_ERROR, request_type::GET, &response)); + ASSERT_NO_THROW(scPtr->log_error(STAT_ERROR, request_type::SET, &response)); + ASSERT_NO_THROW(scPtr->log_error(STAT_SUCCESS, request_type::OTHER, &response)); + ASSERT_NO_THROW(scPtr->log_error(STAT_TIMEOUT, request_type::OTHER, &response)); + } + + TEST_F(test_SNMPClient, process_snmp_request) + { + snmp_response_obj reqponseRSUID; + string rsuId = "RSU4.1"; + vector rsuId_c; + copy(rsuId.begin(), rsuId.end(), back_inserter(rsuId_c)); + reqponseRSUID.val_string = rsuId_c; + reqponseRSUID.type = snmp_response_obj::response_type::STRING; + EXPECT_CALL(*scPtr, process_snmp_request(RSU_ID_OID, request_type::GET, _)).WillRepeatedly(testing::DoAll(SetArgReferee<2>(reqponseRSUID), Return(true))); + EXPECT_CALL(*scPtr, process_snmp_request(RSU_ID_OID, request_type::SET, _)).WillRepeatedly(testing::DoAll(SetArgReferee<2>(reqponseRSUID), Return(true))); + + snmp_response_obj reqponseMode; + reqponseMode.val_int = 2; + reqponseMode.type = snmp_response_obj::response_type::INTEGER; + EXPECT_CALL(*scPtr, process_snmp_request(RSU_MODE, request_type::GET, _)).WillRepeatedly(testing::DoAll(SetArgReferee<2>(reqponseMode), Return(true))); + EXPECT_CALL(*scPtr, process_snmp_request(RSU_MODE, request_type::SET, _)).WillRepeatedly(testing::DoAll(SetArgReferee<2>(reqponseRSUID), Return(true))); + + snmp_response_obj reqponseInvalidOID; + EXPECT_CALL(*scPtr, process_snmp_request("Invalid OID", request_type::GET, _)).WillRepeatedly(testing::DoAll(SetArgReferee<2>(reqponseInvalidOID), Return(false))); + EXPECT_CALL(*scPtr, process_snmp_request("Invalid OID", request_type::SET, _)).WillRepeatedly(testing::DoAll(SetArgReferee<2>(reqponseInvalidOID), Return(false))); + + snmp_response_obj response; + scPtr->process_snmp_request(RSU_ID_OID, request_type::GET, response); + scPtr->process_snmp_request(RSU_ID_OID, request_type::SET, response); + scPtr->process_snmp_request(RSU_MODE, request_type::GET, response); + scPtr->process_snmp_request(RSU_MODE, request_type::SET, response); + scPtr->process_snmp_request("Invalid OID", request_type::GET, response); + scPtr->process_snmp_request("Invalid OID", request_type::SET, response); + + snmp_client scClient("127.0.0.1", port, "public", "test", "authPriv", "testtesttest", SNMP_VERSION_3, 1000); + scClient.process_snmp_request(RSU_ID_OID, request_type::GET, reqponseRSUID); + scClient.process_snmp_request(RSU_ID_OID, request_type::SET, reqponseRSUID); + scClient.process_snmp_request(RSU_ID_OID, request_type::OTHER, reqponseRSUID); + scClient.process_snmp_request("INVALID OID", request_type::GET, reqponseRSUID); + + scClient.process_snmp_request(RSU_MODE, request_type::GET, reqponseMode); + scClient.process_snmp_request(RSU_MODE, request_type::SET, reqponseMode); + scClient.process_snmp_request(RSU_MODE, request_type::OTHER, reqponseMode); + } + +} \ No newline at end of file diff --git a/src/v2i-hub/RSUHealthMonitorPlugin/CMakeLists.txt b/src/v2i-hub/RSUHealthMonitorPlugin/CMakeLists.txt new file mode 100755 index 000000000..d863a4bf1 --- /dev/null +++ b/src/v2i-hub/RSUHealthMonitorPlugin/CMakeLists.txt @@ -0,0 +1,26 @@ +PROJECT(RSUHealthMonitorPlugin VERSION 7.5.1 LANGUAGES CXX) + +set(TMX_PLUGIN_NAME "RSU Health Monitor") + +find_library(libasn1c .) + +BuildTmxPlugin() + +TARGET_LINK_LIBRARIES(${PROJECT_NAME} PUBLIC tmxutils jsoncpp NemaTode) + +############# +## Testing ## +############# +enable_testing() +include_directories(${PROJECT_SOURCE_DIR}/src) +add_library(${PROJECT_NAME}_lib src/RSUHealthMonitorWorker.cpp) +target_link_libraries(${PROJECT_NAME}_lib PUBLIC + tmxutils + NemaTode + jsoncpp) +set(BINARY ${PROJECT_NAME}_test) +file(GLOB_RECURSE TEST_SOURCES LIST_DIRECTORIES false test/*.h test/*.cpp) +set(SOURCES ${TEST_SOURCES} WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}/test) +add_executable(${BINARY} ${TEST_SOURCES}) +add_test(NAME ${BINARY} COMMAND ${BINARY}) +target_link_libraries(${BINARY} PUBLIC ${PROJECT_NAME}_lib gtest) \ No newline at end of file diff --git a/src/v2i-hub/RSUHealthMonitorPlugin/manifest.json b/src/v2i-hub/RSUHealthMonitorPlugin/manifest.json new file mode 100755 index 000000000..f2ee94443 --- /dev/null +++ b/src/v2i-hub/RSUHealthMonitorPlugin/manifest.json @@ -0,0 +1,51 @@ +{ + "name": "RSUHealthMonitor", + "description": "Monitor RSU health status", + "version": "@PROJECT_VERSION@", + "exeLocation": "/bin/RSUHealthMonitorPlugin", + "coreIpAddr":"127.0.0.1", + "corePort":24601, + "messageTypes": [], + "configuration": [ + { + "key": "LogLevel", + "default": "INFO", + "description": "The log level for this plugin" + }, + { + "key":"Interval", + "default":"1", + "description": "Sending RSU SNMP GET request at every configured interval. Default every 1 second. Unit of measure: second." + }, + { + "key":"RSUIp", + "default":"192.168.XX.XX", + "description":"An IP address of the RSU the V2X hub is connected to." + }, + { + "key":"SNMPPort", + "default":"161", + "description":"The SNMP port for sending message or command." + }, + { + "key":"AuthPassPhrase", + "default":"dummy", + "description":"SNMP v3 authentication passphrase" + }, + { + "key":"SecurityUser", + "default":"authOnlyUser", + "description":"SNMP Security Name" + }, + { + "key":"SecurityLevel", + "default":"authPriv", + "description":"SNMP Security level" + }, + { + "key":"RSUMIBVersion", + "default":"RSU4.1", + "description":"The version of RSU MIB (Management Information Base). E.G. RSU4.1 or RSU1218. Currently only support RSU4.1" + } + ] +} \ No newline at end of file diff --git a/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorPlugin.cpp b/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorPlugin.cpp new file mode 100755 index 000000000..8d81a3b6c --- /dev/null +++ b/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorPlugin.cpp @@ -0,0 +1,90 @@ +#include "RSUHealthMonitorPlugin.h" + +using namespace RSUHealthMonitor; +using namespace tmx::utils; + +namespace RSUHealthMonitor +{ + + RSUHealthMonitorPlugin::RSUHealthMonitorPlugin(const std::string &name) : PluginClient(name) + { + _rsuWorker = std::make_shared(); + _rsuStatusTimer = make_unique(); + UpdateConfigSettings(); + + // Send SNMP call to RSU periodically at configurable interval. + _timerThId = _rsuStatusTimer->AddPeriodicTick([this]() + { + // Periodic SNMP call to get RSU status based on RSU MIB version 4.1 + auto rsuStatusJson = _rsuWorker->getRSUStatus(_rsuMibVersion, _rsuIp, _snmpPort, _securityUser, _authPassPhrase, _securityLevel, SEC_TO_MICRO); + PLOG(logINFO) << "Updating _interval: " << _interval; + //Broadcast RSU status periodically at _interval + BroadcastRSUStatus(rsuStatusJson); }, + std::chrono::milliseconds(_interval * SEC_TO_MILLI)); + _rsuStatusTimer->Start(); + } + + void RSUHealthMonitorPlugin::UpdateConfigSettings() + { + PLOG(logINFO) << "Updating configuration settings."; + + lock_guard lock(_configMutex); + GetConfigValue("Interval", _interval); + GetConfigValue("RSUIp", _rsuIp); + GetConfigValue("SNMPPort", _snmpPort); + GetConfigValue("AuthPassPhrase", _authPassPhrase); + GetConfigValue("SecurityUser", _securityUser); + GetConfigValue("SecurityLevel", _securityLevel); + GetConfigValue("RSUMIBVersion", _rsuMIBVersionStr); + boost::trim_left(_rsuMIBVersionStr); + boost::trim_right(_rsuMIBVersionStr); + // Support RSU MIB version 4.1 + if (boost::iequals(_rsuMIBVersionStr, RSU4_1_str)) + { + _rsuMibVersion = RSUMibVersion::RSUMIB_V_4_1; + } + else + { + _rsuMibVersion = RSUMibVersion::UNKOWN_MIB_V; + PLOG(logERROR) << "Uknown RSU MIB version: " << _rsuMIBVersionStr; + } + + try + { + _rsuStatusTimer->ChangeFrequency(_timerThId, std::chrono::milliseconds(_interval * SEC_TO_MILLI)); + } + catch (const tmx::TmxException &ex) + { + PLOG(logERROR) << ex.what(); + } + } + + void RSUHealthMonitorPlugin::OnConfigChanged(const char *key, const char *value) + { + PluginClient::OnConfigChanged(key, value); + UpdateConfigSettings(); + } + + void RSUHealthMonitorPlugin::BroadcastRSUStatus(const Json::Value &rsuStatusJson) + { + // Broadcast the RSU status info when there are RSU responses. + if (!rsuStatusJson.empty() && _rsuWorker) + { + auto rsuStatusFields = _rsuWorker->getJsonKeys(rsuStatusJson); + auto configTbl = _rsuWorker->GetRSUStatusConfig(_rsuMibVersion); + + // Only broadcast RSU status when all required fields are present. + if (_rsuWorker->validateAllRequiredFieldsPresent(configTbl, rsuStatusFields)) + { + auto sendRsuStatusMsg = _rsuWorker->convertJsonToTMXMsg(rsuStatusJson); + BroadcastMessage(sendRsuStatusMsg, RSUHealthMonitorPlugin::GetName()); + } + } + } + +} // namespace RSUHealthMonitor + +int main(int argc, char *argv[]) +{ + return run_plugin("RSU Health Monitor", argc, argv); +} diff --git a/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorPlugin.h b/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorPlugin.h new file mode 100755 index 000000000..47d8ee1bd --- /dev/null +++ b/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorPlugin.h @@ -0,0 +1,46 @@ + +#pragma once + +#include "PluginClient.h" +#include +#include "RSUStatusMessage.h" +#include "RSUHealthMonitorWorker.h" + +using namespace tmx::utils; +using namespace std; + +namespace RSUHealthMonitor +{ + + class RSUHealthMonitorPlugin : public PluginClient + { + private: + mutex _configMutex; + uint16_t _interval; + string _rsuIp; + uint16_t _snmpPort; + string _authPassPhrase; + string _securityUser; + string _securityLevel; + string _rsuMIBVersionStr; + RSUMibVersion _rsuMibVersion; + const char *RSU4_1_str = "RSU4.1"; + const char *RSU1218_str = "RSU1218"; + shared_ptr _rsuWorker; + unique_ptr _rsuStatusTimer; + uint _timerThId; + const long SEC_TO_MICRO = 1000000; + const long SEC_TO_MILLI= 1000; + /** + * @brief Broadcast RSU status + * @param Json::Value RSU status in JSON format + */ + void BroadcastRSUStatus(const Json::Value& rsuStatusJson); + + public: + explicit RSUHealthMonitorPlugin(const std::string &name); + void UpdateConfigSettings(); + void OnConfigChanged(const char *key, const char *value) override; + }; + +} // namespace RSUHealthMonitorPlugin \ No newline at end of file diff --git a/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorWorker.cpp b/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorWorker.cpp new file mode 100644 index 000000000..6de25d57e --- /dev/null +++ b/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorWorker.cpp @@ -0,0 +1,217 @@ +#include "RSUHealthMonitorWorker.h" + +namespace RSUHealthMonitor +{ + + RSUHealthMonitorWorker::RSUHealthMonitorWorker() + { + _RSUSTATUSConfigMapPtr = make_shared>(); + // Currently only support RSU MIB version 4.1. Other future supported versions will be inserted here. + RSUStatusConfigTable rsuRstatusTable = constructRsuStatusConfigTable(RSUMibVersion::RSUMIB_V_4_1); + _RSUSTATUSConfigMapPtr->insert({RSUMibVersion::RSUMIB_V_4_1, rsuRstatusTable}); + } + + RSUStatusConfigTable RSUHealthMonitorWorker::constructRsuStatusConfigTable(const RSUMibVersion &mibVersion) const + { + RSUStatusConfigTable rsuStatusTbl; + // Populate custom defined RSU Status table with RSU MIB version 4.1. + if (mibVersion == RSUMibVersion::RSUMIB_V_4_1) + { + RSUFieldOIDStruct rsuID = {"rsuID", RSU_ID_OID, true}; + rsuStatusTbl.push_back(rsuID); + + RSUFieldOIDStruct rsuMibVersion = {"rsuMibVersion", RSU_MIB_VERSION, true}; + rsuStatusTbl.push_back(rsuMibVersion); + + RSUFieldOIDStruct rsuFirmwareVersion = {"rsuFirmwareVersion", RSU_FIRMWARE_VERSION, true}; + rsuStatusTbl.push_back(rsuFirmwareVersion); + + RSUFieldOIDStruct rsuManufacturer = {"rsuManufacturer", RSU_MANUFACTURER, true}; + rsuStatusTbl.push_back(rsuManufacturer); + + RSUFieldOIDStruct rsuGpsOutputString = {"rsuGpsOutputString", RSU_GPS_OUTPUT_STRING, true}; + rsuStatusTbl.push_back(rsuGpsOutputString); + + RSUFieldOIDStruct rsuIFMIndex = {"rsuIFMIndex", RSU_IFM_INDEX, false}; + rsuStatusTbl.push_back(rsuIFMIndex); + + RSUFieldOIDStruct rsuIFMPsid = {"rsuIFMPsid", RSU_IFM_PSID, false}; + rsuStatusTbl.push_back(rsuIFMPsid); + + RSUFieldOIDStruct rsuIFMDsrcMsgId = {"rsuIFMDsrcMsgId", RSU_IFM_DSRC_MSG_ID, false}; + rsuStatusTbl.push_back(rsuIFMDsrcMsgId); + + RSUFieldOIDStruct rsuIFMTxMode = {"rsuIFMTxMode", RSU_IFM_INDEX, false}; + rsuStatusTbl.push_back(rsuIFMTxMode); + + RSUFieldOIDStruct rsuIFMTxChannel = {"rsuIFMTxChannel", RSU_IFM_TX_CHANNEL, false}; + rsuStatusTbl.push_back(rsuIFMTxChannel); + + RSUFieldOIDStruct rsuIFMEnable = {"rsuIFMEnable", RSU_IFM_ENABLE, false}; + rsuStatusTbl.push_back(rsuIFMEnable); + + RSUFieldOIDStruct rsuIFMStatus = {"rsuIFMStatus", RSU_IFM_STATUS, false}; + rsuStatusTbl.push_back(rsuIFMStatus); + + RSUFieldOIDStruct rsuMode = {"rsuMode", RSU_MODE, true}; + rsuStatusTbl.push_back(rsuMode); + + RSUFieldOIDStruct rsuChanStatus = {"rsuChanStatus", RSU_CHAN_STATUS, true}; + rsuStatusTbl.push_back(rsuChanStatus); + } + return rsuStatusTbl; + } + + bool RSUHealthMonitorWorker::validateAllRequiredFieldsPresent(const RSUHealthMonitor::RSUStatusConfigTable &configTbl, const vector &fields) const + { + bool isAllPresent = true; + for (const auto &config : configTbl) + { + if (config.required && find(fields.begin(), fields.end(), config.field) == fields.end()) + { + isAllPresent = false; + PLOG(logWARNING) << "No broadcast as required field " << config.field << " is not present!"; + } + } + return isAllPresent; + } + + RSUStatusConfigTable RSUHealthMonitorWorker::GetRSUStatusConfig(const RSUMibVersion &mibVersion) const + { + RSUStatusConfigTable result; + try + { + result = _RSUSTATUSConfigMapPtr->at(mibVersion); + } + catch (const out_of_range &ex) + { + PLOG(logERROR) << "Unknown MIB version! " << ex.what(); + } + return result; + } + + std::map RSUHealthMonitorWorker::ParseRSUGPS(const std::string &gps_nmea_data) const + { + std::map result; + nmea::NMEAParser parser; + nmea::GPSService gps(parser); + try + { + parser.readLine(gps_nmea_data); + std::stringstream ss; + ss << std::setprecision(8) << std::fixed << gps.fix.latitude << std::endl; + auto latitude_str = ss.str(); + std::stringstream sss; + sss << std::setprecision(8) << std::fixed << gps.fix.longitude << std::endl; + auto longitude_str = sss.str(); + result.insert({std::stod(latitude_str), std::stod(longitude_str)}); + PLOG(logDEBUG) << "Parse GPS NMEA string: " << gps_nmea_data << ". Result (Latitude, Longitude): (" << latitude_str << "," << longitude_str << ")"; + } + catch (const nmea::NMEAParseError &e) + { + PLOG(logERROR) << e.message.c_str(); + } + return result; + } + + Json::Value RSUHealthMonitorWorker::getRSUStatus(const RSUMibVersion &mibVersion, const string &_rsuIp, uint16_t &_snmpPort, const string &_securityUser, const string &_authPassPhrase, const string &_securityLevel, long timeout) + { + auto rsuStatusConfigTbl = GetRSUStatusConfig(mibVersion); + if (rsuStatusConfigTbl.size() == 0) + { + PLOG(logERROR) << "RSU status update call failed due to the RSU status config table is empty!"; + return Json::nullValue; + } + try + { + // Create SNMP client and use SNMP V3 protocol + PLOG(logINFO) << "Update SNMP client: RSU IP: " << _rsuIp << ", RSU port: " << _snmpPort << ", User: " << _securityUser << ", auth pass phrase: " << _authPassPhrase << ", security level: " + << _securityLevel; + auto _snmpClientPtr = std::make_unique(_rsuIp, _snmpPort, "", _securityUser, _securityLevel, _authPassPhrase, SNMP_VERSION_3, timeout); + + Json::Value rsuStatuJson; + // Sending RSU SNMP call for each field as each field has its own OID. + for (const auto &config : rsuStatusConfigTbl) + { + PLOG(logINFO) << "SNMP RSU status call for field:" << config.field << ", OID: " << config.oid; + snmp_response_obj responseVal; + if (_snmpClientPtr) + { + auto success = _snmpClientPtr->process_snmp_request(config.oid, request_type::GET, responseVal); + if (!success && config.required) + { + PLOG(logERROR) << "SNMP session stopped as the required field: " << config.field << " failed! Return empty RSU status!"; + return Json::nullValue; + } + else if (success) + { + rsuStatuJson.append(populateJson(config.field, responseVal)); + } + } + } + return rsuStatuJson; + } + catch (tmx::utils::snmp_client_exception &ex) + { + PLOG(logERROR) << ex.what(); + return Json::nullValue; + } + } + + Json::Value RSUHealthMonitorWorker::populateJson(const string &field, const snmp_response_obj &response) const + { + Json::Value rsuStatuJson; + if (response.type == snmp_response_obj::response_type::INTEGER) + { + rsuStatuJson[field] = response.val_int; + } + else if (response.type == snmp_response_obj::response_type::STRING) + { + string response_str(response.val_string.begin(), response.val_string.end()); + // Proess GPS nmea string + if (boost::iequals(field, "rsuGpsOutputString")) + { + auto gps = ParseRSUGPS(response_str); + rsuStatuJson["rsuGpsOutputStringLatitude"] = gps.begin()->first; + rsuStatuJson["rsuGpsOutputStringLongitude"] = gps.begin()->second; + } + rsuStatuJson[field] = response_str; + } + return rsuStatuJson; + } + + RSUStatusMessage RSUHealthMonitorWorker::convertJsonToTMXMsg(const Json::Value &json) const + { + Json::FastWriter fasterWirter; + string json_str = fasterWirter.write(json); + tmx::messages::RSUStatusMessage rsuStatusMsg; + rsuStatusMsg.set_contents(json_str); + return rsuStatusMsg; + } + + vector RSUHealthMonitorWorker::getJsonKeys(const Json::Value &json) const + { + vector keys; + if (json.isArray()) + { + for (auto itr = json.begin(); itr != json.end(); itr++) + { + if (itr->isObject()) + { + for (auto const &field : itr->getMemberNames()) + { + keys.push_back(field); + } + } + } + } + else if (json.isObject()) + { + for (auto const &field : json.getMemberNames()) + { + keys.push_back(field); + } + } + return keys; + } +} \ No newline at end of file diff --git a/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorWorker.h b/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorWorker.h new file mode 100644 index 000000000..ef7a429c5 --- /dev/null +++ b/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorWorker.h @@ -0,0 +1,120 @@ +#pragma once +#include +#include "RSU_MIB_4_1.h" +#include +#include +#include +#include +#include +#include "PluginLog.h" +#include +#include "SNMPClient.h" +#include +#include "RSUStatusMessage.h" + +using namespace std; +using namespace tmx::utils; +using namespace tmx::utils::rsu41::mib::oid; +using namespace tmx::messages; + +namespace RSUHealthMonitor +{ + enum class RSUMibVersion + { + UNKOWN_MIB_V = 0, + RSUMIB_V_4_1 = 1, + RSUMIB_V_1218 = 2 + }; + + struct RSUFieldOIDStruct + { + string field; + string oid; + bool required; // Indicate whether this field is required to before broadcasting the RSU status. + }; + + /** + * RSUStatusTable is custom defined RSU status information. + * The fields are a subset of the fields from the RSU MIB definition used to quantify the health of the RSU. https://github.com/certificationoperatingcouncil/COC_TestSpecs/blob/master/AppNotes/RSU/RSU-MIB.txt + */ + using RSUStatusConfigTable = vector; + + class RSUHealthMonitorWorker + { + private: + // A map of RSU MIB version used and RSUStatusTable + shared_ptr> _RSUSTATUSConfigMapPtr; + + /** + * @brief Poupate the RSU status table with the specified version of OIDs and fields. + * Private: Only allow to initialze the RSU STATUS MAP once + * @param mibVersion specified + * @return RSUStatusTable the self defined RSU status https://usdot-carma.atlassian.net/wiki/spaces/WFD2/pages/2640740360/RSU+Health+Monitor+Plugin+Design + */ + RSUStatusConfigTable constructRsuStatusConfigTable(const RSUMibVersion &mibVersion) const; + + public: + // Populate the RSU Status Table with predefined fields and their mapping OIDs in constructor + RSUHealthMonitorWorker(); + + // Access to the RSU status table based in the RSU MIB version provided + RSUStatusConfigTable GetRSUStatusConfig(const RSUMibVersion &mibVersion) const; + + /** + * @brief determine if all required fields in the RSU config map _RSUSTATUSConfigMapPtr present in the input fields + * Use _RSUSTATUSConfigMapPtr RSU status config map that defines all fields and whether the fields are required. + * @param RSUStatusConfigTable RSU Status configration table to compare with. + * @param vector Input RSU fields to verify + * @return True if all required fields found. Otherwise, false. + */ + bool validateAllRequiredFieldsPresent(const RSUHealthMonitor::RSUStatusConfigTable &configTbl, const vector &fields) const; + + /** + * @brief Parse NMEA GPS sentense and return GPS related data + * @param gps_nmea_data NMEA GPS sentense + * @return map A map of latitude and longitude + */ + std::map ParseRSUGPS(const std::string &gps_nmea_data) const; + + /** + * @brief Sending SNMP V3 requests to get info for each field in the RSUStatusConfigTable, and return the RSU status in JSON + * Use RSU Status configuration table include RSU field, OIDs, and whether fields are required or optional + * @param RSUMibVersion The RSU MIB version used + * @param string RSU IP address + * @param uint16_t SNMP port + * @param string security user used for SNMP authentication + * @param string authentication password + * @param string security level: authPriv or authNoPriv. + * @param long session time out + */ + Json::Value getRSUStatus(const RSUMibVersion &mibVersion, const string &_rsuIp, uint16_t &_snmpPort, const string &_securityUser, const string &_authPassPhrase, const string &_securityLevel, long timeout); + + /*** + *@brief Convert the JSON message into TMX message + @param Json Input Json value + @return RSUStatusMessage TMX message + */ + RSUStatusMessage convertJsonToTMXMsg(const Json::Value &json) const; + + /** + * @brief Populate Json with snmp response object. + * @param string The field that maps to an OID. + * @param snmp_response_obj The response returned by SNMP call for the OID. + * @return Json value populated with response object. + */ + Json::Value populateJson(const string &field, const snmp_response_obj &response) const; + + /** + * @brief List the keys from the input Json values + * @param Json Input JSON values + * @return vector of key strings + */ + vector getJsonKeys(const Json::Value &json) const; + + // Delete move constructor + RSUHealthMonitorWorker(RSUHealthMonitorWorker &&worker) = delete; + + // Delete copy constructor + RSUHealthMonitorWorker(RSUHealthMonitorWorker &worker) = delete; + }; +} // namespace RSUHealthMonitor diff --git a/src/v2i-hub/RSUHealthMonitorPlugin/test/main.cpp b/src/v2i-hub/RSUHealthMonitorPlugin/test/main.cpp new file mode 100644 index 000000000..ba7cd2667 --- /dev/null +++ b/src/v2i-hub/RSUHealthMonitorPlugin/test/main.cpp @@ -0,0 +1,8 @@ + +#include + +int main(int argc, char **argv) +{ + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/v2i-hub/RSUHealthMonitorPlugin/test/test_RSUHealthMonitorWorker.cpp b/src/v2i-hub/RSUHealthMonitorPlugin/test/test_RSUHealthMonitorWorker.cpp new file mode 100644 index 000000000..9d749f0df --- /dev/null +++ b/src/v2i-hub/RSUHealthMonitorPlugin/test/test_RSUHealthMonitorWorker.cpp @@ -0,0 +1,108 @@ +#include "RSUHealthMonitorWorker.h" +#include + +namespace RSUHealthMonitor +{ + class test_RSUHealthMonitorWorker : public ::testing::Test + { + public: + std::shared_ptr _rsuWorker = std::make_shared(); + }; + + TEST_F(test_RSUHealthMonitorWorker, GetRSUStatusConfig) + { + RSUStatusConfigTable rsuStatusConfigTbl = _rsuWorker->GetRSUStatusConfig(RSUMibVersion::RSUMIB_V_4_1); + ASSERT_EQ(14, rsuStatusConfigTbl.size()); + + rsuStatusConfigTbl = _rsuWorker->GetRSUStatusConfig(RSUMibVersion::UNKOWN_MIB_V); + ASSERT_EQ(0, rsuStatusConfigTbl.size()); + + RSUMibVersion mibVersionDefault; + rsuStatusConfigTbl = _rsuWorker->GetRSUStatusConfig(mibVersionDefault); + ASSERT_EQ(0, rsuStatusConfigTbl.size()); + } + + TEST_F(test_RSUHealthMonitorWorker, validateAllRequiredFieldsPresent) + { + auto config = _rsuWorker->GetRSUStatusConfig(RSUMibVersion::RSUMIB_V_4_1); + vector requiredFields = {"rsuID", "rsuMibVersion", "rsuFirmwareVersion", "rsuManufacturer", "rsuGpsOutputString", "rsuMode", "rsuChanStatus"}; + ASSERT_TRUE(_rsuWorker->validateAllRequiredFieldsPresent(config, requiredFields)); + + requiredFields = {"rsuID", "rsuMibVersion", "rsuFirmwareVersion"}; + ASSERT_FALSE(_rsuWorker->validateAllRequiredFieldsPresent(config, requiredFields)); + } + + TEST_F(test_RSUHealthMonitorWorker, ParseRSUGPS) + { + std::string gps_nmea_data = "$GPGGA,142440.00,3857.3065,N,07708.9734,W,2,18,0.65,86.18,M,-34.722,M,,*62"; + auto gps_map = _rsuWorker->ParseRSUGPS(gps_nmea_data); + ASSERT_EQ(1, gps_map.size()); + double expected_latitude = 38.9551; + double expected_longitude = -77.1496; + for (auto itr = gps_map.begin(); itr != gps_map.end(); itr++) + { + ASSERT_NEAR(expected_latitude, itr->first, 0.001); + ASSERT_NEAR(expected_longitude, itr->second, 0.001); + } + std::string invalid_gps_nmea_data = "$*GPGGA,invalid"; + auto gps_map_invalid = _rsuWorker->ParseRSUGPS(invalid_gps_nmea_data); + ASSERT_EQ(0, gps_map_invalid.size()); + } + + TEST_F(test_RSUHealthMonitorWorker, getRSUStatus) + { + uint16_t port = 161; + auto json = _rsuWorker->getRSUStatus(RSUMibVersion::RSUMIB_V_4_1, "127.0.0.1", port, "test", "testtesttest", "authPriv", 1000); + ASSERT_TRUE(json.empty()); + + json = _rsuWorker->getRSUStatus(RSUMibVersion::RSUMIB_V_4_1, "127.0.0.1", port, "test", "test", "authPriv", 1000); + ASSERT_TRUE(json.empty()); + + json = _rsuWorker->getRSUStatus(RSUMibVersion::RSUMIB_V_1218, "127.0.0.1", port, "test", "test", "authPriv", 1000); + ASSERT_TRUE(json.empty()); + } + + TEST_F(test_RSUHealthMonitorWorker, convertJsonToTMXMsg) + { + Json::Value json; + json["rsuID"] = "RSU4.1"; + json["rsuMode"] = 4; + auto rsuStatusTmxMsg = _rsuWorker->convertJsonToTMXMsg(json); + string expectedStr = "{\"rsuID\":\"RSU4.1\",\"rsuMode\":4}\n"; + ASSERT_EQ(expectedStr, rsuStatusTmxMsg.to_string()); + } + + TEST_F(test_RSUHealthMonitorWorker, populateJson) + { + Json::Value rsuStatusJson; + snmp_response_obj stringObj; + stringObj.type = snmp_response_obj::response_type::STRING; + std::string gps_nmea_data = "$GPGGA,142440.00,3857.3065,N,07708.9734,W,2,18,0.65,86.18,M,-34.722,M,,*62"; + vector rgps_nmea_data_c; + copy(gps_nmea_data.begin(), gps_nmea_data.end(), back_inserter(rgps_nmea_data_c)); + stringObj.val_string = rgps_nmea_data_c; + + auto json = _rsuWorker->populateJson("rsuGpsOutputString", stringObj); + double expected_latitude = 38.9551; + double expected_longitude = -77.1496; + ASSERT_NEAR(expected_latitude, json["rsuGpsOutputStringLatitude"].asDouble(), 0.001); + ASSERT_NEAR(expected_longitude, json["rsuGpsOutputStringLongitude"].asDouble(), 0.001); + rsuStatusJson.append(json); + + snmp_response_obj intObj; + intObj.type = snmp_response_obj::response_type::INTEGER; + intObj.val_int = 4; + + json = _rsuWorker->populateJson("rsuMode", intObj); + ASSERT_EQ(4, json["rsuMode"].asInt64()); + rsuStatusJson.append(json); + + Json::FastWriter fasterWirter; + string json_str = fasterWirter.write(rsuStatusJson); + string expectedStr = "[{\"rsuGpsOutputString\":\"$GPGGA,142440.00,3857.3065,N,07708.9734,W,2,18,0.65,86.18,M,-34.722,M,,*62\",\"rsuGpsOutputStringLatitude\":38.955108330000002,\"rsuGpsOutputStringLongitude\":-77.149556669999996},{\"rsuMode\":4}]\n"; + ASSERT_EQ(expectedStr, json_str); + ASSERT_EQ(4, _rsuWorker->getJsonKeys(rsuStatusJson).size()); + ASSERT_EQ(1, _rsuWorker->getJsonKeys(json).size()); + } + +} \ No newline at end of file From 774b31fced876b48226302ffd093f66484a76a2f Mon Sep 17 00:00:00 2001 From: dan-du-car <62157949+dan-du-car@users.noreply.github.com> Date: Tue, 21 Nov 2023 18:56:15 -0500 Subject: [PATCH 2/3] v2x hub Integration: Implement telematic plugin to subscribe to all TMX messages from V2xHub (#565) # PR Details ## Description In order to publish data from V2xHub to cloud a telematics bridge is needed. This story is the implementation story for the telematic module to collect J2735 data from V2xHub. The design refers to https://usdot-carma.atlassian.net/wiki/spaces/WFD2/pages/2640379977/V2xHub+WFD+Bridge+Design Connect to V2xHub Subscribe all TMX messages from V2xhub. If there are J2735 message payloads in the TMX messages, decode the J2735 into human readable message. Convert TMX messages into messages in JSON format. ## Related Issue NA ## Motivation and Context CDA telematic ## How Has This Been Tested? Unit test Integration test ## Types of changes - [ ] Defect fix (non-breaking change that fixes an issue) - [x] New feature (non-breaking change that adds functionality) - [ ] Breaking change (fix or feature that cause existing functionality to change) ## Checklist: - [ ] I have added any new packages to the sonar-scanner.properties file - [ ] My change requires a change to the documentation. - [ ] I have updated the documentation accordingly. - [x] I have read the **CONTRIBUTING** document. [V2XHUB Contributing Guide](https://github.com/usdot-fhwa-OPS/V2X-Hub/blob/develop/Contributing.md) - [ ] I have added tests to cover my changes. - [ ] All new and existing tests passed. --- .sonarqube/sonar-scanner.properties | 7 +- ext/build.sh | 18 +- scripts/install_dependencies.sh | 3 + .../src/RSUHealthMonitorWorker.cpp | 6 +- .../test/test_RSUHealthMonitorWorker.cpp | 9 +- .../test/testPedestrianDetectionForSPAT.cpp | 2 +- .../TelematicBridgePlugin/CMakeLists.txt | 15 ++ .../TelematicBridgePlugin/manifest.json | 16 ++ .../src/TelematicBridgeException.h | 11 + .../src/TelematicBridgeMsgWorker.h | 247 ++++++++++++++++++ .../src/TelematicBridgePlugin.cpp | 37 +++ .../src/TelematicBridgePlugin.h | 26 ++ .../TelematicBridgePlugin/test/main.cpp | 7 + .../test/test_TelematicMsgWorker.cpp | 106 ++++++++ 14 files changed, 503 insertions(+), 7 deletions(-) create mode 100644 src/v2i-hub/TelematicBridgePlugin/CMakeLists.txt create mode 100644 src/v2i-hub/TelematicBridgePlugin/manifest.json create mode 100644 src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgeException.h create mode 100644 src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgeMsgWorker.h create mode 100644 src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.cpp create mode 100644 src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.h create mode 100644 src/v2i-hub/TelematicBridgePlugin/test/main.cpp create mode 100644 src/v2i-hub/TelematicBridgePlugin/test/test_TelematicMsgWorker.cpp diff --git a/.sonarqube/sonar-scanner.properties b/.sonarqube/sonar-scanner.properties index d5f118b33..06424d6ef 100644 --- a/.sonarqube/sonar-scanner.properties +++ b/.sonarqube/sonar-scanner.properties @@ -56,7 +56,8 @@ sonar.modules= PedestrianPlugin, \ CARMAStreetsPlugin, \ ERVCloudForwardingPlugin, \ CDASimAdapter, \ - RSUHealthMonitorPlugin + RSUHealthMonitorPlugin, \ + TelematicBridgePlugin @@ -84,6 +85,7 @@ CARMAStreetsPlugin.sonar.projectBaseDir =src/v2i-hub/CARMAStreetsPlugin ERVCloudForwardingPlugin.sonar.projectBaseDir =src/v2i-hub/ERVCloudForwardingPlugin CDASimAdapter.sonar.projectBaseDir =src/v2i-hub/CDASimAdapter RSUHealthMonitorPlugin.sonar.projectBaseDir =src/v2i-hub/RSUHealthMonitorPlugin +TelematicBridgePlugin.sonar.projectBaseDir =src/v2i-hub/TelematicBridgePlugin @@ -121,6 +123,8 @@ CDASimAdapter.sonar.sources =src CDASimAdapter.sonar.exclusions =test/** RSUHealthMonitorPlugin.sonar.sources =src RSUHealthMonitorPlugin.sonar.exclusions =test/** +TelematicBridgePlugin.sonar.sources =src +TelematicBridgePlugin.sonar.exclusions =test/** # Tests # Note: For C++ setting this field does not cause test analysis to occur. It only allows the test source code to be evaluated. @@ -149,3 +153,4 @@ CARMAStreetsPlugin.sonar.tests=test ERVCloudForwardingPlugin.sonar.tests=test CDASimAdapter.sonar.tests=test RSUHealthMonitorPlugin.sonar.tests=test +TelematicBridgePlugin.sonar.tests=test diff --git a/ext/build.sh b/ext/build.sh index 03d2ec4fb..91d59ca8a 100755 --- a/ext/build.sh +++ b/ext/build.sh @@ -50,9 +50,25 @@ popd # GPS Parser pushd /tmp +if [ -d "NemaTode" ]; then + rm -r NemaTode +fi git clone https://github.com/ckgt/NemaTode.git cd NemaTode cmake . make -j${numCPU} make install -popd \ No newline at end of file +popd + +# Nats C API +pushd /tmp +if [ -d "nats.c" ]; then + rm -r nats.c +fi +git clone https://github.com/nats-io/nats.c +cd nats.c +cmake . -DNATS_BUILD_NO_SPIN=ON +make -j${numCPU} +make install +popd + diff --git a/scripts/install_dependencies.sh b/scripts/install_dependencies.sh index 2fa104c86..c4ce8727e 100755 --- a/scripts/install_dependencies.sh +++ b/scripts/install_dependencies.sh @@ -31,6 +31,9 @@ DEPENDENCIES="build-essential \ wget \ zip \ zlib1g \ + rapidjson-dev \ + librapidxml-dev \ + libprotobuf-c-dev \ curl" # STOL library dependencies diff --git a/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorWorker.cpp b/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorWorker.cpp index 6de25d57e..6ac62f9fc 100644 --- a/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorWorker.cpp +++ b/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorWorker.cpp @@ -145,7 +145,11 @@ namespace RSUHealthMonitor } else if (success) { - rsuStatuJson.append(populateJson(config.field, responseVal)); + auto json = populateJson(config.field, responseVal); + for(const auto &key: json.getMemberNames()) + { + rsuStatuJson[key] = json[key]; + } } } } diff --git a/src/v2i-hub/RSUHealthMonitorPlugin/test/test_RSUHealthMonitorWorker.cpp b/src/v2i-hub/RSUHealthMonitorPlugin/test/test_RSUHealthMonitorWorker.cpp index 9d749f0df..cc61247b5 100644 --- a/src/v2i-hub/RSUHealthMonitorPlugin/test/test_RSUHealthMonitorWorker.cpp +++ b/src/v2i-hub/RSUHealthMonitorPlugin/test/test_RSUHealthMonitorWorker.cpp @@ -87,7 +87,10 @@ namespace RSUHealthMonitor double expected_longitude = -77.1496; ASSERT_NEAR(expected_latitude, json["rsuGpsOutputStringLatitude"].asDouble(), 0.001); ASSERT_NEAR(expected_longitude, json["rsuGpsOutputStringLongitude"].asDouble(), 0.001); - rsuStatusJson.append(json); + for(const auto& key: json.getMemberNames()) + { + rsuStatusJson[key] = json[key]; + } snmp_response_obj intObj; intObj.type = snmp_response_obj::response_type::INTEGER; @@ -95,11 +98,11 @@ namespace RSUHealthMonitor json = _rsuWorker->populateJson("rsuMode", intObj); ASSERT_EQ(4, json["rsuMode"].asInt64()); - rsuStatusJson.append(json); + rsuStatusJson["rsuMode"] = json["rsuMode"]; Json::FastWriter fasterWirter; string json_str = fasterWirter.write(rsuStatusJson); - string expectedStr = "[{\"rsuGpsOutputString\":\"$GPGGA,142440.00,3857.3065,N,07708.9734,W,2,18,0.65,86.18,M,-34.722,M,,*62\",\"rsuGpsOutputStringLatitude\":38.955108330000002,\"rsuGpsOutputStringLongitude\":-77.149556669999996},{\"rsuMode\":4}]\n"; + string expectedStr = "{\"rsuGpsOutputString\":\"$GPGGA,142440.00,3857.3065,N,07708.9734,W,2,18,0.65,86.18,M,-34.722,M,,*62\",\"rsuGpsOutputStringLatitude\":38.955108330000002,\"rsuGpsOutputStringLongitude\":-77.149556669999996,\"rsuMode\":4}\n"; ASSERT_EQ(expectedStr, json_str); ASSERT_EQ(4, _rsuWorker->getJsonKeys(rsuStatusJson).size()); ASSERT_EQ(1, _rsuWorker->getJsonKeys(json).size()); diff --git a/src/v2i-hub/SpatPlugin/test/testPedestrianDetectionForSPAT.cpp b/src/v2i-hub/SpatPlugin/test/testPedestrianDetectionForSPAT.cpp index 8910b8bb0..e404307b2 100644 --- a/src/v2i-hub/SpatPlugin/test/testPedestrianDetectionForSPAT.cpp +++ b/src/v2i-hub/SpatPlugin/test/testPedestrianDetectionForSPAT.cpp @@ -73,7 +73,7 @@ TEST(PedestrianDetectionForSPAT, updateEncodedSpat) EXPECT_EQ(spatPtr->intersections.list.array[0]->states.list.array[0]->signalGroup, 1); EXPECT_EQ(spatPtr->intersections.list.array[0]->states.list.array[1]->signalGroup, 2); ASSERT_NE(spatPtr->intersections.list.array[0]->maneuverAssistList, nullptr); - EXPECT_EQ(spatPtr->intersections.list.array[0]->maneuverAssistList->list.count, 1); + EXPECT_NE(spatPtr->intersections.list.array[0]->maneuverAssistList->list.count, 0); ASSERT_NE(spatPtr->intersections.list.array[0]->maneuverAssistList->list.array[0]->pedBicycleDetect, nullptr); EXPECT_EQ(*(spatPtr->intersections.list.array[0]->maneuverAssistList->list.array[0]->pedBicycleDetect), 1); } diff --git a/src/v2i-hub/TelematicBridgePlugin/CMakeLists.txt b/src/v2i-hub/TelematicBridgePlugin/CMakeLists.txt new file mode 100644 index 000000000..5134ae38b --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/CMakeLists.txt @@ -0,0 +1,15 @@ +PROJECT (TelematicBridgePlugin VERSION 7.5.1 LANGUAGES CXX) + +set (TMX_PLUGIN_NAME "Telematic Bridge") + +BuildTmxPlugin() +TARGET_LINK_LIBRARIES ( ${PROJECT_NAME} tmxutils jsoncpp) + +#################################################### +################## Testing ####################### +#################################################### +enable_testing() +include_directories(${PROJECT_SOURCE_DIR}/src) +file(GLOB_RECURSE TEST_SOURCES LIST_DIRECTORIES false test/*.h test/*.cpp) +add_executable(${PROJECT_NAME}_test ${TEST_SOURCES}) +target_link_libraries(${PROJECT_NAME}_test PRIVATE gtest tmxutils jsoncpp) \ No newline at end of file diff --git a/src/v2i-hub/TelematicBridgePlugin/manifest.json b/src/v2i-hub/TelematicBridgePlugin/manifest.json new file mode 100644 index 000000000..530dbe0eb --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/manifest.json @@ -0,0 +1,16 @@ +{ + "name": "TelematicBridge", + "description": "Plugin that listens for TMX messages and forward them to the Telematic cloud services.", + "version": "@PROJECT_VERSION@", + "exeLocation": "/bin/TelematicBridgePlugin", + "coreIpAddr": "127.0.0.1", + "corePort": 24601, + "messageTypes": [], + "configuration": [ + { + "key": "LogLevel", + "default": "INFO", + "description": "The log level for this plugin" + } + ] +} \ No newline at end of file diff --git a/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgeException.h b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgeException.h new file mode 100644 index 000000000..5959ca1ff --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgeException.h @@ -0,0 +1,11 @@ +#pragma once +#include + +namespace TelematicBridge +{ + class TelematicBridgeException : public tmx::TmxException + { + public: + using TmxException::TmxException; + }; +} \ No newline at end of file diff --git a/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgeMsgWorker.h b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgeMsgWorker.h new file mode 100644 index 000000000..c31c5cc9c --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgeMsgWorker.h @@ -0,0 +1,247 @@ +#pragma once +#include "PluginLog.h" +#include +#include "TelematicBridgeException.h" +#include "jsoncpp/json/json.h" +#include +#include +#include +#include + +using namespace tmx::utils; +using namespace std; +using namespace tmx::messages; +namespace pt = boost::property_tree; + +namespace TelematicBridge +{ + using buffer_structure_t = struct buffer_structure + { + char *buffer; // buffer array + size_t buffer_size; // this is really where we will write next. + size_t allocated_size; // this is the total size of the buffer. + }; + + /** + * @brief Convert hex string into a vector of bytes + * @param string input hex string payload + * @param vector byte buffer to be updated. + * @return bool indicator whether conversion is successful. + */ + bool HexToBytes(const string &hexPaylod, vector &byteBuffer) + { + uint8_t d = 0; + int i = 0; + + for (const char &c : hexPaylod) + { + if (c <= '9' && c >= '0') + { + d = c - '0'; + } + else if (c <= 'F' && c >= 'A') + { + d = c - 55; // c - 'A' + 10 + } + else if (c <= 'f' && c >= 'a') + { + d = c - 87; // c - 'a' + 10; + } + else + { + return false; + } + + if (i % 2) + { + // low order nibble. + byteBuffer.back() |= d; + } + else + { + // high order nibble. + byteBuffer.push_back(d << 4); + } + ++i; + } + return true; + } + + /** + * @brief Decode J2735 message and populate the J2735 data frame + * @param string input hex string payload + * @param MessageFrame_t J2735 struct to be updated + * @return bool indicator whether decoding successful + */ + void DecodeJ2735Msg(const string &hexPaylod, MessageFrame_t *messageFrame) + { + /** + * Decode J2735 message + */ + ostringstream erroross; + vector byte_buffer; + if (!HexToBytes(hexPaylod, byte_buffer)) + { + throw TelematicBridgeException("Failed attempt to decode MessageFrame hex string: cannot convert to bytes."); + } + asn_dec_rval_t decode_rval = asn_decode( + nullptr, + ATS_UNALIGNED_BASIC_PER, + &asn_DEF_MessageFrame, + (void **)&messageFrame, + byte_buffer.data(), + byte_buffer.size()); + + if (decode_rval.code != RC_OK) + { + erroross.str(""); + erroross << "failed ASN.1 binary decoding of element " << asn_DEF_MessageFrame.name << ": bad data. Successfully decoded " << decode_rval.consumed << " bytes."; + throw TelematicBridgeException(erroross.str()); + } + } + + int DynamicBufferAppend(const void *buffer, size_t size, void *app_key) + { + auto *xb = static_cast(app_key); + + while (xb->buffer_size + size + 1 > xb->allocated_size) + { + // increase size of buffer. + size_t new_size = 2 * (xb->allocated_size ? xb->allocated_size : 64); + auto new_buf = static_cast(MALLOC(new_size)); + if (!new_buf) + return -1; + // move old to new. + memcpy(new_buf, xb->buffer, xb->buffer_size); + + FREEMEM(xb->buffer); + xb->buffer = new_buf; + xb->allocated_size = new_size; + } + + memcpy(xb->buffer + xb->buffer_size, buffer, size); + xb->buffer_size += size; + // null terminate the string. + xb->buffer[xb->buffer_size] = '\0'; + return 0; + } + + /** + * @brief Convert the J2735 messageFrame into string in XML format + * @param MessageFrame_t J2735 struct + * @return string XML formatted J2735 message + */ + string ConvertJ2735FrameToXML(const MessageFrame_t *messageFrame) + { + /** + * Convert J2735 message into XML + */ + buffer_structure_t xml_buffer = {nullptr, 0, 0}; + asn_enc_rval_t encode_rval = xer_encode( + &asn_DEF_MessageFrame, + messageFrame, + XER_F_CANONICAL, + DynamicBufferAppend, + static_cast(&xml_buffer)); + if (encode_rval.encoded == -1) + { + throw TelematicBridgeException("Failed to convert message with ID (=" + to_string(messageFrame->messageId) + ") to XML "); + } + return string(xml_buffer.buffer); + } + + /** + * @brief convert JSON value into string + * @param JSON input Json::Value + * @return string + */ + string JsonToString(const Json::Value &json) + { + Json::FastWriter fasterWirter; + string jsonStr = fasterWirter.write(json); + boost::replace_all(jsonStr, "\\n", ""); + boost::replace_all(jsonStr, "\n", ""); + boost::replace_all(jsonStr, "\\t", ""); + boost::replace_all(jsonStr, "\\", ""); + return jsonStr; + } + /** + * @brief convert string into JSON value + * @param string input string + * @return JSON::Value + */ + Json::Value StringToJson(const string &str) + { + Json::Value root; + Json::Reader reader; + bool parsingSuccessful = reader.parse(str, root); + if (!parsingSuccessful) + { + throw TelematicBridgeException("Error parsing the string"); + } + return root; + } + /** + * @brief Convert XML string into JSON string + */ + string xml2Json(const string &xml_str) + { + stringstream xmlss; + xmlss << xml_str; + pt::ptree root; + pt::read_xml(xmlss, root); + stringstream jsonss; + pt::write_json(jsonss, root, false); + return jsonss.str(); + } + /** + * @brief create JSON payload from given IVP message + * @param IVPMessage V2xHub interval exchanged message + * @return JSON value + */ + Json::Value IvpMessageToJson(const IvpMessage *msg) + { + Json::Value json; + if (msg->type) + { + json["type"] = msg->type; + } + + if (msg->subtype) + { + json["subType"] = msg->subtype; + } + + if (msg->dsrcMetadata) + { + json["channel"] = msg->dsrcMetadata->channel; + json["psid"] = msg->dsrcMetadata->psid; + } + + if (msg->encoding) + { + json["encoding"] = msg->encoding; + } + + if (msg->source) + { + json["source"] = msg->source; + } + json["sourceId"] = msg->sourceId; + json["flags"] = msg->flags; + json["timestamp"] = msg->timestamp; + if (msg->payload) + { + if (msg->payload->type == cJSON_Number) + { + json["payload"] = (msg->payload->valueint == 0 ? msg->payload->valuedouble : static_cast(msg->payload->valueint)); + } + else + { + json["payload"] = StringToJson(cJSON_Print(msg->payload)); + } + } + + return json; + } +} // TelematicBridge \ No newline at end of file diff --git a/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.cpp b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.cpp new file mode 100644 index 000000000..a316084f4 --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.cpp @@ -0,0 +1,37 @@ +#include "TelematicBridgePlugin.h" + +namespace TelematicBridge +{ + TelematicBridgePlugin::TelematicBridgePlugin(const string &name) : PluginClient(name) + { + AddMessageFilter("*", "*", IvpMsgFlags_None); + AddMessageFilter("J2735", "*", IvpMsgFlags_RouteDSRC); + SubscribeToMessages(); + } + + void TelematicBridgePlugin::OnMessageReceived(IvpMessage *msg) + { + if (msg && msg->type) + { + auto json = IvpMessageToJson(msg); + // Process J2735 message payload hex string + if (strcasecmp(msg->type, Telematic_MSGTYPE_J2735_STRING) == 0) + { + auto messageFm = (MessageFrame_t *)calloc(1, sizeof(MessageFrame_t)); + DecodeJ2735Msg(msg->payload->valuestring, messageFm); + string xml_payload_str = ConvertJ2735FrameToXML(messageFm); + ASN_STRUCT_FREE(asn_DEF_MessageFrame, messageFm); + json["payload"] = StringToJson(xml2Json(xml_payload_str)); + } + + auto jsonStr = JsonToString(json); + PLOG(logINFO) << jsonStr; + } + } +} + +// The main entry point for this application. +int main(int argc, char *argv[]) +{ + return run_plugin("Telematic Bridge", argc, argv); +} \ No newline at end of file diff --git a/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.h b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.h new file mode 100644 index 000000000..c790b6ca4 --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.h @@ -0,0 +1,26 @@ +#ifndef _TelematicBRIDGEPLUGIN_H_ +#define _TelematicBRIDGEPLUGIN_H_ + +#include "PluginClient.h" +#include "TelematicBridgeMsgWorker.h" + +using namespace tmx::utils; +using namespace std; +namespace TelematicBridge +{ + + + class TelematicBridgePlugin : public tmx::utils::PluginClient + { + private: + static CONSTEXPR const char *Telematic_MSGTYPE_J2735_STRING = "J2735"; + void OnMessageReceived(IvpMessage *msg); + + public: + explicit TelematicBridgePlugin(const string& name); + ~TelematicBridgePlugin() override = default; + }; + +} // namespace TelematicBridge + +#endif \ No newline at end of file diff --git a/src/v2i-hub/TelematicBridgePlugin/test/main.cpp b/src/v2i-hub/TelematicBridgePlugin/test/main.cpp new file mode 100644 index 000000000..d973578fc --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/test/main.cpp @@ -0,0 +1,7 @@ +#include + +int main(int argc, char *argv[]) +{ + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/v2i-hub/TelematicBridgePlugin/test/test_TelematicMsgWorker.cpp b/src/v2i-hub/TelematicBridgePlugin/test/test_TelematicMsgWorker.cpp new file mode 100644 index 000000000..9760ec906 --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/test/test_TelematicMsgWorker.cpp @@ -0,0 +1,106 @@ +#include +#include "TelematicBridgeMsgWorker.h" +#include "stdio.h" + +using namespace TelematicBridge; +using namespace std; + +class test_TelematicJ2735MsgWorker : public ::testing::Test +{ +}; + +TEST_F(test_TelematicJ2735MsgWorker, HexToBytes) +{ + vector byteBuff; + string bsmHex = "0014251d59d162dad7de266e9a7d1ea6d4220974ffffffff8ffff080fdfa1fa1007fff0000640fa0"; + auto success = HexToBytes(bsmHex, byteBuff); + ASSERT_TRUE(success); + ASSERT_EQ(bsmHex.size() / 2, byteBuff.size()); +} + +TEST_F(test_TelematicJ2735MsgWorker, DecodeJ2735Msg) +{ + auto messageFrame = (MessageFrame_t *)malloc(sizeof(MessageFrame_t)); + string bsmHex = "0014251d59d162dad7de266e9a7d1ea6d4220974ffffffff8ffff080fdfa1fa1007fff0000640fa0"; + ASSERT_NO_THROW(DecodeJ2735Msg(bsmHex, messageFrame)); + ASSERT_EQ(20, messageFrame->messageId); + ASN_STRUCT_FREE(asn_DEF_MessageFrame, messageFrame); +} + +TEST_F(test_TelematicJ2735MsgWorker, DecodeJ2735MsgFailure) +{ + auto messageFrame = (MessageFrame_t *)malloc(sizeof(MessageFrame_t)); + string badHex = "0014251d59d162dad7de266e9a7d1ea6d4220974ffffffff8ffff080fdfa1fa1007fff0000640fG0"; + ASSERT_THROW(DecodeJ2735Msg(badHex, messageFrame), TelematicBridgeException); + + badHex = "0014251d59d162dad7de266e9a"; + ASSERT_THROW(DecodeJ2735Msg(badHex, messageFrame), TelematicBridgeException); + ASN_STRUCT_FREE(asn_DEF_MessageFrame, messageFrame); +} + +TEST_F(test_TelematicJ2735MsgWorker, ConvertJ2735FrameToXML) +{ + auto messageFrame = (MessageFrame_t *)malloc(sizeof(MessageFrame_t)); + string bsmHex = "0014251d59d162dad7de266e9a7d1ea6d4220974ffffffff8ffff080fdfa1fa1007fff0000640fA0"; + ASSERT_NO_THROW(DecodeJ2735Msg(bsmHex, messageFrame)); + auto xmlStr = ConvertJ2735FrameToXML(messageFrame); + ASN_STRUCT_FREE(asn_DEF_MessageFrame, messageFrame); + string expectedXMLStr = "2011767458B6B24440389565434-7715004757452552556553581912880012720012001-127000000200500"; + ASSERT_EQ(expectedXMLStr, xmlStr); +} + +TEST_F(test_TelematicJ2735MsgWorker, constructTelematicPayload) +{ + IvpMessage msg; + auto payload = cJSON_CreateObject(); + payload->valuedouble = 12; + payload->type = cJSON_Number; + msg.payload = payload; + msg.source = "Plugin"; + msg.encoding = "json"; + msg.type = "Application"; + msg.subtype = "alive"; + msg.sourceId = 123; + msg.flags = 10; + msg.timestamp = 10; + IvpDsrcMetadata metadata; + metadata.channel = 12; + metadata.psid = 120; + msg.dsrcMetadata = &metadata; + auto json = IvpMessageToJson(&msg); + auto str = JsonToString(json); + string expectedStr = "{\"channel\":12,\"encoding\":\"json\",\"flags\":10,\"payload\":12.0,\"psid\":120,\"source\":\"Plugin\",\"sourceId\":123,\"subType\":\"alive\",\"timestamp\":10,\"type\":\"Application\"}"; + ASSERT_EQ(expectedStr, str); + + payload->valueint = 13; + payload->type = cJSON_Number; + msg.payload = payload; + json = IvpMessageToJson(&msg); + str = JsonToString(json); + expectedStr = "{\"channel\":12,\"encoding\":\"json\",\"flags\":10,\"payload\":13.0,\"psid\":120,\"source\":\"Plugin\",\"sourceId\":123,\"subType\":\"alive\",\"timestamp\":10,\"type\":\"Application\"}"; + ASSERT_EQ(expectedStr, str); + + payload->valuestring = "test"; + payload->type = cJSON_String; + msg.payload = payload; + json = IvpMessageToJson(&msg); + str = JsonToString(json); + expectedStr = "{\"channel\":12,\"encoding\":\"json\",\"flags\":10,\"payload\":\"test\",\"psid\":120,\"source\":\"Plugin\",\"sourceId\":123,\"subType\":\"alive\",\"timestamp\":10,\"type\":\"Application\"}"; + ASSERT_EQ(expectedStr, str); + + msg.payload = cJSON_Parse("[{\"test\":12}]"); + json = IvpMessageToJson(&msg); + str = JsonToString(json); + expectedStr = "{\"channel\":12,\"encoding\":\"json\",\"flags\":10,\"payload\":[{\"test\":12}],\"psid\":120,\"source\":\"Plugin\",\"sourceId\":123,\"subType\":\"alive\",\"timestamp\":10,\"type\":\"Application\"}"; + ASSERT_EQ(expectedStr, str); + + json = StringToJson("{\"test\":12}"); + ASSERT_EQ(12, json["test"].asInt64()); +} + +TEST_F(test_TelematicJ2735MsgWorker, xml2json) +{ + string expectedXMLStr = "2011767458B6B24440389565434-7715004757452552556553581912880012720012001-127000000200500"; + string expectedJSONStr = "{\"MessageFrame\":{\"messageId\":\"20\",\"value\":{\"BasicSafetyMessage\":{\"coreData\":{\"msgCnt\":\"117\",\"id\":\"67458B6B\",\"secMark\":\"24440\",\"lat\":\"389565434\",\"long\":\"-771500475\",\"elev\":\"745\",\"accuracy\":{\"semiMajor\":\"255\",\"semiMinor\":\"255\",\"orientation\":\"65535\"},\"transmission\":{\"neutral\":\"\"},\"speed\":\"8191\",\"heading\":\"28800\",\"angle\":\"127\",\"accelSet\":{\"long\":\"2001\",\"lat\":\"2001\",\"vert\":\"-127\",\"yaw\":\"0\"},\"brakes\":{\"wheelBrakes\":\"00000\",\"traction\":{\"unavailable\":\"\"},\"abs\":{\"unavailable\":\"\"},\"scs\":{\"unavailable\":\"\"},\"brakeBoost\":{\"unavailable\":\"\"},\"auxBrakes\":{\"unavailable\":\"\"}},\"size\":{\"width\":\"200\",\"length\":\"500\"}}}}}}\n"; + ASSERT_EQ(expectedJSONStr, xml2Json(expectedXMLStr)); +} \ No newline at end of file From 6b604b73b4b9fa68c70831ccd36d547412c94c9d Mon Sep 17 00:00:00 2001 From: dan-du-car <62157949+dan-du-car@users.noreply.github.com> Date: Thu, 30 Nov 2023 14:36:32 -0500 Subject: [PATCH 3/3] Telematic bridge nats (#567) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # PR Details ## Description This is the implementation story for the telematic bridge to stream J2735 data from V2xHub into the cloud at near real time. The module design refers to https://usdot-carma.atlassian.net/wiki/spaces/WFD2/pages/2640379977/V2xHub+WFD+Bridge+Design The following functionalities should be implemented for this module: Able to forward the JSON message to telematic cloud when the message type is requested by end users through telematic UI. Able to register itself with WFD cloud services. Able to map each TMX message type to each individual topic.   Able to provide available topics to telematic cloud. Able to stream the requested TMX message in JSON format to telematic cloud. ## Related Issue NA ## Motivation and Context Telematic tool data collection ## How Has This Been Tested? Unit test and local integration testing ## Types of changes - [ ] Defect fix (non-breaking change that fixes an issue) - [x] New feature (non-breaking change that adds functionality) - [ ] Breaking change (fix or feature that cause existing functionality to change) ## Checklist: - [ ] I have added any new packages to the sonar-scanner.properties file - [ ] My change requires a change to the documentation. - [ ] I have updated the documentation accordingly. - [x] I have read the **CONTRIBUTING** document. [V2XHUB Contributing Guide](https://github.com/usdot-fhwa-OPS/V2X-Hub/blob/develop/Contributing.md) - [ ] I have added tests to cover my changes. - [ ] All new and existing tests passed. --- .devcontainer/docker-compose-vscode.yml | 3 +- configuration/amd64/docker-compose.yml | 2 + .../src/simulation/SimulationEnvUtils.h | 6 + .../TelematicBridgePlugin/CMakeLists.txt | 6 +- src/v2i-hub/TelematicBridgePlugin/README.md | 96 ++++++ .../TelematicBridgePlugin/manifest.json | 10 + .../src/TelematicBridgePlugin.cpp | 54 ++- .../src/TelematicBridgePlugin.h | 19 +- .../src/TelematicUnit.cpp | 320 ++++++++++++++++++ .../TelematicBridgePlugin/src/TelematicUnit.h | 216 ++++++++++++ .../test/test_TelematicUnit.cpp | 156 +++++++++ 11 files changed, 876 insertions(+), 12 deletions(-) create mode 100644 src/v2i-hub/TelematicBridgePlugin/README.md create mode 100644 src/v2i-hub/TelematicBridgePlugin/src/TelematicUnit.cpp create mode 100644 src/v2i-hub/TelematicBridgePlugin/src/TelematicUnit.h create mode 100644 src/v2i-hub/TelematicBridgePlugin/test/test_TelematicUnit.cpp diff --git a/.devcontainer/docker-compose-vscode.yml b/.devcontainer/docker-compose-vscode.yml index cc113ce62..9035d1f9d 100755 --- a/.devcontainer/docker-compose-vscode.yml +++ b/.devcontainer/docker-compose-vscode.yml @@ -24,7 +24,8 @@ services: - SIM_V2X_PORT=5757 - SIM_INTERACTION_PORT=7576 - V2X_PORT=8686 - - INFRASTRUCTURE_ID=1 + - INFRASTRUCTURE_ID=rsu_ + - INFRASTRUCTURE_NAME= - SENSOR_JSON_FILE_PATH=/var/www/plugins/MAP/sensors.json secrets: - mysql_password diff --git a/configuration/amd64/docker-compose.yml b/configuration/amd64/docker-compose.yml index 6b5188297..0c9d5feb2 100755 --- a/configuration/amd64/docker-compose.yml +++ b/configuration/amd64/docker-compose.yml @@ -38,6 +38,8 @@ services: - db environment: - MYSQL_PASSWORD=/run/secrets/mysql_password + - INFRASTRUCTURE_ID=rsu_ + - INFRASTRUCTURE_NAME= secrets: - mysql_password volumes: diff --git a/src/tmx/TmxUtils/src/simulation/SimulationEnvUtils.h b/src/tmx/TmxUtils/src/simulation/SimulationEnvUtils.h index 7498a7922..efffc4ffa 100644 --- a/src/tmx/TmxUtils/src/simulation/SimulationEnvUtils.h +++ b/src/tmx/TmxUtils/src/simulation/SimulationEnvUtils.h @@ -66,6 +66,12 @@ namespace tmx::utils::sim{ * for CDASim connection. */ constexpr inline static const char *INFRASTRUCTURE_ID = "INFRASTRUCTURE_ID"; + + /** + * @brief Name of environment variable for storing infrastructure name of v2xhub. Only necessary in SIMULATION MODE + * for CDASim connection. + */ + constexpr inline static const char *INFRASTRUCTURE_NAME = "INFRASTRUCTURE_NAME"; /** * @brief Function to return bool indicating whether V2X-Hub deployment is in SIMULATION MODE or not. * @return true if SIMULATION_MODE is "true" or "TRUE" and false otherwise. diff --git a/src/v2i-hub/TelematicBridgePlugin/CMakeLists.txt b/src/v2i-hub/TelematicBridgePlugin/CMakeLists.txt index 5134ae38b..df7b1c0f7 100644 --- a/src/v2i-hub/TelematicBridgePlugin/CMakeLists.txt +++ b/src/v2i-hub/TelematicBridgePlugin/CMakeLists.txt @@ -3,13 +3,15 @@ PROJECT (TelematicBridgePlugin VERSION 7.5.1 LANGUAGES CXX) set (TMX_PLUGIN_NAME "Telematic Bridge") BuildTmxPlugin() -TARGET_LINK_LIBRARIES ( ${PROJECT_NAME} tmxutils jsoncpp) +TARGET_LINK_LIBRARIES ( ${PROJECT_NAME} tmxutils jsoncpp nats) #################################################### ################## Testing ####################### #################################################### +add_library(${PROJECT_NAME}_lib src/TelematicUnit.cpp) +target_link_libraries(${PROJECT_NAME}_lib PUBLIC tmxutils jsoncpp nats) enable_testing() include_directories(${PROJECT_SOURCE_DIR}/src) file(GLOB_RECURSE TEST_SOURCES LIST_DIRECTORIES false test/*.h test/*.cpp) add_executable(${PROJECT_NAME}_test ${TEST_SOURCES}) -target_link_libraries(${PROJECT_NAME}_test PRIVATE gtest tmxutils jsoncpp) \ No newline at end of file +target_link_libraries(${PROJECT_NAME}_test PRIVATE gtest ${PROJECT_NAME}_lib) \ No newline at end of file diff --git a/src/v2i-hub/TelematicBridgePlugin/README.md b/src/v2i-hub/TelematicBridgePlugin/README.md new file mode 100644 index 000000000..6ed0a0497 --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/README.md @@ -0,0 +1,96 @@ +## Telematic bridge introduction +Telematic bridge is a V2xHub plugin used to collect data stream generated by V2xHub and send the data stream into [telematic tool](https://github.com/usdot-fhwa-stol/cda-telematics) hosted in the AWS cloud. +## NATS Publisher/Subscriber +### NATS Connections and registration +#### Telematic plugin sends registration request to telematic server +##### Subject +NATS subject: *.register_unit +##### Request +``` +{ + "unit_id": "" +} +``` +##### Response +``` +{ + "unit_id": "", + "unit_type": "infrastructure", + "unit_name": "East Intersection", + "timestamp": "1678998191815233965", + "event_name": "wfd_integration_testing", + "location": "TFHRC", + "testing_type": "Integration" +} +``` + +### Available topics +#### Telematic plugin receives request from telematic UI +##### Subject +NATS subject: .available_topics +##### Request +``` +{ + "unit_id": "" +} +``` +##### Reply + +``` + { + "unit_id": "", + "unit_type": "infrastructure", + "unit_name": "East Intersection", + "timestamp": "1678998191815233965", + "event_name": "wfd_integration_testing", + "location": "TFHRC", + "testing_type": "Integration", + "topics": [ + { + "name": "J2735_TMSG03-P_CARMAStreetsPlugin" + }, + { + "name": "" + } + ] +} +``` + +### Selected topics +#### Telematic plugin receives selected topics from telematic UI +##### Subject +NATS subject: .publish_topics +#### Request +``` +{ + "unit_id": "", + "unit_type": "infrastructure", + "timestamp": 1663084528513000400, + "event_name": "wfd_integration_testing", + "location": "TFHRC", + "testing_type": "Integration", + "topics": [ + "", + "" + ] +} +``` +##### Reply +``` +"request received!" +``` + +## Check status +#### Telematic plugin receives live status request from telematic server +##### Subject +NATS subject: .check_status +##### Request +``` +{ + "unit_id": "" +} +``` +##### Reponse +``` +"OK" +``` \ No newline at end of file diff --git a/src/v2i-hub/TelematicBridgePlugin/manifest.json b/src/v2i-hub/TelematicBridgePlugin/manifest.json index 530dbe0eb..d133ceb0f 100644 --- a/src/v2i-hub/TelematicBridgePlugin/manifest.json +++ b/src/v2i-hub/TelematicBridgePlugin/manifest.json @@ -11,6 +11,16 @@ "key": "LogLevel", "default": "INFO", "description": "The log level for this plugin" + }, + { + "key": "NATSUrl", + "default": "nats://127.0.0.1:4222", + "description": "The NATS connection URL" + }, + { + "key": "MessageExclusionList", + "default": "System_KeepAlive_CommandPlugin,System_KeepAlive_CARMAStreetsPlugin,System_KeepAlive_CDASimAdapter,System_KeepAlive_MessageReceiver", + "description": "The list of messages are excluded from the available message list. Message name is a combination of message type, subtype and source separated by underscore. E.G: type_subtype_source" } ] } \ No newline at end of file diff --git a/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.cpp b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.cpp index a316084f4..44a6738f2 100644 --- a/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.cpp +++ b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.cpp @@ -1,9 +1,16 @@ #include "TelematicBridgePlugin.h" +using namespace tmx::utils; +using namespace std; + namespace TelematicBridge { TelematicBridgePlugin::TelematicBridgePlugin(const string &name) : PluginClient(name) { + _telematicUnitPtr = make_unique(); + _unitId = std::getenv("INFRASTRUCTURE_ID"); + _unitName = std::getenv("INFRASTRUCTURE_NAME"); + UpdateConfigSettings(); AddMessageFilter("*", "*", IvpMsgFlags_None); AddMessageFilter("J2735", "*", IvpMsgFlags_RouteDSRC); SubscribeToMessages(); @@ -20,14 +27,53 @@ namespace TelematicBridge auto messageFm = (MessageFrame_t *)calloc(1, sizeof(MessageFrame_t)); DecodeJ2735Msg(msg->payload->valuestring, messageFm); string xml_payload_str = ConvertJ2735FrameToXML(messageFm); - ASN_STRUCT_FREE(asn_DEF_MessageFrame, messageFm); - json["payload"] = StringToJson(xml2Json(xml_payload_str)); + ASN_STRUCT_FREE(asn_DEF_MessageFrame, messageFm); + string json_payload_str = xml2Json(xml_payload_str.c_str()); + json["payload"] = StringToJson(json_payload_str); } - auto jsonStr = JsonToString(json); - PLOG(logINFO) << jsonStr; + stringstream topic; + topic << (msg->type ? msg->type : "") << "_" << (msg->subtype ? msg->subtype : "") << "_" << (msg->source ? msg->source : ""); + auto topicStr = topic.str(); + _telematicUnitPtr->updateAvailableTopics(topicStr); + if (_telematicUnitPtr->inSelectedTopics(topicStr)) + { + _telematicUnitPtr->publishMessage(topicStr, json); + } + } + } + + void TelematicBridgePlugin::UpdateConfigSettings() + { + lock_guard lock(_configMutex); + GetConfigValue("NATSUrl", _natsURL); + GetConfigValue("MessageExclusionList", _excludedMessages); + unit_st unit = {_unitId, _unitName, UNIT_TYPE_INFRASTRUCTURE}; + if (_telematicUnitPtr) + { + _telematicUnitPtr->setUnit(unit); + _telematicUnitPtr->updateExcludedTopics(_excludedMessages); + } + } + + void TelematicBridgePlugin::OnStateChange(IvpPluginState state) + { + PluginClient::OnStateChange(state); + if (state == IvpPluginState_registered) + { + UpdateConfigSettings(); + if (_telematicUnitPtr) + { + _telematicUnitPtr->connect(_natsURL); + } } } + + void TelematicBridgePlugin::OnConfigChanged(const char *key, const char *value) + { + PluginClient::OnConfigChanged(key, value); + UpdateConfigSettings(); + } } // The main entry point for this application. diff --git a/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.h b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.h index c790b6ca4..a75b21bbe 100644 --- a/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.h +++ b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.h @@ -3,21 +3,30 @@ #include "PluginClient.h" #include "TelematicBridgeMsgWorker.h" +#include "TelematicUnit.h" +#include + -using namespace tmx::utils; -using namespace std; namespace TelematicBridge { - - class TelematicBridgePlugin : public tmx::utils::PluginClient { private: static CONSTEXPR const char *Telematic_MSGTYPE_J2735_STRING = "J2735"; + static CONSTEXPR const char *UNIT_TYPE_INFRASTRUCTURE = "Infrastructure"; + std::unique_ptr _telematicUnitPtr; + std::string _unitId; + std::string _unitName; + std::string _natsURL; + std::string _excludedMessages; + std::mutex _configMutex; void OnMessageReceived(IvpMessage *msg); public: - explicit TelematicBridgePlugin(const string& name); + explicit TelematicBridgePlugin(const std::string &name); + void OnConfigChanged(const char *key, const char *value) override; + void OnStateChange(IvpPluginState state) override; + void UpdateConfigSettings(); ~TelematicBridgePlugin() override = default; }; diff --git a/src/v2i-hub/TelematicBridgePlugin/src/TelematicUnit.cpp b/src/v2i-hub/TelematicBridgePlugin/src/TelematicUnit.cpp new file mode 100644 index 000000000..e99029b53 --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/src/TelematicUnit.cpp @@ -0,0 +1,320 @@ +#include "TelematicUnit.h" + +using namespace std; +using namespace tmx::utils; +using namespace std::chrono; + +namespace TelematicBridge +{ + void TelematicUnit::connect(const string &natsURL) + { + auto s = natsConnection_ConnectTo(&_conn, natsURL.c_str()); + PLOG(logINFO) << "NATS connection returned: " << natsStatus_GetText(s); + if (s == NATS_OK) + { + registerUnitRequestor(); + } + else + { + throw TelematicBridgeException(natsStatus_GetText(s)); + } + } + + void TelematicUnit::registerUnitRequestor() + { + // Reset registration status + bool isRegistered = false; + int attempts_count = 0; + + while (!isRegistered && attempts_count < REGISTRATION_MAX_ATTEMPTS) + { + attempts_count++; + PLOG(logDEBUG2) << "Inside register unit requestor"; + natsMsg *reply = nullptr; + string payload = "{\"unit_id\":\"" + _unit.unitId + "\"}"; + auto s = natsConnection_RequestString(&reply, _conn, REGISTER_UNIT_TOPIC, payload.c_str(), TIME_OUT); + if (s == NATS_OK) + { + auto replyStr = natsMsg_GetData(reply); + PLOG(logINFO) << "Received registered reply: " << replyStr; + + // Unit is registered when server responds with event information (location, testing_type, event_name) + isRegistered = validateRegisterStatus(replyStr); + natsMsg_Destroy(reply); + } + else + { + throw TelematicBridgeException(natsStatus_GetText(s)); + } + sleep(1); + } + + if (isRegistered) + { + // Provide below services when the unit is registered + availableTopicsReplier(); + selectedTopicsReplier(); + checkStatusReplier(); + } + } + + bool TelematicUnit::validateRegisterStatus(const string ®isterReply) + { + auto root = parseJson(registerReply); + if (root.isMember(LOCATION_KEY) && root.isMember(TESTING_TYPE_KEY) && root.isMember(EVENT_NAME_KEY)) + { + _eventLocation = root[LOCATION_KEY].asString(); + _testingType = root[TESTING_TYPE_KEY].asString(); + _eventName = root[EVENT_NAME_KEY].asString(); + return true; + } + PLOG(logERROR) << "Failed to register unit as event information (locatoin, testing type and event name) does not exist."; + return false; + } + + void TelematicUnit::availableTopicsReplier() + { + if (!_subAvailableTopic) + { + PLOG(logDEBUG2) << "Inside available topic replier"; + stringstream topic; + topic << _unit.unitId << AVAILABLE_TOPICS; + natsConnection_Subscribe(&_subAvailableTopic, _conn, topic.str().c_str(), onAvailableTopicsCallback, this); + } + } + + void TelematicUnit::selectedTopicsReplier() + { + if (!_subSelectedTopic) + { + PLOG(logDEBUG2) << "Inside selected topic replier"; + stringstream topic; + topic << _unit.unitId << PUBLISH_TOPICS; + natsConnection_Subscribe(&_subSelectedTopic, _conn, topic.str().c_str(), onSelectedTopicsCallback, this); + } + } + + void TelematicUnit::checkStatusReplier() + { + if (!_subCheckStatus) + { + PLOG(logDEBUG2) << "Inside check status replier"; + stringstream topic; + topic << _unit.unitId << CHECK_STATUS; + natsConnection_Subscribe(&_subCheckStatus, _conn, topic.str().c_str(), onCheckStatusCallback, this); + } + } + + void TelematicUnit::publishMessage(const string &topic, const Json::Value &payload) + { + auto pubMsgTopic = "streets." + _unit.unitId + ".data." + topic; + auto jsonStr = constructPublishedDataString(_unit, _eventLocation, _testingType, _eventName, topic, payload); + auto s = natsConnection_PublishString(_conn, pubMsgTopic.c_str(), jsonStr.c_str()); + if (s == NATS_OK) + { + PLOG(logINFO) << "Topic: " << pubMsgTopic << ". Published: " << jsonStr; + } + else + { + throw TelematicBridgeException(natsStatus_GetText(s)); + } + } + + void TelematicUnit::onAvailableTopicsCallback(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *object) + { + PLOG(logDEBUG3) << "Received available topics: " << natsMsg_GetSubject(msg) << " " << natsMsg_GetData(msg); + // Sends a reply + if (object && natsMsg_GetReply(msg) != nullptr) + { + const auto obj = (TelematicUnit *)object; + auto reply = constructAvailableTopicsReplyString(obj->getUnit(), obj->getEventLocation(), obj->getTestingType(), obj->getEventName(), obj->getAvailableTopics(), obj->getExcludedTopics()); + auto s = natsConnection_PublishString(nc, natsMsg_GetReply(msg), reply.c_str()); + natsMsg_Destroy(msg); + if (s == NATS_OK) + { + PLOG(logDEBUG3) << "Available topics replied: " << reply; + } + } + } + + void TelematicUnit::onSelectedTopicsCallback(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *object) + { + PLOG(logDEBUG3) << "Received selected topics: " << natsMsg_GetSubject(msg) << " " << natsMsg_GetData(msg); + // Sends a reply + if (natsMsg_GetReply(msg) != nullptr) + { + auto msgStr = natsMsg_GetData(msg); + auto root = parseJson(msgStr); + if (object && root.isMember(TOPICS_KEY) && root[TOPICS_KEY].isArray()) + { + auto obj = (TelematicUnit *)object; + // clear old selected topics + obj->clearSelectedTopics(); + + // update selected topics with selected topics from latest request + for (auto itr = root[TOPICS_KEY].begin(); itr != root[TOPICS_KEY].end(); itr++) + { + obj->addSelectedTopic(itr->asString()); + } + } + string reply = "request received!"; + auto s = natsConnection_PublishString(nc, natsMsg_GetReply(msg), reply.c_str()); + natsMsg_Destroy(msg); + if (s == NATS_OK) + { + PLOG(logDEBUG3) << "Selected topics replied: " << reply; + } + } + } + + void TelematicUnit::onCheckStatusCallback(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *object) + { + if (natsMsg_GetReply(msg) != nullptr) + { + auto s = natsConnection_PublishString(nc, natsMsg_GetReply(msg), "OK"); + if (s == NATS_OK) + { + PLOG(logDEBUG3) << "Received check status msg: " << natsMsg_GetSubject(msg) << " " << natsMsg_GetData(msg) << ". Replied: OK"; + } + natsMsg_Destroy(msg); + } + } + + string TelematicUnit::constructPublishedDataString(const unit_st &unit, const string &eventLocation, const string &testingType, const string &eventName, const string &topicName, const Json::Value &payload) const + { + Json::Value message; + message[UNIT_ID_KEY] = unit.unitId; + message[UNIT_NAME_KEY] = unit.unitName; + message[UNIT_TYPE_KEY] = unit.unitType; + message[LOCATION_KEY] = eventLocation; + message[TESTING_TYPE_KEY] = testingType; + message[EVENT_NAME_KEY] = eventName; + message[TOPIC_NAME_KEY] = topicName; + message[TIMESTAMP_KEY] = payload.isMember("timestamp") ? payload["timestamp"].asUInt64() * MILLI_TO_MICRO : duration_cast(system_clock::now().time_since_epoch()).count(); + message[PAYLOAD_KEY] = payload; + Json::FastWriter fasterWirter; + string jsonStr = fasterWirter.write(message); + return jsonStr; + } + + Json::Value TelematicUnit::parseJson(const string &jsonStr) + { + Json::Value root; + Json::Reader reader; + bool parsingSuccessful = reader.parse(jsonStr, root); + if (!parsingSuccessful) + { + throw TelematicBridgeException("Error parsing the reply message"); + } + return root; + } + + string TelematicUnit::constructAvailableTopicsReplyString(const unit_st &unit, const string &eventLocation, const string &testingType, const string &eventName, const vector &availableTopicList, const string &excludedTopics) + { + Json::Value message; + message[UNIT_ID_KEY] = unit.unitId; + message[UNIT_NAME_KEY] = unit.unitName; + message[UNIT_TYPE_KEY] = unit.unitType; + message[LOCATION_KEY] = eventLocation; + message[TESTING_TYPE_KEY] = testingType; + message[EVENT_NAME_KEY] = eventName; + message[TIMESTAMP_KEY] = duration_cast(system_clock::now().time_since_epoch()).count(); + Json::Value topics; + for (const auto &topic : availableTopicList) + { + if (!boost::icontains(excludedTopics, topic)) + { + Json::Value topicJson; + topicJson[NAME_KEY] = topic; + topics.append(topicJson); + } + } + message[TOPICS_KEY] = topics; + Json::FastWriter fasterWirter; + string reply = fasterWirter.write(message); + return reply; + } + + void TelematicUnit::updateAvailableTopics(const string &newTopic) + { + if (find(_availableTopics.begin(), _availableTopics.end(), newTopic) == _availableTopics.end()) + { + lock_guard lock(_availableTopicsMutex); + _availableTopics.push_back(newTopic); + PLOG(logINFO) << "Add topic (= " << newTopic << ") to available topics list. Size: " << _availableTopics.size(); + } + } + + void TelematicUnit::updateExcludedTopics(const string &excludedTopics) + { + lock_guard lock(_excludedTopicsMutex); + _excludedTopics = excludedTopics; + } + + bool TelematicUnit::inSelectedTopics(const string &topic) + { + if (find(_selectedTopics.begin(), _selectedTopics.end(), topic) == _selectedTopics.end()) + { + return false; + } + return true; + } + + void TelematicUnit::setUnit(const unit_st &unit) + { + lock_guard lock(_unitMutex); + _unit = unit; + } + + unit_st TelematicUnit::getUnit() const + { + return _unit; + } + + string TelematicUnit::getEventName() const + { + return _eventName; + } + + string TelematicUnit::getEventLocation() const + { + return _eventLocation; + } + + string TelematicUnit::getTestingType() const + { + return _testingType; + } + + vector TelematicUnit::getAvailableTopics() const + { + return _availableTopics; + } + + string TelematicUnit::getExcludedTopics() const + { + return _excludedTopics; + } + + void TelematicUnit::addSelectedTopic(const string &newSelectedTopic) + { + _selectedTopics.push_back(newSelectedTopic); + } + + void TelematicUnit::clearSelectedTopics() + { + _selectedTopics.clear(); + } + + TelematicUnit::~TelematicUnit() + { + natsSubscription_Destroy(_subAvailableTopic); + natsSubscription_Destroy(_subSelectedTopic); + natsSubscription_Destroy(_subCheckStatus); + natsConnection_Destroy(_conn); + _conn = nullptr; + _subAvailableTopic = nullptr; + _subSelectedTopic = nullptr; + _subCheckStatus = nullptr; + } +} \ No newline at end of file diff --git a/src/v2i-hub/TelematicBridgePlugin/src/TelematicUnit.h b/src/v2i-hub/TelematicBridgePlugin/src/TelematicUnit.h new file mode 100644 index 000000000..3a72f080b --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/src/TelematicUnit.h @@ -0,0 +1,216 @@ +#pragma once +#include +#include +#include +#include +#include +#include "PluginLog.h" +#include "ThreadTimer.h" +#include "TelematicBridgeException.h" +#include +#include + + +namespace TelematicBridge +{ + using unit_st = struct unit + { + std::string unitId; // Unique identifier for each unit + std::string unitName; // Descriptive name for each unit + std::string unitType; // Unit categorized base on unit type: platform or infrastructure + }; + + class TelematicUnit + { + private: + std::mutex _unitMutex; + std::mutex _availableTopicsMutex; + std::mutex _excludedTopicsMutex; + unit_st _unit; // Global variable to store the unit information + std::vector _availableTopics; // Global variable to store available topics + std::string _excludedTopics; // Global variable to store topics that are excluded by the users + std::vector _selectedTopics; // Global variable to store selected topics confirmed by users + static CONSTEXPR const char *AVAILABLE_TOPICS = ".available_topics"; // NATS subject to pub/sub available topics + static CONSTEXPR const char *REGISTER_UNIT_TOPIC = "*.register_unit"; // NATS subject to pub/sub registering unit + static CONSTEXPR const char *PUBLISH_TOPICS = ".publish_topics"; // NATS subject to publish data stream + static CONSTEXPR const char *CHECK_STATUS = ".check_status"; // NATS subject to pub/sub checking unit status + natsConnection *_conn = nullptr; // Global NATS connection object + natsSubscription *_subAvailableTopic = nullptr; // Global NATS subscription object + natsSubscription *_subSelectedTopic = nullptr; // Global NATS subscription object + natsSubscription *_subCheckStatus = nullptr; // Global NATS subscription object + int64_t TIME_OUT = 10000; // NATS Connection time out in milliseconds + std::string _eventName; // Testing event the unit is assigned to + std::string _eventLocation; // Testing event location + std::string _testingType; // Testing type + static CONSTEXPR const char *LOCATION_KEY = "location"; // location key used to find location value from JSON + static CONSTEXPR const char *TESTING_TYPE_KEY = "testing_type"; // testing_type key used to find testing_type value from JSON + static CONSTEXPR const char *EVENT_NAME_KEY = "event_name"; // event_name key used to find event_name value from JSON + static CONSTEXPR const char *UNIT_ID_KEY = "unit_id"; // unit_id key used to find unit_id value from JSON + static CONSTEXPR const char *UNIT_NAME_KEY = "unit_name"; // unit_name key used to find unit_name value from JSON + static CONSTEXPR const char *UNIT_TYPE_KEY = "unit_type"; // unit_type key used to find unit_type value from JSON + static CONSTEXPR const char *TOPIC_NAME_KEY = "topic_name"; // topic_name key used to find topic_name value from JSON + static CONSTEXPR const char *TIMESTAMP_KEY = "timestamp"; // timestamp key used to find timestamp value from JSON + static CONSTEXPR const char *PAYLOAD_KEY = "payload"; // payload key used to find payload value from JSON + static CONSTEXPR const char *TOPICS_KEY = "topics"; // topics key used to find topics value from JSON + static CONSTEXPR const char *NAME_KEY = "name"; // topics key used to find topics value from JSON + static const int MILLI_TO_MICRO = 1000; + static const int REGISTRATION_MAX_ATTEMPTS = 30; //The maximum numbers of attempts allowed to register this unit with server + + public: + /** + *@brief Construct telematic unit + */ + explicit TelematicUnit() = default; + /** + * @brief A function for telematic unit to connect to NATS server. Throw exception is connection failed. * + * @param string string NATS server URL + */ + void connect(const std::string &natsURL); + + /** + * @brief A NATS requestor for telematic unit to send register request to NATS server. + * If receives a response, it will update the isRegistered flag to indicate the unit is registered. + * If no response after the specified time out (unit of second) period, it considered register failed. + * */ + void registerUnitRequestor(); + + /** + * @brief A NATS replier to subscribe to NATS server and receive available topics request. + * Publish list of available topics after receiving/processing the request. + */ + void availableTopicsReplier(); + + /** + * @brief A NATS replier to subscribe to NATS server and receive requested topics from telematic server. + * Process the request and update the selectedTopics global variable. + * Respond the telematic server with acknowledgement. + */ + void selectedTopicsReplier(); + + /** + * @brief A NATS replier to subscribe to NATS server and receive request for status check from telematic server. + * Publish unit status upon receiving a request. + */ + void checkStatusReplier(); + + /** + * @brief A function to publish message stream into NATS server + */ + void publishMessage(const std::string &topic, const Json::Value &payload); + + /** + * @brief A function to parse a JSON string and create a JSON object. + * @param string input json string + * @return Json::Value + */ + static Json::Value parseJson(const std::string &jsonStr); + + /** + * @brief construct available topic response + * @param unit_st struct that contains unit related information + * @param vector of available topics + * @param string Excluded topics separated by commas + */ + static std::string constructAvailableTopicsReplyString(const unit_st &unit, const std::string &eventLocation, const std::string &testingType, const std::string &eventName, const std::vector &availableTopicList, const std::string &excludedTopics); + + /** + * @brief A function to update available topics global variables when discovering a new topic. + */ + void updateAvailableTopics(const std::string &newTopic); + + /** + * @brief Update telematic unit registration status when receiving registration reply from NATS server + * @param string Register reply in Json format + * @return True when reply with event information (location, testing type, event name), otherwise false. + */ + bool validateRegisterStatus(const std::string& registerReply); + + /** + * @brief construct Json data string that will be streamed into the cloud by a publisher + * @param unit_st struct that contains unit related information + * @param string Event location + * @param string Testing type + * @param string Event name + * @param string Topic name is a combination of type_subtype_source from TMX IVPMessage + * @param Json::Value Payload is the actual data generated by V2xHub plugin + */ + std::string constructPublishedDataString(const unit_st &unit, const std::string &eventLocation, const std::string &testingType, const std::string &eventName, const std::string &topicName, const Json::Value &payload) const; + + /** + * @brief A function to update global unit variable + * @param unit_st object that has the unit id, type and name information + */ + void setUnit(const unit_st &unit); + + /** + * @brief Return unit structure + */ + unit_st getUnit() const; + + /** + * @brief Return list of available topics + */ + std::vector getAvailableTopics() const; + + /** + * @brief Return excluded topics string. + */ + std::string getExcludedTopics() const; + + /** + * @brief Return event name + */ + std::string getEventName() const; + + /** + * @brief Return event location + */ + std::string getEventLocation() const; + + /** + * @brief Return testing type + */ + std::string getTestingType() const; + + /** + * @brief Add new selected topic into the selected topics list + */ + void addSelectedTopic(const std::string &newSelectedTopic); + + /** + * @brief Clear selected topics list + */ + void clearSelectedTopics(); + + /** + * @brief A function to update excluded topics. + * @param string Excluded topics separated by commas + */ + void updateExcludedTopics(const std::string &excludedTopics); + + /** + * @brief Check if the given topic is inside the selectedTopics list + * @param string A topic to check for existence + * @return boolean indicator whether the input topic eixst. + */ + bool inSelectedTopics(const std::string &topic); + + /** + * @brief A callback function for available topic replier + */ + static void onAvailableTopicsCallback(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *object); + + /** + * @brief A callback function for selected topic replier + */ + static void onSelectedTopicsCallback(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *object); + + /** + * @brief A callback function for check status replier + */ + static void onCheckStatusCallback(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *object); + + ~TelematicUnit(); + }; + +} // namespace TelematicBridge diff --git a/src/v2i-hub/TelematicBridgePlugin/test/test_TelematicUnit.cpp b/src/v2i-hub/TelematicBridgePlugin/test/test_TelematicUnit.cpp new file mode 100644 index 000000000..9a81427b4 --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/test/test_TelematicUnit.cpp @@ -0,0 +1,156 @@ +#include +#include "TelematicUnit.h" +using namespace std; +using namespace tmx::utils; +using namespace std::chrono; + +namespace TelematicBridge +{ + class test_TelematicUnit : public ::testing::Test + { + public: + shared_ptr _telematicUnitPtr = make_shared(); + unit_st unit = + { + "test_id", + "test_name", + "infrastructure"}; + }; + + TEST_F(test_TelematicUnit, setUnit) + { + ASSERT_NO_THROW(_telematicUnitPtr->setUnit(unit)); + ASSERT_EQ(unit.unitId, _telematicUnitPtr->getUnit().unitId); + } + + TEST_F(test_TelematicUnit, updateExcludedTopics) + { + ASSERT_NO_THROW(_telematicUnitPtr->updateExcludedTopics("test_topic")); + } + + TEST_F(test_TelematicUnit, updateAvailableTopics) + { + ASSERT_NO_THROW(_telematicUnitPtr->updateAvailableTopics("test_topic")); + } + + TEST_F(test_TelematicUnit, constructAvailableTopicsReplyString) + { + vector topics = {"test_topic", "excluded_topic"}; + string excluded_topic = "excluded_topic"; + string eventLocation = "location"; + string testingType = "unit_test"; + string eventName = "testing"; + auto reply = TelematicUnit::constructAvailableTopicsReplyString(unit, eventLocation, testingType, eventName, topics, excluded_topic); + auto json = TelematicUnit::parseJson(reply); + ASSERT_EQ("test_topic", json["topics"][0]["name"].asString()); + } + + TEST_F(test_TelematicUnit, constructPublishedDataString) + { + string eventLocation = "location"; + string testingType = "unit_test"; + string eventName = "testing"; + string topicName = "test_topic"; + Json::Value payload; + payload["timestamp"] = 1701099016033; + + auto reply = _telematicUnitPtr->constructPublishedDataString(unit, eventLocation, testingType, eventName, topicName, payload); + auto json = TelematicUnit::parseJson(reply); + ASSERT_EQ(eventLocation, json["location"].asString()); + ASSERT_EQ(testingType, json["testing_type"].asString()); + ASSERT_EQ(eventName, json["event_name"].asString()); + ASSERT_EQ(1701099016033000, json["timestamp"].asUInt64()); + ASSERT_THROW(TelematicUnit::parseJson("Invalid Json"), TelematicBridgeException); + + Json::Value payload2; + payload2["body"] = "invalid"; + reply = _telematicUnitPtr->constructPublishedDataString(unit, eventLocation, testingType, eventName, topicName, payload2); + json = TelematicUnit::parseJson(reply); + ASSERT_NEAR(duration_cast(system_clock::now().time_since_epoch()).count(), json["timestamp"].asUInt64(), 100); + } + + TEST_F(test_TelematicUnit, onCheckStatusCallback) + { + natsMsg *msg; + string data = "{\"data\":\"test\"}"; + natsMsg_Create(&msg, "test_subject", "Test_reply", data.c_str(), data.size()); + ASSERT_NO_THROW(TelematicUnit::onCheckStatusCallback(nullptr, nullptr, msg, nullptr)); + } + + TEST_F(test_TelematicUnit, onSelectedTopicsCallback) + { + natsMsg *msg; + string data = "{\"topics\":[\"test_topic\"]}"; + natsMsg_Create(&msg, "test_subject", "Test_reply", data.c_str(), data.size()); + ASSERT_NO_THROW(TelematicUnit::onSelectedTopicsCallback(nullptr, nullptr, msg, _telematicUnitPtr.get())); + ASSERT_TRUE(_telematicUnitPtr->inSelectedTopics("test_topic")); + } + + TEST_F(test_TelematicUnit, onAvailableTopicsCallback) + { + natsMsg *msg; + string data = "{\"data\":\"test\"}"; + natsMsg_Create(&msg, "test_subject", "Test_reply", data.c_str(), data.size()); + ASSERT_NO_THROW(TelematicUnit::onAvailableTopicsCallback(nullptr, nullptr, msg, _telematicUnitPtr.get())); + } + + TEST_F(test_TelematicUnit, publishMessage) + { + string topicName = "test_topic"; + Json::Value payload; + payload["body"] = "test_body"; + ASSERT_THROW(_telematicUnitPtr->publishMessage(topicName, payload), TelematicBridgeException); + } + + TEST_F(test_TelematicUnit, checkStatusReplier) + { + _telematicUnitPtr->checkStatusReplier(); + } + + TEST_F(test_TelematicUnit, selectedTopicsReplier) + { + _telematicUnitPtr->selectedTopicsReplier(); + } + + TEST_F(test_TelematicUnit, availableTopicsReplier) + { + _telematicUnitPtr->availableTopicsReplier(); + } + + TEST_F(test_TelematicUnit, registerUnitRequestor) + { + ASSERT_THROW(_telematicUnitPtr->registerUnitRequestor(), TelematicBridgeException); + } + + TEST_F(test_TelematicUnit, connect) + { + ASSERT_THROW(_telematicUnitPtr->connect("nats://127.0.0.1:4222"), TelematicBridgeException); + } + + TEST_F(test_TelematicUnit, getters) + { + ASSERT_EQ(0, _telematicUnitPtr->getAvailableTopics().size()); + ASSERT_EQ("", _telematicUnitPtr->getEventLocation()); + ASSERT_EQ("", _telematicUnitPtr->getEventName()); + ASSERT_EQ("", _telematicUnitPtr->getExcludedTopics()); + ASSERT_EQ("", _telematicUnitPtr->getTestingType()); + } + + TEST_F(test_TelematicUnit, selectedTopics) + { + string selectedTopic = "test_selected_topics"; + _telematicUnitPtr->addSelectedTopic(selectedTopic); + ASSERT_TRUE(_telematicUnitPtr->inSelectedTopics(selectedTopic)); + _telematicUnitPtr->clearSelectedTopics(); + ASSERT_FALSE(_telematicUnitPtr->inSelectedTopics(selectedTopic)); + } + TEST_F(test_TelematicUnit, validateRegisterStatus) + { + string replyStr = "{\"event_name\":\"Test\",\"location\":\"Local\",\"testing_type\":\"Integration\"}"; + _telematicUnitPtr->validateRegisterStatus(replyStr); + ASSERT_EQ("Local", _telematicUnitPtr->getEventLocation()); + ASSERT_EQ("Test", _telematicUnitPtr->getEventName()); + ASSERT_EQ("Integration", _telematicUnitPtr->getTestingType()); + } + +} \ No newline at end of file