From 07c35330b03225c0510f9d2a9d2d41f85cda877f Mon Sep 17 00:00:00 2001 From: dev Date: Mon, 17 Jul 2023 13:32:02 -0400 Subject: [PATCH] Added KafkaTestEnvironment to properly cleanup Kafka objects and avoid Pure Virtual error --- .../src/kafka/kafka_consumer_worker.cpp | 5 ++-- .../src/kafka/kafka_producer_worker.cpp | 13 +++++++-- .../src/kafka/kafka_producer_worker.h | 8 ++---- .../TmxUtils/test/KafkaTestEnvironment.cpp | 28 +++++++++++++++++++ src/tmx/TmxUtils/test/Main.cpp | 2 ++ .../test/test_kafka_consumer_worker.cpp | 1 + .../test/test_kafka_producer_worker.cpp | 2 -- 7 files changed, 47 insertions(+), 12 deletions(-) create mode 100644 src/tmx/TmxUtils/test/KafkaTestEnvironment.cpp diff --git a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp index 5c82f425b..97f9d4373 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp +++ b/src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp @@ -97,11 +97,12 @@ namespace tmx::utils void kafka_consumer_worker::stop() { + FILE_LOG(logWARNING) << "Stopping Kafka Consumer!" << std::endl; _run = false; //Close and shutdown the consumer. _consumer->close(); - /*Destroy kafka instance*/ // Wait for RdKafka to decommission. - RdKafka::wait_destroyed(5000); + FILE_LOG(logWARNING) << "Kafka Consumer Stopped!" << std::endl; + } void kafka_consumer_worker::subscribe() diff --git a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp index e81364c19..02d0a0cbb 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp +++ b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp @@ -38,6 +38,11 @@ namespace tmx::utils { } + kafka_producer_worker::~kafka_producer_worker() { + stop(); + FILE_LOG(logWARNING) << "Kafka Producer Worker Destroyed!" << std::endl; + } + bool kafka_producer_worker::init() { if(init_producer()) @@ -217,16 +222,18 @@ namespace tmx::utils { if (_producer) { - _producer->flush(10 * 1000 /* wait for max 10 seconds */); - + auto error =_producer->flush(10 * 1000 /* wait for max 10 seconds */); + if (error == RdKafka::ERR__TIMED_OUT) + FILE_LOG(logERROR) << "Flush attempt timed out!" << std::endl; if (_producer->outq_len() > 0) - FILE_LOG(logWARNING) << _producer->name() << _producer->outq_len() << " message(s) were not delivered." << std::endl; + FILE_LOG(logERROR) << _producer->name() << _producer->outq_len() << " message(s) were not delivered." << std::endl; } } catch (const std::runtime_error &e) { FILE_LOG(logERROR) << "Error encountered flushing producer : " << e.what() << std::endl; } + FILE_LOG(logWARNING) << "Kafka producer stopped!" << std::endl; } void kafka_producer_worker::printCurrConf() diff --git a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h index 7dde427d1..2ea7d4468 100644 --- a/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h +++ b/src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h @@ -60,6 +60,8 @@ namespace tmx::utils * @param broker_str network address of kafka broker. */ explicit kafka_producer_worker(const std::string &brokers); + + virtual ~kafka_producer_worker(); /** * @brief Initialize kafka_producer_worker. This method must be called before send! * @@ -105,11 +107,7 @@ namespace tmx::utils * @brief Print current configurations. */ virtual void printCurrConf(); - /** - * @brief Destroy the kafka producer worker object - * - */ - virtual ~kafka_producer_worker() = default; + }; } diff --git a/src/tmx/TmxUtils/test/KafkaTestEnvironment.cpp b/src/tmx/TmxUtils/test/KafkaTestEnvironment.cpp new file mode 100644 index 000000000..374a7ddaf --- /dev/null +++ b/src/tmx/TmxUtils/test/KafkaTestEnvironment.cpp @@ -0,0 +1,28 @@ +#include "gtest/gtest.h" +#include + +/** + * @brief Kafka Test Environment which allows for Setup/Teardown configuration at the + * test program level. Teardown waits on all rd_kafka_t objects to be destroyed. + */ +class KafkaTestEnvironment : public ::testing::Environment { + public: + ~KafkaTestEnvironment() override {} + + // Override this to define how to set up the environment. + void SetUp() override {} + + // Override this to define how to tear down the environment. + void TearDown() override { + std::cout << "Waiting for all RDKafka objects to be destroyed!" << std::endl; + // Wait for all rd_kafka_t objects to be destroyed + auto error = RdKafka::wait_destroyed(5000); + if (error == RdKafka::ERR__TIMED_OUT) { + std::cout << "Wait destroy attempted timed out!" << std::endl; + } + else { + std::cout << "All Objects are destroyed!" << std::endl; + } + + } +}; \ No newline at end of file diff --git a/src/tmx/TmxUtils/test/Main.cpp b/src/tmx/TmxUtils/test/Main.cpp index 75163d417..7bffd67ea 100644 --- a/src/tmx/TmxUtils/test/Main.cpp +++ b/src/tmx/TmxUtils/test/Main.cpp @@ -6,9 +6,11 @@ */ #include +#include "KafkaTestEnvironment.cpp" int main(int argc, char **argv) { + ::testing::AddGlobalTestEnvironment(new KafkaTestEnvironment()); testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/src/tmx/TmxUtils/test/test_kafka_consumer_worker.cpp b/src/tmx/TmxUtils/test/test_kafka_consumer_worker.cpp index 2d0d680a7..ba7a33c99 100644 --- a/src/tmx/TmxUtils/test/test_kafka_consumer_worker.cpp +++ b/src/tmx/TmxUtils/test/test_kafka_consumer_worker.cpp @@ -1,5 +1,6 @@ #include "gtest/gtest.h" #include "kafka/kafka_client.h" +#include "PluginLog.h" TEST(test_kafka_consumer_worker, create_consumer) { diff --git a/src/tmx/TmxUtils/test/test_kafka_producer_worker.cpp b/src/tmx/TmxUtils/test/test_kafka_producer_worker.cpp index ca04a1e19..e6a101d3e 100644 --- a/src/tmx/TmxUtils/test/test_kafka_producer_worker.cpp +++ b/src/tmx/TmxUtils/test/test_kafka_producer_worker.cpp @@ -13,7 +13,6 @@ TEST(test_kafka_producer_worker, create_producer) std::string msg = "test message"; // // Run this unit test without launching kafka broker will throw connection refused error worker->send(msg); - worker->stop(); } TEST(test_kafka_producer_worker, create_producer_no_topic) @@ -28,5 +27,4 @@ TEST(test_kafka_producer_worker, create_producer_no_topic) std::string msg = "test message"; // // Run this unit test without launching kafka broker will throw connection refused error worker->send(msg, topic); - worker->stop(); } \ No newline at end of file