CARMA Streets Plugin Kafka Consumers (#547)

# PR Details
## Description
- Remove the Kafka consumer/producer initialization during config parameter updates.
- Replace consumer creation with the v2xhub kafka_client library.
- Add producer creation to the kafka_client library.
## Related Issue
#543

## Motivation and Context
N/A

## How Has This Been Tested?
Local integration testing

## Types of changes


- [x] Defect fix (non-breaking change that fixes an issue)
- [ ] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that causes existing functionality to change)

## Checklist:


- [ ] I have added any new packages to the sonar-scanner.properties file
- [ ] My change requires a change to the documentation.
- [ ] I have updated the documentation accordingly.
- [x] I have read the **CONTRIBUTING** document: [V2XHUB Contributing Guide](https://github.com/usdot-fhwa-OPS/V2X-Hub/blob/develop/Contributing.md)
- [ ] I have added tests to cover my changes.
- [ ] All new and existing tests passed.
dan-du-car authored Jul 7, 2023
1 parent b708d32 commit 1c5a862
Showing 10 changed files with 269 additions and 276 deletions.
2 changes: 1 addition & 1 deletion .devcontainer/docker-compose-vscode.yml
@@ -72,7 +72,7 @@ services:
DOCKER_HOST_IP: 127.0.0.1
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "time_sync:1:1"
KAFKA_CREATE_TOPICS: "time_sync:1:1,modified_spat:1:1,v2xhub_scheduling_plan_sub:1:1,v2xhub_ssm_sub:1:1"
KAFKA_LOG_DIRS: "/kafka/kafka-logs"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
14 changes: 14 additions & 0 deletions src/tmx/TmxUtils/src/kafka/kafka_client.cpp
@@ -34,5 +34,19 @@ namespace tmx::utils
}
}

std::shared_ptr<kafka_producer_worker> kafka_client::create_producer(const std::string &bootstrap_server) const
{
try
{
auto producer_ptr = std::make_shared<kafka_producer_worker>(bootstrap_server);
return producer_ptr;
}
catch (const std::runtime_error &e)
{
FILE_LOG(logERROR) << "Create producer failure: " << e.what() << std::endl;
exit(1);
}
}


}
1 change: 1 addition & 0 deletions src/tmx/TmxUtils/src/kafka/kafka_client.h
@@ -15,6 +15,7 @@ namespace tmx::utils
std::shared_ptr<kafka_consumer_worker> create_consumer(const std::string &broker_str, const std::string &topic_str,
const std::string &group_id_str) const;
std::shared_ptr<kafka_producer_worker> create_producer(const std::string &broker_str, const std::string &topic_str) const;
std::shared_ptr<kafka_producer_worker> create_producer(const std::string &bootstrap_server) const;
};

}
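The new single-argument `create_producer` overload returns a producer that is not bound to a topic at construction time; the topic is chosen per `send()` call instead. A minimal sketch of the intended call flow, using hypothetical stand-in classes (`producer_worker`, `client`) in place of the real `tmx::utils` types, which depend on librdkafka:

```cpp
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for tmx::utils::kafka_producer_worker; not the real implementation.
class producer_worker {
    std::string broker_;
public:
    explicit producer_worker(std::string broker) : broker_(std::move(broker)) {}
    // The real init() builds the RdKafka configuration; here we only validate the broker string.
    bool init() const { return !broker_.empty(); }
    // The real send() calls RdKafka::Producer::produce; here we just describe the call.
    std::string send(const std::string& msg, const std::string& topic) const {
        return "queued on " + topic + ": " + msg;
    }
    void stop() const {}
};

// Hypothetical stand-in for tmx::utils::kafka_client.
class client {
public:
    // Mirrors the new kafka_client::create_producer(bootstrap_server) overload.
    std::shared_ptr<producer_worker> create_producer(const std::string& bootstrap) const {
        return std::make_shared<producer_worker>(bootstrap);
    }
};
```

The sequence matches the new unit test below: `create_producer(broker)`, then `init()`, then `send(msg, topic)`, then `stop()`.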
10 changes: 9 additions & 1 deletion src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.cpp
@@ -42,6 +42,12 @@ namespace tmx::utils
return false;
}

if (conf->set(ENABLE_AUTO_COMMIT, "true", errstr) != RdKafka::Conf::CONF_OK)
{
FILE_LOG(logWARNING) << "RDKafka conf set enable auto commit failed: " << errstr.c_str() << std::endl;
return false;
}

// set consumer group
if (conf->set(GROUP_ID, _group_id_str, errstr) != RdKafka::Conf::CONF_OK)
{
@@ -88,6 +94,8 @@
void kafka_consumer_worker::stop()
{
_run = false;
//Close and shutdown the consumer.
_consumer->close();
/*Destroy kafka instance*/ // Wait for RdKafka to decommission.
RdKafka::wait_destroyed(5000);
}
@@ -134,7 +142,7 @@
switch (message->err())
{
case RdKafka::ERR__TIMED_OUT:
FILE_LOG(logWARNING) << _consumer->name() << " consume failed: " << message->errstr() << std::endl;
FILE_LOG(logDEBUG4) << _consumer->name() << " consume failed: " << message->errstr() << std::endl;
break;
case RdKafka::ERR_NO_ERROR:
FILE_LOG(logDEBUG1) << _consumer->name() << " read message at offset " << message->offset() << std::endl;
1 change: 1 addition & 0 deletions src/tmx/TmxUtils/src/kafka/kafka_consumer_worker.h
@@ -40,6 +40,7 @@ namespace tmx::utils {
const std::string GROUP_ID="group.id";
const std::string MAX_PARTITION_FETCH_SIZE="max.partition.fetch.bytes";
const std::string ENABLE_PARTITION_END_OF="enable.partition.eof";
const std::string ENABLE_AUTO_COMMIT="enable.auto.commit";

//maximum size for pulling message from a single partition at a time
std::string STR_FETCH_NUM = "10240000";
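Each consumer property, including the new `enable.auto.commit`, is applied through the same guarded pattern: attempt the set and abort initialization with a log message if librdkafka rejects it. A self-contained sketch of that pattern with an illustrative stub in place of `RdKafka::Conf` (the stub is not the RdKafka API):

```cpp
#include <map>
#include <set>
#include <string>

// Illustrative stand-in for RdKafka::Conf: accepts only known property names.
class stub_conf {
    std::set<std::string> known_{"enable.auto.commit", "group.id",
                                 "enable.partition.eof", "max.partition.fetch.bytes"};
    std::map<std::string, std::string> values_;
public:
    enum Result { CONF_OK, CONF_INVALID };
    Result set(const std::string& name, const std::string& value, std::string& errstr) {
        if (known_.count(name) == 0) {
            errstr = "No such configuration property: " + name;
            return CONF_INVALID;
        }
        values_[name] = value;
        return CONF_OK;
    }
};

// Mirrors the guarded style used in kafka_consumer_worker::init.
bool apply_auto_commit(stub_conf& conf) {
    std::string errstr;
    if (conf.set("enable.auto.commit", "true", errstr) != stub_conf::CONF_OK) {
        // Real code logs: "RDKafka conf set enable auto commit failed: <errstr>"
        return false;
    }
    return true;
}
```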
52 changes: 51 additions & 1 deletion src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp
@@ -33,13 +33,26 @@ namespace tmx::utils
{
}

kafka_producer_worker::kafka_producer_worker(const std::string &brokers)
: _broker_str(brokers), _run(true)
{
}

bool kafka_producer_worker::init()
{
if(init_producer())
{
return init_topic();
}
return false;
}

bool kafka_producer_worker::init_producer()
{
std::string errstr = "";

// Create configuration objects
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

/***
* Set Configuration properties
@@ -75,7 +88,12 @@
delete conf;

FILE_LOG(logINFO) << "Created producer: " << _producer->name() << std::endl;
}

bool kafka_producer_worker::init_topic()
{
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
std::string errstr = "";
// Create topic handle
_topic = RdKafka::Topic::create(_producer, _topics_str, tconf, errstr);
if (!_topic)
@@ -156,6 +174,38 @@
_producer->poll(0);
}

void kafka_producer_worker::send(const std::string& message, const std::string& topic_name ) const
{
bool retry = true;
while (retry)
{
RdKafka::ErrorCode produce_error = _producer->produce(topic_name,
RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char *>(message.c_str()),
message.size(),
nullptr, 0, 0, nullptr);

if (produce_error == RdKafka::ERR_NO_ERROR) {
PLOG(logDEBUG) <<"Queued message:" << message;
retry = false;
}
else
{
PLOG(logERROR) <<"Failed to queue message:" << message <<" with error:" << RdKafka::err2str(produce_error);
if (produce_error == RdKafka::ERR__QUEUE_FULL) {
PLOG(logERROR) <<"Message queue full...retrying...";
_producer->poll(500); /* ms */
retry = true;
}
else {
PLOG(logERROR) <<"Unhandled error in queue_kafka_message:" << RdKafka::err2str(produce_error);
retry = false;
}
}
}
}

void kafka_producer_worker::stop()
{
/* Wait for final messages to be delivered or fail.
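The topic-parameterized `send()` above retries only when the local queue is full (`ERR__QUEUE_FULL`), polling so delivery callbacks can drain the queue; any other error is logged and the message is dropped. That retry decision can be sketched in isolation with stubbed error codes (this is not librdkafka itself):

```cpp
// Stand-ins for the RdKafka error codes the retry loop distinguishes.
enum class produce_result { ok, queue_full, other_error };

// Mirrors the loop in kafka_producer_worker::send: retry only on a full queue.
// produce_fn is any callable returning a produce_result; poll_fn drains callbacks.
template <typename Produce, typename Poll>
int send_with_retry(Produce produce_fn, Poll poll_fn) {
    int attempts = 0;
    bool retry = true;
    while (retry) {
        ++attempts;
        switch (produce_fn()) {
        case produce_result::ok:
            retry = false;          // message queued successfully
            break;
        case produce_result::queue_full:
            poll_fn();              // serve delivery callbacks, then retry
            break;
        case produce_result::other_error:
            retry = false;          // unhandled error: log and drop the message
            break;
        }
    }
    return attempts;
}
```

Note the loop has no retry cap: a persistently full queue blocks the caller, which matches the diff's behavior.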
27 changes: 25 additions & 2 deletions src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h
@@ -53,20 +53,43 @@ namespace tmx::utils
* @param topic_str topic producer should produce to.
* @param n_partition partition producer should be assigned to.
*/
kafka_producer_worker(const std::string &brokers, const std::string &topics, int n_partition = 0);
explicit kafka_producer_worker(const std::string &brokers, const std::string &topics, int n_partition = 0);
/**
* @brief Construct a new kafka producer worker object
*
* @param broker_str network address of kafka broker.
*/
explicit kafka_producer_worker(const std::string &brokers);
/**
* @brief Initialize kafka_producer_worker. This method must be called before send!
*
* @return true if successful.
* @return false if unsuccessful.
*/
virtual bool init();
/**
* @brief Initialize kafka topic.
* @return true if successful.
* @return false if unsuccessful.
*/
virtual bool init_topic();
/**
* @brief Initialize kafka producer.
* @return true if successful.
* @return false if unsuccessful.
*/
virtual bool init_producer();
/**
* @brief Produce to topic. Will result in segmentation fault if init() is not called on producer first.
*
* @param msg message to produce.
*/
virtual void send(const std::string &msg);
/**
* @brief Produce to a specific topic. Will result in segmentation fault if init() is not called on producer first.
* @param msg message to produce.
* @param topic_name topic to send the message to.
*/
virtual void send(const std::string& message, const std::string& topic_name ) const;
/**
* @brief Is kafka_producer_worker still running?
*
15 changes: 15 additions & 0 deletions src/tmx/TmxUtils/test/test_kafka_producer_worker.cpp
@@ -15,3 +15,18 @@ TEST(test_kafka_producer_worker, create_producer)
worker->send(msg);
worker->stop();
}

TEST(test_kafka_producer_worker, create_producer_no_topic)
{
std::string broker_str = "localhost:9092";
std::string topic = "test";
auto client = std::make_shared<tmx::utils::kafka_client>();
std::shared_ptr<tmx::utils::kafka_producer_worker> worker;
worker = client->create_producer(broker_str);
worker->init();
worker->printCurrConf();
std::string msg = "test message";
// Running this unit test without a running Kafka broker logs a connection-refused error
worker->send(msg, topic);
worker->stop();
}