-
Notifications
You must be signed in to change notification settings - Fork 67
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CARMA Streets Plugin Kafka Consumers #547
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove the random hash string appending functionality from the consumer group id.
@@ -134,7 +141,7 @@ namespace tmx::utils | |||
switch (message->err()) | |||
{ | |||
case RdKafka::ERR__TIMED_OUT: | |||
FILE_LOG(logWARNING) << _consumer->name() << " consume failed: " << message->errstr() << std::endl; | |||
FILE_LOG(logDEBUG4) << _consumer->name() << " consume failed: " << message->errstr() << std::endl; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think about setting this back to log WARNING and just setting the timeout value large to something that indicates an error like 60s or something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it will still print the log constantly when we are in between testing and no data is consumed during the time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not think we have any kafka messages that have intervals even close to 60s so it should never print unless we are not receiving kafka messages
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, It could happen as we stop sending BSM/MP/MOM when the vehicle disengage and before the next round of testing. The while loop in the message consumption will keep checking if there are data from those topics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this unique to consumers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so, consumers needs to continuously poll Kafka for data with the while loop. Every poll request, the consumer is telling the Kafka broker that it is alive.
{ | ||
init_producer(); | ||
} | ||
|
||
bool kafka_producer_worker::init() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the purpose of having two methods here init and init producer and one calls the other. Is it just to reduce method complexity? Also why are we calling the init_producer in the constructor? Seems like both init producer and init_topic are called by init which should be called either way after the constructor. Am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is to support creating producer without a topic, we only want to init the producer only. But the original init() func has both init_producer and init_topic portion. So I split them into small chunk which will allow the orignal init() works as it, and allows for reuse of init_producer in support of creating a producer without a topic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be honest I would prefer you create a super class of kafka producer that is kafka_topicless_producer or the other way around. Kafka Topic Producer inherits from Kafka producer. I think this is weird because it is unclear which init methods need to be called for each or am I missing something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can table that as a future improvement since the kafka_client class abstracts this but could you at least explain how a producer with a topic would be created vs a producer without a topic to me. Is the producer with a topic required to initialize the topic while the other producer only initializes the producer? Is that correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See the latest code. I removed the init_producer() from constructor and allow the plugin to call it and give the plugin a bit more control like the consumers calling init().
PR Details
Description
Related Issue
#543
#538
Motivation and Context
NA
How Has This Been Tested?
Local integration testing
Types of changes
Checklist:
V2XHUB Contributing Guide