Skip to content

Commit

Permalink
Merge branch 'develop' into register_cdasim_adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
dan-du-car authored Jul 25, 2023
2 parents d0a5fbc + 77127e2 commit c96db25
Show file tree
Hide file tree
Showing 43 changed files with 315 additions and 174 deletions.
38 changes: 38 additions & 0 deletions docs/Release_notes.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,44 @@
V2X-Hub Release Notes
---------------------------------

Version 7.5.1, released June 21st, 2023
--------------------------------------------------------

**Summary:**
 V2X Hub release 7.5.1 includes added functionality to integrate V2X Hub with CDASim environment. This integration includes V2X Hub registering as a Roadside Unit (RSU) in the CDASim environment, consuming and producing J2735 messages to the CDASim environment, and adding functionality to synchronize plugins to CDASim simulation time.

**<ins>V2X Hub CDASim Functionalities </ins>**

Enhancements in this release:

- Added new carma-time-lib to V2X Hub to allow services to use an external source for time value and update rate.
- Added new Plugin Client ClockAware, which extends Plugin Client and implements functionality to consume time sync messages and updates a carma-clock object from carma-time-lib. Any plugins that want to synchronize their time to simulation must extend this plugin to gain access to this functionality.
- Added CDASim Adapter plugin which is responsible for establishing connection between V2X Hub and CDASim environment. This includes a handshake that provides information about the V2X Hub simulated location and ID and message forwarding for J2735 messages and time synchronization messages. This plugin requires several environment variables to be set which are documented on the GitHub repo README.md.

Fixes in this release:

- PR 488: Added a simulated clock functionality with the new time library and tested.
- PR 489: Setup Kafka consumers for the time topic when running in simulation mode.
- Issue 492: Created a carma-simulation adapter shell for service that will act as adapter for CARMA Simulation integration.
- PR 509: Added a V2X Hub plugin inside the simulation platform to receive all messages from V2X Hub. This plugin contains parameters and variables that are provided in real-world scenarios.
- Issue 514: Added handshake functionality to carma-simulation ambassador instance which register’s the V2X Hub instance inside the simulator to allow multiple V2X Hub instances to connect with a single CARMA Simulation platform.
- Issue 535: Updated infrastructure registration to use a cartesian point as location over a geodetic point to allow for easier configuration of simulated location of an RSU.
- Issue 537: Fixed configuration parameters to correctly map X, Y, Z coordinates to Point for Infrastructure registration in CDASim Adapter.
- Issue 525: Fixed CDASim Adapter plugin that throws an exception while attempting CDASim handshake with CARMA-Simulation.

Known issues in this release:

- Issue #540: CDASim Time Synchronization is non-time-regulating. If simulation runs too fast (faster than real-time) for V2X Hub to keep up, V2X Hub can fall behind in time.
- Issue #507: SPaT plugin throws segfault when in SIM MODE
- Issue #10 in carma-time-lib (not V2X Hub repo): wait_for_initialization does not support notifying multiple threads Work around exists for services/plugins using carma-time-lib.
- Issue #543: CARMA Streets Plugin Kafka Consumers can send redundant subscription attempts on initialization and can cause subscriptions to silently fail.

**<ins>Other </ins>**

Enhancements in this release:

- Issue 511: Added new functionality get the log time for a message received in V2xHub to forward to Carma cloud, and from receiving in Carma Cloud to forward V2xhub.

Version 7.5.0, released May 5th, 2023
--------------------------------------------------------

