From 842aa5b2689a54f69cb0face169f4b357d6fbdf1 Mon Sep 17 00:00:00 2001 From: dan-du-car <62157949+dan-du-car@users.noreply.github.com> Date: Wed, 24 May 2023 13:41:22 -0400 Subject: [PATCH 1/2] V2XHub / CARMA Streets to process and broadcast SSM (#533) # PR Details ## Description For the POC TIM/TSP use case, priority-eligible vehicles will broadcast J2375 Signal Request Message (SRM) to the MMITSS Roadside Processor (MRP), which in turn will send J2375 Signal Status Message (SSM) in acknowledgement. Since MRP is being integrated with CARMA Streets, CARMA Streets / V2XHub needs to #1) consume the json SSM (used by the MMITSS MRP internally, see [here](https://github.com/mmitss/mmitss-az/blob/master/src/mrp/priority-request-server/Readme.md) and [here](https://github.com/mmitss/mmitss-az/blob/master/src/mrp/priority-request-server/SSM.json)) sent on the CARMA Streets Kafka broker, #2) encode it according to the J2375 [ASN.1 ](https://leidoscorpus.sharepoint.us/:f:/s/STR/EpU-cLOWhUtGvsddHUSf8I4Bf7-Ot8oMIY4yf2m7x2-Uag?e=0Rvpug)schema, and #3) broadcast J2375 SSMs through RSU. For #1) the V2XHub carma-streets-plugin needs to pull the json SSM from CARMA Steets Kafa broker. For #3) the V2XHub immediate-forward-plugin configuration needs to to be updated. ## Related Issue https://github.com/usdot-fhwa-OPS/V2X-Hub/issues/534 ## Motivation and Context TM/TSP ## How Has This Been Tested? Unit test ## Types of changes - [ ] Defect fix (non-breaking change that fixes an issue) - [x] New feature (non-breaking change that adds functionality) - [ ] Breaking change (fix or feature that cause existing functionality to change) ## Checklist: - [ ] I have added any new packages to the sonar-scanner.properties file - [ ] My change requires a change to the documentation. - [ ] I have updated the documentation accordingly. - [x] I have read the **CONTRIBUTING** document. [V2XHUB Contributing Guide](https://github.com/usdot-fhwa-OPS/V2X-Hub/blob/develop/Contributing.md) - [ ] I have added tests to cover my changes. - [ ] All new and existing tests passed. --- src/v2i-hub/CARMAStreetsPlugin/CMakeLists.txt | 2 +- src/v2i-hub/CARMAStreetsPlugin/manifest.json | 10 ++ .../src/CARMAStreetsPlugin.cpp | 89 +++++++++- .../src/CARMAStreetsPlugin.h | 23 ++- .../src/JsonToJ2735SSMConverter.cpp | 156 ++++++++++++++++++ .../src/JsonToJ2735SSMConverter.h | 49 ++++++ .../test/test_JsonToJ2735SSMConverter.cpp | 102 ++++++++++++ .../ImmediateForwardPlugin/manifest.json | 2 +- 8 files changed, 428 insertions(+), 5 deletions(-) create mode 100644 src/v2i-hub/CARMAStreetsPlugin/src/JsonToJ2735SSMConverter.cpp create mode 100644 src/v2i-hub/CARMAStreetsPlugin/src/JsonToJ2735SSMConverter.h create mode 100644 src/v2i-hub/CARMAStreetsPlugin/test/test_JsonToJ2735SSMConverter.cpp diff --git a/src/v2i-hub/CARMAStreetsPlugin/CMakeLists.txt b/src/v2i-hub/CARMAStreetsPlugin/CMakeLists.txt index f904ecac6..f98aebcae 100644 --- a/src/v2i-hub/CARMAStreetsPlugin/CMakeLists.txt +++ b/src/v2i-hub/CARMAStreetsPlugin/CMakeLists.txt @@ -9,7 +9,7 @@ TARGET_LINK_LIBRARIES (${PROJECT_NAME} tmxutils rdkafka++ jsoncpp) ############# enable_testing() include_directories(${PROJECT_SOURCE_DIR}/src) -add_library(${PROJECT_NAME}_lib src/J2735MapToJsonConverter.cpp src/JsonToJ2735SpatConverter.cpp src/J2735ToSRMJsonConverter.cpp) +add_library(${PROJECT_NAME}_lib src/J2735MapToJsonConverter.cpp src/JsonToJ2735SSMConverter.cpp src/JsonToJ2735SpatConverter.cpp src/J2735ToSRMJsonConverter.cpp) target_link_libraries(${PROJECT_NAME}_lib PUBLIC ${TMXAPI_LIBRARIES} ${ASN_J2735_LIBRARIES} ${MYSQL_LIBRARIES} diff --git a/src/v2i-hub/CARMAStreetsPlugin/manifest.json b/src/v2i-hub/CARMAStreetsPlugin/manifest.json index c4fb269cd..790ef1af9 100644 --- a/src/v2i-hub/CARMAStreetsPlugin/manifest.json +++ b/src/v2i-hub/CARMAStreetsPlugin/manifest.json @@ -102,6 +102,16 @@ "default": "modified_spat", "description": "Apache Kafka topic plugin will transmit message to." }, + { + "key": "SsmTopic", + "default": "v2xhub_ssm_sub", + "description": "Apache Kafka topic plugin will transmit message to." + }, + { + "key": "SsmConsumerGroupId", + "default": "v2xhub_ssm", + "description": "Apache Kafka consumer group ID for spat consumer." + }, { "key": "SpatConsumerGroupId", "default": "v2xhub_spat", diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp index 2e2fdf499..08bc96d8a 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp @@ -46,7 +46,9 @@ void CARMAStreetsPlugin::UpdateConfigSettings() { GetConfigValue("SchedulingPlanTopic", _subscribeToSchedulingPlanTopic); GetConfigValue("SchedulingPlanConsumerGroupId", _subscribeToSchedulingPlanConsumerGroupId); GetConfigValue("SpatTopic", _subscribeToSpatTopic); + GetConfigValue("SsmTopic", _subscribeToSsmTopic); GetConfigValue("SpatConsumerGroupId", _subscribeToSpatConsumerGroupId); + GetConfigValue("SsmConsumerGroupId", _subscribeToSSMConsumerGroupId); GetConfigValue("BsmTopic", _transmitBSMTopic); GetConfigValue("MobilityOperationTopic", _transmitMobilityOperationTopic); GetConfigValue("MobilityPathTopic", _transmitMobilityPathTopic); @@ -68,6 +70,8 @@ void CARMAStreetsPlugin::UpdateConfigSettings() { kafka_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); kafka_conf_sp_consumer = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); kafka_conf_spat_consumer = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + kafka_conf_ssm_consumer = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + PLOG(logDEBUG) <<"Attempting to connect to " << kafkaConnectString; if ((kafka_conf->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK)) { @@ -87,7 +91,10 @@ void CARMAStreetsPlugin::UpdateConfigSettings() { if (kafka_conf_sp_consumer->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK || (kafka_conf_sp_consumer->set("group.id", _subscribeToSchedulingPlanConsumerGroupId, error_string) != RdKafka::Conf::CONF_OK) || (kafka_conf_spat_consumer->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK) - || (kafka_conf_spat_consumer->set("group.id", _subscribeToSpatConsumerGroupId, error_string) != RdKafka::Conf::CONF_OK)) { + || (kafka_conf_spat_consumer->set("group.id", _subscribeToSpatConsumerGroupId, error_string) != RdKafka::Conf::CONF_OK) + || (kafka_conf_ssm_consumer->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK) + || (kafka_conf_ssm_consumer->set("group.id", _subscribeToSSMConsumerGroupId, error_string) != RdKafka::Conf::CONF_OK) + ) { PLOG(logERROR) <<"Setting kafka config group.id options failed with error:" << error_string << "\n" <<"Exiting with exit code 1"; exit(1); } else { @@ -95,20 +102,24 @@ void CARMAStreetsPlugin::UpdateConfigSettings() { } kafka_conf_sp_consumer->set("enable.partition.eof", "true", error_string); kafka_conf_spat_consumer->set("enable.partition.eof", "true", error_string); + kafka_conf_ssm_consumer->set("enable.partition.eof", "true", error_string); _scheduing_plan_kafka_consumer = RdKafka::KafkaConsumer::create(kafka_conf_sp_consumer, error_string); _spat_kafka_consumer = RdKafka::KafkaConsumer::create(kafka_conf_spat_consumer, error_string); + _ssm_kafka_consumer = RdKafka::KafkaConsumer::create(kafka_conf_ssm_consumer, error_string); - if ( !_scheduing_plan_kafka_consumer || !_spat_kafka_consumer) { + if ( !_scheduing_plan_kafka_consumer || !_spat_kafka_consumer || !_ssm_kafka_consumer) { PLOG(logERROR) << "Failed to create Kafka consumers: " << error_string << std::endl; exit(1); } PLOG(logDEBUG) << "Created consumer " << _scheduing_plan_kafka_consumer->name() << std::endl; PLOG(logDEBUG) << "Created consumer " << _spat_kafka_consumer->name() << std::endl; + PLOG(logDEBUG) << "Created consumer " << _ssm_kafka_consumer->name() << std::endl; //create kafka topics RdKafka::Conf *tconf_spat = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); RdKafka::Conf *tconf_sp = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); + RdKafka::Conf *tconf_ssm = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); if(!tconf_spat && !tconf_sp) { PLOG(logERROR) << "RDKafka create topic conf failed "; @@ -129,11 +140,20 @@ void CARMAStreetsPlugin::UpdateConfigSettings() { return ; } + _ssm_topic = RdKafka::Topic::create(_ssm_kafka_consumer,_subscribeToSsmTopic,tconf_ssm,error_string); + if(!_ssm_topic) + { + PLOG(logERROR) << "RDKafka create SSM topic failed:" << error_string; + return ; + } + delete tconf_sp; delete tconf_spat; + delete tconf_ssm; boost::thread thread_schpl(&CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic, this); boost::thread thread_spat(&CARMAStreetsPlugin::SubscribeSpatKafkaTopic, this); + boost::thread thread_ssm(&CARMAStreetsPlugin::SubscribeSSMKafkaTopic, this); } void CARMAStreetsPlugin::OnConfigChanged(const char *key, const char *value) { @@ -659,6 +679,71 @@ void CARMAStreetsPlugin::SubscribeSpatKafkaTopic(){ } } +void CARMAStreetsPlugin::SubscribeSSMKafkaTopic(){ + + if(_subscribeToSsmTopic.length() > 0) + { + PLOG(logDEBUG) << "SubscribeSSMKafkaTopics:" <<_subscribeToSsmTopic << std::endl; + std::vector topics; + topics.emplace_back(_subscribeToSsmTopic); + + RdKafka::ErrorCode err = _ssm_kafka_consumer->subscribe(topics); + if (err) + { + PLOG(logERROR) << "Failed to subscribe to " << topics.size() << " topics: " << RdKafka::err2str(err) << std::endl; + return; + } + //Initialize Json to J2735 SSM convertor + JsonToJ2735SSMConverter ssm_convertor; + while (true) + { + auto msg = _ssm_kafka_consumer->consume( 500 ); + if( msg->err() == RdKafka::ERR_NO_ERROR ) + { + auto payload_str = static_cast( msg->payload() ); + if(msg->len() > 0) + { + PLOG(logDEBUG) << "consumed message payload: " << payload_str <(Key_SSMMessageSkipped, ++_ssmMessageSkipped); + continue; + } + //Convert the SSM JSON string into J2735 SSM message and encode it. + auto ssm_ptr = std::make_shared(); + ssm_convertor.toJ2735SSM(ssmDoc, ssm_ptr); + tmx::messages::SsmEncodedMessage ssmEncodedMsg; + try + { + ssm_convertor.encodeSSM(ssm_ptr, ssmEncodedMsg); + } + catch (TmxException &ex) + { + // Skip messages that fail to encode. + PLOG(logERROR) << "Failed to encoded SSM message : \n" << payload_str << std::endl << "Exception encountered: " + << ex.what() << std::endl; + ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SignalStatusMessage, ssm_ptr.get()); + SetStatus(Key_SSMMessageSkipped, ++_ssmMessageSkipped); + continue; + } + + ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SignalStatusMessage, ssm_ptr.get()); + PLOG(logDEBUG) << "ssmEncodedMsg: " << ssmEncodedMsg; + + //Broadcast the encoded SSM message + ssmEncodedMsg.set_flags(IvpMsgFlags_RouteDSRC); + ssmEncodedMsg.addDsrcMetadata(0x8002); + BroadcastMessage(static_cast(ssmEncodedMsg)); + } + } + delete msg; + } + } + +} bool CARMAStreetsPlugin::getEncodedtsm3( tsm3EncodedMessage *tsm3EncodedMsg, Json::Value metadata, Json::Value payload_json ) { try diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h index a9111e945..566aaf35c 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h @@ -17,6 +17,7 @@ #include "J2735MapToJsonConverter.h" #include "JsonToJ2735SpatConverter.h" #include "J2735ToSRMJsonConverter.h" +#include "JsonToJ2735SSMConverter.h" @@ -62,6 +63,10 @@ class CARMAStreetsPlugin: public PluginClient { * @brief Subcribe to SPAT Kafka topic created by carma-streets */ void SubscribeSpatKafkaTopic(); + /** + * @brief Subcribe to SSM Kafka topic created by carma-streets + */ + void SubscribeSSMKafkaTopic(); bool getEncodedtsm3(tsm3EncodedMessage *tsm3EncodedMsg, Json::Value metadata, Json::Value payload_json); /** @@ -79,7 +84,9 @@ class CARMAStreetsPlugin: public PluginClient { std::string _subscribeToSchedulingPlanTopic; std::string _subscribeToSchedulingPlanConsumerGroupId; std::string _subscribeToSpatTopic; + std::string _subscribeToSsmTopic; std::string _subscribeToSpatConsumerGroupId; + std::string _subscribeToSSMConsumerGroupId; std::string _transmitMobilityPathTopic; std::string _transmitBSMTopic; std::string _transmitMAPTopic; @@ -89,11 +96,14 @@ class CARMAStreetsPlugin: public PluginClient { RdKafka::Conf *kafka_conf; RdKafka::Conf *kafka_conf_spat_consumer; RdKafka::Conf *kafka_conf_sp_consumer; + RdKafka::Conf *kafka_conf_ssm_consumer; RdKafka::Producer *kafka_producer; RdKafka::KafkaConsumer *_scheduing_plan_kafka_consumer; RdKafka::KafkaConsumer *_spat_kafka_consumer; + RdKafka::KafkaConsumer *_ssm_kafka_consumer; RdKafka::Topic *_scheduing_plan_topic; RdKafka::Topic *_spat_topic; + RdKafka::Topic *_ssm_topic; std::vector _strategies; tmx::messages::tsm3Message *_tsm3Message{NULL}; std::mutex data_lock; @@ -130,6 +140,7 @@ class CARMAStreetsPlugin: public PluginClient { /** * @brief Status label for Mobility Operation messages skipped due to errors. */ + const char* Key_MobilityOperationMessageSkipped = "Mobility Operation messages skipped due to errors."; /** @@ -156,7 +167,17 @@ class CARMAStreetsPlugin: public PluginClient { * @brief Count for BSM messages skipped due to errors. */ uint _bsmMessageSkipped = 0; - + + /** + * @brief Status label for SSM messages skipped due to errors. + */ + const char* Key_SSMMessageSkipped = "SSM messages skipped due to errors."; + + /** + * @brief Count for SSM messages skipped due to errors. + */ + uint _ssmMessageSkipped = 0; + /** * @brief Intersection Id for intersection */ diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/JsonToJ2735SSMConverter.cpp b/src/v2i-hub/CARMAStreetsPlugin/src/JsonToJ2735SSMConverter.cpp new file mode 100644 index 000000000..10e59d87d --- /dev/null +++ b/src/v2i-hub/CARMAStreetsPlugin/src/JsonToJ2735SSMConverter.cpp @@ -0,0 +1,156 @@ +#include "JsonToJ2735SSMConverter.h" + +namespace CARMAStreetsPlugin +{ + + bool JsonToJ2735SSMConverter::parseJsonString(const string &consumedMsg, Json::Value &ssmDoc) const + { + const auto jsonLen = static_cast(consumedMsg.length()); + Json::CharReaderBuilder builder; + JSONCPP_STRING err; + const std::unique_ptr reader(builder.newCharReader()); + bool parseResult = reader->parse(consumedMsg.c_str(), consumedMsg.c_str() + jsonLen, &ssmDoc, &err); + if (!parseResult) + { + PLOG(logERROR) << "Parse error: " << err << endl; + } + return parseResult; + } + + void JsonToJ2735SSMConverter::toJ2735SSM(const Json::Value &ssmDoc, std::shared_ptr ssmPtr) const + { + try + { + ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SignalStatusMessage, ssmPtr.get()); + if (!ssmDoc.isMember("SignalStatus")) + { + PLOG(logERROR) << "No SignalStatus present in JSON." << std::endl; + return; + } + + // populate SignalStatusMessage::second + if (ssmDoc["SignalStatus"].isMember("msOfMinute") && ssmDoc["SignalStatus"]["msOfMinute"].isNumeric()) + { + ssmPtr->second = ssmDoc["SignalStatus"]["msOfMinute"].asInt64(); + } + + // populate SignalStatusMessage::timstamp + if (ssmDoc["SignalStatus"].isMember("minuteOfYear") && ssmDoc["SignalStatus"]["minuteOfYear"].isNumeric()) + { + MinuteOfTheYear_t *timeStamp = (MinuteOfTheYear_t *)calloc(1, sizeof(MinuteOfTheYear_t)); + *timeStamp = ssmDoc["SignalStatus"]["minuteOfYear"].asInt64(); + ssmPtr->timeStamp = timeStamp; + } + + SignalStatusList_t *statusPtr = (SignalStatusList_t *)calloc(1, sizeof(SignalStatusList_t)); + SignalStatus *signalStatus = (SignalStatus *)calloc(1, sizeof(SignalStatus)); + + // populate SignalStatusMessage::status::id + if (ssmDoc["SignalStatus"].isMember("intersectionID") && ssmDoc["SignalStatus"]["intersectionID"].isNumeric()) + { + signalStatus->id.id = ssmDoc["SignalStatus"]["intersectionID"].asInt64(); + } + + // populate SignalStatusMessage::status::sequenceNumber + if (ssmDoc["SignalStatus"].isMember("sequenceNumber") && ssmDoc["SignalStatus"]["sequenceNumber"].isNumeric()) + { + signalStatus->sequenceNumber = ssmDoc["SignalStatus"]["sequenceNumber"].asInt64(); + } + + // populate SignalStatusMessage::status::sigStatus + if (ssmDoc["SignalStatus"].isMember("requestorInfo") && ssmDoc["SignalStatus"]["requestorInfo"].isArray()) + { + Json::Value requesterJsonArr = ssmDoc["SignalStatus"]["requestorInfo"]; + for (auto itr = requesterJsonArr.begin(); itr != requesterJsonArr.end(); itr++) + { + SignalStatusPackage *signalStatusPackage = (SignalStatusPackage *)calloc(1, sizeof(SignalStatusPackage)); + populateSigStatusPackage(signalStatusPackage, itr); + asn_sequence_add(&signalStatus->sigStatus.list.array, signalStatusPackage); + } // Populate signal status package + } + + asn_sequence_add(&statusPtr->list.array, signalStatus); + ssmPtr->status = *statusPtr; + } + catch(exception &ex) + { + PLOG(logERROR) << "Cannot read JSON file." << std::endl; + } + } + + void JsonToJ2735SSMConverter::populateSigStatusPackage(SignalStatusPackage *signalStatusPackage, Json::Value::iterator itr) const + { + signalStatusPackage->requester = (SignalRequesterInfo *)calloc(1, sizeof(SignalRequesterInfo)); + + // populate SignalStatusMessage::status::sigStatus::requester::request + if (itr->isMember("requestID") && (*itr)["requestID"].isNumeric()) + { + signalStatusPackage->requester->request = (*itr)["requestID"].asInt64(); + } + + // populate SignalStatusMessage::status::sigStatus::requester::id + if (itr->isMember("vehicleID") && (*itr)["vehicleID"].isNumeric()) + { + signalStatusPackage->requester->id.choice.stationID = (*itr)["vehicleID"].asInt64(); + signalStatusPackage->requester->id.present = VehicleID_PR_stationID; + } + + // populate SignalStatusMessage::status::sigStatus::requester::sequenceNumber + if (itr->isMember("msgCount") && (*itr)["msgCount"].isNumeric()) + { + signalStatusPackage->requester->sequenceNumber = (*itr)["msgCount"].asInt64(); + } + + // populate SignalStatusMessage::status::sigStatus::requester::role + if (itr->isMember("basicVehicleRole") && (*itr)["basicVehicleRole"].isNumeric()) + { + signalStatusPackage->requester->role = (BasicVehicleRole_t *)calloc(1, sizeof(BasicVehicleRole_t)); + *signalStatusPackage->requester->role = (*itr)["basicVehicleRole"].asInt64(); + } + + // populate SignalStatusMessage::status::sigStatus::inboundOn + if (itr->isMember("inBoundLaneID") && (*itr)["inBoundLaneID"].isNumeric()) + { + signalStatusPackage->inboundOn.present = IntersectionAccessPoint_PR_lane; + signalStatusPackage->inboundOn.choice.lane = (*itr)["inBoundLaneID"].asInt64(); + } + else if (itr->isMember("inBoundApproachID") && (*itr)["inBoundApproachID"].isNumeric()) + { + signalStatusPackage->inboundOn.present = IntersectionAccessPoint_PR_approach; + signalStatusPackage->inboundOn.choice.approach = (*itr)["inBoundApproachID"].asInt64(); + } + + // populate SignalStatusMessage::status::sigStatus::status + if (itr->isMember("priorityRequestStatus") && (*itr)["priorityRequestStatus"].isNumeric()) + { + signalStatusPackage->status = (*itr)["priorityRequestStatus"].asInt64(); + } + + // populate SignalStatusMessage::status::sigStatus::duration + if (itr->isMember("ETA_Duration") && (*itr)["ETA_Duration"].isNumeric()) + { + signalStatusPackage->duration = (DSecond_t *)calloc(1, sizeof(DSecond_t)); + *signalStatusPackage->duration = (*itr)["ETA_Duration"].asInt64(); + } + + // populate SignalStatusMessage::status::sigStatus::minute + if (itr->isMember("ETA_Minute") && (*itr)["ETA_Minute"].isNumeric()) + { + signalStatusPackage->minute = (DSecond_t *)calloc(1, sizeof(DSecond_t)); + *signalStatusPackage->minute = (*itr)["ETA_Minute"].asInt64(); + } + + // populate SignalStatusMessage::status::sigStatus::second + if (itr->isMember("ETA_Second") && (*itr)["ETA_Second"].isNumeric()) + { + signalStatusPackage->minute = (DSecond_t *)calloc(1, sizeof(DSecond_t)); + *signalStatusPackage->minute = (*itr)["ETA_Second"].asInt64(); + } + } + void JsonToJ2735SSMConverter::encodeSSM(const std::shared_ptr &ssmPtr, tmx::messages::SsmEncodedMessage &encodedSSM) const + { + tmx::messages::MessageFrameMessage frame(ssmPtr); + encodedSSM.set_data(tmx::messages::TmxJ2735EncodedMessage::encode_j2735_message>(frame)); + free(frame.get_j2735_data().get()); + } +} diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/JsonToJ2735SSMConverter.h b/src/v2i-hub/CARMAStreetsPlugin/src/JsonToJ2735SSMConverter.h new file mode 100644 index 000000000..64d90c816 --- /dev/null +++ b/src/v2i-hub/CARMAStreetsPlugin/src/JsonToJ2735SSMConverter.h @@ -0,0 +1,49 @@ +#ifndef JSONTOJ2735SSMCONVERTER_H_ +#define JSONTOJ2735SSMCONVERTER_H_ +#include "jsoncpp/json/json.h" +#include +#include +#include + +using namespace std; +using namespace tmx::utils; +namespace CARMAStreetsPlugin +{ + class JsonToJ2735SSMConverter + { + public: + // Constructor to initialize object + JsonToJ2735SSMConverter() = default; + /*** + * @brief Parse Json string into Json object + * @param jsonstring that is consumed from a kafka topic + * @param JsonObject that is populated by the input json string + * @return boolean. True if parse successfully, false if parse with errors + */ + bool parseJsonString(const string &consumed_msg, Json::Value &ssmDoc) const; + /*** + * @brief Read SSM Json document and populate J2735 SSM message + * @param ssmDoc Json object that contains the SSM information in Json format + * @param ssmPtr a pointer to J2735 message object. This object will be populated with the values in the Json object + */ + void toJ2735SSM(const Json::Value &ssmDoc, std::shared_ptr ssmPtr) const; + /*** + * @brief Encode J2735 SSM + * @param Pointer to J2735 SSM object + * @param Encoded J2735 SSM + */ + void encodeSSM(const std::shared_ptr &ssmPtr, tmx::messages::SsmEncodedMessage &encodedSSM) const; + + /*** + * @brief Populate J2735 SignalStatusPackage object with Json Value + * @param Pointer J2735 SignalStatusPackage object + * @param Json::Value SignalstatusPackage Json string iterator + */ + void populateSigStatusPackage(SignalStatusPackage *signalStatusPackage, Json::Value::iterator itr) const; + + ~JsonToJ2735SSMConverter() = default; + }; + +} + +#endif \ No newline at end of file diff --git a/src/v2i-hub/CARMAStreetsPlugin/test/test_JsonToJ2735SSMConverter.cpp b/src/v2i-hub/CARMAStreetsPlugin/test/test_JsonToJ2735SSMConverter.cpp new file mode 100644 index 000000000..2bd9fb405 --- /dev/null +++ b/src/v2i-hub/CARMAStreetsPlugin/test/test_JsonToJ2735SSMConverter.cpp @@ -0,0 +1,102 @@ +#include +#include +#include +#include +#include "jsoncpp/json/json.h" +#include "JsonToJ2735SSMConverter.h" +using namespace std; + +namespace CARMAStreetsPlugin +{ + class test_JsonToJ2735SSMConverter : public ::testing::Test + { + }; + + TEST_F(test_JsonToJ2735SSMConverter, parseJsonString) + { + JsonToJ2735SSMConverter converter; + // Json string refer to: https://github.com/mmitss/mmitss-az/blob/master/src/mrp/priority-request-server/SSM.json + string valid_json_str = "{\"MessageType\":\"SSM\",\"SignalStatus\":{\"intersectionID\":1003,\"minuteOfYear\":345239,\"msOfMinute\":54000,\"regionalID\":0,\"requestorInfo\":[{\"ETA_Duration\":2000,\"ETA_Minute\":1,\"ETA_Second\":20.001000000000005,\"basicVehicleRole\":16,\"inBoundLaneID\":8,\"msgCount\":2,\"priorityRequestStatus\":4,\"requestID\":5,\"vehicleID\":601},{\"ETA_Duration\":2000,\"ETA_Minute\":1,\"ETA_Second\":22.001000000000005,\"basicVehicleRole\":16,\"inBoundLaneID\":12,\"msgCount\":6,\"priorityRequestStatus\":4,\"requestID\":5,\"vehicleID\":605},{\"ETA_Duration\":2000,\"ETA_Minute\":1,\"ETA_Second\":22.001000000000005,\"basicVehicleRole\":9,\"inBoundLaneID\":12,\"msgCount\":6,\"priorityRequestStatus\":0,\"requestID\":5,\"vehicleID\":610}],\"sequenceNumber\":4,\"updateCount\":4},\"noOfRequest\":3}"; + Json::Value root; + bool result = converter.parseJsonString(valid_json_str, root); + ASSERT_TRUE(result); + string invalid_json = "invalid"; + result = converter.parseJsonString(invalid_json, root); + ASSERT_FALSE(result); + } + + TEST_F(test_JsonToJ2735SSMConverter, toJ2735SSM) + { + JsonToJ2735SSMConverter converter; + // Json string refer to: https://github.com/mmitss/mmitss-az/blob/master/src/mrp/priority-request-server/SSM.json + // Json has three requestors + string valid_json_str = "{\"MessageType\":\"SSM\",\"SignalStatus\":{\"intersectionID\":1003,\"minuteOfYear\":345239,\"msOfMinute\":54000,\"regionalID\":0,\"requestorInfo\":[{\"ETA_Duration\":2000,\"ETA_Minute\":1,\"ETA_Second\":20.001000000000005,\"basicVehicleRole\":16,\"inBoundApproachID\":8,\"msgCount\":2,\"priorityRequestStatus\":4,\"requestID\":5,\"vehicleID\":601},{\"ETA_Duration\":2000,\"ETA_Minute\":1,\"ETA_Second\":22.001000000000005,\"basicVehicleRole\":16,\"inBoundLaneID\":12,\"msgCount\":6,\"priorityRequestStatus\":4,\"requestID\":5,\"vehicleID\":605},{\"ETA_Duration\":2000,\"ETA_Minute\":1,\"ETA_Second\":22.001000000000005,\"basicVehicleRole\":9,\"inBoundLaneID\":12,\"msgCount\":6,\"priorityRequestStatus\":0,\"requestID\":5,\"vehicleID\":610}],\"sequenceNumber\":4,\"updateCount\":4},\"noOfRequest\":3}"; + Json::Value root; + bool result = converter.parseJsonString(valid_json_str, root); + ASSERT_TRUE(result); + auto ssmPtr = std::make_shared(); + converter.toJ2735SSM(root, ssmPtr); + ASSERT_EQ(1, ssmPtr->status.list.count); + ASSERT_EQ(1003, ssmPtr->status.list.array[0]->id.id); + ASSERT_EQ(54000, ssmPtr->second); + ASSERT_EQ(345239, *ssmPtr->timeStamp); + ASSERT_EQ(3, ssmPtr->status.list.array[0]->sigStatus.list.count); + ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SignalStatusMessage, ssmPtr.get()); + } + + TEST_F(test_JsonToJ2735SSMConverter, toJ2735SSM_2) + { + JsonToJ2735SSMConverter converter; + // Json has one requestor + string valid_json_str = "{\"MessageType\":\"SSM\",\"SignalStatus\":{\"intersectionID\":1004,\"minuteOfYear\":345240,\"msOfMinute\":54001,\"regionalID\":0,\"requestorInfo\":[{\"ETA_Duration\":2000,\"ETA_Minute\":1,\"ETA_Second\":20.001000000000005,\"basicVehicleRole\":16,\"inBoundLaneID\":8,\"msgCount\":2,\"priorityRequestStatus\":4,\"requestID\":5,\"vehicleID\":601}],\"sequenceNumber\":4,\"updateCount\":4},\"noOfRequest\":3}"; + Json::Value root; + bool result = converter.parseJsonString(valid_json_str, root); + ASSERT_TRUE(result); + auto ssmPtr = std::make_shared(); + converter.toJ2735SSM(root, ssmPtr); + ASSERT_EQ(1, ssmPtr->status.list.count); + ASSERT_EQ(1004, ssmPtr->status.list.array[0]->id.id); + ASSERT_EQ(54001, ssmPtr->second); + ASSERT_EQ(345240, *ssmPtr->timeStamp); + ASSERT_EQ(1, ssmPtr->status.list.array[0]->sigStatus.list.count); + ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SignalStatusMessage, ssmPtr.get()); + } + + TEST_F(test_JsonToJ2735SSMConverter, toJ2735SSM_3) + { + JsonToJ2735SSMConverter converter; + // Json has NO requestor + string valid_json_str = "{\"MessageType\":\"SSM\",\"SignalStatus\":{\"intersectionID\":1004,\"minuteOfYear\":345240,\"msOfMinute\":54001,\"regionalID\":0,\"sequenceNumber\":4,\"updateCount\":4},\"noOfRequest\":3}"; + Json::Value root; + bool result = converter.parseJsonString(valid_json_str, root); + ASSERT_TRUE(result); + auto ssmPtr = std::make_shared(); + converter.toJ2735SSM(root, ssmPtr); + ASSERT_EQ(0, ssmPtr->status.list.array[0]->sigStatus.list.count); + ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SignalStatusMessage, ssmPtr.get()); + } + + TEST_F(test_JsonToJ2735SSMConverter, encodeSSM) + { + JsonToJ2735SSMConverter converter; + // Json has one requestor + string valid_json_str = "{\"MessageType\":\"SSM\",\"SignalStatus\":{\"intersectionID\":1004,\"minuteOfYear\":345240,\"msOfMinute\":54001,\"regionalID\":0,\"requestorInfo\":[{\"ETA_Duration\":2000,\"ETA_Minute\":1,\"ETA_Second\":20.001000000000005,\"basicVehicleRole\":16,\"inBoundLaneID\":8,\"msgCount\":2,\"priorityRequestStatus\":4,\"requestID\":5,\"vehicleID\":601}],\"sequenceNumber\":4,\"updateCount\":4},\"noOfRequest\":3}"; + Json::Value root; + converter.parseJsonString(valid_json_str, root); + auto ssmPtr = std::make_shared(); + converter.toJ2735SSM(root, ssmPtr); + tmx::messages::SsmEncodedMessage encodedSSM; + converter.encodeSSM(ssmPtr, encodedSSM); + string expected_payload_hex = "001e18454498d2f1001007d8054a000004b20a090010000280fa08"; + ASSERT_EQ(expected_payload_hex, encodedSSM.get_payload_str()); + + // Json has three requestors + valid_json_str = "{\"MessageType\":\"SSM\",\"SignalStatus\":{\"intersectionID\":1003,\"minuteOfYear\":345239,\"msOfMinute\":54000,\"regionalID\":0,\"requestorInfo\":[{\"ETA_Duration\":2000,\"ETA_Minute\":1,\"ETA_Second\":20.001000000000005,\"basicVehicleRole\":16,\"inBoundLaneID\":8,\"msgCount\":2,\"priorityRequestStatus\":4,\"requestID\":5,\"vehicleID\":601},{\"ETA_Duration\":2000,\"ETA_Minute\":1,\"ETA_Second\":22.001000000000005,\"basicVehicleRole\":16,\"inBoundLaneID\":12,\"msgCount\":6,\"priorityRequestStatus\":4,\"requestID\":5,\"vehicleID\":605},{\"ETA_Duration\":2000,\"ETA_Minute\":1,\"ETA_Second\":22.001000000000005,\"basicVehicleRole\":9,\"inBoundLaneID\":12,\"msgCount\":6,\"priorityRequestStatus\":0,\"requestID\":5,\"vehicleID\":610}],\"sequenceNumber\":4,\"updateCount\":4},\"noOfRequest\":3}"; + converter.parseJsonString(valid_json_str, root); + converter.toJ2735SSM(root, ssmPtr); + converter.encodeSSM(ssmPtr, encodedSSM); + expected_payload_hex = "001e35454497d2f0001007d6254a000004b20a090010000280fa08a940000097414320030000581f4115280000131028624060000b03e800"; + ASSERT_EQ(expected_payload_hex, encodedSSM.get_payload_str()); + ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SignalStatusMessage, ssmPtr.get()); + } +} \ No newline at end of file diff --git a/src/v2i-hub/ImmediateForwardPlugin/manifest.json b/src/v2i-hub/ImmediateForwardPlugin/manifest.json index 5b142b2b1..43e6603c4 100644 --- a/src/v2i-hub/ImmediateForwardPlugin/manifest.json +++ b/src/v2i-hub/ImmediateForwardPlugin/manifest.json @@ -14,7 +14,7 @@ }, { "key": "Messages_Destination_1", - "default": "{ \"Messages\": [ { \"TmxType\": \"SPAT-P\", \"SendType\": \"SPAT\", \"PSID\": \"0x8002\", \"Channel\": \"183\" }, { \"TmxType\": \"MAP-P\", \"SendType\": \"MAP\", \"PSID\": \"0x8002\", \"Channel\": \"183\" }, { \"TmxType\": \"PSM\", \"SendType\": \"PSM\", \"PSID\": \"0x8002\", \"Channel\": \"183\" } ,{ \"TmxType\": \"TMSG07\", \"SendType\": \"TMSG07\", \"PSID\": \"0x8002\", \"Channel\": \"183\" },{ \"TmxType\": \"TMSG03-P\", \"SendType\": \"TMSG03-P\", \"PSID\": \"0xBFEE\", \"Channel\": \"183\" },{ \"TmxType\": \"TMSG05-P\", \"SendType\": \"TMSG05-P\", \"PSID\": \"0x8003\", \"Channel\": \"183\" }] }", + "default": "{ \"Messages\": [ { \"TmxType\": \"SPAT-P\", \"SendType\": \"SPAT\", \"PSID\": \"0x8002\", \"Channel\": \"183\" }, { \"TmxType\": \"MAP-P\", \"SendType\": \"MAP\", \"PSID\": \"0x8002\", \"Channel\": \"183\" }, { \"TmxType\": \"PSM\", \"SendType\": \"PSM\", \"PSID\": \"0x8002\", \"Channel\": \"183\" } ,{ \"TmxType\": \"TMSG07\", \"SendType\": \"TMSG07\", \"PSID\": \"0x8002\", \"Channel\": \"183\" },{ \"TmxType\": \"TMSG03-P\", \"SendType\": \"TMSG03-P\", \"PSID\": \"0xBFEE\", \"Channel\": \"183\" },{ \"TmxType\": \"TMSG05-P\", \"SendType\": \"TMSG05-P\", \"PSID\": \"0x8003\", \"Channel\": \"183\" }, { \"TmxType\": \"SSM\", \"SendType\": \"SSM\", \"PSID\": \"0x8002\", \"Channel\": \"183\" },] }", "description": "JSON data defining the message types, PSIDs, and channel number for messages forwarded to the V2X radio at destination 1." }, { From 4528b26194e76de39bade494e85cea8d6c47cf1b Mon Sep 17 00:00:00 2001 From: Will Martin Date: Fri, 16 Jun 2023 09:33:40 -0400 Subject: [PATCH 2/2] modified: src/v2i-hub/ImmediateForwardPlugin/manifest.json (#541) # PR Details ## Description Removing unexpected comma in manifest.json ## Related Issue ## Motivation and Context ## How Has This Been Tested? ## Types of changes - [ ] Defect fix (non-breaking change that fixes an issue) - [ ] New feature (non-breaking change that adds functionality) - [ ] Breaking change (fix or feature that cause existing functionality to change) ## Checklist: - [ ] I have added any new packages to the sonar-scanner.properties file - [ ] My change requires a change to the documentation. - [ ] I have updated the documentation accordingly. - [ ] I have read the **CONTRIBUTING** document. [V2XHUB Contributing Guide](https://github.com/usdot-fhwa-OPS/V2X-Hub/blob/develop/Contributing.md) - [ ] I have added tests to cover my changes. - [ ] All new and existing tests passed. --- src/v2i-hub/ImmediateForwardPlugin/manifest.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/v2i-hub/ImmediateForwardPlugin/manifest.json b/src/v2i-hub/ImmediateForwardPlugin/manifest.json index 43e6603c4..b26756a76 100644 --- a/src/v2i-hub/ImmediateForwardPlugin/manifest.json +++ b/src/v2i-hub/ImmediateForwardPlugin/manifest.json @@ -14,7 +14,7 @@ }, { "key": "Messages_Destination_1", - "default": "{ \"Messages\": [ { \"TmxType\": \"SPAT-P\", \"SendType\": \"SPAT\", \"PSID\": \"0x8002\", \"Channel\": \"183\" }, { \"TmxType\": \"MAP-P\", \"SendType\": \"MAP\", \"PSID\": \"0x8002\", \"Channel\": \"183\" }, { \"TmxType\": \"PSM\", \"SendType\": \"PSM\", \"PSID\": \"0x8002\", \"Channel\": \"183\" } ,{ \"TmxType\": \"TMSG07\", \"SendType\": \"TMSG07\", \"PSID\": \"0x8002\", \"Channel\": \"183\" },{ \"TmxType\": \"TMSG03-P\", \"SendType\": \"TMSG03-P\", \"PSID\": \"0xBFEE\", \"Channel\": \"183\" },{ \"TmxType\": \"TMSG05-P\", \"SendType\": \"TMSG05-P\", \"PSID\": \"0x8003\", \"Channel\": \"183\" }, { \"TmxType\": \"SSM\", \"SendType\": \"SSM\", \"PSID\": \"0x8002\", \"Channel\": \"183\" },] }", + "default": "{ \"Messages\": [ { \"TmxType\": \"SPAT-P\", \"SendType\": \"SPAT\", \"PSID\": \"0x8002\", \"Channel\": \"183\" }, { \"TmxType\": \"MAP-P\", \"SendType\": \"MAP\", \"PSID\": \"0x8002\", \"Channel\": \"183\" }, { \"TmxType\": \"PSM\", \"SendType\": \"PSM\", \"PSID\": \"0x8002\", \"Channel\": \"183\" } ,{ \"TmxType\": \"TMSG07\", \"SendType\": \"TMSG07\", \"PSID\": \"0x8002\", \"Channel\": \"183\" },{ \"TmxType\": \"TMSG03-P\", \"SendType\": \"TMSG03-P\", \"PSID\": \"0xBFEE\", \"Channel\": \"183\" },{ \"TmxType\": \"TMSG05-P\", \"SendType\": \"TMSG05-P\", \"PSID\": \"0x8003\", \"Channel\": \"183\" }, { \"TmxType\": \"SSM\", \"SendType\": \"SSM\", \"PSID\": \"0x8002\", \"Channel\": \"183\" }] }", "description": "JSON data defining the message types, PSIDs, and channel number for messages forwarded to the V2X radio at destination 1." }, {