Skip to content

Commit

Permalink
Added conversion between JSON and ASN.1 C-struct for SDSM with Kafka …
Browse files Browse the repository at this point in the history
…updates (#562)

<!-- Thanks for the contribution, this is awesome. -->

# PR Details
## Description

Added functionality to convert SDSMs from inbound json strings to
outbound ASN.1 styled C-structs to facilitate communication over the
CARMA Streets plugin in both directions.

## Related Issue

CDAR-308

## Motivation and Context

<!--- Why is this change required? What problem does it solve? -->

## How Has This Been Tested?

Tested locally in VSCode with new unit tests for the conversion between
jsons and the ASN.1 C-struct messages.

## Types of changes

<!--- What types of changes does your code introduce? Put an `x` in all
the boxes that apply: -->

- [X] Defect fix (non-breaking change that fixes an issue)
- [X] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that cause existing functionality
to change)

## Checklist:

<!--- Go over all the following points, and put an `x` in all the boxes
that apply. -->
<!--- If you're unsure about any of these, don't hesitate to ask. We're
here to help! -->

- [ ] I have added any new packages to the sonar-scanner.properties file
- [ ] My change requires a change to the documentation.
- [ ] I have updated the documentation accordingly.
- [X] I have read the **CONTRIBUTING** document.
[V2XHUB Contributing
Guide](https://github.com/usdot-fhwa-OPS/V2X-Hub/blob/develop/Contributing.md)
- [X] I have added tests to cover my changes.
- [X] All new and existing tests passed.
  • Loading branch information
willjohnsonk authored Sep 11, 2023
1 parent a8c58fe commit 5328f89
Show file tree
Hide file tree
Showing 9 changed files with 728 additions and 21 deletions.
2 changes: 1 addition & 1 deletion src/v2i-hub/CARMAStreetsPlugin/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ TARGET_LINK_LIBRARIES (${PROJECT_NAME} tmxutils rdkafka++ jsoncpp)
#############
enable_testing()
include_directories(${PROJECT_SOURCE_DIR}/src)
add_library(${PROJECT_NAME}_lib src/J2735MapToJsonConverter.cpp src/JsonToJ2735SSMConverter.cpp src/JsonToJ2735SpatConverter.cpp src/J2735ToSRMJsonConverter.cpp src/J3224ToSDSMJsonConverter.cpp)
add_library(${PROJECT_NAME}_lib src/J2735MapToJsonConverter.cpp src/JsonToJ2735SSMConverter.cpp src/JsonToJ2735SpatConverter.cpp src/J2735ToSRMJsonConverter.cpp src/J3224ToSDSMJsonConverter.cpp src/JsonToJ3224SDSMConverter.cpp)
target_link_libraries(${PROJECT_NAME}_lib PUBLIC ${TMXAPI_LIBRARIES}
${ASN_J2735_LIBRARIES}
${MYSQL_LIBRARIES}
Expand Down
10 changes: 10 additions & 0 deletions src/v2i-hub/CARMAStreetsPlugin/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,16 @@
"key": "SimSensorDetectedObjTopic",
"default": "v2xhub_sim_sensor_detected_object",
"description": "Apache Kafka topic plugin will transmit simulated sensor detected object to."
},
{
"key": "SdsmSubscribeTopic",
"default": "v2xhub_sdsm_sub",
"description": "Apache Kafka topic plugin that will subscribe to SDSM streams."
},
{
"key": "SdsmTransmitTopic",
"default": "v2xhub_sdsm_tra",
"description": "Apache Kafka topic plugin that will transmit SDSMs."
}

]
Expand Down
74 changes: 72 additions & 2 deletions src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ void CARMAStreetsPlugin::UpdateConfigSettings() {
GetConfigValue<string>("MapTopic", _transmitMAPTopic);
GetConfigValue<string>("SRMTopic", _transmitSRMTopic);
GetConfigValue<string>("SimSensorDetectedObjTopic", _transmitSimSensorDetectedObjTopic);
GetConfigValue<string>("SdsmSubscribeTopic", _subscribeToSdsmTopic);
GetConfigValue<string>("SdsmTransmitTopic", _transmitSDSMTopic);
// Populate strategies config
string config;
GetConfigValue<string>("MobilityOperationStrategies", config);
Expand Down Expand Up @@ -81,19 +83,21 @@ void CARMAStreetsPlugin::InitKafkaConsumerProducers()
_spat_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSpatTopic,this->_name);
_scheduing_plan_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSchedulingPlanTopic, this->_name);
_ssm_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSsmTopic,this->_name);
if(!_scheduing_plan_kafka_consumer_ptr || !_spat_kafka_consumer_ptr || !_ssm_kafka_consumer_ptr)
_sdsm_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSdsmTopic,this->_name);
if(!_scheduing_plan_kafka_consumer_ptr || !_spat_kafka_consumer_ptr || !_ssm_kafka_consumer_ptr || !_sdsm_kafka_consumer_ptr)
{
throw TmxException("Failed to create Kafka consumers.");
}
PLOG(logDEBUG) <<"Kafka consumers created";
if(!_spat_kafka_consumer_ptr->init() || !_scheduing_plan_kafka_consumer_ptr->init() || !_ssm_kafka_consumer_ptr->init())
if(!_spat_kafka_consumer_ptr->init() || !_scheduing_plan_kafka_consumer_ptr->init() || !_ssm_kafka_consumer_ptr->init() || !_sdsm_kafka_consumer_ptr->init())
{
throw TmxException("Kafka consumers init() failed!");
}
// TODO: Replace with tmxutil ThreadTimer or some other more appropriate Thread wrapper.
boost::thread thread_schpl(&CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic, this);
boost::thread thread_spat(&CARMAStreetsPlugin::SubscribeSpatKafkaTopic, this);
boost::thread thread_ssm(&CARMAStreetsPlugin::SubscribeSSMKafkaTopic, this);
boost::thread thread_sdsm(&CARMAStreetsPlugin::SubscribeSDSMKafkaTopic, this);

}

Expand Down Expand Up @@ -458,6 +462,19 @@ void CARMAStreetsPlugin::HandleMapMessage(MapDataMessage &msg, routeable_message
produce_kafka_msg(message, _transmitMAPTopic);
}

void CARMAStreetsPlugin::HandleSDSMMessage(SdsmMessage &msg, routeable_message &routeableMsg)
{
std::shared_ptr<SensorDataSharingMessage> sdsmMsgPtr = msg.get_j2735_data();
PLOG(logDEBUG) << "Detected object count: " << sdsmMsgPtr->objects.list.count << std::endl;
Json::Value sdsmJson;
Json::StreamWriterBuilder builder;
J3224ToSDSMJsonConverter jsonConverter;
jsonConverter.convertJ3224ToSDSMJSON(sdsmMsgPtr, sdsmJson);
PLOG(logDEBUG) << "sdsmJson: " << sdsmJson << std::endl;
const std::string message = Json::writeString(builder, sdsmJson);
produce_kafka_msg(message, _transmitSDSMTopic);
}

void CARMAStreetsPlugin::produce_kafka_msg(const string& message, const string& topic_name) const
{
_kafka_producer_ptr->send(message, topic_name);
Expand Down Expand Up @@ -638,6 +655,59 @@ void CARMAStreetsPlugin::SubscribeSSMKafkaTopic(){

}

void CARMAStreetsPlugin::SubscribeSDSMKafkaTopic(){
// TODO: Update methods to represent consuming a single message from Kafka topic
if(_subscribeToSdsmTopic.length() > 0)
{
PLOG(logDEBUG) << "SubscribeSDSMKafkaTopics:" <<_subscribeToSdsmTopic << std::endl;
_sdsm_kafka_consumer_ptr->subscribe();
//Initialize Json to J3224 SDSM convertor
JsonToJ3224SDSMConverter sdsm_convertor;
while (_sdsm_kafka_consumer_ptr->is_running())
{
std::string payload_str = _sdsm_kafka_consumer_ptr->consume(500);
if(payload_str.length() > 0)
{
PLOG(logDEBUG) << "consumed message payload: " << payload_str <<std::endl;
Json::Value sdsmDoc;
auto parse_sucessful = sdsm_convertor.parseJsonString(payload_str, sdsmDoc);
if( !parse_sucessful )
{
PLOG(logERROR) << "Error parsing payload: " << payload_str << std::endl;
SetStatus<uint>(Key_SDSMMessageSkipped, ++_sdsmMessageSkipped);
continue;
}
//Convert the SDSM JSON string into J3224 SDSM message and encode it.
auto sdsm_ptr = std::make_shared<SensorDataSharingMessage>();
sdsm_convertor.convertJsonToSDSM(sdsmDoc, sdsm_ptr);
tmx::messages::SdsmEncodedMessage sdsmEncodedMsg;
try
{
sdsm_convertor.encodeSDSM(sdsm_ptr, sdsmEncodedMsg);
}
catch (TmxException &ex)
{
// Skip messages that fail to encode.
PLOG(logERROR) << "Failed to encoded SDSM message : \n" << payload_str << std::endl << "Exception encountered: "
<< ex.what() << std::endl;
ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SensorDataSharingMessage, sdsm_ptr.get()); // may be unnecessary
SetStatus<uint>(Key_SDSMMessageSkipped, ++_sdsmMessageSkipped);
continue;
}

ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SensorDataSharingMessage, sdsm_ptr.get()); // same as above
PLOG(logDEBUG) << "sdsmEncodedMsg: " << sdsmEncodedMsg;

//Broadcast the encoded SDSM message
sdsmEncodedMsg.set_flags(IvpMsgFlags_RouteDSRC);
sdsmEncodedMsg.addDsrcMetadata(0x8002);
BroadcastMessage(static_cast<routeable_message &>(sdsmEncodedMsg));
}
}
}

}