Expand Down
11 changes: 7 additions & 4 deletions src/tmx/TmxUtils/src/PluginClientClockAware.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ namespace tmx::utils {
: PluginClient(name)
{
// check for simulation mode enabled by environment variable
bool simulationMode = sim::is_simulation_mode();
_simulation_mode = sim::is_simulation_mode();

using namespace fwha_stol::lib::time;
clock = std::make_shared<CarmaClock>(simulationMode);
if (simulationMode) {
clock = std::make_shared<CarmaClock>(_simulation_mode);
if (_simulation_mode) {
AddMessageFilter<tmx::messages::TimeSyncMessage>(this, &PluginClientClockAware::HandleTimeSyncMessage);

}
Expand All @@ -25,7 +25,6 @@ namespace tmx::utils {
this->getClock()->update( msg.get_timestep() );
if (sim::is_simulation_mode() ) {
SetStatus(Key_Simulation_Time_Step, Clock::ToUtcPreciseTimeString(msg.get_timestep()));

}
}

Expand All @@ -36,4 +35,8 @@ namespace tmx::utils {
}
}

bool PluginClientClockAware::isSimulationMode() const {
return _simulation_mode;
}

}
7 changes: 6 additions & 1 deletion src/tmx/TmxUtils/src/PluginClientClockAware.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class PluginClientClockAware : public PluginClient {
* @param msg TimeSyncMessage broadcast on TMX core
* @param routeableMsg
*/
void HandleTimeSyncMessage(tmx::messages::TimeSyncMessage &msg, routeable_message &routeableMsg );
virtual void HandleTimeSyncMessage(tmx::messages::TimeSyncMessage &msg, routeable_message &routeableMsg );


protected:
Expand All @@ -41,6 +41,9 @@ class PluginClientClockAware : public PluginClient {
}

void OnStateChange(IvpPluginState state) override;

bool isSimulationMode() const;


private:
/**
Expand All @@ -55,6 +58,8 @@ class PluginClientClockAware : public PluginClient {
* @brief Status label to indicate whether plugin is in Simulation Mode.
*/
const char* Key_Simulation_Mode = "Simulation Mode ";

bool _simulation_mode;

};

Expand Down
19 changes: 19 additions & 0 deletions src/tmx/TmxUtils/src/Point.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#pragma once

namespace tmx::utils {


/// Cartesian Coordinates on a .
typedef struct Point
{
Point() : X(0), Y(0), Z(0) {}

Point(double x, double y, double z = 0.0):
X(x), Y(y), Z(z) { }

double X;
double Y;
double Z;
} Point;

} // namespace tmx::utils
9 changes: 7 additions & 2 deletions src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ namespace tmx::utils
_partition(partition)
{
}
kafka_consumer_worker::~kafka_consumer_worker() {
stop();
FILE_LOG(logWARNING) << "Kafka consumer destroyed!" << std::endl;
}

bool kafka_consumer_worker::init()
{
Expand Down Expand Up @@ -93,11 +97,12 @@ namespace tmx::utils

void kafka_consumer_worker::stop()
{
FILE_LOG(logWARNING) << "Stopping Kafka Consumer!" << std::endl;
_run = false;
//Close and shutdown the consumer.
_consumer->close();
/*Destroy kafka instance*/ // Wait for RdKafka to decommission.
RdKafka::wait_destroyed(5000);
FILE_LOG(logWARNING) << "Kafka Consumer Stopped!" << std::endl;

}

void kafka_consumer_worker::subscribe()
Expand Down
20 changes: 14 additions & 6 deletions src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ namespace tmx::utils {
* @param partition partition consumer should be assigned to.
*/
kafka_consumer_worker(const std::string &broker_str, const std::string &topic_str, const std::string & group_id, int64_t cur_offset = 0, int32_t partition = 0);
/**
* @brief Destroy the kafka consumer worker object. Calls stop on consumer to clean up resources.
*/
~kafka_consumer_worker();
// Rule of 5 because destructor is define (https://www.codementor.io/@sandesh87/the-rule-of-five-in-c-1pdgpzb04f)
// Delete copy constructor
kafka_consumer_worker(kafka_consumer_worker& other) = delete;
// Delete copy assigment
kafka_consumer_worker& operator=(const kafka_consumer_worker& other) = delete;
// delete move constructor
kafka_consumer_worker(kafka_consumer_worker &&consumer) = delete;
// delete move assignment
kafka_consumer_worker const & operator=(kafka_consumer_worker &&consumer) = delete;
/**
* @brief Initialize kafka_consumer_worker
*
Expand All @@ -89,7 +102,7 @@ namespace tmx::utils {
/**
* @brief Stop running kafka consumer.
*/
virtual void stop();
void stop();
/**
* @brief Print current configurations.
*/
Expand All @@ -101,11 +114,6 @@ namespace tmx::utils {
* @return false if kafka consumer is stopped.
*/
virtual bool is_running() const;
/**
* @brief Destroy the kafka consumer worker object
*
*/
virtual ~kafka_consumer_worker() = default;
};

}
Expand Down
13 changes: 10 additions & 3 deletions src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ namespace tmx::utils
{
}

kafka_producer_worker::~kafka_producer_worker() {
stop();
FILE_LOG(logWARNING) << "Kafka Producer Worker Destroyed!" << std::endl;
}

bool kafka_producer_worker::init()
{
if(init_producer())
Expand Down Expand Up @@ -217,16 +222,18 @@ namespace tmx::utils
{
if (_producer)
{
_producer->flush(10 * 1000 /* wait for max 10 seconds */);

auto error =_producer->flush(10 * 1000 /* wait for max 10 seconds */);
if (error == RdKafka::ERR__TIMED_OUT)
FILE_LOG(logERROR) << "Flush attempt timed out!" << std::endl;
if (_producer->outq_len() > 0)
FILE_LOG(logWARNING) << _producer->name() << _producer->outq_len() << " message(s) were not delivered." << std::endl;
FILE_LOG(logERROR) << _producer->name() << _producer->outq_len() << " message(s) were not delivered." << std::endl;
}
}
catch (const std::runtime_error &e)
{
FILE_LOG(logERROR) << "Error encountered flushing producer : " << e.what() << std::endl;
}
FILE_LOG(logWARNING) << "Kafka producer stopped!" << std::endl;
}

void kafka_producer_worker::printCurrConf()
Expand Down
21 changes: 15 additions & 6 deletions src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ namespace tmx::utils
* @param broker_str network address of kafka broker.
*/
explicit kafka_producer_worker(const std::string &brokers);
/**
* @brief Destroy the kafka producer worker object. Calls stop on producer to clean up resources.
*/
virtual ~kafka_producer_worker();
// Rule of 5 because destructor is define (https://www.codementor.io/@sandesh87/the-rule-of-five-in-c-1pdgpzb04f)
// delete copy constructor
kafka_producer_worker(kafka_producer_worker& other) = delete;
// delete copy assignment
kafka_producer_worker& operator=(const kafka_producer_worker& other) = delete;
// delete move constructor
kafka_producer_worker(kafka_producer_worker &&producer) = delete;
// delete move assignment
kafka_producer_worker const & operator=(kafka_producer_worker &&producer) = delete;
/**
* @brief Initialize kafka_producer_worker. This method must be called before send!
*
Expand Down Expand Up @@ -100,16 +113,12 @@ namespace tmx::utils
/**
* @brief Stop running kafka producer.
*/
virtual void stop();
void stop();
/**
* @brief Print current configurations.
*/
virtual void printCurrConf();
/**
* @brief Destroy the kafka producer worker object
*
*/
virtual ~kafka_producer_worker() = default;

};
}

