From a1c9c924a2279e5edc48385346a598f0d0d1480c Mon Sep 17 00:00:00 2001 From: dev Date: Wed, 23 Aug 2023 14:52:59 -0400 Subject: [PATCH] Added return statement for init method of kafka producer to avoid undefine behavior Replaced error logs and null returns with throwing exceptions in Kafka consumer producer initialization in carma-streets plugin Added stop command for devcontainer setup Removed consumer group ID configuration, using plugin name instead --- .devcontainer/devcontainer.json | 4 ++- .../src/kafka/kafka_producer_worker.cpp | 1 + src/v2i-hub/CARMAStreetsPlugin/manifest.json | 15 ----------- .../src/CARMAStreetsPlugin.cpp | 27 +++++++++---------- 4 files changed, 17 insertions(+), 30 deletions(-) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 09591f645..b306616f5 100755 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -35,7 +35,9 @@ // "forwardPorts": [], // Uncomment the next line to run commands after the container is created - for example installing curl. - "postCreateCommand": "container/database.sh && container/library.sh && container/setup.sh && ldconfig" + "postCreateCommand": "container/database.sh && container/library.sh && container/setup.sh && ldconfig", + + "shutdownAction": "stopCompose" // Uncomment to use the Docker CLI from inside the container. See https://aka.ms/vscode-remote/samples/docker-from-docker. // "mounts": [ diff --git a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp index 02d0a0cbb..ae074e4aa 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp +++ b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp @@ -93,6 +93,7 @@ namespace tmx::utils delete conf; FILE_LOG(logINFO) << "Created producer: " << _producer->name() << std::endl; + return true; } bool kafka_producer_worker::init_topic() diff --git a/src/v2i-hub/CARMAStreetsPlugin/manifest.json b/src/v2i-hub/CARMAStreetsPlugin/manifest.json index f0daaf9a9..5faa1f8e2 100644 --- a/src/v2i-hub/CARMAStreetsPlugin/manifest.json +++ b/src/v2i-hub/CARMAStreetsPlugin/manifest.json @@ -97,11 +97,6 @@ "default": "v2xhub_scheduling_plan_sub", "description": "Apache Kafka topic plugin will transmit message to." }, - { - "key": "SchedulingPlanConsumerGroupId", - "default": "v2xhub_scheduling_plan", - "description": "Apache Kafka consumer group ID for scheduling plan consumer." - }, { "key": "SpatTopic", "default": "modified_spat", @@ -112,16 +107,6 @@ "default": "v2xhub_ssm_sub", "description": "Apache Kafka topic plugin will transmit message to." }, - { - "key": "SsmConsumerGroupId", - "default": "v2xhub_ssm", - "description": "Apache Kafka consumer group ID for spat consumer." - }, - { - "key": "SpatConsumerGroupId", - "default": "v2xhub_spat", - "description": "Apache Kafka consumer group ID for spat consumer." - }, { "key": "SimSensorDetectedObjTopic", "default": "v2xhub_sim_sensor_detected_object", diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp index c5bb9a7e6..ed04c2112 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp @@ -41,11 +41,8 @@ void CARMAStreetsPlugin::UpdateConfigSettings() { GetConfigValue("KafkaBrokerPort", _kafkaBrokerPort); // GetConfigValue("SchedulingPlanTopic", _subscribeToSchedulingPlanTopic); - GetConfigValue("SchedulingPlanConsumerGroupId", _subscribeToSchedulingPlanConsumerGroupId); GetConfigValue("SpatTopic", _subscribeToSpatTopic); GetConfigValue("SsmTopic", _subscribeToSsmTopic); - GetConfigValue("SpatConsumerGroupId", _subscribeToSpatConsumerGroupId); - GetConfigValue("SsmConsumerGroupId", _subscribeToSSMConsumerGroupId); GetConfigValue("BsmTopic", _transmitBSMTopic); GetConfigValue("MobilityOperationTopic", _transmitMobilityOperationTopic); GetConfigValue("MobilityPathTopic", _transmitMobilityPathTopic); @@ -76,28 +73,28 @@ void CARMAStreetsPlugin::InitKafkaConsumerProducers() _kafka_producer_ptr = client.create_producer(kafkaConnectString); if(!_kafka_producer_ptr->init_producer()) { - return; + throw TmxException("Failed to create Kafka producer."); + } //Consumers - _spat_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSpatTopic,_subscribeToSpatConsumerGroupId); - _scheduing_plan_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSchedulingPlanTopic,_subscribeToSchedulingPlanConsumerGroupId); - _ssm_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSsmTopic,_subscribeToSSMConsumerGroupId); + _spat_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSpatTopic,this->_name); + _scheduing_plan_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSchedulingPlanTopic, this->_name); + _ssm_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSsmTopic,this->_name); if(!_scheduing_plan_kafka_consumer_ptr || !_spat_kafka_consumer_ptr || !_ssm_kafka_consumer_ptr) { - PLOG(logERROR) <<"Failed to create Kafka consumers."; - return; + throw TmxException("Failed to create Kafka consumers."); } PLOG(logDEBUG) <<"Kafka consumers created"; if(!_spat_kafka_consumer_ptr->init() || !_scheduing_plan_kafka_consumer_ptr->init() || !_ssm_kafka_consumer_ptr->init()) { - PLOG(logERROR) <<"Kafka consumers init() failed!"; - return; + throw TmxException("Kafka consumers init() failed!"); } - + // TODO: Replace with tmxutil ThreadTimer or some other more appropriate Thread wrapper. boost::thread thread_schpl(&CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic, this); boost::thread thread_spat(&CARMAStreetsPlugin::SubscribeSpatKafkaTopic, this); - boost::thread thread_ssm(&CARMAStreetsPlugin::SubscribeSSMKafkaTopic, this); + boost::thread thread_ssm(&CARMAStreetsPlugin::SubscribeSSMKafkaTopic, this); + } void CARMAStreetsPlugin::OnConfigChanged(const char *key, const char *value) { @@ -468,6 +465,7 @@ void CARMAStreetsPlugin::OnStateChange(IvpPluginState state) { void CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic() { + // TODO: Update methods to represent consuming a single message from Kafka topic if(_subscribeToSchedulingPlanTopic.length() > 0) { PLOG(logDEBUG) << "SubscribeSchedulingPlanKafkaTopics:" <<_subscribeToSchedulingPlanTopic << std::endl; @@ -525,6 +523,7 @@ void CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic() } void CARMAStreetsPlugin::SubscribeSpatKafkaTopic(){ + // TODO: Update methods to represent consuming a single message from Kafka topic if(_subscribeToSpatTopic.length() > 0) { PLOG(logDEBUG) << "SubscribeSpatKafkaTopics:" <<_subscribeToSpatTopic << std::endl; @@ -578,7 +577,7 @@ void CARMAStreetsPlugin::SubscribeSpatKafkaTopic(){ } void CARMAStreetsPlugin::SubscribeSSMKafkaTopic(){ - + // TODO: Update methods to represent consuming a single message from Kafka topic if(_subscribeToSsmTopic.length() > 0) { PLOG(logDEBUG) << "SubscribeSSMKafkaTopics:" <<_subscribeToSsmTopic << std::endl;