Skip to content

Commit

Permalink
Added SDSM conversion to C-struct, updated main plugin, minor fixes t…
Browse files Browse the repository at this point in the history
…o previous converter
  • Loading branch information
willjohnsonk committed Sep 7, 2023
1 parent db06ff8 commit 794866a
Show file tree
Hide file tree
Showing 8 changed files with 698 additions and 21 deletions.
2 changes: 1 addition & 1 deletion src/v2i-hub/CARMAStreetsPlugin/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ TARGET_LINK_LIBRARIES (${PROJECT_NAME} tmxutils rdkafka++ jsoncpp)
#############
enable_testing()
include_directories(${PROJECT_SOURCE_DIR}/src)
add_library(${PROJECT_NAME}_lib src/J2735MapToJsonConverter.cpp src/JsonToJ2735SSMConverter.cpp src/JsonToJ2735SpatConverter.cpp src/J2735ToSRMJsonConverter.cpp src/J3224ToSDSMJsonConverter.cpp)
add_library(${PROJECT_NAME}_lib src/J2735MapToJsonConverter.cpp src/JsonToJ2735SSMConverter.cpp src/JsonToJ2735SpatConverter.cpp src/J2735ToSRMJsonConverter.cpp src/J3224ToSDSMJsonConverter.cpp src/JsonToJ3224SDSMConverter.cpp)
target_link_libraries(${PROJECT_NAME}_lib PUBLIC ${TMXAPI_LIBRARIES}
${ASN_J2735_LIBRARIES}
${MYSQL_LIBRARIES}
Expand Down
74 changes: 72 additions & 2 deletions src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ void CARMAStreetsPlugin::UpdateConfigSettings() {
GetConfigValue<string>("MapTopic", _transmitMAPTopic);
GetConfigValue<string>("SRMTopic", _transmitSRMTopic);
GetConfigValue<string>("SimSensorDetectedObjTopic", _transmitSimSensorDetectedObjTopic);
GetConfigValue<string>("SdsmTopic", _subscribeToSdsmTopic);
GetConfigValue<string>("SdsmTopic", _transmitSDSMTopic);
// Populate strategies config
string config;
GetConfigValue<string>("MobilityOperationStrategies", config);
Expand Down Expand Up @@ -81,19 +83,21 @@ void CARMAStreetsPlugin::InitKafkaConsumerProducers()
_spat_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSpatTopic,this->_name);
_scheduing_plan_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSchedulingPlanTopic, this->_name);
_ssm_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSsmTopic,this->_name);
if(!_scheduing_plan_kafka_consumer_ptr || !_spat_kafka_consumer_ptr || !_ssm_kafka_consumer_ptr)
_sdsm_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSdsmTopic,this->_name);
if(!_scheduing_plan_kafka_consumer_ptr || !_spat_kafka_consumer_ptr || !_ssm_kafka_consumer_ptr || !_sdsm_kafka_consumer_ptr)
{
throw TmxException("Failed to create Kafka consumers.");
}
PLOG(logDEBUG) <<"Kafka consumers created";
if(!_spat_kafka_consumer_ptr->init() || !_scheduing_plan_kafka_consumer_ptr->init() || !_ssm_kafka_consumer_ptr->init())
if(!_spat_kafka_consumer_ptr->init() || !_scheduing_plan_kafka_consumer_ptr->init() || !_ssm_kafka_consumer_ptr->init() || !_sdsm_kafka_consumer_ptr->init())
{
throw TmxException("Kafka consumers init() failed!");
}
// TODO: Replace with tmxutil ThreadTimer or some other more appropriate Thread wrapper.
boost::thread thread_schpl(&CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic, this);
boost::thread thread_spat(&CARMAStreetsPlugin::SubscribeSpatKafkaTopic, this);
boost::thread thread_ssm(&CARMAStreetsPlugin::SubscribeSSMKafkaTopic, this);
boost::thread thread_sdsm(&CARMAStreetsPlugin::SubscribeSDSMKafkaTopic, this);

}

