From d1bbb0aa47ac932917fe9dbd8c78f8f2c48ea33f Mon Sep 17 00:00:00 2001 From: dev Date: Fri, 14 Jul 2023 08:38:06 -0400 Subject: [PATCH] Issue-549: Moved Kafka time producer from CDASimAdapter to CARMA-Streets plugin + Update CARMA-Streets Plugin to be simulation time aware --- .../TmxUtils/src/PluginClientClockAware.cpp | 11 +++++--- src/tmx/TmxUtils/src/PluginClientClockAware.h | 7 ++++- .../src/CARMAStreetsPlugin.cpp | 9 ++++++- .../src/CARMAStreetsPlugin.h | 4 ++- .../CDASimAdapter/src/CDASimAdapter.cpp | 26 ------------------- .../src/include/CDASimAdapter.hpp | 10 ------- src/v2i-hub/MapPlugin/src/MapPlugin.cpp | 1 + src/v2i-hub/SpatPlugin/src/SpatPlugin.h | 1 + 8 files changed, 26 insertions(+), 43 deletions(-) diff --git a/src/tmx/TmxUtils/src/PluginClientClockAware.cpp b/src/tmx/TmxUtils/src/PluginClientClockAware.cpp index c8317c678..4cf87fe6b 100644 --- a/src/tmx/TmxUtils/src/PluginClientClockAware.cpp +++ b/src/tmx/TmxUtils/src/PluginClientClockAware.cpp @@ -8,11 +8,11 @@ namespace tmx::utils { : PluginClient(name) { // check for simulation mode enabled by environment variable - bool simulationMode = sim::is_simulation_mode(); + _simulation_mode = sim::is_simulation_mode(); using namespace fwha_stol::lib::time; - clock = std::make_shared(simulationMode); - if (simulationMode) { + clock = std::make_shared(_simulation_mode); + if (_simulation_mode) { AddMessageFilter(this, &PluginClientClockAware::HandleTimeSyncMessage); } @@ -25,7 +25,6 @@ namespace tmx::utils { this->getClock()->update( msg.get_timestep() ); if (sim::is_simulation_mode() ) { SetStatus(Key_Simulation_Time_Step, Clock::ToUtcPreciseTimeString(msg.get_timestep())); - } } @@ -36,4 +35,8 @@ namespace tmx::utils { } } + bool PluginClientClockAware::isSimulationMode() const { + return _simulation_mode; + } + } \ No newline at end of file diff --git a/src/tmx/TmxUtils/src/PluginClientClockAware.h b/src/tmx/TmxUtils/src/PluginClientClockAware.h index e0981b8d1..974f55060 100644 --- a/src/tmx/TmxUtils/src/PluginClientClockAware.h +++ b/src/tmx/TmxUtils/src/PluginClientClockAware.h @@ -28,7 +28,7 @@ class PluginClientClockAware : public PluginClient { * @param msg TimeSyncMessage broadcast on TMX core * @param routeableMsg */ - void HandleTimeSyncMessage(tmx::messages::TimeSyncMessage &msg, routeable_message &routeableMsg ); + virtual void HandleTimeSyncMessage(tmx::messages::TimeSyncMessage &msg, routeable_message &routeableMsg ); protected: @@ -41,6 +41,9 @@ class PluginClientClockAware : public PluginClient { } void OnStateChange(IvpPluginState state) override; + + bool isSimulationMode() const; + private: /** @@ -55,6 +58,8 @@ class PluginClientClockAware : public PluginClient { * @brief Status label to indicate whether plugin is in Simulation Mode. */ const char* Key_Simulation_Mode = "Simulation Mode "; + + bool _simulation_mode; }; diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp index 71703bd59..d0cc40399 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp @@ -19,7 +19,7 @@ namespace CARMAStreetsPlugin { * @param name The name to give the plugin for identification purposes */ CARMAStreetsPlugin::CARMAStreetsPlugin(string name) : - PluginClient(name) { + PluginClientClockAware(name) { AddMessageFilter < BsmMessage > (this, &CARMAStreetsPlugin::HandleBasicSafetyMessage); AddMessageFilter < tsm3Message > (this, &CARMAStreetsPlugin::HandleMobilityOperationMessage); AddMessageFilter < tsm2Message > (this, &CARMAStreetsPlugin::HandleMobilityPathMessage); @@ -109,6 +109,13 @@ void CARMAStreetsPlugin::OnConfigChanged(const char *key, const char *value) { UpdateConfigSettings(); } +void CARMAStreetsPlugin::HandleTimeSyncMessage(tmx::messages::TimeSyncMessage &msg, routeable_message &routeableMsg ) { + PluginClientClockAware::HandleTimeSyncMessage(msg, routeableMsg); + if ( isSimulationMode()) { + PLOG(logINFO) << "Handling TimeSync messages!" << std::endl; + produce_kafka_msg(msg.to_string(), "time_sync"); + } +} void CARMAStreetsPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routeable_message &routeableMsg ) { try { diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h index 35df38132..3f0ade953 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h @@ -20,6 +20,7 @@ #include #include #include "JsonToJ2735SSMConverter.h" +#include "PluginClientClockAware.h" @@ -32,7 +33,7 @@ using namespace boost::property_tree; namespace CARMAStreetsPlugin { -class CARMAStreetsPlugin: public PluginClient { +class CARMAStreetsPlugin: public PluginClientClockAware { public: CARMAStreetsPlugin(std::string); virtual ~CARMAStreetsPlugin(); @@ -48,6 +49,7 @@ class CARMAStreetsPlugin: public PluginClient { void HandleMobilityOperationMessage(tsm3Message &msg, routeable_message &routeableMsg); void HandleMobilityPathMessage(tsm2Message &msg, routeable_message &routeableMsg); void HandleBasicSafetyMessage(BsmMessage &msg, routeable_message &routeableMsg); + virtual void HandleTimeSyncMessage(TimeSyncMessage &msg, routeable_message &routeableMsg) override; /** * @brief Subscribe to MAP message broadcast by the MAPPlugin. This handler will be called automatically whenever the MAPPlugin is broadcasting a J2735 MAP message. * @param msg The J2735 MAP message received from the internal diff --git a/src/v2i-hub/CDASimAdapter/src/CDASimAdapter.cpp b/src/v2i-hub/CDASimAdapter/src/CDASimAdapter.cpp index a036ff6f2..9a149f9b8 100644 --- a/src/v2i-hub/CDASimAdapter/src/CDASimAdapter.cpp +++ b/src/v2i-hub/CDASimAdapter/src/CDASimAdapter.cpp @@ -45,35 +45,12 @@ namespace CDASimAdapter{ } } - bool CDASimAdapter::initialize_time_producer() { - try { - std::string _broker_str = sim::get_sim_config(sim::KAFKA_BROKER_ADDRESS); - std::string _topic = sim::get_sim_config(sim::TIME_SYNC_TOPIC); - - kafka_client client; - time_producer = client.create_producer(_broker_str,_topic); - return time_producer->init(); - - } - catch( const runtime_error &e ) { - PLOG(logWARNING) << "Initialization of time producer failed: " << e.what() << std::endl; - } - return false; - } void CDASimAdapter::forward_time_sync_message(tmx::messages::TimeSyncMessage &msg) { std::string payload =msg.to_string(); PLOG(logDEBUG1) << "Sending Time Sync Message " << msg << std::endl; this->BroadcastMessage(msg, _name, 0 , IvpMsgFlags_None); - if (time_producer && time_producer->is_running()) { - try { - time_producer->send(payload); - } - catch( const runtime_error &e ) { - PLOG(logERROR) << "Exception encountered during kafka time sync forward : " << e.what() << std::endl; - } - } } @@ -91,9 +68,6 @@ namespace CDASimAdapter{ PLOG(logINFO) << "CDASim connecting " << simulation_ip << "\nUsing Registration Port : " << std::to_string( simulation_registration_port) << " Time Sync Port: " << std::to_string( time_sync_port) << " and V2X Port: " << std::to_string(v2x_port) << std::endl; - if (!initialize_time_producer()) { - return false; - } if ( connection ) { connection.reset(new CDASimConnection( simulation_ip, infrastructure_id, simulation_registration_port, sim_v2x_port, local_ip, time_sync_port, v2x_port, location )); diff --git a/src/v2i-hub/CDASimAdapter/src/include/CDASimAdapter.hpp b/src/v2i-hub/CDASimAdapter/src/include/CDASimAdapter.hpp index 6de71bcd6..bd2d2e921 100644 --- a/src/v2i-hub/CDASimAdapter/src/include/CDASimAdapter.hpp +++ b/src/v2i-hub/CDASimAdapter/src/include/CDASimAdapter.hpp @@ -18,8 +18,6 @@ #include #include #include "CDASimConnection.hpp" -#include -#include #include #include "ThreadWorker.h" @@ -61,13 +59,6 @@ namespace CDASimAdapter { void OnStateChange(IvpPluginState state) override; // Virtual method overrides END. - /** - * @brief Get Kafka Connection string from environment variable KAFKA_BROKER_ADDRESS and time sync topic name from - * CARMA_INFRASTRUCTURE_TIME_SYNC_TOPIC and initialize a Kafka producer to forward time synchronization messages to - * all infrastructure services. - * @return true if initialization is successful and false if initialization fails. - */ - bool initialize_time_producer(); /** * @brief Method to attempt to establish connection between CARMA Simulation and Infrastructure Software (V2X-Hub). * @return true if successful and false if unsuccessful. @@ -111,7 +102,6 @@ namespace CDASimAdapter { private: tmx::utils::WGS84Point location; - std::shared_ptr time_producer; std::unique_ptr connection; std::mutex _lock; std::unique_ptr thread_timer; diff --git a/src/v2i-hub/MapPlugin/src/MapPlugin.cpp b/src/v2i-hub/MapPlugin/src/MapPlugin.cpp index ba4a54b05..2317e7800 100644 --- a/src/v2i-hub/MapPlugin/src/MapPlugin.cpp +++ b/src/v2i-hub/MapPlugin/src/MapPlugin.cpp @@ -81,6 +81,7 @@ class MapPlugin: public PluginClientClockAware { void OnConfigChanged(const char *key, const char *value); void OnMessageReceived(IvpMessage *msg); void OnStateChange(IvpPluginState state); + private: std::atomic _mapAction {-1}; std::atomic _isMapFileNew {false}; diff --git a/src/v2i-hub/SpatPlugin/src/SpatPlugin.h b/src/v2i-hub/SpatPlugin/src/SpatPlugin.h index a42552618..0d0b9987f 100644 --- a/src/v2i-hub/SpatPlugin/src/SpatPlugin.h +++ b/src/v2i-hub/SpatPlugin/src/SpatPlugin.h @@ -44,6 +44,7 @@ class SpatPlugin: public tmx::utils::PluginClientClockAware { void OnConfigChanged(const char *key, const char *value); void OnStateChange(IvpPluginState state); + private: