Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MOSAIC-CARMA-Streets: CDASimAdapter listens on port for incoming Simulated Object Dectections and publish the serialized object to Kafka #552

Merged
merged 37 commits into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
0963f01
init
dan-du-car Jul 17, 2023
c3b7ba8
update externalObject
dan-du-car Jul 18, 2023
c98778d
update external object converter
dan-du-car Jul 18, 2023
1705c98
update unit test
dan-du-car Jul 18, 2023
4e6aa43
code smell and coverage
dan-du-car Jul 19, 2023
a1faf70
add comments
dan-du-car Jul 19, 2023
9a534b3
use static func
dan-du-car Jul 19, 2023
7252eb4
address code smell
dan-du-car Jul 19, 2023
d035cf8
Merge branch 'develop' into add_external_object
dan-du-car Jul 20, 2023
d14b339
resolve merge error
dan-du-car Jul 21, 2023
144f29c
remove timer thread
dan-du-car Jul 21, 2023
f75137d
update message definition
dan-du-car Jul 24, 2023
e9e32c7
update
dan-du-car Jul 24, 2023
39f5a73
update
dan-du-car Jul 24, 2023
5b2f6d8
update
dan-du-car Jul 24, 2023
1d4e003
update
dan-du-car Jul 24, 2023
05291fd
update
dan-du-car Jul 24, 2023
48b73ea
update
dan-du-car Jul 24, 2023
ba1ebbc
update
dan-du-car Jul 25, 2023
a10a294
remove definition
dan-du-car Jul 25, 2023
d1b6086
update name
dan-du-car Jul 25, 2023
dc77fb2
update naming convension
dan-du-car Jul 25, 2023
78cf596
update naming convension
dan-du-car Jul 25, 2023
89fb734
update json key
dan-du-car Jul 25, 2023
bdd7dcf
rename variable
dan-du-car Jul 25, 2023
cba6d6b
address comments
dan-du-car Jul 26, 2023
5f4a9dc
update
dan-du-car Jul 26, 2023
aab4911
address comms
dan-du-car Jul 26, 2023
1cb1a6e
address comments
dan-du-car Jul 26, 2023
21af975
unit test failture
dan-du-car Jul 26, 2023
ff272a7
unit test failure
dan-du-car Jul 26, 2023
917fa34
Merge branch 'develop' into add_external_object
dan-du-car Jul 27, 2023
4aef6b1
fix merge issue
dan-du-car Jul 27, 2023
8aa3778
fix unit test
dan-du-car Jul 27, 2023
4bafa7c
address comments
dan-du-car Jul 28, 2023
b741588
adddress comments
dan-du-car Jul 28, 2023
bfdce62
code smell
dan-du-car Jul 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .devcontainer/docker-compose-vscode.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ services:
- TIME_SYNC_TOPIC=time_sync
- TIME_SYNC_PORT=7575
- SIM_V2X_PORT=5757
- SIM_INTERACTION_PORT=7576
paulbourelly999 marked this conversation as resolved.
Show resolved Hide resolved
- V2X_PORT=8686
- INFRASTRUCTURE_ID=1
- SENSOR_JSON_FILE_PATH=/var/www/plugins/MAP/sensors.json
Expand Down
1 change: 1 addition & 0 deletions src/tmx/Messages/include/MessageTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ static CONSTEXPR const char *MSGSUBTYPE_INCOMING_STRING = "Incoming";
static CONSTEXPR const char *MSGSUBTYPE_OUTGOING_STRING = "Outgoing";
static CONSTEXPR const char *MSGSUBTYPE_SHUTDOWN_STRING = "Shutdown";
static CONSTEXPR const char *MSGSUBTYPE_TIMESYNC_STRING = "TimeSync";
static CONSTEXPR const char *MSGSUBTYPE_SENSOR_DETECTED_OBJECT_STRING = "SensorDetectedObject";

} /* End namespace messages */

Expand Down
35 changes: 35 additions & 0 deletions src/tmx/Messages/include/simulation/SensorDetectedObject.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#ifndef INCLUDE_SIMULATED_SensorDetectedObject_H_
#define INCLUDE_SIMULATED_SensorDetectedObject_H_

