Skip to content

Commit

Permalink
Added KafkaTestEnvironment to properly cleanup Kafka objects and avoi…
Browse files Browse the repository at this point in the history
…d Pure Virtual error
  • Loading branch information
paulbourelly999 committed Jul 17, 2023
1 parent 5819a5f commit 07c3533
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 12 deletions.
5 changes: 3 additions & 2 deletions src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,12 @@ namespace tmx::utils

void kafka_consumer_worker::stop()
{
FILE_LOG(logWARNING) << "Stopping Kafka Consumer!" << std::endl;
_run = false;
//Close and shutdown the consumer.
_consumer->close();
/*Destroy kafka instance*/ // Wait for RdKafka to decommission.
RdKafka::wait_destroyed(5000);
FILE_LOG(logWARNING) << "Kafka Consumer Stopped!" << std::endl;

}

void kafka_consumer_worker::subscribe()
Expand Down
13 changes: 10 additions & 3 deletions src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ namespace tmx::utils
{
}

kafka_producer_worker::~kafka_producer_worker() {
stop();
FILE_LOG(logWARNING) << "Kafka Producer Worker Destroyed!" << std::endl;
}

bool kafka_producer_worker::init()
{
if(init_producer())
Expand Down Expand Up @@ -217,16 +222,18 @@ namespace tmx::utils
{
if (_producer)
{
_producer->flush(10 * 1000 /* wait for max 10 seconds */);

auto error =_producer->flush(10 * 1000 /* wait for max 10 seconds */);
if (error == RdKafka::ERR__TIMED_OUT)
FILE_LOG(logERROR) << "Flush attempt timed out!" << std::endl;
if (_producer->outq_len() > 0)
FILE_LOG(logWARNING) << _producer->name() << _producer->outq_len() << " message(s) were not delivered." << std::endl;
FILE_LOG(logERROR) << _producer->name() << _producer->outq_len() << " message(s) were not delivered." << std::endl;
}
}
catch (const std::runtime_error &e)
{
FILE_LOG(logERROR) << "Error encountered flushing producer : " << e.what() << std::endl;
}
FILE_LOG(logWARNING) << "Kafka producer stopped!" << std::endl;
}

void kafka_producer_worker::printCurrConf()
Expand Down
8 changes: 3 additions & 5 deletions src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ namespace tmx::utils
* @param broker_str network address of kafka broker.
*/
explicit kafka_producer_worker(const std::string &brokers);

virtual ~kafka_producer_worker();
/**
* @brief Initialize kafka_producer_worker. This method must be called before send!
*
Expand Down Expand Up @@ -105,11 +107,7 @@ namespace tmx::utils
* @brief Print current configurations.
*/
virtual void printCurrConf();
/**
* @brief Destroy the kafka producer worker object
*
*/
virtual ~kafka_producer_worker() = default;

};
}

Expand Down
28 changes: 28 additions & 0 deletions src/tmx/TmxUtils/test/KafkaTestEnvironment.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#include "gtest/gtest.h"
#include <librdkafka/rdkafkacpp.h>

/**
* @brief Kafka Test Environment which allows for Setup/Teardown configuration at the
* test program level. Teardown waits on all rd_kafka_t objects to be destroyed.
*/
class KafkaTestEnvironment : public ::testing::Environment {
public:
~KafkaTestEnvironment() override {}

// Override this to define how to set up the environment.
void SetUp() override {}

// Override this to define how to tear down the environment.
void TearDown() override {
std::cout << "Waiting for all RDKafka objects to be destroyed!" << std::endl;
// Wait for all rd_kafka_t objects to be destroyed
auto error = RdKafka::wait_destroyed(5000);
if (error == RdKafka::ERR__TIMED_OUT) {
std::cout << "Wait destroy attempted timed out!" << std::endl;
}
else {
std::cout << "All Objects are destroyed!" << std::endl;
}

}
};
2 changes: 2 additions & 0 deletions src/tmx/TmxUtils/test/Main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
*/

#include <gtest/gtest.h>
#include "KafkaTestEnvironment.cpp"

int main(int argc, char **argv)
{
::testing::AddGlobalTestEnvironment(new KafkaTestEnvironment());
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
1 change: 1 addition & 0 deletions src/tmx/TmxUtils/test/test_kafka_consumer_worker.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "gtest/gtest.h"
#include "kafka/kafka_client.h"
#include "PluginLog.h"

TEST(test_kafka_consumer_worker, create_consumer)
{
Expand Down
2 changes: 0 additions & 2 deletions src/tmx/TmxUtils/test/test_kafka_producer_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ TEST(test_kafka_producer_worker, create_producer)
std::string msg = "test message";
// // Run this unit test without launching kafka broker will throw connection refused error
worker->send(msg);
worker->stop();
}

TEST(test_kafka_producer_worker, create_producer_no_topic)
Expand All @@ -28,5 +27,4 @@ TEST(test_kafka_producer_worker, create_producer_no_topic)
std::string msg = "test message";
// // Run this unit test without launching kafka broker will throw connection refused error
worker->send(msg, topic);
worker->stop();
}

0 comments on commit 07c3533

Please sign in to comment.