void CARMAStreetsPlugin::HandleSimulatedSensorDetectedMessage(simulation::SensorDetectedObject &msg, routeable_message &routeableMsg)
{
PLOG(logDEBUG) << "Produce sensor detected message in JSON format: " << msg.to_string() <<std::endl;
Expand Down
25 changes: 25 additions & 0 deletions src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <kafka/kafka_consumer_worker.h>
#include "JsonToJ2735SSMConverter.h"
#include <simulation/SensorDetectedObject.h>
#include "JsonToJ3224SDSMConverter.h"
#include "J3224ToSDSMJsonConverter.h"
#include "PluginClientClockAware.h"


Expand Down Expand Up @@ -67,6 +69,12 @@ class CARMAStreetsPlugin: public PluginClientClockAware {
* @param routeableMsg
*/
void HandleMapMessage(MapDataMessage &msg, routeable_message &routeableMsg);
/**
* @brief Subscribes to incoming ASN.1, C-Struct formatted SDSMs generated from broadcasting RSUs. These SDSM C-Structs are then converted to JSON to be forwarded to CARMA Streets/Kafka by the handler.
* @param msg The J3224 SDSM received from the internal
* @param routeableMsg
*/
void HandleSDSMMessage(SdsmMessage &msg, routeable_message &routeableMsg);
/**
* @brief Subscribe to SRM message received from RSU and publish the message to a Kafka topic
*/
Expand All @@ -83,6 +91,10 @@ class CARMAStreetsPlugin: public PluginClientClockAware {
* @brief Subcribe to SSM Kafka topic created by carma-streets
*/
void SubscribeSSMKafkaTopic();
/**
* @brief Subcribe to SDSM Kafka topic created by carma-streets
*/
void SubscribeSDSMKafkaTopic();

bool getEncodedtsm3(tsm3EncodedMessage *tsm3EncodedMsg, Json::Value metadata, Json::Value payload_json);
/**
Expand All @@ -105,19 +117,22 @@ class CARMAStreetsPlugin: public PluginClientClockAware {
std::string _subscribeToSchedulingPlanConsumerGroupId;
std::string _subscribeToSpatTopic;
std::string _subscribeToSsmTopic;
std::string _subscribeToSdsmTopic;
std::string _subscribeToSpatConsumerGroupId;
std::string _subscribeToSSMConsumerGroupId;
std::string _transmitMobilityPathTopic;
std::string _transmitBSMTopic;
std::string _transmitMAPTopic;
std::string _transmitSRMTopic;
std::string _transmitSimSensorDetectedObjTopic;
std::string _transmitSDSMTopic;
std::string _kafkaBrokerIp;
std::string _kafkaBrokerPort;
std::shared_ptr<kafka_producer_worker> _kafka_producer_ptr;
std::shared_ptr<kafka_consumer_worker> _spat_kafka_consumer_ptr;
std::shared_ptr<kafka_consumer_worker> _scheduing_plan_kafka_consumer_ptr;
std::shared_ptr<kafka_consumer_worker> _ssm_kafka_consumer_ptr;
std::shared_ptr<kafka_consumer_worker> _sdsm_kafka_consumer_ptr;
std::vector<std::string> _strategies;
tmx::messages::tsm3Message *_tsm3Message{NULL};
std::mutex data_lock;
Expand Down Expand Up @@ -192,6 +207,16 @@ class CARMAStreetsPlugin: public PluginClientClockAware {
*/
uint _ssmMessageSkipped = 0;

/**
* @brief Status label for SDSM messages skipped due to errors.
*/
const char* Key_SDSMMessageSkipped = "SDSM messages skipped due to errors.";

/**
* @brief Count for SDSM messages skipped due to errors.
*/
uint _sdsmMessageSkipped = 0;

/**
* @brief Intersection Id for intersection
*/
Expand Down
18 changes: 9 additions & 9 deletions src/v2i-hub/CARMAStreetsPlugin/src/J3224ToSDSMJsonConverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace CARMAStreetsPlugin

Json::Value refPos;
refPos["lat"] = sdsmMsgPtr->refPos.lat;
refPos["Long"] = sdsmMsgPtr->refPos.Long;
refPos["long"] = sdsmMsgPtr->refPos.Long;
refPos["elevation"] = *sdsmMsgPtr->refPos.elevation;
SDSMDataJson["ref_pos"] = refPos;

Expand All @@ -61,8 +61,8 @@ namespace CARMAStreetsPlugin
auto det_object = det_obj_list.list.array[i];

// Detected object common data
detectedObjectJson["detected_object_data"]["detected_object_common_data"]["object_type"] = det_object->detObjCommon.objType;
detectedObjectJson["detected_object_data"]["detected_object_common_data"]["object_type_conf"] = det_object->detObjCommon.objTypeCfd;
detectedObjectJson["detected_object_data"]["detected_object_common_data"]["obj_type"] = det_object->detObjCommon.objType;
detectedObjectJson["detected_object_data"]["detected_object_common_data"]["obj_type_cfd"] = det_object->detObjCommon.objTypeCfd;
detectedObjectJson["detected_object_data"]["detected_object_common_data"]["object_id"] = det_object->detObjCommon.objectID;
detectedObjectJson["detected_object_data"]["detected_object_common_data"]["measurement_time"] = det_object->detObjCommon.measurementTime;
detectedObjectJson["detected_object_data"]["detected_object_common_data"]["time_confidence"] = det_object->detObjCommon.timeConfidence;
Expand Down Expand Up @@ -156,13 +156,13 @@ namespace CARMAStreetsPlugin
break;

case DetectedObjectOptionalData_PR_detObst:
optionalDataJson["detected_obst_data"]["obst_size"]["width"] = det_object->detObjOptData->choice.detObst.obstSize.width;
optionalDataJson["detected_obst_data"]["obst_size"]["length"] = det_object->detObjOptData->choice.detObst.obstSize.length;
optionalDataJson["detected_obst_data"]["obst_size"]["height"] = *det_object->detObjOptData->choice.detObst.obstSize.height; // optional
optionalDataJson["detected_obstacle_data"]["obst_size"]["width"] = det_object->detObjOptData->choice.detObst.obstSize.width;
optionalDataJson["detected_obstacle_data"]["obst_size"]["length"] = det_object->detObjOptData->choice.detObst.obstSize.length;
optionalDataJson["detected_obstacle_data"]["obst_size"]["height"] = *det_object->detObjOptData->choice.detObst.obstSize.height; // optional

optionalDataJson["detected_obst_data"]["obst_size"]["width_confidence"] = det_object->detObjOptData->choice.detObst.obstSizeConfidence.widthConfidence;
optionalDataJson["detected_obst_data"]["obst_size"]["length_confidence"] = det_object->detObjOptData->choice.detObst.obstSizeConfidence.lengthConfidence;
optionalDataJson["detected_obst_data"]["obst_size"]["height_confidence"] = *det_object->detObjOptData->choice.detObst.obstSizeConfidence.heightConfidence;
optionalDataJson["detected_obstacle_data"]["obst_size_confidence"]["width_confidence"] = det_object->detObjOptData->choice.detObst.obstSizeConfidence.widthConfidence;
optionalDataJson["detected_obstacle_data"]["obst_size_confidence"]["length_confidence"] = det_object->detObjOptData->choice.detObst.obstSizeConfidence.lengthConfidence;
optionalDataJson["detected_obstacle_data"]["obst_size_confidence"]["height_confidence"] = *det_object->detObjOptData->choice.detObst.obstSizeConfidence.heightConfidence;
break;
}
detectedObjectJson["detected_object_data"]["detected_object_optional_data"] = optionalDataJson;
Expand Down
Loading

0 comments on commit 5328f89

Please sign in to comment.