Skip to content

Commit

Permalink
V2XHub / CARMA Streets to process and broadcast SSM (#533)
Browse files Browse the repository at this point in the history
<!-- Thanks for the contribution, this is awesome. -->

# PR Details
## Description
For the POC TIM/TSP use case, priority-eligible vehicles will broadcast
J2375 Signal Request Message (SRM) to the MMITSS Roadside Processor
(MRP), which in turn will send J2375 Signal Status Message (SSM) in
acknowledgement.

Since MRP is being integrated with CARMA Streets, CARMA Streets / V2XHub
needs to #1) consume the json SSM (used by the MMITSS MRP internally,
see
[here](https://github.com/mmitss/mmitss-az/blob/master/src/mrp/priority-request-server/Readme.md)
and
[here](https://github.com/mmitss/mmitss-az/blob/master/src/mrp/priority-request-server/SSM.json))
sent on the CARMA Streets Kafka broker, #2) encode it according to the
J2375 [ASN.1
](https://leidoscorpus.sharepoint.us/:f:/s/STR/EpU-cLOWhUtGvsddHUSf8I4Bf7-Ot8oMIY4yf2m7x2-Uag?e=0Rvpug)schema,
and #3) broadcast J2375 SSMs through RSU.

For #1) the V2XHub carma-streets-plugin needs to pull the json SSM from
CARMA Steets Kafa broker.

For #3) the V2XHub immediate-forward-plugin configuration needs to to be
updated.

<!--- Describe your changes in detail -->

## Related Issue
#534
<!--- This project only accepts pull requests related to open issues -->
<!--- If suggesting a new feature or change, please discuss it in an
issue first -->
<!--- If fixing a bug, there should be an issue describing it with steps
to reproduce -->
<!--- Please link to the issue here: -->

## Motivation and Context
TM/TSP
<!--- Why is this change required? What problem does it solve? -->

## How Has This Been Tested?
Unit test
<!--- Please describe in detail how you tested your changes. -->
<!--- Include details of your testing environment, and the tests you ran
to -->
<!--- see how your change affects other areas of the code, etc. -->

## Types of changes

<!--- What types of changes does your code introduce? Put an `x` in all
the boxes that apply: -->

- [ ] Defect fix (non-breaking change that fixes an issue)
- [x] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that cause existing functionality
to change)

## Checklist:

<!--- Go over all the following points, and put an `x` in all the boxes
that apply. -->
<!--- If you're unsure about any of these, don't hesitate to ask. We're
here to help! -->

- [ ] I have added any new packages to the sonar-scanner.properties file
- [ ] My change requires a change to the documentation.
- [ ] I have updated the documentation accordingly.
- [x] I have read the **CONTRIBUTING** document.
[V2XHUB Contributing
Guide](https://github.com/usdot-fhwa-OPS/V2X-Hub/blob/develop/Contributing.md)
- [ ] I have added tests to cover my changes.
- [ ] All new and existing tests passed.
  • Loading branch information
dan-du-car authored May 24, 2023
1 parent cad66ed commit 842aa5b
Show file tree
Hide file tree
Showing 8 changed files with 428 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/v2i-hub/CARMAStreetsPlugin/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ TARGET_LINK_LIBRARIES (${PROJECT_NAME} tmxutils rdkafka++ jsoncpp)
#############
enable_testing()
include_directories(${PROJECT_SOURCE_DIR}/src)
add_library(${PROJECT_NAME}_lib src/J2735MapToJsonConverter.cpp src/JsonToJ2735SpatConverter.cpp src/J2735ToSRMJsonConverter.cpp)
add_library(${PROJECT_NAME}_lib src/J2735MapToJsonConverter.cpp src/JsonToJ2735SSMConverter.cpp src/JsonToJ2735SpatConverter.cpp src/J2735ToSRMJsonConverter.cpp)
target_link_libraries(${PROJECT_NAME}_lib PUBLIC ${TMXAPI_LIBRARIES}
${ASN_J2735_LIBRARIES}
${MYSQL_LIBRARIES}
Expand Down
10 changes: 10 additions & 0 deletions src/v2i-hub/CARMAStreetsPlugin/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@
"default": "modified_spat",
"description": "Apache Kafka topic plugin will transmit message to."
},
{
"key": "SsmTopic",
"default": "v2xhub_ssm_sub",
"description": "Apache Kafka topic plugin will transmit message to."
},
{
"key": "SsmConsumerGroupId",
"default": "v2xhub_ssm",
"description": "Apache Kafka consumer group ID for spat consumer."
},
{
"key": "SpatConsumerGroupId",
"default": "v2xhub_spat",
Expand Down
89 changes: 87 additions & 2 deletions src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ void CARMAStreetsPlugin::UpdateConfigSettings() {
GetConfigValue<string>("SchedulingPlanTopic", _subscribeToSchedulingPlanTopic);
GetConfigValue<string>("SchedulingPlanConsumerGroupId", _subscribeToSchedulingPlanConsumerGroupId);
GetConfigValue<string>("SpatTopic", _subscribeToSpatTopic);
GetConfigValue<string>("SsmTopic", _subscribeToSsmTopic);
GetConfigValue<string>("SpatConsumerGroupId", _subscribeToSpatConsumerGroupId);
GetConfigValue<string>("SsmConsumerGroupId", _subscribeToSSMConsumerGroupId);
GetConfigValue<string>("BsmTopic", _transmitBSMTopic);
GetConfigValue<string>("MobilityOperationTopic", _transmitMobilityOperationTopic);
GetConfigValue<string>("MobilityPathTopic", _transmitMobilityPathTopic);
Expand All @@ -68,6 +70,8 @@ void CARMAStreetsPlugin::UpdateConfigSettings() {
kafka_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
kafka_conf_sp_consumer = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
kafka_conf_spat_consumer = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
kafka_conf_ssm_consumer = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);


PLOG(logDEBUG) <<"Attempting to connect to " << kafkaConnectString;
if ((kafka_conf->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK)) {
Expand All @@ -87,28 +91,35 @@ void CARMAStreetsPlugin::UpdateConfigSettings() {
if (kafka_conf_sp_consumer->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK
|| (kafka_conf_sp_consumer->set("group.id", _subscribeToSchedulingPlanConsumerGroupId, error_string) != RdKafka::Conf::CONF_OK)
|| (kafka_conf_spat_consumer->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK)
|| (kafka_conf_spat_consumer->set("group.id", _subscribeToSpatConsumerGroupId, error_string) != RdKafka::Conf::CONF_OK)) {
|| (kafka_conf_spat_consumer->set("group.id", _subscribeToSpatConsumerGroupId, error_string) != RdKafka::Conf::CONF_OK)
|| (kafka_conf_ssm_consumer->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK)
|| (kafka_conf_ssm_consumer->set("group.id", _subscribeToSSMConsumerGroupId, error_string) != RdKafka::Conf::CONF_OK)
) {
PLOG(logERROR) <<"Setting kafka config group.id options failed with error:" << error_string << "\n" <<"Exiting with exit code 1";
exit(1);
} else {
PLOG(logDEBUG) <<"Kafka config group.id options set successfully";
}
kafka_conf_sp_consumer->set("enable.partition.eof", "true", error_string);
kafka_conf_spat_consumer->set("enable.partition.eof", "true", error_string);
kafka_conf_ssm_consumer->set("enable.partition.eof", "true", error_string);

_scheduing_plan_kafka_consumer = RdKafka::KafkaConsumer::create(kafka_conf_sp_consumer, error_string);
_spat_kafka_consumer = RdKafka::KafkaConsumer::create(kafka_conf_spat_consumer, error_string);
_ssm_kafka_consumer = RdKafka::KafkaConsumer::create(kafka_conf_ssm_consumer, error_string);

if ( !_scheduing_plan_kafka_consumer || !_spat_kafka_consumer) {
if ( !_scheduing_plan_kafka_consumer || !_spat_kafka_consumer || !_ssm_kafka_consumer) {
PLOG(logERROR) << "Failed to create Kafka consumers: " << error_string << std::endl;
exit(1);
}
PLOG(logDEBUG) << "Created consumer " << _scheduing_plan_kafka_consumer->name() << std::endl;
PLOG(logDEBUG) << "Created consumer " << _spat_kafka_consumer->name() << std::endl;
PLOG(logDEBUG) << "Created consumer " << _ssm_kafka_consumer->name() << std::endl;

//create kafka topics
RdKafka::Conf *tconf_spat = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
RdKafka::Conf *tconf_sp = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
RdKafka::Conf *tconf_ssm = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if(!tconf_spat && !tconf_sp)
{
PLOG(logERROR) << "RDKafka create topic conf failed ";
Expand All @@ -129,11 +140,20 @@ void CARMAStreetsPlugin::UpdateConfigSettings() {
return ;
}

_ssm_topic = RdKafka::Topic::create(_ssm_kafka_consumer,_subscribeToSsmTopic,tconf_ssm,error_string);
if(!_ssm_topic)
{
PLOG(logERROR) << "RDKafka create SSM topic failed:" << error_string;
return ;
}

delete tconf_sp;
delete tconf_spat;
delete tconf_ssm;

boost::thread thread_schpl(&CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic, this);
boost::thread thread_spat(&CARMAStreetsPlugin::SubscribeSpatKafkaTopic, this);
boost::thread thread_ssm(&CARMAStreetsPlugin::SubscribeSSMKafkaTopic, this);
}

void CARMAStreetsPlugin::OnConfigChanged(const char *key, const char *value) {
Expand Down Expand Up @@ -659,6 +679,71 @@ void CARMAStreetsPlugin::SubscribeSpatKafkaTopic(){
}
}

void CARMAStreetsPlugin::SubscribeSSMKafkaTopic(){

if(_subscribeToSsmTopic.length() > 0)
{
PLOG(logDEBUG) << "SubscribeSSMKafkaTopics:" <<_subscribeToSsmTopic << std::endl;
std::vector<std::string> topics;
topics.emplace_back(_subscribeToSsmTopic);

RdKafka::ErrorCode err = _ssm_kafka_consumer->subscribe(topics);
if (err)
{
PLOG(logERROR) << "Failed to subscribe to " << topics.size() << " topics: " << RdKafka::err2str(err) << std::endl;
return;
}
//Initialize Json to J2735 SSM convertor
JsonToJ2735SSMConverter ssm_convertor;
while (true)
{
auto msg = _ssm_kafka_consumer->consume( 500 );
if( msg->err() == RdKafka::ERR_NO_ERROR )
{
auto payload_str = static_cast<const char *>( msg->payload() );
if(msg->len() > 0)
{
PLOG(logDEBUG) << "consumed message payload: " << payload_str <<std::endl;
Json::Value ssmDoc;
auto parse_sucessful = ssm_convertor.parseJsonString(payload_str, ssmDoc);
if( !parse_sucessful )
{
PLOG(logERROR) << "Error parsing payload: " << payload_str << std::endl;
SetStatus<uint>(Key_SSMMessageSkipped, ++_ssmMessageSkipped);
continue;
}
//Convert the SSM JSON string into J2735 SSM message and encode it.
auto ssm_ptr = std::make_shared<SignalStatusMessage>();
ssm_convertor.toJ2735SSM(ssmDoc, ssm_ptr);
tmx::messages::SsmEncodedMessage ssmEncodedMsg;
try
{
ssm_convertor.encodeSSM(ssm_ptr, ssmEncodedMsg);
}
catch (TmxException &ex)
{
// Skip messages that fail to encode.
PLOG(logERROR) << "Failed to encoded SSM message : \n" << payload_str << std::endl << "Exception encountered: "
<< ex.what() << std::endl;
ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SignalStatusMessage, ssm_ptr.get());
SetStatus<uint>(Key_SSMMessageSkipped, ++_ssmMessageSkipped);
continue;
}

ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SignalStatusMessage, ssm_ptr.get());
PLOG(logDEBUG) << "ssmEncodedMsg: " << ssmEncodedMsg;

//Broadcast the encoded SSM message
ssmEncodedMsg.set_flags(IvpMsgFlags_RouteDSRC);
ssmEncodedMsg.addDsrcMetadata(0x8002);
BroadcastMessage(static_cast<routeable_message &>(ssmEncodedMsg));
}
}
delete msg;
}
}

}
bool CARMAStreetsPlugin::getEncodedtsm3( tsm3EncodedMessage *tsm3EncodedMsg, Json::Value metadata, Json::Value payload_json )
{
try
Expand Down
23 changes: 22 additions & 1 deletion src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "J2735MapToJsonConverter.h"
#include "JsonToJ2735SpatConverter.h"
#include "J2735ToSRMJsonConverter.h"
#include "JsonToJ2735SSMConverter.h"



Expand Down Expand Up @@ -62,6 +63,10 @@ class CARMAStreetsPlugin: public PluginClient {
* @brief Subcribe to SPAT Kafka topic created by carma-streets
*/
void SubscribeSpatKafkaTopic();
/**
* @brief Subcribe to SSM Kafka topic created by carma-streets
*/
void SubscribeSSMKafkaTopic();

bool getEncodedtsm3(tsm3EncodedMessage *tsm3EncodedMsg, Json::Value metadata, Json::Value payload_json);
/**
Expand All @@ -79,7 +84,9 @@ class CARMAStreetsPlugin: public PluginClient {
std::string _subscribeToSchedulingPlanTopic;
std::string _subscribeToSchedulingPlanConsumerGroupId;
std::string _subscribeToSpatTopic;
std::string _subscribeToSsmTopic;
std::string _subscribeToSpatConsumerGroupId;
std::string _subscribeToSSMConsumerGroupId;
std::string _transmitMobilityPathTopic;
std::string _transmitBSMTopic;
std::string _transmitMAPTopic;
Expand All @@ -89,11 +96,14 @@ class CARMAStreetsPlugin: public PluginClient {
RdKafka::Conf *kafka_conf;
RdKafka::Conf *kafka_conf_spat_consumer;
RdKafka::Conf *kafka_conf_sp_consumer;
RdKafka::Conf *kafka_conf_ssm_consumer;
RdKafka::Producer *kafka_producer;
RdKafka::KafkaConsumer *_scheduing_plan_kafka_consumer;
RdKafka::KafkaConsumer *_spat_kafka_consumer;
RdKafka::KafkaConsumer *_ssm_kafka_consumer;
RdKafka::Topic *_scheduing_plan_topic;
RdKafka::Topic *_spat_topic;
RdKafka::Topic *_ssm_topic;
std::vector<std::string> _strategies;
tmx::messages::tsm3Message *_tsm3Message{NULL};
std::mutex data_lock;
Expand Down Expand Up @@ -130,6 +140,7 @@ class CARMAStreetsPlugin: public PluginClient {
/**
* @brief Status label for Mobility Operation messages skipped due to errors.
*/

const char* Key_MobilityOperationMessageSkipped = "Mobility Operation messages skipped due to errors.";

/**
Expand All @@ -156,7 +167,17 @@ class CARMAStreetsPlugin: public PluginClient {
* @brief Count for BSM messages skipped due to errors.
*/
uint _bsmMessageSkipped = 0;


/**
* @brief Status label for SSM messages skipped due to errors.
*/
const char* Key_SSMMessageSkipped = "SSM messages skipped due to errors.";

/**
* @brief Count for SSM messages skipped due to errors.
*/
uint _ssmMessageSkipped = 0;

/**
* @brief Intersection Id for intersection
*/
Expand Down
Loading

0 comments on commit 842aa5b

Please sign in to comment.