Skip to content

Commit

Permalink
Added return statement for init method of kafka producer
Browse files Browse the repository at this point in the history
to avoid undefine behavior

Replaced error logs and null returns with throwing exceptions
in Kafka consumer producer initialization in carma-streets plugin

Added stop command for devcontainer setup

Removed consumer group ID configuration, using plugin name instead
  • Loading branch information
paulbourelly999 committed Aug 23, 2023
1 parent d4a370f commit a1c9c92
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 30 deletions.
4 changes: 3 additions & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
// "forwardPorts": [],

// Uncomment the next line to run commands after the container is created - for example installing curl.
"postCreateCommand": "container/database.sh && container/library.sh && container/setup.sh && ldconfig"
"postCreateCommand": "container/database.sh && container/library.sh && container/setup.sh && ldconfig",

"shutdownAction": "stopCompose"

// Uncomment to use the Docker CLI from inside the container. See https://aka.ms/vscode-remote/samples/docker-from-docker.
// "mounts": [
Expand Down
1 change: 1 addition & 0 deletions src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ namespace tmx::utils
delete conf;

FILE_LOG(logINFO) << "Created producer: " << _producer->name() << std::endl;
return true;
}

bool kafka_producer_worker::init_topic()
Expand Down
15 changes: 0 additions & 15 deletions src/v2i-hub/CARMAStreetsPlugin/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,6 @@
"default": "v2xhub_scheduling_plan_sub",
"description": "Apache Kafka topic plugin will transmit message to."
},
{
"key": "SchedulingPlanConsumerGroupId",
"default": "v2xhub_scheduling_plan",
"description": "Apache Kafka consumer group ID for scheduling plan consumer."
},
{
"key": "SpatTopic",
"default": "modified_spat",
Expand All @@ -112,16 +107,6 @@
"default": "v2xhub_ssm_sub",
"description": "Apache Kafka topic plugin will transmit message to."
},
{
"key": "SsmConsumerGroupId",
"default": "v2xhub_ssm",
"description": "Apache Kafka consumer group ID for spat consumer."
},
{
"key": "SpatConsumerGroupId",
"default": "v2xhub_spat",
"description": "Apache Kafka consumer group ID for spat consumer."
},
{
"key": "SimSensorDetectedObjTopic",
"default": "v2xhub_sim_sensor_detected_object",
Expand Down
27 changes: 13 additions & 14 deletions src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,8 @@ void CARMAStreetsPlugin::UpdateConfigSettings() {
GetConfigValue<string>("KafkaBrokerPort", _kafkaBrokerPort);
//
GetConfigValue<string>("SchedulingPlanTopic", _subscribeToSchedulingPlanTopic);
GetConfigValue<string>("SchedulingPlanConsumerGroupId", _subscribeToSchedulingPlanConsumerGroupId);
GetConfigValue<string>("SpatTopic", _subscribeToSpatTopic);
GetConfigValue<string>("SsmTopic", _subscribeToSsmTopic);
GetConfigValue<string>("SpatConsumerGroupId", _subscribeToSpatConsumerGroupId);
GetConfigValue<string>("SsmConsumerGroupId", _subscribeToSSMConsumerGroupId);
GetConfigValue<string>("BsmTopic", _transmitBSMTopic);
GetConfigValue<string>("MobilityOperationTopic", _transmitMobilityOperationTopic);
GetConfigValue<string>("MobilityPathTopic", _transmitMobilityPathTopic);
Expand Down Expand Up @@ -76,28 +73,28 @@ void CARMAStreetsPlugin::InitKafkaConsumerProducers()
_kafka_producer_ptr = client.create_producer(kafkaConnectString);
if(!_kafka_producer_ptr->init_producer())
{
return;
throw TmxException("Failed to create Kafka producer.");

}

//Consumers
_spat_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSpatTopic,_subscribeToSpatConsumerGroupId);
_scheduing_plan_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSchedulingPlanTopic,_subscribeToSchedulingPlanConsumerGroupId);
_ssm_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSsmTopic,_subscribeToSSMConsumerGroupId);
_spat_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSpatTopic,this->_name);
_scheduing_plan_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSchedulingPlanTopic, this->_name);
_ssm_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSsmTopic,this->_name);
if(!_scheduing_plan_kafka_consumer_ptr || !_spat_kafka_consumer_ptr || !_ssm_kafka_consumer_ptr)
{
PLOG(logERROR) <<"Failed to create Kafka consumers.";
return;
throw TmxException("Failed to create Kafka consumers.");
}
PLOG(logDEBUG) <<"Kafka consumers created";
if(!_spat_kafka_consumer_ptr->init() || !_scheduing_plan_kafka_consumer_ptr->init() || !_ssm_kafka_consumer_ptr->init())
{
PLOG(logERROR) <<"Kafka consumers init() failed!";
return;
throw TmxException("Kafka consumers init() failed!");
}

// TODO: Replace with tmxutil ThreadTimer or some other more appropriate Thread wrapper.
boost::thread thread_schpl(&CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic, this);
boost::thread thread_spat(&CARMAStreetsPlugin::SubscribeSpatKafkaTopic, this);
boost::thread thread_ssm(&CARMAStreetsPlugin::SubscribeSSMKafkaTopic, this);
boost::thread thread_ssm(&CARMAStreetsPlugin::SubscribeSSMKafkaTopic, this);

}

void CARMAStreetsPlugin::OnConfigChanged(const char *key, const char *value) {
Expand Down Expand Up @@ -468,6 +465,7 @@ void CARMAStreetsPlugin::OnStateChange(IvpPluginState state) {

void CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic()
{
// TODO: Update methods to represent consuming a single message from Kafka topic
if(_subscribeToSchedulingPlanTopic.length() > 0)
{
PLOG(logDEBUG) << "SubscribeSchedulingPlanKafkaTopics:" <<_subscribeToSchedulingPlanTopic << std::endl;
Expand Down Expand Up @@ -525,6 +523,7 @@ void CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic()
}

void CARMAStreetsPlugin::SubscribeSpatKafkaTopic(){
// TODO: Update methods to represent consuming a single message from Kafka topic
if(_subscribeToSpatTopic.length() > 0)
{
PLOG(logDEBUG) << "SubscribeSpatKafkaTopics:" <<_subscribeToSpatTopic << std::endl;
Expand Down Expand Up @@ -578,7 +577,7 @@ void CARMAStreetsPlugin::SubscribeSpatKafkaTopic(){
}

void CARMAStreetsPlugin::SubscribeSSMKafkaTopic(){

// TODO: Update methods to represent consuming a single message from Kafka topic
if(_subscribeToSsmTopic.length() > 0)
{
PLOG(logDEBUG) << "SubscribeSSMKafkaTopics:" <<_subscribeToSsmTopic << std::endl;
Expand Down

0 comments on commit a1c9c92

Please sign in to comment.