Skip to content

Commit

Permalink
merge conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
dan-du-car committed Jun 30, 2023
2 parents 4052ee3 + 4528b26 commit 82ada6f
Show file tree
Hide file tree
Showing 8 changed files with 401 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/v2i-hub/CARMAStreetsPlugin/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ TARGET_LINK_LIBRARIES (${PROJECT_NAME} tmxutils rdkafka++ jsoncpp)
#############
enable_testing()
include_directories(${PROJECT_SOURCE_DIR}/src)
add_library(${PROJECT_NAME}_lib src/J2735MapToJsonConverter.cpp src/JsonToJ2735SpatConverter.cpp src/J2735ToSRMJsonConverter.cpp)
add_library(${PROJECT_NAME}_lib src/J2735MapToJsonConverter.cpp src/JsonToJ2735SSMConverter.cpp src/JsonToJ2735SpatConverter.cpp src/J2735ToSRMJsonConverter.cpp)
target_link_libraries(${PROJECT_NAME}_lib PUBLIC ${TMXAPI_LIBRARIES}
${ASN_J2735_LIBRARIES}
${MYSQL_LIBRARIES}
Expand Down
10 changes: 10 additions & 0 deletions src/v2i-hub/CARMAStreetsPlugin/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@
"default": "modified_spat",
"description": "Apache Kafka topic plugin will transmit message to."
},
{
"key": "SsmTopic",
"default": "v2xhub_ssm_sub",
"description": "Apache Kafka topic plugin will transmit message to."
},
{
"key": "SsmConsumerGroupId",
"default": "v2xhub_ssm",
"description": "Apache Kafka consumer group ID for spat consumer."
},
{
"key": "SpatConsumerGroupId",
"default": "v2xhub_spat",
Expand Down
60 changes: 58 additions & 2 deletions src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ void CARMAStreetsPlugin::UpdateConfigSettings() {
GetConfigValue<string>("SchedulingPlanTopic", _subscribeToSchedulingPlanTopic);
GetConfigValue<string>("SchedulingPlanConsumerGroupId", _subscribeToSchedulingPlanConsumerGroupId);
GetConfigValue<string>("SpatTopic", _subscribeToSpatTopic);
GetConfigValue<string>("SsmTopic", _subscribeToSsmTopic);
GetConfigValue<string>("SpatConsumerGroupId", _subscribeToSpatConsumerGroupId);
GetConfigValue<string>("SsmConsumerGroupId", _subscribeToSSMConsumerGroupId);
GetConfigValue<string>("BsmTopic", _transmitBSMTopic);
GetConfigValue<string>("MobilityOperationTopic", _transmitMobilityOperationTopic);
GetConfigValue<string>("MobilityPathTopic", _transmitMobilityPathTopic);
Expand Down Expand Up @@ -97,19 +99,21 @@ void CARMAStreetsPlugin::InitKafkaConsumerProducers()
PLOG(logERROR) << "Kafka INFO:" << kafkaConnectString<<_subscribeToSpatTopic<<_subscribeToSpatConsumerGroupId;
_spat_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSpatTopic,_subscribeToSpatConsumerGroupId);
_scheduing_plan_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSchedulingPlanTopic,_subscribeToSchedulingPlanConsumerGroupId);
if(!_scheduing_plan_kafka_consumer_ptr || !_spat_kafka_consumer_ptr)
_ssm_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSsmTopic,_subscribeToSSMConsumerGroupId);
if(!_scheduing_plan_kafka_consumer_ptr || !_spat_kafka_consumer_ptr || !_ssm_kafka_consumer_ptr)
{
PLOG(logERROR) <<"Failed to create Kafka consumers.";
return;
}
PLOG(logDEBUG) <<"Kafka consumers created";
if(!_spat_kafka_consumer_ptr->init() || !_scheduing_plan_kafka_consumer_ptr->init())
if(!_spat_kafka_consumer_ptr->init() || !_scheduing_plan_kafka_consumer_ptr->init() || !_ssm_kafka_consumer_ptr->init())
{
PLOG(logERROR) <<"Kafka consumers init() failed!";
}

thread_schpl = new std::thread(&CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic, this);
thread_spat = new std::thread(&CARMAStreetsPlugin::SubscribeSpatKafkaTopic, this);
thread_ssm = new std::thread(&CARMAStreetsPlugin::SubscribeSSMKafkaTopic, this);
}

void CARMAStreetsPlugin::OnConfigChanged(const char *key, const char *value) {
Expand Down Expand Up @@ -610,6 +614,58 @@ void CARMAStreetsPlugin::SubscribeSpatKafkaTopic(){
}
}

