Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue-549: Moved Kafka time producer from CDASimAdapter to CARMA-Stre… #550

Merged
merged 8 commits into from
Jul 20, 2023
11 changes: 7 additions & 4 deletions src/tmx/TmxUtils/src/PluginClientClockAware.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ namespace tmx::utils {
: PluginClient(name)
{
// check for simulation mode enabled by environment variable
bool simulationMode = sim::is_simulation_mode();
_simulation_mode = sim::is_simulation_mode();

using namespace fwha_stol::lib::time;
clock = std::make_shared<CarmaClock>(simulationMode);
if (simulationMode) {
clock = std::make_shared<CarmaClock>(_simulation_mode);
if (_simulation_mode) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the if statement to only apply filter when it is in simulation mode? Is it possible for CDAAdapterPlugin to broadcast TimeSyncMessage when it is not in simulation mode?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it is not possible and also not valid. TimeSync messages are only valid when we are running in simulation. Otherwise we use machine time.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AddMessageFilter<tmx::messages::TimeSyncMessage>(this, &PluginClientClockAware::HandleTimeSyncMessage);

The AddMessageFilter is event based and should only be triggered when there is Timsync message sent to the TMX bus in simulation mode.
so the _simulation_mode should always be true. Can we remove this if check statement? The same to the HandleTimeSyncMessage() function. There is no need to check if _simulation_mode true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is incorrect. Simulation mode will be false in real world deployments. Here the PluginClockAwareClient will not subscribe to TimeSync messages so this is valid in my opinion

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify, the CDASimAdapter will only be active in simulation mode, which means we will only broadcast TimeSync messages in Simulation Mode. The PluginClockAwareClient is a PluginClient that is able to configurably use Simulation time or real time. It will only subscribe to TimeSync messages when in simulation mode. So on the broadcast side the plugin will not be active to produce time sync messages if not in simulation. On the filter side, even if something else did produce a time sync message, we will not add a filter for it if not in simulation mode.

AddMessageFilter<tmx::messages::TimeSyncMessage>(this, &PluginClientClockAware::HandleTimeSyncMessage);

}
Expand All @@ -25,7 +25,6 @@ namespace tmx::utils {
this->getClock()->update( msg.get_timestep() );
if (sim::is_simulation_mode() ) {
SetStatus(Key_Simulation_Time_Step, Clock::ToUtcPreciseTimeString(msg.get_timestep()));

}
}

Expand All @@ -36,4 +35,8 @@ namespace tmx::utils {
}
}

bool PluginClientClockAware::isSimulationMode() const {
return _simulation_mode;
}

}
7 changes: 6 additions & 1 deletion src/tmx/TmxUtils/src/PluginClientClockAware.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class PluginClientClockAware : public PluginClient {
* @param msg TimeSyncMessage broadcast on TMX core
* @param routeableMsg
*/
void HandleTimeSyncMessage(tmx::messages::TimeSyncMessage &msg, routeable_message &routeableMsg );
virtual void HandleTimeSyncMessage(tmx::messages::TimeSyncMessage &msg, routeable_message &routeableMsg );


protected:
Expand All @@ -41,6 +41,9 @@ class PluginClientClockAware : public PluginClient {
}

void OnStateChange(IvpPluginState state) override;

bool isSimulationMode() const;


private:
/**
Expand All @@ -55,6 +58,8 @@ class PluginClientClockAware : public PluginClient {
* @brief Status label to indicate whether plugin is in Simulation Mode.
*/
const char* Key_Simulation_Mode = "Simulation Mode ";

bool _simulation_mode;

};

Expand Down
9 changes: 7 additions & 2 deletions src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ namespace tmx::utils
_partition(partition)
{
}
kafka_consumer_worker::~kafka_consumer_worker() {
stop();
FILE_LOG(logWARNING) << "Kafka consumer destroyed!" << std::endl;
}

