Skip to content

Commit

Permalink
Issue-549: Moved Kafka time producer from CDASimAdapter to CARMA-Stre… (
Browse files Browse the repository at this point in the history
#550)

…ets plugin

+ Update CARMA-Streets Plugin to be simulation time aware

<!-- Thanks for the contribution, this is awesome. -->

# PR Details
## Description

<!--- Describe your changes in detail -->
Move Kafka time producer for CARMA-Streets time synchronization to
CARMA-Streets plugin. This removes unnecessary dependency on kafka for
CDASimAdapter and also makes CARMA-Streets Plugin contain entire
interface to CARMA-Streets including simulation. CARMA-Streets Plugin
now extends our simulation time aware PluginClient which can be further
extended to listen for simulated messages.
## Related Issue
#549
<!--- This project only accepts pull requests related to open issues -->
<!--- If suggesting a new feature or change, please discuss it in an
issue first -->
<!--- If fixing a bug, there should be an issue describing it with steps
to reproduce -->
<!--- Please link to the issue here: -->

## Motivation and Context

<!--- Why is this change required? What problem does it solve? -->

## How Has This Been Tested?
Locally integration testing with python time step script to send udp
time step messages.
<!--- Please describe in detail how you tested your changes. -->
<!--- Include details of your testing environment, and the tests you ran
to -->
<!--- see how your change affects other areas of the code, etc. -->

## Types of changes

<!--- What types of changes does your code introduce? Put an `x` in all
the boxes that apply: -->

- [ ] Defect fix (non-breaking change that fixes an issue)
- [x] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that cause existing functionality
to change)

## Checklist:

<!--- Go over all the following points, and put an `x` in all the boxes
that apply. -->
<!--- If you're unsure about any of these, don't hesitate to ask. We're
here to help! -->

- [ ] I have added any new packages to the sonar-scanner.properties file
- [x] My change requires a change to the documentation.
- [x] I have updated the documentation accordingly.
- [x] I have read the **CONTRIBUTING** document.
[V2XHUB Contributing
Guide](https://github.com/usdot-fhwa-OPS/V2X-Hub/blob/develop/Contributing.md)
- [ ] I have added tests to cover my changes.
- [ ] All new and existing tests passed.
  • Loading branch information
paulbourelly999 authored Jul 20, 2023
1 parent b3be4ff commit 77127e2
Show file tree
Hide file tree
Showing 22 changed files with 113 additions and 83 deletions.
1 change: 0 additions & 1 deletion .devcontainer/docker-compose-vscode.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ services:
- SIM_V2X_PORT=5757
- V2X_PORT=8686
- INFRASTRUCTURE_ID=1
- KAFKA_BROKER_ADDRESS=127.0.0.1:9092
secrets:
- mysql_password

Expand Down
11 changes: 7 additions & 4 deletions src/tmx/TmxUtils/src/PluginClientClockAware.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ namespace tmx::utils {
: PluginClient(name)
{
// check for simulation mode enabled by environment variable
bool simulationMode = sim::is_simulation_mode();
_simulation_mode = sim::is_simulation_mode();

using namespace fwha_stol::lib::time;
clock = std::make_shared<CarmaClock>(simulationMode);
if (simulationMode) {
clock = std::make_shared<CarmaClock>(_simulation_mode);
if (_simulation_mode) {
AddMessageFilter<tmx::messages::TimeSyncMessage>(this, &PluginClientClockAware::HandleTimeSyncMessage);

}
Expand All @@ -25,7 +25,6 @@ namespace tmx::utils {
this->getClock()->update( msg.get_timestep() );
if (sim::is_simulation_mode() ) {
SetStatus(Key_Simulation_Time_Step, Clock::ToUtcPreciseTimeString(msg.get_timestep()));

}
}

Expand All @@ -36,4 +35,8 @@ namespace tmx::utils {
}
}

bool PluginClientClockAware::isSimulationMode() const {
return _simulation_mode;
}

}
7 changes: 6 additions & 1 deletion src/tmx/TmxUtils/src/PluginClientClockAware.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class PluginClientClockAware : public PluginClient {
* @param msg TimeSyncMessage broadcast on TMX core
* @param routeableMsg
*/
void HandleTimeSyncMessage(tmx::messages::TimeSyncMessage &msg, routeable_message &routeableMsg );
virtual void HandleTimeSyncMessage(tmx::messages::TimeSyncMessage &msg, routeable_message &routeableMsg );


protected:
Expand All @@ -41,6 +41,9 @@ class PluginClientClockAware : public PluginClient {
}

void OnStateChange(IvpPluginState state) override;

bool isSimulationMode() const;


private:
/**
Expand All @@ -55,6 +58,8 @@ class PluginClientClockAware : public PluginClient {
* @brief Status label to indicate whether plugin is in Simulation Mode.
*/
const char* Key_Simulation_Mode = "Simulation Mode ";

bool _simulation_mode;

};

Expand Down
9 changes: 7 additions & 2 deletions src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ namespace tmx::utils
_partition(partition)
{
}
kafka_consumer_worker::~kafka_consumer_worker() {
stop();
FILE_LOG(logWARNING) << "Kafka consumer destroyed!" << std::endl;
}

bool kafka_consumer_worker::init()
{
Expand Down Expand Up @@ -93,11 +97,12 @@ namespace tmx::utils

void kafka_consumer_worker::stop()
{
FILE_LOG(logWARNING) << "Stopping Kafka Consumer!" << std::endl;
_run = false;
//Close and shutdown the consumer.
_consumer->close();
/*Destroy kafka instance*/ // Wait for RdKafka to decommission.
RdKafka::wait_destroyed(5000);
FILE_LOG(logWARNING) << "Kafka Consumer Stopped!" << std::endl;

}

void kafka_consumer_worker::subscribe()
Expand Down
20 changes: 14 additions & 6 deletions src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ namespace tmx::utils {
* @param partition partition consumer should be assigned to.
*/
kafka_consumer_worker(const std::string &broker_str, const std::string &topic_str, const std::string & group_id, int64_t cur_offset = 0, int32_t partition = 0);
/**
* @brief Destroy the kafka consumer worker object. Calls stop on consumer to clean up resources.
*/
~kafka_consumer_worker();
// Rule of 5 because destructor is define (https://www.codementor.io/@sandesh87/the-rule-of-five-in-c-1pdgpzb04f)
// Delete copy constructor
kafka_consumer_worker(kafka_consumer_worker& other) = delete;
// Delete copy assigment
kafka_consumer_worker& operator=(const kafka_consumer_worker& other) = delete;
// delete move constructor
kafka_consumer_worker(kafka_consumer_worker &&consumer) = delete;
// delete move assignment
kafka_consumer_worker const & operator=(kafka_consumer_worker &&consumer) = delete;
/**
* @brief Initialize kafka_consumer_worker
*
Expand All @@ -89,7 +102,7 @@ namespace tmx::utils {
/**
* @brief Stop running kafka consumer.
*/
virtual void stop();
void stop();
/**
* @brief Print current configurations.
*/
Expand All @@ -101,11 +114,6 @@ namespace tmx::utils {
* @return false if kafka consumer is stopped.
*/
virtual bool is_running() const;
/**
* @brief Destroy the kafka consumer worker object
*
*/
virtual ~kafka_consumer_worker() = default;
};

}
Expand Down
13 changes: 10 additions & 3 deletions src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ namespace tmx::utils
{
}

kafka_producer_worker::~kafka_producer_worker() {
stop();
FILE_LOG(logWARNING) << "Kafka Producer Worker Destroyed!" << std::endl;
}

bool kafka_producer_worker::init()
{
if(init_producer())
Expand Down Expand Up @@ -217,16 +222,18 @@ namespace tmx::utils
{
if (_producer)
{
_producer->flush(10 * 1000 /* wait for max 10 seconds */);

auto error =_producer->flush(10 * 1000 /* wait for max 10 seconds */);
if (error == RdKafka::ERR__TIMED_OUT)
FILE_LOG(logERROR) << "Flush attempt timed out!" << std::endl;
if (_producer->outq_len() > 0)
FILE_LOG(logWARNING) << _producer->name() << _producer->outq_len() << " message(s) were not delivered." << std::endl;
FILE_LOG(logERROR) << _producer->name() << _producer->outq_len() << " message(s) were not delivered." << std::endl;
}
}
catch (const std::runtime_error &e)
{
FILE_LOG(logERROR) << "Error encountered flushing producer : " << e.what() << std::endl;
}
FILE_LOG(logWARNING) << "Kafka producer stopped!" << std::endl;
}

void kafka_producer_worker::printCurrConf()
Expand Down
21 changes: 15 additions & 6 deletions src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ namespace tmx::utils
* @param broker_str network address of kafka broker.
*/
explicit kafka_producer_worker(const std::string &brokers);
/**
* @brief Destroy the kafka producer worker object. Calls stop on producer to clean up resources.
*/
virtual ~kafka_producer_worker();
// Rule of 5 because destructor is define (https://www.codementor.io/@sandesh87/the-rule-of-five-in-c-1pdgpzb04f)
// delete copy constructor
kafka_producer_worker(kafka_producer_worker& other) = delete;
// delete copy assignment
kafka_producer_worker& operator=(const kafka_producer_worker& other) = delete;
// delete move constructor
kafka_producer_worker(kafka_producer_worker &&producer) = delete;
// delete move assignment
kafka_producer_worker const & operator=(kafka_producer_worker &&producer) = delete;
/**
* @brief Initialize kafka_producer_worker. This method must be called before send!
*
Expand Down Expand Up @@ -100,16 +113,12 @@ namespace tmx::utils
/**
* @brief Stop running kafka producer.
*/
virtual void stop();
void stop();
/**
* @brief Print current configurations.
*/
virtual void printCurrConf();
/**
* @brief Destroy the kafka producer worker object
*
*/
virtual ~kafka_producer_worker() = default;

};
}

Expand Down
1 change: 0 additions & 1 deletion src/tmx/TmxUtils/src/kafka/mock_kafka_consumer_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ namespace tmx::utils {
MOCK_METHOD(bool, init,(),(override));
MOCK_METHOD(const char*, consume, (int timeout_ms), (override));
MOCK_METHOD(void, subscribe, (), (override));
MOCK_METHOD(void, stop, (), (override));
MOCK_METHOD(void, printCurrConf, (), (override));
MOCK_METHOD(bool, is_running, (), (const override));
};
Expand Down
1 change: 0 additions & 1 deletion src/tmx/TmxUtils/src/kafka/mock_kafka_producer_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ namespace tmx::utils {
MOCK_METHOD(bool, init,(),(override));
MOCK_METHOD(void, send, (const std::string &msg), (override));
MOCK_METHOD(bool, is_running, (), (const, override));
MOCK_METHOD(void, stop, (), (override));
MOCK_METHOD(void, printCurrConf, (), (override));
};
}
27 changes: 27 additions & 0 deletions src/tmx/TmxUtils/test/KafkaTestEnvironment.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#include "gtest/gtest.h"
#include <librdkafka/rdkafkacpp.h>

