Kafka Streams is one way to process streaming data (events as they flow) right away to derive insights. Some of the operations that are possible:
- Filtering
- Grouping
- Aggregating
- Joining
Kafka Streams is a Java API. Both Kafka Streams and ksqlDB can achieve the same operations. The fundamental difference is that ksqlDB requires provisioning additional clustered infrastructure, where SQL-like queries run against a topic and perform the desired operation, whereas Kafka Streams is a code-level deployment: the application simply includes the Kafka Streams libraries.
I've taken simple movie data streaming as the use case. Movie data is posted by the user and flows through the architecture below. (This is just an imaginary use case to explore the features of Kafka Streams.)
Movie data is POSTed through a REST API and inserted into the movies-dump topic. The producer-api service is responsible for this action.
This topic is streamed and filtered by the stream-filter service, which classifies movies by IMDB rating (good movie if rating >= 7; otherwise bad movie).
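The classification rule above can be sketched in plain Java, independent of the Kafka Streams API. This is only an illustration under assumptions: the `MovieRouter` and `Movie` names and the two-field movie shape are hypothetical, and the topic names are the ones mentioned in this write-up. In a real topology, a predicate like this would typically be passed to something like `KStream#filter` or `KStream#split`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the rating split; it has no Kafka dependency.
final class MovieRouter {
    // Topic names as used in this write-up.
    static final String GOOD_TOPIC = "good-movies";
    static final String BAD_TOPIC = "bad-movies";
    // Threshold from the text: a rating >= 7 counts as a good movie.
    static final double GOOD_RATING_THRESHOLD = 7.0;

    // Hypothetical minimal movie shape; the real payload likely has more fields.
    static final class Movie {
        final String title;
        final double imdbRating;

        Movie(String title, double imdbRating) {
            this.title = title;
            this.imdbRating = imdbRating;
        }
    }

    // Decides which topic a single movie belongs to.
    static String topicFor(Movie movie) {
        return movie.imdbRating >= GOOD_RATING_THRESHOLD ? GOOD_TOPIC : BAD_TOPIC;
    }

    // Groups a batch of movies by destination topic, preserving input order.
    static Map<String, List<Movie>> route(List<Movie> movies) {
        Map<String, List<Movie>> byTopic = new LinkedHashMap<>();
        byTopic.put(GOOD_TOPIC, new ArrayList<>());
        byTopic.put(BAD_TOPIC, new ArrayList<>());
        for (Movie movie : movies) {
            byTopic.get(topicFor(movie)).add(movie);
        }
        return byTopic;
    }
}
```

Keeping the rule in one small function makes the boundary case (a rating of exactly 7) easy to check without a running broker.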
Finally, the stream-consumer service prints the messages in both the good-movies and bad-movies topics. For one topic I used the standard Kafka consumer client library, while for the other I again used KStreams to stream the data, just for variation.
The stream-aggregator service aggregates the data from the movies-dump topic by director name, using count() as the aggregate operation, and streams the result back to the director-counnt topic.
This data can be queried from the stream-aggregator service through a REST endpoint, where the director name is the lookup key and the response is the count of movies.
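To make the aggregation and lookup concrete, here is a minimal plain-Java sketch of "count by director, then query by key". It is not the Kafka Streams API: the `DirectorCounter` class is hypothetical and an in-memory map stands in for the state store. In Kafka Streams the same idea is usually expressed with a group-by followed by `count()`.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for the aggregation; a HashMap plays the role of the state store.
final class DirectorCounter {
    // Director name -> number of movies seen so far.
    private final Map<String, Long> counts = new HashMap<>();

    // Called once per movie record; mirrors "group by director, then count".
    void accept(String director) {
        counts.merge(director, 1L, Long::sum);
    }

    // Mirrors the REST lookup: the director name is the key, the count is the response.
    // Unknown directors yield 0 rather than an error (an assumption of this sketch).
    long countFor(String director) {
        return counts.getOrDefault(director, 0L);
    }
}
```

A REST handler would then only need to call `countFor` with the director name taken from the request.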
For each service, I have attached a sample reference properties file. I used Confluent Cloud's infrastructure for this demo, so the properties may be slightly more advanced (in terms of security settings).
- During stream queries, I observed the error below:
because the stream thread is STARTING, not RUNNING
It turned out that the stream takes a few more seconds to start properly, so this needs handling, such as a state check (health check). To counter this, I added the following validation before every stream query, to ensure that only a stream in the RUNNING state is queried:
if ( streams.state().toString().equalsIgnoreCase("RUNNING") )
- Also, several factors need to be considered when querying stream state, such as:
  - Querying local state stores
  - Discovering and querying other instances of the same app (e.g. containers or EC2 instances), because in multi-instance mode each instance holds state only for the partitions assigned to it
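The state check mentioned in the first point could also be wrapped in a small polling helper, so that a query waits briefly for the stream to become ready instead of failing right away. The sketch below is an assumption on my part, not what the demo does: `StreamReadiness` and `awaitRunning` are hypothetical names, and the readiness check is passed in as a `BooleanSupplier` so the helper has no Kafka dependency. With Kafka Streams, the supplier could be something like `() -> streams.state() == KafkaStreams.State.RUNNING`.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper: polls a readiness check until it passes or a timeout elapses.
final class StreamReadiness {
    // Returns true as soon as isRunning reports true, or false once the timeout has passed.
    static boolean awaitRunning(BooleanSupplier isRunning, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (isRunning.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

A REST handler could call this with a short timeout and return a "not ready" response when it yields false.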
The data dump I used contains around 47K movies. For quick testing, I used jq to filter by year and IMDB rating:
jq '.[] | select(.imdbRating>6.8 and .imdbRating < 7.1 and .year > 2000 and .year < 2003)' movies_dump.json > striped_data.json