Skip to content

Commit

Permalink
Issue-549: Moved Kafka time producer from CDASimAdapter to CARMA-Stre…
Browse files Browse the repository at this point in the history
…ets plugin

+ Update CARMA-Streets Plugin to be simulation time aware
  • Loading branch information
paulbourelly999 committed Jul 14, 2023
1 parent 639ad08 commit d1bbb0a
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 43 deletions.
11 changes: 7 additions & 4 deletions src/tmx/TmxUtils/src/PluginClientClockAware.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ namespace tmx::utils {
: PluginClient(name)
{
// check for simulation mode enabled by environment variable
bool simulationMode = sim::is_simulation_mode();
_simulation_mode = sim::is_simulation_mode();

using namespace fwha_stol::lib::time;
clock = std::make_shared<CarmaClock>(simulationMode);
if (simulationMode) {
clock = std::make_shared<CarmaClock>(_simulation_mode);
if (_simulation_mode) {
AddMessageFilter<tmx::messages::TimeSyncMessage>(this, &PluginClientClockAware::HandleTimeSyncMessage);

}
Expand All @@ -25,7 +25,6 @@ namespace tmx::utils {
this->getClock()->update( msg.get_timestep() );
if (sim::is_simulation_mode() ) {
SetStatus(Key_Simulation_Time_Step, Clock::ToUtcPreciseTimeString(msg.get_timestep()));

}
}

Expand All @@ -36,4 +35,8 @@ namespace tmx::utils {
}
}

bool PluginClientClockAware::isSimulationMode() const {
return _simulation_mode;
}

}
7 changes: 6 additions & 1 deletion src/tmx/TmxUtils/src/PluginClientClockAware.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class PluginClientClockAware : public PluginClient {
* @param msg TimeSyncMessage broadcast on TMX core
* @param routeableMsg
*/
void HandleTimeSyncMessage(tmx::messages::TimeSyncMessage &msg, routeable_message &routeableMsg );
virtual void HandleTimeSyncMessage(tmx::messages::TimeSyncMessage &msg, routeable_message &routeableMsg );


protected:
Expand All @@ -41,6 +41,9 @@ class PluginClientClockAware : public PluginClient {
}

void OnStateChange(IvpPluginState state) override;

bool isSimulationMode() const;


private:
/**
Expand All @@ -55,6 +58,8 @@ class PluginClientClockAware : public PluginClient {
* @brief Status label to indicate whether plugin is in Simulation Mode.
*/
const char* Key_Simulation_Mode = "Simulation Mode ";

bool _simulation_mode;

};

Expand Down
9 changes: 8 additions & 1 deletion src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace CARMAStreetsPlugin {
* @param name The name to give the plugin for identification purposes
*/
CARMAStreetsPlugin::CARMAStreetsPlugin(string name) :
PluginClient(name) {
PluginClientClockAware(name) {
AddMessageFilter < BsmMessage > (this, &CARMAStreetsPlugin::HandleBasicSafetyMessage);
AddMessageFilter < tsm3Message > (this, &CARMAStreetsPlugin::HandleMobilityOperationMessage);
AddMessageFilter < tsm2Message > (this, &CARMAStreetsPlugin::HandleMobilityPathMessage);
Expand Down Expand Up @@ -109,6 +109,13 @@ void CARMAStreetsPlugin::OnConfigChanged(const char *key, const char *value) {
UpdateConfigSettings();
}

void CARMAStreetsPlugin::HandleTimeSyncMessage(tmx::messages::TimeSyncMessage &msg, routeable_message &routeableMsg ) {
PluginClientClockAware::HandleTimeSyncMessage(msg, routeableMsg);
if ( isSimulationMode()) {
PLOG(logINFO) << "Handling TimeSync messages!" << std::endl;
produce_kafka_msg(msg.to_string(), "time_sync");
}
}
void CARMAStreetsPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routeable_message &routeableMsg ) {
try
{
Expand Down
4 changes: 3 additions & 1 deletion src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <kafka/kafka_client.h>
#include <kafka/kafka_consumer_worker.h>
#include "JsonToJ2735SSMConverter.h"
#include "PluginClientClockAware.h"



Expand All @@ -32,7 +33,7 @@ using namespace boost::property_tree;

namespace CARMAStreetsPlugin {

class CARMAStreetsPlugin: public PluginClient {
class CARMAStreetsPlugin: public PluginClientClockAware {
public:
CARMAStreetsPlugin(std::string);
virtual ~CARMAStreetsPlugin();
Expand All @@ -48,6 +49,7 @@ class CARMAStreetsPlugin: public PluginClient {
void HandleMobilityOperationMessage(tsm3Message &msg, routeable_message &routeableMsg);
void HandleMobilityPathMessage(tsm2Message &msg, routeable_message &routeableMsg);
void HandleBasicSafetyMessage(BsmMessage &msg, routeable_message &routeableMsg);
virtual void HandleTimeSyncMessage(TimeSyncMessage &msg, routeable_message &routeableMsg) override;
/**
* @brief Subscribe to MAP message broadcast by the MAPPlugin. This handler will be called automatically whenever the MAPPlugin is broadcasting a J2735 MAP message.
* @param msg The J2735 MAP message received from the internal
Expand Down
26 changes: 0 additions & 26 deletions src/v2i-hub/CDASimAdapter/src/CDASimAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,35 +45,12 @@ namespace CDASimAdapter{
}
}

bool CDASimAdapter::initialize_time_producer() {
try {
std::string _broker_str = sim::get_sim_config(sim::KAFKA_BROKER_ADDRESS);
std::string _topic = sim::get_sim_config(sim::TIME_SYNC_TOPIC);

kafka_client client;
time_producer = client.create_producer(_broker_str,_topic);
return time_producer->init();

}
catch( const runtime_error &e ) {
PLOG(logWARNING) << "Initialization of time producer failed: " << e.what() << std::endl;
}
return false;

}

void CDASimAdapter::forward_time_sync_message(tmx::messages::TimeSyncMessage &msg) {
std::string payload =msg.to_string();
PLOG(logDEBUG1) << "Sending Time Sync Message " << msg << std::endl;
this->BroadcastMessage<tmx::messages::TimeSyncMessage>(msg, _name, 0 , IvpMsgFlags_None);
if (time_producer && time_producer->is_running()) {
try {
time_producer->send(payload);
}
catch( const runtime_error &e ) {
PLOG(logERROR) << "Exception encountered during kafka time sync forward : " << e.what() << std::endl;
}
}

}

Expand All @@ -91,9 +68,6 @@ namespace CDASimAdapter{
PLOG(logINFO) << "CDASim connecting " << simulation_ip <<
"\nUsing Registration Port : " << std::to_string( simulation_registration_port) <<
" Time Sync Port: " << std::to_string( time_sync_port) << " and V2X Port: " << std::to_string(v2x_port) << std::endl;
if (!initialize_time_producer()) {
return false;
}
if ( connection ) {
connection.reset(new CDASimConnection( simulation_ip, infrastructure_id, simulation_registration_port, sim_v2x_port, local_ip,
time_sync_port, v2x_port, location ));
Expand Down
10 changes: 0 additions & 10 deletions src/v2i-hub/CDASimAdapter/src/include/CDASimAdapter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
#include <tmx/IvpPlugin.h>
#include <PluginClient.h>
#include "CDASimConnection.hpp"
#include <kafka/kafka_producer_worker.h>
#include <kafka/kafka_client.h>
#include <simulation/SimulationEnvUtils.h>
#include "ThreadWorker.h"

Expand Down Expand Up @@ -61,13 +59,6 @@ namespace CDASimAdapter {
void OnStateChange(IvpPluginState state) override;
// Virtual method overrides END.

/**
* @brief Get Kafka Connection string from environment variable KAFKA_BROKER_ADDRESS and time sync topic name from
* CARMA_INFRASTRUCTURE_TIME_SYNC_TOPIC and initialize a Kafka producer to forward time synchronization messages to
* all infrastructure services.
* @return true if initialization is successful and false if initialization fails.
*/
bool initialize_time_producer();
/**
* @brief Method to attempt to establish connection between CARMA Simulation and Infrastructure Software (V2X-Hub).
* @return true if successful and false if unsuccessful.
Expand Down Expand Up @@ -111,7 +102,6 @@ namespace CDASimAdapter {
private:

tmx::utils::WGS84Point location;
std::shared_ptr<tmx::utils::kafka_producer_worker> time_producer;
std::unique_ptr<CDASimConnection> connection;
std::mutex _lock;
std::unique_ptr<tmx::utils::ThreadTimer> thread_timer;
Expand Down
1 change: 1 addition & 0 deletions src/v2i-hub/MapPlugin/src/MapPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class MapPlugin: public PluginClientClockAware {
void OnConfigChanged(const char *key, const char *value);
void OnMessageReceived(IvpMessage *msg);
void OnStateChange(IvpPluginState state);

private:
std::atomic<int> _mapAction {-1};
std::atomic<bool> _isMapFileNew {false};
Expand Down
1 change: 1 addition & 0 deletions src/v2i-hub/SpatPlugin/src/SpatPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class SpatPlugin: public tmx::utils::PluginClientClockAware {
void OnConfigChanged(const char *key, const char *value);
void OnStateChange(IvpPluginState state);


private:


Expand Down

0 comments on commit d1bbb0a

Please sign in to comment.