Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CARMA Streets Plugin Kafka Consumers #547
CARMA Streets Plugin Kafka Consumers #547
Changes from all commits
4052ee3
82ada6f
39f4d58
108c67d
ad23110
a8858de
5f45cf5
89f0d54
1409c16
70df5ec
0c1bddd
8280865
ecc4ada
d91ae9d
f06e0f5
File filter
Filter by extension
Conversations
Jump to
There are no files selected for viewing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think about setting this back to log WARNING and just setting the timeout value large to something that indicates an error like 60s or something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it will still print the log constantly when we are in between testing and no data is consumed during the time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not think we have any kafka messages that have intervals even close to 60s so it should never print unless we are not receiving kafka messages
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, It could happen as we stop sending BSM/MP/MOM when the vehicle disengage and before the next round of testing. The while loop in the message consumption will keep checking if there are data from those topics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this unique to consumers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so, consumers needs to continuously poll Kafka for data with the while loop. Every poll request, the consumer is telling the Kafka broker that it is alive.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the purpose of having two methods here init and init producer and one calls the other. Is it just to reduce method complexity? Also why are we calling the init_producer in the constructor? Seems like both init producer and init_topic are called by init which should be called either way after the constructor. Am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is to support creating producer without a topic, we only want to init the producer only. But the original init() func has both init_producer and init_topic portion. So I split them into small chunk which will allow the orignal init() works as it, and allows for reuse of init_producer in support of creating a producer without a topic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be honest I would prefer you create a super class of kafka producer that is kafka_topicless_producer or the other way around. Kafka Topic Producer inherits from Kafka producer. I think this is weird because it is unclear which init methods need to be called for each or am I missing something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can table that as a future improvement since the kafka_client class abstracts this but could you at least explain how a producer with a topic would be created vs a producer without a topic to me. Is the producer with a topic required to initialize the topic while the other producer only initializes the producer? Is that correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See the latest code. I removed the init_producer() from constructor and allow the plugin to call it and give the plugin a bit more control like the consumers calling init().