From 4052ee32b51a4516dbdae42f0a23e853e562eb22 Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Thu, 29 Jun 2023 21:29:59 -0400 Subject: [PATCH] init --- .devcontainer/docker-compose-vscode.yml | 3 +- .../src/kafka/kafka_consumer_worker.cpp | 2 +- .../src/CARMAStreetsPlugin.cpp | 259 +++++++----------- .../src/CARMAStreetsPlugin.h | 19 +- 4 files changed, 120 insertions(+), 163 deletions(-) diff --git a/.devcontainer/docker-compose-vscode.yml b/.devcontainer/docker-compose-vscode.yml index c3e19d9a8..e79a86d98 100755 --- a/.devcontainer/docker-compose-vscode.yml +++ b/.devcontainer/docker-compose-vscode.yml @@ -72,8 +72,9 @@ services: DOCKER_HOST_IP: 127.0.0.1 KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_CREATE_TOPICS: "time_sync:1:1" + KAFKA_CREATE_TOPICS: "time_sync:1:1,modified_spat:1:1,v2xhub_scheduling_plan_sub:1:1" KAFKA_LOG_DIRS: "/kafka/kafka-logs" + KAFKA_OFFSETS_RETENTION_MINUTES: 1 volumes: - /var/run/docker.sock:/var/run/docker.sock - /etc/localtime:/etc/localtime:ro diff --git a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp index 74c4e2735..419e1b94c 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp +++ b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp @@ -134,7 +134,7 @@ namespace tmx::utils switch (message->err()) { case RdKafka::ERR__TIMED_OUT: - FILE_LOG(logWARNING) << _consumer->name() << " consume failed: " << message->errstr() << std::endl; + FILE_LOG(logDEBUG4) << _consumer->name() << " consume failed: " << message->errstr() << std::endl; break; case RdKafka::ERR_NO_ERROR: FILE_LOG(logDEBUG1) << _consumer->name() << " read message at offset " << message->offset() << std::endl; diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp index 2e2fdf499..f0ae6e831 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp @@ -20,7 +20,7 @@ namespace CARMAStreetsPlugin { */ CARMAStreetsPlugin::CARMAStreetsPlugin(string name) : PluginClient(name) { - + PLOG(logINFO) << "CARMAStreetsPlugin constructor!" << std::endl; AddMessageFilter < BsmMessage > (this, &CARMAStreetsPlugin::HandleBasicSafetyMessage); AddMessageFilter < tsm3Message > (this, &CARMAStreetsPlugin::HandleMobilityOperationMessage); AddMessageFilter < tsm2Message > (this, &CARMAStreetsPlugin::HandleMobilityPathMessage); @@ -62,13 +62,16 @@ void CARMAStreetsPlugin::UpdateConfigSettings() { getline( ss, substring, ','); _strategies.push_back( substring); } +} + +void CARMAStreetsPlugin::InitKafkaConsumerProducers() +{ std::string kafkaConnectString = _kafkaBrokerIp + ':' + _kafkaBrokerPort; std::string error_string; kafkaConnectString = _kafkaBrokerIp + ':' + _kafkaBrokerPort; - kafka_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); - kafka_conf_sp_consumer = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); - kafka_conf_spat_consumer = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + //Producer + kafka_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); PLOG(logDEBUG) <<"Attempting to connect to " << kafkaConnectString; if ((kafka_conf->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK)) { PLOG(logERROR) <<"Setting kafka config options failed with error:" << error_string << "\n" <<"Exiting with exit code 1"; @@ -82,58 +85,31 @@ void CARMAStreetsPlugin::UpdateConfigSettings() { PLOG(logERROR) <<"Creating kafka producer failed with error:" << error_string << "\n" <<"Exiting with exit code 1"; exit(1); } - PLOG(logDEBUG) <<"Kafka producer created"; - - if (kafka_conf_sp_consumer->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK - || (kafka_conf_sp_consumer->set("group.id", _subscribeToSchedulingPlanConsumerGroupId, error_string) != RdKafka::Conf::CONF_OK) - || (kafka_conf_spat_consumer->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK) - || (kafka_conf_spat_consumer->set("group.id", _subscribeToSpatConsumerGroupId, error_string) != RdKafka::Conf::CONF_OK)) { - PLOG(logERROR) <<"Setting kafka config group.id options failed with error:" << error_string << "\n" <<"Exiting with exit code 1"; - exit(1); - } else { - PLOG(logDEBUG) <<"Kafka config group.id options set successfully"; - } - kafka_conf_sp_consumer->set("enable.partition.eof", "true", error_string); - kafka_conf_spat_consumer->set("enable.partition.eof", "true", error_string); - - _scheduing_plan_kafka_consumer = RdKafka::KafkaConsumer::create(kafka_conf_sp_consumer, error_string); - _spat_kafka_consumer = RdKafka::KafkaConsumer::create(kafka_conf_spat_consumer, error_string); - - if ( !_scheduing_plan_kafka_consumer || !_spat_kafka_consumer) { - PLOG(logERROR) << "Failed to create Kafka consumers: " << error_string << std::endl; - exit(1); - } - PLOG(logDEBUG) << "Created consumer " << _scheduing_plan_kafka_consumer->name() << std::endl; - PLOG(logDEBUG) << "Created consumer " << _spat_kafka_consumer->name() << std::endl; - - //create kafka topics - RdKafka::Conf *tconf_spat = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); - RdKafka::Conf *tconf_sp = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); - if(!tconf_spat && !tconf_sp) + PLOG(logDEBUG) <<"Kafka producers created"; + + //Consumers + kafka_client client; + auto uuid = boost::uuids::random_generator()(); + std::stringstream ss; + ss << uuid; + _subscribeToSpatConsumerGroupId += ss.str(); + _subscribeToSchedulingPlanConsumerGroupId += ss.str(); + PLOG(logERROR) << "Kafka INFO:" << kafkaConnectString<<_subscribeToSpatTopic<<_subscribeToSpatConsumerGroupId; + _spat_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSpatTopic,_subscribeToSpatConsumerGroupId); + _scheduing_plan_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSchedulingPlanTopic,_subscribeToSchedulingPlanConsumerGroupId); + if(!_scheduing_plan_kafka_consumer_ptr || !_spat_kafka_consumer_ptr) { - PLOG(logERROR) << "RDKafka create topic conf failed "; + PLOG(logERROR) <<"Failed to create Kafka consumers."; return; - } - - _scheduing_plan_topic = RdKafka::Topic::create(_scheduing_plan_kafka_consumer,_subscribeToSchedulingPlanTopic,tconf_sp,error_string); - if(!_scheduing_plan_topic) - { - PLOG(logERROR) << "RDKafka create scheduing plan topic failed:" << error_string; - return ; } - - _spat_topic = RdKafka::Topic::create(_spat_kafka_consumer,_subscribeToSpatTopic,tconf_spat,error_string); - if(!_spat_topic) + PLOG(logDEBUG) <<"Kafka consumers created"; + if(!_spat_kafka_consumer_ptr->init() || !_scheduing_plan_kafka_consumer_ptr->init()) { - PLOG(logERROR) << "RDKafka create SPAT topic failed:" << error_string; - return ; + PLOG(logERROR) <<"Kafka consumers init() failed!"; } - delete tconf_sp; - delete tconf_spat; - - boost::thread thread_schpl(&CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic, this); - boost::thread thread_spat(&CARMAStreetsPlugin::SubscribeSpatKafkaTopic, this); + thread_schpl = new std::thread(&CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic, this); + thread_spat = new std::thread(&CARMAStreetsPlugin::SubscribeSpatKafkaTopic, this); } void CARMAStreetsPlugin::OnConfigChanged(const char *key, const char *value) { @@ -519,6 +495,7 @@ void CARMAStreetsPlugin::OnStateChange(IvpPluginState state) { if (state == IvpPluginState_registered) { UpdateConfigSettings(); + InitKafkaConsumerProducers(); } } @@ -527,67 +504,54 @@ void CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic() if(_subscribeToSchedulingPlanTopic.length() > 0) { PLOG(logDEBUG) << "SubscribeSchedulingPlanKafkaTopics:" <<_subscribeToSchedulingPlanTopic << std::endl; - std::vector topics; - topics.emplace_back(_subscribeToSchedulingPlanTopic); - - RdKafka::ErrorCode err = _scheduing_plan_kafka_consumer->subscribe(topics); - if (err) - { - PLOG(logERROR) << "Failed to subscribe to " << topics.size() << " topics: " << RdKafka::err2str(err) << std::endl; - return; - } + _scheduing_plan_kafka_consumer_ptr->subscribe(); - while (true) + while (_scheduing_plan_kafka_consumer_ptr->is_running()) { - auto msg = _scheduing_plan_kafka_consumer->consume( 500 ); - if( msg->err() == RdKafka::ERR_NO_ERROR ) + auto payload_str = _scheduing_plan_kafka_consumer_ptr->consume(500); + if(strlen(payload_str) > 0) { - auto payload_str = static_cast( msg->payload() ); - if(msg->len() > 0) - { - PLOG(logDEBUG) << "consumed message payload: " << payload_str <(Key_ScheduleMessageSkipped, ++_scheduleMessageSkipped); - continue; - } + PLOG(logDEBUG) << "consumed message payload: " << payload_str <(Key_ScheduleMessageSkipped, ++_scheduleMessageSkipped); + continue; + } - Json::Value metadata = payload_root["metadata"]; - Json::Value payload_json_array = payload_root["payload"]; - - for ( int index = 0; index < payload_json_array.size(); ++index ) + Json::Value metadata = payload_root["metadata"]; + Json::Value payload_json_array = payload_root["payload"]; + + for ( int index = 0; index < payload_json_array.size(); ++index ) + { + PLOG(logDEBUG) << payload_json_array[index] << std::endl; + Json::Value payload_json = payload_json_array[index]; + tsm3EncodedMessage tsm3EncodedMsgs; + if( getEncodedtsm3 (&tsm3EncodedMsgs, metadata, payload_json) ) { - PLOG(logDEBUG) << payload_json_array[index] << std::endl; - Json::Value payload_json = payload_json_array[index]; - tsm3EncodedMessage tsm3EncodedMsgs; - if( getEncodedtsm3 (&tsm3EncodedMsgs, metadata, payload_json) ) - { - tsm3EncodedMsgs.set_flags( IvpMsgFlags_RouteDSRC ); - tsm3EncodedMsgs.addDsrcMetadata(0xBFEE ); - PLOG(logDEBUG) << "tsm3EncodedMsgs: " << tsm3EncodedMsgs; - BroadcastMessage(static_cast( tsm3EncodedMsgs )); - } + tsm3EncodedMsgs.set_flags( IvpMsgFlags_RouteDSRC ); + tsm3EncodedMsgs.addDsrcMetadata(0xBFEE ); + PLOG(logDEBUG) << "tsm3EncodedMsgs: " << tsm3EncodedMsgs; + BroadcastMessage(static_cast( tsm3EncodedMsgs )); } - //Empty payload - if(payload_json_array.empty()) - { - Json::Value payload_json = {}; - tsm3EncodedMessage tsm3EncodedMsgs; - if( getEncodedtsm3 (&tsm3EncodedMsgs, metadata, payload_json) ) - { - tsm3EncodedMsgs.set_flags( IvpMsgFlags_RouteDSRC ); - tsm3EncodedMsgs.addDsrcMetadata(0xBFEE); - PLOG(logDEBUG) << "tsm3EncodedMsgs: " << tsm3EncodedMsgs; - BroadcastMessage(static_cast( tsm3EncodedMsgs )); - } - } } + //Empty payload + if(payload_json_array.empty()) + { + Json::Value payload_json = {}; + tsm3EncodedMessage tsm3EncodedMsgs; + if( getEncodedtsm3 (&tsm3EncodedMsgs, metadata, payload_json) ) + { + tsm3EncodedMsgs.set_flags( IvpMsgFlags_RouteDSRC ); + tsm3EncodedMsgs.addDsrcMetadata(0xBFEE); + PLOG(logDEBUG) << "tsm3EncodedMsgs: " << tsm3EncodedMsgs; + BroadcastMessage(static_cast( tsm3EncodedMsgs )); + } + } } - delete msg; } } @@ -597,64 +561,51 @@ void CARMAStreetsPlugin::SubscribeSpatKafkaTopic(){ if(_subscribeToSpatTopic.length() > 0) { PLOG(logDEBUG) << "SubscribeSpatKafkaTopics:" <<_subscribeToSpatTopic << std::endl; - std::vector topics; - topics.emplace_back(_subscribeToSpatTopic); - - RdKafka::ErrorCode err = _spat_kafka_consumer->subscribe(topics); - if (err) - { - PLOG(logERROR) << "Failed to subscribe to " << topics.size() << " topics: " << RdKafka::err2str(err) << std::endl; - return; - } - //Initialize Json to J2735 Spat convertor + _spat_kafka_consumer_ptr->subscribe(); + //Initialize Json to J2735 Spat convertor JsonToJ2735SpatConverter spat_convertor; - while (true) + while (_spat_kafka_consumer_ptr->is_running()) { - auto msg = _spat_kafka_consumer->consume( 500 ); - if( msg->err() == RdKafka::ERR_NO_ERROR ) + auto payload_str = _spat_kafka_consumer_ptr->consume(500); + if(strlen(payload_str) > 0) { - auto payload_str = static_cast( msg->payload() ); - if(msg->len() > 0) + PLOG(logDEBUG) << "consumed message payload: " << payload_str <(Key_SPATMessageSkipped, ++_spatMessageSkipped); + continue; + } + //Convert the SPAT JSON string into J2735 SPAT message and encode it. + auto spat_ptr = std::make_shared(); + spat_convertor.convertJson2Spat(payload_root, spat_ptr.get()); + tmx::messages::SpatEncodedMessage spatEncodedMsg; + try { - PLOG(logDEBUG) << "consumed message payload: " << payload_str <(Key_SPATMessageSkipped, ++_spatMessageSkipped); - continue; - } - //Convert the SPAT JSON string into J2735 SPAT message and encode it. - auto spat_ptr = std::make_shared(); - spat_convertor.convertJson2Spat(payload_root, spat_ptr.get()); - tmx::messages::SpatEncodedMessage spatEncodedMsg; - try - { - spat_convertor.encodeSpat(spat_ptr, spatEncodedMsg); - } - catch (TmxException &ex) - { - // Skip messages that fail to encode. - PLOG(logERROR) << "Failed to encoded SPAT message : \n" << payload_str << std::endl << "Exception encountered: " - << ex.what() << std::endl; - ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SPAT, spat_ptr.get()); - SetStatus(Key_SPATMessageSkipped, ++_spatMessageSkipped); - - continue; - } - + spat_convertor.encodeSpat(spat_ptr, spatEncodedMsg); + } + catch (TmxException &ex) + { + // Skip messages that fail to encode. + PLOG(logERROR) << "Failed to encoded SPAT message : \n" << payload_str << std::endl << "Exception encountered: " + << ex.what() << std::endl; ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SPAT, spat_ptr.get()); - PLOG(logDEBUG) << "SpatEncodedMessage: " << spatEncodedMsg; + SetStatus(Key_SPATMessageSkipped, ++_spatMessageSkipped); - //Broadcast the encoded SPAT message - spatEncodedMsg.set_flags(IvpMsgFlags_RouteDSRC); - spatEncodedMsg.addDsrcMetadata(0x8002); - BroadcastMessage(static_cast(spatEncodedMsg)); + continue; } + + ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SPAT, spat_ptr.get()); + PLOG(logDEBUG) << "SpatEncodedMessage: " << spatEncodedMsg; + + //Broadcast the encoded SPAT message + spatEncodedMsg.set_flags(IvpMsgFlags_RouteDSRC); + spatEncodedMsg.addDsrcMetadata(0x8002); + BroadcastMessage(static_cast(spatEncodedMsg)); } - delete msg; } } } @@ -785,12 +736,10 @@ int CARMAStreetsPlugin::Main() { uint64_t lastSendTime = 0; while (_plugin->state != IvpPluginState_error) { - usleep(100000); //sleep for microseconds set from config. } - return (EXIT_SUCCESS); } } /* namespace */ diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h index a9111e945..391a452db 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h @@ -17,6 +17,11 @@ #include "J2735MapToJsonConverter.h" #include "JsonToJ2735SpatConverter.h" #include "J2735ToSRMJsonConverter.h" +#include +#include +#include +#include +#include @@ -70,6 +75,10 @@ class CARMAStreetsPlugin: public PluginClient { * @param topic_name The name of the topic */ void produce_kafka_msg(const string &msg, const string &topic_name) const; + /** + * @brief Initialize Kafka Producers and Consumers + */ + void InitKafkaConsumerProducers(); private: @@ -87,16 +96,14 @@ class CARMAStreetsPlugin: public PluginClient { std::string _kafkaBrokerIp; std::string _kafkaBrokerPort; RdKafka::Conf *kafka_conf; - RdKafka::Conf *kafka_conf_spat_consumer; - RdKafka::Conf *kafka_conf_sp_consumer; RdKafka::Producer *kafka_producer; - RdKafka::KafkaConsumer *_scheduing_plan_kafka_consumer; - RdKafka::KafkaConsumer *_spat_kafka_consumer; - RdKafka::Topic *_scheduing_plan_topic; - RdKafka::Topic *_spat_topic; + std::shared_ptr _spat_kafka_consumer_ptr; + std::shared_ptr _scheduing_plan_kafka_consumer_ptr; std::vector _strategies; tmx::messages::tsm3Message *_tsm3Message{NULL}; std::mutex data_lock; + std::thread* thread_schpl; + std::thread* thread_spat; /** * @brief Status label for SPAT messages skipped due to errors.