#include <tmx/messages/message.hpp>
#include <MessageTypes.h>

namespace tmx
{
namespace messages
{
namespace simulation
{
/**
* This SensorDetectedObject is used to communicate the sensor detected object information with various applications
* including internal infrastructure applications and external road user applications through simulated environment.
* It defines the message type and sub type and all data members.
*/
class SensorDetectedObject : public tmx::message
{
public:
SensorDetectedObject(){};
SensorDetectedObject(const tmx::message_container_type &contents) : tmx::message(contents) {};
~SensorDetectedObject(){};
// Message type for routing this message through TMX core
static constexpr const char *MessageType = MSGTYPE_APPLICATION_STRING;

// // Message sub type for routing this message through TMX core
static constexpr const char *MessageSubType = MSGSUBTYPE_SENSOR_DETECTED_OBJECT_STRING;
};

}
}

}; // namespace tmx
#endif
5 changes: 5 additions & 0 deletions src/tmx/TmxUtils/src/simulation/SimulationEnvUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ namespace tmx::utils::sim{
* necessary in SIMULATION MODE for CDASim message forwarding.
*/
constexpr inline static const char *SIM_V2X_PORT = "SIM_V2X_PORT";
/**
* @brief Name of environment variable for storing port for forwarding v2x messages to CDASim. Only
* necessary in SIMULATION MODE for CDASim message forwarding.
*/
constexpr inline static const char *SIM_INTERACTION_PORT= "SIM_INTERACTION_PORT";
/**
* @brief Name of environment variable for storing port for receiving v2x messages from CDASim. Only
* necessary in SIMULATION MODE for CDASim message forwarding.
Expand Down
10 changes: 10 additions & 0 deletions src/v2i-hub/CARMAStreetsPlugin/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
"type":"J2735",
"subtype":"SPAT-P",
"description":"Signal Phase and Timing (SPAT) status for the signalized intersection."
},
{
"type":"Application",
"subtype":"SensorDetectedObject",
"description": "Sensor detected object for cooperative perception."
}
],
"configuration": [
Expand Down Expand Up @@ -116,6 +121,11 @@
"key": "SpatConsumerGroupId",
"default": "v2xhub_spat",
"description": "Apache Kafka consumer group ID for spat consumer."
},
{
"key": "SimSensorDetectedObjTopic",
"default": "v2xhub_sim_sensor_detected_object",
"description": "Apache Kafka topic plugin will transmit simulated sensor detected object to."
}

]
Expand Down
12 changes: 11 additions & 1 deletion src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ CARMAStreetsPlugin::CARMAStreetsPlugin(string name) :
AddMessageFilter < tsm2Message > (this, &CARMAStreetsPlugin::HandleMobilityPathMessage);
AddMessageFilter < MapDataMessage > (this, &CARMAStreetsPlugin::HandleMapMessage);
AddMessageFilter < SrmMessage > (this, &CARMAStreetsPlugin::HandleSRMMessage);
AddMessageFilter < simulation::SensorDetectedObject > (this, &CARMAStreetsPlugin::HandleSimulatedSensorDetectedMessage );

SubscribeToMessages();
}

Expand All @@ -48,7 +50,8 @@ void CARMAStreetsPlugin::UpdateConfigSettings() {
GetConfigValue<string>("MobilityOperationTopic", _transmitMobilityOperationTopic);
GetConfigValue<string>("MobilityPathTopic", _transmitMobilityPathTopic);
GetConfigValue<string>("MapTopic", _transmitMAPTopic);
GetConfigValue<string>("SRMTopic", _transmitSRMTopic);
GetConfigValue<string>("SRMTopic", _transmitSRMTopic);
GetConfigValue<string>("SimSensorDetectedObjTopic", _transmitSimSensorDetectedObjTopic);
// Populate strategies config
string config;
GetConfigValue<string>("MobilityOperationStrategies", config);
Expand Down Expand Up @@ -626,6 +629,13 @@ void CARMAStreetsPlugin::SubscribeSSMKafkaTopic(){
}

}

void CARMAStreetsPlugin::HandleSimulatedSensorDetectedMessage(simulation::SensorDetectedObject &msg, routeable_message &routeableMsg)
{
PLOG(logDEBUG) << "Produce sensor detected message in JSON format: " << msg.to_string() <<std::endl;
produce_kafka_msg( msg.to_string(), _transmitSimSensorDetectedObjTopic);
}

bool CARMAStreetsPlugin::getEncodedtsm3( tsm3EncodedMessage *tsm3EncodedMsg, Json::Value metadata, Json::Value payload_json )
{
try
Expand Down
8 changes: 8 additions & 0 deletions src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <kafka/kafka_client.h>
#include <kafka/kafka_consumer_worker.h>
#include "JsonToJ2735SSMConverter.h"
#include <simulation/SensorDetectedObject.h>
#include "PluginClientClockAware.h"


Expand Down Expand Up @@ -48,6 +49,12 @@ class CARMAStreetsPlugin: public PluginClientClockAware {
void HandleMobilityOperationMessage(tsm3Message &msg, routeable_message &routeableMsg);
void HandleMobilityPathMessage(tsm2Message &msg, routeable_message &routeableMsg);
void HandleBasicSafetyMessage(BsmMessage &msg, routeable_message &routeableMsg);
/**
* @brief Callback function when the plugin received detected object, and forward the detected object to Kafka topic.
* @param msg Detected object received from TMX bus.
* @param routeableMsg routeable_message for detected object.
*/
void HandleSimulatedSensorDetectedMessage(simulation::SensorDetectedObject &msg, routeable_message &routeableMsg);
/**
* @brief Overide PluginClientClockAware HandleTimeSyncMessage to producer TimeSyncMessage to kafka for CARMA Streets Time Synchronization.
* @param msg TimeSyncMessage received by plugin when in simulation mode. Message provides current simulation time to all processes.
Expand Down Expand Up @@ -104,6 +111,7 @@ class CARMAStreetsPlugin: public PluginClientClockAware {
std::string _transmitBSMTopic;
std::string _transmitMAPTopic;
std::string _transmitSRMTopic;
std::string _transmitSimSensorDetectedObjTopic;
std::string _kafkaBrokerIp;
std::string _kafkaBrokerPort;
std::shared_ptr<kafka_producer_worker> _kafka_producer_ptr;
Expand Down
81 changes: 81 additions & 0 deletions src/v2i-hub/CDASimAdapter/scripts/send_sim_detected_object_udp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@

import socket
import sys
import json
import time

# Script for integration testing CDASimAdapter Time Sync functionality.
# This python script sends periodic time sync messages to a configurable
# host and port. To Test the Time Sync functionality of the CDASimAdapter
# set port to the value of the TIME_SYNC_PORT environment variable.
#
# TODO Move this script into a more permanent location
count_num = 0

def generate_sim_external_object():
jsonResult = {
"metadata": {
"type": "Application",
"subtype": "SensorDetectedObject",
"timestamp": 123,
"isSimulated": True
},
"payload": {
"sensor_id": "sensor1",
"proj_string": "asdlasdkasd",
"type": "Car",
"confidence": "0.7",
"objectId": "Object1",
"position": {
"x": 1.0,
"y": 2.5,
"z": 1.1
},
"positionCovariance" : [12,12,2, 34, 34, 55],
"velocity": {
"x": 1.0,
"y": 2.5,
"z": 1.1
},
"velocityCovariance" : ["a11", "a12", "a13", "a21", "a22", "a23", "a31", "a32", "a33"],
"angularVelocity":{
"x": 1.0,
"y": 2.5,
"z": 1.1
},
"angularVelocityCovariance" : ["a11", "a12", "a13", "a21", "a22", "a23", "a31", "a32", "a33"],
"size": {
"length": 0.1,
"width": 0.4,
"height": 1.5
}
}
}
jsonResult = json.dumps(jsonResult)
return jsonResult
port = 7576
address = "127.0.0.1"
host = (address, port)
try:
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
except socket.error as err:
print('Socket error because of %s' %(err))


while True :
try:
msg = generate_sim_external_object()
encoded_msg = str.encode(msg)
count_num += 1
sock.sendto(encoded_msg,host)
print( encoded_msg.decode(encoding= 'UTF-8'), 'was sent to ', host)
print(f'Message sent at ${time.time()}')
time.sleep(5)
except socket.gaierror:

print ('There an error resolving the host')
break

sock.close()


39 changes: 37 additions & 2 deletions src/v2i-hub/CDASimAdapter/src/CDASimAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ namespace CDASimAdapter{

if ( connection->is_connected() ) {
start_time_sync_thread_timer();
start_sensor_detected_object_detection_thread();
start_immediate_forward_thread();
start_message_receiver_thread();
}else {
Expand All @@ -78,13 +79,19 @@ namespace CDASimAdapter{

}

void CDASimAdapter::forward_simulated_detected_message(tmx::messages::simulation::SensorDetectedObject &msg) {
PLOG(logDEBUG1) << "Sending Simulated SensorDetectedObject Message " << msg << std::endl;
this->BroadcastMessage<tmx::messages::simulation::SensorDetectedObject>(msg, _name, 0 , IvpMsgFlags_None);
}

bool CDASimAdapter::connect() {
try {
std::string simulation_ip = sim::get_sim_config(sim::SIMULATION_IP);
std::string local_ip = sim::get_sim_config(sim::LOCAL_IP);
PLOG(logINFO) << "Simulation and local IP successfully initialized!"<< std::endl;
uint simulation_registration_port = std::stoul(sim::get_sim_config(sim::SIMULATION_REGISTRATION_PORT));
uint time_sync_port = std::stoul(sim::get_sim_config(sim::TIME_SYNC_PORT));
auto simulated_interaction_port =static_cast<unsigned int>(std::stoi(sim::get_sim_config(sim::SIM_INTERACTION_PORT)));
uint v2x_port = std::stoul(sim::get_sim_config(sim::V2X_PORT));
uint sim_v2x_port = std::stoul(sim::get_sim_config(sim::SIM_V2X_PORT));
std::string infrastructure_id = sim::get_sim_config(sim::INFRASTRUCTURE_ID);
Expand All @@ -95,11 +102,11 @@ namespace CDASimAdapter{
" Time Sync Port: " << std::to_string( time_sync_port) << " and V2X Port: " << std::to_string(v2x_port) << std::endl;
if ( connection ) {
connection.reset(new CDASimConnection( simulation_ip, infrastructure_id, simulation_registration_port, sim_v2x_port, local_ip,
time_sync_port, v2x_port, location, sensor_json_file_path));
time_sync_port, simulated_interaction_port, v2x_port, location, sensor_json_file_path ));
}
else {
connection = std::make_unique<CDASimConnection>(simulation_ip, infrastructure_id, simulation_registration_port, sim_v2x_port, local_ip,
time_sync_port, v2x_port, location, sensor_json_file_path);
time_sync_port, simulated_interaction_port, v2x_port, location, sensor_json_file_path);
}
}
catch (const TmxException &e) {
Expand Down Expand Up @@ -161,6 +168,34 @@ namespace CDASimAdapter{
}
}

void CDASimAdapter::start_sensor_detected_object_detection_thread() {
PLOG(logDEBUG) << "Creating Thread Timer for simulated external object" << std::endl;
try
{
if(!external_object_detection_thread_timer)
{
external_object_detection_thread_timer = std::make_unique<tmx::utils::ThreadTimer>();
}
external_object_detection_thread_timer->AddPeriodicTick([this](){
PLOG(logDEBUG1) << "Listening for Sensor Detected Message from CDASim." << std::endl;
auto msg = connection->consume_sensor_detected_object_message();
if ( !msg.is_empty()) {
this->forward_simulated_detected_message(msg);
}
else
{
PLOG(logDEBUG1) << "CDASim connection has not yet received an simulated sensor detected message!" << std::endl;
}
}//End lambda
, std::chrono::milliseconds(100));
external_object_detection_thread_timer->Start();
}
catch ( const UdpServerRuntimeError &e )
{
PLOG(logERROR) << "Error occured :" << e.what() << std::endl;
}
}

void CDASimAdapter::attempt_message_from_v2xhub() const {
try {
std::string msg = connection->consume_v2x_message_from_v2xhub();
Expand Down
Loading