From 4052ee32b51a4516dbdae42f0a23e853e562eb22 Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Thu, 29 Jun 2023 21:29:59 -0400 Subject: [PATCH 01/14] init --- .devcontainer/docker-compose-vscode.yml | 3 +- .../src/kafka/kafka_consumer_worker.cpp | 2 +- .../src/CARMAStreetsPlugin.cpp | 259 +++++++----------- .../src/CARMAStreetsPlugin.h | 19 +- 4 files changed, 120 insertions(+), 163 deletions(-) diff --git a/.devcontainer/docker-compose-vscode.yml b/.devcontainer/docker-compose-vscode.yml index c3e19d9a8..e79a86d98 100755 --- a/.devcontainer/docker-compose-vscode.yml +++ b/.devcontainer/docker-compose-vscode.yml @@ -72,8 +72,9 @@ services: DOCKER_HOST_IP: 127.0.0.1 KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_CREATE_TOPICS: "time_sync:1:1" + KAFKA_CREATE_TOPICS: "time_sync:1:1,modified_spat:1:1,v2xhub_scheduling_plan_sub:1:1" KAFKA_LOG_DIRS: "/kafka/kafka-logs" + KAFKA_OFFSETS_RETENTION_MINUTES: 1 volumes: - /var/run/docker.sock:/var/run/docker.sock - /etc/localtime:/etc/localtime:ro diff --git a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp index 74c4e2735..419e1b94c 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp +++ b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp @@ -134,7 +134,7 @@ namespace tmx::utils switch (message->err()) { case RdKafka::ERR__TIMED_OUT: - FILE_LOG(logWARNING) << _consumer->name() << " consume failed: " << message->errstr() << std::endl; + FILE_LOG(logDEBUG4) << _consumer->name() << " consume failed: " << message->errstr() << std::endl; break; case RdKafka::ERR_NO_ERROR: FILE_LOG(logDEBUG1) << _consumer->name() << " read message at offset " << message->offset() << std::endl; diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp index 2e2fdf499..f0ae6e831 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp @@ -20,7 +20,7 @@ namespace CARMAStreetsPlugin { */ CARMAStreetsPlugin::CARMAStreetsPlugin(string name) : PluginClient(name) { - + PLOG(logINFO) << "CARMAStreetsPlugin constructor!" << std::endl; AddMessageFilter < BsmMessage > (this, &CARMAStreetsPlugin::HandleBasicSafetyMessage); AddMessageFilter < tsm3Message > (this, &CARMAStreetsPlugin::HandleMobilityOperationMessage); AddMessageFilter < tsm2Message > (this, &CARMAStreetsPlugin::HandleMobilityPathMessage); @@ -62,13 +62,16 @@ void CARMAStreetsPlugin::UpdateConfigSettings() { getline( ss, substring, ','); _strategies.push_back( substring); } +} + +void CARMAStreetsPlugin::InitKafkaConsumerProducers() +{ std::string kafkaConnectString = _kafkaBrokerIp + ':' + _kafkaBrokerPort; std::string error_string; kafkaConnectString = _kafkaBrokerIp + ':' + _kafkaBrokerPort; - kafka_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); - kafka_conf_sp_consumer = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); - kafka_conf_spat_consumer = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + //Producer + kafka_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); PLOG(logDEBUG) <<"Attempting to connect to " << kafkaConnectString; if ((kafka_conf->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK)) { PLOG(logERROR) <<"Setting kafka config options failed with error:" << error_string << "\n" <<"Exiting with exit code 1"; @@ -82,58 +85,31 @@ void CARMAStreetsPlugin::UpdateConfigSettings() { PLOG(logERROR) <<"Creating kafka producer failed with error:" << error_string << "\n" <<"Exiting with exit code 1"; exit(1); } - PLOG(logDEBUG) <<"Kafka producer created"; - - if (kafka_conf_sp_consumer->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK - || (kafka_conf_sp_consumer->set("group.id", _subscribeToSchedulingPlanConsumerGroupId, error_string) != RdKafka::Conf::CONF_OK) - || (kafka_conf_spat_consumer->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK) - || (kafka_conf_spat_consumer->set("group.id", _subscribeToSpatConsumerGroupId, error_string) != RdKafka::Conf::CONF_OK)) { - PLOG(logERROR) <<"Setting kafka config group.id options failed with error:" << error_string << "\n" <<"Exiting with exit code 1"; - exit(1); - } else { - PLOG(logDEBUG) <<"Kafka config group.id options set successfully"; - } - kafka_conf_sp_consumer->set("enable.partition.eof", "true", error_string); - kafka_conf_spat_consumer->set("enable.partition.eof", "true", error_string); - - _scheduing_plan_kafka_consumer = RdKafka::KafkaConsumer::create(kafka_conf_sp_consumer, error_string); - _spat_kafka_consumer = RdKafka::KafkaConsumer::create(kafka_conf_spat_consumer, error_string); - - if ( !_scheduing_plan_kafka_consumer || !_spat_kafka_consumer) { - PLOG(logERROR) << "Failed to create Kafka consumers: " << error_string << std::endl; - exit(1); - } - PLOG(logDEBUG) << "Created consumer " << _scheduing_plan_kafka_consumer->name() << std::endl; - PLOG(logDEBUG) << "Created consumer " << _spat_kafka_consumer->name() << std::endl; - - //create kafka topics - RdKafka::Conf *tconf_spat = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); - RdKafka::Conf *tconf_sp = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); - if(!tconf_spat && !tconf_sp) + PLOG(logDEBUG) <<"Kafka producers created"; + + //Consumers + kafka_client client; + auto uuid = boost::uuids::random_generator()(); + std::stringstream ss; + ss << uuid; + _subscribeToSpatConsumerGroupId += ss.str(); + _subscribeToSchedulingPlanConsumerGroupId += ss.str(); + PLOG(logERROR) << "Kafka INFO:" << kafkaConnectString<<_subscribeToSpatTopic<<_subscribeToSpatConsumerGroupId; + _spat_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSpatTopic,_subscribeToSpatConsumerGroupId); + _scheduing_plan_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSchedulingPlanTopic,_subscribeToSchedulingPlanConsumerGroupId); + if(!_scheduing_plan_kafka_consumer_ptr || !_spat_kafka_consumer_ptr) { - PLOG(logERROR) << "RDKafka create topic conf failed "; + PLOG(logERROR) <<"Failed to create Kafka consumers."; return; - } - - _scheduing_plan_topic = RdKafka::Topic::create(_scheduing_plan_kafka_consumer,_subscribeToSchedulingPlanTopic,tconf_sp,error_string); - if(!_scheduing_plan_topic) - { - PLOG(logERROR) << "RDKafka create scheduing plan topic failed:" << error_string; - return ; } - - _spat_topic = RdKafka::Topic::create(_spat_kafka_consumer,_subscribeToSpatTopic,tconf_spat,error_string); - if(!_spat_topic) + PLOG(logDEBUG) <<"Kafka consumers created"; + if(!_spat_kafka_consumer_ptr->init() || !_scheduing_plan_kafka_consumer_ptr->init()) { - PLOG(logERROR) << "RDKafka create SPAT topic failed:" << error_string; - return ; + PLOG(logERROR) <<"Kafka consumers init() failed!"; } - delete tconf_sp; - delete tconf_spat; - - boost::thread thread_schpl(&CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic, this); - boost::thread thread_spat(&CARMAStreetsPlugin::SubscribeSpatKafkaTopic, this); + thread_schpl = new std::thread(&CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic, this); + thread_spat = new std::thread(&CARMAStreetsPlugin::SubscribeSpatKafkaTopic, this); } void CARMAStreetsPlugin::OnConfigChanged(const char *key, const char *value) { @@ -519,6 +495,7 @@ void CARMAStreetsPlugin::OnStateChange(IvpPluginState state) { if (state == IvpPluginState_registered) { UpdateConfigSettings(); + InitKafkaConsumerProducers(); } } @@ -527,67 +504,54 @@ void CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic() if(_subscribeToSchedulingPlanTopic.length() > 0) { PLOG(logDEBUG) << "SubscribeSchedulingPlanKafkaTopics:" <<_subscribeToSchedulingPlanTopic << std::endl; - std::vector topics; - topics.emplace_back(_subscribeToSchedulingPlanTopic); - - RdKafka::ErrorCode err = _scheduing_plan_kafka_consumer->subscribe(topics); - if (err) - { - PLOG(logERROR) << "Failed to subscribe to " << topics.size() << " topics: " << RdKafka::err2str(err) << std::endl; - return; - } + _scheduing_plan_kafka_consumer_ptr->subscribe(); - while (true) + while (_scheduing_plan_kafka_consumer_ptr->is_running()) { - auto msg = _scheduing_plan_kafka_consumer->consume( 500 ); - if( msg->err() == RdKafka::ERR_NO_ERROR ) + auto payload_str = _scheduing_plan_kafka_consumer_ptr->consume(500); + if(strlen(payload_str) > 0) { - auto payload_str = static_cast( msg->payload() ); - if(msg->len() > 0) - { - PLOG(logDEBUG) << "consumed message payload: " << payload_str <(Key_ScheduleMessageSkipped, ++_scheduleMessageSkipped); - continue; - } + PLOG(logDEBUG) << "consumed message payload: " << payload_str <(Key_ScheduleMessageSkipped, ++_scheduleMessageSkipped); + continue; + } - Json::Value metadata = payload_root["metadata"]; - Json::Value payload_json_array = payload_root["payload"]; - - for ( int index = 0; index < payload_json_array.size(); ++index ) + Json::Value metadata = payload_root["metadata"]; + Json::Value payload_json_array = payload_root["payload"]; + + for ( int index = 0; index < payload_json_array.size(); ++index ) + { + PLOG(logDEBUG) << payload_json_array[index] << std::endl; + Json::Value payload_json = payload_json_array[index]; + tsm3EncodedMessage tsm3EncodedMsgs; + if( getEncodedtsm3 (&tsm3EncodedMsgs, metadata, payload_json) ) { - PLOG(logDEBUG) << payload_json_array[index] << std::endl; - Json::Value payload_json = payload_json_array[index]; - tsm3EncodedMessage tsm3EncodedMsgs; - if( getEncodedtsm3 (&tsm3EncodedMsgs, metadata, payload_json) ) - { - tsm3EncodedMsgs.set_flags( IvpMsgFlags_RouteDSRC ); - tsm3EncodedMsgs.addDsrcMetadata(0xBFEE ); - PLOG(logDEBUG) << "tsm3EncodedMsgs: " << tsm3EncodedMsgs; - BroadcastMessage(static_cast( tsm3EncodedMsgs )); - } + tsm3EncodedMsgs.set_flags( IvpMsgFlags_RouteDSRC ); + tsm3EncodedMsgs.addDsrcMetadata(0xBFEE ); + PLOG(logDEBUG) << "tsm3EncodedMsgs: " << tsm3EncodedMsgs; + BroadcastMessage(static_cast( tsm3EncodedMsgs )); } - //Empty payload - if(payload_json_array.empty()) - { - Json::Value payload_json = {}; - tsm3EncodedMessage tsm3EncodedMsgs; - if( getEncodedtsm3 (&tsm3EncodedMsgs, metadata, payload_json) ) - { - tsm3EncodedMsgs.set_flags( IvpMsgFlags_RouteDSRC ); - tsm3EncodedMsgs.addDsrcMetadata(0xBFEE); - PLOG(logDEBUG) << "tsm3EncodedMsgs: " << tsm3EncodedMsgs; - BroadcastMessage(static_cast( tsm3EncodedMsgs )); - } - } } + //Empty payload + if(payload_json_array.empty()) + { + Json::Value payload_json = {}; + tsm3EncodedMessage tsm3EncodedMsgs; + if( getEncodedtsm3 (&tsm3EncodedMsgs, metadata, payload_json) ) + { + tsm3EncodedMsgs.set_flags( IvpMsgFlags_RouteDSRC ); + tsm3EncodedMsgs.addDsrcMetadata(0xBFEE); + PLOG(logDEBUG) << "tsm3EncodedMsgs: " << tsm3EncodedMsgs; + BroadcastMessage(static_cast( tsm3EncodedMsgs )); + } + } } - delete msg; } } @@ -597,64 +561,51 @@ void CARMAStreetsPlugin::SubscribeSpatKafkaTopic(){ if(_subscribeToSpatTopic.length() > 0) { PLOG(logDEBUG) << "SubscribeSpatKafkaTopics:" <<_subscribeToSpatTopic << std::endl; - std::vector topics; - topics.emplace_back(_subscribeToSpatTopic); - - RdKafka::ErrorCode err = _spat_kafka_consumer->subscribe(topics); - if (err) - { - PLOG(logERROR) << "Failed to subscribe to " << topics.size() << " topics: " << RdKafka::err2str(err) << std::endl; - return; - } - //Initialize Json to J2735 Spat convertor + _spat_kafka_consumer_ptr->subscribe(); + //Initialize Json to J2735 Spat convertor JsonToJ2735SpatConverter spat_convertor; - while (true) + while (_spat_kafka_consumer_ptr->is_running()) { - auto msg = _spat_kafka_consumer->consume( 500 ); - if( msg->err() == RdKafka::ERR_NO_ERROR ) + auto payload_str = _spat_kafka_consumer_ptr->consume(500); + if(strlen(payload_str) > 0) { - auto payload_str = static_cast( msg->payload() ); - if(msg->len() > 0) + PLOG(logDEBUG) << "consumed message payload: " << payload_str <(Key_SPATMessageSkipped, ++_spatMessageSkipped); + continue; + } + //Convert the SPAT JSON string into J2735 SPAT message and encode it. + auto spat_ptr = std::make_shared(); + spat_convertor.convertJson2Spat(payload_root, spat_ptr.get()); + tmx::messages::SpatEncodedMessage spatEncodedMsg; + try { - PLOG(logDEBUG) << "consumed message payload: " << payload_str <(Key_SPATMessageSkipped, ++_spatMessageSkipped); - continue; - } - //Convert the SPAT JSON string into J2735 SPAT message and encode it. - auto spat_ptr = std::make_shared(); - spat_convertor.convertJson2Spat(payload_root, spat_ptr.get()); - tmx::messages::SpatEncodedMessage spatEncodedMsg; - try - { - spat_convertor.encodeSpat(spat_ptr, spatEncodedMsg); - } - catch (TmxException &ex) - { - // Skip messages that fail to encode. - PLOG(logERROR) << "Failed to encoded SPAT message : \n" << payload_str << std::endl << "Exception encountered: " - << ex.what() << std::endl; - ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SPAT, spat_ptr.get()); - SetStatus(Key_SPATMessageSkipped, ++_spatMessageSkipped); - - continue; - } - + spat_convertor.encodeSpat(spat_ptr, spatEncodedMsg); + } + catch (TmxException &ex) + { + // Skip messages that fail to encode. + PLOG(logERROR) << "Failed to encoded SPAT message : \n" << payload_str << std::endl << "Exception encountered: " + << ex.what() << std::endl; ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SPAT, spat_ptr.get()); - PLOG(logDEBUG) << "SpatEncodedMessage: " << spatEncodedMsg; + SetStatus(Key_SPATMessageSkipped, ++_spatMessageSkipped); - //Broadcast the encoded SPAT message - spatEncodedMsg.set_flags(IvpMsgFlags_RouteDSRC); - spatEncodedMsg.addDsrcMetadata(0x8002); - BroadcastMessage(static_cast(spatEncodedMsg)); + continue; } + + ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SPAT, spat_ptr.get()); + PLOG(logDEBUG) << "SpatEncodedMessage: " << spatEncodedMsg; + + //Broadcast the encoded SPAT message + spatEncodedMsg.set_flags(IvpMsgFlags_RouteDSRC); + spatEncodedMsg.addDsrcMetadata(0x8002); + BroadcastMessage(static_cast(spatEncodedMsg)); } - delete msg; } } } @@ -785,12 +736,10 @@ int CARMAStreetsPlugin::Main() { uint64_t lastSendTime = 0; while (_plugin->state != IvpPluginState_error) { - usleep(100000); //sleep for microseconds set from config. } - return (EXIT_SUCCESS); } } /* namespace */ diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h index a9111e945..391a452db 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h @@ -17,6 +17,11 @@ #include "J2735MapToJsonConverter.h" #include "JsonToJ2735SpatConverter.h" #include "J2735ToSRMJsonConverter.h" +#include +#include +#include +#include +#include @@ -70,6 +75,10 @@ class CARMAStreetsPlugin: public PluginClient { * @param topic_name The name of the topic */ void produce_kafka_msg(const string &msg, const string &topic_name) const; + /** + * @brief Initialize Kafka Producers and Consumers + */ + void InitKafkaConsumerProducers(); private: @@ -87,16 +96,14 @@ class CARMAStreetsPlugin: public PluginClient { std::string _kafkaBrokerIp; std::string _kafkaBrokerPort; RdKafka::Conf *kafka_conf; - RdKafka::Conf *kafka_conf_spat_consumer; - RdKafka::Conf *kafka_conf_sp_consumer; RdKafka::Producer *kafka_producer; - RdKafka::KafkaConsumer *_scheduing_plan_kafka_consumer; - RdKafka::KafkaConsumer *_spat_kafka_consumer; - RdKafka::Topic *_scheduing_plan_topic; - RdKafka::Topic *_spat_topic; + std::shared_ptr _spat_kafka_consumer_ptr; + std::shared_ptr _scheduing_plan_kafka_consumer_ptr; std::vector _strategies; tmx::messages::tsm3Message *_tsm3Message{NULL}; std::mutex data_lock; + std::thread* thread_schpl; + std::thread* thread_spat; /** * @brief Status label for SPAT messages skipped due to errors. From 39f4d58344b6dd5461162af757a37d8b567f160e Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Fri, 30 Jun 2023 10:50:11 -0400 Subject: [PATCH 02/14] update --- src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp index 50e3c0e30..26d4aecdc 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp @@ -20,7 +20,6 @@ namespace CARMAStreetsPlugin { */ CARMAStreetsPlugin::CARMAStreetsPlugin(string name) : PluginClient(name) { - PLOG(logINFO) << "CARMAStreetsPlugin constructor!" << std::endl; AddMessageFilter < BsmMessage > (this, &CARMAStreetsPlugin::HandleBasicSafetyMessage); AddMessageFilter < tsm3Message > (this, &CARMAStreetsPlugin::HandleMobilityOperationMessage); AddMessageFilter < tsm2Message > (this, &CARMAStreetsPlugin::HandleMobilityPathMessage); From 108c67daf02c7785b5412540a1002ba394f10a2f Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Fri, 30 Jun 2023 14:59:51 +0000 Subject: [PATCH 03/14] update --- src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h | 1 - 1 file changed, 1 deletion(-) diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h index 14b18316d..e647141cd 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h @@ -108,7 +108,6 @@ class CARMAStreetsPlugin: public PluginClient { std::shared_ptr _spat_kafka_consumer_ptr; std::shared_ptr _scheduing_plan_kafka_consumer_ptr; std::shared_ptr _ssm_kafka_consumer_ptr; - RdKafka::Producer *kafka_producer; std::vector _strategies; tmx::messages::tsm3Message *_tsm3Message{NULL}; std::mutex data_lock; From ad2311074491b8a301fc9a9f4d318871b0af50d2 Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Fri, 30 Jun 2023 20:55:24 +0000 Subject: [PATCH 04/14] address comments --- .devcontainer/docker-compose-vscode.yml | 2 +- .../src/kafka/kafka_consumer_worker.cpp | 7 +++++++ .../TmxUtils/src/kafka/kafka_consumer_worker.h | 1 + .../src/CARMAStreetsPlugin.cpp | 17 +++++++++++++---- .../CARMAStreetsPlugin/src/CARMAStreetsPlugin.h | 3 --- 5 files changed, 22 insertions(+), 8 deletions(-) diff --git a/.devcontainer/docker-compose-vscode.yml b/.devcontainer/docker-compose-vscode.yml index e79a86d98..2a419aa6c 100755 --- a/.devcontainer/docker-compose-vscode.yml +++ b/.devcontainer/docker-compose-vscode.yml @@ -72,7 +72,7 @@ services: DOCKER_HOST_IP: 127.0.0.1 KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_CREATE_TOPICS: "time_sync:1:1,modified_spat:1:1,v2xhub_scheduling_plan_sub:1:1" + KAFKA_CREATE_TOPICS: "time_sync:1:1,modified_spat:1:1,v2xhub_scheduling_plan_sub:1:1,v2xhub_ssm_sub:1:1" KAFKA_LOG_DIRS: "/kafka/kafka-logs" KAFKA_OFFSETS_RETENTION_MINUTES: 1 volumes: diff --git a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp index 419e1b94c..376383039 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp +++ b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp @@ -42,6 +42,12 @@ namespace tmx::utils return false; } + if (conf->set(ENABLE_AUTO_COMMIT, "true", errstr) != RdKafka::Conf::CONF_OK) + { + FILE_LOG(logWARNING) << "RDKafka conf set enable auto commit failed: " << errstr.c_str() << std::endl; + return false; + } + // set consumer group if (conf->set(GROUP_ID, _group_id_str, errstr) != RdKafka::Conf::CONF_OK) { @@ -88,6 +94,7 @@ namespace tmx::utils void kafka_consumer_worker::stop() { _run = false; + _consumer->close(); /*Destroy kafka instance*/ // Wait for RdKafka to decommission. RdKafka::wait_destroyed(5000); } diff --git a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h index a3e00c00b..95a9c96a8 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h +++ b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h @@ -40,6 +40,7 @@ namespace tmx::utils { const std::string GROUP_ID="group.id"; const std::string MAX_PARTITION_FETCH_SIZE="max.partition.fetch.bytes"; const std::string ENABLE_PARTITION_END_OF="enable.partition.eof"; + const std::string ENABLE_AUTO_COMMIT="enable.auto.commit"; //maximum size for pulling message from a single partition at a time std::string STR_FETCH_NUM = "10240000"; diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp index 26d4aecdc..2de561e02 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp @@ -20,6 +20,8 @@ namespace CARMAStreetsPlugin { */ CARMAStreetsPlugin::CARMAStreetsPlugin(string name) : PluginClient(name) { + + PLOG(logERROR) << "CARMAStreetsPlugin Constructor."; AddMessageFilter < BsmMessage > (this, &CARMAStreetsPlugin::HandleBasicSafetyMessage); AddMessageFilter < tsm3Message > (this, &CARMAStreetsPlugin::HandleMobilityOperationMessage); AddMessageFilter < tsm2Message > (this, &CARMAStreetsPlugin::HandleMobilityPathMessage); @@ -31,6 +33,11 @@ CARMAStreetsPlugin::CARMAStreetsPlugin(string name) : } CARMAStreetsPlugin::~CARMAStreetsPlugin() { + PLOG(logERROR) << "CARMAStreetsPlugin Destructor."; + //Todo: It does not seem the desctructor is called. + _spat_kafka_consumer_ptr->stop(); + _scheduing_plan_kafka_consumer_ptr->stop(); + _ssm_kafka_consumer_ptr->stop(); } void CARMAStreetsPlugin::UpdateConfigSettings() { @@ -95,7 +102,8 @@ void CARMAStreetsPlugin::InitKafkaConsumerProducers() ss << uuid; _subscribeToSpatConsumerGroupId += ss.str(); _subscribeToSchedulingPlanConsumerGroupId += ss.str(); - PLOG(logERROR) << "Kafka INFO:" << kafkaConnectString<<_subscribeToSpatTopic<<_subscribeToSpatConsumerGroupId; + _subscribeToSSMConsumerGroupId += ss.str(); + //Todo further enhancement: Temporary fix for the consumer rebalancing die to multiple consumers join the same group upon restarting plugin. _spat_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSpatTopic,_subscribeToSpatConsumerGroupId); _scheduing_plan_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSchedulingPlanTopic,_subscribeToSchedulingPlanConsumerGroupId); _ssm_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSsmTopic,_subscribeToSSMConsumerGroupId); @@ -110,9 +118,9 @@ void CARMAStreetsPlugin::InitKafkaConsumerProducers() PLOG(logERROR) <<"Kafka consumers init() failed!"; } - thread_schpl = new std::thread(&CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic, this); - thread_spat = new std::thread(&CARMAStreetsPlugin::SubscribeSpatKafkaTopic, this); - thread_ssm = new std::thread(&CARMAStreetsPlugin::SubscribeSSMKafkaTopic, this); + boost::thread thread_schpl(&CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic, this); + boost::thread thread_spat(&CARMAStreetsPlugin::SubscribeSpatKafkaTopic, this); + boost::thread thread_ssm(&CARMAStreetsPlugin::SubscribeSSMKafkaTopic, this); } void CARMAStreetsPlugin::OnConfigChanged(const char *key, const char *value) { @@ -795,6 +803,7 @@ int CARMAStreetsPlugin::Main() { usleep(100000); //sleep for microseconds set from config. } + return (EXIT_SUCCESS); } } /* namespace */ diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h index e647141cd..e8d4620cd 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h @@ -111,9 +111,6 @@ class CARMAStreetsPlugin: public PluginClient { std::vector _strategies; tmx::messages::tsm3Message *_tsm3Message{NULL}; std::mutex data_lock; - std::thread* thread_schpl; - std::thread* thread_spat; - std::thread* thread_ssm; /** * @brief Status label for SPAT messages skipped due to errors. From a8858de5d1b80943703314810a87cab8db6d2dd6 Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Wed, 5 Jul 2023 20:38:03 +0000 Subject: [PATCH 05/14] update comments --- src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp index 2de561e02..071768523 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp @@ -103,7 +103,7 @@ void CARMAStreetsPlugin::InitKafkaConsumerProducers() _subscribeToSpatConsumerGroupId += ss.str(); _subscribeToSchedulingPlanConsumerGroupId += ss.str(); _subscribeToSSMConsumerGroupId += ss.str(); - //Todo further enhancement: Temporary fix for the consumer rebalancing die to multiple consumers join the same group upon restarting plugin. + //Todo further enhancement: Temporary fix for the consumer rebalancing due to multiple consumers join the same group upon restarting plugin. _spat_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSpatTopic,_subscribeToSpatConsumerGroupId); _scheduing_plan_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSchedulingPlanTopic,_subscribeToSchedulingPlanConsumerGroupId); _ssm_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSsmTopic,_subscribeToSSMConsumerGroupId); @@ -116,6 +116,7 @@ void CARMAStreetsPlugin::InitKafkaConsumerProducers() if(!_spat_kafka_consumer_ptr->init() || !_scheduing_plan_kafka_consumer_ptr->init() || !_ssm_kafka_consumer_ptr->init()) { PLOG(logERROR) <<"Kafka consumers init() failed!"; + return; } boost::thread thread_schpl(&CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic, this); From 5f45cf5c6487201d18707f5c3dc12999295bb548 Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Thu, 6 Jul 2023 02:05:37 +0000 Subject: [PATCH 06/14] add create producer --- src/tmx/TmxUtils/src/kafka/kafka_client.cpp | 14 +++++ src/tmx/TmxUtils/src/kafka/kafka_client.h | 1 + .../src/kafka/kafka_producer_worker.cpp | 53 ++++++++++++++++++- .../src/kafka/kafka_producer_worker.h | 25 ++++++++- .../src/CARMAStreetsPlugin.cpp | 50 ++--------------- .../src/CARMAStreetsPlugin.h | 3 +- 6 files changed, 96 insertions(+), 50 deletions(-) diff --git a/src/tmx/TmxUtils/src/kafka/kafka_client.cpp b/src/tmx/TmxUtils/src/kafka/kafka_client.cpp index df5346f4f..925b35aa4 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_client.cpp +++ b/src/tmx/TmxUtils/src/kafka/kafka_client.cpp @@ -34,5 +34,19 @@ namespace tmx::utils } } + std::shared_ptr kafka_client::create_producer(const std::string &bootstrap_server) const + { + try + { + auto producer_ptr = std::make_shared(bootstrap_server); + return producer_ptr; + } + catch (const std::runtime_error &e) + { + FILE_LOG(logERROR) << "Create producer failure: " << e.what() << std::endl; + exit(1); + } + } + } \ No newline at end of file diff --git a/src/tmx/TmxUtils/src/kafka/kafka_client.h b/src/tmx/TmxUtils/src/kafka/kafka_client.h index b4da5e059..778a17026 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_client.h +++ b/src/tmx/TmxUtils/src/kafka/kafka_client.h @@ -15,6 +15,7 @@ namespace tmx::utils std::shared_ptr create_consumer(const std::string &broker_str, const std::string &topic_str, const std::string &group_id_str) const; std::shared_ptr create_producer(const std::string &broker_str, const std::string &topic_str) const; + std::shared_ptr create_producer(const std::string &bootstrap_server) const; }; } diff --git a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp index 46cbc55e3..bdfd62169 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp +++ b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp @@ -33,13 +33,27 @@ namespace tmx::utils { } + kafka_producer_worker::kafka_producer_worker(const std::string &brokers) + : _broker_str(brokers), _run(true) + { + init_producer(); + } + bool kafka_producer_worker::init() + { + if(init_producer()) + { + return init_topic(); + } + return false; + } + + bool kafka_producer_worker::init_producer() { std::string errstr = ""; // Create configuration objects RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); - RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); /*** * Set Configuration properties @@ -75,7 +89,12 @@ namespace tmx::utils delete conf; FILE_LOG(logINFO) << "Created producer: " << _producer->name() << std::endl; + } + bool kafka_producer_worker::init_topic() + { + RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); + std::string errstr = ""; // Create topic handle _topic = RdKafka::Topic::create(_producer, _topics_str, tconf, errstr); if (!_topic) @@ -156,6 +175,38 @@ namespace tmx::utils _producer->poll(0); } + void kafka_producer_worker::send(const std::string& message, const std::string& topic_name ) const + { + bool retry = true; + while (retry) + { + RdKafka::ErrorCode produce_error = _producer->produce(topic_name, + RdKafka::Topic::PARTITION_UA, + RdKafka::Producer::RK_MSG_COPY, + const_cast(message.c_str()), + message.size(), + nullptr, 0, 0, nullptr); + + if (produce_error == RdKafka::ERR_NO_ERROR) { + PLOG(logDEBUG) <<"Queued message:" << message; + retry = false; + } + else + { + PLOG(logERROR) <<"Failed to queue message:" << message <<" with error:" << RdKafka::err2str(produce_error); + if (produce_error == RdKafka::ERR__QUEUE_FULL) { + PLOG(logERROR) <<"Message queue full...retrying..."; + _producer->poll(500); /* ms */ + retry = true; + } + else { + PLOG(logERROR) <<"Unhandled error in queue_kafka_message:" << RdKafka::err2str(produce_error); + retry = false; + } + } + } + } + void kafka_producer_worker::stop() { /* Wait for final messages to be delivered or fail. diff --git a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h index a06a5587c..e02e9ac49 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h +++ b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h @@ -54,6 +54,12 @@ namespace tmx::utils * @param n_partition partition producer should be assigned to. */ kafka_producer_worker(const std::string &brokers, const std::string &topics, int n_partition = 0); + /** + * @brief Construct a new kafka producer worker object + * + * @param broker_str network address of kafka broker. + */ + kafka_producer_worker(const std::string &brokers); /** * @brief Initialize kafka_producer_worker. This method must be called before send! * @@ -61,12 +67,29 @@ namespace tmx::utils * @return false if unsuccessful. */ virtual bool init(); + /** + * @brief Initialize kafka topic. + * @return true if successful. + * @return false if unsuccessful. + */ + virtual bool init_topic(); + /** + * @brief Initialize kafka producer. + * @return true if successful. + * @return false if unsuccessful. + */ + virtual bool init_producer(); /** * @brief Produce to topic. Will result in segmentation fault if init() is not called on producer first. - * * @param msg message to produce. */ virtual void send(const std::string &msg); + /** + * @brief Produce to a specific topic. Will result in segmentation fault if init() is not called on producer first. + * @param msg message to produce. + * @param topic_name topic to send the message to. + */ + virtual void send(const std::string& message, const std::string& topic_name ) const; /** * @brief Is kafka_producer_worker still running? * diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp index 071768523..8a2fb8970 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp @@ -77,26 +77,12 @@ void CARMAStreetsPlugin::InitKafkaConsumerProducers() std::string kafkaConnectString = _kafkaBrokerIp + ':' + _kafkaBrokerPort; std::string error_string; kafkaConnectString = _kafkaBrokerIp + ':' + _kafkaBrokerPort; + kafka_client client; //Producer - kafka_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); - PLOG(logDEBUG) <<"Attempting to connect to " << kafkaConnectString; - if ((kafka_conf->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK)) { - PLOG(logERROR) <<"Setting kafka config options failed with error:" << error_string << "\n" <<"Exiting with exit code 1"; - exit(1); - } else { - PLOG(logDEBUG) <<"Kafka config options set successfully"; - } - - kafka_producer = RdKafka::Producer::create(kafka_conf, error_string); - if (!kafka_producer) { - PLOG(logERROR) <<"Creating kafka producer failed with error:" << error_string << "\n" <<"Exiting with exit code 1"; - exit(1); - } - PLOG(logDEBUG) <<"Kafka producers created"; + _kafka_producer_ptr = client.create_producer(kafkaConnectString); //Consumers - kafka_client client; auto uuid = boost::uuids::random_generator()(); std::stringstream ss; ss << uuid; @@ -135,8 +121,7 @@ void CARMAStreetsPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routea auto mobilityOperation = msg.get_j2735_data(); PLOG(logDEBUG) << "Body OperationParams : " << mobilityOperation->body.operationParams.buf << "\n" << "Body Strategy : " << mobilityOperation->body.strategy.buf<< "\n" - <<"Queueing kafka message:topic:" << _transmitMobilityOperationTopic << " " - << kafka_producer->outq_len() <<"messages already in queue"; + <<"Queueing kafka message:topic:" << _transmitMobilityOperationTopic; std::stringstream strat; std::stringstream payload; @@ -472,34 +457,7 @@ void CARMAStreetsPlugin::HandleMapMessage(MapDataMessage &msg, routeable_message void CARMAStreetsPlugin::produce_kafka_msg(const string& message, const string& topic_name) const { - bool retry = true; - while (retry) - { - RdKafka::ErrorCode produce_error = kafka_producer->produce(topic_name, - RdKafka::Topic::PARTITION_UA, - RdKafka::Producer::RK_MSG_COPY, - const_cast(message.c_str()), - message.size(), - nullptr, 0, 0, nullptr); - - if (produce_error == RdKafka::ERR_NO_ERROR) { - PLOG(logDEBUG) <<"Queued message:" << message; - retry = false; - } - else - { - PLOG(logERROR) <<"Failed to queue message:" << message <<" with error:" << RdKafka::err2str(produce_error); - if (produce_error == RdKafka::ERR__QUEUE_FULL) { - PLOG(logERROR) <<"Message queue full...retrying..."; - kafka_producer->poll(500); /* ms */ - retry = true; - } - else { - PLOG(logERROR) <<"Unhandled error in queue_kafka_message:" << RdKafka::err2str(produce_error); - retry = false; - } - } - } + _kafka_producer_ptr->send(message, topic_name); } void CARMAStreetsPlugin::OnStateChange(IvpPluginState state) { diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h index e8d4620cd..190e2a19f 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h @@ -103,8 +103,7 @@ class CARMAStreetsPlugin: public PluginClient { std::string _transmitSRMTopic; std::string _kafkaBrokerIp; std::string _kafkaBrokerPort; - RdKafka::Conf *kafka_conf; - RdKafka::Producer *kafka_producer; + std::shared_ptr _kafka_producer_ptr; std::shared_ptr _spat_kafka_consumer_ptr; std::shared_ptr _scheduing_plan_kafka_consumer_ptr; std::shared_ptr _ssm_kafka_consumer_ptr; From 89f0d546bd9c1422a5504376c214e466d079da77 Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Thu, 6 Jul 2023 15:36:14 +0000 Subject: [PATCH 07/14] update --- .../CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp index 8a2fb8970..375e54cd7 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp @@ -20,8 +20,6 @@ namespace CARMAStreetsPlugin { */ CARMAStreetsPlugin::CARMAStreetsPlugin(string name) : PluginClient(name) { - - PLOG(logERROR) << "CARMAStreetsPlugin Constructor."; AddMessageFilter < BsmMessage > (this, &CARMAStreetsPlugin::HandleBasicSafetyMessage); AddMessageFilter < tsm3Message > (this, &CARMAStreetsPlugin::HandleMobilityOperationMessage); AddMessageFilter < tsm2Message > (this, &CARMAStreetsPlugin::HandleMobilityPathMessage); @@ -33,7 +31,6 @@ CARMAStreetsPlugin::CARMAStreetsPlugin(string name) : } CARMAStreetsPlugin::~CARMAStreetsPlugin() { - PLOG(logERROR) << "CARMAStreetsPlugin Destructor."; //Todo: It does not seem the desctructor is called. _spat_kafka_consumer_ptr->stop(); _scheduing_plan_kafka_consumer_ptr->stop(); @@ -478,8 +475,8 @@ void CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic() while (_scheduing_plan_kafka_consumer_ptr->is_running()) { - auto payload_str = _scheduing_plan_kafka_consumer_ptr->consume(500); - if(strlen(payload_str) > 0) + std::string payload_str = _scheduing_plan_kafka_consumer_ptr->consume(500); + if(payload_str.length() > 0) { PLOG(logDEBUG) << "consumed message payload: " << payload_str <is_running()) { - auto payload_str = _spat_kafka_consumer_ptr->consume(500); - if(strlen(payload_str) > 0) + std::string payload_str = _spat_kafka_consumer_ptr->consume(500); + if(payload_str.length() > 0) { PLOG(logDEBUG) << "consumed message payload: " << payload_str <is_running()) { - auto payload_str = _ssm_kafka_consumer_ptr->consume(500); - if(strlen(payload_str) > 0) + std::string payload_str = _ssm_kafka_consumer_ptr->consume(500); + if(payload_str.length() > 0) { PLOG(logDEBUG) << "consumed message payload: " << payload_str < Date: Thu, 6 Jul 2023 15:47:27 +0000 Subject: [PATCH 08/14] add unit test --- .../TmxUtils/test/test_kafka_producer_worker.cpp | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/tmx/TmxUtils/test/test_kafka_producer_worker.cpp b/src/tmx/TmxUtils/test/test_kafka_producer_worker.cpp index 28aaf6159..ca04a1e19 100644 --- a/src/tmx/TmxUtils/test/test_kafka_producer_worker.cpp +++ b/src/tmx/TmxUtils/test/test_kafka_producer_worker.cpp @@ -15,3 +15,18 @@ TEST(test_kafka_producer_worker, create_producer) worker->send(msg); worker->stop(); } + +TEST(test_kafka_producer_worker, create_producer_no_topic) +{ + std::string broker_str = "localhost:9092"; + std::string topic = "test"; + auto client = std::make_shared(); + std::shared_ptr worker; + worker = client->create_producer(broker_str); + worker->init(); + worker->printCurrConf(); + std::string msg = "test message"; + // // Run this unit test without launching kafka broker will throw connection refused error + worker->send(msg, topic); + worker->stop(); +} \ No newline at end of file From 70df5ecbb65b2716b28f18d2f10e34b6c2288a47 Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Thu, 6 Jul 2023 16:45:00 +0000 Subject: [PATCH 09/14] address comments --- src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp index 376383039..a0a3d2c8f 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp +++ b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp @@ -94,6 +94,7 @@ namespace tmx::utils void kafka_consumer_worker::stop() { _run = false; + //Close and shutdown the consumer. _consumer->close(); /*Destroy kafka instance*/ // Wait for RdKafka to decommission. RdKafka::wait_destroyed(5000); From 0c1bddd5cd6a644333673dedbc872db69dcfce6b Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Thu, 6 Jul 2023 17:07:07 +0000 Subject: [PATCH 10/14] address comments --- src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp | 3 +-- src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp | 4 ++++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp index bdfd62169..e81364c19 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp +++ b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp @@ -35,8 +35,7 @@ namespace tmx::utils kafka_producer_worker::kafka_producer_worker(const std::string &brokers) : _broker_str(brokers), _run(true) - { - init_producer(); + { } bool kafka_producer_worker::init() diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp index 375e54cd7..217ca1fa6 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp @@ -78,6 +78,10 @@ void CARMAStreetsPlugin::InitKafkaConsumerProducers() //Producer _kafka_producer_ptr = client.create_producer(kafkaConnectString); + if(!_kafka_producer_ptr->init_producer()) + { + return; + } //Consumers auto uuid = boost::uuids::random_generator()(); From 8280865dce0e7df5529eee2abd543f6feda7e9ea Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Thu, 6 Jul 2023 14:30:41 -0400 Subject: [PATCH 11/14] remove uuid --- src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp | 7 ------- src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h | 5 +---- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp index 217ca1fa6..71703bd59 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp @@ -84,13 +84,6 @@ void CARMAStreetsPlugin::InitKafkaConsumerProducers() } //Consumers - auto uuid = boost::uuids::random_generator()(); - std::stringstream ss; - ss << uuid; - _subscribeToSpatConsumerGroupId += ss.str(); - _subscribeToSchedulingPlanConsumerGroupId += ss.str(); - _subscribeToSSMConsumerGroupId += ss.str(); - //Todo further enhancement: Temporary fix for the consumer rebalancing due to multiple consumers join the same group upon restarting plugin. _spat_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSpatTopic,_subscribeToSpatConsumerGroupId); _scheduing_plan_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSchedulingPlanTopic,_subscribeToSchedulingPlanConsumerGroupId); _ssm_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSsmTopic,_subscribeToSSMConsumerGroupId); diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h index 190e2a19f..35df38132 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h @@ -16,10 +16,7 @@ #include #include "J2735MapToJsonConverter.h" #include "JsonToJ2735SpatConverter.h" -#include "J2735ToSRMJsonConverter.h" -#include -#include -#include +#include "J2735ToSRMJsonConverter.h" #include #include #include "JsonToJ2735SSMConverter.h" From ecc4ada3bd08e0612a1dad6369bbffa353f72fe0 Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Fri, 7 Jul 2023 09:26:48 -0400 Subject: [PATCH 12/14] address comments --- .devcontainer/docker-compose-vscode.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.devcontainer/docker-compose-vscode.yml b/.devcontainer/docker-compose-vscode.yml index 2a419aa6c..a34dd25c9 100755 --- a/.devcontainer/docker-compose-vscode.yml +++ b/.devcontainer/docker-compose-vscode.yml @@ -74,7 +74,6 @@ services: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_CREATE_TOPICS: "time_sync:1:1,modified_spat:1:1,v2xhub_scheduling_plan_sub:1:1,v2xhub_ssm_sub:1:1" KAFKA_LOG_DIRS: "/kafka/kafka-logs" - KAFKA_OFFSETS_RETENTION_MINUTES: 1 volumes: - /var/run/docker.sock:/var/run/docker.sock - /etc/localtime:/etc/localtime:ro From d91ae9d895d0508b2ba0a154ddee3b825c34a64b Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Fri, 7 Jul 2023 09:31:35 -0400 Subject: [PATCH 13/14] code smell --- src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h index e02e9ac49..f14709cb8 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h +++ b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h @@ -59,7 +59,7 @@ namespace tmx::utils * * @param broker_str network address of kafka broker. */ - kafka_producer_worker(const std::string &brokers); + explicit kafka_producer_worker(const std::string &brokers); /** * @brief Initialize kafka_producer_worker. This method must be called before send! * From f06e0f5cad0f02c1a5d72338c2898c9f598a1488 Mon Sep 17 00:00:00 2001 From: dan-du-car Date: Fri, 7 Jul 2023 09:32:05 -0400 Subject: [PATCH 14/14] code smell --- src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h index f14709cb8..7dde427d1 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h +++ b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h @@ -53,7 +53,7 @@ namespace tmx::utils * @param topic_str topic producer should produce to. * @param n_partition partition producer should be assigned to. */ - kafka_producer_worker(const std::string &brokers, const std::string &topics, int n_partition = 0); + explicit kafka_producer_worker(const std::string &brokers, const std::string &topics, int n_partition = 0); /** * @brief Construct a new kafka producer worker object *