Skip to content

Commit

Permalink
add create producer
Browse files Browse the repository at this point in the history
  • Loading branch information
dan-du-car committed Jul 6, 2023
1 parent a8858de commit 5f45cf5
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 50 deletions.
14 changes: 14 additions & 0 deletions src/tmx/TmxUtils/src/kafka/kafka_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,19 @@ namespace tmx::utils
}
}

std::shared_ptr<kafka_producer_worker> kafka_client::create_producer(const std::string &bootstrap_server) const
{
try
{
auto producer_ptr = std::make_shared<kafka_producer_worker>(bootstrap_server);
return producer_ptr;
}
catch (const std::runtime_error &e)
{
FILE_LOG(logERROR) << "Create producer failure: " << e.what() << std::endl;
exit(1);
}
}


}
1 change: 1 addition & 0 deletions src/tmx/TmxUtils/src/kafka/kafka_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace tmx::utils
std::shared_ptr<kafka_consumer_worker> create_consumer(const std::string &broker_str, const std::string &topic_str,
const std::string &group_id_str) const;
std::shared_ptr<kafka_producer_worker> create_producer(const std::string &broker_str, const std::string &topic_str) const;
std::shared_ptr<kafka_producer_worker> create_producer(const std::string &bootstrap_server) const;
};

}
Expand Down
53 changes: 52 additions & 1 deletion src/tmx/TmxUtils/src/kafka/kafka_producer_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,27 @@ namespace tmx::utils
{
}

kafka_producer_worker::kafka_producer_worker(const std::string &brokers)
: _broker_str(brokers), _run(true)
{
init_producer();
}

bool kafka_producer_worker::init()
{
if(init_producer())
{
return init_topic();
}
return false;
}

bool kafka_producer_worker::init_producer()
{
std::string errstr = "";

// Create configuration objects
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

/***
* Set Configuration properties
Expand Down Expand Up @@ -75,7 +89,12 @@ namespace tmx::utils
delete conf;

FILE_LOG(logINFO) << "Created producer: " << _producer->name() << std::endl;
}

bool kafka_producer_worker::init_topic()
{
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
std::string errstr = "";
// Create topic handle
_topic = RdKafka::Topic::create(_producer, _topics_str, tconf, errstr);
if (!_topic)
Expand Down Expand Up @@ -156,6 +175,38 @@ namespace tmx::utils
_producer->poll(0);
}

void kafka_producer_worker::send(const std::string& message, const std::string& topic_name ) const
{
bool retry = true;
while (retry)
{
RdKafka::ErrorCode produce_error = _producer->produce(topic_name,
RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char *>(message.c_str()),
message.size(),
nullptr, 0, 0, nullptr);

if (produce_error == RdKafka::ERR_NO_ERROR) {
PLOG(logDEBUG) <<"Queued message:" << message;
retry = false;
}
else
{
PLOG(logERROR) <<"Failed to queue message:" << message <<" with error:" << RdKafka::err2str(produce_error);
if (produce_error == RdKafka::ERR__QUEUE_FULL) {
PLOG(logERROR) <<"Message queue full...retrying...";
_producer->poll(500); /* ms */
retry = true;
}
else {
PLOG(logERROR) <<"Unhandled error in queue_kafka_message:" << RdKafka::err2str(produce_error);
retry = false;
}
}
}
}

