This repository has been archived by the owner on Jun 1, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
Examples
Martin Krasser edited this page May 8, 2017
·
6 revisions
import akka.NotUsed
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, ProducerSettings}
import org.apache.kafka.common.serialization.{Deserializer, Serializer, StringDeserializer, StringSerializer}
// Event type shared by all examples on this page.
case class ExampleEvent(
  eventId: String,     // identifier of this event
  aggregateId: String, // identifier of the aggregate the event belongs to
  payload: Any         // untyped event payload
)
// Request type used in the examples; every request names the aggregate it addresses.
trait ExampleRequest {
  def aggregateId: String
}

// Response type used in the examples (marker trait without members).
trait ExampleResponse
// Kafka serializer/deserializer for ExampleEvent.
// A single class mixes in both Serializer[ExampleEvent] and Deserializer[ExampleEvent],
// so one instance can be passed wherever either role is required.
class EventSerialization extends Serializer[ExampleEvent] with Deserializer[ExampleEvent] {
// ... (implementation elided in this example)
}
// Actor system needed for Reactive Kafka.
// It is passed explicitly to ProducerSettings and ConsumerSettings below.
// `...` is a placeholder: supply an ActorSystem instance here.
implicit val system: ActorSystem = ...
// Reactive Kafka producer settings: String keys, ExampleEvent values,
// bootstrap servers at localhost:6001.
val producerSettings: ProducerSettings[String, ExampleEvent] = {
  val keySerializer = new StringSerializer
  val valueSerializer = new EventSerialization
  ProducerSettings(system, keySerializer, valueSerializer)
    .withBootstrapServers("localhost:6001")
}
// Reactive Kafka consumer settings: String keys, ExampleEvent values,
// bootstrap servers at localhost:6001, consumer group "example".
val consumerSettings: ConsumerSettings[String, ExampleEvent] = {
  val keyDeserializer = new StringDeserializer
  val valueDeserializer = new EventSerialization
  ConsumerSettings(system, keyDeserializer, valueDeserializer)
    .withBootstrapServers("localhost:6001")
    .withGroupId("example")
}
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Source
// Event source for the given topic: subscribes a plain consumer source to `topic`
// (using consumerSettings) and emits only the value of each consumed record.
def eventSource(topic: String): Source[ExampleEvent, Consumer.Control] = {
  val subscription = Subscriptions.topics(topic)
  Consumer
    .plainSource(consumerSettings, subscription)
    .map(record => record.value)
}
// Event source reading from the "example" topic.
val exampleEventSource: Source[ExampleEvent, Consumer.Control] =
  eventSource(topic = "example")
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{Flow, Sink}
import org.apache.kafka.clients.producer.ProducerRecord
// Event type written by the sink below; its aggregateId is used as the record key.
case class ExampleEvent(
  eventId: String,
  aggregateId: String,
  payload: Any
)
// Event sink for the given topic: wraps each event in a ProducerRecord addressed to
// `topic`, keyed by the event's aggregateId, and hands it to a plain producer sink
// configured with producerSettings.
def eventSink(topic: String): Sink[ExampleEvent, NotUsed] = {
  val toRecord = Flow[ExampleEvent].map { event =>
    new ProducerRecord(topic, event.aggregateId, event)
  }
  toRecord.to(Producer.plainSink(producerSettings))
}
// Event sink writing to the "example" topic.
val exampleEventSink: Sink[ExampleEvent, NotUsed] =
  eventSink(topic = "example")
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Flow
import com.rbmhtechnology.calliope.KafkaArchive
import com.rbmhtechnology.calliope.KafkaEventLog
// Archive passed to KafkaEventLog.topic below.
// `...` is a placeholder: supply a KafkaArchive[String, ExampleEvent] instance here.
val exampleArchive: KafkaArchive[String, ExampleEvent] = ...
// Event log flow for the "example" topic, created from the producer settings,
// the consumer settings and exampleArchive.
val exampleLog: Flow[ExampleEvent, ExampleEvent, Consumer.Control] =
  KafkaEventLog.topic(
    producerSettings,
    consumerSettings,
    topic = "example",
    exampleArchive
  )
import akka.stream.scaladsl.BidiFlow
import com.rbmhtechnology.calliope.{EventSourcing, EventSourcingLogic}
// example event sourcing logic (sequentially executed like actors)
// S is the type of the state (write model) kept by the logic instance.
// Both handler bodies below are placeholders to be filled in by the application.
def exampleLogic[S] = new EventSourcingLogic[ExampleEvent, ExampleRequest, ExampleResponse] {
// current state; `= _` initializes it to the default value for S
private var writeModel: S = _
// Handles a request: must return the events derived from it together with a
// factory function that produces the response.
override def onRequest(r: ExampleRequest): (Seq[ExampleEvent], () => ExampleResponse) = {
// validate request and return derived events plus response factory
}
// Handles an event: intended to update writeModel.
override def onEvent(e: ExampleEvent): Unit = {
// apply event to current state
}
}
// Creates the event sourcing stage (a bidi flow) from the given logic.
// Type parameter order of the result: ExampleRequest in / ExampleEvent out on one side,
// ExampleEvent in / ExampleResponse out on the other.
def exampleStage(
  logic: EventSourcingLogic[ExampleEvent, ExampleRequest, ExampleResponse]
): BidiFlow[ExampleRequest, ExampleEvent, ExampleEvent, ExampleResponse, NotUsed] = {
  EventSourcing.apply(logic)
}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.{BidiFlow, Flow, RunnableGraph}
// Building blocks wired together below; each `...` is a placeholder for a concrete instance.
// event log: events in, events out
val eventLog: Flow[ExampleEvent, ExampleEvent, Consumer.Control] = ...
// event sourcing stage: requests -> events on one side, events -> responses on the other
val eventSourcingStage: BidiFlow[ExampleRequest, ExampleEvent, ExampleEvent, ExampleResponse, NotUsed] = ...
// codec: HttpRequest -> ExampleRequest on one side, ExampleResponse -> HttpResponse on the other
val codec: BidiFlow[HttpRequest, ExampleRequest, ExampleResponse, HttpResponse, NotUsed] = ...
// http endpoint: takes HttpResponse elements in and emits HttpRequest elements
val http: Flow[HttpResponse, HttpRequest, NotUsed] = ...
// Wire the stages into a runnable application: the codec is stacked on top of the
// event sourcing stage, that stack is closed with the event log, and the resulting
// flow is joined with http.
val application: RunnableGraph[NotUsed] = {
  val requestProcessing = codec.atop(eventSourcingStage).join(eventLog)
  http.join(requestProcessing)
}

// run application
application.run()
import akka.stream.scaladsl.{Flow, RunnableGraph}
import com.rbmhtechnology.calliope.{EventSourcingLogic, KafkaEventHub, KafkaEvents, KafkaIndex}
// Factory returning the event sourcing logic for a given aggregate id.
// `...` is a placeholder for the actual implementation.
def eventSourcingLogic(aggregateId: String)
: EventSourcingLogic[ExampleEvent, ExampleRequest, ExampleResponse] = ...
// Index passed to KafkaEvents.hub below; `...` is a placeholder for a concrete instance.
val eventIndex: KafkaIndex[String, ExampleEvent] = ...
// Event hub for the "example" topic, created from the consumer settings,
// the producer settings and eventIndex.
val eventHub: KafkaEventHub[String, ExampleEvent] =
  KafkaEvents.hub(
    consumerSettings,
    producerSettings,
    topic = "example",
    eventIndex
  )
// Request -> response flow obtained from the event hub, given the eventSourcingLogic factory.
// NOTE(review): presumably one logic instance is created per aggregate id — confirm
// against the requestProcessorN documentation.
val commandHub: Flow[ExampleRequest, ExampleResponse, NotUsed] =
eventHub.requestProcessorN(eventSourcingLogic)
// Wire the application: the codec is closed with the command hub, and the resulting
// flow is joined with http.
val application: RunnableGraph[NotUsed] = {
  val httpHandler = codec.join(commandHub)
  http.join(httpHandler)
}

// run application
application.run()