Expand Down Expand Up @@ -449,6 +453,19 @@ void CARMAStreetsPlugin::HandleMapMessage(MapDataMessage &msg, routeable_message
produce_kafka_msg(message, _transmitMAPTopic);
}

void CARMAStreetsPlugin::HandleSDSMMessage(SdsmMessage &msg, routeable_message &routeableMsg)
{
std::shared_ptr<SensorDataSharingMessage> sdsmMsgPtr = msg.get_j2735_data();
PLOG(logDEBUG) << "Detected object count: " << sdsmMsgPtr->objects.list.count << std::endl;
Json::Value sdsmJson;
Json::StreamWriterBuilder builder;
J3224ToSDSMJsonConverter jsonConverter;
jsonConverter.convertJ3224ToSDSMJSON(sdsmMsgPtr, sdsmJson);
PLOG(logDEBUG) << "sdsmJson: " << sdsmJson << std::endl;
const std::string message = Json::writeString(builder, sdsmJson);
produce_kafka_msg(message, _transmitSDSMTopic);
}

void CARMAStreetsPlugin::produce_kafka_msg(const string& message, const string& topic_name) const
{
_kafka_producer_ptr->send(message, topic_name);
Expand Down Expand Up @@ -629,6 +646,59 @@ void CARMAStreetsPlugin::SubscribeSSMKafkaTopic(){

}

void CARMAStreetsPlugin::SubscribeSDSMKafkaTopic(){
// TODO: Update methods to represent consuming a single message from Kafka topic
if(_subscribeToSdsmTopic.length() > 0)
{
PLOG(logDEBUG) << "SubscribeSDSMKafkaTopics:" <<_subscribeToSdsmTopic << std::endl;
_sdsm_kafka_consumer_ptr->subscribe();
//Initialize Json to J3224 SDSM convertor
JsonToJ3224SDSMConverter sdsm_convertor;
while (_sdsm_kafka_consumer_ptr->is_running())
{
std::string payload_str = _sdsm_kafka_consumer_ptr->consume(500);
if(payload_str.length() > 0)
{
PLOG(logDEBUG) << "consumed message payload: " << payload_str <<std::endl;
Json::Value sdsmDoc;
auto parse_sucessful = sdsm_convertor.parseJsonString(payload_str, sdsmDoc);
if( !parse_sucessful )
{
PLOG(logERROR) << "Error parsing payload: " << payload_str << std::endl;
SetStatus<uint>(Key_SDSMMessageSkipped, ++_sdsmMessageSkipped);
continue;
}
//Convert the SSM JSON string into J3224 SDSM message and encode it.
auto sdsm_ptr = std::make_shared<SensorDataSharingMessage>();
sdsm_convertor.convertJsonToSDSM(sdsmDoc, sdsm_ptr);
tmx::messages::SdsmEncodedMessage sdsmEncodedMsg;
try
{
sdsm_convertor.encodeSDSM(sdsm_ptr, sdsmEncodedMsg);
}
catch (TmxException &ex)
{
// Skip messages that fail to encode.
PLOG(logERROR) << "Failed to encoded SDSM message : \n" << payload_str << std::endl << "Exception encountered: "
<< ex.what() << std::endl;
ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SensorDataSharingMessage, sdsm_ptr.get()); // may be unnecessary
SetStatus<uint>(Key_SDSMMessageSkipped, ++_sdsmMessageSkipped);
continue;
}

ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SensorDataSharingMessage, sdsm_ptr.get()); // same as above
PLOG(logDEBUG) << "sdsmEncodedMsg: " << sdsmEncodedMsg;

//Broadcast the encoded SSM message
sdsmEncodedMsg.set_flags(IvpMsgFlags_RouteDSRC);
sdsmEncodedMsg.addDsrcMetadata(0x8002);
BroadcastMessage(static_cast<routeable_message &>(sdsmEncodedMsg));
}
}
}

}

void CARMAStreetsPlugin::HandleSimulatedSensorDetectedMessage(simulation::SensorDetectedObject &msg, routeable_message &routeableMsg)
{
PLOG(logDEBUG) << "Produce sensor detected message in JSON format: " << msg.to_string() <<std::endl;
Expand Down
25 changes: 25 additions & 0 deletions src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <kafka/kafka_consumer_worker.h>
#include "JsonToJ2735SSMConverter.h"
#include <simulation/SensorDetectedObject.h>
#include "JsonToJ3224SDSMConverter.h"
#include "J3224ToSDSMJsonConverter.h"
#include "PluginClientClockAware.h"


Expand Down Expand Up @@ -67,6 +69,12 @@ class CARMAStreetsPlugin: public PluginClientClockAware {
* @param routeableMsg
*/
void HandleMapMessage(MapDataMessage &msg, routeable_message &routeableMsg);
/**
* @brief Subscribe to SDSM broadcast.
* @param msg The J3224 SDSM received from the internal
* @param routeableMsg
*/
void HandleSDSMMessage(SdsmMessage &msg, routeable_message &routeableMsg);
/**
* @brief Subscribe to SRM message received from RSU and publish the message to a Kafka topic
*/
Expand All @@ -83,6 +91,10 @@ class CARMAStreetsPlugin: public PluginClientClockAware {
* @brief Subcribe to SSM Kafka topic created by carma-streets
*/
void SubscribeSSMKafkaTopic();
/**
* @brief Subcribe to SDSM Kafka topic created by carma-streets
*/
void SubscribeSDSMKafkaTopic();

bool getEncodedtsm3(tsm3EncodedMessage *tsm3EncodedMsg, Json::Value metadata, Json::Value payload_json);
/**
Expand All @@ -105,19 +117,22 @@ class CARMAStreetsPlugin: public PluginClientClockAware {
std::string _subscribeToSchedulingPlanConsumerGroupId;
std::string _subscribeToSpatTopic;
std::string _subscribeToSsmTopic;
std::string _subscribeToSdsmTopic;
std::string _subscribeToSpatConsumerGroupId;
std::string _subscribeToSSMConsumerGroupId;
std::string _transmitMobilityPathTopic;
std::string _transmitBSMTopic;
std::string _transmitMAPTopic;
std::string _transmitSRMTopic;
std::string _transmitSimSensorDetectedObjTopic;
std::string _transmitSDSMTopic;
std::string _kafkaBrokerIp;
std::string _kafkaBrokerPort;
std::shared_ptr<kafka_producer_worker> _kafka_producer_ptr;
std::shared_ptr<kafka_consumer_worker> _spat_kafka_consumer_ptr;
std::shared_ptr<kafka_consumer_worker> _scheduing_plan_kafka_consumer_ptr;
std::shared_ptr<kafka_consumer_worker> _ssm_kafka_consumer_ptr;
std::shared_ptr<kafka_consumer_worker> _sdsm_kafka_consumer_ptr;
std::vector<std::string> _strategies;
tmx::messages::tsm3Message *_tsm3Message{NULL};
std::mutex data_lock;
Expand Down Expand Up @@ -192,6 +207,16 @@ class CARMAStreetsPlugin: public PluginClientClockAware {
*/
uint _ssmMessageSkipped = 0;

/**
* @brief Status label for SDSM messages skipped due to errors.
*/
const char* Key_SDSMMessageSkipped = "SDSM messages skipped due to errors.";

/**
* @brief Count for SDSM messages skipped due to errors.
*/
uint _sdsmMessageSkipped = 0;

/**
* @brief Intersection Id for intersection
*/
Expand Down
20 changes: 11 additions & 9 deletions src/v2i-hub/CARMAStreetsPlugin/src/J3224ToSDSMJsonConverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ namespace CARMAStreetsPlugin
auto det_object = det_obj_list.list.array[i];

// Detected object common data
detectedObjectJson["detected_object_data"]["detected_object_common_data"]["object_type"] = det_object->detObjCommon.objType;
detectedObjectJson["detected_object_data"]["detected_object_common_data"]["object_type_conf"] = det_object->detObjCommon.objTypeCfd;
detectedObjectJson["detected_object_data"]["detected_object_common_data"]["obj_type"] = det_object->detObjCommon.objType;
detectedObjectJson["detected_object_data"]["detected_object_common_data"]["obj_type_cfd"] = det_object->detObjCommon.objTypeCfd;
detectedObjectJson["detected_object_data"]["detected_object_common_data"]["object_id"] = det_object->detObjCommon.objectID;
detectedObjectJson["detected_object_data"]["detected_object_common_data"]["measurement_time"] = det_object->detObjCommon.measurementTime;
detectedObjectJson["detected_object_data"]["detected_object_common_data"]["time_confidence"] = det_object->detObjCommon.timeConfidence;
Expand All @@ -84,7 +84,7 @@ namespace CARMAStreetsPlugin
detectedObjectJson["detected_object_data"]["detected_object_common_data"]["speed_z"] = *det_object->detObjCommon.speedZ;
detectedObjectJson["detected_object_data"]["detected_object_common_data"]["speed_confidence_z"] = *det_object->detObjCommon.speedConfidenceZ;

detectedObjectJson["detected_object_data"]["detected_object_common_data"]["accel_4_way"]["long"] = det_object->detObjCommon.accel4way->Long;
detectedObjectJson["detected_object_data"]["detected_object_common_data"]["accel_4_way"]["Long"] = det_object->detObjCommon.accel4way->Long;
detectedObjectJson["detected_object_data"]["detected_object_common_data"]["accel_4_way"]["lat"] = det_object->detObjCommon.accel4way->lat;
detectedObjectJson["detected_object_data"]["detected_object_common_data"]["accel_4_way"]["vert"] = det_object->detObjCommon.accel4way->vert;
detectedObjectJson["detected_object_data"]["detected_object_common_data"]["accel_4_way"]["yaw"] = det_object->detObjCommon.accel4way->yaw;
Expand Down Expand Up @@ -156,13 +156,13 @@ namespace CARMAStreetsPlugin
break;

case DetectedObjectOptionalData_PR_detObst:
optionalDataJson["detected_obst_data"]["obst_size"]["width"] = det_object->detObjOptData->choice.detObst.obstSize.width;
optionalDataJson["detected_obst_data"]["obst_size"]["length"] = det_object->detObjOptData->choice.detObst.obstSize.length;
optionalDataJson["detected_obst_data"]["obst_size"]["height"] = *det_object->detObjOptData->choice.detObst.obstSize.height; // optional
optionalDataJson["detected_obstacle_data"]["obst_size"]["width"] = det_object->detObjOptData->choice.detObst.obstSize.width;
optionalDataJson["detected_obstacle_data"]["obst_size"]["length"] = det_object->detObjOptData->choice.detObst.obstSize.length;
optionalDataJson["detected_obstacle_data"]["obst_size"]["height"] = *det_object->detObjOptData->choice.detObst.obstSize.height; // optional

optionalDataJson["detected_obst_data"]["obst_size"]["width_confidence"] = det_object->detObjOptData->choice.detObst.obstSizeConfidence.widthConfidence;
optionalDataJson["detected_obst_data"]["obst_size"]["length_confidence"] = det_object->detObjOptData->choice.detObst.obstSizeConfidence.lengthConfidence;
optionalDataJson["detected_obst_data"]["obst_size"]["height_confidence"] = *det_object->detObjOptData->choice.detObst.obstSizeConfidence.heightConfidence;
optionalDataJson["detected_obstacle_data"]["obst_size_confidence"]["width_confidence"] = det_object->detObjOptData->choice.detObst.obstSizeConfidence.widthConfidence;
optionalDataJson["detected_obstacle_data"]["obst_size_confidence"]["length_confidence"] = det_object->detObjOptData->choice.detObst.obstSizeConfidence.lengthConfidence;
optionalDataJson["detected_obstacle_data"]["obst_size_confidence"]["height_confidence"] = *det_object->detObjOptData->choice.detObst.obstSizeConfidence.heightConfidence;
break;
}
detectedObjectJson["detected_object_data"]["detected_object_optional_data"] = optionalDataJson;
Expand All @@ -173,5 +173,7 @@ namespace CARMAStreetsPlugin

}
}

//std::cout << SDSMDataJson.toStyledString() << std::endl;
}
}
Loading

0 comments on commit 794866a

Please sign in to comment.