void kafka_producer_worker::stop()
{
/* Wait for final messages to be delivered or fail.
Expand Down
25 changes: 24 additions & 1 deletion src/tmx/TmxUtils/src/kafka/kafka_producer_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,42 @@ namespace tmx::utils
* @param n_partition partition producer should be assigned to.
*/
kafka_producer_worker(const std::string &brokers, const std::string &topics, int n_partition = 0);
/**
* @brief Construct a new kafka producer worker object
*
* @param broker_str network address of kafka broker.
*/
kafka_producer_worker(const std::string &brokers);
/**
* @brief Initialize kafka_producer_worker. This method must be called before send!
*
* @return true if successful.
* @return false if unsuccessful.
*/
virtual bool init();
/**
* @brief Initialize kafka topic.
* @return true if successful.
* @return false if unsuccessful.
*/
virtual bool init_topic();
/**
* @brief Initialize kafka producer.
* @return true if successful.
* @return false if unsuccessful.
*/
virtual bool init_producer();
/**
* @brief Produce to topic. Will result in segmentation fault if init() is not called on producer first.
*
* @param msg message to produce.
*/
virtual void send(const std::string &msg);
/**
* @brief Produce to a specific topic. Will result in segmentation fault if init() is not called on producer first.
* @param msg message to produce.
* @param topic_name topic to send the message to.
*/
virtual void send(const std::string& message, const std::string& topic_name ) const;
/**
* @brief Is kafka_producer_worker still running?
*
Expand Down
50 changes: 4 additions & 46 deletions src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,26 +77,12 @@ void CARMAStreetsPlugin::InitKafkaConsumerProducers()
std::string kafkaConnectString = _kafkaBrokerIp + ':' + _kafkaBrokerPort;
std::string error_string;
kafkaConnectString = _kafkaBrokerIp + ':' + _kafkaBrokerPort;
kafka_client client;

//Producer
kafka_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
PLOG(logDEBUG) <<"Attempting to connect to " << kafkaConnectString;
if ((kafka_conf->set("bootstrap.servers", kafkaConnectString, error_string) != RdKafka::Conf::CONF_OK)) {
PLOG(logERROR) <<"Setting kafka config options failed with error:" << error_string << "\n" <<"Exiting with exit code 1";
exit(1);
} else {
PLOG(logDEBUG) <<"Kafka config options set successfully";
}

kafka_producer = RdKafka::Producer::create(kafka_conf, error_string);
if (!kafka_producer) {
PLOG(logERROR) <<"Creating kafka producer failed with error:" << error_string << "\n" <<"Exiting with exit code 1";
exit(1);
}
PLOG(logDEBUG) <<"Kafka producers created";
_kafka_producer_ptr = client.create_producer(kafkaConnectString);

//Consumers
kafka_client client;
auto uuid = boost::uuids::random_generator()();
std::stringstream ss;
ss << uuid;
Expand Down Expand Up @@ -135,8 +121,7 @@ void CARMAStreetsPlugin::HandleMobilityOperationMessage(tsm3Message &msg, routea
auto mobilityOperation = msg.get_j2735_data();
PLOG(logDEBUG) << "Body OperationParams : " << mobilityOperation->body.operationParams.buf << "\n"
<< "Body Strategy : " << mobilityOperation->body.strategy.buf<< "\n"
<<"Queueing kafka message:topic:" << _transmitMobilityOperationTopic << " "
<< kafka_producer->outq_len() <<"messages already in queue";
<<"Queueing kafka message:topic:" << _transmitMobilityOperationTopic;

std::stringstream strat;
std::stringstream payload;
Expand Down Expand Up @@ -472,34 +457,7 @@ void CARMAStreetsPlugin::HandleMapMessage(MapDataMessage &msg, routeable_message

void CARMAStreetsPlugin::produce_kafka_msg(const string& message, const string& topic_name) const
{
bool retry = true;
while (retry)
{
RdKafka::ErrorCode produce_error = kafka_producer->produce(topic_name,
RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char *>(message.c_str()),
message.size(),
nullptr, 0, 0, nullptr);

if (produce_error == RdKafka::ERR_NO_ERROR) {
PLOG(logDEBUG) <<"Queued message:" << message;
retry = false;
}
else
{
PLOG(logERROR) <<"Failed to queue message:" << message <<" with error:" << RdKafka::err2str(produce_error);
if (produce_error == RdKafka::ERR__QUEUE_FULL) {
PLOG(logERROR) <<"Message queue full...retrying...";
kafka_producer->poll(500); /* ms */
retry = true;
}
else {
PLOG(logERROR) <<"Unhandled error in queue_kafka_message:" << RdKafka::err2str(produce_error);
retry = false;
}
}
}
_kafka_producer_ptr->send(message, topic_name);
}

void CARMAStreetsPlugin::OnStateChange(IvpPluginState state) {
Expand Down
3 changes: 1 addition & 2 deletions src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ class CARMAStreetsPlugin: public PluginClient {
std::string _transmitSRMTopic;
std::string _kafkaBrokerIp;
std::string _kafkaBrokerPort;
RdKafka::Conf *kafka_conf;
RdKafka::Producer *kafka_producer;
std::shared_ptr<kafka_producer_worker> _kafka_producer_ptr;
std::shared_ptr<kafka_consumer_worker> _spat_kafka_consumer_ptr;
std::shared_ptr<kafka_consumer_worker> _scheduing_plan_kafka_consumer_ptr;
std::shared_ptr<kafka_consumer_worker> _ssm_kafka_consumer_ptr;
Expand Down

0 comments on commit 5f45cf5

Please sign in to comment.