bool kafka_consumer_worker::init()
{
Expand Down Expand Up @@ -93,11 +97,12 @@ namespace tmx::utils

void kafka_consumer_worker::stop()
{
FILE_LOG(logWARNING) << "Stopping Kafka Consumer!" << std::endl;
_run = false;
//Close and shutdown the consumer.
_consumer->close();
/*Destroy kafka instance*/ // Wait for RdKafka to decommission.
RdKafka::wait_destroyed(5000);
FILE_LOG(logWARNING) << "Kafka Consumer Stopped!" << std::endl;

}

void kafka_consumer_worker::subscribe()
Expand Down
20 changes: 14 additions & 6 deletions src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ namespace tmx::utils {
* @param partition partition consumer should be assigned to.
*/
kafka_consumer_worker(const std::string &broker_str, const std::string &topic_str, const std::string & group_id, int64_t cur_offset = 0, int32_t partition = 0);
/**
* @brief Destroy the kafka consumer worker object. Calls stop on consumer to clean up resources.
*/
~kafka_consumer_worker();
// Rule of 5 because destructor is define (https://www.codementor.io/@sandesh87/the-rule-of-five-in-c-1pdgpzb04f)
// Delete copy constructor
kafka_consumer_worker(kafka_consumer_worker& other) = delete;
// Delete copy assigment
kafka_consumer_worker& operator=(const kafka_consumer_worker& other) = delete;
// delete move constructor
kafka_consumer_worker(kafka_consumer_worker &&consumer) = delete;
// delete move assignment
kafka_consumer_worker const & operator=(kafka_consumer_worker &&consumer) = delete;
/**
* @brief Initialize kafka_consumer_worker
*
Expand All @@ -89,7 +102,7 @@ namespace tmx::utils {
/**
* @brief Stop running kafka consumer.
*/
virtual void stop();
void stop();
/**
* @brief Print current configurations.
*/
Expand All @@ -101,11 +114,6 @@ namespace tmx::utils {
* @return false if kafka consumer is stopped.
*/
virtual bool is_running() const;
/**
* @brief Destroy the kafka consumer worker object
*
*/
virtual ~kafka_consumer_worker() = default;
};

}
Expand Down
13 changes: 10 additions & 3 deletions src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ namespace tmx::utils
{
}

kafka_producer_worker::~kafka_producer_worker() {
stop();
FILE_LOG(logWARNING) << "Kafka Producer Worker Destroyed!" << std::endl;
}

bool kafka_producer_worker::init()
{
if(init_producer())
Expand Down Expand Up @@ -217,16 +222,18 @@ namespace tmx::utils
{
if (_producer)
{
_producer->flush(10 * 1000 /* wait for max 10 seconds */);

auto error =_producer->flush(10 * 1000 /* wait for max 10 seconds */);
if (error == RdKafka::ERR__TIMED_OUT)
FILE_LOG(logERROR) << "Flush attempt timed out!" << std::endl;
if (_producer->outq_len() > 0)
FILE_LOG(logWARNING) << _producer->name() << _producer->outq_len() << " message(s) were not delivered." << std::endl;
FILE_LOG(logERROR) << _producer->name() << _producer->outq_len() << " message(s) were not delivered." << std::endl;
}
}
catch (const std::runtime_error &e)
{
FILE_LOG(logERROR) << "Error encountered flushing producer : " << e.what() << std::endl;
}
FILE_LOG(logWARNING) << "Kafka producer stopped!" << std::endl;
}

void kafka_producer_worker::printCurrConf()
Expand Down
21 changes: 15 additions & 6 deletions src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ namespace tmx::utils
* @param broker_str network address of kafka broker.
*/
explicit kafka_producer_worker(const std::string &brokers);
/**
* @brief Destroy the kafka producer worker object. Calls stop on producer to clean up resources.
*/
virtual ~kafka_producer_worker();
// Rule of 5 because destructor is define (https://www.codementor.io/@sandesh87/the-rule-of-five-in-c-1pdgpzb04f)
// delete copy constructor
kafka_producer_worker(kafka_producer_worker& other) = delete;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of deleting the copy and move constructor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rule of 5 is a rule in C++ that if you define one of these you need to explicitly define all to avoid issues with compiler generated versions. This is to address a Code Smell. (https://www.codementor.io/@sandesh87/the-rule-of-five-in-c-1pdgpzb04f)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left a comment as well

// delete copy assignment
kafka_producer_worker& operator=(const kafka_producer_worker& other) = delete;
// delete move constructor
kafka_producer_worker(kafka_producer_worker &&producer) = delete;
// delete move assignment
kafka_producer_worker const & operator=(kafka_producer_worker &&producer) = delete;
/**
* @brief Initialize kafka_producer_worker. This method must be called before send!
*
Expand Down Expand Up @@ -100,16 +113,12 @@ namespace tmx::utils
/**
* @brief Stop running kafka producer.
*/
virtual void stop();
void stop();
/**
* @brief Print current configurations.
*/
virtual void printCurrConf();
/**
* @brief Destroy the kafka producer worker object
*
*/
virtual ~kafka_producer_worker() = default;

};
}

