Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CARMA Streets Plugin Kafka Consumers #547

Merged
merged 15 commits into from
Jul 7, 2023
Merged

Conversation

dan-du-car
Copy link
Collaborator

@dan-du-car dan-du-car commented Jun 30, 2023

PR Details

Description

  • Remove the kafka consumer/producer initialization during config parameter update.
  • Replace consumer creation with v2xhub kafka_client library.
  • Add producer creation in kafka_client library

Related Issue

#543
#538

Motivation and Context

NA

How Has This Been Tested?

Local integration testing

Types of changes

  • Defect fix (non-breaking change that fixes an issue)
  • New feature (non-breaking change that adds functionality)
  • Breaking change (fix or feature that cause existing functionality to change)

Checklist:

  • I have added any new packages to the sonar-scanner.properties file
  • My change requires a change to the documentation.
  • I have updated the documentation accordingly.
  • I have read the CONTRIBUTING document.
    V2XHUB Contributing Guide
  • I have added tests to cover my changes.
  • All new and existing tests passed.

Copy link
Contributor

@paulbourelly999 paulbourelly999 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the random hash string appending functionality from the consumer group id.

@@ -134,7 +141,7 @@ namespace tmx::utils
switch (message->err())
{
case RdKafka::ERR__TIMED_OUT:
FILE_LOG(logWARNING) << _consumer->name() << " consume failed: " << message->errstr() << std::endl;
FILE_LOG(logDEBUG4) << _consumer->name() << " consume failed: " << message->errstr() << std::endl;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about setting this back to log WARNING and just setting the timeout value large to something that indicates an error like 60s or something.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will still print the log constantly when we are in between testing and no data is consumed during the time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think we have any kafka messages that have intervals even close to 60s so it should never print unless we are not receiving kafka messages

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, It could happen as we stop sending BSM/MP/MOM when the vehicle disengage and before the next round of testing. The while loop in the message consumption will keep checking if there are data from those topics.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this unique to consumers.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so, consumers needs to continuously poll Kafka for data with the while loop. Every poll request, the consumer is telling the Kafka broker that it is alive.

{
init_producer();
}

bool kafka_producer_worker::init()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of having two methods here init and init producer and one calls the other. Is it just to reduce method complexity? Also why are we calling the init_producer in the constructor? Seems like both init producer and init_topic are called by init which should be called either way after the constructor. Am I missing something?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is to support creating producer without a topic, we only want to init the producer only. But the original init() func has both init_producer and init_topic portion. So I split them into small chunk which will allow the orignal init() works as it, and allows for reuse of init_producer in support of creating a producer without a topic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest I would prefer you create a super class of kafka producer that is kafka_topicless_producer or the other way around. Kafka Topic Producer inherits from Kafka producer. I think this is weird because it is unclear which init methods need to be called for each or am I missing something.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can table that as a future improvement since the kafka_client class abstracts this but could you at least explain how a producer with a topic would be created vs a producer without a topic to me. Is the producer with a topic required to initialize the topic while the other producer only initializes the producer? Is that correct?

Copy link
Collaborator Author

@dan-du-car dan-du-car Jul 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the latest code. I removed the init_producer() from constructor and allow the plugin to call it and give the plugin a bit more control like the consumers calling init().

src/v2i-hub/CARMAStreetsPlugin/src/CARMAStreetsPlugin.cpp Outdated Show resolved Hide resolved
@paulbourelly999 paulbourelly999 merged commit 1c5a862 into develop Jul 7, 2023
7 of 8 checks passed
@paulbourelly999 paulbourelly999 deleted the bugfix_consumer_delay branch July 7, 2023 14:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants