Skip to content

Commit

Permalink
Fix CARMAStreets Plugin undefined behavior for Kafka Producer (#559)
Browse files Browse the repository at this point in the history
<!-- Thanks for the contribution, this is awesome. -->

# PR Details
## Description
Changes : 
- Added return statement for init method of kafka producer to avoid
undefine behavior
- Replaced error logs and null returns with throwing exceptions in Kafka
consumer producer initialization in carma-streets plugin
- Added stop command for devcontainer setup
- Removed consumer group ID configuration, using plugin name instead
<!--- Describe your changes in detail -->

## Related Issue
#558 
<!--- This project only accepts pull requests related to open issues -->
<!--- If suggesting a new feature or change, please discuss it in an
issue first -->
<!--- If fixing a bug, there should be an issue describing it with steps
to reproduce -->
<!--- Please link to the issue here: -->

## Motivation and Context

<!--- Why is this change required? What problem does it solve? -->

## How Has This Been Tested?

<!--- Please describe in detail how you tested your changes. -->
<!--- Include details of your testing environment, and the tests you ran
to -->
<!--- see how your change affects other areas of the code, etc. -->

## Types of changes

<!--- What types of changes does your code introduce? Put an `x` in all
the boxes that apply: -->

- [x] Defect fix (non-breaking change that fixes an issue)
- [ ] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that cause existing functionality
to change)

## Checklist:

<!--- Go over all the following points, and put an `x` in all the boxes
that apply. -->
<!--- If you're unsure about any of these, don't hesitate to ask. We're
here to help! -->

- [ ] I have added any new packages to the sonar-scanner.properties file
- [ ] My change requires a change to the documentation.
- [ ] I have updated the documentation accordingly.
- [x] I have read the **CONTRIBUTING** document.
[V2XHUB Contributing
Guide](https://github.com/usdot-fhwa-OPS/V2X-Hub/blob/develop/Contributing.md)
- [ ] I have added tests to cover my changes.
- [x] All new and existing tests passed.
  • Loading branch information
paulbourelly999 authored Aug 24, 2023
1 parent e1b7cf8 commit aae33e6
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 30 deletions.
4 changes: 3 additions & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
// "forwardPorts": [],

// Uncomment the next line to run commands after the container is created - for example installing curl.
"postCreateCommand": "container/database.sh && container/library.sh && container/setup.sh && ldconfig"
"postCreateCommand": "container/database.sh && container/library.sh && container/setup.sh && ldconfig",

"shutdownAction": "stopCompose"

// Uncomment to use the Docker CLI from inside the container. See https://aka.ms/vscode-remote/samples/docker-from-docker.
// "mounts": [
Expand Down
1 change: 1 addition & 0 deletions src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ namespace tmx::utils
delete conf;

FILE_LOG(logINFO) << "Created producer: " << _producer->name() << std::endl;
return true;
}