/**
* @brief Kafka Test Environment which allows for Setup/Teardown configuration at the
* test program level. Teardown waits on all rd_kafka_t objects to be destroyed.
*/
class KafkaTestEnvironment : public ::testing::Environment {
public:
~KafkaTestEnvironment() override {}

// Override this to define how to set up the environment.
void SetUp() override {}

// Override this to define how to tear down the environment.
void TearDown() override {
std::cout << "Waiting for all RDKafka objects to be destroyed!" << std::endl;
// Wait for all rd_kafka_t objects to be destroyed
auto error = RdKafka::wait_destroyed(5000);
if (error == RdKafka::ERR__TIMED_OUT) {
std::cout << "Wait destroy attempted timed out!" << std::endl;
}
else {
std::cout << "All Objects are destroyed!" << std::endl;
}
}
};
2 changes: 2 additions & 0 deletions src/tmx/TmxUtils/test/Main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
*/

#include <gtest/gtest.h>
#include "KafkaTestEnvironment.cpp"

int main(int argc, char **argv)
{
::testing::AddGlobalTestEnvironment(new KafkaTestEnvironment());
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
1 change: 1 addition & 0 deletions src/tmx/TmxUtils/test/test_kafka_consumer_worker.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "gtest/gtest.h"
#include "kafka/kafka_client.h"
#include "PluginLog.h"

TEST(test_kafka_consumer_worker, create_consumer)
{
Expand Down
2 changes: 0 additions & 2 deletions src/tmx/TmxUtils/test/test_kafka_producer_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ TEST(test_kafka_producer_worker, create_producer)
std::string msg = "test message";
// // Run this unit test without launching kafka broker will throw connection refused error
worker->send(msg);
worker->stop();
}

TEST(test_kafka_producer_worker, create_producer_no_topic)
Expand All @@ -28,5 +27,4 @@ TEST(test_kafka_producer_worker, create_producer_no_topic)
std::string msg = "test message";
// // Run this unit test without launching kafka broker will throw connection refused error
worker->send(msg, topic);
worker->stop();
}
18 changes: 9 additions & 9 deletions src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,15 @@ namespace CARMAStreetsPlugin {
* @param name The name to give the plugin for identification purposes
*/
CARMAStreetsPlugin::CARMAStreetsPlugin(string name) :
PluginClient(name) {
PluginClientClockAware(name) {
AddMessageFilter < BsmMessage > (this, &CARMAStreetsPlugin::HandleBasicSafetyMessage);
AddMessageFilter < tsm3Message > (this, &CARMAStreetsPlugin::HandleMobilityOperationMessage);
AddMessageFilter < tsm2Message > (this, &CARMAStreetsPlugin::HandleMobilityPathMessage);
AddMessageFilter < MapDataMessage > (this, &CARMAStreetsPlugin::HandleMapMessage);
AddMessageFilter < SrmMessage > (this, &CARMAStreetsPlugin::HandleSRMMessage);

SubscribeToMessages();

}

CARMAStreetsPlugin::~CARMAStreetsPlugin() {
//Todo: It does not seem the desctructor is called.
_spat_kafka_consumer_ptr->stop();
_scheduing_plan_kafka_consumer_ptr->stop();
_ssm_kafka_consumer_ptr->stop();
}

void CARMAStreetsPlugin::UpdateConfigSettings() {

Expand Down Expand Up @@ -64,6 +56,7 @@ void CARMAStreetsPlugin::UpdateConfigSettings() {
_strategies.clear();
while( ss.good() ) {
std::string substring;

getline( ss, substring, ',');
_strategies.push_back( substring);
}
Expand Down Expand Up @@ -109,6 +102,13 @@ void CARMAStreetsPlugin::OnConfigChanged(const char *key, const char *value) {
UpdateConfigSettings();
}

void CARMAStreetsPlugin::HandleTimeSyncMessage(tmx::messages::TimeSyncMessage &msg, routeable_message &routeableMsg ) {
PluginClientClockAware::HandleTimeSyncMessage(msg, routeableMsg);
if ( isSimulationMode()) {
PLOG(logINFO) << "Handling TimeSync messages!" << std::endl;
produce_kafka_msg(msg.to_string(), "time_sync");
}
}
void CARMAStreetsPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routeable_message &routeableMsg ) {
try
{
Expand Down
10 changes: 8 additions & 2 deletions src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <kafka/kafka_client.h>
#include <kafka/kafka_consumer_worker.h>
#include "JsonToJ2735SSMConverter.h"
#include "PluginClientClockAware.h"



Expand All @@ -32,10 +33,9 @@ using namespace boost::property_tree;

namespace CARMAStreetsPlugin {

class CARMAStreetsPlugin: public PluginClient {
class CARMAStreetsPlugin: public PluginClientClockAware {
public:
CARMAStreetsPlugin(std::string);
virtual ~CARMAStreetsPlugin();
int Main();
protected:

Expand All @@ -48,6 +48,12 @@ class CARMAStreetsPlugin: public PluginClient {
void HandleMobilityOperationMessage(tsm3Message &msg, routeable_message &routeableMsg);
void HandleMobilityPathMessage(tsm2Message &msg, routeable_message &routeableMsg);
void HandleBasicSafetyMessage(BsmMessage &msg, routeable_message &routeableMsg);
/**
* @brief Overide PluginClientClockAware HandleTimeSyncMessage to producer TimeSyncMessage to kafka for CARMA Streets Time Synchronization.
* @param msg TimeSyncMessage received by plugin when in simulation mode. Message provides current simulation time to all processes.
* @param routeableMsg routeable_message for time sync message.
*/
void HandleTimeSyncMessage(TimeSyncMessage &msg, routeable_message &routeableMsg) override;
/**
* @brief Subscribe to MAP message broadcast by the MAPPlugin. This handler will be called automatically whenever the MAPPlugin is broadcasting a J2735 MAP message.
* @param msg The J2735 MAP message received from the internal
Expand Down
6 changes: 3 additions & 3 deletions src/v2i-hub/CDASimAdapter/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ set(CMAKE_CXX_STANDARD 17)
FIND_PACKAGE( carma-clock )
BuildTmxPlugin ( )

TARGET_LINK_LIBRARIES (${PROJECT_NAME} tmxutils ::carma-clock rdkafka++ jsoncpp)
TARGET_LINK_LIBRARIES (${PROJECT_NAME} tmxutils ::carma-clock jsoncpp)

# ############
# # Testing ##
# ############
ADD_LIBRARY(${PROJECT_NAME}_lib src/CDASimConnection.cpp)
TARGET_LINK_LIBRARIES(${PROJECT_NAME}_lib PUBLIC tmxutils ::carma-clock rdkafka++ jsoncpp )
TARGET_LINK_LIBRARIES(${PROJECT_NAME}_lib PUBLIC tmxutils ::carma-clock jsoncpp )
SET(BINARY ${PROJECT_NAME}_test)
FILE(GLOB_RECURSE TEST_SOURCES LIST_DIRECTORIES false test/*.h test/*.cpp)
SET(SOURCES ${TEST_SOURCES} WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}/test)
ADD_EXECUTABLE(${BINARY} ${TEST_SOURCES})
ADD_TEST(NAME ${BINARY} COMMAND ${BINARY})
TARGET_INCLUDE_DIRECTORIES(${BINARY} PUBLIC /usr/local/lib src/)
TARGET_LINK_LIBRARIES(${BINARY} PUBLIC ${PROJECT_NAME}_lib gtest gmock tmxutils ::carma-clock rdkafka++ jsoncpp)
TARGET_LINK_LIBRARIES(${BINARY} PUBLIC ${PROJECT_NAME}_lib gtest gmock tmxutils ::carma-clock jsoncpp)
Loading

0 comments on commit 77127e2

Please sign in to comment.