Skip to content

Commit

Permalink
[QC-1232] Improve log message when kafka broker url is missing (#2437)
Browse files Browse the repository at this point in the history
* better error handling when kafkaBrokers are missing

* Update Framework/src/Triggers.cxx

Co-authored-by: Piotr Konopka <piotr.jan.konopka@cern.ch>

---------

Co-authored-by: Michal Tichák <michal.tichak@cern.ch>
Co-authored-by: Barthélémy von Haller <barthelemy.von.haller@gmail.com>
Co-authored-by: Piotr Konopka <piotr.jan.konopka@cern.ch>
  • Loading branch information
4 people authored Sep 26, 2024
1 parent fa48f59 commit 4e33ac4
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 5 deletions.
22 changes: 17 additions & 5 deletions Framework/src/KafkaPoller.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "proto/events.pb.h"
#include <chrono>
#include <ranges>
#include <stdexcept>

namespace o2::quality_control::core
{
Expand Down Expand Up @@ -112,10 +113,21 @@ bool end_of_run::isValid(const events::Event& event, const std::string& environm

kafka::Properties createProperties(const std::string& brokers, const std::string& groupId)
{
return { { { "bootstrap.servers", { brokers } },
{ "group.id", { groupId } },
{ "enable.auto.commit", { "true" } },
{ "auto.offset.reset", { "latest" } } } };
if (brokers.empty()) {
constexpr std::string_view message{ "You are trying to start KafkaPoller without any brokers" };
ILOG(Fatal, Ops) << message << ENDM;
throw std::invalid_argument{ message.data() };
}

auto properties = kafka::Properties{ { { "bootstrap.servers", { brokers } },
{ "enable.auto.commit", { "true" } },
{ "auto.offset.reset", { "latest" } } } };

if (!groupId.empty()) {
properties.put("group.id", groupId);
}

return properties;
}

KafkaPoller::KafkaPoller(const std::string& brokers, const std::string& groupId)
Expand All @@ -130,7 +142,7 @@ void KafkaPoller::subscribe(const std::string& topic, size_t numberOfRetries)
mConsumer.subscribe({ topic });
return;
} catch (const kafka::KafkaException& ex) {
// it sometimes happen that subscibe timeouts but another retry succeeds
// it sometimes happens that subscibe timeouts but another retry succeeds
if (ex.error().value() != RD_KAFKA_RESP_ERR__TIMED_OUT) {
throw;
} else {
Expand Down
21 changes: 21 additions & 0 deletions Framework/src/Triggers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,25 @@ std::string createKafkaGroupId(std::string_view prefix, std::string_view detecto
return groupId;
}

bool checkKafkaParams(const std::string& kafkaBrokers, const std::string& topic, const std::string_view triggerTypeLogId)
{
if (kafkaBrokers.empty()) {
ILOG(Error, Support) << "You are tring to create " << triggerTypeLogId << " trigger using Kafka without any brokers, fill config value 'kafkaBrokersUrl'" << ENDM;
return false;
}
if (topic.empty()) {
ILOG(Error, Support) << "You are tring to consume empty Kafka topic from '" << triggerTypeLogId << "' trigger, fill config value 'kafkaTopic'" << ENDM;
return false;
}
return true;
}

TriggerFcn StartOfRun(const std::string& kafkaBrokers, const std::string& topic, const std::string& detector, const std::string& taskName, const core::Activity& activity)
{
if (!checkKafkaParams(kafkaBrokers, topic, "SOR")) {
throw std::invalid_argument{ "We don't have enough information to consume Kafka. Check IL" };
}

auto copiedActivity = activity;
auto poller = std::make_shared<core::KafkaPoller>(kafkaBrokers, createKafkaGroupId("SOR_postprocessing", detector, taskName));
poller->subscribe(topic);
Expand Down Expand Up @@ -117,6 +134,10 @@ TriggerFcn Never(const Activity& activity)

TriggerFcn EndOfRun(const std::string& kafkaBrokers, const std::string& topic, const std::string& detector, const std::string& taskName, const Activity& activity)
{
if (!checkKafkaParams(kafkaBrokers, topic, "EOR")) {
throw std::invalid_argument{ "We don't have enough information to consume Kafka. Check IL" };
}

auto copiedActivity = activity;
auto poller = std::make_shared<core::KafkaPoller>(kafkaBrokers, createKafkaGroupId("EOR_postprocessing", detector, taskName));
poller->subscribe(topic);
Expand Down
16 changes: 16 additions & 0 deletions Framework/test/testKafkaTests.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,22 @@ void sendSorAndTeardown()
sendMessage(producer, createEorTeardownProtoMessage());
}

TEST_CASE("test_kafka_empty_brokers", "[.manual_kafka]")
{
using namespace o2::quality_control::core;
REQUIRE_THROWS_AS(KafkaPoller("", ""), std::invalid_argument);
}

TEST_CASE("test_kafka_poller_soreor_empty_brokers", "[.manual_kafka]")
{
using namespace o2::quality_control;
core::Activity activityMatching{};
REQUIRE_THROWS_AS(postprocessing::triggers::StartOfRun("", testTopic, "TST", "sor_test_matching", activityMatching), std::invalid_argument);
REQUIRE_THROWS_AS(postprocessing::triggers::StartOfRun("emptystring", "", "TST", "sor_test_matching", activityMatching), std::invalid_argument);
REQUIRE_THROWS_AS(postprocessing::triggers::EndOfRun("", testTopic, "TST", "sor_test_matching", activityMatching), std::invalid_argument);
REQUIRE_THROWS_AS(postprocessing::triggers::EndOfRun("emptystring", "", "TST", "sor_test_matching", activityMatching), std::invalid_argument);
}

TEST_CASE("test_kafka_poller_soreor", "[.manual_kafka]")
{
const auto kafkaCluster = getClusterURLFromEnv();
Expand Down

0 comments on commit 4e33ac4

Please sign in to comment.