vhoang55/kafka-streams-springboot

A simple Kafka Streams application showing how Spring Boot can be used to manage a Kafka Streams application. Typically, a Kafka Streams app is a self-contained jar that doesn't need Spring. However, spring-kafka provides a set of nice utilities to manage the lifecycle of the streams app.

We take advantage of spring-kafka's smart lifecycle support so the container can detect application start/stop, gracefully shut down the streams app, and manage the state of the stream application. This is particularly useful in a Spring-managed application.

For more information, look at

org.springframework.kafka.config.StreamsBuilderFactoryBean.start()/stop()

and how the config properties are passed in:

	@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
	public StreamsBuilderFactoryBean streamsBuilderFactoryBean(KafkaStreamsConfiguration configuration) {
		return new StreamsBuilderFactoryBean(configuration);
	}

	@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
	public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
		Map<String, Object> properties = new HashMap<>();
		properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVERS);
		properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
		properties.put(StreamsConfig.APPLICATION_ID_CONFIG, STREAMS_APP_NAME);
		return new KafkaStreamsConfiguration(properties);
	}
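As a hedged sketch of the lifecycle wiring (the class and field names below are illustrative, not from this repo), the same factory bean also accepts a state listener, which makes the start/stop and rebalance transitions the smart lifecycle manages visible in the logs:

```java
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

// Illustrative: register a KafkaStreams.StateListener on the factory bean
// so lifecycle transitions (e.g. REBALANCING -> RUNNING) are logged.
public class StreamsStateLogging {

	public StreamsStateLogging(StreamsBuilderFactoryBean factoryBean) {
		factoryBean.setStateListener((newState, oldState) ->
				System.out.println("Streams state: " + oldState + " -> " + newState));
		// The smart lifecycle calls factoryBean.start()/stop() automatically
		// with the application context; factoryBean.isRunning() reports the
		// current lifecycle state.
	}
}
```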

This sample uses a few concepts from the Kafka Streams high-level DSL to take input from a topic and calculate the running average price, total cost, and total item count for each product category.

Imagine you are running an e-commerce site where product purchases are forwarded to a Kafka topic. For each product category, you want to know how many products were purchased, what the total cost is, and what the average purchase price is.
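A minimal sketch of the value types involved (the real Product and ProductCategoryAggregate classes live in this repo; the fields below are assumed from how the topology code uses them):

```java
// Assumed shape of the record published per purchase: a category and a price.
class Product {
	private final String category;
	private final double price;

	Product(String category, double price) {
		this.category = category;
		this.price = price;
	}

	String getCategory() { return category; }
	double getPrice() { return price; }
}

// Assumed shape of the per-category aggregate, with the fluent
// withX(...) setters the topology code relies on.
class ProductCategoryAggregate {
	private int productCount;
	private double totalPrice;
	private double avgPrice;

	ProductCategoryAggregate withProductCount(int c) { this.productCount = c; return this; }
	ProductCategoryAggregate withTotalPrice(double t) { this.totalPrice = t; return this; }
	ProductCategoryAggregate withAvgPrice(double a) { this.avgPrice = a; return this; }

	int getProductCount() { return productCount; }
	double getTotalPrice() { return totalPrice; }
	double getAvgPrice() { return avgPrice; }
}
```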

To run the app, you need Kafka running, with a topic called product.categories created.
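Assuming a local broker on localhost:9092 (adjust to your setup), the topic can be created with the standard Kafka CLI:

```shell
# Create the input topic the streams app consumes from.
# kafka-topics.sh ships with the Kafka distribution.
kafka-topics.sh --create \
  --topic product.categories \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1
```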

Then run the main app:

KafkaStreamsSpringbootApplication

To send product purchases to the topic, run the producer code in SimpleProducer to generate records.

Finally, the code that does the aggregation is in StreamsConfiguration:

	@Bean
	public KTable<String, ProductCategoryAggregate> kStream(StreamsBuilder streamsBuilder) {
		KStream<String, Product> purchasedStreams =
				streamsBuilder.stream(SOURCE_TOPIC_NAME, Consumed.with(AppCustomSerdes.String(), AppCustomSerdes.product()));

		KGroupedStream<String, Product> productsByCatGroupedStream =
				purchasedStreams.groupBy((k, v) -> v.getCategory(),
						// Serialized.with is deprecated since Kafka 2.1;
						// newer clients use Grouped.with with the same arguments.
						Serialized.with(AppCustomSerdes.String(),
								AppCustomSerdes.product()));

		KTable<String, ProductCategoryAggregate> aggregateKTable = productsByCatGroupedStream.aggregate(
				//Initializer
				() -> new ProductCategoryAggregate()
						.withProductCount(0)
						.withTotalPrice(0D)
						.withAvgPrice(0D),
				//Aggregator
				(k, v, aggV) -> new ProductCategoryAggregate()
						.withProductCount(aggV.getProductCount() + 1)
						.withTotalPrice(aggV.getTotalPrice() + v.getPrice())
						.withAvgPrice((aggV.getTotalPrice() + v.getPrice()) / (aggV.getProductCount() + 1D)),
				//Materialized state store (key/value serdes for the store)
				Materialized.<String, ProductCategoryAggregate, KeyValueStore<Bytes, byte[]>>as("product.category.agg.store")
						.withKeySerde(AppCustomSerdes.String())
						.withValueSerde(AppCustomSerdes.productCategoryAggregate())
		);

		aggregateKTable.toStream().foreach(
				(k, v) -> System.out.println("Key = " + k + " Value = " + v.toString()));

		return aggregateKTable;
	}
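The aggregator's running-average step can be checked in isolation. This is a plain-Java restatement of the arithmetic in the lambda above (the AggState record here is a stand-in for ProductCategoryAggregate, not a class from this repo):

```java
// Stand-in for ProductCategoryAggregate: count, running total, running average.
record AggState(int count, double total, double avg) {

	// Mirrors the aggregator lambda: fold one purchase price into the state,
	// recomputing the average from the updated total and count.
	AggState add(double price) {
		int newCount = count + 1;
		double newTotal = total + price;
		return new AggState(newCount, newTotal, newTotal / newCount);
	}
}
```

Because the average is recomputed from the total on every record rather than updated incrementally, it stays exact (up to floating-point precision) regardless of arrival order.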

About

springboot to manage kafka stream lifecycle