Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CARMA Streets Plugin Kafka Consumers #547

Merged
merged 15 commits into from
Jul 7, 2023
2 changes: 1 addition & 1 deletion .devcontainer/docker-compose-vscode.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ services:
DOCKER_HOST_IP: 127.0.0.1
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "time_sync:1:1"
KAFKA_CREATE_TOPICS: "time_sync:1:1,modified_spat:1:1,v2xhub_scheduling_plan_sub:1:1,v2xhub_ssm_sub:1:1"
KAFKA_LOG_DIRS: "/kafka/kafka-logs"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
Expand Down
14 changes: 14 additions & 0 deletions src/tmx/TmxUtils/src/kafka/kafka_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,19 @@ namespace tmx::utils
}
}

std::shared_ptr<kafka_producer_worker> kafka_client::create_producer(const std::string &bootstrap_server) const
{
try
{
auto producer_ptr = std::make_shared<kafka_producer_worker>(bootstrap_server);
return producer_ptr;
}
catch (const std::runtime_error &e)
{
FILE_LOG(logERROR) << "Create producer failure: " << e.what() << std::endl;
exit(1);
}
}


}
1 change: 1 addition & 0 deletions src/tmx/TmxUtils/src/kafka/kafka_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace tmx::utils
std::shared_ptr<kafka_consumer_worker> create_consumer(const std::string &broker_str, const std::string &topic_str,
const std::string &group_id_str) const;
std::shared_ptr<kafka_producer_worker> create_producer(const std::string &broker_str, const std::string &topic_str) const;
std::shared_ptr<kafka_producer_worker> create_producer(const std::string &bootstrap_server) const;
paulbourelly999 marked this conversation as resolved.
Show resolved Hide resolved
};

}
Expand Down
10 changes: 9 additions & 1 deletion src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ namespace tmx::utils
return false;
}

if (conf->set(ENABLE_AUTO_COMMIT, "true", errstr) != RdKafka::Conf::CONF_OK)
paulbourelly999 marked this conversation as resolved.
Show resolved Hide resolved
{
FILE_LOG(logWARNING) << "RDKafka conf set enable auto commit failed: " << errstr.c_str() << std::endl;
return false;
}

// set consumer group
if (conf->set(GROUP_ID, _group_id_str, errstr) != RdKafka::Conf::CONF_OK)
{
Expand Down Expand Up @@ -88,6 +94,8 @@ namespace tmx::utils
void kafka_consumer_worker::stop()
{
_run = false;
//Close and shutdown the consumer.
_consumer->close();
paulbourelly999 marked this conversation as resolved.
Show resolved Hide resolved
/*Destroy kafka instance*/ // Wait for RdKafka to decommission.
RdKafka::wait_destroyed(5000);
}
Expand Down Expand Up @@ -134,7 +142,7 @@ namespace tmx::utils
switch (message->err())
{
case RdKafka::ERR__TIMED_OUT:
FILE_LOG(logWARNING) << _consumer->name() << " consume failed: " << message->errstr() << std::endl;
FILE_LOG(logDEBUG4) << _consumer->name() << " consume failed: " << message->errstr() << std::endl;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about setting this back to log WARNING and just setting the timeout value large to something that indicates an error like 60s or something.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will still print the log constantly when we are in between testing and no data is consumed during the time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think we have any kafka messages that have intervals even close to 60s so it should never print unless we are not receiving kafka messages

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, It could happen as we stop sending BSM/MP/MOM when the vehicle disengage and before the next round of testing. The while loop in the message consumption will keep checking if there are data from those topics.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this unique to consumers.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so, consumers needs to continuously poll Kafka for data with the while loop. Every poll request, the consumer is telling the Kafka broker that it is alive.

break;
case RdKafka::ERR_NO_ERROR:
FILE_LOG(logDEBUG1) << _consumer->name() << " read message at offset " << message->offset() << std::endl;
Expand Down
1 change: 1 addition & 0 deletions src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ namespace tmx::utils {
const std::string GROUP_ID="group.id";
const std::string MAX_PARTITION_FETCH_SIZE="max.partition.fetch.bytes";
const std::string ENABLE_PARTITION_END_OF="enable.partition.eof";
const std::string ENABLE_AUTO_COMMIT="enable.auto.commit";

//maximum size for pulling message from a single partition at a time
std::string STR_FETCH_NUM = "10240000";
Expand Down
52 changes: 51 additions & 1 deletion src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,26 @@ namespace tmx::utils
{
}

kafka_producer_worker::kafka_producer_worker(const std::string &brokers)
: _broker_str(brokers), _run(true)
{
}

bool kafka_producer_worker::init()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of having two methods here init and init producer and one calls the other. Is it just to reduce method complexity? Also why are we calling the init_producer in the constructor? Seems like both init producer and init_topic are called by init which should be called either way after the constructor. Am I missing something?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is to support creating producer without a topic, we only want to init the producer only. But the original init() func has both init_producer and init_topic portion. So I split them into small chunk which will allow the orignal init() works as it, and allows for reuse of init_producer in support of creating a producer without a topic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest I would prefer you create a super class of kafka producer that is kafka_topicless_producer or the other way around. Kafka Topic Producer inherits from Kafka producer. I think this is weird because it is unclear which init methods need to be called for each or am I missing something.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can table that as a future improvement since the kafka_client class abstracts this but could you at least explain how a producer with a topic would be created vs a producer without a topic to me. Is the producer with a topic required to initialize the topic while the other producer only initializes the producer? Is that correct?

Copy link
Collaborator Author

@dan-du-car dan-du-car Jul 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the latest code. I removed the init_producer() from constructor and allow the plugin to call it and give the plugin a bit more control like the consumers calling init().

{
if(init_producer())
{
return init_topic();
}
return false;
}

bool kafka_producer_worker::init_producer()
{
std::string errstr = "";

// Create configuration objects
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

/***
* Set Configuration properties
Expand Down Expand Up @@ -75,7 +88,12 @@ namespace tmx::utils
delete conf;

FILE_LOG(logINFO) << "Created producer: " << _producer->name() << std::endl;
}

bool kafka_producer_worker::init_topic()
{
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
std::string errstr = "";
// Create topic handle
_topic = RdKafka::Topic::create(_producer, _topics_str, tconf, errstr);
if (!_topic)
Expand Down Expand Up @@ -156,6 +174,38 @@ namespace tmx::utils
_producer->poll(0);
}

void kafka_producer_worker::send(const std::string& message, const std::string& topic_name ) const
{
bool retry = true;
while (retry)
{
RdKafka::ErrorCode produce_error = _producer->produce(topic_name,
RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char *>(message.c_str()),
message.size(),
nullptr, 0, 0, nullptr);

if (produce_error == RdKafka::ERR_NO_ERROR) {
PLOG(logDEBUG) <<"Queued message:" << message;
retry = false;
}
else
{
PLOG(logERROR) <<"Failed to queue message:" << message <<" with error:" << RdKafka::err2str(produce_error);
if (produce_error == RdKafka::ERR__QUEUE_FULL) {
PLOG(logERROR) <<"Message queue full...retrying...";
_producer->poll(500); /* ms */
retry = true;
}
else {
PLOG(logERROR) <<"Unhandled error in queue_kafka_message:" << RdKafka::err2str(produce_error);
retry = false;
}
}
}
}

