From 1c5a862ce4ffabca662159859a6d6f95300b78cb Mon Sep 17 00:00:00 2001 From: dan-du-car <62157949+dan-du-car@users.noreply.github.com> Date: Fri, 7 Jul 2023 10:53:31 -0400 Subject: [PATCH] CARMA Streets Plugin Kafka Consumers (#547) # PR Details ## Description - Remove the kafka consumer/producer initialization during config parameter update. - Replace consumer creation with v2xhub kafka_client library. - Add producer creation in kafka_client library ## Related Issue https://github.com/usdot-fhwa-OPS/V2X-Hub/issues/543 ## Motivation and Context NA ## How Has This Been Tested? Local integration testing ## Types of changes - [x] Defect fix (non-breaking change that fixes an issue) - [ ] New feature (non-breaking change that adds functionality) - [ ] Breaking change (fix or feature that cause existing functionality to change) ## Checklist: - [ ] I have added any new packages to the sonar-scanner.properties file - [ ] My change requires a change to the documentation. - [ ] I have updated the documentation accordingly. - [x] I have read the **CONTRIBUTING** document. [V2XHUB Contributing Guide](https://github.com/usdot-fhwa-OPS/V2X-Hub/blob/develop/Contributing.md) - [ ] I have added tests to cover my changes. - [ ] All new and existing tests passed. --- .devcontainer/docker-compose-vscode.yml | 2 +- src/tmx/TmxUtils/src/kafka/kafka_client.cpp | 14 + src/tmx/TmxUtils/src/kafka/kafka_client.h | 1 + .../src/kafka/kafka_consumer_worker.cpp | 10 +- .../src/kafka/kafka_consumer_worker.h | 1 + .../src/kafka/kafka_producer_worker.cpp | 52 ++- .../src/kafka/kafka_producer_worker.h | 27 +- .../test/test_kafka_producer_worker.cpp | 15 + .../src/CARMAStreetsPlugin.cpp | 397 ++++++------------ .../src/CARMAStreetsPlugin.h | 26 +- 10 files changed, 269 insertions(+), 276 deletions(-) diff --git a/.devcontainer/docker-compose-vscode.yml b/.devcontainer/docker-compose-vscode.yml index c3e19d9a8..a34dd25c9 100755 --- a/.devcontainer/docker-compose-vscode.yml +++ b/.devcontainer/docker-compose-vscode.yml @@ -72,7 +72,7 @@ services: DOCKER_HOST_IP: 127.0.0.1 KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_CREATE_TOPICS: "time_sync:1:1" + KAFKA_CREATE_TOPICS: "time_sync:1:1,modified_spat:1:1,v2xhub_scheduling_plan_sub:1:1,v2xhub_ssm_sub:1:1" KAFKA_LOG_DIRS: "/kafka/kafka-logs" volumes: - /var/run/docker.sock:/var/run/docker.sock diff --git a/src/tmx/TmxUtils/src/kafka/kafka_client.cpp b/src/tmx/TmxUtils/src/kafka/kafka_client.cpp index df5346f4f..925b35aa4 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_client.cpp +++ b/src/tmx/TmxUtils/src/kafka/kafka_client.cpp @@ -34,5 +34,19 @@ namespace tmx::utils } } + std::shared_ptr kafka_client::create_producer(const std::string &bootstrap_server) const + { + try + { + auto producer_ptr = std::make_shared(bootstrap_server); + return producer_ptr; + } + catch (const std::runtime_error &e) + { + FILE_LOG(logERROR) << "Create producer failure: " << e.what() << std::endl; + exit(1); + } + } + } \ No newline at end of file diff --git a/src/tmx/TmxUtils/src/kafka/kafka_client.h b/src/tmx/TmxUtils/src/kafka/kafka_client.h index b4da5e059..778a17026 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_client.h +++ b/src/tmx/TmxUtils/src/kafka/kafka_client.h @@ -15,6 +15,7 @@ namespace tmx::utils std::shared_ptr create_consumer(const std::string &broker_str, const std::string &topic_str, const std::string &group_id_str) const; std::shared_ptr create_producer(const std::string &broker_str, const std::string &topic_str) const; + std::shared_ptr 
create_producer(const std::string &bootstrap_server) const; }; } diff --git a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp index 74c4e2735..a0a3d2c8f 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp +++ b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp @@ -42,6 +42,12 @@ namespace tmx::utils return false; } + if (conf->set(ENABLE_AUTO_COMMIT, "true", errstr) != RdKafka::Conf::CONF_OK) + { + FILE_LOG(logWARNING) << "RDKafka conf set enable auto commit failed: " << errstr.c_str() << std::endl; + return false; + } + // set consumer group if (conf->set(GROUP_ID, _group_id_str, errstr) != RdKafka::Conf::CONF_OK) { @@ -88,6 +94,8 @@ namespace tmx::utils void kafka_consumer_worker::stop() { _run = false; + //Close and shutdown the consumer. + _consumer->close(); /*Destroy kafka instance*/ // Wait for RdKafka to decommission. RdKafka::wait_destroyed(5000); } @@ -134,7 +142,7 @@ namespace tmx::utils switch (message->err()) { case RdKafka::ERR__TIMED_OUT: - FILE_LOG(logWARNING) << _consumer->name() << " consume failed: " << message->errstr() << std::endl; + FILE_LOG(logDEBUG4) << _consumer->name() << " consume failed: " << message->errstr() << std::endl; break; case RdKafka::ERR_NO_ERROR: FILE_LOG(logDEBUG1) << _consumer->name() << " read message at offset " << message->offset() << std::endl; diff --git a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h index a3e00c00b..95a9c96a8 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h +++ b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h @@ -40,6 +40,7 @@ namespace tmx::utils { const std::string GROUP_ID="group.id"; const std::string MAX_PARTITION_FETCH_SIZE="max.partition.fetch.bytes"; const std::string ENABLE_PARTITION_END_OF="enable.partition.eof"; + const std::string ENABLE_AUTO_COMMIT="enable.auto.commit"; //maximum size for pulling message from a single partition at a time std::string STR_FETCH_NUM = "10240000"; diff --git a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp index 46cbc55e3..e81364c19 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp +++ b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp @@ -33,13 +33,26 @@ namespace tmx::utils { } + kafka_producer_worker::kafka_producer_worker(const std::string &brokers) + : _broker_str(brokers), _run(true) + { + } + bool kafka_producer_worker::init() + { + if(init_producer()) + { + return init_topic(); + } + return false; + } + + bool kafka_producer_worker::init_producer() { std::string errstr = ""; // Create configuration objects RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); - RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); /*** * Set Configuration properties @@ -75,7 +88,12 @@ namespace tmx::utils delete conf; FILE_LOG(logINFO) << "Created producer: " << _producer->name() << std::endl; + } + bool kafka_producer_worker::init_topic() + { + RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); + std::string errstr = ""; // Create topic handle _topic = RdKafka::Topic::create(_producer, _topics_str, tconf, errstr); if (!_topic) @@ -156,6 +174,38 @@ namespace tmx::utils _producer->poll(0); } + void kafka_producer_worker::send(const std::string& message, const std::string& topic_name ) const + { + bool retry = true; + while (retry) + { + RdKafka::ErrorCode produce_error = _producer->produce(topic_name, 
+ RdKafka::Topic::PARTITION_UA, + RdKafka::Producer::RK_MSG_COPY, + const_cast(message.c_str()), + message.size(), + nullptr, 0, 0, nullptr); + + if (produce_error == RdKafka::ERR_NO_ERROR) { + PLOG(logDEBUG) <<"Queued message:" << message; + retry = false; + } + else + { + PLOG(logERROR) <<"Failed to queue message:" << message <<" with error:" << RdKafka::err2str(produce_error); + if (produce_error == RdKafka::ERR__QUEUE_FULL) { + PLOG(logERROR) <<"Message queue full...retrying..."; + _producer->poll(500); /* ms */ + retry = true; + } + else { + PLOG(logERROR) <<"Unhandled error in queue_kafka_message:" << RdKafka::err2str(produce_error); + retry = false; + } + } + } + } + void kafka_producer_worker::stop() { /* Wait for final messages to be delivered or fail. diff --git a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h index a06a5587c..7dde427d1 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h +++ b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h @@ -53,7 +53,13 @@ namespace tmx::utils * @param topic_str topic producer should produce to. * @param n_partition partition producer should be assigned to. */ - kafka_producer_worker(const std::string &brokers, const std::string &topics, int n_partition = 0); + explicit kafka_producer_worker(const std::string &brokers, const std::string &topics, int n_partition = 0); + /** + * @brief Construct a new kafka producer worker object + * + * @param broker_str network address of kafka broker. + */ + explicit kafka_producer_worker(const std::string &brokers); /** * @brief Initialize kafka_producer_worker. This method must be called before send! * @@ -61,12 +67,29 @@ namespace tmx::utils * @return false if unsuccessful. */ virtual bool init(); + /** + * @brief Initialize kafka topic. + * @return true if successful. + * @return false if unsuccessful. + */ + virtual bool init_topic(); + /** + * @brief Initialize kafka producer. + * @return true if successful. + * @return false if unsuccessful. + */ + virtual bool init_producer(); /** * @brief Produce to topic. Will result in segmentation fault if init() is not called on producer first. - * * @param msg message to produce. */ virtual void send(const std::string &msg); + /** + * @brief Produce to a specific topic. Will result in segmentation fault if init() is not called on producer first. + * @param msg message to produce. + * @param topic_name topic to send the message to. + */ + virtual void send(const std::string& message, const std::string& topic_name ) const; /** * @brief Is kafka_producer_worker still running? 
* diff --git a/src/tmx/TmxUtils/test/test_kafka_producer_worker.cpp b/src/tmx/TmxUtils/test/test_kafka_producer_worker.cpp index 28aaf6159..ca04a1e19 100644 --- a/src/tmx/TmxUtils/test/test_kafka_producer_worker.cpp +++ b/src/tmx/TmxUtils/test/test_kafka_producer_worker.cpp @@ -15,3 +15,18 @@ TEST(test_kafka_producer_worker, create_producer) worker->send(msg); worker->stop(); } + +TEST(test_kafka_producer_worker, create_producer_no_topic) +{ + std::string broker_str = "localhost:9092"; + std::string topic = "test"; + auto client = std::make_shared(); + std::shared_ptr worker; + worker = client->create_producer(broker_str); + worker->init(); + worker->printCurrConf(); + std::string msg = "test message"; + // // Run this unit test without launching kafka broker will throw connection refused error + worker->send(msg, topic); + worker->stop(); +} \ No newline at end of file diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp index 08bc96d8a..71703bd59 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp @@ -20,7 +20,6 @@ namespace CARMAStreetsPlugin { */ CARMAStreetsPlugin::CARMAStreetsPlugin(string name) : PluginClient(name) { - AddMessageFilter < BsmMessage > (this, &CARMAStreetsPlugin::HandleBasicSafetyMessage); AddMessageFilter < tsm3Message > (this, &CARMAStreetsPlugin::HandleMobilityOperationMessage); AddMessageFilter < tsm2Message > (this, &CARMAStreetsPlugin::HandleMobilityPathMessage); @@ -32,6 +31,10 @@ CARMAStreetsPlugin::CARMAStreetsPlugin(string name) : } CARMAStreetsPlugin::~CARMAStreetsPlugin() { + //Todo: It does not seem the desctructor is called. + _spat_kafka_consumer_ptr->stop(); + _scheduing_plan_kafka_consumer_ptr->stop(); + _ssm_kafka_consumer_ptr->stop(); } void CARMAStreetsPlugin::UpdateConfigSettings() { @@ -64,96 +67,41 @@ void CARMAStreetsPlugin::UpdateConfigSettings() { getline( ss, substring, ','); _strategies.push_back( substring); } +} + +void CARMAStreetsPlugin::InitKafkaConsumerProducers() +{ std::string kafkaConnectString = _kafkaBrokerIp + ':' + _kafkaBrokerPort; std::string error_string; kafkaConnectString = _kafkaBrokerIp + ':' + _kafkaBrokerPort; - kafka_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); - kafka_conf_sp_consumer = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); - kafka_conf_spat_consumer = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); - kafka_conf_ssm_consumer = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); - - - PLOG(logDEBUG) <<"Attempting to connect to " << kafkaConnectString; - if ((kafka_conf->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK)) { - PLOG(logERROR) <<"Setting kafka config options failed with error:" << error_string << "\n" <<"Exiting with exit code 1"; - exit(1); - } else { - PLOG(logDEBUG) <<"Kafka config options set successfully"; - } - - kafka_producer = RdKafka::Producer::create(kafka_conf, error_string); - if (!kafka_producer) { - PLOG(logERROR) <<"Creating kafka producer failed with error:" << error_string << "\n" <<"Exiting with exit code 1"; - exit(1); - } - PLOG(logDEBUG) <<"Kafka producer created"; - - if (kafka_conf_sp_consumer->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK - || (kafka_conf_sp_consumer->set("group.id", _subscribeToSchedulingPlanConsumerGroupId, error_string) != RdKafka::Conf::CONF_OK) - || (kafka_conf_spat_consumer->set("bootstrap.servers", 
kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK) - || (kafka_conf_spat_consumer->set("group.id", _subscribeToSpatConsumerGroupId, error_string) != RdKafka::Conf::CONF_OK) - || (kafka_conf_ssm_consumer->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK) - || (kafka_conf_ssm_consumer->set("group.id", _subscribeToSSMConsumerGroupId, error_string) != RdKafka::Conf::CONF_OK) - ) { - PLOG(logERROR) <<"Setting kafka config group.id options failed with error:" << error_string << "\n" <<"Exiting with exit code 1"; - exit(1); - } else { - PLOG(logDEBUG) <<"Kafka config group.id options set successfully"; - } - kafka_conf_sp_consumer->set("enable.partition.eof", "true", error_string); - kafka_conf_spat_consumer->set("enable.partition.eof", "true", error_string); - kafka_conf_ssm_consumer->set("enable.partition.eof", "true", error_string); - - _scheduing_plan_kafka_consumer = RdKafka::KafkaConsumer::create(kafka_conf_sp_consumer, error_string); - _spat_kafka_consumer = RdKafka::KafkaConsumer::create(kafka_conf_spat_consumer, error_string); - _ssm_kafka_consumer = RdKafka::KafkaConsumer::create(kafka_conf_ssm_consumer, error_string); + kafka_client client; - if ( !_scheduing_plan_kafka_consumer || !_spat_kafka_consumer || !_ssm_kafka_consumer) { - PLOG(logERROR) << "Failed to create Kafka consumers: " << error_string << std::endl; - exit(1); - } - PLOG(logDEBUG) << "Created consumer " << _scheduing_plan_kafka_consumer->name() << std::endl; - PLOG(logDEBUG) << "Created consumer " << _spat_kafka_consumer->name() << std::endl; - PLOG(logDEBUG) << "Created consumer " << _ssm_kafka_consumer->name() << std::endl; - - //create kafka topics - RdKafka::Conf *tconf_spat = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); - RdKafka::Conf *tconf_sp = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); - RdKafka::Conf *tconf_ssm = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); - if(!tconf_spat && !tconf_sp) + //Producer + _kafka_producer_ptr = client.create_producer(kafkaConnectString); + if(!_kafka_producer_ptr->init_producer()) { - PLOG(logERROR) << "RDKafka create topic conf failed "; return; - } - - _scheduing_plan_topic = RdKafka::Topic::create(_scheduing_plan_kafka_consumer,_subscribeToSchedulingPlanTopic,tconf_sp,error_string); - if(!_scheduing_plan_topic) - { - PLOG(logERROR) << "RDKafka create scheduing plan topic failed:" << error_string; - return ; } - _spat_topic = RdKafka::Topic::create(_spat_kafka_consumer,_subscribeToSpatTopic,tconf_spat,error_string); - if(!_spat_topic) + //Consumers + _spat_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSpatTopic,_subscribeToSpatConsumerGroupId); + _scheduing_plan_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSchedulingPlanTopic,_subscribeToSchedulingPlanConsumerGroupId); + _ssm_kafka_consumer_ptr = client.create_consumer(kafkaConnectString, _subscribeToSsmTopic,_subscribeToSSMConsumerGroupId); + if(!_scheduing_plan_kafka_consumer_ptr || !_spat_kafka_consumer_ptr || !_ssm_kafka_consumer_ptr) { - PLOG(logERROR) << "RDKafka create SPAT topic failed:" << error_string; - return ; + PLOG(logERROR) <<"Failed to create Kafka consumers."; + return; } - - _ssm_topic = RdKafka::Topic::create(_ssm_kafka_consumer,_subscribeToSsmTopic,tconf_ssm,error_string); - if(!_ssm_topic) + PLOG(logDEBUG) <<"Kafka consumers created"; + if(!_spat_kafka_consumer_ptr->init() || !_scheduing_plan_kafka_consumer_ptr->init() || !_ssm_kafka_consumer_ptr->init()) { - PLOG(logERROR) << "RDKafka 
create SSM topic failed:" << error_string; - return ; + PLOG(logERROR) <<"Kafka consumers init() failed!"; + return; } - delete tconf_sp; - delete tconf_spat; - delete tconf_ssm; - boost::thread thread_schpl(&CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic, this); boost::thread thread_spat(&CARMAStreetsPlugin::SubscribeSpatKafkaTopic, this); - boost::thread thread_ssm(&CARMAStreetsPlugin::SubscribeSSMKafkaTopic, this); + boost::thread thread_ssm(&CARMAStreetsPlugin::SubscribeSSMKafkaTopic, this); } void CARMAStreetsPlugin::OnConfigChanged(const char *key, const char *value) { @@ -167,8 +115,7 @@ void CARMAStreetsPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routea auto mobilityOperation = msg.get_j2735_data(); PLOG(logDEBUG) << "Body OperationParams : " << mobilityOperation->body.operationParams.buf << "\n" << "Body Strategy : " << mobilityOperation->body.strategy.buf<< "\n" - <<"Queueing kafka message:topic:" << _transmitMobilityOperationTopic << " " - << kafka_producer->outq_len() <<"messages already in queue"; + <<"Queueing kafka message:topic:" << _transmitMobilityOperationTopic; std::stringstream strat; std::stringstream payload; @@ -504,34 +451,7 @@ void CARMAStreetsPlugin::HandleMapMessage(MapDataMessage &msg, routeable_message void CARMAStreetsPlugin::produce_kafka_msg(const string& message, const string& topic_name) const { - bool retry = true; - while (retry) - { - RdKafka::ErrorCode produce_error = kafka_producer->produce(topic_name, - RdKafka::Topic::PARTITION_UA, - RdKafka::Producer::RK_MSG_COPY, - const_cast(message.c_str()), - message.size(), - nullptr, 0, 0, nullptr); - - if (produce_error == RdKafka::ERR_NO_ERROR) { - PLOG(logDEBUG) <<"Queued message:" << message; - retry = false; - } - else - { - PLOG(logERROR) <<"Failed to queue message:" << message <<" with error:" << RdKafka::err2str(produce_error); - if (produce_error == RdKafka::ERR__QUEUE_FULL) { - PLOG(logERROR) <<"Message queue full...retrying..."; - kafka_producer->poll(500); /* ms */ - retry = true; - } - else { - PLOG(logERROR) <<"Unhandled error in queue_kafka_message:" << RdKafka::err2str(produce_error); - retry = false; - } - } - } + _kafka_producer_ptr->send(message, topic_name); } void CARMAStreetsPlugin::OnStateChange(IvpPluginState state) { @@ -539,6 +459,7 @@ void CARMAStreetsPlugin::OnStateChange(IvpPluginState state) { if (state == IvpPluginState_registered) { UpdateConfigSettings(); + InitKafkaConsumerProducers(); } } @@ -547,67 +468,54 @@ void CARMAStreetsPlugin::SubscribeSchedulingPlanKafkaTopic() if(_subscribeToSchedulingPlanTopic.length() > 0) { PLOG(logDEBUG) << "SubscribeSchedulingPlanKafkaTopics:" <<_subscribeToSchedulingPlanTopic << std::endl; - std::vector topics; - topics.emplace_back(_subscribeToSchedulingPlanTopic); - - RdKafka::ErrorCode err = _scheduing_plan_kafka_consumer->subscribe(topics); - if (err) - { - PLOG(logERROR) << "Failed to subscribe to " << topics.size() << " topics: " << RdKafka::err2str(err) << std::endl; - return; - } + _scheduing_plan_kafka_consumer_ptr->subscribe(); - while (true) + while (_scheduing_plan_kafka_consumer_ptr->is_running()) { - auto msg = _scheduing_plan_kafka_consumer->consume( 500 ); - if( msg->err() == RdKafka::ERR_NO_ERROR ) + std::string payload_str = _scheduing_plan_kafka_consumer_ptr->consume(500); + if(payload_str.length() > 0) { - auto payload_str = static_cast( msg->payload() ); - if(msg->len() > 0) - { - PLOG(logDEBUG) << "consumed message payload: " << payload_str <(Key_ScheduleMessageSkipped, ++_scheduleMessageSkipped); - 
continue; - } + PLOG(logDEBUG) << "consumed message payload: " << payload_str <(Key_ScheduleMessageSkipped, ++_scheduleMessageSkipped); + continue; + } - Json::Value metadata = payload_root["metadata"]; - Json::Value payload_json_array = payload_root["payload"]; - - for ( int index = 0; index < payload_json_array.size(); ++index ) + Json::Value metadata = payload_root["metadata"]; + Json::Value payload_json_array = payload_root["payload"]; + + for ( int index = 0; index < payload_json_array.size(); ++index ) + { + PLOG(logDEBUG) << payload_json_array[index] << std::endl; + Json::Value payload_json = payload_json_array[index]; + tsm3EncodedMessage tsm3EncodedMsgs; + if( getEncodedtsm3 (&tsm3EncodedMsgs, metadata, payload_json) ) { - PLOG(logDEBUG) << payload_json_array[index] << std::endl; - Json::Value payload_json = payload_json_array[index]; - tsm3EncodedMessage tsm3EncodedMsgs; - if( getEncodedtsm3 (&tsm3EncodedMsgs, metadata, payload_json) ) - { - tsm3EncodedMsgs.set_flags( IvpMsgFlags_RouteDSRC ); - tsm3EncodedMsgs.addDsrcMetadata(0xBFEE ); - PLOG(logDEBUG) << "tsm3EncodedMsgs: " << tsm3EncodedMsgs; - BroadcastMessage(static_cast( tsm3EncodedMsgs )); - } + tsm3EncodedMsgs.set_flags( IvpMsgFlags_RouteDSRC ); + tsm3EncodedMsgs.addDsrcMetadata(0xBFEE ); + PLOG(logDEBUG) << "tsm3EncodedMsgs: " << tsm3EncodedMsgs; + BroadcastMessage(static_cast( tsm3EncodedMsgs )); } - //Empty payload - if(payload_json_array.empty()) - { - Json::Value payload_json = {}; - tsm3EncodedMessage tsm3EncodedMsgs; - if( getEncodedtsm3 (&tsm3EncodedMsgs, metadata, payload_json) ) - { - tsm3EncodedMsgs.set_flags( IvpMsgFlags_RouteDSRC ); - tsm3EncodedMsgs.addDsrcMetadata(0xBFEE); - PLOG(logDEBUG) << "tsm3EncodedMsgs: " << tsm3EncodedMsgs; - BroadcastMessage(static_cast( tsm3EncodedMsgs )); - } - } } + //Empty payload + if(payload_json_array.empty()) + { + Json::Value payload_json = {}; + tsm3EncodedMessage tsm3EncodedMsgs; + if( getEncodedtsm3 (&tsm3EncodedMsgs, metadata, payload_json) ) + { + tsm3EncodedMsgs.set_flags( IvpMsgFlags_RouteDSRC ); + tsm3EncodedMsgs.addDsrcMetadata(0xBFEE); + PLOG(logDEBUG) << "tsm3EncodedMsgs: " << tsm3EncodedMsgs; + BroadcastMessage(static_cast( tsm3EncodedMsgs )); + } + } } - delete msg; } } @@ -617,64 +525,51 @@ void CARMAStreetsPlugin::SubscribeSpatKafkaTopic(){ if(_subscribeToSpatTopic.length() > 0) { PLOG(logDEBUG) << "SubscribeSpatKafkaTopics:" <<_subscribeToSpatTopic << std::endl; - std::vector topics; - topics.emplace_back(_subscribeToSpatTopic); - - RdKafka::ErrorCode err = _spat_kafka_consumer->subscribe(topics); - if (err) - { - PLOG(logERROR) << "Failed to subscribe to " << topics.size() << " topics: " << RdKafka::err2str(err) << std::endl; - return; - } - //Initialize Json to J2735 Spat convertor + _spat_kafka_consumer_ptr->subscribe(); + //Initialize Json to J2735 Spat convertor JsonToJ2735SpatConverter spat_convertor; - while (true) + while (_spat_kafka_consumer_ptr->is_running()) { - auto msg = _spat_kafka_consumer->consume( 500 ); - if( msg->err() == RdKafka::ERR_NO_ERROR ) + std::string payload_str = _spat_kafka_consumer_ptr->consume(500); + if(payload_str.length() > 0) { - auto payload_str = static_cast( msg->payload() ); - if(msg->len() > 0) + PLOG(logDEBUG) << "consumed message payload: " << payload_str <(Key_SPATMessageSkipped, ++_spatMessageSkipped); + continue; + } + //Convert the SPAT JSON string into J2735 SPAT message and encode it. 
+ auto spat_ptr = std::make_shared(); + spat_convertor.convertJson2Spat(payload_root, spat_ptr.get()); + tmx::messages::SpatEncodedMessage spatEncodedMsg; + try { - PLOG(logDEBUG) << "consumed message payload: " << payload_str <(Key_SPATMessageSkipped, ++_spatMessageSkipped); - continue; - } - //Convert the SPAT JSON string into J2735 SPAT message and encode it. - auto spat_ptr = std::make_shared(); - spat_convertor.convertJson2Spat(payload_root, spat_ptr.get()); - tmx::messages::SpatEncodedMessage spatEncodedMsg; - try - { - spat_convertor.encodeSpat(spat_ptr, spatEncodedMsg); - } - catch (TmxException &ex) - { - // Skip messages that fail to encode. - PLOG(logERROR) << "Failed to encoded SPAT message : \n" << payload_str << std::endl << "Exception encountered: " - << ex.what() << std::endl; - ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SPAT, spat_ptr.get()); - SetStatus(Key_SPATMessageSkipped, ++_spatMessageSkipped); - - continue; - } - + spat_convertor.encodeSpat(spat_ptr, spatEncodedMsg); + } + catch (TmxException &ex) + { + // Skip messages that fail to encode. + PLOG(logERROR) << "Failed to encoded SPAT message : \n" << payload_str << std::endl << "Exception encountered: " + << ex.what() << std::endl; ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SPAT, spat_ptr.get()); - PLOG(logDEBUG) << "SpatEncodedMessage: " << spatEncodedMsg; + SetStatus(Key_SPATMessageSkipped, ++_spatMessageSkipped); - //Broadcast the encoded SPAT message - spatEncodedMsg.set_flags(IvpMsgFlags_RouteDSRC); - spatEncodedMsg.addDsrcMetadata(0x8002); - BroadcastMessage(static_cast(spatEncodedMsg)); + continue; } + + ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SPAT, spat_ptr.get()); + PLOG(logDEBUG) << "SpatEncodedMessage: " << spatEncodedMsg; + + //Broadcast the encoded SPAT message + spatEncodedMsg.set_flags(IvpMsgFlags_RouteDSRC); + spatEncodedMsg.addDsrcMetadata(0x8002); + BroadcastMessage(static_cast(spatEncodedMsg)); } - delete msg; } } } @@ -684,62 +579,49 @@ void CARMAStreetsPlugin::SubscribeSSMKafkaTopic(){ if(_subscribeToSsmTopic.length() > 0) { PLOG(logDEBUG) << "SubscribeSSMKafkaTopics:" <<_subscribeToSsmTopic << std::endl; - std::vector topics; - topics.emplace_back(_subscribeToSsmTopic); - - RdKafka::ErrorCode err = _ssm_kafka_consumer->subscribe(topics); - if (err) - { - PLOG(logERROR) << "Failed to subscribe to " << topics.size() << " topics: " << RdKafka::err2str(err) << std::endl; - return; - } + _ssm_kafka_consumer_ptr->subscribe(); //Initialize Json to J2735 SSM convertor JsonToJ2735SSMConverter ssm_convertor; - while (true) + while (_ssm_kafka_consumer_ptr->is_running()) { - auto msg = _ssm_kafka_consumer->consume( 500 ); - if( msg->err() == RdKafka::ERR_NO_ERROR ) + std::string payload_str = _ssm_kafka_consumer_ptr->consume(500); + if(payload_str.length() > 0) { - auto payload_str = static_cast( msg->payload() ); - if(msg->len() > 0) + PLOG(logDEBUG) << "consumed message payload: " << payload_str <(Key_SSMMessageSkipped, ++_ssmMessageSkipped); + continue; + } + //Convert the SSM JSON string into J2735 SSM message and encode it. + auto ssm_ptr = std::make_shared(); + ssm_convertor.toJ2735SSM(ssmDoc, ssm_ptr); + tmx::messages::SsmEncodedMessage ssmEncodedMsg; + try { - PLOG(logDEBUG) << "consumed message payload: " << payload_str <(Key_SSMMessageSkipped, ++_ssmMessageSkipped); - continue; - } - //Convert the SSM JSON string into J2735 SSM message and encode it. 
- auto ssm_ptr = std::make_shared(); - ssm_convertor.toJ2735SSM(ssmDoc, ssm_ptr); - tmx::messages::SsmEncodedMessage ssmEncodedMsg; - try - { - ssm_convertor.encodeSSM(ssm_ptr, ssmEncodedMsg); - } - catch (TmxException &ex) - { - // Skip messages that fail to encode. - PLOG(logERROR) << "Failed to encoded SSM message : \n" << payload_str << std::endl << "Exception encountered: " - << ex.what() << std::endl; - ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SignalStatusMessage, ssm_ptr.get()); - SetStatus(Key_SSMMessageSkipped, ++_ssmMessageSkipped); - continue; - } - + ssm_convertor.encodeSSM(ssm_ptr, ssmEncodedMsg); + } + catch (TmxException &ex) + { + // Skip messages that fail to encode. + PLOG(logERROR) << "Failed to encoded SSM message : \n" << payload_str << std::endl << "Exception encountered: " + << ex.what() << std::endl; ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SignalStatusMessage, ssm_ptr.get()); - PLOG(logDEBUG) << "ssmEncodedMsg: " << ssmEncodedMsg; - - //Broadcast the encoded SSM message - ssmEncodedMsg.set_flags(IvpMsgFlags_RouteDSRC); - ssmEncodedMsg.addDsrcMetadata(0x8002); - BroadcastMessage(static_cast(ssmEncodedMsg)); + SetStatus(Key_SSMMessageSkipped, ++_ssmMessageSkipped); + continue; } + + ASN_STRUCT_FREE_CONTENTS_ONLY(asn_DEF_SignalStatusMessage, ssm_ptr.get()); + PLOG(logDEBUG) << "ssmEncodedMsg: " << ssmEncodedMsg; + + //Broadcast the encoded SSM message + ssmEncodedMsg.set_flags(IvpMsgFlags_RouteDSRC); + ssmEncodedMsg.addDsrcMetadata(0x8002); + BroadcastMessage(static_cast(ssmEncodedMsg)); } - delete msg; } } @@ -870,7 +752,6 @@ int CARMAStreetsPlugin::Main() { uint64_t lastSendTime = 0; while (_plugin->state != IvpPluginState_error) { - usleep(100000); //sleep for microseconds set from config. diff --git a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h index 566aaf35c..35df38132 100755 --- a/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h +++ b/src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h @@ -16,11 +16,14 @@ #include #include "J2735MapToJsonConverter.h" #include "JsonToJ2735SpatConverter.h" -#include "J2735ToSRMJsonConverter.h" +#include "J2735ToSRMJsonConverter.h" +#include +#include #include "JsonToJ2735SSMConverter.h" + using namespace std; using namespace tmx; using namespace tmx::utils; @@ -75,6 +78,10 @@ class CARMAStreetsPlugin: public PluginClient { * @param topic_name The name of the topic */ void produce_kafka_msg(const string &msg, const string &topic_name) const; + /** + * @brief Initialize Kafka Producers and Consumers + */ + void InitKafkaConsumerProducers(); private: @@ -93,17 +100,10 @@ class CARMAStreetsPlugin: public PluginClient { std::string _transmitSRMTopic; std::string _kafkaBrokerIp; std::string _kafkaBrokerPort; - RdKafka::Conf *kafka_conf; - RdKafka::Conf *kafka_conf_spat_consumer; - RdKafka::Conf *kafka_conf_sp_consumer; - RdKafka::Conf *kafka_conf_ssm_consumer; - RdKafka::Producer *kafka_producer; - RdKafka::KafkaConsumer *_scheduing_plan_kafka_consumer; - RdKafka::KafkaConsumer *_spat_kafka_consumer; - RdKafka::KafkaConsumer *_ssm_kafka_consumer; - RdKafka::Topic *_scheduing_plan_topic; - RdKafka::Topic *_spat_topic; - RdKafka::Topic *_ssm_topic; + std::shared_ptr _kafka_producer_ptr; + std::shared_ptr _spat_kafka_consumer_ptr; + std::shared_ptr _scheduing_plan_kafka_consumer_ptr; + std::shared_ptr _ssm_kafka_consumer_ptr; std::vector _strategies; tmx::messages::tsm3Message *_tsm3Message{NULL}; std::mutex data_lock; @@ -194,4 +194,4 @@ class CARMAStreetsPlugin: 
public PluginClient { std::mutex _cfgLock; } -#endif \ No newline at end of file +#endif
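
## Usage sketch

A minimal, hedged example of how a plugin can drive the refactored `TmxUtils` Kafka helpers exercised by this PR. The include path, broker address, topic name, and group id below are placeholders rather than values from the patch; the member functions shown (`create_producer(broker)`, `create_consumer(broker, topic, group)`, `init_producer()`, `init()`, `subscribe()`, `is_running()`, `consume()`, `send(msg, topic)`, `stop()`) are the ones added or reused in this diff.

```cpp
#include "kafka/kafka_client.h"  // assumed include path for the TmxUtils Kafka helpers

#include <memory>
#include <string>

int main()
{
    const std::string broker = "127.0.0.1:9092";  // placeholder broker address
    tmx::utils::kafka_client client;

    // Topic-agnostic producer (new overload in this PR); the topic is chosen per send() call.
    auto producer = client.create_producer(broker);
    if (!producer || !producer->init_producer())
    {
        return 1;
    }
    producer->send("{\"example\": true}", "example_topic");  // hypothetical payload and topic

    // Consumer bound to a single topic and consumer group, polled in a worker loop.
    auto consumer = client.create_consumer(broker, "example_topic", "example_group");
    if (consumer && consumer->init())
    {
        consumer->subscribe();
        while (consumer->is_running())
        {
            std::string payload = consumer->consume(500);  // 500 ms poll; empty on timeout
            if (!payload.empty())
            {
                // handle the JSON payload here ...
                break;  // one message is enough for this sketch
            }
        }
        consumer->stop();  // closes the consumer and waits for librdkafka to decommission
    }

    producer->stop();
    return 0;
}
```

`CARMAStreetsPlugin::InitKafkaConsumerProducers()` wires up the same calls and runs each consumer loop (`SubscribeSpatKafkaTopic`, `SubscribeSchedulingPlanKafkaTopic`, `SubscribeSSMKafkaTopic`) on its own `boost::thread`.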