diff --git a/.devcontainer/docker-compose-vscode.yml b/.devcontainer/docker-compose-vscode.yml index cc113ce62..9035d1f9d 100755 --- a/.devcontainer/docker-compose-vscode.yml +++ b/.devcontainer/docker-compose-vscode.yml @@ -24,7 +24,8 @@ services: - SIM_V2X_PORT=5757 - SIM_INTERACTION_PORT=7576 - V2X_PORT=8686 - - INFRASTRUCTURE_ID=1 + - INFRASTRUCTURE_ID=rsu_ + - INFRASTRUCTURE_NAME= - SENSOR_JSON_FILE_PATH=/var/www/plugins/MAP/sensors.json secrets: - mysql_password diff --git a/.sonarqube/sonar-scanner.properties b/.sonarqube/sonar-scanner.properties index f6b40e299..06424d6ef 100644 --- a/.sonarqube/sonar-scanner.properties +++ b/.sonarqube/sonar-scanner.properties @@ -55,7 +55,9 @@ sonar.modules= PedestrianPlugin, \ MessageReceiverPlugin, \ CARMAStreetsPlugin, \ ERVCloudForwardingPlugin, \ - CDASimAdapter + CDASimAdapter, \ + RSUHealthMonitorPlugin, \ + TelematicBridgePlugin @@ -82,6 +84,8 @@ TimPlugin.sonar.projectBaseDir =src/v2i-hub/TimPlugin CARMAStreetsPlugin.sonar.projectBaseDir =src/v2i-hub/CARMAStreetsPlugin ERVCloudForwardingPlugin.sonar.projectBaseDir =src/v2i-hub/ERVCloudForwardingPlugin CDASimAdapter.sonar.projectBaseDir =src/v2i-hub/CDASimAdapter +RSUHealthMonitorPlugin.sonar.projectBaseDir =src/v2i-hub/RSUHealthMonitorPlugin +TelematicBridgePlugin.sonar.projectBaseDir =src/v2i-hub/TelematicBridgePlugin @@ -117,7 +121,10 @@ CARMAStreetsPlugin.sonar.exclusions =test/** ERVCloudForwardingPlugin.sonar.sources =src CDASimAdapter.sonar.sources =src CDASimAdapter.sonar.exclusions =test/** - +RSUHealthMonitorPlugin.sonar.sources =src +RSUHealthMonitorPlugin.sonar.exclusions =test/** +TelematicBridgePlugin.sonar.sources =src +TelematicBridgePlugin.sonar.exclusions =test/** # Tests # Note: For C++ setting this field does not cause test analysis to occur. It only allows the test source code to be evaluated. @@ -145,3 +152,5 @@ SpatPlugin.sonar.tests=test CARMAStreetsPlugin.sonar.tests=test ERVCloudForwardingPlugin.sonar.tests=test CDASimAdapter.sonar.tests=test +RSUHealthMonitorPlugin.sonar.tests=test +TelematicBridgePlugin.sonar.tests=test diff --git a/configuration/amd64/docker-compose.yml b/configuration/amd64/docker-compose.yml index 6b5188297..0c9d5feb2 100755 --- a/configuration/amd64/docker-compose.yml +++ b/configuration/amd64/docker-compose.yml @@ -38,6 +38,8 @@ services: - db environment: - MYSQL_PASSWORD=/run/secrets/mysql_password + - INFRASTRUCTURE_ID=rsu_ + - INFRASTRUCTURE_NAME= secrets: - mysql_password volumes: diff --git a/ext/build.sh b/ext/build.sh index 03d2ec4fb..91d59ca8a 100755 --- a/ext/build.sh +++ b/ext/build.sh @@ -50,9 +50,25 @@ popd # GPS Parser pushd /tmp +if [ -d "NemaTode" ]; then + rm -r NemaTode +fi git clone https://github.com/ckgt/NemaTode.git cd NemaTode cmake . make -j${numCPU} make install -popd \ No newline at end of file +popd + +# Nats C API +pushd /tmp +if [ -d "nats.c" ]; then + rm -r nats.c +fi +git clone https://github.com/nats-io/nats.c +cd nats.c +cmake . -DNATS_BUILD_NO_SPIN=ON +make -j${numCPU} +make install +popd + diff --git a/scripts/install_dependencies.sh b/scripts/install_dependencies.sh index 951d13967..6f4e2e4a7 100755 --- a/scripts/install_dependencies.sh +++ b/scripts/install_dependencies.sh @@ -33,6 +33,9 @@ DEPENDENCIES="build-essential \ wget \ zip \ zlib1g \ + rapidjson-dev \ + librapidxml-dev \ + libprotobuf-c-dev \ curl" # STOL library dependencies diff --git a/src/tmx/Messages/include/MessageTypes.h b/src/tmx/Messages/include/MessageTypes.h index e0fd09c70..3360788fe 100644 --- a/src/tmx/Messages/include/MessageTypes.h +++ b/src/tmx/Messages/include/MessageTypes.h @@ -92,6 +92,7 @@ static CONSTEXPR const char *MSGSUBTYPE_OUTGOING_STRING = "Outgoing"; static CONSTEXPR const char *MSGSUBTYPE_SHUTDOWN_STRING = "Shutdown"; static CONSTEXPR const char *MSGSUBTYPE_TIMESYNC_STRING = "TimeSync"; static CONSTEXPR const char *MSGSUBTYPE_SENSOR_DETECTED_OBJECT_STRING = "SensorDetectedObject"; +static CONSTEXPR const char *MSGSUBTYPE_RSU_STATUS_STRING = "RSUStatus"; } /* End namespace messages */ diff --git a/src/tmx/Messages/include/RSUStatusMessage.h b/src/tmx/Messages/include/RSUStatusMessage.h new file mode 100644 index 000000000..a5e276268 --- /dev/null +++ b/src/tmx/Messages/include/RSUStatusMessage.h @@ -0,0 +1,25 @@ +#pragma once + + +#include +#include "MessageTypes.h" + + +namespace tmx::messages { + + +class RSUStatusMessage : public tmx::message +{ + public: + RSUStatusMessage() {} + + /// Message type for routing this message through TMX core. + static constexpr const char* MessageType = MSGTYPE_APPLICATION_STRING; + + /// Message sub type for routing this message through TMX core. + static constexpr const char* MessageSubType = MSGSUBTYPE_RSU_STATUS_STRING; + }; + +} /* namespace tmx::messages */ + + diff --git a/src/tmx/TmxUtils/CMakeLists.txt b/src/tmx/TmxUtils/CMakeLists.txt index 30c30fb41..ee3c12c5f 100644 --- a/src/tmx/TmxUtils/CMakeLists.txt +++ b/src/tmx/TmxUtils/CMakeLists.txt @@ -1,7 +1,9 @@ PROJECT ( tmxutils CXX ) FILE (GLOB_RECURSE HEADERS "src/" "*.h*") FILE (GLOB_RECURSE SOURCES "src/" "*.c*") - +find_library(NETSNMPAGENT "netsnmpagent") +find_library(NETSNMPMIBS "netsnmpmibs") +find_library(NETSNMP "netsnmp") FIND_PACKAGE (carma-clock) FIND_LIBRARY (UUID_LIBRARY uuid) @@ -16,12 +18,18 @@ TARGET_INCLUDE_DIRECTORIES (${PROJECT_NAME} SYSTEM PUBLIC $ $ ${MYSQL_INCLUDE_DIR} + ${NETSNMP_INCLUDE_DIRS} ${MYSQLCPPCONN_INCLUDE_DIR}) TARGET_LINK_LIBRARIES (${PROJECT_NAME} PUBLIC ${TMXAPI_LIBRARIES} ${MYSQL_LIBRARIES} ${MYSQLCPPCONN_LIBRARIES} ${UUID_LIBRARY} + ${UUID_LIBRARY} + ${NETSNMPAGENT} + ${NETSNMPMIBS} + ${NETSNMP} + ${NETSNMP_LIBRARIES} rdkafka++ ::carma-clock gmock @@ -54,4 +62,4 @@ add_executable(${BINARY} ${TEST_SOURCES}) add_test(NAME ${BINARY} COMMAND ${BINARY}) -target_link_libraries(${BINARY} PUBLIC ${PROJECT_NAME} rdkafka++ gmock ${TMXAPI_LIBRARIES} ${ASN_J2735_LIBRARIES} ${UUID_LIBRARY} gtest) \ No newline at end of file +target_link_libraries(${BINARY} PUBLIC ${PROJECT_NAME} rdkafka++ gmock ${TMXAPI_LIBRARIES} ${ASN_J2735_LIBRARIES} ${UUID_LIBRARY} ${NETSNMPAGENT} ${NETSNMPMIBS} ${NETSNMP} ${NETSNMP_LIBRARIES} gtest) \ No newline at end of file diff --git a/src/tmx/TmxUtils/src/RSU_MIB_4_1.h b/src/tmx/TmxUtils/src/RSU_MIB_4_1.h new file mode 100644 index 000000000..793bf4bea --- /dev/null +++ b/src/tmx/TmxUtils/src/RSU_MIB_4_1.h @@ -0,0 +1,56 @@ +#pragma once +namespace tmx::utils::rsu41::mib::oid +{ + /** + * @brief This header file contains a subset of RSU MIB definition from https://github.com/certificationoperatingcouncil/COC_TestSpecs/blob/master/AppNotes/RSU/RSU-MIB.txt + */ + // Contains the ID given to this RSU. + static constexpr const char *RSU_ID_OID = "1.0.15628.4.1.17.4.0"; + + // Contains the version of this MIB supported by this RSU, e.g. rsuMIB 4.1 rev201812060000Z + static constexpr const char *RSU_MIB_VERSION = "1.0.15628.4.1.17.1.0"; + + // Contains the version of firmware running on this RSU. + static constexpr const char *RSU_FIRMWARE_VERSION = "1.0.15628.4.1.17.2.0"; + + // Contains the name of the manufacturer of this RSU. + static constexpr const char *RSU_MANUFACTURER = "1.0.15628.4.1.17.5.0"; + + // Contains GPS NMEA GPGGA output string. + static constexpr const char *RSU_GPS_OUTPUT_STRING = "1.0.15628.4.1.8.5.0"; + + // Immediate Forward Message Index + static constexpr const char *RSU_IFM_INDEX = "1.0.15628.4.1.5.1.1.0"; + + // Immediate Forward Message PSID. + static constexpr const char *RSU_IFM_PSID = "1.0.15628.4.1.5.1.2.0"; + + // Immediate Forward Message DSRC Message ID + static constexpr const char *RSU_IFM_DSRC_MSG_ID = "1.0.15628.4.1.5.1.3.0"; + + // Immediate Forward Message Transmit Mode + static constexpr const char *RSU_IFM_TX_MODE = "1.0.15628.4.1.5.1.4.0"; + + // DSRC channel set for Immediate Forward Message transmit + static constexpr const char *RSU_IFM_TX_CHANNEL = "1.0.15628.4.1.5.1.5.0"; + + // Set this bit to enable transmission of the message 0=off, 1=on + static constexpr const char *RSU_IFM_ENABLE = "1.0.15628.4.1.5.1.6.0"; + + // Create (4) or Destroy (6) row entry + static constexpr const char *RSU_IFM_STATUS = "1.0.15628.4.1.5.1.7.0"; + + // Specifies the current mode of operation of the RSU and provides capability to transition the device into a new mode, e.g. from the current mode to off, etc + static constexpr const char *RSU_MODE = "1.0.15628.4.1.99.0"; + + /* + SYNTAX INTEGER { + bothOp (0), --both Continuous and Alternating modes are operational + altOp (1), --Alternating mode is operational, + --Continuous mode is not operational + contOp (2), --Continuous mode is operational, + --Alternating mode is not operational + noneOp (3) --neither Continuous nor Alternating mode is operational + */ + static constexpr const char *RSU_CHAN_STATUS = "1.0.15628.4.1.19.1.0"; +} \ No newline at end of file diff --git a/src/tmx/TmxUtils/src/SNMPClient.cpp b/src/tmx/TmxUtils/src/SNMPClient.cpp new file mode 100644 index 000000000..e3398ebbd --- /dev/null +++ b/src/tmx/TmxUtils/src/SNMPClient.cpp @@ -0,0 +1,227 @@ +#include "SNMPClient.h" + +namespace tmx::utils +{ + + // Client defaults to SNMPv3 + snmp_client::snmp_client(const std::string &ip, const int &port, const std::string &community, + const std::string &snmp_user, const std::string &securityLevel, const std::string &authPassPhrase, int snmp_version, int timeout) + + : ip_(ip), port_(port), community_(community), snmp_version_(snmp_version), timeout_(timeout) + { + + PLOG(logDEBUG1) << "Starting SNMP Client. Target device IP address: " << ip_ << ", Target device SNMP port: " << port_; + + // Bring the IP address and port of the target SNMP device in the required form, which is "IPADDRESS:PORT": + std::string ip_port_string = ip_ + ":" + std::to_string(port_); + char *ip_port = &ip_port_string[0]; + + // Initialize SNMP session parameters + init_snmp("carma_snmp"); + snmp_sess_init(&session); + session.peername = ip_port; + session.version = snmp_version_; // SNMP_VERSION_3 + session.securityName = (char *)snmp_user.c_str(); + session.securityNameLen = snmp_user.length(); + + // Fallback behavior to setup a community for SNMP V1/V2 + if (snmp_version_ != SNMP_VERSION_3) + { + session.community = (unsigned char *)community.c_str(); + session.community_len = community_.length(); + } + + // SNMP authorization/privach config + if (securityLevel == "authPriv") + { + session.securityLevel = SNMP_SEC_LEVEL_AUTHPRIV; + } + + else if (securityLevel == "authNoPriv") + { + session.securityLevel = SNMP_SEC_LEVEL_AUTHNOPRIV; + } + + else + session.securityLevel = SNMP_SEC_LEVEL_NOAUTH; + + // Passphrase used for both authentication and privacy + auto phrase_len = authPassPhrase.length(); + auto phrase = (u_char *)authPassPhrase.c_str(); + + // Defining and generating auth config with SHA1 + session.securityAuthProto = snmp_duplicate_objid(usmHMACSHA1AuthProtocol, USM_AUTH_PROTO_SHA_LEN); + session.securityAuthProtoLen = USM_AUTH_PROTO_SHA_LEN; + session.securityAuthKeyLen = USM_AUTH_KU_LEN; + if (session.securityLevel != SNMP_SEC_LEVEL_NOAUTH && generate_Ku(session.securityAuthProto, + session.securityAuthProtoLen, + phrase, phrase_len, + session.securityAuthKey, + &session.securityAuthKeyLen) != SNMPERR_SUCCESS) + { + std::string errMsg = "Error generating Ku from authentication pass phrase. \n"; + throw snmp_client_exception(errMsg); + } + + // Defining and generating priv config with AES (since using SHA1) + session.securityPrivKeyLen = USM_PRIV_KU_LEN; + session.securityPrivProto = + snmp_duplicate_objid(usmAESPrivProtocol, + OID_LENGTH(usmAESPrivProtocol)); + session.securityPrivProtoLen = OID_LENGTH(usmAESPrivProtocol); + + if (session.securityLevel == SNMP_SEC_LEVEL_AUTHPRIV && generate_Ku(session.securityAuthProto, + session.securityAuthProtoLen, + phrase, phrase_len, + session.securityPrivKey, + &session.securityPrivKeyLen) != SNMPERR_SUCCESS) + { + std::string errMsg = "Error generating Ku from privacy pass phrase. \n"; + throw snmp_client_exception(errMsg); + } + + session.timeout = timeout_; + + // Opens the snmp session if it exists + ss = snmp_open(&session); + + if (ss == nullptr) + { + PLOG(logERROR) << "Failed to establish session with target device"; + snmp_sess_perror("snmpget", &session); + throw snmp_client_exception("Failed to establish session with target device"); + } + else + { + PLOG(logINFO) << "Established session with device at " << ip_; + } + } + + snmp_client::~snmp_client() + { + PLOG(logINFO) << "Closing SNMP session"; + snmp_close(ss); + } + + // Original implementation used in Carma Streets https://github.com/usdot-fhwa-stol/snmp-client + bool snmp_client::process_snmp_request(const std::string &input_oid, const request_type &request_type, snmp_response_obj &val) + { + + /*Structure to hold response from the remote host*/ + snmp_pdu *response; + + // Create pdu for the data + if (request_type == request_type::GET) + { + PLOG(logDEBUG1) << "Attempting to GET value for: " << input_oid; + pdu = snmp_pdu_create(SNMP_MSG_GET); + } + else if (request_type == request_type::SET) + { + PLOG(logDEBUG1) << "Attempting to SET value for " << input_oid << " to " << val.val_int; + pdu = snmp_pdu_create(SNMP_MSG_SET); + } + else + { + PLOG(logERROR) << "Invalid request type, method accpets only GET and SET"; + return false; + } + + // Read input OID into an OID variable: + // net-snmp has several methods for creating an OID object + // their documentation suggests using get_node. read_objid seems like a simpler approach + // TO DO: investigate update to get_node + if (!read_objid(input_oid.c_str(), OID, &OID_len)) + { + // If oid cannot be created + PLOG(logERROR) << "OID could not be created from input: " << input_oid; + return false; + } + else + { + + if (request_type == request_type::GET) + { + // Add OID to pdu for get request + snmp_add_null_var(pdu, OID, OID_len); + } + else if (request_type == request_type::SET) + { + if (val.type == snmp_response_obj::response_type::INTEGER) + { + snmp_add_var(pdu, OID, OID_len, 'i', (std::to_string(val.val_int)).c_str()); + } + // Needs to be finalized to support octet string use + else if (val.type == snmp_response_obj::response_type::STRING) + { + PLOG(logERROR) << "Setting string value is currently not supported"; + } + } + + PLOG(logINFO) << "Created OID for input: " << input_oid; + } + // Send the request + int status = snmp_synch_response(ss, pdu, &response); + PLOG(logINFO) << "Response request status: " << status << " (=" << (status == STAT_SUCCESS ? "SUCCESS" : "FAILED") << ")"; + + // Check GET response + if (status == STAT_SUCCESS && response && response->errstat == SNMP_ERR_NOERROR && request_type == request_type::GET) + { + for (auto vars = response->variables; vars; vars = vars->next_variable) + { + // Get value of variable depending on ASN.1 type + // Variable could be a integer, string, bitstring, ojbid, counter : defined here https://github.com/net-snmp/net-snmp/blob/master/include/net-snmp/types.h + // get Integer value + if (vars->type == ASN_INTEGER && vars->val.integer) + { + val.type = snmp_response_obj::response_type::INTEGER; + val.val_int = *vars->val.integer; + } + else if (vars->type == ASN_OCTET_STR && vars->val.string) + { + size_t str_len = vars->val_len; + for (size_t i = 0; i < str_len; ++i) + { + val.val_string.push_back(vars->val.string[i]); + } + val.type = snmp_response_obj::response_type::STRING; + } + } + } + else + { + log_error(status, request_type, response); + return false; + } + + if (response) + { + snmp_free_pdu(response); + OID_len = MAX_OID_LEN; + } + + return true; + } + + int snmp_client::get_port() const + { + return port_; + } + + void snmp_client::log_error(const int &status, const request_type &request_type, const snmp_pdu *response) const + { + + if (status == STAT_SUCCESS) + { + PLOG(logERROR) << "Variable type: " << response->variables->type << ". Error in packet " << static_cast(snmp_errstring(static_cast(response->errstat))); + } + else if (status == STAT_TIMEOUT) + { + PLOG(logERROR) << "Timeout, no response from server"; + } + else + { + PLOG(logERROR) << "Unknown SNMP Error for " << (request_type == request_type::GET ? "GET" : "SET"); + } + } +} // namespace \ No newline at end of file diff --git a/src/tmx/TmxUtils/src/SNMPClient.h b/src/tmx/TmxUtils/src/SNMPClient.h new file mode 100644 index 000000000..75eaa9360 --- /dev/null +++ b/src/tmx/TmxUtils/src/SNMPClient.h @@ -0,0 +1,113 @@ +#pragma once + +#include +#include +#include +#include "PluginLog.h" + +#include "SNMPClientException.h" + +namespace tmx::utils +{ + + enum class request_type + { + GET, + SET, + OTHER // Processing this request type is not a defined behavior, included for testing only + }; + + /** @brief A struct to hold the value being sent to the TSC, can be integer or string. Type needs to be defined*/ + struct snmp_response_obj + { + /** @brief The type of value being requested or set, on the TSC */ + enum class response_type + { + INTEGER, + STRING + }; + + // snmp response values can be any asn.1 supported types. + // Integer and string values can be processed here + int64_t val_int = 0; + std::vector val_string; + response_type type; + + inline bool operator==(const snmp_response_obj &obj2) const + { + return val_int == obj2.val_int && val_string == obj2.val_string && type == obj2.type; + } + }; + + class snmp_client + { + private: + /*variables to store an snmp session*/ + // struct that holds information about who we're going to be talking to + // We need to declare 2 of these, one to fill info with and second which is + // a pointer returned by the library + snmp_session session; + snmp_session *ss; + + /*Structure to hold all of the information that we're going to send to the remote host*/ + snmp_pdu *pdu; + + /*OID is going to hold the location of the information which we want to receive. It will need a size as well*/ + oid OID[MAX_OID_LEN]; + size_t OID_len = MAX_OID_LEN; + + // Values from config + /*Target device IP address*/ + std::string ip_; + /*Target device NTCIP port*/ + int port_ = 0; + /*Target community for establishing snmp communication*/ + std::string community_ = "public"; + /* net-snmp version definition: SNMP_VERSION_1:0 SNMP_VERSION_2c:1 SNMP_VERSION_2u:2 SNMP_VERSION_3:3 + https://github.com/net-snmp/net-snmp/blob/master/include/net-snmp/library/snmp.h */ + int snmp_version_ = 3; // default to 3 since previous versions not compatable currently + /*Time after which the the snmp request times out*/ + int timeout_ = 10000; + + public: + /** @brief Constructor for Traffic Signal Controller Service client. + * Uses the arguments provided to establish an snmp connection + * @param ip The ip ,as a string, for the tsc_client_service to establish an snmp communication with. + * @param port Target port as integer on the host for snmp communication. + * @param community The community id as a string. Defaults to "public" if unassigned. + * @param snmp_version The snmp_version as defined in net-snmp.Default to 0 if unassigned. + * net-snmp version definition: SNMP_VERSION_1:0 SNMP_VERSION_2c:1 SNMP_VERSION_2u:2 SNMP_VERSION_3:3" + * @param timeout The time in microseconds after which an snmp session request expires. Defaults to 100 if unassigned + * **/ + snmp_client(const std::string &ip, const int &port, const std::string &community, const std::string &snmp_user, const std::string &securityLevel, const std::string &authPassPhrase, int snmp_version = 0, int timeout = 100); + + /* Disable default copy constructor*/ + snmp_client(snmp_client &sc) = delete; + + /* Disable default move constructor*/ + snmp_client(snmp_client &&sc) = delete; + + /** @brief Returns true or false depending on whether the request could be processed for given input OID at the Traffic Signal Controller. + * @param input_oid The OID to request information for. + * @param request_type The request type for which the error is being logged. Accepted values are "GET" and "SET" only. + * @param value_int The integer value for the object returned by reference. For "SET" it is the value to be set. + * For "GET", it is the value returned for the returned object by reference. + * This is an optional argument, if not provided, defaults to 0. + * @param value_str String value for the object, returned by reference. Optional argument, if not provided the value is set as an empty string + * @return Integer value at the oid, returns false if value cannot be set/requested or oid doesn't have an integer value to return.*/ + + virtual bool process_snmp_request(const std::string &input_oid, const request_type &request_type, snmp_response_obj &val); + /** @brief Finds error type from status and logs an error. + * @param status The integer value corresponding to net-snmp defined errors. macros considered are STAT_SUCCESS(0) and STAT_TIMEOUT(2) + * @param request_type The request type for which the error is being logged (GET/SET). + * @param response The snmp_pdu struct */ + + virtual int get_port() const; // Returns the current port (should always be 161 or 162) + + void log_error(const int &status, const request_type &request_type, const snmp_pdu *response) const; + + /** @brief Destructor for client. Closes the snmp session**/ + virtual ~snmp_client(); + }; + +} // namespace \ No newline at end of file diff --git a/src/tmx/TmxUtils/src/SNMPClientException.cpp b/src/tmx/TmxUtils/src/SNMPClientException.cpp new file mode 100644 index 000000000..67f82fabe --- /dev/null +++ b/src/tmx/TmxUtils/src/SNMPClientException.cpp @@ -0,0 +1,8 @@ +#include "SNMPClientException.h" + +namespace tmx::utils { + + snmp_client_exception::snmp_client_exception(const std::string &msg): std::runtime_error(msg){}; + + snmp_client_exception::~snmp_client_exception() = default; +} \ No newline at end of file diff --git a/src/tmx/TmxUtils/src/SNMPClientException.h b/src/tmx/TmxUtils/src/SNMPClientException.h new file mode 100644 index 000000000..76020a5b1 --- /dev/null +++ b/src/tmx/TmxUtils/src/SNMPClientException.h @@ -0,0 +1,23 @@ +#pragma once + +#include + +namespace tmx::utils { + /** + * @brief Runtime error related to SNMP client used to communicate with Traffic Signal Controller (NTCIP). + * + * @author Paul Bourelly + */ + class snmp_client_exception : public std::runtime_error{ + public: + /** + * @brief Destructor. + */ + ~snmp_client_exception() override; + /** + * @brief Constructor. + * @param msg String exception message. + */ + explicit snmp_client_exception(const std::string &msg ); + }; +} \ No newline at end of file diff --git a/src/tmx/TmxUtils/src/simulation/SimulationEnvUtils.h b/src/tmx/TmxUtils/src/simulation/SimulationEnvUtils.h index 7498a7922..efffc4ffa 100644 --- a/src/tmx/TmxUtils/src/simulation/SimulationEnvUtils.h +++ b/src/tmx/TmxUtils/src/simulation/SimulationEnvUtils.h @@ -66,6 +66,12 @@ namespace tmx::utils::sim{ * for CDASim connection. */ constexpr inline static const char *INFRASTRUCTURE_ID = "INFRASTRUCTURE_ID"; + + /** + * @brief Name of environment variable for storing infrastructure name of v2xhub. Only necessary in SIMULATION MODE + * for CDASim connection. + */ + constexpr inline static const char *INFRASTRUCTURE_NAME = "INFRASTRUCTURE_NAME"; /** * @brief Function to return bool indicating whether V2X-Hub deployment is in SIMULATION MODE or not. * @return true if SIMULATION_MODE is "true" or "TRUE" and false otherwise. diff --git a/src/tmx/TmxUtils/test/MockSNMPClient.h b/src/tmx/TmxUtils/test/MockSNMPClient.h new file mode 100644 index 000000000..7e07fe47f --- /dev/null +++ b/src/tmx/TmxUtils/test/MockSNMPClient.h @@ -0,0 +1,18 @@ +#pragma once +#include "SNMPClient.h" +#include +#include + +using namespace tmx::utils; +using namespace std; + +namespace unit_test +{ + class mock_snmp_client : public snmp_client + { + public: + mock_snmp_client(const std::string &ip, const int &port, const std::string &community, const std::string &snmp_user, const std::string &securityLevel, const std::string &authPassPhrase, int snmp_version = 0, int timeout = 100) : snmp_client(ip, port, community, snmp_user, securityLevel, authPassPhrase, snmp_version, timeout){}; + ~mock_snmp_client() = default; + MOCK_METHOD(bool, process_snmp_request, (const std::string &input_oid, const request_type &request_type, snmp_response_obj &val), (override)); + }; +} \ No newline at end of file diff --git a/src/tmx/TmxUtils/test/test_SNMPClient.cpp b/src/tmx/TmxUtils/test/test_SNMPClient.cpp new file mode 100644 index 000000000..3e775945c --- /dev/null +++ b/src/tmx/TmxUtils/test/test_SNMPClient.cpp @@ -0,0 +1,93 @@ + +#include "MockSNMPClient.h" +#include "gtest/gtest.h" +#include "RSU_MIB_4_1.h" + +using namespace tmx::utils; +using namespace std; +using namespace tmx::utils::rsu41::mib::oid; +using testing::_; +using testing::Action; +using testing::DoDefault; +using testing::Return; +using testing::SetArgReferee; +using testing::Throw; + +namespace unit_test +{ + class test_SNMPClient : public ::testing::Test + { + public: + shared_ptr scPtr; + uint16_t port = 161; + test_SNMPClient() + { + scPtr = make_shared("127.0.0.1", port, "public", "test", "authPriv", "testtesttest", SNMP_VERSION_3, 1000); + } + }; + + TEST_F(test_SNMPClient, constructor_error) + { + ASSERT_THROW(snmp_client("127.0.0.1", port, "public", "test", "authPriv", "test", SNMP_VERSION_3, 1000), snmp_client_exception); + ASSERT_NO_THROW(snmp_client("127.0.0.1", port, "public", "test", "authPriv", "testtesttest", SNMP_VERSION_3, 1000)); + ASSERT_NO_THROW(snmp_client("127.0.0.1", port, "public", "test", "authNoPriv", "testtesttest", SNMP_VERSION_3, 1000)); + ASSERT_NO_THROW(snmp_client("127.0.0.1", port, "public", "test", "authNoPriv", "testtesttest", SNMP_VERSION_1, 1000)); + ASSERT_NO_THROW(snmp_client("127.0.0.1", port, "public", "test", "", "testtesttest", SNMP_VERSION_3, 1000)); + ASSERT_THROW(snmp_client("127.0.XX.XX", port, "public", "test", "", "testtesttest", SNMP_VERSION_3, 1000), snmp_client_exception); + } + + TEST_F(test_SNMPClient, get_port) + { + ASSERT_EQ(161, scPtr->get_port()); + } + + TEST_F(test_SNMPClient, log_error) + { + snmp_pdu response; + ASSERT_NO_THROW(scPtr->log_error(STAT_ERROR, request_type::GET, &response)); + ASSERT_NO_THROW(scPtr->log_error(STAT_ERROR, request_type::SET, &response)); + ASSERT_NO_THROW(scPtr->log_error(STAT_SUCCESS, request_type::OTHER, &response)); + ASSERT_NO_THROW(scPtr->log_error(STAT_TIMEOUT, request_type::OTHER, &response)); + } + + TEST_F(test_SNMPClient, process_snmp_request) + { + snmp_response_obj reqponseRSUID; + string rsuId = "RSU4.1"; + vector rsuId_c; + copy(rsuId.begin(), rsuId.end(), back_inserter(rsuId_c)); + reqponseRSUID.val_string = rsuId_c; + reqponseRSUID.type = snmp_response_obj::response_type::STRING; + EXPECT_CALL(*scPtr, process_snmp_request(RSU_ID_OID, request_type::GET, _)).WillRepeatedly(testing::DoAll(SetArgReferee<2>(reqponseRSUID), Return(true))); + EXPECT_CALL(*scPtr, process_snmp_request(RSU_ID_OID, request_type::SET, _)).WillRepeatedly(testing::DoAll(SetArgReferee<2>(reqponseRSUID), Return(true))); + + snmp_response_obj reqponseMode; + reqponseMode.val_int = 2; + reqponseMode.type = snmp_response_obj::response_type::INTEGER; + EXPECT_CALL(*scPtr, process_snmp_request(RSU_MODE, request_type::GET, _)).WillRepeatedly(testing::DoAll(SetArgReferee<2>(reqponseMode), Return(true))); + EXPECT_CALL(*scPtr, process_snmp_request(RSU_MODE, request_type::SET, _)).WillRepeatedly(testing::DoAll(SetArgReferee<2>(reqponseRSUID), Return(true))); + + snmp_response_obj reqponseInvalidOID; + EXPECT_CALL(*scPtr, process_snmp_request("Invalid OID", request_type::GET, _)).WillRepeatedly(testing::DoAll(SetArgReferee<2>(reqponseInvalidOID), Return(false))); + EXPECT_CALL(*scPtr, process_snmp_request("Invalid OID", request_type::SET, _)).WillRepeatedly(testing::DoAll(SetArgReferee<2>(reqponseInvalidOID), Return(false))); + + snmp_response_obj response; + scPtr->process_snmp_request(RSU_ID_OID, request_type::GET, response); + scPtr->process_snmp_request(RSU_ID_OID, request_type::SET, response); + scPtr->process_snmp_request(RSU_MODE, request_type::GET, response); + scPtr->process_snmp_request(RSU_MODE, request_type::SET, response); + scPtr->process_snmp_request("Invalid OID", request_type::GET, response); + scPtr->process_snmp_request("Invalid OID", request_type::SET, response); + + snmp_client scClient("127.0.0.1", port, "public", "test", "authPriv", "testtesttest", SNMP_VERSION_3, 1000); + scClient.process_snmp_request(RSU_ID_OID, request_type::GET, reqponseRSUID); + scClient.process_snmp_request(RSU_ID_OID, request_type::SET, reqponseRSUID); + scClient.process_snmp_request(RSU_ID_OID, request_type::OTHER, reqponseRSUID); + scClient.process_snmp_request("INVALID OID", request_type::GET, reqponseRSUID); + + scClient.process_snmp_request(RSU_MODE, request_type::GET, reqponseMode); + scClient.process_snmp_request(RSU_MODE, request_type::SET, reqponseMode); + scClient.process_snmp_request(RSU_MODE, request_type::OTHER, reqponseMode); + } + +} \ No newline at end of file diff --git a/src/v2i-hub/RSUHealthMonitorPlugin/CMakeLists.txt b/src/v2i-hub/RSUHealthMonitorPlugin/CMakeLists.txt new file mode 100755 index 000000000..d863a4bf1 --- /dev/null +++ b/src/v2i-hub/RSUHealthMonitorPlugin/CMakeLists.txt @@ -0,0 +1,26 @@ +PROJECT(RSUHealthMonitorPlugin VERSION 7.5.1 LANGUAGES CXX) + +set(TMX_PLUGIN_NAME "RSU Health Monitor") + +find_library(libasn1c .) + +BuildTmxPlugin() + +TARGET_LINK_LIBRARIES(${PROJECT_NAME} PUBLIC tmxutils jsoncpp NemaTode) + +############# +## Testing ## +############# +enable_testing() +include_directories(${PROJECT_SOURCE_DIR}/src) +add_library(${PROJECT_NAME}_lib src/RSUHealthMonitorWorker.cpp) +target_link_libraries(${PROJECT_NAME}_lib PUBLIC + tmxutils + NemaTode + jsoncpp) +set(BINARY ${PROJECT_NAME}_test) +file(GLOB_RECURSE TEST_SOURCES LIST_DIRECTORIES false test/*.h test/*.cpp) +set(SOURCES ${TEST_SOURCES} WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}/test) +add_executable(${BINARY} ${TEST_SOURCES}) +add_test(NAME ${BINARY} COMMAND ${BINARY}) +target_link_libraries(${BINARY} PUBLIC ${PROJECT_NAME}_lib gtest) \ No newline at end of file diff --git a/src/v2i-hub/RSUHealthMonitorPlugin/manifest.json b/src/v2i-hub/RSUHealthMonitorPlugin/manifest.json new file mode 100755 index 000000000..f2ee94443 --- /dev/null +++ b/src/v2i-hub/RSUHealthMonitorPlugin/manifest.json @@ -0,0 +1,51 @@ +{ + "name": "RSUHealthMonitor", + "description": "Monitor RSU health status", + "version": "@PROJECT_VERSION@", + "exeLocation": "/bin/RSUHealthMonitorPlugin", + "coreIpAddr":"127.0.0.1", + "corePort":24601, + "messageTypes": [], + "configuration": [ + { + "key": "LogLevel", + "default": "INFO", + "description": "The log level for this plugin" + }, + { + "key":"Interval", + "default":"1", + "description": "Sending RSU SNMP GET request at every configured interval. Default every 1 second. Unit of measure: second." + }, + { + "key":"RSUIp", + "default":"192.168.XX.XX", + "description":"An IP address of the RSU the V2X hub is connected to." + }, + { + "key":"SNMPPort", + "default":"161", + "description":"The SNMP port for sending message or command." + }, + { + "key":"AuthPassPhrase", + "default":"dummy", + "description":"SNMP v3 authentication passphrase" + }, + { + "key":"SecurityUser", + "default":"authOnlyUser", + "description":"SNMP Security Name" + }, + { + "key":"SecurityLevel", + "default":"authPriv", + "description":"SNMP Security level" + }, + { + "key":"RSUMIBVersion", + "default":"RSU4.1", + "description":"The version of RSU MIB (Management Information Base). E.G. RSU4.1 or RSU1218. Currently only support RSU4.1" + } + ] +} \ No newline at end of file diff --git a/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorPlugin.cpp b/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorPlugin.cpp new file mode 100755 index 000000000..8d81a3b6c --- /dev/null +++ b/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorPlugin.cpp @@ -0,0 +1,90 @@ +#include "RSUHealthMonitorPlugin.h" + +using namespace RSUHealthMonitor; +using namespace tmx::utils; + +namespace RSUHealthMonitor +{ + + RSUHealthMonitorPlugin::RSUHealthMonitorPlugin(const std::string &name) : PluginClient(name) + { + _rsuWorker = std::make_shared(); + _rsuStatusTimer = make_unique(); + UpdateConfigSettings(); + + // Send SNMP call to RSU periodically at configurable interval. + _timerThId = _rsuStatusTimer->AddPeriodicTick([this]() + { + // Periodic SNMP call to get RSU status based on RSU MIB version 4.1 + auto rsuStatusJson = _rsuWorker->getRSUStatus(_rsuMibVersion, _rsuIp, _snmpPort, _securityUser, _authPassPhrase, _securityLevel, SEC_TO_MICRO); + PLOG(logINFO) << "Updating _interval: " << _interval; + //Broadcast RSU status periodically at _interval + BroadcastRSUStatus(rsuStatusJson); }, + std::chrono::milliseconds(_interval * SEC_TO_MILLI)); + _rsuStatusTimer->Start(); + } + + void RSUHealthMonitorPlugin::UpdateConfigSettings() + { + PLOG(logINFO) << "Updating configuration settings."; + + lock_guard lock(_configMutex); + GetConfigValue("Interval", _interval); + GetConfigValue("RSUIp", _rsuIp); + GetConfigValue("SNMPPort", _snmpPort); + GetConfigValue("AuthPassPhrase", _authPassPhrase); + GetConfigValue("SecurityUser", _securityUser); + GetConfigValue("SecurityLevel", _securityLevel); + GetConfigValue("RSUMIBVersion", _rsuMIBVersionStr); + boost::trim_left(_rsuMIBVersionStr); + boost::trim_right(_rsuMIBVersionStr); + // Support RSU MIB version 4.1 + if (boost::iequals(_rsuMIBVersionStr, RSU4_1_str)) + { + _rsuMibVersion = RSUMibVersion::RSUMIB_V_4_1; + } + else + { + _rsuMibVersion = RSUMibVersion::UNKOWN_MIB_V; + PLOG(logERROR) << "Uknown RSU MIB version: " << _rsuMIBVersionStr; + } + + try + { + _rsuStatusTimer->ChangeFrequency(_timerThId, std::chrono::milliseconds(_interval * SEC_TO_MILLI)); + } + catch (const tmx::TmxException &ex) + { + PLOG(logERROR) << ex.what(); + } + } + + void RSUHealthMonitorPlugin::OnConfigChanged(const char *key, const char *value) + { + PluginClient::OnConfigChanged(key, value); + UpdateConfigSettings(); + } + + void RSUHealthMonitorPlugin::BroadcastRSUStatus(const Json::Value &rsuStatusJson) + { + // Broadcast the RSU status info when there are RSU responses. + if (!rsuStatusJson.empty() && _rsuWorker) + { + auto rsuStatusFields = _rsuWorker->getJsonKeys(rsuStatusJson); + auto configTbl = _rsuWorker->GetRSUStatusConfig(_rsuMibVersion); + + // Only broadcast RSU status when all required fields are present. + if (_rsuWorker->validateAllRequiredFieldsPresent(configTbl, rsuStatusFields)) + { + auto sendRsuStatusMsg = _rsuWorker->convertJsonToTMXMsg(rsuStatusJson); + BroadcastMessage(sendRsuStatusMsg, RSUHealthMonitorPlugin::GetName()); + } + } + } + +} // namespace RSUHealthMonitor + +int main(int argc, char *argv[]) +{ + return run_plugin("RSU Health Monitor", argc, argv); +} diff --git a/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorPlugin.h b/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorPlugin.h new file mode 100755 index 000000000..47d8ee1bd --- /dev/null +++ b/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorPlugin.h @@ -0,0 +1,46 @@ + +#pragma once + +#include "PluginClient.h" +#include +#include "RSUStatusMessage.h" +#include "RSUHealthMonitorWorker.h" + +using namespace tmx::utils; +using namespace std; + +namespace RSUHealthMonitor +{ + + class RSUHealthMonitorPlugin : public PluginClient + { + private: + mutex _configMutex; + uint16_t _interval; + string _rsuIp; + uint16_t _snmpPort; + string _authPassPhrase; + string _securityUser; + string _securityLevel; + string _rsuMIBVersionStr; + RSUMibVersion _rsuMibVersion; + const char *RSU4_1_str = "RSU4.1"; + const char *RSU1218_str = "RSU1218"; + shared_ptr _rsuWorker; + unique_ptr _rsuStatusTimer; + uint _timerThId; + const long SEC_TO_MICRO = 1000000; + const long SEC_TO_MILLI= 1000; + /** + * @brief Broadcast RSU status + * @param Json::Value RSU status in JSON format + */ + void BroadcastRSUStatus(const Json::Value& rsuStatusJson); + + public: + explicit RSUHealthMonitorPlugin(const std::string &name); + void UpdateConfigSettings(); + void OnConfigChanged(const char *key, const char *value) override; + }; + +} // namespace RSUHealthMonitorPlugin \ No newline at end of file diff --git a/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorWorker.cpp b/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorWorker.cpp new file mode 100644 index 000000000..6ac62f9fc --- /dev/null +++ b/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorWorker.cpp @@ -0,0 +1,221 @@ +#include "RSUHealthMonitorWorker.h" + +namespace RSUHealthMonitor +{ + + RSUHealthMonitorWorker::RSUHealthMonitorWorker() + { + _RSUSTATUSConfigMapPtr = make_shared>(); + // Currently only support RSU MIB version 4.1. Other future supported versions will be inserted here. + RSUStatusConfigTable rsuRstatusTable = constructRsuStatusConfigTable(RSUMibVersion::RSUMIB_V_4_1); + _RSUSTATUSConfigMapPtr->insert({RSUMibVersion::RSUMIB_V_4_1, rsuRstatusTable}); + } + + RSUStatusConfigTable RSUHealthMonitorWorker::constructRsuStatusConfigTable(const RSUMibVersion &mibVersion) const + { + RSUStatusConfigTable rsuStatusTbl; + // Populate custom defined RSU Status table with RSU MIB version 4.1. + if (mibVersion == RSUMibVersion::RSUMIB_V_4_1) + { + RSUFieldOIDStruct rsuID = {"rsuID", RSU_ID_OID, true}; + rsuStatusTbl.push_back(rsuID); + + RSUFieldOIDStruct rsuMibVersion = {"rsuMibVersion", RSU_MIB_VERSION, true}; + rsuStatusTbl.push_back(rsuMibVersion); + + RSUFieldOIDStruct rsuFirmwareVersion = {"rsuFirmwareVersion", RSU_FIRMWARE_VERSION, true}; + rsuStatusTbl.push_back(rsuFirmwareVersion); + + RSUFieldOIDStruct rsuManufacturer = {"rsuManufacturer", RSU_MANUFACTURER, true}; + rsuStatusTbl.push_back(rsuManufacturer); + + RSUFieldOIDStruct rsuGpsOutputString = {"rsuGpsOutputString", RSU_GPS_OUTPUT_STRING, true}; + rsuStatusTbl.push_back(rsuGpsOutputString); + + RSUFieldOIDStruct rsuIFMIndex = {"rsuIFMIndex", RSU_IFM_INDEX, false}; + rsuStatusTbl.push_back(rsuIFMIndex); + + RSUFieldOIDStruct rsuIFMPsid = {"rsuIFMPsid", RSU_IFM_PSID, false}; + rsuStatusTbl.push_back(rsuIFMPsid); + + RSUFieldOIDStruct rsuIFMDsrcMsgId = {"rsuIFMDsrcMsgId", RSU_IFM_DSRC_MSG_ID, false}; + rsuStatusTbl.push_back(rsuIFMDsrcMsgId); + + RSUFieldOIDStruct rsuIFMTxMode = {"rsuIFMTxMode", RSU_IFM_INDEX, false}; + rsuStatusTbl.push_back(rsuIFMTxMode); + + RSUFieldOIDStruct rsuIFMTxChannel = {"rsuIFMTxChannel", RSU_IFM_TX_CHANNEL, false}; + rsuStatusTbl.push_back(rsuIFMTxChannel); + + RSUFieldOIDStruct rsuIFMEnable = {"rsuIFMEnable", RSU_IFM_ENABLE, false}; + rsuStatusTbl.push_back(rsuIFMEnable); + + RSUFieldOIDStruct rsuIFMStatus = {"rsuIFMStatus", RSU_IFM_STATUS, false}; + rsuStatusTbl.push_back(rsuIFMStatus); + + RSUFieldOIDStruct rsuMode = {"rsuMode", RSU_MODE, true}; + rsuStatusTbl.push_back(rsuMode); + + RSUFieldOIDStruct rsuChanStatus = {"rsuChanStatus", RSU_CHAN_STATUS, true}; + rsuStatusTbl.push_back(rsuChanStatus); + } + return rsuStatusTbl; + } + + bool RSUHealthMonitorWorker::validateAllRequiredFieldsPresent(const RSUHealthMonitor::RSUStatusConfigTable &configTbl, const vector &fields) const + { + bool isAllPresent = true; + for (const auto &config : configTbl) + { + if (config.required && find(fields.begin(), fields.end(), config.field) == fields.end()) + { + isAllPresent = false; + PLOG(logWARNING) << "No broadcast as required field " << config.field << " is not present!"; + } + } + return isAllPresent; + } + + RSUStatusConfigTable RSUHealthMonitorWorker::GetRSUStatusConfig(const RSUMibVersion &mibVersion) const + { + RSUStatusConfigTable result; + try + { + result = _RSUSTATUSConfigMapPtr->at(mibVersion); + } + catch (const out_of_range &ex) + { + PLOG(logERROR) << "Unknown MIB version! " << ex.what(); + } + return result; + } + + std::map RSUHealthMonitorWorker::ParseRSUGPS(const std::string &gps_nmea_data) const + { + std::map result; + nmea::NMEAParser parser; + nmea::GPSService gps(parser); + try + { + parser.readLine(gps_nmea_data); + std::stringstream ss; + ss << std::setprecision(8) << std::fixed << gps.fix.latitude << std::endl; + auto latitude_str = ss.str(); + std::stringstream sss; + sss << std::setprecision(8) << std::fixed << gps.fix.longitude << std::endl; + auto longitude_str = sss.str(); + result.insert({std::stod(latitude_str), std::stod(longitude_str)}); + PLOG(logDEBUG) << "Parse GPS NMEA string: " << gps_nmea_data << ". Result (Latitude, Longitude): (" << latitude_str << "," << longitude_str << ")"; + } + catch (const nmea::NMEAParseError &e) + { + PLOG(logERROR) << e.message.c_str(); + } + return result; + } + + Json::Value RSUHealthMonitorWorker::getRSUStatus(const RSUMibVersion &mibVersion, const string &_rsuIp, uint16_t &_snmpPort, const string &_securityUser, const string &_authPassPhrase, const string &_securityLevel, long timeout) + { + auto rsuStatusConfigTbl = GetRSUStatusConfig(mibVersion); + if (rsuStatusConfigTbl.size() == 0) + { + PLOG(logERROR) << "RSU status update call failed due to the RSU status config table is empty!"; + return Json::nullValue; + } + try + { + // Create SNMP client and use SNMP V3 protocol + PLOG(logINFO) << "Update SNMP client: RSU IP: " << _rsuIp << ", RSU port: " << _snmpPort << ", User: " << _securityUser << ", auth pass phrase: " << _authPassPhrase << ", security level: " + << _securityLevel; + auto _snmpClientPtr = std::make_unique(_rsuIp, _snmpPort, "", _securityUser, _securityLevel, _authPassPhrase, SNMP_VERSION_3, timeout); + + Json::Value rsuStatuJson; + // Sending RSU SNMP call for each field as each field has its own OID. + for (const auto &config : rsuStatusConfigTbl) + { + PLOG(logINFO) << "SNMP RSU status call for field:" << config.field << ", OID: " << config.oid; + snmp_response_obj responseVal; + if (_snmpClientPtr) + { + auto success = _snmpClientPtr->process_snmp_request(config.oid, request_type::GET, responseVal); + if (!success && config.required) + { + PLOG(logERROR) << "SNMP session stopped as the required field: " << config.field << " failed! Return empty RSU status!"; + return Json::nullValue; + } + else if (success) + { + auto json = populateJson(config.field, responseVal); + for(const auto &key: json.getMemberNames()) + { + rsuStatuJson[key] = json[key]; + } + } + } + } + return rsuStatuJson; + } + catch (tmx::utils::snmp_client_exception &ex) + { + PLOG(logERROR) << ex.what(); + return Json::nullValue; + } + } + + Json::Value RSUHealthMonitorWorker::populateJson(const string &field, const snmp_response_obj &response) const + { + Json::Value rsuStatuJson; + if (response.type == snmp_response_obj::response_type::INTEGER) + { + rsuStatuJson[field] = response.val_int; + } + else if (response.type == snmp_response_obj::response_type::STRING) + { + string response_str(response.val_string.begin(), response.val_string.end()); + // Proess GPS nmea string + if (boost::iequals(field, "rsuGpsOutputString")) + { + auto gps = ParseRSUGPS(response_str); + rsuStatuJson["rsuGpsOutputStringLatitude"] = gps.begin()->first; + rsuStatuJson["rsuGpsOutputStringLongitude"] = gps.begin()->second; + } + rsuStatuJson[field] = response_str; + } + return rsuStatuJson; + } + + RSUStatusMessage RSUHealthMonitorWorker::convertJsonToTMXMsg(const Json::Value &json) const + { + Json::FastWriter fasterWirter; + string json_str = fasterWirter.write(json); + tmx::messages::RSUStatusMessage rsuStatusMsg; + rsuStatusMsg.set_contents(json_str); + return rsuStatusMsg; + } + + vector RSUHealthMonitorWorker::getJsonKeys(const Json::Value &json) const + { + vector keys; + if (json.isArray()) + { + for (auto itr = json.begin(); itr != json.end(); itr++) + { + if (itr->isObject()) + { + for (auto const &field : itr->getMemberNames()) + { + keys.push_back(field); + } + } + } + } + else if (json.isObject()) + { + for (auto const &field : json.getMemberNames()) + { + keys.push_back(field); + } + } + return keys; + } +} \ No newline at end of file diff --git a/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorWorker.h b/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorWorker.h new file mode 100644 index 000000000..ef7a429c5 --- /dev/null +++ b/src/v2i-hub/RSUHealthMonitorPlugin/src/RSUHealthMonitorWorker.h @@ -0,0 +1,120 @@ +#pragma once +#include +#include "RSU_MIB_4_1.h" +#include +#include +#include +#include +#include +#include "PluginLog.h" +#include +#include "SNMPClient.h" +#include +#include "RSUStatusMessage.h" + +using namespace std; +using namespace tmx::utils; +using namespace tmx::utils::rsu41::mib::oid; +using namespace tmx::messages; + +namespace RSUHealthMonitor +{ + enum class RSUMibVersion + { + UNKOWN_MIB_V = 0, + RSUMIB_V_4_1 = 1, + RSUMIB_V_1218 = 2 + }; + + struct RSUFieldOIDStruct + { + string field; + string oid; + bool required; // Indicate whether this field is required to before broadcasting the RSU status. + }; + + /** + * RSUStatusTable is custom defined RSU status information. + * The fields are a subset of the fields from the RSU MIB definition used to quantify the health of the RSU. https://github.com/certificationoperatingcouncil/COC_TestSpecs/blob/master/AppNotes/RSU/RSU-MIB.txt + */ + using RSUStatusConfigTable = vector; + + class RSUHealthMonitorWorker + { + private: + // A map of RSU MIB version used and RSUStatusTable + shared_ptr> _RSUSTATUSConfigMapPtr; + + /** + * @brief Poupate the RSU status table with the specified version of OIDs and fields. + * Private: Only allow to initialze the RSU STATUS MAP once + * @param mibVersion specified + * @return RSUStatusTable the self defined RSU status https://usdot-carma.atlassian.net/wiki/spaces/WFD2/pages/2640740360/RSU+Health+Monitor+Plugin+Design + */ + RSUStatusConfigTable constructRsuStatusConfigTable(const RSUMibVersion &mibVersion) const; + + public: + // Populate the RSU Status Table with predefined fields and their mapping OIDs in constructor + RSUHealthMonitorWorker(); + + // Access to the RSU status table based in the RSU MIB version provided + RSUStatusConfigTable GetRSUStatusConfig(const RSUMibVersion &mibVersion) const; + + /** + * @brief determine if all required fields in the RSU config map _RSUSTATUSConfigMapPtr present in the input fields + * Use _RSUSTATUSConfigMapPtr RSU status config map that defines all fields and whether the fields are required. + * @param RSUStatusConfigTable RSU Status configration table to compare with. + * @param vector Input RSU fields to verify + * @return True if all required fields found. Otherwise, false. + */ + bool validateAllRequiredFieldsPresent(const RSUHealthMonitor::RSUStatusConfigTable &configTbl, const vector &fields) const; + + /** + * @brief Parse NMEA GPS sentense and return GPS related data + * @param gps_nmea_data NMEA GPS sentense + * @return map A map of latitude and longitude + */ + std::map ParseRSUGPS(const std::string &gps_nmea_data) const; + + /** + * @brief Sending SNMP V3 requests to get info for each field in the RSUStatusConfigTable, and return the RSU status in JSON + * Use RSU Status configuration table include RSU field, OIDs, and whether fields are required or optional + * @param RSUMibVersion The RSU MIB version used + * @param string RSU IP address + * @param uint16_t SNMP port + * @param string security user used for SNMP authentication + * @param string authentication password + * @param string security level: authPriv or authNoPriv. + * @param long session time out + */ + Json::Value getRSUStatus(const RSUMibVersion &mibVersion, const string &_rsuIp, uint16_t &_snmpPort, const string &_securityUser, const string &_authPassPhrase, const string &_securityLevel, long timeout); + + /*** + *@brief Convert the JSON message into TMX message + @param Json Input Json value + @return RSUStatusMessage TMX message + */ + RSUStatusMessage convertJsonToTMXMsg(const Json::Value &json) const; + + /** + * @brief Populate Json with snmp response object. + * @param string The field that maps to an OID. + * @param snmp_response_obj The response returned by SNMP call for the OID. + * @return Json value populated with response object. + */ + Json::Value populateJson(const string &field, const snmp_response_obj &response) const; + + /** + * @brief List the keys from the input Json values + * @param Json Input JSON values + * @return vector of key strings + */ + vector getJsonKeys(const Json::Value &json) const; + + // Delete move constructor + RSUHealthMonitorWorker(RSUHealthMonitorWorker &&worker) = delete; + + // Delete copy constructor + RSUHealthMonitorWorker(RSUHealthMonitorWorker &worker) = delete; + }; +} // namespace RSUHealthMonitor diff --git a/src/v2i-hub/RSUHealthMonitorPlugin/test/main.cpp b/src/v2i-hub/RSUHealthMonitorPlugin/test/main.cpp new file mode 100644 index 000000000..ba7cd2667 --- /dev/null +++ b/src/v2i-hub/RSUHealthMonitorPlugin/test/main.cpp @@ -0,0 +1,8 @@ + +#include + +int main(int argc, char **argv) +{ + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/v2i-hub/RSUHealthMonitorPlugin/test/test_RSUHealthMonitorWorker.cpp b/src/v2i-hub/RSUHealthMonitorPlugin/test/test_RSUHealthMonitorWorker.cpp new file mode 100644 index 000000000..cc61247b5 --- /dev/null +++ b/src/v2i-hub/RSUHealthMonitorPlugin/test/test_RSUHealthMonitorWorker.cpp @@ -0,0 +1,111 @@ +#include "RSUHealthMonitorWorker.h" +#include + +namespace RSUHealthMonitor +{ + class test_RSUHealthMonitorWorker : public ::testing::Test + { + public: + std::shared_ptr _rsuWorker = std::make_shared(); + }; + + TEST_F(test_RSUHealthMonitorWorker, GetRSUStatusConfig) + { + RSUStatusConfigTable rsuStatusConfigTbl = _rsuWorker->GetRSUStatusConfig(RSUMibVersion::RSUMIB_V_4_1); + ASSERT_EQ(14, rsuStatusConfigTbl.size()); + + rsuStatusConfigTbl = _rsuWorker->GetRSUStatusConfig(RSUMibVersion::UNKOWN_MIB_V); + ASSERT_EQ(0, rsuStatusConfigTbl.size()); + + RSUMibVersion mibVersionDefault; + rsuStatusConfigTbl = _rsuWorker->GetRSUStatusConfig(mibVersionDefault); + ASSERT_EQ(0, rsuStatusConfigTbl.size()); + } + + TEST_F(test_RSUHealthMonitorWorker, validateAllRequiredFieldsPresent) + { + auto config = _rsuWorker->GetRSUStatusConfig(RSUMibVersion::RSUMIB_V_4_1); + vector requiredFields = {"rsuID", "rsuMibVersion", "rsuFirmwareVersion", "rsuManufacturer", "rsuGpsOutputString", "rsuMode", "rsuChanStatus"}; + ASSERT_TRUE(_rsuWorker->validateAllRequiredFieldsPresent(config, requiredFields)); + + requiredFields = {"rsuID", "rsuMibVersion", "rsuFirmwareVersion"}; + ASSERT_FALSE(_rsuWorker->validateAllRequiredFieldsPresent(config, requiredFields)); + } + + TEST_F(test_RSUHealthMonitorWorker, ParseRSUGPS) + { + std::string gps_nmea_data = "$GPGGA,142440.00,3857.3065,N,07708.9734,W,2,18,0.65,86.18,M,-34.722,M,,*62"; + auto gps_map = _rsuWorker->ParseRSUGPS(gps_nmea_data); + ASSERT_EQ(1, gps_map.size()); + double expected_latitude = 38.9551; + double expected_longitude = -77.1496; + for (auto itr = gps_map.begin(); itr != gps_map.end(); itr++) + { + ASSERT_NEAR(expected_latitude, itr->first, 0.001); + ASSERT_NEAR(expected_longitude, itr->second, 0.001); + } + std::string invalid_gps_nmea_data = "$*GPGGA,invalid"; + auto gps_map_invalid = _rsuWorker->ParseRSUGPS(invalid_gps_nmea_data); + ASSERT_EQ(0, gps_map_invalid.size()); + } + + TEST_F(test_RSUHealthMonitorWorker, getRSUStatus) + { + uint16_t port = 161; + auto json = _rsuWorker->getRSUStatus(RSUMibVersion::RSUMIB_V_4_1, "127.0.0.1", port, "test", "testtesttest", "authPriv", 1000); + ASSERT_TRUE(json.empty()); + + json = _rsuWorker->getRSUStatus(RSUMibVersion::RSUMIB_V_4_1, "127.0.0.1", port, "test", "test", "authPriv", 1000); + ASSERT_TRUE(json.empty()); + + json = _rsuWorker->getRSUStatus(RSUMibVersion::RSUMIB_V_1218, "127.0.0.1", port, "test", "test", "authPriv", 1000); + ASSERT_TRUE(json.empty()); + } + + TEST_F(test_RSUHealthMonitorWorker, convertJsonToTMXMsg) + { + Json::Value json; + json["rsuID"] = "RSU4.1"; + json["rsuMode"] = 4; + auto rsuStatusTmxMsg = _rsuWorker->convertJsonToTMXMsg(json); + string expectedStr = "{\"rsuID\":\"RSU4.1\",\"rsuMode\":4}\n"; + ASSERT_EQ(expectedStr, rsuStatusTmxMsg.to_string()); + } + + TEST_F(test_RSUHealthMonitorWorker, populateJson) + { + Json::Value rsuStatusJson; + snmp_response_obj stringObj; + stringObj.type = snmp_response_obj::response_type::STRING; + std::string gps_nmea_data = "$GPGGA,142440.00,3857.3065,N,07708.9734,W,2,18,0.65,86.18,M,-34.722,M,,*62"; + vector rgps_nmea_data_c; + copy(gps_nmea_data.begin(), gps_nmea_data.end(), back_inserter(rgps_nmea_data_c)); + stringObj.val_string = rgps_nmea_data_c; + + auto json = _rsuWorker->populateJson("rsuGpsOutputString", stringObj); + double expected_latitude = 38.9551; + double expected_longitude = -77.1496; + ASSERT_NEAR(expected_latitude, json["rsuGpsOutputStringLatitude"].asDouble(), 0.001); + ASSERT_NEAR(expected_longitude, json["rsuGpsOutputStringLongitude"].asDouble(), 0.001); + for(const auto& key: json.getMemberNames()) + { + rsuStatusJson[key] = json[key]; + } + + snmp_response_obj intObj; + intObj.type = snmp_response_obj::response_type::INTEGER; + intObj.val_int = 4; + + json = _rsuWorker->populateJson("rsuMode", intObj); + ASSERT_EQ(4, json["rsuMode"].asInt64()); + rsuStatusJson["rsuMode"] = json["rsuMode"]; + + Json::FastWriter fasterWirter; + string json_str = fasterWirter.write(rsuStatusJson); + string expectedStr = "{\"rsuGpsOutputString\":\"$GPGGA,142440.00,3857.3065,N,07708.9734,W,2,18,0.65,86.18,M,-34.722,M,,*62\",\"rsuGpsOutputStringLatitude\":38.955108330000002,\"rsuGpsOutputStringLongitude\":-77.149556669999996,\"rsuMode\":4}\n"; + ASSERT_EQ(expectedStr, json_str); + ASSERT_EQ(4, _rsuWorker->getJsonKeys(rsuStatusJson).size()); + ASSERT_EQ(1, _rsuWorker->getJsonKeys(json).size()); + } + +} \ No newline at end of file diff --git a/src/v2i-hub/SpatPlugin/test/testPedestrianDetectionForSPAT.cpp b/src/v2i-hub/SpatPlugin/test/testPedestrianDetectionForSPAT.cpp index 8910b8bb0..e404307b2 100644 --- a/src/v2i-hub/SpatPlugin/test/testPedestrianDetectionForSPAT.cpp +++ b/src/v2i-hub/SpatPlugin/test/testPedestrianDetectionForSPAT.cpp @@ -73,7 +73,7 @@ TEST(PedestrianDetectionForSPAT, updateEncodedSpat) EXPECT_EQ(spatPtr->intersections.list.array[0]->states.list.array[0]->signalGroup, 1); EXPECT_EQ(spatPtr->intersections.list.array[0]->states.list.array[1]->signalGroup, 2); ASSERT_NE(spatPtr->intersections.list.array[0]->maneuverAssistList, nullptr); - EXPECT_EQ(spatPtr->intersections.list.array[0]->maneuverAssistList->list.count, 1); + EXPECT_NE(spatPtr->intersections.list.array[0]->maneuverAssistList->list.count, 0); ASSERT_NE(spatPtr->intersections.list.array[0]->maneuverAssistList->list.array[0]->pedBicycleDetect, nullptr); EXPECT_EQ(*(spatPtr->intersections.list.array[0]->maneuverAssistList->list.array[0]->pedBicycleDetect), 1); } diff --git a/src/v2i-hub/TelematicBridgePlugin/CMakeLists.txt b/src/v2i-hub/TelematicBridgePlugin/CMakeLists.txt new file mode 100644 index 000000000..df7b1c0f7 --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/CMakeLists.txt @@ -0,0 +1,17 @@ +PROJECT (TelematicBridgePlugin VERSION 7.5.1 LANGUAGES CXX) + +set (TMX_PLUGIN_NAME "Telematic Bridge") + +BuildTmxPlugin() +TARGET_LINK_LIBRARIES ( ${PROJECT_NAME} tmxutils jsoncpp nats) + +#################################################### +################## Testing ####################### +#################################################### +add_library(${PROJECT_NAME}_lib src/TelematicUnit.cpp) +target_link_libraries(${PROJECT_NAME}_lib PUBLIC tmxutils jsoncpp nats) +enable_testing() +include_directories(${PROJECT_SOURCE_DIR}/src) +file(GLOB_RECURSE TEST_SOURCES LIST_DIRECTORIES false test/*.h test/*.cpp) +add_executable(${PROJECT_NAME}_test ${TEST_SOURCES}) +target_link_libraries(${PROJECT_NAME}_test PRIVATE gtest ${PROJECT_NAME}_lib) \ No newline at end of file diff --git a/src/v2i-hub/TelematicBridgePlugin/README.md b/src/v2i-hub/TelematicBridgePlugin/README.md new file mode 100644 index 000000000..6ed0a0497 --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/README.md @@ -0,0 +1,96 @@ +## Telematic bridge introduction +Telematic bridge is a V2xHub plugin used to collect data stream generated by V2xHub and send the data stream into [telematic tool](https://github.com/usdot-fhwa-stol/cda-telematics) hosted in the AWS cloud. +## NATS Publisher/Subscriber +### NATS Connections and registration +#### Telematic plugin sends registration request to telematic server +##### Subject +NATS subject: *.register_unit +##### Request +``` +{ + "unit_id": "" +} +``` +##### Response +``` +{ + "unit_id": "", + "unit_type": "infrastructure", + "unit_name": "East Intersection", + "timestamp": "1678998191815233965", + "event_name": "wfd_integration_testing", + "location": "TFHRC", + "testing_type": "Integration" +} +``` + +### Available topics +#### Telematic plugin receives request from telematic UI +##### Subject +NATS subject: .available_topics +##### Request +``` +{ + "unit_id": "" +} +``` +##### Reply + +``` + { + "unit_id": "", + "unit_type": "infrastructure", + "unit_name": "East Intersection", + "timestamp": "1678998191815233965", + "event_name": "wfd_integration_testing", + "location": "TFHRC", + "testing_type": "Integration", + "topics": [ + { + "name": "J2735_TMSG03-P_CARMAStreetsPlugin" + }, + { + "name": "" + } + ] +} +``` + +### Selected topics +#### Telematic plugin receives selected topics from telematic UI +##### Subject +NATS subject: .publish_topics +#### Request +``` +{ + "unit_id": "", + "unit_type": "infrastructure", + "timestamp": 1663084528513000400, + "event_name": "wfd_integration_testing", + "location": "TFHRC", + "testing_type": "Integration", + "topics": [ + "", + "" + ] +} +``` +##### Reply +``` +"request received!" +``` + +## Check status +#### Telematic plugin receives live status request from telematic server +##### Subject +NATS subject: .check_status +##### Request +``` +{ + "unit_id": "" +} +``` +##### Reponse +``` +"OK" +``` \ No newline at end of file diff --git a/src/v2i-hub/TelematicBridgePlugin/manifest.json b/src/v2i-hub/TelematicBridgePlugin/manifest.json new file mode 100644 index 000000000..d133ceb0f --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/manifest.json @@ -0,0 +1,26 @@ +{ + "name": "TelematicBridge", + "description": "Plugin that listens for TMX messages and forward them to the Telematic cloud services.", + "version": "@PROJECT_VERSION@", + "exeLocation": "/bin/TelematicBridgePlugin", + "coreIpAddr": "127.0.0.1", + "corePort": 24601, + "messageTypes": [], + "configuration": [ + { + "key": "LogLevel", + "default": "INFO", + "description": "The log level for this plugin" + }, + { + "key": "NATSUrl", + "default": "nats://127.0.0.1:4222", + "description": "The NATS connection URL" + }, + { + "key": "MessageExclusionList", + "default": "System_KeepAlive_CommandPlugin,System_KeepAlive_CARMAStreetsPlugin,System_KeepAlive_CDASimAdapter,System_KeepAlive_MessageReceiver", + "description": "The list of messages are excluded from the available message list. Message name is a combination of message type, subtype and source separated by underscore. E.G: type_subtype_source" + } + ] +} \ No newline at end of file diff --git a/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgeException.h b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgeException.h new file mode 100644 index 000000000..5959ca1ff --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgeException.h @@ -0,0 +1,11 @@ +#pragma once +#include + +namespace TelematicBridge +{ + class TelematicBridgeException : public tmx::TmxException + { + public: + using TmxException::TmxException; + }; +} \ No newline at end of file diff --git a/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgeMsgWorker.h b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgeMsgWorker.h new file mode 100644 index 000000000..c31c5cc9c --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgeMsgWorker.h @@ -0,0 +1,247 @@ +#pragma once +#include "PluginLog.h" +#include +#include "TelematicBridgeException.h" +#include "jsoncpp/json/json.h" +#include +#include +#include +#include + +using namespace tmx::utils; +using namespace std; +using namespace tmx::messages; +namespace pt = boost::property_tree; + +namespace TelematicBridge +{ + using buffer_structure_t = struct buffer_structure + { + char *buffer; // buffer array + size_t buffer_size; // this is really where we will write next. + size_t allocated_size; // this is the total size of the buffer. + }; + + /** + * @brief Convert hex string into a vector of bytes + * @param string input hex string payload + * @param vector byte buffer to be updated. + * @return bool indicator whether conversion is successful. + */ + bool HexToBytes(const string &hexPaylod, vector &byteBuffer) + { + uint8_t d = 0; + int i = 0; + + for (const char &c : hexPaylod) + { + if (c <= '9' && c >= '0') + { + d = c - '0'; + } + else if (c <= 'F' && c >= 'A') + { + d = c - 55; // c - 'A' + 10 + } + else if (c <= 'f' && c >= 'a') + { + d = c - 87; // c - 'a' + 10; + } + else + { + return false; + } + + if (i % 2) + { + // low order nibble. + byteBuffer.back() |= d; + } + else + { + // high order nibble. + byteBuffer.push_back(d << 4); + } + ++i; + } + return true; + } + + /** + * @brief Decode J2735 message and populate the J2735 data frame + * @param string input hex string payload + * @param MessageFrame_t J2735 struct to be updated + * @return bool indicator whether decoding successful + */ + void DecodeJ2735Msg(const string &hexPaylod, MessageFrame_t *messageFrame) + { + /** + * Decode J2735 message + */ + ostringstream erroross; + vector byte_buffer; + if (!HexToBytes(hexPaylod, byte_buffer)) + { + throw TelematicBridgeException("Failed attempt to decode MessageFrame hex string: cannot convert to bytes."); + } + asn_dec_rval_t decode_rval = asn_decode( + nullptr, + ATS_UNALIGNED_BASIC_PER, + &asn_DEF_MessageFrame, + (void **)&messageFrame, + byte_buffer.data(), + byte_buffer.size()); + + if (decode_rval.code != RC_OK) + { + erroross.str(""); + erroross << "failed ASN.1 binary decoding of element " << asn_DEF_MessageFrame.name << ": bad data. Successfully decoded " << decode_rval.consumed << " bytes."; + throw TelematicBridgeException(erroross.str()); + } + } + + int DynamicBufferAppend(const void *buffer, size_t size, void *app_key) + { + auto *xb = static_cast(app_key); + + while (xb->buffer_size + size + 1 > xb->allocated_size) + { + // increase size of buffer. + size_t new_size = 2 * (xb->allocated_size ? xb->allocated_size : 64); + auto new_buf = static_cast(MALLOC(new_size)); + if (!new_buf) + return -1; + // move old to new. + memcpy(new_buf, xb->buffer, xb->buffer_size); + + FREEMEM(xb->buffer); + xb->buffer = new_buf; + xb->allocated_size = new_size; + } + + memcpy(xb->buffer + xb->buffer_size, buffer, size); + xb->buffer_size += size; + // null terminate the string. + xb->buffer[xb->buffer_size] = '\0'; + return 0; + } + + /** + * @brief Convert the J2735 messageFrame into string in XML format + * @param MessageFrame_t J2735 struct + * @return string XML formatted J2735 message + */ + string ConvertJ2735FrameToXML(const MessageFrame_t *messageFrame) + { + /** + * Convert J2735 message into XML + */ + buffer_structure_t xml_buffer = {nullptr, 0, 0}; + asn_enc_rval_t encode_rval = xer_encode( + &asn_DEF_MessageFrame, + messageFrame, + XER_F_CANONICAL, + DynamicBufferAppend, + static_cast(&xml_buffer)); + if (encode_rval.encoded == -1) + { + throw TelematicBridgeException("Failed to convert message with ID (=" + to_string(messageFrame->messageId) + ") to XML "); + } + return string(xml_buffer.buffer); + } + + /** + * @brief convert JSON value into string + * @param JSON input Json::Value + * @return string + */ + string JsonToString(const Json::Value &json) + { + Json::FastWriter fasterWirter; + string jsonStr = fasterWirter.write(json); + boost::replace_all(jsonStr, "\\n", ""); + boost::replace_all(jsonStr, "\n", ""); + boost::replace_all(jsonStr, "\\t", ""); + boost::replace_all(jsonStr, "\\", ""); + return jsonStr; + } + /** + * @brief convert string into JSON value + * @param string input string + * @return JSON::Value + */ + Json::Value StringToJson(const string &str) + { + Json::Value root; + Json::Reader reader; + bool parsingSuccessful = reader.parse(str, root); + if (!parsingSuccessful) + { + throw TelematicBridgeException("Error parsing the string"); + } + return root; + } + /** + * @brief Convert XML string into JSON string + */ + string xml2Json(const string &xml_str) + { + stringstream xmlss; + xmlss << xml_str; + pt::ptree root; + pt::read_xml(xmlss, root); + stringstream jsonss; + pt::write_json(jsonss, root, false); + return jsonss.str(); + } + /** + * @brief create JSON payload from given IVP message + * @param IVPMessage V2xHub interval exchanged message + * @return JSON value + */ + Json::Value IvpMessageToJson(const IvpMessage *msg) + { + Json::Value json; + if (msg->type) + { + json["type"] = msg->type; + } + + if (msg->subtype) + { + json["subType"] = msg->subtype; + } + + if (msg->dsrcMetadata) + { + json["channel"] = msg->dsrcMetadata->channel; + json["psid"] = msg->dsrcMetadata->psid; + } + + if (msg->encoding) + { + json["encoding"] = msg->encoding; + } + + if (msg->source) + { + json["source"] = msg->source; + } + json["sourceId"] = msg->sourceId; + json["flags"] = msg->flags; + json["timestamp"] = msg->timestamp; + if (msg->payload) + { + if (msg->payload->type == cJSON_Number) + { + json["payload"] = (msg->payload->valueint == 0 ? msg->payload->valuedouble : static_cast(msg->payload->valueint)); + } + else + { + json["payload"] = StringToJson(cJSON_Print(msg->payload)); + } + } + + return json; + } +} // TelematicBridge \ No newline at end of file diff --git a/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.cpp b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.cpp new file mode 100644 index 000000000..44a6738f2 --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.cpp @@ -0,0 +1,83 @@ +#include "TelematicBridgePlugin.h" + +using namespace tmx::utils; +using namespace std; + +namespace TelematicBridge +{ + TelematicBridgePlugin::TelematicBridgePlugin(const string &name) : PluginClient(name) + { + _telematicUnitPtr = make_unique(); + _unitId = std::getenv("INFRASTRUCTURE_ID"); + _unitName = std::getenv("INFRASTRUCTURE_NAME"); + UpdateConfigSettings(); + AddMessageFilter("*", "*", IvpMsgFlags_None); + AddMessageFilter("J2735", "*", IvpMsgFlags_RouteDSRC); + SubscribeToMessages(); + } + + void TelematicBridgePlugin::OnMessageReceived(IvpMessage *msg) + { + if (msg && msg->type) + { + auto json = IvpMessageToJson(msg); + // Process J2735 message payload hex string + if (strcasecmp(msg->type, Telematic_MSGTYPE_J2735_STRING) == 0) + { + auto messageFm = (MessageFrame_t *)calloc(1, sizeof(MessageFrame_t)); + DecodeJ2735Msg(msg->payload->valuestring, messageFm); + string xml_payload_str = ConvertJ2735FrameToXML(messageFm); + ASN_STRUCT_FREE(asn_DEF_MessageFrame, messageFm); + string json_payload_str = xml2Json(xml_payload_str.c_str()); + json["payload"] = StringToJson(json_payload_str); + } + + stringstream topic; + topic << (msg->type ? msg->type : "") << "_" << (msg->subtype ? msg->subtype : "") << "_" << (msg->source ? msg->source : ""); + auto topicStr = topic.str(); + _telematicUnitPtr->updateAvailableTopics(topicStr); + if (_telematicUnitPtr->inSelectedTopics(topicStr)) + { + _telematicUnitPtr->publishMessage(topicStr, json); + } + } + } + + void TelematicBridgePlugin::UpdateConfigSettings() + { + lock_guard lock(_configMutex); + GetConfigValue("NATSUrl", _natsURL); + GetConfigValue("MessageExclusionList", _excludedMessages); + unit_st unit = {_unitId, _unitName, UNIT_TYPE_INFRASTRUCTURE}; + if (_telematicUnitPtr) + { + _telematicUnitPtr->setUnit(unit); + _telematicUnitPtr->updateExcludedTopics(_excludedMessages); + } + } + + void TelematicBridgePlugin::OnStateChange(IvpPluginState state) + { + PluginClient::OnStateChange(state); + if (state == IvpPluginState_registered) + { + UpdateConfigSettings(); + if (_telematicUnitPtr) + { + _telematicUnitPtr->connect(_natsURL); + } + } + } + + void TelematicBridgePlugin::OnConfigChanged(const char *key, const char *value) + { + PluginClient::OnConfigChanged(key, value); + UpdateConfigSettings(); + } +} + +// The main entry point for this application. +int main(int argc, char *argv[]) +{ + return run_plugin("Telematic Bridge", argc, argv); +} \ No newline at end of file diff --git a/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.h b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.h new file mode 100644 index 000000000..a75b21bbe --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/src/TelematicBridgePlugin.h @@ -0,0 +1,35 @@ +#ifndef _TelematicBRIDGEPLUGIN_H_ +#define _TelematicBRIDGEPLUGIN_H_ + +#include "PluginClient.h" +#include "TelematicBridgeMsgWorker.h" +#include "TelematicUnit.h" +#include + + +namespace TelematicBridge +{ + class TelematicBridgePlugin : public tmx::utils::PluginClient + { + private: + static CONSTEXPR const char *Telematic_MSGTYPE_J2735_STRING = "J2735"; + static CONSTEXPR const char *UNIT_TYPE_INFRASTRUCTURE = "Infrastructure"; + std::unique_ptr _telematicUnitPtr; + std::string _unitId; + std::string _unitName; + std::string _natsURL; + std::string _excludedMessages; + std::mutex _configMutex; + void OnMessageReceived(IvpMessage *msg); + + public: + explicit TelematicBridgePlugin(const std::string &name); + void OnConfigChanged(const char *key, const char *value) override; + void OnStateChange(IvpPluginState state) override; + void UpdateConfigSettings(); + ~TelematicBridgePlugin() override = default; + }; + +} // namespace TelematicBridge + +#endif \ No newline at end of file diff --git a/src/v2i-hub/TelematicBridgePlugin/src/TelematicUnit.cpp b/src/v2i-hub/TelematicBridgePlugin/src/TelematicUnit.cpp new file mode 100644 index 000000000..e99029b53 --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/src/TelematicUnit.cpp @@ -0,0 +1,320 @@ +#include "TelematicUnit.h" + +using namespace std; +using namespace tmx::utils; +using namespace std::chrono; + +namespace TelematicBridge +{ + void TelematicUnit::connect(const string &natsURL) + { + auto s = natsConnection_ConnectTo(&_conn, natsURL.c_str()); + PLOG(logINFO) << "NATS connection returned: " << natsStatus_GetText(s); + if (s == NATS_OK) + { + registerUnitRequestor(); + } + else + { + throw TelematicBridgeException(natsStatus_GetText(s)); + } + } + + void TelematicUnit::registerUnitRequestor() + { + // Reset registration status + bool isRegistered = false; + int attempts_count = 0; + + while (!isRegistered && attempts_count < REGISTRATION_MAX_ATTEMPTS) + { + attempts_count++; + PLOG(logDEBUG2) << "Inside register unit requestor"; + natsMsg *reply = nullptr; + string payload = "{\"unit_id\":\"" + _unit.unitId + "\"}"; + auto s = natsConnection_RequestString(&reply, _conn, REGISTER_UNIT_TOPIC, payload.c_str(), TIME_OUT); + if (s == NATS_OK) + { + auto replyStr = natsMsg_GetData(reply); + PLOG(logINFO) << "Received registered reply: " << replyStr; + + // Unit is registered when server responds with event information (location, testing_type, event_name) + isRegistered = validateRegisterStatus(replyStr); + natsMsg_Destroy(reply); + } + else + { + throw TelematicBridgeException(natsStatus_GetText(s)); + } + sleep(1); + } + + if (isRegistered) + { + // Provide below services when the unit is registered + availableTopicsReplier(); + selectedTopicsReplier(); + checkStatusReplier(); + } + } + + bool TelematicUnit::validateRegisterStatus(const string ®isterReply) + { + auto root = parseJson(registerReply); + if (root.isMember(LOCATION_KEY) && root.isMember(TESTING_TYPE_KEY) && root.isMember(EVENT_NAME_KEY)) + { + _eventLocation = root[LOCATION_KEY].asString(); + _testingType = root[TESTING_TYPE_KEY].asString(); + _eventName = root[EVENT_NAME_KEY].asString(); + return true; + } + PLOG(logERROR) << "Failed to register unit as event information (locatoin, testing type and event name) does not exist."; + return false; + } + + void TelematicUnit::availableTopicsReplier() + { + if (!_subAvailableTopic) + { + PLOG(logDEBUG2) << "Inside available topic replier"; + stringstream topic; + topic << _unit.unitId << AVAILABLE_TOPICS; + natsConnection_Subscribe(&_subAvailableTopic, _conn, topic.str().c_str(), onAvailableTopicsCallback, this); + } + } + + void TelematicUnit::selectedTopicsReplier() + { + if (!_subSelectedTopic) + { + PLOG(logDEBUG2) << "Inside selected topic replier"; + stringstream topic; + topic << _unit.unitId << PUBLISH_TOPICS; + natsConnection_Subscribe(&_subSelectedTopic, _conn, topic.str().c_str(), onSelectedTopicsCallback, this); + } + } + + void TelematicUnit::checkStatusReplier() + { + if (!_subCheckStatus) + { + PLOG(logDEBUG2) << "Inside check status replier"; + stringstream topic; + topic << _unit.unitId << CHECK_STATUS; + natsConnection_Subscribe(&_subCheckStatus, _conn, topic.str().c_str(), onCheckStatusCallback, this); + } + } + + void TelematicUnit::publishMessage(const string &topic, const Json::Value &payload) + { + auto pubMsgTopic = "streets." + _unit.unitId + ".data." + topic; + auto jsonStr = constructPublishedDataString(_unit, _eventLocation, _testingType, _eventName, topic, payload); + auto s = natsConnection_PublishString(_conn, pubMsgTopic.c_str(), jsonStr.c_str()); + if (s == NATS_OK) + { + PLOG(logINFO) << "Topic: " << pubMsgTopic << ". Published: " << jsonStr; + } + else + { + throw TelematicBridgeException(natsStatus_GetText(s)); + } + } + + void TelematicUnit::onAvailableTopicsCallback(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *object) + { + PLOG(logDEBUG3) << "Received available topics: " << natsMsg_GetSubject(msg) << " " << natsMsg_GetData(msg); + // Sends a reply + if (object && natsMsg_GetReply(msg) != nullptr) + { + const auto obj = (TelematicUnit *)object; + auto reply = constructAvailableTopicsReplyString(obj->getUnit(), obj->getEventLocation(), obj->getTestingType(), obj->getEventName(), obj->getAvailableTopics(), obj->getExcludedTopics()); + auto s = natsConnection_PublishString(nc, natsMsg_GetReply(msg), reply.c_str()); + natsMsg_Destroy(msg); + if (s == NATS_OK) + { + PLOG(logDEBUG3) << "Available topics replied: " << reply; + } + } + } + + void TelematicUnit::onSelectedTopicsCallback(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *object) + { + PLOG(logDEBUG3) << "Received selected topics: " << natsMsg_GetSubject(msg) << " " << natsMsg_GetData(msg); + // Sends a reply + if (natsMsg_GetReply(msg) != nullptr) + { + auto msgStr = natsMsg_GetData(msg); + auto root = parseJson(msgStr); + if (object && root.isMember(TOPICS_KEY) && root[TOPICS_KEY].isArray()) + { + auto obj = (TelematicUnit *)object; + // clear old selected topics + obj->clearSelectedTopics(); + + // update selected topics with selected topics from latest request + for (auto itr = root[TOPICS_KEY].begin(); itr != root[TOPICS_KEY].end(); itr++) + { + obj->addSelectedTopic(itr->asString()); + } + } + string reply = "request received!"; + auto s = natsConnection_PublishString(nc, natsMsg_GetReply(msg), reply.c_str()); + natsMsg_Destroy(msg); + if (s == NATS_OK) + { + PLOG(logDEBUG3) << "Selected topics replied: " << reply; + } + } + } + + void TelematicUnit::onCheckStatusCallback(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *object) + { + if (natsMsg_GetReply(msg) != nullptr) + { + auto s = natsConnection_PublishString(nc, natsMsg_GetReply(msg), "OK"); + if (s == NATS_OK) + { + PLOG(logDEBUG3) << "Received check status msg: " << natsMsg_GetSubject(msg) << " " << natsMsg_GetData(msg) << ". Replied: OK"; + } + natsMsg_Destroy(msg); + } + } + + string TelematicUnit::constructPublishedDataString(const unit_st &unit, const string &eventLocation, const string &testingType, const string &eventName, const string &topicName, const Json::Value &payload) const + { + Json::Value message; + message[UNIT_ID_KEY] = unit.unitId; + message[UNIT_NAME_KEY] = unit.unitName; + message[UNIT_TYPE_KEY] = unit.unitType; + message[LOCATION_KEY] = eventLocation; + message[TESTING_TYPE_KEY] = testingType; + message[EVENT_NAME_KEY] = eventName; + message[TOPIC_NAME_KEY] = topicName; + message[TIMESTAMP_KEY] = payload.isMember("timestamp") ? payload["timestamp"].asUInt64() * MILLI_TO_MICRO : duration_cast(system_clock::now().time_since_epoch()).count(); + message[PAYLOAD_KEY] = payload; + Json::FastWriter fasterWirter; + string jsonStr = fasterWirter.write(message); + return jsonStr; + } + + Json::Value TelematicUnit::parseJson(const string &jsonStr) + { + Json::Value root; + Json::Reader reader; + bool parsingSuccessful = reader.parse(jsonStr, root); + if (!parsingSuccessful) + { + throw TelematicBridgeException("Error parsing the reply message"); + } + return root; + } + + string TelematicUnit::constructAvailableTopicsReplyString(const unit_st &unit, const string &eventLocation, const string &testingType, const string &eventName, const vector &availableTopicList, const string &excludedTopics) + { + Json::Value message; + message[UNIT_ID_KEY] = unit.unitId; + message[UNIT_NAME_KEY] = unit.unitName; + message[UNIT_TYPE_KEY] = unit.unitType; + message[LOCATION_KEY] = eventLocation; + message[TESTING_TYPE_KEY] = testingType; + message[EVENT_NAME_KEY] = eventName; + message[TIMESTAMP_KEY] = duration_cast(system_clock::now().time_since_epoch()).count(); + Json::Value topics; + for (const auto &topic : availableTopicList) + { + if (!boost::icontains(excludedTopics, topic)) + { + Json::Value topicJson; + topicJson[NAME_KEY] = topic; + topics.append(topicJson); + } + } + message[TOPICS_KEY] = topics; + Json::FastWriter fasterWirter; + string reply = fasterWirter.write(message); + return reply; + } + + void TelematicUnit::updateAvailableTopics(const string &newTopic) + { + if (find(_availableTopics.begin(), _availableTopics.end(), newTopic) == _availableTopics.end()) + { + lock_guard lock(_availableTopicsMutex); + _availableTopics.push_back(newTopic); + PLOG(logINFO) << "Add topic (= " << newTopic << ") to available topics list. Size: " << _availableTopics.size(); + } + } + + void TelematicUnit::updateExcludedTopics(const string &excludedTopics) + { + lock_guard lock(_excludedTopicsMutex); + _excludedTopics = excludedTopics; + } + + bool TelematicUnit::inSelectedTopics(const string &topic) + { + if (find(_selectedTopics.begin(), _selectedTopics.end(), topic) == _selectedTopics.end()) + { + return false; + } + return true; + } + + void TelematicUnit::setUnit(const unit_st &unit) + { + lock_guard lock(_unitMutex); + _unit = unit; + } + + unit_st TelematicUnit::getUnit() const + { + return _unit; + } + + string TelematicUnit::getEventName() const + { + return _eventName; + } + + string TelematicUnit::getEventLocation() const + { + return _eventLocation; + } + + string TelematicUnit::getTestingType() const + { + return _testingType; + } + + vector TelematicUnit::getAvailableTopics() const + { + return _availableTopics; + } + + string TelematicUnit::getExcludedTopics() const + { + return _excludedTopics; + } + + void TelematicUnit::addSelectedTopic(const string &newSelectedTopic) + { + _selectedTopics.push_back(newSelectedTopic); + } + + void TelematicUnit::clearSelectedTopics() + { + _selectedTopics.clear(); + } + + TelematicUnit::~TelematicUnit() + { + natsSubscription_Destroy(_subAvailableTopic); + natsSubscription_Destroy(_subSelectedTopic); + natsSubscription_Destroy(_subCheckStatus); + natsConnection_Destroy(_conn); + _conn = nullptr; + _subAvailableTopic = nullptr; + _subSelectedTopic = nullptr; + _subCheckStatus = nullptr; + } +} \ No newline at end of file diff --git a/src/v2i-hub/TelematicBridgePlugin/src/TelematicUnit.h b/src/v2i-hub/TelematicBridgePlugin/src/TelematicUnit.h new file mode 100644 index 000000000..3a72f080b --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/src/TelematicUnit.h @@ -0,0 +1,216 @@ +#pragma once +#include +#include +#include +#include +#include +#include "PluginLog.h" +#include "ThreadTimer.h" +#include "TelematicBridgeException.h" +#include +#include + + +namespace TelematicBridge +{ + using unit_st = struct unit + { + std::string unitId; // Unique identifier for each unit + std::string unitName; // Descriptive name for each unit + std::string unitType; // Unit categorized base on unit type: platform or infrastructure + }; + + class TelematicUnit + { + private: + std::mutex _unitMutex; + std::mutex _availableTopicsMutex; + std::mutex _excludedTopicsMutex; + unit_st _unit; // Global variable to store the unit information + std::vector _availableTopics; // Global variable to store available topics + std::string _excludedTopics; // Global variable to store topics that are excluded by the users + std::vector _selectedTopics; // Global variable to store selected topics confirmed by users + static CONSTEXPR const char *AVAILABLE_TOPICS = ".available_topics"; // NATS subject to pub/sub available topics + static CONSTEXPR const char *REGISTER_UNIT_TOPIC = "*.register_unit"; // NATS subject to pub/sub registering unit + static CONSTEXPR const char *PUBLISH_TOPICS = ".publish_topics"; // NATS subject to publish data stream + static CONSTEXPR const char *CHECK_STATUS = ".check_status"; // NATS subject to pub/sub checking unit status + natsConnection *_conn = nullptr; // Global NATS connection object + natsSubscription *_subAvailableTopic = nullptr; // Global NATS subscription object + natsSubscription *_subSelectedTopic = nullptr; // Global NATS subscription object + natsSubscription *_subCheckStatus = nullptr; // Global NATS subscription object + int64_t TIME_OUT = 10000; // NATS Connection time out in milliseconds + std::string _eventName; // Testing event the unit is assigned to + std::string _eventLocation; // Testing event location + std::string _testingType; // Testing type + static CONSTEXPR const char *LOCATION_KEY = "location"; // location key used to find location value from JSON + static CONSTEXPR const char *TESTING_TYPE_KEY = "testing_type"; // testing_type key used to find testing_type value from JSON + static CONSTEXPR const char *EVENT_NAME_KEY = "event_name"; // event_name key used to find event_name value from JSON + static CONSTEXPR const char *UNIT_ID_KEY = "unit_id"; // unit_id key used to find unit_id value from JSON + static CONSTEXPR const char *UNIT_NAME_KEY = "unit_name"; // unit_name key used to find unit_name value from JSON + static CONSTEXPR const char *UNIT_TYPE_KEY = "unit_type"; // unit_type key used to find unit_type value from JSON + static CONSTEXPR const char *TOPIC_NAME_KEY = "topic_name"; // topic_name key used to find topic_name value from JSON + static CONSTEXPR const char *TIMESTAMP_KEY = "timestamp"; // timestamp key used to find timestamp value from JSON + static CONSTEXPR const char *PAYLOAD_KEY = "payload"; // payload key used to find payload value from JSON + static CONSTEXPR const char *TOPICS_KEY = "topics"; // topics key used to find topics value from JSON + static CONSTEXPR const char *NAME_KEY = "name"; // topics key used to find topics value from JSON + static const int MILLI_TO_MICRO = 1000; + static const int REGISTRATION_MAX_ATTEMPTS = 30; //The maximum numbers of attempts allowed to register this unit with server + + public: + /** + *@brief Construct telematic unit + */ + explicit TelematicUnit() = default; + /** + * @brief A function for telematic unit to connect to NATS server. Throw exception is connection failed. * + * @param string string NATS server URL + */ + void connect(const std::string &natsURL); + + /** + * @brief A NATS requestor for telematic unit to send register request to NATS server. + * If receives a response, it will update the isRegistered flag to indicate the unit is registered. + * If no response after the specified time out (unit of second) period, it considered register failed. + * */ + void registerUnitRequestor(); + + /** + * @brief A NATS replier to subscribe to NATS server and receive available topics request. + * Publish list of available topics after receiving/processing the request. + */ + void availableTopicsReplier(); + + /** + * @brief A NATS replier to subscribe to NATS server and receive requested topics from telematic server. + * Process the request and update the selectedTopics global variable. + * Respond the telematic server with acknowledgement. + */ + void selectedTopicsReplier(); + + /** + * @brief A NATS replier to subscribe to NATS server and receive request for status check from telematic server. + * Publish unit status upon receiving a request. + */ + void checkStatusReplier(); + + /** + * @brief A function to publish message stream into NATS server + */ + void publishMessage(const std::string &topic, const Json::Value &payload); + + /** + * @brief A function to parse a JSON string and create a JSON object. + * @param string input json string + * @return Json::Value + */ + static Json::Value parseJson(const std::string &jsonStr); + + /** + * @brief construct available topic response + * @param unit_st struct that contains unit related information + * @param vector of available topics + * @param string Excluded topics separated by commas + */ + static std::string constructAvailableTopicsReplyString(const unit_st &unit, const std::string &eventLocation, const std::string &testingType, const std::string &eventName, const std::vector &availableTopicList, const std::string &excludedTopics); + + /** + * @brief A function to update available topics global variables when discovering a new topic. + */ + void updateAvailableTopics(const std::string &newTopic); + + /** + * @brief Update telematic unit registration status when receiving registration reply from NATS server + * @param string Register reply in Json format + * @return True when reply with event information (location, testing type, event name), otherwise false. + */ + bool validateRegisterStatus(const std::string& registerReply); + + /** + * @brief construct Json data string that will be streamed into the cloud by a publisher + * @param unit_st struct that contains unit related information + * @param string Event location + * @param string Testing type + * @param string Event name + * @param string Topic name is a combination of type_subtype_source from TMX IVPMessage + * @param Json::Value Payload is the actual data generated by V2xHub plugin + */ + std::string constructPublishedDataString(const unit_st &unit, const std::string &eventLocation, const std::string &testingType, const std::string &eventName, const std::string &topicName, const Json::Value &payload) const; + + /** + * @brief A function to update global unit variable + * @param unit_st object that has the unit id, type and name information + */ + void setUnit(const unit_st &unit); + + /** + * @brief Return unit structure + */ + unit_st getUnit() const; + + /** + * @brief Return list of available topics + */ + std::vector getAvailableTopics() const; + + /** + * @brief Return excluded topics string. + */ + std::string getExcludedTopics() const; + + /** + * @brief Return event name + */ + std::string getEventName() const; + + /** + * @brief Return event location + */ + std::string getEventLocation() const; + + /** + * @brief Return testing type + */ + std::string getTestingType() const; + + /** + * @brief Add new selected topic into the selected topics list + */ + void addSelectedTopic(const std::string &newSelectedTopic); + + /** + * @brief Clear selected topics list + */ + void clearSelectedTopics(); + + /** + * @brief A function to update excluded topics. + * @param string Excluded topics separated by commas + */ + void updateExcludedTopics(const std::string &excludedTopics); + + /** + * @brief Check if the given topic is inside the selectedTopics list + * @param string A topic to check for existence + * @return boolean indicator whether the input topic eixst. + */ + bool inSelectedTopics(const std::string &topic); + + /** + * @brief A callback function for available topic replier + */ + static void onAvailableTopicsCallback(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *object); + + /** + * @brief A callback function for selected topic replier + */ + static void onSelectedTopicsCallback(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *object); + + /** + * @brief A callback function for check status replier + */ + static void onCheckStatusCallback(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *object); + + ~TelematicUnit(); + }; + +} // namespace TelematicBridge diff --git a/src/v2i-hub/TelematicBridgePlugin/test/main.cpp b/src/v2i-hub/TelematicBridgePlugin/test/main.cpp new file mode 100644 index 000000000..d973578fc --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/test/main.cpp @@ -0,0 +1,7 @@ +#include + +int main(int argc, char *argv[]) +{ + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/v2i-hub/TelematicBridgePlugin/test/test_TelematicMsgWorker.cpp b/src/v2i-hub/TelematicBridgePlugin/test/test_TelematicMsgWorker.cpp new file mode 100644 index 000000000..9760ec906 --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/test/test_TelematicMsgWorker.cpp @@ -0,0 +1,106 @@ +#include +#include "TelematicBridgeMsgWorker.h" +#include "stdio.h" + +using namespace TelematicBridge; +using namespace std; + +class test_TelematicJ2735MsgWorker : public ::testing::Test +{ +}; + +TEST_F(test_TelematicJ2735MsgWorker, HexToBytes) +{ + vector byteBuff; + string bsmHex = "0014251d59d162dad7de266e9a7d1ea6d4220974ffffffff8ffff080fdfa1fa1007fff0000640fa0"; + auto success = HexToBytes(bsmHex, byteBuff); + ASSERT_TRUE(success); + ASSERT_EQ(bsmHex.size() / 2, byteBuff.size()); +} + +TEST_F(test_TelematicJ2735MsgWorker, DecodeJ2735Msg) +{ + auto messageFrame = (MessageFrame_t *)malloc(sizeof(MessageFrame_t)); + string bsmHex = "0014251d59d162dad7de266e9a7d1ea6d4220974ffffffff8ffff080fdfa1fa1007fff0000640fa0"; + ASSERT_NO_THROW(DecodeJ2735Msg(bsmHex, messageFrame)); + ASSERT_EQ(20, messageFrame->messageId); + ASN_STRUCT_FREE(asn_DEF_MessageFrame, messageFrame); +} + +TEST_F(test_TelematicJ2735MsgWorker, DecodeJ2735MsgFailure) +{ + auto messageFrame = (MessageFrame_t *)malloc(sizeof(MessageFrame_t)); + string badHex = "0014251d59d162dad7de266e9a7d1ea6d4220974ffffffff8ffff080fdfa1fa1007fff0000640fG0"; + ASSERT_THROW(DecodeJ2735Msg(badHex, messageFrame), TelematicBridgeException); + + badHex = "0014251d59d162dad7de266e9a"; + ASSERT_THROW(DecodeJ2735Msg(badHex, messageFrame), TelematicBridgeException); + ASN_STRUCT_FREE(asn_DEF_MessageFrame, messageFrame); +} + +TEST_F(test_TelematicJ2735MsgWorker, ConvertJ2735FrameToXML) +{ + auto messageFrame = (MessageFrame_t *)malloc(sizeof(MessageFrame_t)); + string bsmHex = "0014251d59d162dad7de266e9a7d1ea6d4220974ffffffff8ffff080fdfa1fa1007fff0000640fA0"; + ASSERT_NO_THROW(DecodeJ2735Msg(bsmHex, messageFrame)); + auto xmlStr = ConvertJ2735FrameToXML(messageFrame); + ASN_STRUCT_FREE(asn_DEF_MessageFrame, messageFrame); + string expectedXMLStr = "2011767458B6B24440389565434-7715004757452552556553581912880012720012001-127000000200500"; + ASSERT_EQ(expectedXMLStr, xmlStr); +} + +TEST_F(test_TelematicJ2735MsgWorker, constructTelematicPayload) +{ + IvpMessage msg; + auto payload = cJSON_CreateObject(); + payload->valuedouble = 12; + payload->type = cJSON_Number; + msg.payload = payload; + msg.source = "Plugin"; + msg.encoding = "json"; + msg.type = "Application"; + msg.subtype = "alive"; + msg.sourceId = 123; + msg.flags = 10; + msg.timestamp = 10; + IvpDsrcMetadata metadata; + metadata.channel = 12; + metadata.psid = 120; + msg.dsrcMetadata = &metadata; + auto json = IvpMessageToJson(&msg); + auto str = JsonToString(json); + string expectedStr = "{\"channel\":12,\"encoding\":\"json\",\"flags\":10,\"payload\":12.0,\"psid\":120,\"source\":\"Plugin\",\"sourceId\":123,\"subType\":\"alive\",\"timestamp\":10,\"type\":\"Application\"}"; + ASSERT_EQ(expectedStr, str); + + payload->valueint = 13; + payload->type = cJSON_Number; + msg.payload = payload; + json = IvpMessageToJson(&msg); + str = JsonToString(json); + expectedStr = "{\"channel\":12,\"encoding\":\"json\",\"flags\":10,\"payload\":13.0,\"psid\":120,\"source\":\"Plugin\",\"sourceId\":123,\"subType\":\"alive\",\"timestamp\":10,\"type\":\"Application\"}"; + ASSERT_EQ(expectedStr, str); + + payload->valuestring = "test"; + payload->type = cJSON_String; + msg.payload = payload; + json = IvpMessageToJson(&msg); + str = JsonToString(json); + expectedStr = "{\"channel\":12,\"encoding\":\"json\",\"flags\":10,\"payload\":\"test\",\"psid\":120,\"source\":\"Plugin\",\"sourceId\":123,\"subType\":\"alive\",\"timestamp\":10,\"type\":\"Application\"}"; + ASSERT_EQ(expectedStr, str); + + msg.payload = cJSON_Parse("[{\"test\":12}]"); + json = IvpMessageToJson(&msg); + str = JsonToString(json); + expectedStr = "{\"channel\":12,\"encoding\":\"json\",\"flags\":10,\"payload\":[{\"test\":12}],\"psid\":120,\"source\":\"Plugin\",\"sourceId\":123,\"subType\":\"alive\",\"timestamp\":10,\"type\":\"Application\"}"; + ASSERT_EQ(expectedStr, str); + + json = StringToJson("{\"test\":12}"); + ASSERT_EQ(12, json["test"].asInt64()); +} + +TEST_F(test_TelematicJ2735MsgWorker, xml2json) +{ + string expectedXMLStr = "2011767458B6B24440389565434-7715004757452552556553581912880012720012001-127000000200500"; + string expectedJSONStr = "{\"MessageFrame\":{\"messageId\":\"20\",\"value\":{\"BasicSafetyMessage\":{\"coreData\":{\"msgCnt\":\"117\",\"id\":\"67458B6B\",\"secMark\":\"24440\",\"lat\":\"389565434\",\"long\":\"-771500475\",\"elev\":\"745\",\"accuracy\":{\"semiMajor\":\"255\",\"semiMinor\":\"255\",\"orientation\":\"65535\"},\"transmission\":{\"neutral\":\"\"},\"speed\":\"8191\",\"heading\":\"28800\",\"angle\":\"127\",\"accelSet\":{\"long\":\"2001\",\"lat\":\"2001\",\"vert\":\"-127\",\"yaw\":\"0\"},\"brakes\":{\"wheelBrakes\":\"00000\",\"traction\":{\"unavailable\":\"\"},\"abs\":{\"unavailable\":\"\"},\"scs\":{\"unavailable\":\"\"},\"brakeBoost\":{\"unavailable\":\"\"},\"auxBrakes\":{\"unavailable\":\"\"}},\"size\":{\"width\":\"200\",\"length\":\"500\"}}}}}}\n"; + ASSERT_EQ(expectedJSONStr, xml2Json(expectedXMLStr)); +} \ No newline at end of file diff --git a/src/v2i-hub/TelematicBridgePlugin/test/test_TelematicUnit.cpp b/src/v2i-hub/TelematicBridgePlugin/test/test_TelematicUnit.cpp new file mode 100644 index 000000000..9a81427b4 --- /dev/null +++ b/src/v2i-hub/TelematicBridgePlugin/test/test_TelematicUnit.cpp @@ -0,0 +1,156 @@ +#include +#include "TelematicUnit.h" +using namespace std; +using namespace tmx::utils; +using namespace std::chrono; + +namespace TelematicBridge +{ + class test_TelematicUnit : public ::testing::Test + { + public: + shared_ptr _telematicUnitPtr = make_shared(); + unit_st unit = + { + "test_id", + "test_name", + "infrastructure"}; + }; + + TEST_F(test_TelematicUnit, setUnit) + { + ASSERT_NO_THROW(_telematicUnitPtr->setUnit(unit)); + ASSERT_EQ(unit.unitId, _telematicUnitPtr->getUnit().unitId); + } + + TEST_F(test_TelematicUnit, updateExcludedTopics) + { + ASSERT_NO_THROW(_telematicUnitPtr->updateExcludedTopics("test_topic")); + } + + TEST_F(test_TelematicUnit, updateAvailableTopics) + { + ASSERT_NO_THROW(_telematicUnitPtr->updateAvailableTopics("test_topic")); + } + + TEST_F(test_TelematicUnit, constructAvailableTopicsReplyString) + { + vector topics = {"test_topic", "excluded_topic"}; + string excluded_topic = "excluded_topic"; + string eventLocation = "location"; + string testingType = "unit_test"; + string eventName = "testing"; + auto reply = TelematicUnit::constructAvailableTopicsReplyString(unit, eventLocation, testingType, eventName, topics, excluded_topic); + auto json = TelematicUnit::parseJson(reply); + ASSERT_EQ("test_topic", json["topics"][0]["name"].asString()); + } + + TEST_F(test_TelematicUnit, constructPublishedDataString) + { + string eventLocation = "location"; + string testingType = "unit_test"; + string eventName = "testing"; + string topicName = "test_topic"; + Json::Value payload; + payload["timestamp"] = 1701099016033; + + auto reply = _telematicUnitPtr->constructPublishedDataString(unit, eventLocation, testingType, eventName, topicName, payload); + auto json = TelematicUnit::parseJson(reply); + ASSERT_EQ(eventLocation, json["location"].asString()); + ASSERT_EQ(testingType, json["testing_type"].asString()); + ASSERT_EQ(eventName, json["event_name"].asString()); + ASSERT_EQ(1701099016033000, json["timestamp"].asUInt64()); + ASSERT_THROW(TelematicUnit::parseJson("Invalid Json"), TelematicBridgeException); + + Json::Value payload2; + payload2["body"] = "invalid"; + reply = _telematicUnitPtr->constructPublishedDataString(unit, eventLocation, testingType, eventName, topicName, payload2); + json = TelematicUnit::parseJson(reply); + ASSERT_NEAR(duration_cast(system_clock::now().time_since_epoch()).count(), json["timestamp"].asUInt64(), 100); + } + + TEST_F(test_TelematicUnit, onCheckStatusCallback) + { + natsMsg *msg; + string data = "{\"data\":\"test\"}"; + natsMsg_Create(&msg, "test_subject", "Test_reply", data.c_str(), data.size()); + ASSERT_NO_THROW(TelematicUnit::onCheckStatusCallback(nullptr, nullptr, msg, nullptr)); + } + + TEST_F(test_TelematicUnit, onSelectedTopicsCallback) + { + natsMsg *msg; + string data = "{\"topics\":[\"test_topic\"]}"; + natsMsg_Create(&msg, "test_subject", "Test_reply", data.c_str(), data.size()); + ASSERT_NO_THROW(TelematicUnit::onSelectedTopicsCallback(nullptr, nullptr, msg, _telematicUnitPtr.get())); + ASSERT_TRUE(_telematicUnitPtr->inSelectedTopics("test_topic")); + } + + TEST_F(test_TelematicUnit, onAvailableTopicsCallback) + { + natsMsg *msg; + string data = "{\"data\":\"test\"}"; + natsMsg_Create(&msg, "test_subject", "Test_reply", data.c_str(), data.size()); + ASSERT_NO_THROW(TelematicUnit::onAvailableTopicsCallback(nullptr, nullptr, msg, _telematicUnitPtr.get())); + } + + TEST_F(test_TelematicUnit, publishMessage) + { + string topicName = "test_topic"; + Json::Value payload; + payload["body"] = "test_body"; + ASSERT_THROW(_telematicUnitPtr->publishMessage(topicName, payload), TelematicBridgeException); + } + + TEST_F(test_TelematicUnit, checkStatusReplier) + { + _telematicUnitPtr->checkStatusReplier(); + } + + TEST_F(test_TelematicUnit, selectedTopicsReplier) + { + _telematicUnitPtr->selectedTopicsReplier(); + } + + TEST_F(test_TelematicUnit, availableTopicsReplier) + { + _telematicUnitPtr->availableTopicsReplier(); + } + + TEST_F(test_TelematicUnit, registerUnitRequestor) + { + ASSERT_THROW(_telematicUnitPtr->registerUnitRequestor(), TelematicBridgeException); + } + + TEST_F(test_TelematicUnit, connect) + { + ASSERT_THROW(_telematicUnitPtr->connect("nats://127.0.0.1:4222"), TelematicBridgeException); + } + + TEST_F(test_TelematicUnit, getters) + { + ASSERT_EQ(0, _telematicUnitPtr->getAvailableTopics().size()); + ASSERT_EQ("", _telematicUnitPtr->getEventLocation()); + ASSERT_EQ("", _telematicUnitPtr->getEventName()); + ASSERT_EQ("", _telematicUnitPtr->getExcludedTopics()); + ASSERT_EQ("", _telematicUnitPtr->getTestingType()); + } + + TEST_F(test_TelematicUnit, selectedTopics) + { + string selectedTopic = "test_selected_topics"; + _telematicUnitPtr->addSelectedTopic(selectedTopic); + ASSERT_TRUE(_telematicUnitPtr->inSelectedTopics(selectedTopic)); + _telematicUnitPtr->clearSelectedTopics(); + ASSERT_FALSE(_telematicUnitPtr->inSelectedTopics(selectedTopic)); + } + TEST_F(test_TelematicUnit, validateRegisterStatus) + { + string replyStr = "{\"event_name\":\"Test\",\"location\":\"Local\",\"testing_type\":\"Integration\"}"; + _telematicUnitPtr->validateRegisterStatus(replyStr); + ASSERT_EQ("Local", _telematicUnitPtr->getEventLocation()); + ASSERT_EQ("Test", _telematicUnitPtr->getEventName()); + ASSERT_EQ("Integration", _telematicUnitPtr->getTestingType()); + } + +} \ No newline at end of file