void kafka_producer_worker::stop()
{
/* Wait for final messages to be delivered or fail.
Expand Down
27 changes: 25 additions & 2 deletions src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,43 @@ namespace tmx::utils
* @param topic_str topic producer should produce to.
* @param n_partition partition producer should be assigned to.
*/
kafka_producer_worker(const std::string &brokers, const std::string &topics, int n_partition = 0);
explicit kafka_producer_worker(const std::string &brokers, const std::string &topics, int n_partition = 0);
/**
* @brief Construct a new kafka producer worker object
*
* @param broker_str network address of kafka broker.
*/
explicit kafka_producer_worker(const std::string &brokers);
/**
* @brief Initialize kafka_producer_worker. This method must be called before send!
*
* @return true if successful.
* @return false if unsuccessful.
*/
virtual bool init();
/**
* @brief Initialize kafka topic.
* @return true if successful.
* @return false if unsuccessful.
*/
virtual bool init_topic();
/**
* @brief Initialize kafka producer.
* @return true if successful.
* @return false if unsuccessful.
*/
virtual bool init_producer();
/**
* @brief Produce to topic. Will result in segmentation fault if init() is not called on producer first.
*
* @param msg message to produce.
*/
virtual void send(const std::string &msg);
/**
* @brief Produce to a specific topic. Will result in segmentation fault if init() is not called on producer first.
* @param msg message to produce.
* @param topic_name topic to send the message to.
*/
virtual void send(const std::string& message, const std::string& topic_name ) const;
/**
* @brief Is kafka_producer_worker still running?
*
Expand Down
15 changes: 15 additions & 0 deletions src/tmx/TmxUtils/test/test_kafka_producer_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,18 @@ TEST(test_kafka_producer_worker, create_producer)
worker->send(msg);
worker->stop();
}

TEST(test_kafka_producer_worker, create_producer_no_topic)
{
std::string broker_str = "localhost:9092";
std::string topic = "test";
auto client = std::make_shared<tmx::utils::kafka_client>();
std::shared_ptr<tmx::utils::kafka_producer_worker> worker;
worker = client->create_producer(broker_str);
worker->init();
worker->printCurrConf();
std::string msg = "test message";
// // Run this unit test without launching kafka broker will throw connection refused error
worker->send(msg, topic);
worker->stop();
}
Loading