Skip to content

Commit

Permalink
Updates
Browse files Browse the repository at this point in the history
  • Loading branch information
paulbourelly999 committed Jul 11, 2024
1 parent 1173dbb commit f49a8e1
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 83 deletions.
96 changes: 40 additions & 56 deletions src/v2i-hub/SpatPlugin/src/SignalControllerConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,68 +8,52 @@ namespace SpatPlugin {
bool SignalControllerConnection::initializeSignalControllerConnection() {
tmx::utils::snmp_response_obj resp;
resp.val_int = 2;
resp.type = tmx::utils::snmp_response_obj::response_type::INTEGER;
return scSNMPClient->process_snmp_request("1.3.6.1.4.1.1206.3.5.2.9.44.1.0", tmx::utils::request_type::SET, resp);
};
tmx::messages::SpatEncodedMessage SignalControllerConnection::receiveSPAT(SPAT *spat, uint64_t timeMs, const SPAT_MODE &spatMode)
{
if ( spatMode == SPAT_MODE::BINARY ) {
FILE_LOG(tmx::utils::logDEBUG) << "Receiving binary SPAT ..." << std::endl;
char buf[1000];
auto numBytes = spatPacketReceiver->TimedReceive(buf, 1000, 1000);
auto ntcip1202 = std::make_unique<Ntcip1202>();
ntcip1202->setSignalGroupMappingList(this->signalGroupMapping);

if ( numBytes > 0 ) {
// TODO: Revist this implementation. See if we can make SPAT a shared pointer
// and skipe the SPAT to SpatMessage conversion.
FILE_LOG(tmx::utils::logDEBUG) << "Decoding binary SPAT from " << numBytes << " bytes ..." << std::endl;
ntcip1202->copyBytesIntoNtcip1202(buf, numBytes);


ntcip1202->ToJ2735SPAT(spat,timeMs, intersectionName, intersectionId);
FILE_LOG(tmx::utils::logDEBUG) << "Sending SPAT ..." << std::endl;
if ( tmx::utils::FILELog::ReportingLevel() >= tmx::utils::logDEBUG) {
xer_fprint(stdout, &asn_DEF_SPAT, spat);
}
tmx::messages::SpatMessage _spatMessage(spat);
tmx::messages::SpatEncodedMessage spatEncodedMsg;
spatEncodedMsg.initialize(_spatMessage);
return spatEncodedMsg;
}
else {
throw std::runtime_error("Something went wrong");
}
void SignalControllerConnection::receiveBinarySPAT(std::shared_ptr<SPAT> spat, uint64_t timeMs ) {
FILE_LOG(tmx::utils::logDEBUG) << "Receiving binary SPAT ..." << std::endl;
char buf[1000];
auto numBytes = spatPacketReceiver->TimedReceive(buf, 1000, 1000);

if ( numBytes > 0 ) {
// Convert Binary buffer to SPAT pointer
Ntcip1202 ntcip1202;
ntcip1202.setSignalGroupMappingList(this->signalGroupMapping);
ntcip1202.copyBytesIntoNtcip1202(buf, numBytes);
ntcip1202.ToJ2735SPAT(spat.get(),timeMs, intersectionName, intersectionId);

}
else {
FILE_LOG(tmx::utils::logDEBUG) << "Receiving J2725 HEX SPAT ..." << std::endl;

tmx::messages::SpatEncodedMessage spatEncodedMsg;
auto payload = spatPacketReceiver->stringTimedReceive( 1000 );
auto index = payload.find("Payload=");
FILE_LOG(tmx::utils::logDEBUG) << "Found Payload at index " << index << std::endl;

if ( index != std::string::npos ) {
auto hex = payload.substr(index + 8);
hex.erase(std::remove(hex.begin(), hex.end(), '\n'), hex.end());
hex.erase(std::remove(hex.begin(), hex.end(), ' '), hex.end());

FILE_LOG(tmx::utils::logDEBUG) << "Reading HEX String " << hex << std::endl;
tmx::byte_stream bytes = tmx::byte_stream_decode(hex);

FILE_LOG(tmx::utils::logDEBUG) << "Reading Bytes " << tmx::byte_stream_encode(bytes) ;
tmx::messages::J2735MessageFactory myFactory;
auto spatEncodedMsg = dynamic_cast<tmx::messages::SpatEncodedMessage*>(myFactory.NewMessage(bytes));
if (tmx::utils::FILELog::ReportingLevel() >= tmx::utils::logDEBUG)
{
xer_fprint(stdout, &asn_DEF_SPAT, spatEncodedMsg->decode_j2735_message().get_j2735_data().get());
PLOG(tmx::utils::logDEBUG) << "Message is "<< spatEncodedMsg->get_payload_str();
}

return *spatEncodedMsg;
}
else {
throw std::runtime_error("Something went wrong");
throw tmx::utils::UdpServerRuntimeError("UDP Server error occured or socket time out.");
}
}

void SignalControllerConnection::receiveUPERSPAT(std::shared_ptr<tmx::messages::SpatEncodedMessage> spatEncoded_ptr) {
FILE_LOG(tmx::utils::logDEBUG) << "Receiving J2725 HEX SPAT ..." << std::endl;
auto payload = spatPacketReceiver->stringTimedReceive( 1000 );
auto index = payload.find("Payload=");
if ( index != std::string::npos ) {
// Retreive hex string payload
auto hex = payload.substr(index + 8);
// Remove new lines and empty space
hex.erase(std::remove(hex.begin(), hex.end(), '\n'), hex.end());
hex.erase(std::remove(hex.begin(), hex.end(), ' '), hex.end());
FILE_LOG(tmx::utils::logDEBUG) << "Reading HEX String " << hex << std::endl;
// Convert to byte stream
tmx::byte_stream bytes = tmx::byte_stream_decode(hex);
// Read SpateEncodedMessage from bytes
tmx::messages::J2735MessageFactory myFactory;
spatEncoded_ptr.reset(dynamic_cast<tmx::messages::SpatEncodedMessage*>(myFactory.NewMessage(bytes)));
if (tmx::utils::FILELog::ReportingLevel() >= tmx::utils::logDEBUG)
{
xer_fprint(stdout, &asn_DEF_SPAT, spatEncoded_ptr->decode_j2735_message().get_j2735_data().get());
}
}
else {
throw tmx::TmxException("Could not find UPER Payload in received SPAT UDP Packet!");
}
}

}
4 changes: 4 additions & 0 deletions src/v2i-hub/SpatPlugin/src/SignalControllerConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <PluginLog.h>
#include <tmx/messages/byte_stream.hpp>
#include <tmx/j2735_messages/J2735MessageFactory.hpp>
#include <tmx/TmxException.hpp>

namespace SpatPlugin {
enum class SPAT_MODE
Expand All @@ -32,5 +33,8 @@ namespace SpatPlugin {
bool initializeSignalControllerConnection();
tmx::messages::SpatEncodedMessage receiveSPAT(SPAT *spat, uint64_t timeMs , const SPAT_MODE &spat_mode = SPAT_MODE::BINARY);

void receiveBinarySPAT(std::shared_ptr<SPAT> spat, uint64_t timeMs);

void receiveUPERSPAT(std::shared_ptr<tmx::messages::SpatEncodedMessage> spatEncoded_ptr);
};
}
78 changes: 51 additions & 27 deletions src/v2i-hub/SpatPlugin/src/SpatPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,32 @@ namespace SpatPlugin {
auto connected = scConnection->initializeSignalControllerConnection();
if ( connected ) {
SetStatus(keyConnectionStatus, "IDLE");

spatReceiverThread->AddPeriodicTick([this]()
{
this->processSpat();
}, // end of lambda expression
std::chrono::milliseconds(5));
PluginClientClockAware::getClock()->wait_for_initialization();
spatReceiverThread->Start();
try {
spatReceiverThread->AddPeriodicTick([this]()
{
this->processSpat();
if (!this->isConnected) {
SetStatus(keyConnectionStatus, "CONNECTED");
this->isConnected = true;
}
}, // end of lambda expression
std::chrono::milliseconds(5)
);
PluginClientClockAware::getClock()->wait_for_initialization();
spatReceiverThread->Start();
}
catch (const TmxException &e) {
PLOG(tmx::utils::logERROR) << "Encountered error " << e.what() << " during SPAT Processing." << std::endl
<< e.GetBacktrace();
SetStatus(keyConnectionStatus, "ERROR");
this->isConnected = false;

}
}
else {
PLOG(tmx::utils::logERROR) << "Traffic Signal Controller at " << scIp << ":" << scSNMPPort << " failed!";
SetStatus(keyConnectionStatus, "DISCONNECTED");
this->isConnected = false;

}
}
Expand All @@ -65,27 +81,35 @@ namespace SpatPlugin {
void SpatPlugin::processSpat() {
if (this->scConnection ) {
PLOG(tmx::utils::logDEBUG) << "Processing SPAT ... " << std::endl;
SPAT_MODE mode;
if (spatMode == "BINARY")
{
mode = SPAT_MODE::BINARY;
}
else if (spatMode == "J2735_HEX") {
mode = SPAT_MODE::J2735_HEX;
try {
if (spatMode == "BINARY")
{
auto spat_ptr = std::make_shared<SPAT>();
scConnection->receiveBinarySPAT(spat_ptr, PluginClientClockAware::getClock()->nowInMilliseconds());
tmx::messages::SpatMessage _spatMessage(spat_ptr);
tmx::messages::SpatEncodedMessage spatEncodedMsg;
spatEncodedMsg.initialize(_spatMessage,"", 0U, IvpMsgFlags_RouteDSRC);
spatEncodedMsg.addDsrcMetadata(tmx::messages::api::msgPSID::signalPhaseAndTimingMessage_PSID);

PLOG(tmx::utils::logDEBUG) << "Broadcasting SPAT" << std::endl;
BroadcastMessage(static_cast<routeable_message>(spatEncodedMsg));
}
else if (spatMode == "J2735_HEX") {
auto spatEncoded_ptr = std::make_shared<tmx::messages::SpatEncodedMessage>();
scConnection->receiveUPERSPAT(spatEncoded_ptr);
spatEncoded_ptr->set_flags(IvpMsgFlags_RouteDSRC);
spatEncoded_ptr->addDsrcMetadata(tmx::messages::api::msgPSID::signalPhaseAndTimingMessage_PSID);
auto rMsg = dynamic_cast<routeable_message *>(spatEncoded_ptr.get());
BroadcastMessage(*rMsg);
}
else {
throw TmxException("SPAT Mode " + spatMode + " is not supported. Support SPAT Modes are J2735_HEX and BINARY.");
}
}
else {
PLOG(tmx::utils::logWARNING) << "SPAT Mode " << spatMode << " is unrecognized. Defaulting to BINARY." << std::endl;
catch (const tmx::J2735Exception &e) {
PLOG(tmx::utils::logERROR) << "Encountered J2735 Exception " << e.what() << " attempting to process SPAT." << std::endl
<< e.GetBacktrace();
}
SPAT *spat_ptr = (SPAT *) calloc(1, sizeof(SPAT));
auto spatMessage = scConnection->receiveSPAT(spat_ptr, PluginClientClockAware::getClock()->nowInMilliseconds(), mode);

spatMessage.set_flags(IvpMsgFlags_RouteDSRC);
spatMessage.addDsrcMetadata(tmx::messages::api::msgPSID::signalPhaseAndTimingMessage_PSID);

BroadcastMessage(static_cast<routeable_message &>(spatMessage));
PLOG(tmx::utils::logDEBUG) << "Broadcasting SPAT" << std::endl;

SetStatus(keyConnectionStatus, "CONNECTED");
}
}
void SpatPlugin::OnConfigChanged(const char *key, const char *value) {
Expand Down
3 changes: 3 additions & 0 deletions src/v2i-hub/SpatPlugin/src/SpatPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ class SpatPlugin: public tmx::utils::PluginClientClockAware {

const char* keyConnectionStatus = "Connection Status";

const char* keySkippedMessages = "Skipped Messages";

bool isConnected = false;

void processSpat();
};
Expand Down

0 comments on commit f49a8e1

Please sign in to comment.