Expand Down
1 change: 0 additions & 1 deletion src/tmx/TmxUtils/src/kafka/mock_kafka_consumer_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ namespace tmx::utils {
MOCK_METHOD(bool, init,(),(override));
MOCK_METHOD(const char*, consume, (int timeout_ms), (override));
MOCK_METHOD(void, subscribe, (), (override));
MOCK_METHOD(void, stop, (), (override));
MOCK_METHOD(void, printCurrConf, (), (override));
MOCK_METHOD(bool, is_running, (), (const override));
};
Expand Down
1 change: 0 additions & 1 deletion src/tmx/TmxUtils/src/kafka/mock_kafka_producer_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ namespace tmx::utils {
MOCK_METHOD(bool, init,(),(override));
MOCK_METHOD(void, send, (const std::string &msg), (override));
MOCK_METHOD(bool, is_running, (), (const, override));
MOCK_METHOD(void, stop, (), (override));
MOCK_METHOD(void, printCurrConf, (), (override));
};
}
27 changes: 27 additions & 0 deletions src/tmx/TmxUtils/test/KafkaTestEnvironment.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#include "gtest/gtest.h"
#include <librdkafka/rdkafkacpp.h>

/**
* @brief Kafka Test Environment which allows for Setup/Teardown configuration at the
* test program level. Teardown waits on all rd_kafka_t objects to be destroyed.
*/
class KafkaTestEnvironment : public ::testing::Environment {
public:
~KafkaTestEnvironment() override {}

// Override this to define how to set up the environment.
void SetUp() override {}

// Override this to define how to tear down the environment.
void TearDown() override {
std::cout << "Waiting for all RDKafka objects to be destroyed!" << std::endl;
// Wait for all rd_kafka_t objects to be destroyed
auto error = RdKafka::wait_destroyed(5000);
if (error == RdKafka::ERR__TIMED_OUT) {
std::cout << "Wait destroy attempted timed out!" << std::endl;
}
else {
std::cout << "All Objects are destroyed!" << std::endl;
}
}
};
2 changes: 2 additions & 0 deletions src/tmx/TmxUtils/test/Main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
*/

#include <gtest/gtest.h>
#include "KafkaTestEnvironment.cpp"

int main(int argc, char **argv)
{
::testing::AddGlobalTestEnvironment(new KafkaTestEnvironment());
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
1 change: 1 addition & 0 deletions src/tmx/TmxUtils/test/test_kafka_consumer_worker.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "gtest/gtest.h"
#include "kafka/kafka_client.h"
#include "PluginLog.h"

TEST(test_kafka_consumer_worker, create_consumer)
{
Expand Down
2 changes: 0 additions & 2 deletions src/tmx/TmxUtils/test/test_kafka_producer_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ TEST(test_kafka_producer_worker, create_producer)
std::string msg = "test message";
// // Run this unit test without launching kafka broker will throw connection refused error
worker->send(msg);
worker->stop();
}

TEST(test_kafka_producer_worker, create_producer_no_topic)
Expand All @@ -28,5 +27,4 @@ TEST(test_kafka_producer_worker, create_producer_no_topic)
std::string msg = "test message";
// // Run this unit test without launching kafka broker will throw connection refused error
worker->send(msg, topic);
worker->stop();
}
2 changes: 1 addition & 1 deletion src/v2i-hub/CARMACloudPlugin/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
PROJECT ( CARMACloudPlugin VERSION 7.5.0 LANGUAGES CXX )
PROJECT ( CARMACloudPlugin VERSION 7.5.1 LANGUAGES CXX )

SET (TMX_PLUGIN_NAME "CARMACloud")
add_compile_options(-fPIC)
Expand Down
2 changes: 1 addition & 1 deletion src/v2i-hub/CARMAStreetsPlugin/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
PROJECT ( CARMAStreetsPlugin VERSION 7.5.0 LANGUAGES CXX )
PROJECT ( CARMAStreetsPlugin VERSION 7.5.1 LANGUAGES CXX )

BuildTmxPlugin ( )

Expand Down
Loading

0 comments on commit c96db25

Please sign in to comment.