bool kafka_producer_worker::init_topic()
Expand Down
15 changes: 0 additions & 15 deletions src/v2i-hub/CARMAStreetsPlugin/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,6 @@
"default": "v2xhub_scheduling_plan_sub",
"description": "Apache Kafka topic plugin will transmit message to."
},
{
"key": "SchedulingPlanConsumerGroupId",
"default": "v2xhub_scheduling_plan",
"description": "Apache Kafka consumer group ID for scheduling plan consumer."
},
{
"key": "SpatTopic",
"default": "modified_spat",
Expand All @@ -112,16 +107,6 @@
"default": "v2xhub_ssm_sub",
"description": "Apache Kafka topic plugin will transmit message to."
},
{
"key": "SsmConsumerGroupId",
"default": "v2xhub_ssm",
"description": "Apache Kafka consumer group ID for spat consumer."
},
{
"key": "SpatConsumerGroupId",
"default": "v2xhub_spat",
"description": "Apache Kafka consumer group ID for spat consumer."
},
{
"key": "SimSensorDetectedObjTopic",
"default": "v2xhub_sim_sensor_detected_object",
Expand Down
27 changes: 13 additions & 14 deletions src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,8 @@ void CARMAStreetsPlugin::UpdateConfigSettings() {
GetConfigValue<string>("KafkaBrokerPort", _kafkaBrokerPort);
//
GetConfigValue<string>("SchedulingPlanTopic", _subscribeToSchedulingPlanTopic);
GetConfigValue<string>("SchedulingPlanConsumerGroupId", _subscribeToSchedulingPlanConsumerGroupId);
GetConfigValue<string>("SpatTopic", _subscribeToSpatTopic);
GetConfigValue<string>("SsmTopic", _subscribeToSsmTopic);
GetConfigValue<string>("SpatConsumerGroupId", _subscribeToSpatConsumerGroupId);
GetConfigValue<string>("SsmConsumerGroupId", _subscribeToSSMConsumerGroupId);
GetConfigValue<string>("BsmTopic", _transmitBSMTopic);
GetConfigValue<string>("MobilityOperationTopic", _transmitMobilityOperationTopic);
GetConfigValue<string>("MobilityPathTopic", _transmitMobilityPathTopic);
Expand Down Expand Up @@ -76,28 +73,28 @@ void CARMAStreetsPlugin::InitKafkaConsumerProducers()
_kafka_producer_ptr = client.create_producer(kafkaConnectString);
if(!_kafka_producer_ptr->init_producer())
{
return;
throw TmxException("Failed to create Kafka producer.");

}

//Consumers
_spat_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSpatTopic,_subscribeToSpatConsumerGroupId);
_scheduing_plan_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSchedulingPlanTopic,_subscribeToSchedulingPlanConsumerGroupId);
_ssm_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSsmTopic,_subscribeToSSMConsumerGroupId);
_spat_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSpatTopic,this->_name);
_scheduing_plan_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSchedulingPlanTopic, this->_name);
_ssm_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSsmTopic,this->_name);
if(!_scheduing_plan_kafka_consumer_ptr || !_spat_kafka_consumer_ptr || !_ssm_kafka_consumer_ptr)
{
PLOG(logERROR) <<"Failed to create Kafka consumers.";
return;
throw TmxException("Failed to create Kafka consumers.");
}
PLOG(logDEBUG) <<"Kafka consumers created";
if(!_spat_kafka_consumer_ptr->init() || !_scheduing_plan_kafka_consumer_ptr->init() || !_ssm_kafka_consumer_ptr->init())
{
PLOG(logERROR) <<"Kafka consumers init() failed!";
return;
throw TmxException("Kafka consumers init() failed!");
}

// TODO: Replace with tmxutil ThreadTimer or some other more appropriate Thread wrapper.
boost::thread thread_schpl(&CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic, this);
boost::thread thread_spat(&CARMAStreetsPlugin::SubscribeSpatKafkaTopic, this);
boost::thread thread_ssm(&CARMAStreetsPlugin::SubscribeSSMKafkaTopic, this);
boost::thread thread_ssm(&CARMAStreetsPlugin::SubscribeSSMKafkaTopic, this);

}

void CARMAStreetsPlugin::OnConfigChanged(const char *key, const char *value) {
Expand Down Expand Up @@ -468,6 +465,7 @@ void CARMAStreetsPlugin::OnStateChange(IvpPluginState state) {

void CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic()
{
// TODO: Update methods to represent consuming a single message from Kafka topic
if(_subscribeToSchedulingPlanTopic.length() > 0)
{
PLOG(logDEBUG) << "SubscribeSchedulingPlanKafkaTopics:" <<_subscribeToSchedulingPlanTopic << std::endl;
Expand Down Expand Up @@ -525,6 +523,7 @@ void CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic()
}

void CARMAStreetsPlugin::SubscribeSpatKafkaTopic(){
// TODO: Update methods to represent consuming a single message from Kafka topic
if(_subscribeToSpatTopic.length() > 0)
{
PLOG(logDEBUG) << "SubscribeSpatKafkaTopics:" <<_subscribeToSpatTopic << std::endl;
Expand Down Expand Up @@ -578,7 +577,7 @@ void CARMAStreetsPlugin::SubscribeSpatKafkaTopic(){
}

void CARMAStreetsPlugin::SubscribeSSMKafkaTopic(){

// TODO: Update methods to represent consuming a single message from Kafka topic
if(_subscribeToSsmTopic.length() > 0)
{
PLOG(logDEBUG) << "SubscribeSSMKafkaTopics:" <<_subscribeToSsmTopic << std::endl;
Expand Down

0 comments on commit aae33e6

Please sign in to comment.