From 7169c4501334e4eea42868fbf09210f5c6a57aca Mon Sep 17 00:00:00 2001 From: dev Date: Mon, 17 Jul 2023 06:46:38 -0400 Subject: [PATCH] Moved destructor to kafka consumer instead of plugin --- src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp | 4 ++++ src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h | 10 +++++----- .../CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp | 9 +-------- .../CARMAStreetsPlugin/src/CARMAStreetsPlugin.h | 9 +++++++-- 4 files changed, 17 insertions(+), 15 deletions(-) diff --git a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp index a0a3d2c8f..5c82f425b 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp +++ b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp @@ -8,6 +8,10 @@ namespace tmx::utils _partition(partition) { } + kafka_consumer_worker::~kafka_consumer_worker() { + stop(); + FILE_LOG(logWARNING) << "Kafka consumer destroyed!" << std::endl; + } bool kafka_consumer_worker::init() { diff --git a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h index 95a9c96a8..691c6e7af 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h +++ b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h @@ -68,6 +68,11 @@ namespace tmx::utils { * @param partition partition consumer should be assigned to. */ kafka_consumer_worker(const std::string &broker_str, const std::string &topic_str, const std::string & group_id, int64_t cur_offset = 0, int32_t partition = 0); + /** + * @brief Destroy the kafka consumer worker object + * + */ + ~kafka_consumer_worker(); /** * @brief Initialize kafka_consumer_worker * @@ -101,11 +106,6 @@ namespace tmx::utils { * @return false if kafka consumer is stopped. */ virtual bool is_running() const; - /** - * @brief Destroy the kafka consumer worker object - * - */ - virtual ~kafka_consumer_worker() = default; }; } diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp index d0cc40399..d76260ad6 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp @@ -25,17 +25,9 @@ CARMAStreetsPlugin::CARMAStreetsPlugin(string name) : AddMessageFilter < tsm2Message > (this, &CARMAStreetsPlugin::HandleMobilityPathMessage); AddMessageFilter < MapDataMessage > (this, &CARMAStreetsPlugin::HandleMapMessage); AddMessageFilter < SrmMessage > (this, &CARMAStreetsPlugin::HandleSRMMessage); - SubscribeToMessages(); - } -CARMAStreetsPlugin::~CARMAStreetsPlugin() { - //Todo: It does not seem the desctructor is called. - _spat_kafka_consumer_ptr->stop(); - _scheduing_plan_kafka_consumer_ptr->stop(); - _ssm_kafka_consumer_ptr->stop(); -} void CARMAStreetsPlugin::UpdateConfigSettings() { @@ -64,6 +56,7 @@ void CARMAStreetsPlugin::UpdateConfigSettings() { _strategies.clear(); while( ss.good() ) { std::string substring; + getline( ss, substring, ','); _strategies.push_back( substring); } diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h index 3f0ade953..60d42807f 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h @@ -36,7 +36,7 @@ namespace CARMAStreetsPlugin { class CARMAStreetsPlugin: public PluginClientClockAware { public: CARMAStreetsPlugin(std::string); - virtual ~CARMAStreetsPlugin(); + virtual ~CARMAStreetsPlugin() ; int Main(); protected: @@ -49,7 +49,12 @@ class CARMAStreetsPlugin: public PluginClientClockAware { void HandleMobilityOperationMessage(tsm3Message &msg, routeable_message &routeableMsg); void HandleMobilityPathMessage(tsm2Message &msg, routeable_message &routeableMsg); void HandleBasicSafetyMessage(BsmMessage &msg, routeable_message &routeableMsg); - virtual void HandleTimeSyncMessage(TimeSyncMessage &msg, routeable_message &routeableMsg) override; + /** + * @brief Overide PluginClientClockAware HandleTimeSyncMessage to producer TimeSyncMessage to kafka for CARMA Streets Time Synchronization. + * @param msg TimeSyncMessage received by plugin when in simulation mode. Message provides current simulation time to all processes. + * @param routeableMsg routeable_message for time sync message. + */ + void HandleTimeSyncMessage(TimeSyncMessage &msg, routeable_message &routeableMsg) override; /** * @brief Subscribe to MAP message broadcast by the MAPPlugin. This handler will be called automatically whenever the MAPPlugin is broadcasting a J2735 MAP message. * @param msg The J2735 MAP message received from the internal