Skip to content

Commit

Permalink
Moved destructor to kafka consumer instead of plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
paulbourelly999 committed Jul 17, 2023
1 parent d1bbb0a commit 7169c45
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 15 deletions.
4 changes: 4 additions & 0 deletions src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ namespace tmx::utils
_partition(partition)
{
}
kafka_consumer_worker::~kafka_consumer_worker() {
stop();
FILE_LOG(logWARNING) << "Kafka consumer destroyed!" << std::endl;
}

bool kafka_consumer_worker::init()
{
Expand Down
10 changes: 5 additions & 5 deletions src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ namespace tmx::utils {
* @param partition partition consumer should be assigned to.
*/
kafka_consumer_worker(const std::string &broker_str, const std::string &topic_str, const std::string & group_id, int64_t cur_offset = 0, int32_t partition = 0);
/**
* @brief Destroy the kafka consumer worker object
*
*/
~kafka_consumer_worker();
/**
* @brief Initialize kafka_consumer_worker
*
Expand Down Expand Up @@ -101,11 +106,6 @@ namespace tmx::utils {
* @return false if kafka consumer is stopped.
*/
virtual bool is_running() const;
/**
* @brief Destroy the kafka consumer worker object
*
*/
virtual ~kafka_consumer_worker() = default;
};

}
Expand Down
9 changes: 1 addition & 8 deletions src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,9 @@ CARMAStreetsPlugin::CARMAStreetsPlugin(string name) :
AddMessageFilter < tsm2Message > (this, &CARMAStreetsPlugin::HandleMobilityPathMessage);
AddMessageFilter < MapDataMessage > (this, &CARMAStreetsPlugin::HandleMapMessage);
AddMessageFilter < SrmMessage > (this, &CARMAStreetsPlugin::HandleSRMMessage);

SubscribeToMessages();

}

CARMAStreetsPlugin::~CARMAStreetsPlugin() {
//Todo: It does not seem the desctructor is called.
_spat_kafka_consumer_ptr->stop();
_scheduing_plan_kafka_consumer_ptr->stop();
_ssm_kafka_consumer_ptr->stop();
}

void CARMAStreetsPlugin::UpdateConfigSettings() {

Expand Down Expand Up @@ -64,6 +56,7 @@ void CARMAStreetsPlugin::UpdateConfigSettings() {
_strategies.clear();
while( ss.good() ) {
std::string substring;

getline( ss, substring, ',');
_strategies.push_back( substring);
}
Expand Down
9 changes: 7 additions & 2 deletions src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace CARMAStreetsPlugin {
class CARMAStreetsPlugin: public PluginClientClockAware {
public:
CARMAStreetsPlugin(std::string);
virtual ~CARMAStreetsPlugin();
virtual ~CARMAStreetsPlugin() ;
int Main();
protected:

Expand All @@ -49,7 +49,12 @@ class CARMAStreetsPlugin: public PluginClientClockAware {
void HandleMobilityOperationMessage(tsm3Message &msg, routeable_message &routeableMsg);
void HandleMobilityPathMessage(tsm2Message &msg, routeable_message &routeableMsg);
void HandleBasicSafetyMessage(BsmMessage &msg, routeable_message &routeableMsg);
virtual void HandleTimeSyncMessage(TimeSyncMessage &msg, routeable_message &routeableMsg) override;
/**
* @brief Overide PluginClientClockAware HandleTimeSyncMessage to producer TimeSyncMessage to kafka for CARMA Streets Time Synchronization.
* @param msg TimeSyncMessage received by plugin when in simulation mode. Message provides current simulation time to all processes.
* @param routeableMsg routeable_message for time sync message.
*/
void HandleTimeSyncMessage(TimeSyncMessage &msg, routeable_message &routeableMsg) override;
/**
* @brief Subscribe to MAP message broadcast by the MAPPlugin. This handler will be called automatically whenever the MAPPlugin is broadcasting a J2735 MAP message.
* @param msg The J2735 MAP message received from the internal
Expand Down

0 comments on commit 7169c45

Please sign in to comment.