From f49a8e17f7581d71dcb1c777cc7d2073de07ad98 Mon Sep 17 00:00:00 2001 From: dev Date: Thu, 11 Jul 2024 15:00:40 -0400 Subject: [PATCH] Updates --- .../src/SignalControllerConnection.cpp | 96 ++++++++----------- .../src/SignalControllerConnection.h | 4 + src/v2i-hub/SpatPlugin/src/SpatPlugin.cpp | 78 +++++++++------ src/v2i-hub/SpatPlugin/src/SpatPlugin.h | 3 + 4 files changed, 98 insertions(+), 83 deletions(-) diff --git a/src/v2i-hub/SpatPlugin/src/SignalControllerConnection.cpp b/src/v2i-hub/SpatPlugin/src/SignalControllerConnection.cpp index fb8cb771c..e7854ad29 100644 --- a/src/v2i-hub/SpatPlugin/src/SignalControllerConnection.cpp +++ b/src/v2i-hub/SpatPlugin/src/SignalControllerConnection.cpp @@ -8,68 +8,52 @@ namespace SpatPlugin { bool SignalControllerConnection::initializeSignalControllerConnection() { tmx::utils::snmp_response_obj resp; resp.val_int = 2; + resp.type = tmx::utils::snmp_response_obj::response_type::INTEGER; return scSNMPClient->process_snmp_request("1.3.6.1.4.1.1206.3.5.2.9.44.1.0", tmx::utils::request_type::SET, resp); }; - tmx::messages::SpatEncodedMessage SignalControllerConnection::receiveSPAT(SPAT *spat, uint64_t timeMs, const SPAT_MODE &spatMode) - { - if ( spatMode == SPAT_MODE::BINARY ) { - FILE_LOG(tmx::utils::logDEBUG) << "Receiving binary SPAT ..." << std::endl; - char buf[1000]; - auto numBytes = spatPacketReceiver->TimedReceive(buf, 1000, 1000); - auto ntcip1202 = std::make_unique(); - ntcip1202->setSignalGroupMappingList(this->signalGroupMapping); - if ( numBytes > 0 ) { - // TODO: Revist this implementation. See if we can make SPAT a shared pointer - // and skipe the SPAT to SpatMessage conversion. - FILE_LOG(tmx::utils::logDEBUG) << "Decoding binary SPAT from " << numBytes << " bytes ..." << std::endl; - ntcip1202->copyBytesIntoNtcip1202(buf, numBytes); - - - ntcip1202->ToJ2735SPAT(spat,timeMs, intersectionName, intersectionId); - FILE_LOG(tmx::utils::logDEBUG) << "Sending SPAT ..." << std::endl; - if ( tmx::utils::FILELog::ReportingLevel() >= tmx::utils::logDEBUG) { - xer_fprint(stdout, &asn_DEF_SPAT, spat); - } - tmx::messages::SpatMessage _spatMessage(spat); - tmx::messages::SpatEncodedMessage spatEncodedMsg; - spatEncodedMsg.initialize(_spatMessage); - return spatEncodedMsg; - } - else { - throw std::runtime_error("Something went wrong"); - } + void SignalControllerConnection::receiveBinarySPAT(std::shared_ptr spat, uint64_t timeMs ) { + FILE_LOG(tmx::utils::logDEBUG) << "Receiving binary SPAT ..." << std::endl; + char buf[1000]; + auto numBytes = spatPacketReceiver->TimedReceive(buf, 1000, 1000); + + if ( numBytes > 0 ) { + // Convert Binary buffer to SPAT pointer + Ntcip1202 ntcip1202; + ntcip1202.setSignalGroupMappingList(this->signalGroupMapping); + ntcip1202.copyBytesIntoNtcip1202(buf, numBytes); + ntcip1202.ToJ2735SPAT(spat.get(),timeMs, intersectionName, intersectionId); + } else { - FILE_LOG(tmx::utils::logDEBUG) << "Receiving J2725 HEX SPAT ..." << std::endl; - - tmx::messages::SpatEncodedMessage spatEncodedMsg; - auto payload = spatPacketReceiver->stringTimedReceive( 1000 ); - auto index = payload.find("Payload="); - FILE_LOG(tmx::utils::logDEBUG) << "Found Payload at index " << index << std::endl; - - if ( index != std::string::npos ) { - auto hex = payload.substr(index + 8); - hex.erase(std::remove(hex.begin(), hex.end(), '\n'), hex.end()); - hex.erase(std::remove(hex.begin(), hex.end(), ' '), hex.end()); - - FILE_LOG(tmx::utils::logDEBUG) << "Reading HEX String " << hex << std::endl; - tmx::byte_stream bytes = tmx::byte_stream_decode(hex); - - FILE_LOG(tmx::utils::logDEBUG) << "Reading Bytes " << tmx::byte_stream_encode(bytes) ; - tmx::messages::J2735MessageFactory myFactory; - auto spatEncodedMsg = dynamic_cast(myFactory.NewMessage(bytes)); - if (tmx::utils::FILELog::ReportingLevel() >= tmx::utils::logDEBUG) - { - xer_fprint(stdout, &asn_DEF_SPAT, spatEncodedMsg->decode_j2735_message().get_j2735_data().get()); - PLOG(tmx::utils::logDEBUG) << "Message is "<< spatEncodedMsg->get_payload_str(); - } - - return *spatEncodedMsg; - } - else { - throw std::runtime_error("Something went wrong"); + throw tmx::utils::UdpServerRuntimeError("UDP Server error occured or socket time out."); + } + } + + void SignalControllerConnection::receiveUPERSPAT(std::shared_ptr spatEncoded_ptr) { + FILE_LOG(tmx::utils::logDEBUG) << "Receiving J2725 HEX SPAT ..." << std::endl; + auto payload = spatPacketReceiver->stringTimedReceive( 1000 ); + auto index = payload.find("Payload="); + if ( index != std::string::npos ) { + // Retreive hex string payload + auto hex = payload.substr(index + 8); + // Remove new lines and empty space + hex.erase(std::remove(hex.begin(), hex.end(), '\n'), hex.end()); + hex.erase(std::remove(hex.begin(), hex.end(), ' '), hex.end()); + FILE_LOG(tmx::utils::logDEBUG) << "Reading HEX String " << hex << std::endl; + // Convert to byte stream + tmx::byte_stream bytes = tmx::byte_stream_decode(hex); + // Read SpateEncodedMessage from bytes + tmx::messages::J2735MessageFactory myFactory; + spatEncoded_ptr.reset(dynamic_cast(myFactory.NewMessage(bytes))); + if (tmx::utils::FILELog::ReportingLevel() >= tmx::utils::logDEBUG) + { + xer_fprint(stdout, &asn_DEF_SPAT, spatEncoded_ptr->decode_j2735_message().get_j2735_data().get()); } } + else { + throw tmx::TmxException("Could not find UPER Payload in received SPAT UDP Packet!"); + } } + } \ No newline at end of file diff --git a/src/v2i-hub/SpatPlugin/src/SignalControllerConnection.h b/src/v2i-hub/SpatPlugin/src/SignalControllerConnection.h index 037cdcda1..0da434fba 100644 --- a/src/v2i-hub/SpatPlugin/src/SignalControllerConnection.h +++ b/src/v2i-hub/SpatPlugin/src/SignalControllerConnection.h @@ -7,6 +7,7 @@ #include #include #include +#include namespace SpatPlugin { enum class SPAT_MODE @@ -32,5 +33,8 @@ namespace SpatPlugin { bool initializeSignalControllerConnection(); tmx::messages::SpatEncodedMessage receiveSPAT(SPAT *spat, uint64_t timeMs , const SPAT_MODE &spat_mode = SPAT_MODE::BINARY); + void receiveBinarySPAT(std::shared_ptr spat, uint64_t timeMs); + + void receiveUPERSPAT(std::shared_ptr spatEncoded_ptr); }; } \ No newline at end of file diff --git a/src/v2i-hub/SpatPlugin/src/SpatPlugin.cpp b/src/v2i-hub/SpatPlugin/src/SpatPlugin.cpp index 052879e03..0c0676be2 100644 --- a/src/v2i-hub/SpatPlugin/src/SpatPlugin.cpp +++ b/src/v2i-hub/SpatPlugin/src/SpatPlugin.cpp @@ -47,16 +47,32 @@ namespace SpatPlugin { auto connected = scConnection->initializeSignalControllerConnection(); if ( connected ) { SetStatus(keyConnectionStatus, "IDLE"); - - spatReceiverThread->AddPeriodicTick([this]() - { - this->processSpat(); - }, // end of lambda expression - std::chrono::milliseconds(5)); - PluginClientClockAware::getClock()->wait_for_initialization(); - spatReceiverThread->Start(); + try { + spatReceiverThread->AddPeriodicTick([this]() + { + this->processSpat(); + if (!this->isConnected) { + SetStatus(keyConnectionStatus, "CONNECTED"); + this->isConnected = true; + } + }, // end of lambda expression + std::chrono::milliseconds(5) + ); + PluginClientClockAware::getClock()->wait_for_initialization(); + spatReceiverThread->Start(); + } + catch (const TmxException &e) { + PLOG(tmx::utils::logERROR) << "Encountered error " << e.what() << " during SPAT Processing." << std::endl + << e.GetBacktrace(); + SetStatus(keyConnectionStatus, "ERROR"); + this->isConnected = false; + + } } else { + PLOG(tmx::utils::logERROR) << "Traffic Signal Controller at " << scIp << ":" << scSNMPPort << " failed!"; + SetStatus(keyConnectionStatus, "DISCONNECTED"); + this->isConnected = false; } } @@ -65,27 +81,35 @@ namespace SpatPlugin { void SpatPlugin::processSpat() { if (this->scConnection ) { PLOG(tmx::utils::logDEBUG) << "Processing SPAT ... " << std::endl; - SPAT_MODE mode; - if (spatMode == "BINARY") - { - mode = SPAT_MODE::BINARY; - } - else if (spatMode == "J2735_HEX") { - mode = SPAT_MODE::J2735_HEX; + try { + if (spatMode == "BINARY") + { + auto spat_ptr = std::make_shared(); + scConnection->receiveBinarySPAT(spat_ptr, PluginClientClockAware::getClock()->nowInMilliseconds()); + tmx::messages::SpatMessage _spatMessage(spat_ptr); + tmx::messages::SpatEncodedMessage spatEncodedMsg; + spatEncodedMsg.initialize(_spatMessage,"", 0U, IvpMsgFlags_RouteDSRC); + spatEncodedMsg.addDsrcMetadata(tmx::messages::api::msgPSID::signalPhaseAndTimingMessage_PSID); + + PLOG(tmx::utils::logDEBUG) << "Broadcasting SPAT" << std::endl; + BroadcastMessage(static_cast(spatEncodedMsg)); + } + else if (spatMode == "J2735_HEX") { + auto spatEncoded_ptr = std::make_shared(); + scConnection->receiveUPERSPAT(spatEncoded_ptr); + spatEncoded_ptr->set_flags(IvpMsgFlags_RouteDSRC); + spatEncoded_ptr->addDsrcMetadata(tmx::messages::api::msgPSID::signalPhaseAndTimingMessage_PSID); + auto rMsg = dynamic_cast(spatEncoded_ptr.get()); + BroadcastMessage(*rMsg); + } + else { + throw TmxException("SPAT Mode " + spatMode + " is not supported. Support SPAT Modes are J2735_HEX and BINARY."); + } } - else { - PLOG(tmx::utils::logWARNING) << "SPAT Mode " << spatMode << " is unrecognized. Defaulting to BINARY." << std::endl; + catch (const tmx::J2735Exception &e) { + PLOG(tmx::utils::logERROR) << "Encountered J2735 Exception " << e.what() << " attempting to process SPAT." << std::endl + << e.GetBacktrace(); } - SPAT *spat_ptr = (SPAT *) calloc(1, sizeof(SPAT)); - auto spatMessage = scConnection->receiveSPAT(spat_ptr, PluginClientClockAware::getClock()->nowInMilliseconds(), mode); - - spatMessage.set_flags(IvpMsgFlags_RouteDSRC); - spatMessage.addDsrcMetadata(tmx::messages::api::msgPSID::signalPhaseAndTimingMessage_PSID); - - BroadcastMessage(static_cast(spatMessage)); - PLOG(tmx::utils::logDEBUG) << "Broadcasting SPAT" << std::endl; - - SetStatus(keyConnectionStatus, "CONNECTED"); } } void SpatPlugin::OnConfigChanged(const char *key, const char *value) { diff --git a/src/v2i-hub/SpatPlugin/src/SpatPlugin.h b/src/v2i-hub/SpatPlugin/src/SpatPlugin.h index 802fa331f..a2e618e08 100644 --- a/src/v2i-hub/SpatPlugin/src/SpatPlugin.h +++ b/src/v2i-hub/SpatPlugin/src/SpatPlugin.h @@ -49,6 +49,9 @@ class SpatPlugin: public tmx::utils::PluginClientClockAware { const char* keyConnectionStatus = "Connection Status"; + const char* keySkippedMessages = "Skipped Messages"; + + bool isConnected = false; void processSpat(); };