Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
dan-du-car committed Jun 30, 2023
1 parent cad66ed commit 4052ee3
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 163 deletions.
3 changes: 2 additions & 1 deletion .devcontainer/docker-compose-vscode.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ services:
DOCKER_HOST_IP: 127.0.0.1
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "time_sync:1:1"
KAFKA_CREATE_TOPICS: "time_sync:1:1,modified_spat:1:1,v2xhub_scheduling_plan_sub:1:1"
KAFKA_LOG_DIRS: "/kafka/kafka-logs"
KAFKA_OFFSETS_RETENTION_MINUTES: 1
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- /etc/localtime:/etc/localtime:ro
Expand Down
2 changes: 1 addition & 1 deletion src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ namespace tmx::utils
switch (message->err())
{
case RdKafka::ERR__TIMED_OUT:
FILE_LOG(logWARNING) << _consumer->name() << " consume failed: " << message->errstr() << std::endl;
FILE_LOG(logDEBUG4) << _consumer->name() << " consume failed: " << message->errstr() << std::endl;
break;
case RdKafka::ERR_NO_ERROR:
FILE_LOG(logDEBUG1) << _consumer->name() << " read message at offset " << message->offset() << std::endl;
Expand Down
259 changes: 104 additions & 155 deletions src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace CARMAStreetsPlugin {
*/
CARMAStreetsPlugin::CARMAStreetsPlugin(string name) :
PluginClient(name) {
PLOG(logINFO) << "CARMAStreetsPlugin constructor!" << std::endl;
AddMessageFilter < BsmMessage > (this, &CARMAStreetsPlugin::HandleBasicSafetyMessage);
AddMessageFilter < tsm3Message > (this, &CARMAStreetsPlugin::HandleMobilityOperationMessage);
AddMessageFilter < tsm2Message > (this, &CARMAStreetsPlugin::HandleMobilityPathMessage);
Expand Down Expand Up @@ -62,13 +62,16 @@ void CARMAStreetsPlugin::UpdateConfigSettings() {
getline( ss, substring, ',');
_strategies.push_back( substring);
}
}

void CARMAStreetsPlugin::InitKafkaConsumerProducers()
{
std::string kafkaConnectString = _kafkaBrokerIp + ':' + _kafkaBrokerPort;
std::string error_string;
kafkaConnectString = _kafkaBrokerIp + ':' + _kafkaBrokerPort;
kafka_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
kafka_conf_sp_consumer = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
kafka_conf_spat_consumer = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

//Producer
kafka_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
PLOG(logDEBUG) <<"Attempting to connect to " << kafkaConnectString;
if ((kafka_conf->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK)) {
PLOG(logERROR) <<"Setting kafka config options failed with error:" << error_string << "\n" <<"Exiting with exit code 1";
Expand All @@ -82,58 +85,31 @@ void CARMAStreetsPlugin::UpdateConfigSettings() {
PLOG(logERROR) <<"Creating kafka producer failed with error:" << error_string << "\n" <<"Exiting with exit code 1";
exit(1);
}
PLOG(logDEBUG) <<"Kafka producer created";

if (kafka_conf_sp_consumer->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK
|| (kafka_conf_sp_consumer->set("group.id", _subscribeToSchedulingPlanConsumerGroupId, error_string) != RdKafka::Conf::CONF_OK)
|| (kafka_conf_spat_consumer->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK)
|| (kafka_conf_spat_consumer->set("group.id", _subscribeToSpatConsumerGroupId, error_string) != RdKafka::Conf::CONF_OK)) {
PLOG(logERROR) <<"Setting kafka config group.id options failed with error:" << error_string << "\n" <<"Exiting with exit code 1";
exit(1);
} else {
PLOG(logDEBUG) <<"Kafka config group.id options set successfully";
}
kafka_conf_sp_consumer->set("enable.partition.eof", "true", error_string);
kafka_conf_spat_consumer->set("enable.partition.eof", "true", error_string);

_scheduing_plan_kafka_consumer = RdKafka::KafkaConsumer::create(kafka_conf_sp_consumer, error_string);
_spat_kafka_consumer = RdKafka::KafkaConsumer::create(kafka_conf_spat_consumer, error_string);

if ( !_scheduing_plan_kafka_consumer || !_spat_kafka_consumer) {
PLOG(logERROR) << "Failed to create Kafka consumers: " << error_string << std::endl;
exit(1);
}
PLOG(logDEBUG) << "Created consumer " << _scheduing_plan_kafka_consumer->name() << std::endl;
PLOG(logDEBUG) << "Created consumer " << _spat_kafka_consumer->name() << std::endl;

//create kafka topics
RdKafka::Conf *tconf_spat = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
RdKafka::Conf *tconf_sp = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if(!tconf_spat && !tconf_sp)
PLOG(logDEBUG) <<"Kafka producers created";

//Consumers
kafka_client client;
auto uuid = boost::uuids::random_generator()();
std::stringstream ss;
ss << uuid;
_subscribeToSpatConsumerGroupId += ss.str();
_subscribeToSchedulingPlanConsumerGroupId += ss.str();
PLOG(logERROR) << "Kafka INFO:" << kafkaConnectString<<_subscribeToSpatTopic<<_subscribeToSpatConsumerGroupId;
_spat_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSpatTopic,_subscribeToSpatConsumerGroupId);
_scheduing_plan_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSchedulingPlanTopic,_subscribeToSchedulingPlanConsumerGroupId);
if(!_scheduing_plan_kafka_consumer_ptr || !_spat_kafka_consumer_ptr)
{
PLOG(logERROR) << "RDKafka create topic conf failed ";
PLOG(logERROR) <<"Failed to create Kafka consumers.";
return;
}

_scheduing_plan_topic = RdKafka::Topic::create(_scheduing_plan_kafka_consumer,_subscribeToSchedulingPlanTopic,tconf_sp,error_string);
if(!_scheduing_plan_topic)
{
PLOG(logERROR) << "RDKafka create scheduing plan topic failed:" << error_string;
return ;
}

_spat_topic = RdKafka::Topic::create(_spat_kafka_consumer,_subscribeToSpatTopic,tconf_spat,error_string);
if(!_spat_topic)
PLOG(logDEBUG) <<"Kafka consumers created";
if(!_spat_kafka_consumer_ptr->init() || !_scheduing_plan_kafka_consumer_ptr->init())
{
PLOG(logERROR) << "RDKafka create SPAT topic failed:" << error_string;
return ;
PLOG(logERROR) <<"Kafka consumers init() failed!";
}

delete tconf_sp;
delete tconf_spat;

boost::thread thread_schpl(&CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic, this);
boost::thread thread_spat(&CARMAStreetsPlugin::SubscribeSpatKafkaTopic, this);
thread_schpl = new std::thread(&CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic, this);
thread_spat = new std::thread(&CARMAStreetsPlugin::SubscribeSpatKafkaTopic, this);
}

void CARMAStreetsPlugin::OnConfigChanged(const char *key, const char *value) {
Expand Down Expand Up @@ -519,6 +495,7 @@ void CARMAStreetsPlugin::OnStateChange(IvpPluginState state) {

if (state == IvpPluginState_registered) {
UpdateConfigSettings();
InitKafkaConsumerProducers();
}
}

Expand All @@ -527,67 +504,54 @@ void CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic()
if(_subscribeToSchedulingPlanTopic.length() > 0)
{
PLOG(logDEBUG) << "SubscribeSchedulingPlanKafkaTopics:" <<_subscribeToSchedulingPlanTopic << std::endl;
std::vector<std::string> topics;
topics.emplace_back(_subscribeToSchedulingPlanTopic);

RdKafka::ErrorCode err = _scheduing_plan_kafka_consumer->subscribe(topics);
if (err)
{
PLOG(logERROR) << "Failed to subscribe to " << topics.size() << " topics: " << RdKafka::err2str(err) << std::endl;
return;
}
_scheduing_plan_kafka_consumer_ptr->subscribe();

while (true)
while (_scheduing_plan_kafka_consumer_ptr->is_running())
{
auto msg = _scheduing_plan_kafka_consumer->consume( 500 );
if( msg->err() == RdKafka::ERR_NO_ERROR )
auto payload_str = _scheduing_plan_kafka_consumer_ptr->consume(500);
if(strlen(payload_str) > 0)
{
auto payload_str = static_cast<const char *>( msg->payload() );
if(msg->len() > 0)
{
PLOG(logDEBUG) << "consumed message payload: " << payload_str <<std::endl;
Json::Value payload_root;
Json::Reader payload_reader;
bool parse_sucessful = payload_reader.parse(payload_str, payload_root);
if( !parse_sucessful )
{
PLOG(logERROR) << "Error parsing payload: " << payload_str << std::endl;
SetStatus<uint>(Key_ScheduleMessageSkipped, ++_scheduleMessageSkipped);
continue;
}
PLOG(logDEBUG) << "consumed message payload: " << payload_str <<std::endl;
Json::Value payload_root;
Json::Reader payload_reader;
bool parse_sucessful = payload_reader.parse(payload_str, payload_root);
if( !parse_sucessful )
{
PLOG(logERROR) << "Error parsing payload: " << payload_str << std::endl;
SetStatus<uint>(Key_ScheduleMessageSkipped, ++_scheduleMessageSkipped);
continue;
}

Json::Value metadata = payload_root["metadata"];
Json::Value payload_json_array = payload_root["payload"];

for ( int index = 0; index < payload_json_array.size(); ++index )
Json::Value metadata = payload_root["metadata"];
Json::Value payload_json_array = payload_root["payload"];

for ( int index = 0; index < payload_json_array.size(); ++index )
{
PLOG(logDEBUG) << payload_json_array[index] << std::endl;
Json::Value payload_json = payload_json_array[index];
tsm3EncodedMessage tsm3EncodedMsgs;
if( getEncodedtsm3 (&tsm3EncodedMsgs, metadata, payload_json) )
{
PLOG(logDEBUG) << payload_json_array[index] << std::endl;
Json::Value payload_json = payload_json_array[index];
tsm3EncodedMessage tsm3EncodedMsgs;
if( getEncodedtsm3 (&tsm3EncodedMsgs, metadata, payload_json) )
{
tsm3EncodedMsgs.set_flags( IvpMsgFlags_RouteDSRC );
tsm3EncodedMsgs.addDsrcMetadata(0xBFEE );
PLOG(logDEBUG) << "tsm3EncodedMsgs: " << tsm3EncodedMsgs;
BroadcastMessage(static_cast<routeable_message &>( tsm3EncodedMsgs ));
}
tsm3EncodedMsgs.set_flags( IvpMsgFlags_RouteDSRC );
tsm3EncodedMsgs.addDsrcMetadata(0xBFEE );
PLOG(logDEBUG) << "tsm3EncodedMsgs: " << tsm3EncodedMsgs;
BroadcastMessage(static_cast<routeable_message &>( tsm3EncodedMsgs ));
}
//Empty payload
if(payload_json_array.empty())
{
Json::Value payload_json = {};
tsm3EncodedMessage tsm3EncodedMsgs;
if( getEncodedtsm3 (&tsm3EncodedMsgs, metadata, payload_json) )
{
tsm3EncodedMsgs.set_flags( IvpMsgFlags_RouteDSRC );
tsm3EncodedMsgs.addDsrcMetadata(0xBFEE);
PLOG(logDEBUG) << "tsm3EncodedMsgs: " << tsm3EncodedMsgs;
BroadcastMessage(static_cast<routeable_message &>( tsm3EncodedMsgs ));
}
}
}
//Empty payload
if(payload_json_array.empty())
{
Json::Value payload_json = {};
tsm3EncodedMessage tsm3EncodedMsgs;
if( getEncodedtsm3 (&tsm3EncodedMsgs, metadata, payload_json) )
{
tsm3EncodedMsgs.set_flags( IvpMsgFlags_RouteDSRC );
tsm3EncodedMsgs.addDsrcMetadata(0xBFEE);
PLOG(logDEBUG) << "tsm3EncodedMsgs: " << tsm3EncodedMsgs;
BroadcastMessage(static_cast<routeable_message &>( tsm3EncodedMsgs ));
}
}
}
delete msg;
}

}
Expand All @@ -597,64 +561,51 @@ void CARMAStreetsPlugin::SubscribeSpatKafkaTopic(){
if(_subscribeToSpatTopic.length() > 0)
{
PLOG(logDEBUG) << "SubscribeSpatKafkaTopics:" <<_subscribeToSpatTopic << std::endl;
std::vector<std::string> topics;
topics.emplace_back(_subscribeToSpatTopic);

RdKafka::ErrorCode err = _spat_kafka_consumer->subscribe(topics);
if (err)
{
PLOG(logERROR) << "Failed to subscribe to " << topics.size() << " topics: " << RdKafka::err2str(err) << std::endl;
return;
}
//Initialize Json to J2735 Spat convertor
_spat_kafka_consumer_ptr->subscribe();
//Initialize Json to J2735 Spat convertor
JsonToJ2735SpatConverter spat_convertor;
while (true)
while (_spat_kafka_consumer_ptr->is_running())
{
auto msg = _spat_kafka_consumer->consume( 500 );
if( msg->err() == RdKafka::ERR_NO_ERROR )
auto payload_str = _spat_kafka_consumer_ptr->consume(500);
if(strlen(payload_str) > 0)
{
auto payload_str = static_cast<const char *>( msg->payload() );
if(msg->len() > 0)
PLOG(logDEBUG) << "consumed message payload: " << payload_str <<std::endl;
Json::Value payload_root;
Json::Reader payload_reader;
bool parse_sucessful = payload_reader.parse(payload_str, payload_root);
if( !parse_sucessful )
{
PLOG(logERROR) << "Error parsing payload: " << payload_str << std::endl;
SetStatus<uint>(Key_SPATMessageSkipped, ++_spatMessageSkipped);
continue;
}
//Convert the SPAT JSON string into J2735 SPAT message and encode it.
auto spat_ptr = std::make_shared<SPAT>();
spat_convertor.convertJson2Spat(payload_root, spat_ptr.get());
tmx::messages::SpatEncodedMessage spatEncodedMsg;
try
{
PLOG(logDEBUG) << "consumed message payload: " << payload_str <<std::endl;
Json::Value payload_root;
Json::Reader payload_reader;
bool parse_sucessful = payload_reader.parse(payload_str, payload_root);
if( !parse_sucessful )
{
PLOG(logERROR) << "Error parsing payload: " << payload_str << std::endl;
SetStatus<uint>(Key_SPATMessageSkipped, ++_spatMessageSkipped);
continue;
}
//Convert the SPAT JSON string into J2735 SPAT message and encode it.
auto spat_ptr = std::make_shared<SPAT>();
spat_convertor.convertJson2Spat(payload_root, spat_ptr.get());
tmx::messages::SpatEncodedMessage spatEncodedMsg;
try
{
spat_convertor.encodeSpat(spat_ptr, spatEncodedMsg);
}
catch (TmxException &ex)
{
// Skip messages that fail to encode.
PLOG(logERROR) << "Failed to encoded SPAT message : \n" << payload_str << std::endl << "Exception encountered: "
<< ex.what() << std::endl;
ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SPAT, spat_ptr.get());
SetStatus<uint>(Key_SPATMessageSkipped, ++_spatMessageSkipped);

continue;
}

spat_convertor.encodeSpat(spat_ptr, spatEncodedMsg);
}
catch (TmxException &ex)
{
// Skip messages that fail to encode.
PLOG(logERROR) << "Failed to encoded SPAT message : \n" << payload_str << std::endl << "Exception encountered: "
<< ex.what() << std::endl;
ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SPAT, spat_ptr.get());
PLOG(logDEBUG) << "SpatEncodedMessage: " << spatEncodedMsg;
SetStatus<uint>(Key_SPATMessageSkipped, ++_spatMessageSkipped);

//Broadcast the encoded SPAT message
spatEncodedMsg.set_flags(IvpMsgFlags_RouteDSRC);
spatEncodedMsg.addDsrcMetadata(0x8002);
BroadcastMessage(static_cast<routeable_message &>(spatEncodedMsg));
continue;
}

ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SPAT, spat_ptr.get());
PLOG(logDEBUG) << "SpatEncodedMessage: " << spatEncodedMsg;

//Broadcast the encoded SPAT message
spatEncodedMsg.set_flags(IvpMsgFlags_RouteDSRC);
spatEncodedMsg.addDsrcMetadata(0x8002);
BroadcastMessage(static_cast<routeable_message &>(spatEncodedMsg));
}
delete msg;
}
}
}
Expand Down Expand Up @@ -785,12 +736,10 @@ int CARMAStreetsPlugin::Main() {
uint64_t lastSendTime = 0;

while (_plugin->state != IvpPluginState_error) {



usleep(100000); //sleep for microseconds set from config.
}

return (EXIT_SUCCESS);
}
} /* namespace */
Expand Down
Loading

0 comments on commit 4052ee3

Please sign in to comment.