The main motivation of this project is to showcase how to integrate the vanilla Kafka client in a Spring Boot application.
Some ideas you will find in the project:
- Dynamic loading of producer/consumer properties
- Producer/consumer metrics collection via Micrometer/Prometheus
- Support for multiple consumer threads
- Deserialization & processing error handling (various strategies, including a dead letter queue)
- Using Avro-generated classes
To start the environment, simply run the following command:

```bash
docker-compose up -d
```
This starts a local single-node Kafka cluster and a UI (Confluent Control Center).
Once the environment is started, you can run the application with:

```bash
mvn spring-boot:run
```
Once the application is started, you can access Swagger to publish and consume messages.
You can also open the Confluent Control Center to explore the content of topics.
To stop the environment, simply run the following command:

```bash
docker-compose down -v
```
To inspect the state of the consumer group, you can run the following command:

```bash
docker-compose exec broker kafka-consumer-groups --bootstrap-server broker:9092 --group kafka-vanilla-spring-boot-demo --describe
```
This shows whether the application is running, the current consumer offset positions, and the consumer lag, if any.
If you need to reset the offsets to the beginning, you can run the following command.

WARNING: before running it, make sure the application is stopped (the consumer group must be inactive).

```bash
docker-compose exec broker kafka-consumer-groups --bootstrap-server broker:9092 --group kafka-vanilla-spring-boot-demo --reset-offsets --all-topics --to-earliest --execute
```
Metrics are collected via Micrometer. You can choose the backend, but this project showcases the Prometheus backend.
Metrics are available at http://localhost:8080/actuator/prometheus
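For example, you can scrape them manually with:

```bash
curl http://localhost:8080/actuator/prometheus
```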
We might add a sample Producer/Consumer dashboard in the future.
Most of the configuration is done via the traditional application.yml file.
```yaml
kafka:
  properties:
    bootstrap.servers: "localhost:29092"
    schema.registry.url: "http://localhost:8081"
    specific.avro.reader: "true"
  producer:
    key.serializer: "org.apache.kafka.common.serialization.StringSerializer"
    value.serializer: "io.confluent.kafka.serializers.KafkaAvroSerializer"
  consumer:
    key.deserializer: "org.apache.kafka.common.serialization.StringDeserializer"
    value.deserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer"
    group.id: "kafka-vanilla-spring-boot-demo"
  exceptionHandler: "LogAndFail"
  nbConsumerThreads: 1
```
Basically, there are global properties as well as producer- and consumer-specific properties. Every property configured as global (like the schema registry here) is injected into the configuration of all producers and consumers.
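As an illustration of that merging, here is a minimal sketch of how global and producer-specific properties could be combined before building a producer. This is not the project's actual property-loading code; the class and method names are made up for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

public class ProducerFactorySketch {

    /**
     * Merges the global properties map with the producer-specific map;
     * producer-specific entries win on conflicts.
     */
    static Properties producerProperties(Map<String, String> globalProps,
                                         Map<String, String> producerProps) {
        Map<String, String> merged = new HashMap<>(globalProps);
        merged.putAll(producerProps);

        Properties properties = new Properties();
        properties.putAll(merged);
        return properties;
    }

    public static void main(String[] args) {
        Map<String, String> global = Map.of(
                "bootstrap.servers", "localhost:29092",
                "schema.registry.url", "http://localhost:8081");
        Map<String, String> producer = Map.of(
                "key.serializer", "org.apache.kafka.common.serialization.StringSerializer",
                "value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");

        // The schema registry property configured globally ends up in the producer config too.
        try (KafkaProducer<String, Object> kafkaProducer =
                     new KafkaProducer<>(producerProperties(global, producer))) {
            // use the producer...
        }
    }
}
```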
The application accepts dynamic properties, so you can use any of the following:
- Producer Config
- Consumer Config
- Schema Registry Config
- etc...
By default, the application uses the Spring Boot application name as the client.id. This eases monitoring when multiple instances of the application are running. If needed, this can be overridden by specifying the client.id property in the producer or consumer configuration.
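For example, assuming the same configuration layout as above (the value is purely illustrative):

```yaml
kafka:
  producer:
    client.id: "my-custom-producer"
```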
The application provides a dead letter queue for deserialization / processing errors.
The code provides multiple implementations.
By default, the LogAndFail implementation is used. It encourages projects to think about error handling and to pick the strategy relevant to their context.
This behavior can be configured via the kafka.exceptionHandler attribute in your application.yml file:
```yaml
kafka:
  exceptionHandler: "LogAndContinue"
```
The dead letter queue implementation sends both deserialization and processing errors to the same topic.
Out of the box the topic is called <spring.application.name>-dlq, but this can be configured in your application.yml file:
```yaml
kafka:
  dlqName: "my-dlq"
```
This implementation preserves the original:
- headers
- key (as byte[], since it may not have been deserialized successfully)
- value (as byte[], since it may not have been deserialized successfully)
- timestamp
In addition, it adds some useful headers:
- dlq.error.app.name: your Spring Boot application name
- dlq.error.timestamp: the timestamp of the error
- dlq.error.topic: the source topic
- dlq.error.partition: the source partition
- dlq.error.offset: the source offset
- dlq.error.exception.class.name: the exception class name
- dlq.error.exception.message: the exception message
- dlq.error.type: the error type (either DESERIALIZATION_ERROR or PROCESSING_ERROR)
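As an illustration, the sketch below shows how such a dead letter record could be assembled with the plain Kafka client. It is not the project's actual DLQ publisher; the helper name and the string encoding of the header values are assumptions.

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;

public class DlqRecordSketch {

    /** Builds a DLQ record that keeps the original key/value bytes, headers, and timestamp. */
    static ProducerRecord<byte[], byte[]> toDlqRecord(ConsumerRecord<byte[], byte[]> original,
                                                      String dlqTopic,
                                                      String appName,
                                                      Exception error,
                                                      String errorType) {
        // Copy the original key, value, headers, and timestamp onto the DLQ record.
        ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>(
                dlqTopic, null, original.timestamp(), original.key(), original.value(), original.headers());

        // Additional headers described above (string encoding is an assumption for this sketch).
        Headers headers = dlqRecord.headers();
        headers.add("dlq.error.app.name", appName.getBytes(StandardCharsets.UTF_8));
        headers.add("dlq.error.timestamp", String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
        headers.add("dlq.error.topic", original.topic().getBytes(StandardCharsets.UTF_8));
        headers.add("dlq.error.partition", String.valueOf(original.partition()).getBytes(StandardCharsets.UTF_8));
        headers.add("dlq.error.offset", String.valueOf(original.offset()).getBytes(StandardCharsets.UTF_8));
        headers.add("dlq.error.exception.class.name", error.getClass().getName().getBytes(StandardCharsets.UTF_8));
        headers.add("dlq.error.exception.message", String.valueOf(error.getMessage()).getBytes(StandardCharsets.UTF_8));
        headers.add("dlq.error.type", errorType.getBytes(StandardCharsets.UTF_8));
        return dlqRecord;
    }
}
```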
The number of consumer threads is controlled by the kafka.nbConsumerThreads attribute in your application.yml file:
```yaml
kafka:
  nbConsumerThreads: 1
```
To support multiple threads, the class containing your code must:
- extend the AbstractKafkaReader class
- be annotated with @Service and @Scope("prototype") (see https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-factory-scopes-prototype for more details)
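A minimal skeleton of such a class could look like the following; the class name is hypothetical, and the actual methods to implement depend on the AbstractKafkaReader contract in this project.

```java
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;

// Prototype scope ensures each consumer thread gets its own instance of the reader.
@Service
@Scope("prototype")
public class MyMessageReader extends AbstractKafkaReader {
    // Implement the consumption / processing logic required by
    // the AbstractKafkaReader contract of this project here.
}
```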
Behind the scenes, ConsumerAsyncConfiguration creates an executor service with the provided number of threads. If an uncaught exception reaches the top of a consumer thread, the executor service is configured to stop the application.
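For reference, here is an illustrative sketch (not the actual ConsumerAsyncConfiguration) of how an executor service can be wired so that an uncaught exception in a consumer thread stops the application; apart from the nbConsumerThreads idea taken from the configuration above, everything here is an assumption.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import org.springframework.boot.SpringApplication;
import org.springframework.context.ConfigurableApplicationContext;

public class ConsumerExecutorSketch {

    /**
     * Creates a fixed-size executor whose threads terminate the application
     * when an uncaught exception bubbles up from a consumer thread.
     */
    static ExecutorService consumerExecutor(int nbConsumerThreads,
                                            ConfigurableApplicationContext context) {
        ThreadFactory threadFactory = runnable -> {
            Thread thread = new Thread(runnable);
            thread.setUncaughtExceptionHandler((t, e) ->
                    // Close the Spring context and exit the JVM with a non-zero code.
                    System.exit(SpringApplication.exit(context, () -> 1)));
            return thread;
        };
        return Executors.newFixedThreadPool(nbConsumerThreads, threadFactory);
    }
}
```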
Some important pointers in the code:
Have an idea to make the showcase better? Found a bug? Do not hesitate to report it via GitHub issues and/or create a pull request.
For further reference, please consider the following sections: