From ad2311074491b8a301fc9a9f4d318871b0af50d2 Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Fri, 30 Jun 2023 20:55:24 +0000 Subject: [PATCH] address comments --- .devcontainer/docker-compose-vscode.yml | 2 +- .../src/kafka/kafka_consumer_worker.cpp | 7 +++++++ .../TmxUtils/src/kafka/kafka_consumer_worker.h | 1 + .../src/CARMAStreetsPlugin.cpp | 17 +++++++++++++---- .../CARMAStreetsPlugin/src/CARMAStreetsPlugin.h | 3 --- 5 files changed, 22 insertions(+), 8 deletions(-) diff --git a/.devcontainer/docker-compose-vscode.yml b/.devcontainer/docker-compose-vscode.yml index e79a86d98..2a419aa6c 100755 --- a/.devcontainer/docker-compose-vscode.yml +++ b/.devcontainer/docker-compose-vscode.yml @@ -72,7 +72,7 @@ services: DOCKER_HOST_IP: 127.0.0.1 KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_CREATE_TOPICS: "time_sync:1:1,modified_spat:1:1,v2xhub_scheduling_plan_sub:1:1" + KAFKA_CREATE_TOPICS: "time_sync:1:1,modified_spat:1:1,v2xhub_scheduling_plan_sub:1:1,v2xhub_ssm_sub:1:1" KAFKA_LOG_DIRS: "/kafka/kafka-logs" KAFKA_OFFSETS_RETENTION_MINUTES: 1 volumes: diff --git a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp index 419e1b94c..376383039 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp +++ b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp @@ -42,6 +42,12 @@ namespace tmx::utils return false; } + if (conf->set(ENABLE_AUTO_COMMIT, "true", errstr) != RdKafka::Conf::CONF_OK) + { + FILE_LOG(logWARNING) << "RDKafka conf set enable auto commit failed: " << errstr.c_str() << std::endl; + return false; + } + // set consumer group if (conf->set(GROUP_ID, _group_id_str, errstr) != RdKafka::Conf::CONF_OK) { @@ -88,6 +94,7 @@ namespace tmx::utils void kafka_consumer_worker::stop() { _run = false; + _consumer->close(); /*Destroy kafka instance*/ // Wait for RdKafka to decommission. RdKafka::wait_destroyed(5000); } diff --git a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h index a3e00c00b..95a9c96a8 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h +++ b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h @@ -40,6 +40,7 @@ namespace tmx::utils { const std::string GROUP_ID="group.id"; const std::string MAX_PARTITION_FETCH_SIZE="max.partition.fetch.bytes"; const std::string ENABLE_PARTITION_END_OF="enable.partition.eof"; + const std::string ENABLE_AUTO_COMMIT="enable.auto.commit"; //maximum size for pulling message from a single partition at a time std::string STR_FETCH_NUM = "10240000"; diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp index 26d4aecdc..2de561e02 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp @@ -20,6 +20,8 @@ namespace CARMAStreetsPlugin { */ CARMAStreetsPlugin::CARMAStreetsPlugin(string name) : PluginClient(name) { + + PLOG(logERROR) << "CARMAStreetsPlugin Constructor."; AddMessageFilter < BsmMessage > (this, &CARMAStreetsPlugin::HandleBasicSafetyMessage); AddMessageFilter < tsm3Message > (this, &CARMAStreetsPlugin::HandleMobilityOperationMessage); AddMessageFilter < tsm2Message > (this, &CARMAStreetsPlugin::HandleMobilityPathMessage); @@ -31,6 +33,11 @@ CARMAStreetsPlugin::CARMAStreetsPlugin(string name) : } CARMAStreetsPlugin::~CARMAStreetsPlugin() { + PLOG(logERROR) << "CARMAStreetsPlugin Destructor."; + //Todo: It does not seem the desctructor is called. + _spat_kafka_consumer_ptr->stop(); + _scheduing_plan_kafka_consumer_ptr->stop(); + _ssm_kafka_consumer_ptr->stop(); } void CARMAStreetsPlugin::UpdateConfigSettings() { @@ -95,7 +102,8 @@ void CARMAStreetsPlugin::InitKafkaConsumerProducers() ss << uuid; _subscribeToSpatConsumerGroupId += ss.str(); _subscribeToSchedulingPlanConsumerGroupId += ss.str(); - PLOG(logERROR) << "Kafka INFO:" << kafkaConnectString<<_subscribeToSpatTopic<<_subscribeToSpatConsumerGroupId; + _subscribeToSSMConsumerGroupId += ss.str(); + //Todo further enhancement: Temporary fix for the consumer rebalancing die to multiple consumers join the same group upon restarting plugin. _spat_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSpatTopic,_subscribeToSpatConsumerGroupId); _scheduing_plan_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSchedulingPlanTopic,_subscribeToSchedulingPlanConsumerGroupId); _ssm_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSsmTopic,_subscribeToSSMConsumerGroupId); @@ -110,9 +118,9 @@ void CARMAStreetsPlugin::InitKafkaConsumerProducers() PLOG(logERROR) <<"Kafka consumers init() failed!"; } - thread_schpl = new std::thread(&CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic, this); - thread_spat = new std::thread(&CARMAStreetsPlugin::SubscribeSpatKafkaTopic, this); - thread_ssm = new std::thread(&CARMAStreetsPlugin::SubscribeSSMKafkaTopic, this); + boost::thread thread_schpl(&CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic, this); + boost::thread thread_spat(&CARMAStreetsPlugin::SubscribeSpatKafkaTopic, this); + boost::thread thread_ssm(&CARMAStreetsPlugin::SubscribeSSMKafkaTopic, this); } void CARMAStreetsPlugin::OnConfigChanged(const char *key, const char *value) { @@ -795,6 +803,7 @@ int CARMAStreetsPlugin::Main() { usleep(100000); //sleep for microseconds set from config. } + return (EXIT_SUCCESS); } } /* namespace */ diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h index e647141cd..e8d4620cd 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h @@ -111,9 +111,6 @@ class CARMAStreetsPlugin: public PluginClient { std::vector _strategies; tmx::messages::tsm3Message *_tsm3Message{NULL}; std::mutex data_lock; - std::thread* thread_schpl; - std::thread* thread_spat; - std::thread* thread_ssm; /** * @brief Status label for SPAT messages skipped due to errors.