Expand Down
1 change: 0 additions & 1 deletion src/tmx/TmxUtils/src/kafka/mock_kafka_consumer_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ namespace tmx::utils {
MOCK_METHOD(bool, init,(),(override));
MOCK_METHOD(const char*, consume, (int timeout_ms), (override));
MOCK_METHOD(void, subscribe, (), (override));
MOCK_METHOD(void, stop, (), (override));
MOCK_METHOD(void, printCurrConf, (), (override));
MOCK_METHOD(bool, is_running, (), (const override));
};
Expand Down
1 change: 0 additions & 1 deletion src/tmx/TmxUtils/src/kafka/mock_kafka_producer_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ namespace tmx::utils {
MOCK_METHOD(bool, init,(),(override));
MOCK_METHOD(void, send, (const std::string &msg), (override));
MOCK_METHOD(bool, is_running, (), (const, override));
MOCK_METHOD(void, stop, (), (override));
MOCK_METHOD(void, printCurrConf, (), (override));
};
}
27 changes: 27 additions & 0 deletions src/tmx/TmxUtils/test/KafkaTestEnvironment.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#include "gtest/gtest.h"
#include <librdkafka/rdkafkacpp.h>

/**
* @brief Kafka Test Environment which allows for Setup/Teardown configuration at the
* test program level. Teardown waits on all rd_kafka_t objects to be destroyed.
*/
class KafkaTestEnvironment : public ::testing::Environment {
public:
~KafkaTestEnvironment() override {}

// Override this to define how to set up the environment.
void SetUp() override {}

// Override this to define how to tear down the environment.
void TearDown() override {
std::cout << "Waiting for all RDKafka objects to be destroyed!" << std::endl;
// Wait for all rd_kafka_t objects to be destroyed
auto error = RdKafka::wait_destroyed(5000);
if (error == RdKafka::ERR__TIMED_OUT) {
std::cout << "Wait destroy attempted timed out!" << std::endl;
}
else {
std::cout << "All Objects are destroyed!" << std::endl;
}
}
};
2 changes: 2 additions & 0 deletions src/tmx/TmxUtils/test/Main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
*/

#include <gtest/gtest.h>
#include "KafkaTestEnvironment.cpp"