void CARMAStreetsPlugin::SubscribeSSMKafkaTopic(){

if(_subscribeToSsmTopic.length() > 0)
{
PLOG(logDEBUG) << "SubscribeSSMKafkaTopics:" <<_subscribeToSsmTopic << std::endl;
_ssm_kafka_consumer_ptr->subscribe();
//Initialize Json to J2735 SSM convertor
JsonToJ2735SSMConverter ssm_convertor;
while (_ssm_kafka_consumer_ptr->is_running())
{
auto payload_str = _ssm_kafka_consumer_ptr->consume(500);
if(strlen(payload_str) > 0)
{
PLOG(logDEBUG) << "consumed message payload: " << payload_str <<std::endl;
Json::Value ssmDoc;
auto parse_sucessful = ssm_convertor.parseJsonString(payload_str, ssmDoc);
if( !parse_sucessful )
{
PLOG(logERROR) << "Error parsing payload: " << payload_str << std::endl;
SetStatus<uint>(Key_SSMMessageSkipped, ++_ssmMessageSkipped);
continue;
}
//Convert the SSM JSON string into J2735 SSM message and encode it.
auto ssm_ptr = std::make_shared<SignalStatusMessage>();
ssm_convertor.toJ2735SSM(ssmDoc, ssm_ptr);
tmx::messages::SsmEncodedMessage ssmEncodedMsg;
try
{
ssm_convertor.encodeSSM(ssm_ptr, ssmEncodedMsg);
}
catch (TmxException &ex)
{
// Skip messages that fail to encode.
PLOG(logERROR) << "Failed to encoded SSM message : \n" << payload_str << std::endl << "Exception encountered: "
<< ex.what() << std::endl;
ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SignalStatusMessage, ssm_ptr.get());
SetStatus<uint>(Key_SSMMessageSkipped, ++_ssmMessageSkipped);
continue;
}

ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SignalStatusMessage, ssm_ptr.get());
PLOG(logDEBUG) << "ssmEncodedMsg: " << ssmEncodedMsg;

//Broadcast the encoded SSM message
ssmEncodedMsg.set_flags(IvpMsgFlags_RouteDSRC);
ssmEncodedMsg.addDsrcMetadata(0x8002);
BroadcastMessage(static_cast<routeable_message &>(ssmEncodedMsg));
}
}
}

}
bool CARMAStreetsPlugin::getEncodedtsm3( tsm3EncodedMessage *tsm3EncodedMsg, Json::Value metadata, Json::Value payload_json )
{
try
Expand Down
26 changes: 24 additions & 2 deletions src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <boost/uuid/uuid_io.hpp>
#include <kafka/kafka_client.h>
#include <kafka/kafka_consumer_worker.h>
#include "JsonToJ2735SSMConverter.h"




Expand Down Expand Up @@ -67,6 +69,10 @@ class CARMAStreetsPlugin: public PluginClient {
* @brief Subcribe to SPAT Kafka topic created by carma-streets
*/
void SubscribeSpatKafkaTopic();
/**
* @brief Subcribe to SSM Kafka topic created by carma-streets
*/
void SubscribeSSMKafkaTopic();

bool getEncodedtsm3(tsm3EncodedMessage *tsm3EncodedMsg, Json::Value metadata, Json::Value payload_json);
/**
Expand All @@ -88,7 +94,9 @@ class CARMAStreetsPlugin: public PluginClient {
std::string _subscribeToSchedulingPlanTopic;
std::string _subscribeToSchedulingPlanConsumerGroupId;
std::string _subscribeToSpatTopic;
std::string _subscribeToSsmTopic;
std::string _subscribeToSpatConsumerGroupId;
std::string _subscribeToSSMConsumerGroupId;
std::string _transmitMobilityPathTopic;
std::string _transmitBSMTopic;
std::string _transmitMAPTopic;
Expand All @@ -99,11 +107,14 @@ class CARMAStreetsPlugin: public PluginClient {
RdKafka::Producer *kafka_producer;
std::shared_ptr<kafka_consumer_worker> _spat_kafka_consumer_ptr;
std::shared_ptr<kafka_consumer_worker> _scheduing_plan_kafka_consumer_ptr;
std::shared_ptr<kafka_consumer_worker> _ssm_kafka_consumer_ptr;
RdKafka::Producer *kafka_producer;
std::vector<std::string> _strategies;
tmx::messages::tsm3Message *_tsm3Message{NULL};
std::mutex data_lock;
std::thread* thread_schpl;
std::thread* thread_spat;
std::thread* thread_ssm;

/**
* @brief Status label for SPAT messages skipped due to errors.
Expand Down Expand Up @@ -137,6 +148,7 @@ class CARMAStreetsPlugin: public PluginClient {
/**
* @brief Status label for Mobility Operation messages skipped due to errors.
*/

const char* Key_MobilityOperationMessageSkipped = "Mobility Operation messages skipped due to errors.";

/**
Expand All @@ -163,7 +175,17 @@ class CARMAStreetsPlugin: public PluginClient {
* @brief Count for BSM messages skipped due to errors.
*/
uint _bsmMessageSkipped = 0;


/**
* @brief Status label for SSM messages skipped due to errors.
*/
const char* Key_SSMMessageSkipped = "SSM messages skipped due to errors.";

/**
* @brief Count for SSM messages skipped due to errors.
*/
uint _ssmMessageSkipped = 0;

/**
* @brief Intersection Id for intersection
*/
Expand All @@ -180,4 +202,4 @@ class CARMAStreetsPlugin: public PluginClient {
std::mutex _cfgLock;

}
#endif
#endif
156 changes: 156 additions & 0 deletions src/v2i-hub/CARMAStreetsPlugin/src/JsonToJ2735SSMConverter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
#include "JsonToJ2735SSMConverter.h"

namespace CARMAStreetsPlugin
{

bool JsonToJ2735SSMConverter::parseJsonString(const string &consumedMsg, Json::Value &ssmDoc) const
{
const auto jsonLen = static_cast<int>(consumedMsg.length());
Json::CharReaderBuilder builder;
JSONCPP_STRING err;
const std::unique_ptr<Json::CharReader> reader(builder.newCharReader());
bool parseResult = reader->parse(consumedMsg.c_str(), consumedMsg.c_str() + jsonLen, &ssmDoc, &err);
if (!parseResult)
{
PLOG(logERROR) << "Parse error: " << err << endl;
}
return parseResult;
}

void JsonToJ2735SSMConverter::toJ2735SSM(const Json::Value &ssmDoc, std::shared_ptr<SignalStatusMessage> ssmPtr) const
{
try
{
ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SignalStatusMessage, ssmPtr.get());
if (!ssmDoc.isMember("SignalStatus"))
{
PLOG(logERROR) << "No SignalStatus present in JSON." << std::endl;
return;
}

// populate SignalStatusMessage::second
if (ssmDoc["SignalStatus"].isMember("msOfMinute") && ssmDoc["SignalStatus"]["msOfMinute"].isNumeric())
{
ssmPtr->second = ssmDoc["SignalStatus"]["msOfMinute"].asInt64();
}

// populate SignalStatusMessage::timstamp
if (ssmDoc["SignalStatus"].isMember("minuteOfYear") && ssmDoc["SignalStatus"]["minuteOfYear"].isNumeric())
{
MinuteOfTheYear_t *timeStamp = (MinuteOfTheYear_t *)calloc(1, sizeof(MinuteOfTheYear_t));
*timeStamp = ssmDoc["SignalStatus"]["minuteOfYear"].asInt64();
ssmPtr->timeStamp = timeStamp;
}

SignalStatusList_t *statusPtr = (SignalStatusList_t *)calloc(1, sizeof(SignalStatusList_t));
SignalStatus *signalStatus = (SignalStatus *)calloc(1, sizeof(SignalStatus));

// populate SignalStatusMessage::status::id
if (ssmDoc["SignalStatus"].isMember("intersectionID") && ssmDoc["SignalStatus"]["intersectionID"].isNumeric())
{
signalStatus->id.id = ssmDoc["SignalStatus"]["intersectionID"].asInt64();
}

// populate SignalStatusMessage::status::sequenceNumber
if (ssmDoc["SignalStatus"].isMember("sequenceNumber") && ssmDoc["SignalStatus"]["sequenceNumber"].isNumeric())
{
signalStatus->sequenceNumber = ssmDoc["SignalStatus"]["sequenceNumber"].asInt64();
}

// populate SignalStatusMessage::status::sigStatus
if (ssmDoc["SignalStatus"].isMember("requestorInfo") && ssmDoc["SignalStatus"]["requestorInfo"].isArray())
{
Json::Value requesterJsonArr = ssmDoc["SignalStatus"]["requestorInfo"];
for (auto itr = requesterJsonArr.begin(); itr != requesterJsonArr.end(); itr++)
{
SignalStatusPackage *signalStatusPackage = (SignalStatusPackage *)calloc(1, sizeof(SignalStatusPackage));
populateSigStatusPackage(signalStatusPackage, itr);
asn_sequence_add(&signalStatus->sigStatus.list.array, signalStatusPackage);
} // Populate signal status package
}

asn_sequence_add(&statusPtr->list.array, signalStatus);
ssmPtr->status = *statusPtr;
}
catch(exception &ex)
{
PLOG(logERROR) << "Cannot read JSON file." << std::endl;
}
}

void JsonToJ2735SSMConverter::populateSigStatusPackage(SignalStatusPackage *signalStatusPackage, Json::Value::iterator itr) const
{
signalStatusPackage->requester = (SignalRequesterInfo *)calloc(1, sizeof(SignalRequesterInfo));

// populate SignalStatusMessage::status::sigStatus::requester::request
if (itr->isMember("requestID") && (*itr)["requestID"].isNumeric())
{
signalStatusPackage->requester->request = (*itr)["requestID"].asInt64();
}

// populate SignalStatusMessage::status::sigStatus::requester::id
if (itr->isMember("vehicleID") && (*itr)["vehicleID"].isNumeric())
{
signalStatusPackage->requester->id.choice.stationID = (*itr)["vehicleID"].asInt64();
signalStatusPackage->requester->id.present = VehicleID_PR_stationID;
}

// populate SignalStatusMessage::status::sigStatus::requester::sequenceNumber
if (itr->isMember("msgCount") && (*itr)["msgCount"].isNumeric())
{
signalStatusPackage->requester->sequenceNumber = (*itr)["msgCount"].asInt64();
}

// populate SignalStatusMessage::status::sigStatus::requester::role
if (itr->isMember("basicVehicleRole") && (*itr)["basicVehicleRole"].isNumeric())
{
signalStatusPackage->requester->role = (BasicVehicleRole_t *)calloc(1, sizeof(BasicVehicleRole_t));
*signalStatusPackage->requester->role = (*itr)["basicVehicleRole"].asInt64();
}

// populate SignalStatusMessage::status::sigStatus::inboundOn
if (itr->isMember("inBoundLaneID") && (*itr)["inBoundLaneID"].isNumeric())
{
signalStatusPackage->inboundOn.present = IntersectionAccessPoint_PR_lane;
signalStatusPackage->inboundOn.choice.lane = (*itr)["inBoundLaneID"].asInt64();
}
else if (itr->isMember("inBoundApproachID") && (*itr)["inBoundApproachID"].isNumeric())
{
signalStatusPackage->inboundOn.present = IntersectionAccessPoint_PR_approach;
signalStatusPackage->inboundOn.choice.approach = (*itr)["inBoundApproachID"].asInt64();
}

// populate SignalStatusMessage::status::sigStatus::status
if (itr->isMember("priorityRequestStatus") && (*itr)["priorityRequestStatus"].isNumeric())
{
signalStatusPackage->status = (*itr)["priorityRequestStatus"].asInt64();
}

// populate SignalStatusMessage::status::sigStatus::duration
if (itr->isMember("ETA_Duration") && (*itr)["ETA_Duration"].isNumeric())
{
signalStatusPackage->duration = (DSecond_t *)calloc(1, sizeof(DSecond_t));
*signalStatusPackage->duration = (*itr)["ETA_Duration"].asInt64();
}

// populate SignalStatusMessage::status::sigStatus::minute
if (itr->isMember("ETA_Minute") && (*itr)["ETA_Minute"].isNumeric())
{
signalStatusPackage->minute = (DSecond_t *)calloc(1, sizeof(DSecond_t));
*signalStatusPackage->minute = (*itr)["ETA_Minute"].asInt64();
}

// populate SignalStatusMessage::status::sigStatus::second
if (itr->isMember("ETA_Second") && (*itr)["ETA_Second"].isNumeric())
{
signalStatusPackage->minute = (DSecond_t *)calloc(1, sizeof(DSecond_t));
*signalStatusPackage->minute = (*itr)["ETA_Second"].asInt64();
}
}
void JsonToJ2735SSMConverter::encodeSSM(const std::shared_ptr<SignalStatusMessage> &ssmPtr, tmx::messages::SsmEncodedMessage &encodedSSM) const
{
tmx::messages::MessageFrameMessage frame(ssmPtr);
encodedSSM.set_data(tmx::messages::TmxJ2735EncodedMessage<SignalStatusMessage>::encode_j2735_message<tmx::messages::codec::uper<tmx::messages::MessageFrameMessage>>(frame));
free(frame.get_j2735_data().get());
}
}
Loading

0 comments on commit 82ada6f

Please sign in to comment.