Skip to content

Commit

Permalink
CDAR-759: Provision Kafka topics for SDSM and Detections (#314)
Browse files Browse the repository at this point in the history
<!-- Thanks for the contribution, this is awesome. -->

# PR Details
## Description
This PR provisions Kafka topics for detection data and for SDSMs. This
helps prevent kafka consumer initialization from failing when attempting
to consume from topics that do not exist yet.
Additionally despite kafka depedency , sensor_data_sharing_service will
start prior to creation of topics or health of kafka consumers causing
confusing critical logs about connection timeouts and also outright
consumer subscription failure when kafka volumes are removed. This PR
also adds a healthcheck to check the creation of the last topic, which
confirms all topics are created and the kafka server is healthy
Example of confusing connection timeout logs :
```
critical] [kafka_consumer_worker.h:98] /home/carma-streets/kafka_clients/include/kafka_consumer_worker.h : Line 98. LOG:  3  FAIL [thrd:172.4.0.3:9092/bootstrap]: 172.4.0.3:9092/bootstrap: Connect to ipv4#172.4.0.3:9092 failed: Connection refused (after 0ms in state CONNECT)
[2024-02-02 11:58:49.467] [debug] [sensor_data_sharing_service.cpp:91] Attempting to consume detections ...
[2024-02-02 11:58:49.467] [info] [sensor_data_sharing_service.cpp:118] Starting SDSM Producer!
[2024-02-02 11:58:49.467] [info] [kafka_consumer_worker.cpp:114] rdkafka#consumer-1 Successfully to subscribe to   1 topics: Success 
[2024-02-02 11:58:49.467] [critical] [kafka_consumer_worker.h:90] /home/carma-streets/kafka_clients/include/kafka_consumer_worker.h : Line 90. ERROR:  Local: Broker transport failure  172.4.0.3:9092/bootstrap: Connect to ipv4#172.4.0.3:9092 failed: Connection refused (after 144ms in state CONNECT)
[2024-02-02 11:58:49.467] [info] [kafka_consumer_worker.cpp:114] rdkafka#consumer-3 Successfully to subscribe to   1 topics: Success 
[2024-02-02 11:58:49.467] [critical] [kafka_consumer_worker.h:90] /home/carma-streets/kafka_clients/include/kafka_consumer_worker.h : Line 90. ERROR:  Local: All broker connections are down  1/1 brokers are down
[2024-02-02 11:58:49.467] [critical] [kafka_consumer_worker.h:90] /home/carma-streets/kafka_clients/include/kafka_consumer_worker.h : Line 90. ERROR:  Local: Broker transport failure  172.4.0.3:9092/bootstrap: Connect to ipv4#172.4.0.3:9092 failed: Connection refused (after 0ms in state CONNECT)
[2024-02-02 11:58:49.467] [critical] [kafka_consumer_worker.h:90] /home/carma-streets/kafka_clients/include/kafka_consumer_worker.h : Line 90. ERROR:  Local: Broker transport failure  172.4.0.3:9092/bootstrap: Connect to ipv4#172.4.0.3:9092 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
[2024-02-02 11:58:49.467] [critical] [kafka_consumer_worker.h:90] /home/carma-streets/kafka_clients/include/kafka_consumer_worker.h : Line 90. ERROR:  Local: All broker connections are down  1/1 brokers are down
[2024-02-02 11:59:35.529] [info] [kafka_consumer_worker.h:42] RebalanceCb: Local: Assign partitions 
[2024-02-02 11:59:35.529] [info] [kafka_consumer_worker.h:36] Topic time_sync , Partition 0
[2024-02-02 11:59:35.529] [info] [kafka_consumer_worker.h:42] RebalanceCb: Local: Assign partitions 
```
<!--- Describe your changes in detail -->

## Related Issue
[CDAR-759
](https://usdot-carma.atlassian.net/browse/CDAR-759)<!--- This project
only accepts pull requests related to open issues -->
<!--- If suggesting a new feature or change, please discuss it in an
issue first -->
<!--- If fixing a bug, there should be an issue describing it with steps
to reproduce -->
<!--- Please link to the issue here: -->

## Motivation and Context
When clear Kafka and Zookeeper volumes, existing topics are also
deleted. Since the current CARMA Config does not provision a topic for
detections this causes the initialization of the Kafka consumer to fail
when attempting to subscribe to a not existing topic. The topic
eventually gets created by the detection producer in the CARMA Streets
Plugin in V2X-Hub.
Failed consumer subscribe is indicated by following logs :
```
[2024-02-06 00:18:13.730] [info] [sensor_data_sharing_service.cpp:118] Starting SDSM Producer!
[2024-02-06 00:18:13.730] [info] [kafka_consumer_worker.cpp:114] rdkafka#consumer-3 Successfully to subscribe to   1 topics: Success 
[2024-02-06 00:18:13.730] [info] [kafka_consumer_worker.cpp:114] rdkafka#consumer-1 Successfully to subscribe to   1 topics: Success 
[2024-02-06 00:18:13.734] [critical] [kafka_consumer_worker.cpp:161] rdkafka#consumer-1 Consume failed:  Subscribed topic not available: time_sync: Broker: Unknown topic or partition 
[2024-02-06 00:18:13.740] [critical] [kafka_consumer_worker.cpp:161] rdkafka#consumer-3 Consume failed:  Subscribed topic not available: v2xhub_sim_sensor_detected_object: Broker: Unknown topic or partition 
[2024-02-06 00:18:18.757] [error] [sensor_data_sharing_service.cpp:110] Something went wrong, no longer consuming detections.
```
<!--- Why is this change required? What problem does it solve? -->

## How Has This Been Tested?
CDASim Deployment 
<!--- Please describe in detail how you tested your changes. -->
<!--- Include details of your testing environment, and the tests you ran
to -->
<!--- see how your change affects other areas of the code, etc. -->

## Types of changes

<!--- What types of changes does your code introduce? Put an `x` in all
the boxes that apply: -->

- [x] Defect fix (non-breaking change that fixes an issue)
- [ ] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that cause existing functionality
to change)

## Checklist:

<!--- Go over all the following points, and put an `x` in all the boxes
that apply. -->
<!--- If you're unsure about any of these, don't hesitate to ask. We're
here to help! -->

- [ ] I have added any new packages to the sonar-scanner.properties file
- [ ] My change requires a change to the documentation.
- [ ] I have updated the documentation accordingly.
- [x] I have read the **CONTRIBUTING** document.
[CARMA Contributing
Guide](https://github.com/usdot-fhwa-stol/carma-platform/blob/develop/Contributing.md)
- [ ] I have added tests to cover my changes.
- [ ] All new and existing tests passed.
  • Loading branch information
paulbourelly999 authored Feb 6, 2024
1 parent 9df5f43 commit 4d2471c
Showing 1 changed file with 1 addition and 1 deletion.
2 changes: 1 addition & 1 deletion ail_vru_uc1_scenario/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ services:
depends_on:
- zookeeper
network_mode: service:zookeeper
# Health check to confirm kafka server is healthing (script is a client) and all topics
# Health check to confirm kafka server is healthy (script is a client) and all topics
# have been created (time_sync is last topic).
healthcheck:
test: ["CMD", "kafka-topics.sh", "--describe", "--bootstrap-server", "172.4.0.3:9092", "--topic", "time_sync"]
Expand Down

0 comments on commit 4d2471c

Please sign in to comment.