int main(int argc, char **argv)
{
::testing::AddGlobalTestEnvironment(new KafkaTestEnvironment());
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
1 change: 1 addition & 0 deletions src/tmx/TmxUtils/test/test_kafka_consumer_worker.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "gtest/gtest.h"
#include "kafka/kafka_client.h"
#include "PluginLog.h"

TEST(test_kafka_consumer_worker, create_consumer)
{
Expand Down
2 changes: 0 additions & 2 deletions src/tmx/TmxUtils/test/test_kafka_producer_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ TEST(test_kafka_producer_worker, create_producer)
std::string msg = "test message";
// // Run this unit test without launching kafka broker will throw connection refused error
worker->send(msg);
worker->stop();
}

TEST(test_kafka_producer_worker, create_producer_no_topic)
Expand All @@ -28,5 +27,4 @@ TEST(test_kafka_producer_worker, create_producer_no_topic)
std::string msg = "test message";
// // Run this unit test without launching kafka broker will throw connection refused error
worker->send(msg, topic);
worker->stop();
}
18 changes: 9 additions & 9 deletions src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,15 @@ namespace CARMAStreetsPlugin {
* @param name The name to give the plugin for identification purposes
*/
CARMAStreetsPlugin::CARMAStreetsPlugin(string name) :
PluginClient(name) {
PluginClientClockAware(name) {
AddMessageFilter < BsmMessage > (this, &CARMAStreetsPlugin::HandleBasicSafetyMessage);
AddMessageFilter < tsm3Message > (this, &CARMAStreetsPlugin::HandleMobilityOperationMessage);
AddMessageFilter < tsm2Message > (this, &CARMAStreetsPlugin::HandleMobilityPathMessage);
AddMessageFilter < MapDataMessage > (this, &CARMAStreetsPlugin::HandleMapMessage);
AddMessageFilter < SrmMessage > (this, &CARMAStreetsPlugin::HandleSRMMessage);

SubscribeToMessages();

}

CARMAStreetsPlugin::~CARMAStreetsPlugin() {
//Todo: It does not seem the desctructor is called.
_spat_kafka_consumer_ptr->stop();
_scheduing_plan_kafka_consumer_ptr->stop();
_ssm_kafka_consumer_ptr->stop();
}

void CARMAStreetsPlugin::UpdateConfigSettings() {

Expand Down Expand Up @@ -64,6 +56,7 @@ void CARMAStreetsPlugin::UpdateConfigSettings() {
_strategies.clear();
while( ss.good() ) {
std::string substring;

getline( ss, substring, ',');
_strategies.push_back( substring);
}
Expand Down Expand Up @@ -109,6 +102,13 @@ void CARMAStreetsPlugin::OnConfigChanged(const char *key, const char *value) {
UpdateConfigSettings();
}

void CARMAStreetsPlugin::HandleTimeSyncMessage(tmx::messages::TimeSyncMessage &msg, routeable_message &routeableMsg ) {
PluginClientClockAware::HandleTimeSyncMessage(msg, routeableMsg);
if ( isSimulationMode()) {
PLOG(logINFO) << "Handling TimeSync messages!" << std::endl;
produce_kafka_msg(msg.to_string(), "time_sync");
}
}
void CARMAStreetsPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routeable_message &routeableMsg ) {
try
{
Expand Down
10 changes: 8 additions & 2 deletions src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <kafka/kafka_client.h>
#include <kafka/kafka_consumer_worker.h>
#include "JsonToJ2735SSMConverter.h"
#include "PluginClientClockAware.h"



Expand All @@ -32,10 +33,9 @@ using namespace boost::property_tree;

namespace CARMAStreetsPlugin {

class CARMAStreetsPlugin: public PluginClient {
class CARMAStreetsPlugin: public PluginClientClockAware {
public:
CARMAStreetsPlugin(std::string);
virtual ~CARMAStreetsPlugin();
int Main();
protected:

Expand All @@ -48,6 +48,12 @@ class CARMAStreetsPlugin: public PluginClient {
void HandleMobilityOperationMessage(tsm3Message &msg, routeable_message &routeableMsg);
void HandleMobilityPathMessage(tsm2Message &msg, routeable_message &routeableMsg);
void HandleBasicSafetyMessage(BsmMessage &msg, routeable_message &routeableMsg);
/**
* @brief Overide PluginClientClockAware HandleTimeSyncMessage to producer TimeSyncMessage to kafka for CARMA Streets Time Synchronization.
* @param msg TimeSyncMessage received by plugin when in simulation mode. Message provides current simulation time to all processes.
* @param routeableMsg routeable_message for time sync message.
*/
void HandleTimeSyncMessage(TimeSyncMessage &msg, routeable_message &routeableMsg) override;
/**
* @brief Subscribe to MAP message broadcast by the MAPPlugin. This handler will be called automatically whenever the MAPPlugin is broadcasting a J2735 MAP message.
* @param msg The J2735 MAP message received from the internal
Expand Down
Loading