There are many benefits to following a clean, layered approach to application architecture such as hexagonal architecture. When the application core is decoupled from transport details, the core functionality can be exposed to suit any business need, e.g. as a REST endpoint or via asynchronous event listeners. It also results in a codebase that is testable by design.
In the Java world, the Spring Framework makes this easy through the Spring Cloud Function project. The idea is that the application can be thought of as functions that take an input and produce an output. The framework then provides support to deploy these functions as a REST service or via event listeners in a runtime environment of choice: serverless, Kubernetes, or standalone event listeners.
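At its simplest, this means writing a plain function and letting the framework expose it. A minimal sketch (names here are illustrative and assume the spring-cloud-function-web and Kotlin function support dependencies are on the classpath):

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean

@SpringBootApplication
class FunctionsApplication {

    // Exposed as POST /uppercase by the web adapter; the same bean can be bound to
    // Kafka or a serverless runtime via the other adapters.
    @Bean
    fun uppercase(): (String) -> String = { it.uppercase() }
}

fun main(args: Array<String>) {
    runApplication<FunctionsApplication>(*args)
}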
The sample application illustrates how Spring Cloud Function can be used to write business logic as Java functions and deploy it anywhere, e.g. to a serverless platform, as REST endpoints, or as asynchronous Kafka event listeners. It also shows how the solution can easily be extended to implement an event streaming architecture, finishing off with an HTTP endpoint serving Server-Sent Events for real-time notifications to a web application.
The sample application implements a very simple claim processing solution where customers can submit claim requests that are then processed according to business rules, resulting in the customer getting paid or the request being rejected.
The application consists of three main components.
- ClaimsService
- ClaimsProcessor
- StreamProcessor
Overall architecture diagram:
A ClaimRequest submitted to the ClaimsService produces a ClaimCreatedEvent -> the ClaimsProcessor picks up the ClaimCreatedEvent and runs a fraud check to produce a PaymentEvent -> the PaymentEvent is processed by the payment processor for the final settlement.
The application exposes an HTTP endpoint for users to submit their claim request. The service is implemented using Spring Cloud Function following hexagonal architecture principles.
At the core, the service is a function that accepts a ClaimRequest and produces a ClaimResponse. For simplicity, ClaimRequest has the following attributes:
{
firstName: string,
lastName: string,
email: string,
claimAmount: number,
claimType: string
}
and ClaimResponse consists of:
{
status: {
code: string,
message: string
},
correlationId: string
}
The service publishes a ClaimCreatedEvent on a Kafka topic upon receipt of a customer claim request.
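A sketch of what this core function might look like; ClaimEventPublisher, Status, and the ClaimCreatedEvent constructor below are illustrative assumptions, not the sample's actual code:

// Illustrative sketch of the core function; the outbound port (ClaimEventPublisher) is assumed.
@Bean
fun submitClaimRequest(publisher: ClaimEventPublisher): (ClaimRequest) -> ClaimResponse {
    return { request ->
        val correlationId = UUID.randomUUID().toString()
        // publish the domain event to Kafka for downstream processing
        publisher.publish(ClaimCreatedEvent(correlationId, request))
        ClaimResponse(
            status = Status(code = "UNDER_REVIEW", message = "Application received"),
            correlationId = correlationId
        )
    }
}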
- Run Kafka locally
- Using Docker Compose:
cd docker && docker-compose up
(ensure that the Docker daemon is running). This starts Kafka with the image bitnami/kafka:latest, which is Kafka without ZooKeeper.
The application is a Spring Cloud Function application and can be started locally using the following command:
./gradlew bootRun
To test, issue the following POST request:
POST http://localhost:8080/submitClaimRequest
Content-Type: application/json
{
"firstName": "Joe",
"lastName": "Blogs",
"amount": 20,
"claimType": "CLAIM_A",
"email": "test@user.com"
}
This will produce the following response:
{
"status" : {
"code": "UNDER_REVIEW",
"message": "Application received"
},
"correlationId": "7d979021-5cc8-478d-8990-9d3d7c23b20a"
}
- Local Kubernetes
k3s is a lightweight distribution of Kubernetes, perfect for resource-constrained environments and a good choice for a local development environment. k3d can be used to run k3s locally on Docker.
Also ensure kubectl is installed to connect to the cluster.
Create a local cluster named dev:
k3d cluster create dev
Run kubectl get nodes to list the nodes in the cluster.
- Kafka
We can start with a hosted Kafka solution, or deploy our own Kafka broker in the cluster. For a hosted solution, https://www.confluent.io/get-started/ is one option to get up and running quickly.
- Build the image using Buildpacks:
./gradlew bootBuildImage
This builds a local Docker image claimservice:0.0.1-SNAPSHOT.
[!IMPORTANT] Ensure that the thin jar plugin related config is commented out from build.gradle.kts so that the image is built from a self-contained fat jar.
- Import the image into the cluster so that it is accessible to the deployments:
k3d image import claimservice:0.0.1-SNAPSHOT -c {cluster-name}
cd k8s
kubectl create -f secrets.yaml
(for Kafka credentials to connect to the Confluent hosted broker)
kubectl create -f deployment.yaml
kubectl create -f service-node-port.yaml
(to create a NodePort service for local testing)
The service can now be accessed via the NodePort service:
POST http://localhost:30000/submitClaimRequest
Content-Type: application/json
{
"firstName": "Joe",
"lastName": "Blogs",
"amount": 20,
"claimType": "",
"email": "test@user.com"
}
Spring Cloud Function provides adapters for various serverless platforms. For AWS Lambda, we need the Spring Cloud Function adapter for AWS and the thin jar plugin to produce the shaded jar for deployment.
implementation("org.springframework.cloud:spring-cloud-function-adapter-aws")
implementation("org.springframework.boot.experimental:spring-boot-thin-gradle-plugin:1.0.29.RELEASE")
and the following declarations in build.gradle.kts:
tasks.assemble {
dependsOn("shadowJar")
}
tasks.withType<ShadowJar> {
archiveClassifier.set("aws")
dependencies {
exclude("org.springframework.cloud:spring-cloud-function-web")
}
// Required for Spring
mergeServiceFiles()
append("META-INF/spring.handlers")
append("META-INF/spring.schemas")
append("META-INF/spring.tooling")
transform(PropertiesFileTransformer::class.java) {
paths.add("META-INF/spring.factories")
mergeStrategy = "append"
}
}
tasks.withType<Jar> {
manifest {
attributes["Start-Class"] = "com.arun.claimservice.ClaimserviceApplication"
}
}
We also need a request handler to process incoming requests and Spring provides a generic request handler for this:
org.springframework.cloud.function.adapter.aws.FunctionInvoker::handleRequest
The following environment variables need to be defined for this function: spring_profiles_active, KAFKA_USR, KAFKA_PASSWORD.
Create an AWS EKS cluster by following the instructions ...
The Claims Processor consists of functions that pick up submitted claims for fraud checking and then finalise the payment. The following functions carry out the fraud checks and payment processing. They are invoked by Kafka message listeners defined in KafkaConfig.
// Fraud check step: consumes a ClaimCreatedEvent and produces a FraudCheckResult
fun performFraudCheck(fraudCheckService: FraudCheckService): (ClaimCreatedEvent) -> FraudCheckResult {
    return {
        fraudCheckService.performFraudCheck(it)
    }
}

// Payment step: consumes a FraudCheckResult and produces a PaymentResult
fun performPayment(paymentService: PaymentService): (FraudCheckResult) -> PaymentResult {
    return {
        paymentService.performPayment(it)
    }
}
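A rough sketch of how a listener in KafkaConfig might wire the fraud check function to the topic; the topic names, group id, and the result-publishing step below are assumptions:

@KafkaListener(topics = ["CLAIMS"], groupId = "claims-processor")
class ClaimCreatedEventListener(
    private val performFraudCheck: (ClaimCreatedEvent) -> FraudCheckResult,
    private val kafkaTemplate: KafkaTemplate<String, FraudCheckResult>
) {
    @KafkaHandler
    fun onClaimCreated(event: ClaimCreatedEvent) {
        // run the fraud check function and publish its result for the payment step
        val result = performFraudCheck(event)
        kafkaTemplate.send("FRAUD_CHECK_RESULTS", result)
    }
}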
In the first part, we implement a stream that listens to the CLAIMS topic for new claim requests, aggregates them by claim type, and publishes the claim count by type on a new topic. In the second part, we implement a REST endpoint via Spring Cloud Function to serve events as they are published on this new topic, using a Reactor Sink.
The stream topology is implemented in:
@Bean
fun kStream(builder: StreamsBuilder) {
claimsStream(builder)
}
The stream topology above calculates the number of claims by type in every five-minute window.
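claimsStream itself is not shown above; a rough sketch of such a topology, assuming a ClaimCount data class, a claimType property on ClaimCreatedEvent, and serdes configured elsewhere:

import java.time.Duration
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.TimeWindows

fun claimsStream(builder: StreamsBuilder) {
    builder.stream<String, ClaimCreatedEvent>("CLAIMS")
        // group claims by their type
        .groupBy { _, event -> event.claimType }
        // count claims per type in five-minute windows
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
        .count()
        .toStream()
        .map { windowedType, count -> KeyValue(windowedType.key(), ClaimCount(windowedType.key(), count)) }
        // publish the per-type counts for downstream consumers
        .to("event-count")
}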
We can expose a REST API to serve the claim count as processed by the stream. For this we need a sink for the topic event-count where the claim count events are published.
First, define the sink:
@Bean
fun claimCountSink(): Sinks.Many<ClaimCount> {
val replaySink = Sinks.many().multicast().onBackpressureBuffer<ClaimCount>()
return replaySink
}
Then use this sink for the topic event-count where the claim count events are published by the stream:
@KafkaListener(
topics = ["event-count"], groupId = "CONSUMER_ONE",
containerFactory = "claimCountKafkaListenerContainerFactory"
)
class ClaimCountListener(private val sink: Sinks.Many<ClaimCount>) {
private val logger = LoggerFactory.getLogger(ClaimCountListener::class.java)
@KafkaHandler
fun processMessage(claimCount: ClaimCount) {
logger.info("Received claim count $claimCount")
sink.emitNext(claimCount) { _, _ -> true }
}
}
The following function exposes the retrieveClaimCount endpoint, which publishes claim count events as Server-Sent Events.
@Bean
fun retrieveClaimCount(claimCountSinks: Sinks.Many<ClaimCount>): () -> Flux<ClaimCount> {
return retrieveClaimCountFunction(claimCountSinks)
}
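retrieveClaimCountFunction is not shown here; a minimal sketch, assuming it simply exposes the sink as a Flux:

fun retrieveClaimCountFunction(sink: Sinks.Many<ClaimCount>): () -> Flux<ClaimCount> {
    // each invocation returns the live stream of claim counts emitted by the Kafka listener
    return { sink.asFlux() }
}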
The dashboard uses EventSource to stream event counts from the API to the browser, where they are displayed in a line chart.
import {useEffect} from "react";
const eventSource = new EventSource(`http://localhost:8081/retrieveClaimCount`);
...
eventSource.onmessage = m => {
...
// display events in the chart
}
The React app claims-ui can be started by running npm run dev; then navigate to http://localhost:3000/claimCount.
The function taking a KStream as input does not get invoked, but the other functions with byte array inputs are invoked and can also be chained together to feed the output of one as the input to the next.
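As an illustration of that chaining (a sketch, not the sample's code), two simple functions can be composed with the pipe operator in the function definition property; the property placement and function names below are assumptions:

@Bean
fun decode(): (ByteArray) -> String = { String(it) }

@Bean
fun shout(): (String) -> String = { it.uppercase() }

// application.properties (illustrative):
// spring.cloud.function.definition=decode|shout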