diff --git a/.devcontainer/docker-compose-vscode.yml b/.devcontainer/docker-compose-vscode.yml index a34dd25c9..89b1efd18 100755 --- a/.devcontainer/docker-compose-vscode.yml +++ b/.devcontainer/docker-compose-vscode.yml @@ -24,7 +24,6 @@ services: - SIM_V2X_PORT=5757 - V2X_PORT=8686 - INFRASTRUCTURE_ID=1 - - KAFKA_BROKER_ADDRESS=127.0.0.1:9092 secrets: - mysql_password diff --git a/src/tmx/TmxUtils/src/PluginClientClockAware.cpp b/src/tmx/TmxUtils/src/PluginClientClockAware.cpp index c8317c678..4cf87fe6b 100644 --- a/src/tmx/TmxUtils/src/PluginClientClockAware.cpp +++ b/src/tmx/TmxUtils/src/PluginClientClockAware.cpp @@ -8,11 +8,11 @@ namespace tmx::utils { : PluginClient(name) { // check for simulation mode enabled by environment variable - bool simulationMode = sim::is_simulation_mode(); + _simulation_mode = sim::is_simulation_mode(); using namespace fwha_stol::lib::time; - clock = std::make_shared(simulationMode); - if (simulationMode) { + clock = std::make_shared(_simulation_mode); + if (_simulation_mode) { AddMessageFilter(this, &PluginClientClockAware::HandleTimeSyncMessage); } @@ -25,7 +25,6 @@ namespace tmx::utils { this->getClock()->update( msg.get_timestep() ); if (sim::is_simulation_mode() ) { SetStatus(Key_Simulation_Time_Step, Clock::ToUtcPreciseTimeString(msg.get_timestep())); - } } @@ -36,4 +35,8 @@ namespace tmx::utils { } } + bool PluginClientClockAware::isSimulationMode() const { + return _simulation_mode; + } + } \ No newline at end of file diff --git a/src/tmx/TmxUtils/src/PluginClientClockAware.h b/src/tmx/TmxUtils/src/PluginClientClockAware.h index e0981b8d1..974f55060 100644 --- a/src/tmx/TmxUtils/src/PluginClientClockAware.h +++ b/src/tmx/TmxUtils/src/PluginClientClockAware.h @@ -28,7 +28,7 @@ class PluginClientClockAware : public PluginClient { * @param msg TimeSyncMessage broadcast on TMX core * @param routeableMsg */ - void HandleTimeSyncMessage(tmx::messages::TimeSyncMessage &msg, routeable_message &routeableMsg ); + virtual void HandleTimeSyncMessage(tmx::messages::TimeSyncMessage &msg, routeable_message &routeableMsg ); protected: @@ -41,6 +41,9 @@ class PluginClientClockAware : public PluginClient { } void OnStateChange(IvpPluginState state) override; + + bool isSimulationMode() const; + private: /** @@ -55,6 +58,8 @@ class PluginClientClockAware : public PluginClient { * @brief Status label to indicate whether plugin is in Simulation Mode. */ const char* Key_Simulation_Mode = "Simulation Mode "; + + bool _simulation_mode; }; diff --git a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp index a0a3d2c8f..97f9d4373 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp +++ b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp @@ -8,6 +8,10 @@ namespace tmx::utils _partition(partition) { } + kafka_consumer_worker::~kafka_consumer_worker() { + stop(); + FILE_LOG(logWARNING) << "Kafka consumer destroyed!" << std::endl; + } bool kafka_consumer_worker::init() { @@ -93,11 +97,12 @@ namespace tmx::utils void kafka_consumer_worker::stop() { + FILE_LOG(logWARNING) << "Stopping Kafka Consumer!" << std::endl; _run = false; //Close and shutdown the consumer. _consumer->close(); - /*Destroy kafka instance*/ // Wait for RdKafka to decommission. - RdKafka::wait_destroyed(5000); + FILE_LOG(logWARNING) << "Kafka Consumer Stopped!" << std::endl; + } void kafka_consumer_worker::subscribe() diff --git a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h index 95a9c96a8..341492b17 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h +++ b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h @@ -68,6 +68,19 @@ namespace tmx::utils { * @param partition partition consumer should be assigned to. */ kafka_consumer_worker(const std::string &broker_str, const std::string &topic_str, const std::string & group_id, int64_t cur_offset = 0, int32_t partition = 0); + /** + * @brief Destroy the kafka consumer worker object. Calls stop on consumer to clean up resources. + */ + ~kafka_consumer_worker(); + // Rule of 5 because destructor is define (https://www.codementor.io/@sandesh87/the-rule-of-five-in-c-1pdgpzb04f) + // Delete copy constructor + kafka_consumer_worker(kafka_consumer_worker& other) = delete; + // Delete copy assigment + kafka_consumer_worker& operator=(const kafka_consumer_worker& other) = delete; + // delete move constructor + kafka_consumer_worker(kafka_consumer_worker &&consumer) = delete; + // delete move assignment + kafka_consumer_worker const & operator=(kafka_consumer_worker &&consumer) = delete; /** * @brief Initialize kafka_consumer_worker * @@ -89,7 +102,7 @@ namespace tmx::utils { /** * @brief Stop running kafka consumer. */ - virtual void stop(); + void stop(); /** * @brief Print current configurations. */ @@ -101,11 +114,6 @@ namespace tmx::utils { * @return false if kafka consumer is stopped. */ virtual bool is_running() const; - /** - * @brief Destroy the kafka consumer worker object - * - */ - virtual ~kafka_consumer_worker() = default; }; } diff --git a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp index e81364c19..02d0a0cbb 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp +++ b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp @@ -38,6 +38,11 @@ namespace tmx::utils { } + kafka_producer_worker::~kafka_producer_worker() { + stop(); + FILE_LOG(logWARNING) << "Kafka Producer Worker Destroyed!" << std::endl; + } + bool kafka_producer_worker::init() { if(init_producer()) @@ -217,16 +222,18 @@ namespace tmx::utils { if (_producer) { - _producer->flush(10 * 1000 /* wait for max 10 seconds */); - + auto error =_producer->flush(10 * 1000 /* wait for max 10 seconds */); + if (error == RdKafka::ERR__TIMED_OUT) + FILE_LOG(logERROR) << "Flush attempt timed out!" << std::endl; if (_producer->outq_len() > 0) - FILE_LOG(logWARNING) << _producer->name() << _producer->outq_len() << " message(s) were not delivered." << std::endl; + FILE_LOG(logERROR) << _producer->name() << _producer->outq_len() << " message(s) were not delivered." << std::endl; } } catch (const std::runtime_error &e) { FILE_LOG(logERROR) << "Error encountered flushing producer : " << e.what() << std::endl; } + FILE_LOG(logWARNING) << "Kafka producer stopped!" << std::endl; } void kafka_producer_worker::printCurrConf() diff --git a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h index 7dde427d1..ae8d0b56a 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h +++ b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h @@ -60,6 +60,19 @@ namespace tmx::utils * @param broker_str network address of kafka broker. */ explicit kafka_producer_worker(const std::string &brokers); + /** + * @brief Destroy the kafka producer worker object. Calls stop on producer to clean up resources. + */ + virtual ~kafka_producer_worker(); + // Rule of 5 because destructor is define (https://www.codementor.io/@sandesh87/the-rule-of-five-in-c-1pdgpzb04f) + // delete copy constructor + kafka_producer_worker(kafka_producer_worker& other) = delete; + // delete copy assignment + kafka_producer_worker& operator=(const kafka_producer_worker& other) = delete; + // delete move constructor + kafka_producer_worker(kafka_producer_worker &&producer) = delete; + // delete move assignment + kafka_producer_worker const & operator=(kafka_producer_worker &&producer) = delete; /** * @brief Initialize kafka_producer_worker. This method must be called before send! * @@ -100,16 +113,12 @@ namespace tmx::utils /** * @brief Stop running kafka producer. */ - virtual void stop(); + void stop(); /** * @brief Print current configurations. */ virtual void printCurrConf(); - /** - * @brief Destroy the kafka producer worker object - * - */ - virtual ~kafka_producer_worker() = default; + }; } diff --git a/src/tmx/TmxUtils/src/kafka/mock_kafka_consumer_worker.h b/src/tmx/TmxUtils/src/kafka/mock_kafka_consumer_worker.h index a2bbb888b..fc927d84c 100644 --- a/src/tmx/TmxUtils/src/kafka/mock_kafka_consumer_worker.h +++ b/src/tmx/TmxUtils/src/kafka/mock_kafka_consumer_worker.h @@ -31,7 +31,6 @@ namespace tmx::utils { MOCK_METHOD(bool, init,(),(override)); MOCK_METHOD(const char*, consume, (int timeout_ms), (override)); MOCK_METHOD(void, subscribe, (), (override)); - MOCK_METHOD(void, stop, (), (override)); MOCK_METHOD(void, printCurrConf, (), (override)); MOCK_METHOD(bool, is_running, (), (const override)); }; diff --git a/src/tmx/TmxUtils/src/kafka/mock_kafka_producer_worker.h b/src/tmx/TmxUtils/src/kafka/mock_kafka_producer_worker.h index c587169b4..cb41d8de5 100644 --- a/src/tmx/TmxUtils/src/kafka/mock_kafka_producer_worker.h +++ b/src/tmx/TmxUtils/src/kafka/mock_kafka_producer_worker.h @@ -26,7 +26,6 @@ namespace tmx::utils { MOCK_METHOD(bool, init,(),(override)); MOCK_METHOD(void, send, (const std::string &msg), (override)); MOCK_METHOD(bool, is_running, (), (const, override)); - MOCK_METHOD(void, stop, (), (override)); MOCK_METHOD(void, printCurrConf, (), (override)); }; } \ No newline at end of file diff --git a/src/tmx/TmxUtils/test/KafkaTestEnvironment.cpp b/src/tmx/TmxUtils/test/KafkaTestEnvironment.cpp new file mode 100644 index 000000000..85fd6cdfa --- /dev/null +++ b/src/tmx/TmxUtils/test/KafkaTestEnvironment.cpp @@ -0,0 +1,27 @@ +#include "gtest/gtest.h" +#include + +/** + * @brief Kafka Test Environment which allows for Setup/Teardown configuration at the + * test program level. Teardown waits on all rd_kafka_t objects to be destroyed. + */ +class KafkaTestEnvironment : public ::testing::Environment { + public: + ~KafkaTestEnvironment() override {} + + // Override this to define how to set up the environment. + void SetUp() override {} + + // Override this to define how to tear down the environment. + void TearDown() override { + std::cout << "Waiting for all RDKafka objects to be destroyed!" << std::endl; + // Wait for all rd_kafka_t objects to be destroyed + auto error = RdKafka::wait_destroyed(5000); + if (error == RdKafka::ERR__TIMED_OUT) { + std::cout << "Wait destroy attempted timed out!" << std::endl; + } + else { + std::cout << "All Objects are destroyed!" << std::endl; + } + } +}; \ No newline at end of file diff --git a/src/tmx/TmxUtils/test/Main.cpp b/src/tmx/TmxUtils/test/Main.cpp index 75163d417..7bffd67ea 100644 --- a/src/tmx/TmxUtils/test/Main.cpp +++ b/src/tmx/TmxUtils/test/Main.cpp @@ -6,9 +6,11 @@ */ #include +#include "KafkaTestEnvironment.cpp" int main(int argc, char **argv) { + ::testing::AddGlobalTestEnvironment(new KafkaTestEnvironment()); testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/src/tmx/TmxUtils/test/test_kafka_consumer_worker.cpp b/src/tmx/TmxUtils/test/test_kafka_consumer_worker.cpp index 2d0d680a7..ba7a33c99 100644 --- a/src/tmx/TmxUtils/test/test_kafka_consumer_worker.cpp +++ b/src/tmx/TmxUtils/test/test_kafka_consumer_worker.cpp @@ -1,5 +1,6 @@ #include "gtest/gtest.h" #include "kafka/kafka_client.h" +#include "PluginLog.h" TEST(test_kafka_consumer_worker, create_consumer) { diff --git a/src/tmx/TmxUtils/test/test_kafka_producer_worker.cpp b/src/tmx/TmxUtils/test/test_kafka_producer_worker.cpp index ca04a1e19..e6a101d3e 100644 --- a/src/tmx/TmxUtils/test/test_kafka_producer_worker.cpp +++ b/src/tmx/TmxUtils/test/test_kafka_producer_worker.cpp @@ -13,7 +13,6 @@ TEST(test_kafka_producer_worker, create_producer) std::string msg = "test message"; // // Run this unit test without launching kafka broker will throw connection refused error worker->send(msg); - worker->stop(); } TEST(test_kafka_producer_worker, create_producer_no_topic) @@ -28,5 +27,4 @@ TEST(test_kafka_producer_worker, create_producer_no_topic) std::string msg = "test message"; // // Run this unit test without launching kafka broker will throw connection refused error worker->send(msg, topic); - worker->stop(); } \ No newline at end of file diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp index 71703bd59..d76260ad6 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp @@ -19,23 +19,15 @@ namespace CARMAStreetsPlugin { * @param name The name to give the plugin for identification purposes */ CARMAStreetsPlugin::CARMAStreetsPlugin(string name) : - PluginClient(name) { + PluginClientClockAware(name) { AddMessageFilter < BsmMessage > (this, &CARMAStreetsPlugin::HandleBasicSafetyMessage); AddMessageFilter < tsm3Message > (this, &CARMAStreetsPlugin::HandleMobilityOperationMessage); AddMessageFilter < tsm2Message > (this, &CARMAStreetsPlugin::HandleMobilityPathMessage); AddMessageFilter < MapDataMessage > (this, &CARMAStreetsPlugin::HandleMapMessage); AddMessageFilter < SrmMessage > (this, &CARMAStreetsPlugin::HandleSRMMessage); - SubscribeToMessages(); - } -CARMAStreetsPlugin::~CARMAStreetsPlugin() { - //Todo: It does not seem the desctructor is called. - _spat_kafka_consumer_ptr->stop(); - _scheduing_plan_kafka_consumer_ptr->stop(); - _ssm_kafka_consumer_ptr->stop(); -} void CARMAStreetsPlugin::UpdateConfigSettings() { @@ -64,6 +56,7 @@ void CARMAStreetsPlugin::UpdateConfigSettings() { _strategies.clear(); while( ss.good() ) { std::string substring; + getline( ss, substring, ','); _strategies.push_back( substring); } @@ -109,6 +102,13 @@ void CARMAStreetsPlugin::OnConfigChanged(const char *key, const char *value) { UpdateConfigSettings(); } +void CARMAStreetsPlugin::HandleTimeSyncMessage(tmx::messages::TimeSyncMessage &msg, routeable_message &routeableMsg ) { + PluginClientClockAware::HandleTimeSyncMessage(msg, routeableMsg); + if ( isSimulationMode()) { + PLOG(logINFO) << "Handling TimeSync messages!" << std::endl; + produce_kafka_msg(msg.to_string(), "time_sync"); + } +} void CARMAStreetsPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routeable_message &routeableMsg ) { try { diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h index 35df38132..f7167f269 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h @@ -20,6 +20,7 @@ #include #include #include "JsonToJ2735SSMConverter.h" +#include "PluginClientClockAware.h" @@ -32,10 +33,9 @@ using namespace boost::property_tree; namespace CARMAStreetsPlugin { -class CARMAStreetsPlugin: public PluginClient { +class CARMAStreetsPlugin: public PluginClientClockAware { public: CARMAStreetsPlugin(std::string); - virtual ~CARMAStreetsPlugin(); int Main(); protected: @@ -48,6 +48,12 @@ class CARMAStreetsPlugin: public PluginClient { void HandleMobilityOperationMessage(tsm3Message &msg, routeable_message &routeableMsg); void HandleMobilityPathMessage(tsm2Message &msg, routeable_message &routeableMsg); void HandleBasicSafetyMessage(BsmMessage &msg, routeable_message &routeableMsg); + /** + * @brief Overide PluginClientClockAware HandleTimeSyncMessage to producer TimeSyncMessage to kafka for CARMA Streets Time Synchronization. + * @param msg TimeSyncMessage received by plugin when in simulation mode. Message provides current simulation time to all processes. + * @param routeableMsg routeable_message for time sync message. + */ + void HandleTimeSyncMessage(TimeSyncMessage &msg, routeable_message &routeableMsg) override; /** * @brief Subscribe to MAP message broadcast by the MAPPlugin. This handler will be called automatically whenever the MAPPlugin is broadcasting a J2735 MAP message. * @param msg The J2735 MAP message received from the internal diff --git a/src/v2i-hub/CDASimAdapter/CMakeLists.txt b/src/v2i-hub/CDASimAdapter/CMakeLists.txt index 97468a2cf..8fbc4f376 100755 --- a/src/v2i-hub/CDASimAdapter/CMakeLists.txt +++ b/src/v2i-hub/CDASimAdapter/CMakeLists.txt @@ -4,17 +4,17 @@ set(CMAKE_CXX_STANDARD 17) FIND_PACKAGE( carma-clock ) BuildTmxPlugin ( ) -TARGET_LINK_LIBRARIES (${PROJECT_NAME} tmxutils ::carma-clock rdkafka++ jsoncpp) +TARGET_LINK_LIBRARIES (${PROJECT_NAME} tmxutils ::carma-clock jsoncpp) # ############ # # Testing ## # ############ ADD_LIBRARY(${PROJECT_NAME}_lib src/CDASimConnection.cpp) -TARGET_LINK_LIBRARIES(${PROJECT_NAME}_lib PUBLIC tmxutils ::carma-clock rdkafka++ jsoncpp ) +TARGET_LINK_LIBRARIES(${PROJECT_NAME}_lib PUBLIC tmxutils ::carma-clock jsoncpp ) SET(BINARY ${PROJECT_NAME}_test) FILE(GLOB_RECURSE TEST_SOURCES LIST_DIRECTORIES false test/*.h test/*.cpp) SET(SOURCES ${TEST_SOURCES} WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}/test) ADD_EXECUTABLE(${BINARY} ${TEST_SOURCES}) ADD_TEST(NAME ${BINARY} COMMAND ${BINARY}) TARGET_INCLUDE_DIRECTORIES(${BINARY} PUBLIC /usr/local/lib src/) -TARGET_LINK_LIBRARIES(${BINARY} PUBLIC ${PROJECT_NAME}_lib gtest gmock tmxutils ::carma-clock rdkafka++ jsoncpp) \ No newline at end of file +TARGET_LINK_LIBRARIES(${BINARY} PUBLIC ${PROJECT_NAME}_lib gtest gmock tmxutils ::carma-clock jsoncpp) \ No newline at end of file diff --git a/src/v2i-hub/CDASimAdapter/src/CDASimAdapter.cpp b/src/v2i-hub/CDASimAdapter/src/CDASimAdapter.cpp index 01cec61ef..44fd35e2f 100644 --- a/src/v2i-hub/CDASimAdapter/src/CDASimAdapter.cpp +++ b/src/v2i-hub/CDASimAdapter/src/CDASimAdapter.cpp @@ -69,35 +69,12 @@ namespace CDASimAdapter{ } } - bool CDASimAdapter::initialize_time_producer() { - try { - std::string _broker_str = sim::get_sim_config(sim::KAFKA_BROKER_ADDRESS); - std::string _topic = sim::get_sim_config(sim::TIME_SYNC_TOPIC); - - kafka_client client; - time_producer = client.create_producer(_broker_str,_topic); - return time_producer->init(); - - } - catch( const runtime_error &e ) { - PLOG(logWARNING) << "Initialization of time producer failed: " << e.what() << std::endl; - } - return false; - } void CDASimAdapter::forward_time_sync_message(tmx::messages::TimeSyncMessage &msg) { std::string payload =msg.to_string(); PLOG(logDEBUG1) << "Sending Time Sync Message " << msg << std::endl; this->BroadcastMessage(msg, _name, 0 , IvpMsgFlags_None); - if (time_producer && time_producer->is_running()) { - try { - time_producer->send(payload); - } - catch( const runtime_error &e ) { - PLOG(logERROR) << "Exception encountered during kafka time sync forward : " << e.what() << std::endl; - } - } } @@ -115,9 +92,6 @@ namespace CDASimAdapter{ PLOG(logINFO) << "CDASim connecting " << simulation_ip << "\nUsing Registration Port : " << std::to_string( simulation_registration_port) << " Time Sync Port: " << std::to_string( time_sync_port) << " and V2X Port: " << std::to_string(v2x_port) << std::endl; - if (!initialize_time_producer()) { - return false; - } if ( connection ) { connection.reset(new CDASimConnection( simulation_ip, infrastructure_id, simulation_registration_port, sim_v2x_port, local_ip, time_sync_port, v2x_port, location )); diff --git a/src/v2i-hub/CDASimAdapter/src/include/CDASimAdapter.hpp b/src/v2i-hub/CDASimAdapter/src/include/CDASimAdapter.hpp index fd586fafc..b3a979373 100644 --- a/src/v2i-hub/CDASimAdapter/src/include/CDASimAdapter.hpp +++ b/src/v2i-hub/CDASimAdapter/src/include/CDASimAdapter.hpp @@ -18,8 +18,6 @@ #include #include #include "CDASimConnection.hpp" -#include -#include #include #include "ThreadWorker.h" @@ -61,13 +59,6 @@ namespace CDASimAdapter void OnStateChange(IvpPluginState state) override; // Virtual method overrides END. - /** - * @brief Get Kafka Connection string from environment variable KAFKA_BROKER_ADDRESS and time sync topic name from - * CARMA_INFRASTRUCTURE_TIME_SYNC_TOPIC and initialize a Kafka producer to forward time synchronization messages to - * all infrastructure services. - * @return true if initialization is successful and false if initialization fails. - */ - bool initialize_time_producer(); /** * @brief Method to attempt to establish connection between CARMA Simulation and Infrastructure Software (V2X-Hub). * @return true if successful and false if unsuccessful. @@ -94,8 +85,7 @@ namespace CDASimAdapter */ void attempt_message_from_simulation() const; /** - * @brief Forward time sychronization message to TMX message bus for other V2X-Hub Plugin and to infrastructure Kafka Broker for - * CARMA Streets services + * @brief Forward time sychronization message to TMX message bus for other V2X-Hub Plugin * @param msg TimeSyncMessage. */ void forward_time_sync_message(tmx::messages::TimeSyncMessage &msg); @@ -115,8 +105,6 @@ namespace CDASimAdapter int max_connection_attempts; // Time in seconds between connection attempts. Most be greater than zero! uint connection_sleep_time; - // Kafka producer for sending time_sync messages to carma-streets - std::shared_ptr time_producer; // CDASim connection std::unique_ptr connection; // Mutex for configuration parameter thread safety diff --git a/src/v2i-hub/CDASimAdapter/src/include/CDASimConnection.hpp b/src/v2i-hub/CDASimAdapter/src/include/CDASimConnection.hpp index 86bd0ff95..9420cc3f3 100644 --- a/src/v2i-hub/CDASimAdapter/src/include/CDASimConnection.hpp +++ b/src/v2i-hub/CDASimAdapter/src/include/CDASimConnection.hpp @@ -30,7 +30,6 @@ namespace CDASimAdapter { * @param time_sync_port Port on which connection listens for time synchronization messages. * @param v2x_port Port on which connecction listens for incoming v2x messages. * @param location Simulationed location of infrastructure. - * @param producer Kafka Producer for forwarding time synchronization messages. */ explicit CDASimConnection( const std::string &simulation_ip, const std::string &infrastructure_id, const uint simulation_registration_port, const uint sim_v2x_port, const std::string &local_ip, const uint time_sync_port, const uint v2x_port, diff --git a/src/v2i-hub/CDASimAdapter/test/TestCARMASimulationConnection.cpp b/src/v2i-hub/CDASimAdapter/test/TestCARMASimulationConnection.cpp index 28b7376ab..b239c96e0 100644 --- a/src/v2i-hub/CDASimAdapter/test/TestCARMASimulationConnection.cpp +++ b/src/v2i-hub/CDASimAdapter/test/TestCARMASimulationConnection.cpp @@ -3,7 +3,6 @@ #include "include/CDASimConnection.hpp" #include "include/CDASimAdapter.hpp" #include -#include #include #include @@ -22,7 +21,7 @@ namespace CDASimAdapter { class TestCARMASimulationConnection : public ::testing::Test { protected: void SetUp() override { - // Initialize CARMA Simulation connection with (0,0,0) location and mock kafka producer. + // Initialize CARMA Simulation connection with (0,0,0) location. Point location; connection = std::make_shared("127.0.0.1", "1212", 4567, 4678, "127.0.0.1", 1213, 1214, location); } diff --git a/src/v2i-hub/MapPlugin/src/MapPlugin.cpp b/src/v2i-hub/MapPlugin/src/MapPlugin.cpp index ba4a54b05..2317e7800 100644 --- a/src/v2i-hub/MapPlugin/src/MapPlugin.cpp +++ b/src/v2i-hub/MapPlugin/src/MapPlugin.cpp @@ -81,6 +81,7 @@ class MapPlugin: public PluginClientClockAware { void OnConfigChanged(const char *key, const char *value); void OnMessageReceived(IvpMessage *msg); void OnStateChange(IvpPluginState state); + private: std::atomic _mapAction {-1}; std::atomic _isMapFileNew {false}; diff --git a/src/v2i-hub/SpatPlugin/src/SpatPlugin.h b/src/v2i-hub/SpatPlugin/src/SpatPlugin.h index a42552618..0d0b9987f 100644 --- a/src/v2i-hub/SpatPlugin/src/SpatPlugin.h +++ b/src/v2i-hub/SpatPlugin/src/SpatPlugin.h @@ -44,6 +44,7 @@ class SpatPlugin: public tmx::utils::PluginClientClockAware { void OnConfigChanged(const char *key, const char *value); void OnStateChange(IvpPluginState state); + private: