diff --git a/src/v2i-hub/CARMAStreetsPlugin/CMakeLists.txt b/src/v2i-hub/CARMAStreetsPlugin/CMakeLists.txt index f904ecac6..f98aebcae 100644 --- a/src/v2i-hub/CARMAStreetsPlugin/CMakeLists.txt +++ b/src/v2i-hub/CARMAStreetsPlugin/CMakeLists.txt @@ -9,7 +9,7 @@ TARGET_LINK_LIBRARIES (${PROJECT_NAME} tmxutils rdkafka++ jsoncpp) ############# enable_testing() include_directories(${PROJECT_SOURCE_DIR}/src) -add_library(${PROJECT_NAME}_lib src/J2735MapToJsonConverter.cpp src/JsonToJ2735SpatConverter.cpp src/J2735ToSRMJsonConverter.cpp) +add_library(${PROJECT_NAME}_lib src/J2735MapToJsonConverter.cpp src/JsonToJ2735SSMConverter.cpp src/JsonToJ2735SpatConverter.cpp src/J2735ToSRMJsonConverter.cpp) target_link_libraries(${PROJECT_NAME}_lib PUBLIC ${TMXAPI_LIBRARIES} ${ASN_J2735_LIBRARIES} ${MYSQL_LIBRARIES} diff --git a/src/v2i-hub/CARMAStreetsPlugin/manifest.json b/src/v2i-hub/CARMAStreetsPlugin/manifest.json index c4fb269cd..790ef1af9 100644 --- a/src/v2i-hub/CARMAStreetsPlugin/manifest.json +++ b/src/v2i-hub/CARMAStreetsPlugin/manifest.json @@ -102,6 +102,16 @@ "default": "modified_spat", "description": "Apache Kafka topic plugin will transmit message to." }, + { + "key": "SsmTopic", + "default": "v2xhub_ssm_sub", + "description": "Apache Kafka topic plugin will transmit message to." + }, + { + "key": "SsmConsumerGroupId", + "default": "v2xhub_ssm", + "description": "Apache Kafka consumer group ID for spat consumer." + }, { "key": "SpatConsumerGroupId", "default": "v2xhub_spat", diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp index 2e2fdf499..08bc96d8a 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp @@ -46,7 +46,9 @@ void CARMAStreetsPlugin::UpdateConfigSettings() { GetConfigValue("SchedulingPlanTopic", _subscribeToSchedulingPlanTopic); GetConfigValue("SchedulingPlanConsumerGroupId", _subscribeToSchedulingPlanConsumerGroupId); GetConfigValue("SpatTopic", _subscribeToSpatTopic); + GetConfigValue("SsmTopic", _subscribeToSsmTopic); GetConfigValue("SpatConsumerGroupId", _subscribeToSpatConsumerGroupId); + GetConfigValue("SsmConsumerGroupId", _subscribeToSSMConsumerGroupId); GetConfigValue("BsmTopic", _transmitBSMTopic); GetConfigValue("MobilityOperationTopic", _transmitMobilityOperationTopic); GetConfigValue("MobilityPathTopic", _transmitMobilityPathTopic); @@ -68,6 +70,8 @@ void CARMAStreetsPlugin::UpdateConfigSettings() { kafka_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); kafka_conf_sp_consumer = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); kafka_conf_spat_consumer = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + kafka_conf_ssm_consumer = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + PLOG(logDEBUG) <<"Attempting to connect to " << kafkaConnectString; if ((kafka_conf->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK)) { @@ -87,7 +91,10 @@ void CARMAStreetsPlugin::UpdateConfigSettings() { if (kafka_conf_sp_consumer->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK || (kafka_conf_sp_consumer->set("group.id", _subscribeToSchedulingPlanConsumerGroupId, error_string) != RdKafka::Conf::CONF_OK) || (kafka_conf_spat_consumer->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK) - || (kafka_conf_spat_consumer->set("group.id", _subscribeToSpatConsumerGroupId, error_string) != RdKafka::Conf::CONF_OK)) { + || (kafka_conf_spat_consumer->set("group.id", _subscribeToSpatConsumerGroupId, error_string) != RdKafka::Conf::CONF_OK) + || (kafka_conf_ssm_consumer->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK) + || (kafka_conf_ssm_consumer->set("group.id", _subscribeToSSMConsumerGroupId, error_string) != RdKafka::Conf::CONF_OK) + ) { PLOG(logERROR) <<"Setting kafka config group.id options failed with error:" << error_string << "\n" <<"Exiting with exit code 1"; exit(1); } else { @@ -95,20 +102,24 @@ void CARMAStreetsPlugin::UpdateConfigSettings() { } kafka_conf_sp_consumer->set("enable.partition.eof", "true", error_string); kafka_conf_spat_consumer->set("enable.partition.eof", "true", error_string); + kafka_conf_ssm_consumer->set("enable.partition.eof", "true", error_string); _scheduing_plan_kafka_consumer = RdKafka::KafkaConsumer::create(kafka_conf_sp_consumer, error_string); _spat_kafka_consumer = RdKafka::KafkaConsumer::create(kafka_conf_spat_consumer, error_string); + _ssm_kafka_consumer = RdKafka::KafkaConsumer::create(kafka_conf_ssm_consumer, error_string); - if ( !_scheduing_plan_kafka_consumer || !_spat_kafka_consumer) { + if ( !_scheduing_plan_kafka_consumer || !_spat_kafka_consumer || !_ssm_kafka_consumer) { PLOG(logERROR) << "Failed to create Kafka consumers: " << error_string << std::endl; exit(1); } PLOG(logDEBUG) << "Created consumer " << _scheduing_plan_kafka_consumer->name() << std::endl; PLOG(logDEBUG) << "Created consumer " << _spat_kafka_consumer->name() << std::endl; + PLOG(logDEBUG) << "Created consumer " << _ssm_kafka_consumer->name() << std::endl; //create kafka topics RdKafka::Conf *tconf_spat = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); RdKafka::Conf *tconf_sp = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); + RdKafka::Conf *tconf_ssm = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); if(!tconf_spat && !tconf_sp) { PLOG(logERROR) << "RDKafka create topic conf failed "; @@ -129,11 +140,20 @@ void CARMAStreetsPlugin::UpdateConfigSettings() { return ; } + _ssm_topic = RdKafka::Topic::create(_ssm_kafka_consumer,_subscribeToSsmTopic,tconf_ssm,error_string); + if(!_ssm_topic) + { + PLOG(logERROR) << "RDKafka create SSM topic failed:" << error_string; + return ; + } + delete tconf_sp; delete tconf_spat; + delete tconf_ssm; boost::thread thread_schpl(&CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic, this); boost::thread thread_spat(&CARMAStreetsPlugin::SubscribeSpatKafkaTopic, this); + boost::thread thread_ssm(&CARMAStreetsPlugin::SubscribeSSMKafkaTopic, this); } void CARMAStreetsPlugin::OnConfigChanged(const char *key, const char *value) { @@ -659,6 +679,71 @@ void CARMAStreetsPlugin::SubscribeSpatKafkaTopic(){ } } +void CARMAStreetsPlugin::SubscribeSSMKafkaTopic(){ + + if(_subscribeToSsmTopic.length() > 0) + { + PLOG(logDEBUG) << "SubscribeSSMKafkaTopics:" <<_subscribeToSsmTopic << std::endl; + std::vector topics; + topics.emplace_back(_subscribeToSsmTopic); + + RdKafka::ErrorCode err = _ssm_kafka_consumer->subscribe(topics); + if (err) + { + PLOG(logERROR) << "Failed to subscribe to " << topics.size() << " topics: " << RdKafka::err2str(err) << std::endl; + return; + } + //Initialize Json to J2735 SSM convertor + JsonToJ2735SSMConverter ssm_convertor; + while (true) + { + auto msg = _ssm_kafka_consumer->consume( 500 ); + if( msg->err() == RdKafka::ERR_NO_ERROR ) + { + auto payload_str = static_cast( msg->payload() ); + if(msg->len() > 0) + { + PLOG(logDEBUG) << "consumed message payload: " << payload_str <(Key_SSMMessageSkipped, ++_ssmMessageSkipped); + continue; + } + //Convert the SSM JSON string into J2735 SSM message and encode it. + auto ssm_ptr = std::make_shared(); + ssm_convertor.toJ2735SSM(ssmDoc, ssm_ptr); + tmx::messages::SsmEncodedMessage ssmEncodedMsg; + try + { + ssm_convertor.encodeSSM(ssm_ptr, ssmEncodedMsg); + } + catch (TmxException &ex) + { + // Skip messages that fail to encode. + PLOG(logERROR) << "Failed to encoded SSM message : \n" << payload_str << std::endl << "Exception encountered: " + << ex.what() << std::endl; + ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SignalStatusMessage, ssm_ptr.get()); + SetStatus(Key_SSMMessageSkipped, ++_ssmMessageSkipped); + continue; + } + + ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SignalStatusMessage, ssm_ptr.get()); + PLOG(logDEBUG) << "ssmEncodedMsg: " << ssmEncodedMsg; + + //Broadcast the encoded SSM message + ssmEncodedMsg.set_flags(IvpMsgFlags_RouteDSRC); + ssmEncodedMsg.addDsrcMetadata(0x8002); + BroadcastMessage(static_cast(ssmEncodedMsg)); + } + } + delete msg; + } + } + +} bool CARMAStreetsPlugin::getEncodedtsm3( tsm3EncodedMessage *tsm3EncodedMsg, Json::Value metadata, Json::Value payload_json ) { try diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h index a9111e945..566aaf35c 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h @@ -17,6 +17,7 @@ #include "J2735MapToJsonConverter.h" #include "JsonToJ2735SpatConverter.h" #include "J2735ToSRMJsonConverter.h" +#include "JsonToJ2735SSMConverter.h" @@ -62,6 +63,10 @@ class CARMAStreetsPlugin: public PluginClient { * @brief Subcribe to SPAT Kafka topic created by carma-streets */ void SubscribeSpatKafkaTopic(); + /** + * @brief Subcribe to SSM Kafka topic created by carma-streets + */ + void SubscribeSSMKafkaTopic(); bool getEncodedtsm3(tsm3EncodedMessage *tsm3EncodedMsg, Json::Value metadata, Json::Value payload_json); /** @@ -79,7 +84,9 @@ class CARMAStreetsPlugin: public PluginClient { std::string _subscribeToSchedulingPlanTopic; std::string _subscribeToSchedulingPlanConsumerGroupId; std::string _subscribeToSpatTopic; + std::string _subscribeToSsmTopic; std::string _subscribeToSpatConsumerGroupId; + std::string _subscribeToSSMConsumerGroupId; std::string _transmitMobilityPathTopic; std::string _transmitBSMTopic; std::string _transmitMAPTopic; @@ -89,11 +96,14 @@ class CARMAStreetsPlugin: public PluginClient { RdKafka::Conf *kafka_conf; RdKafka::Conf *kafka_conf_spat_consumer; RdKafka::Conf *kafka_conf_sp_consumer; + RdKafka::Conf *kafka_conf_ssm_consumer; RdKafka::Producer *kafka_producer; RdKafka::KafkaConsumer *_scheduing_plan_kafka_consumer; RdKafka::KafkaConsumer *_spat_kafka_consumer; + RdKafka::KafkaConsumer *_ssm_kafka_consumer; RdKafka::Topic *_scheduing_plan_topic; RdKafka::Topic *_spat_topic; + RdKafka::Topic *_ssm_topic; std::vector _strategies; tmx::messages::tsm3Message *_tsm3Message{NULL}; std::mutex data_lock; @@ -130,6 +140,7 @@ class CARMAStreetsPlugin: public PluginClient { /** * @brief Status label for Mobility Operation messages skipped due to errors. */ + const char* Key_MobilityOperationMessageSkipped = "Mobility Operation messages skipped due to errors."; /** @@ -156,7 +167,17 @@ class CARMAStreetsPlugin: public PluginClient { * @brief Count for BSM messages skipped due to errors. */ uint _bsmMessageSkipped = 0; - + + /** + * @brief Status label for SSM messages skipped due to errors. + */ + const char* Key_SSMMessageSkipped = "SSM messages skipped due to errors."; + + /** + * @brief Count for SSM messages skipped due to errors. + */ + uint _ssmMessageSkipped = 0; + /** * @brief Intersection Id for intersection */ diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/JsonToJ2735SSMConverter.cpp b/src/v2i-hub/CARMAStreetsPlugin/src/JsonToJ2735SSMConverter.cpp new file mode 100644 index 000000000..10e59d87d --- /dev/null +++ b/src/v2i-hub/CARMAStreetsPlugin/src/JsonToJ2735SSMConverter.cpp @@ -0,0 +1,156 @@ +#include "JsonToJ2735SSMConverter.h" + +namespace CARMAStreetsPlugin +{ + + bool JsonToJ2735SSMConverter::parseJsonString(const string &consumedMsg, Json::Value &ssmDoc) const + { + const auto jsonLen = static_cast(consumedMsg.length()); + Json::CharReaderBuilder builder; + JSONCPP_STRING err; + const std::unique_ptr reader(builder.newCharReader()); + bool parseResult = reader->parse(consumedMsg.c_str(), consumedMsg.c_str() + jsonLen, &ssmDoc, &err); + if (!parseResult) + { + PLOG(logERROR) << "Parse error: " << err << endl; + } + return parseResult; + } + + void JsonToJ2735SSMConverter::toJ2735SSM(const Json::Value &ssmDoc, std::shared_ptr ssmPtr) const + { + try + { + ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SignalStatusMessage, ssmPtr.get()); + if (!ssmDoc.isMember("SignalStatus")) + { + PLOG(logERROR) << "No SignalStatus present in JSON." << std::endl; + return; + } + + // populate SignalStatusMessage::second + if (ssmDoc["SignalStatus"].isMember("msOfMinute") && ssmDoc["SignalStatus"]["msOfMinute"].isNumeric()) + { + ssmPtr->second = ssmDoc["SignalStatus"]["msOfMinute"].asInt64(); + } + + // populate SignalStatusMessage::timstamp + if (ssmDoc["SignalStatus"].isMember("minuteOfYear") && ssmDoc["SignalStatus"]["minuteOfYear"].isNumeric()) + { + MinuteOfTheYear_t *timeStamp = (MinuteOfTheYear_t *)calloc(1, sizeof(MinuteOfTheYear_t)); + *timeStamp = ssmDoc["SignalStatus"]["minuteOfYear"].asInt64(); + ssmPtr->timeStamp = timeStamp; + } + + SignalStatusList_t *statusPtr = (SignalStatusList_t *)calloc(1, sizeof(SignalStatusList_t)); + SignalStatus *signalStatus = (SignalStatus *)calloc(1, sizeof(SignalStatus)); + + // populate SignalStatusMessage::status::id + if (ssmDoc["SignalStatus"].isMember("intersectionID") && ssmDoc["SignalStatus"]["intersectionID"].isNumeric()) + { + signalStatus->id.id = ssmDoc["SignalStatus"]["intersectionID"].asInt64(); + } + + // populate SignalStatusMessage::status::sequenceNumber + if (ssmDoc["SignalStatus"].isMember("sequenceNumber") && ssmDoc["SignalStatus"]["sequenceNumber"].isNumeric()) + { + signalStatus->sequenceNumber = ssmDoc["SignalStatus"]["sequenceNumber"].asInt64(); + } + + // populate SignalStatusMessage::status::sigStatus + if (ssmDoc["SignalStatus"].isMember("requestorInfo") && ssmDoc["SignalStatus"]["requestorInfo"].isArray()) + { + Json::Value requesterJsonArr = ssmDoc["SignalStatus"]["requestorInfo"]; + for (auto itr = requesterJsonArr.begin(); itr != requesterJsonArr.end(); itr++) + { + SignalStatusPackage *signalStatusPackage = (SignalStatusPackage *)calloc(1, sizeof(SignalStatusPackage)); + populateSigStatusPackage(signalStatusPackage, itr); + asn_sequence_add(&signalStatus->sigStatus.list.array, signalStatusPackage); + } // Populate signal status package + } + + asn_sequence_add(&statusPtr->list.array, signalStatus); + ssmPtr->status = *statusPtr; + } + catch(exception &ex) + { + PLOG(logERROR) << "Cannot read JSON file." << std::endl; + } + } + + void JsonToJ2735SSMConverter::populateSigStatusPackage(SignalStatusPackage *signalStatusPackage, Json::Value::iterator itr) const + { + signalStatusPackage->requester = (SignalRequesterInfo *)calloc(1, sizeof(SignalRequesterInfo)); + + // populate SignalStatusMessage::status::sigStatus::requester::request + if (itr->isMember("requestID") && (*itr)["requestID"].isNumeric()) + { + signalStatusPackage->requester->request = (*itr)["requestID"].asInt64(); + } + + // populate SignalStatusMessage::status::sigStatus::requester::id + if (itr->isMember("vehicleID") && (*itr)["vehicleID"].isNumeric()) + { + signalStatusPackage->requester->id.choice.stationID = (*itr)["vehicleID"].asInt64(); + signalStatusPackage->requester->id.present = VehicleID_PR_stationID; + } + + // populate SignalStatusMessage::status::sigStatus::requester::sequenceNumber + if (itr->isMember("msgCount") && (*itr)["msgCount"].isNumeric()) + { + signalStatusPackage->requester->sequenceNumber = (*itr)["msgCount"].asInt64(); + } + + // populate SignalStatusMessage::status::sigStatus::requester::role + if (itr->isMember("basicVehicleRole") && (*itr)["basicVehicleRole"].isNumeric()) + { + signalStatusPackage->requester->role = (BasicVehicleRole_t *)calloc(1, sizeof(BasicVehicleRole_t)); + *signalStatusPackage->requester->role = (*itr)["basicVehicleRole"].asInt64(); + } + + // populate SignalStatusMessage::status::sigStatus::inboundOn + if (itr->isMember("inBoundLaneID") && (*itr)["inBoundLaneID"].isNumeric()) + { + signalStatusPackage->inboundOn.present = IntersectionAccessPoint_PR_lane; + signalStatusPackage->inboundOn.choice.lane = (*itr)["inBoundLaneID"].asInt64(); + } + else if (itr->isMember("inBoundApproachID") && (*itr)["inBoundApproachID"].isNumeric()) + { + signalStatusPackage->inboundOn.present = IntersectionAccessPoint_PR_approach; + signalStatusPackage->inboundOn.choice.approach = (*itr)["inBoundApproachID"].asInt64(); + } + + // populate SignalStatusMessage::status::sigStatus::status + if (itr->isMember("priorityRequestStatus") && (*itr)["priorityRequestStatus"].isNumeric()) + { + signalStatusPackage->status = (*itr)["priorityRequestStatus"].asInt64(); + } + + // populate SignalStatusMessage::status::sigStatus::duration + if (itr->isMember("ETA_Duration") && (*itr)["ETA_Duration"].isNumeric()) + { + signalStatusPackage->duration = (DSecond_t *)calloc(1, sizeof(DSecond_t)); + *signalStatusPackage->duration = (*itr)["ETA_Duration"].asInt64(); + } + + // populate SignalStatusMessage::status::sigStatus::minute + if (itr->isMember("ETA_Minute") && (*itr)["ETA_Minute"].isNumeric()) + { + signalStatusPackage->minute = (DSecond_t *)calloc(1, sizeof(DSecond_t)); + *signalStatusPackage->minute = (*itr)["ETA_Minute"].asInt64(); + } + + // populate SignalStatusMessage::status::sigStatus::second + if (itr->isMember("ETA_Second") && (*itr)["ETA_Second"].isNumeric()) + { + signalStatusPackage->minute = (DSecond_t *)calloc(1, sizeof(DSecond_t)); + *signalStatusPackage->minute = (*itr)["ETA_Second"].asInt64(); + } + } + void JsonToJ2735SSMConverter::encodeSSM(const std::shared_ptr &ssmPtr, tmx::messages::SsmEncodedMessage &encodedSSM) const + { + tmx::messages::MessageFrameMessage frame(ssmPtr); + encodedSSM.set_data(tmx::messages::TmxJ2735EncodedMessage::encode_j2735_message>(frame)); + free(frame.get_j2735_data().get()); + } +} diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/JsonToJ2735SSMConverter.h b/src/v2i-hub/CARMAStreetsPlugin/src/JsonToJ2735SSMConverter.h new file mode 100644 index 000000000..64d90c816 --- /dev/null +++ b/src/v2i-hub/CARMAStreetsPlugin/src/JsonToJ2735SSMConverter.h @@ -0,0 +1,49 @@ +#ifndef JSONTOJ2735SSMCONVERTER_H_ +#define JSONTOJ2735SSMCONVERTER_H_ +#include "jsoncpp/json/json.h" +#include +#include +#include + +using namespace std; +using namespace tmx::utils; +namespace CARMAStreetsPlugin +{ + class JsonToJ2735SSMConverter + { + public: + // Constructor to initialize object + JsonToJ2735SSMConverter() = default; + /*** + * @brief Parse Json string into Json object + * @param jsonstring that is consumed from a kafka topic + * @param JsonObject that is populated by the input json string + * @return boolean. True if parse successfully, false if parse with errors + */ + bool parseJsonString(const string &consumed_msg, Json::Value &ssmDoc) const; + /*** + * @brief Read SSM Json document and populate J2735 SSM message + * @param ssmDoc Json object that contains the SSM information in Json format + * @param ssmPtr a pointer to J2735 message object. This object will be populated with the values in the Json object + */ + void toJ2735SSM(const Json::Value &ssmDoc, std::shared_ptr ssmPtr) const; + /*** + * @brief Encode J2735 SSM + * @param Pointer to J2735 SSM object + * @param Encoded J2735 SSM + */ + void encodeSSM(const std::shared_ptr &ssmPtr, tmx::messages::SsmEncodedMessage &encodedSSM) const; + + /*** + * @brief Populate J2735 SignalStatusPackage object with Json Value + * @param Pointer J2735 SignalStatusPackage object + * @param Json::Value SignalstatusPackage Json string iterator + */ + void populateSigStatusPackage(SignalStatusPackage *signalStatusPackage, Json::Value::iterator itr) const; + + ~JsonToJ2735SSMConverter() = default; + }; + +} + +#endif \ No newline at end of file diff --git a/src/v2i-hub/CARMAStreetsPlugin/test/test_JsonToJ2735SSMConverter.cpp b/src/v2i-hub/CARMAStreetsPlugin/test/test_JsonToJ2735SSMConverter.cpp new file mode 100644 index 000000000..2bd9fb405 --- /dev/null +++ b/src/v2i-hub/CARMAStreetsPlugin/test/test_JsonToJ2735SSMConverter.cpp @@ -0,0 +1,102 @@ +#include +#include +#include +#include +#include "jsoncpp/json/json.h" +#include "JsonToJ2735SSMConverter.h" +using namespace std; + +namespace CARMAStreetsPlugin +{ + class test_JsonToJ2735SSMConverter : public ::testing::Test + { + }; + + TEST_F(test_JsonToJ2735SSMConverter, parseJsonString) + { + JsonToJ2735SSMConverter converter; + // Json string refer to: https://github.com/mmitss/mmitss-az/blob/master/src/mrp/priority-request-server/SSM.json + string valid_json_str = "{\"MessageType\":\"SSM\",\"SignalStatus\":{\"intersectionID\":1003,\"minuteOfYear\":345239,\"msOfMinute\":54000,\"regionalID\":0,\"requestorInfo\":[{\"ETA_Duration\":2000,\"ETA_Minute\":1,\"ETA_Second\":20.001000000000005,\"basicVehicleRole\":16,\"inBoundLaneID\":8,\"msgCount\":2,\"priorityRequestStatus\":4,\"requestID\":5,\"vehicleID\":601},{\"ETA_Duration\":2000,\"ETA_Minute\":1,\"ETA_Second\":22.001000000000005,\"basicVehicleRole\":16,\"inBoundLaneID\":12,\"msgCount\":6,\"priorityRequestStatus\":4,\"requestID\":5,\"vehicleID\":605},{\"ETA_Duration\":2000,\"ETA_Minute\":1,\"ETA_Second\":22.001000000000005,\"basicVehicleRole\":9,\"inBoundLaneID\":12,\"msgCount\":6,\"priorityRequestStatus\":0,\"requestID\":5,\"vehicleID\":610}],\"sequenceNumber\":4,\"updateCount\":4},\"noOfRequest\":3}"; + Json::Value root; + bool result = converter.parseJsonString(valid_json_str, root); + ASSERT_TRUE(result); + string invalid_json = "invalid"; + result = converter.parseJsonString(invalid_json, root); + ASSERT_FALSE(result); + } + + TEST_F(test_JsonToJ2735SSMConverter, toJ2735SSM) + { + JsonToJ2735SSMConverter converter; + // Json string refer to: https://github.com/mmitss/mmitss-az/blob/master/src/mrp/priority-request-server/SSM.json + // Json has three requestors + string valid_json_str = "{\"MessageType\":\"SSM\",\"SignalStatus\":{\"intersectionID\":1003,\"minuteOfYear\":345239,\"msOfMinute\":54000,\"regionalID\":0,\"requestorInfo\":[{\"ETA_Duration\":2000,\"ETA_Minute\":1,\"ETA_Second\":20.001000000000005,\"basicVehicleRole\":16,\"inBoundApproachID\":8,\"msgCount\":2,\"priorityRequestStatus\":4,\"requestID\":5,\"vehicleID\":601},{\"ETA_Duration\":2000,\"ETA_Minute\":1,\"ETA_Second\":22.001000000000005,\"basicVehicleRole\":16,\"inBoundLaneID\":12,\"msgCount\":6,\"priorityRequestStatus\":4,\"requestID\":5,\"vehicleID\":605},{\"ETA_Duration\":2000,\"ETA_Minute\":1,\"ETA_Second\":22.001000000000005,\"basicVehicleRole\":9,\"inBoundLaneID\":12,\"msgCount\":6,\"priorityRequestStatus\":0,\"requestID\":5,\"vehicleID\":610}],\"sequenceNumber\":4,\"updateCount\":4},\"noOfRequest\":3}"; + Json::Value root; + bool result = converter.parseJsonString(valid_json_str, root); + ASSERT_TRUE(result); + auto ssmPtr = std::make_shared(); + converter.toJ2735SSM(root, ssmPtr); + ASSERT_EQ(1, ssmPtr->status.list.count); + ASSERT_EQ(1003, ssmPtr->status.list.array[0]->id.id); + ASSERT_EQ(54000, ssmPtr->second); + ASSERT_EQ(345239, *ssmPtr->timeStamp); + ASSERT_EQ(3, ssmPtr->status.list.array[0]->sigStatus.list.count); + ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SignalStatusMessage, ssmPtr.get()); + } + + TEST_F(test_JsonToJ2735SSMConverter, toJ2735SSM_2) + { + JsonToJ2735SSMConverter converter; + // Json has one requestor + string valid_json_str = "{\"MessageType\":\"SSM\",\"SignalStatus\":{\"intersectionID\":1004,\"minuteOfYear\":345240,\"msOfMinute\":54001,\"regionalID\":0,\"requestorInfo\":[{\"ETA_Duration\":2000,\"ETA_Minute\":1,\"ETA_Second\":20.001000000000005,\"basicVehicleRole\":16,\"inBoundLaneID\":8,\"msgCount\":2,\"priorityRequestStatus\":4,\"requestID\":5,\"vehicleID\":601}],\"sequenceNumber\":4,\"updateCount\":4},\"noOfRequest\":3}"; + Json::Value root; + bool result = converter.parseJsonString(valid_json_str, root); + ASSERT_TRUE(result); + auto ssmPtr = std::make_shared(); + converter.toJ2735SSM(root, ssmPtr); + ASSERT_EQ(1, ssmPtr->status.list.count); + ASSERT_EQ(1004, ssmPtr->status.list.array[0]->id.id); + ASSERT_EQ(54001, ssmPtr->second); + ASSERT_EQ(345240, *ssmPtr->timeStamp); + ASSERT_EQ(1, ssmPtr->status.list.array[0]->sigStatus.list.count); + ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SignalStatusMessage, ssmPtr.get()); + } + + TEST_F(test_JsonToJ2735SSMConverter, toJ2735SSM_3) + { + JsonToJ2735SSMConverter converter; + // Json has NO requestor + string valid_json_str = "{\"MessageType\":\"SSM\",\"SignalStatus\":{\"intersectionID\":1004,\"minuteOfYear\":345240,\"msOfMinute\":54001,\"regionalID\":0,\"sequenceNumber\":4,\"updateCount\":4},\"noOfRequest\":3}"; + Json::Value root; + bool result = converter.parseJsonString(valid_json_str, root); + ASSERT_TRUE(result); + auto ssmPtr = std::make_shared(); + converter.toJ2735SSM(root, ssmPtr); + ASSERT_EQ(0, ssmPtr->status.list.array[0]->sigStatus.list.count); + ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SignalStatusMessage, ssmPtr.get()); + } + + TEST_F(test_JsonToJ2735SSMConverter, encodeSSM) + { + JsonToJ2735SSMConverter converter; + // Json has one requestor + string valid_json_str = "{\"MessageType\":\"SSM\",\"SignalStatus\":{\"intersectionID\":1004,\"minuteOfYear\":345240,\"msOfMinute\":54001,\"regionalID\":0,\"requestorInfo\":[{\"ETA_Duration\":2000,\"ETA_Minute\":1,\"ETA_Second\":20.001000000000005,\"basicVehicleRole\":16,\"inBoundLaneID\":8,\"msgCount\":2,\"priorityRequestStatus\":4,\"requestID\":5,\"vehicleID\":601}],\"sequenceNumber\":4,\"updateCount\":4},\"noOfRequest\":3}"; + Json::Value root; + converter.parseJsonString(valid_json_str, root); + auto ssmPtr = std::make_shared(); + converter.toJ2735SSM(root, ssmPtr); + tmx::messages::SsmEncodedMessage encodedSSM; + converter.encodeSSM(ssmPtr, encodedSSM); + string expected_payload_hex = "001e18454498d2f1001007d8054a000004b20a090010000280fa08"; + ASSERT_EQ(expected_payload_hex, encodedSSM.get_payload_str()); + + // Json has three requestors + valid_json_str = "{\"MessageType\":\"SSM\",\"SignalStatus\":{\"intersectionID\":1003,\"minuteOfYear\":345239,\"msOfMinute\":54000,\"regionalID\":0,\"requestorInfo\":[{\"ETA_Duration\":2000,\"ETA_Minute\":1,\"ETA_Second\":20.001000000000005,\"basicVehicleRole\":16,\"inBoundLaneID\":8,\"msgCount\":2,\"priorityRequestStatus\":4,\"requestID\":5,\"vehicleID\":601},{\"ETA_Duration\":2000,\"ETA_Minute\":1,\"ETA_Second\":22.001000000000005,\"basicVehicleRole\":16,\"inBoundLaneID\":12,\"msgCount\":6,\"priorityRequestStatus\":4,\"requestID\":5,\"vehicleID\":605},{\"ETA_Duration\":2000,\"ETA_Minute\":1,\"ETA_Second\":22.001000000000005,\"basicVehicleRole\":9,\"inBoundLaneID\":12,\"msgCount\":6,\"priorityRequestStatus\":0,\"requestID\":5,\"vehicleID\":610}],\"sequenceNumber\":4,\"updateCount\":4},\"noOfRequest\":3}"; + converter.parseJsonString(valid_json_str, root); + converter.toJ2735SSM(root, ssmPtr); + converter.encodeSSM(ssmPtr, encodedSSM); + expected_payload_hex = "001e35454497d2f0001007d6254a000004b20a090010000280fa08a940000097414320030000581f4115280000131028624060000b03e800"; + ASSERT_EQ(expected_payload_hex, encodedSSM.get_payload_str()); + ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SignalStatusMessage, ssmPtr.get()); + } +} \ No newline at end of file diff --git a/src/v2i-hub/ImmediateForwardPlugin/manifest.json b/src/v2i-hub/ImmediateForwardPlugin/manifest.json index 5b142b2b1..43e6603c4 100644 --- a/src/v2i-hub/ImmediateForwardPlugin/manifest.json +++ b/src/v2i-hub/ImmediateForwardPlugin/manifest.json @@ -14,7 +14,7 @@ }, { "key": "Messages_Destination_1", - "default": "{ \"Messages\": [ { \"TmxType\": \"SPAT-P\", \"SendType\": \"SPAT\", \"PSID\": \"0x8002\", \"Channel\": \"183\" }, { \"TmxType\": \"MAP-P\", \"SendType\": \"MAP\", \"PSID\": \"0x8002\", \"Channel\": \"183\" }, { \"TmxType\": \"PSM\", \"SendType\": \"PSM\", \"PSID\": \"0x8002\", \"Channel\": \"183\" } ,{ \"TmxType\": \"TMSG07\", \"SendType\": \"TMSG07\", \"PSID\": \"0x8002\", \"Channel\": \"183\" },{ \"TmxType\": \"TMSG03-P\", \"SendType\": \"TMSG03-P\", \"PSID\": \"0xBFEE\", \"Channel\": \"183\" },{ \"TmxType\": \"TMSG05-P\", \"SendType\": \"TMSG05-P\", \"PSID\": \"0x8003\", \"Channel\": \"183\" }] }", + "default": "{ \"Messages\": [ { \"TmxType\": \"SPAT-P\", \"SendType\": \"SPAT\", \"PSID\": \"0x8002\", \"Channel\": \"183\" }, { \"TmxType\": \"MAP-P\", \"SendType\": \"MAP\", \"PSID\": \"0x8002\", \"Channel\": \"183\" }, { \"TmxType\": \"PSM\", \"SendType\": \"PSM\", \"PSID\": \"0x8002\", \"Channel\": \"183\" } ,{ \"TmxType\": \"TMSG07\", \"SendType\": \"TMSG07\", \"PSID\": \"0x8002\", \"Channel\": \"183\" },{ \"TmxType\": \"TMSG03-P\", \"SendType\": \"TMSG03-P\", \"PSID\": \"0xBFEE\", \"Channel\": \"183\" },{ \"TmxType\": \"TMSG05-P\", \"SendType\": \"TMSG05-P\", \"PSID\": \"0x8003\", \"Channel\": \"183\" }, { \"TmxType\": \"SSM\", \"SendType\": \"SSM\", \"PSID\": \"0x8002\", \"Channel\": \"183\" },] }", "description": "JSON data defining the message types, PSIDs, and channel number for messages forwarded to the V2X radio at destination 1." }, {