Examples

Common

  import akka.actor.ActorSystem
  import akka.kafka.{ConsumerSettings, ProducerSettings}
  import org.apache.kafka.common.serialization.{Deserializer, Serializer, StringDeserializer, StringSerializer}

  // Example event class used in all examples
  case class ExampleEvent(eventId: String, aggregateId: String, payload: Any)

  // Example request and response types
  trait ExampleRequest { def aggregateId: String }
  trait ExampleResponse

  // Kafka serializer/deserializer for ExampleEvent
  class EventSerialization extends Serializer[ExampleEvent] with Deserializer[ExampleEvent] {
    // ...
  }

  // Actor system needed for Reactive Kafka
  implicit val system: ActorSystem = ...

  // Reactive Kafka producer settings
  val producerSettings: ProducerSettings[String, ExampleEvent] =
    ProducerSettings(system, new StringSerializer, new EventSerialization)
      .withBootstrapServers("localhost:6001")

  // Reactive Kafka consumer settings
  val consumerSettings: ConsumerSettings[String, ExampleEvent] =
    ConsumerSettings(system, new StringDeserializer, new EventSerialization)
      .withBootstrapServers("localhost:6001")
      .withGroupId("example")
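
The EventSerialization stub above is left to the application. Below is a minimal sketch that uses plain Java serialization, assuming the event payload is Serializable; a real application would more likely plug in Avro, JSON or Protobuf here.

  import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
  import java.util

  // sketch only: Java serialization for ExampleEvent (assumes a Serializable payload)
  class EventSerialization extends Serializer[ExampleEvent] with Deserializer[ExampleEvent] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
    override def close(): Unit = ()

    // write the event as a Java-serialized byte array
    override def serialize(topic: String, event: ExampleEvent): Array[Byte] = {
      val bos = new ByteArrayOutputStream
      val oos = new ObjectOutputStream(bos)
      try oos.writeObject(event) finally oos.close()
      bos.toByteArray
    }

    // read the event back; null input (tombstone) yields null
    override def deserialize(topic: String, bytes: Array[Byte]): ExampleEvent =
      if (bytes == null) null
      else {
        val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
        try ois.readObject().asInstanceOf[ExampleEvent] finally ois.close()
      }
  }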

Example 1

  import akka.kafka.Subscriptions
  import akka.kafka.scaladsl.Consumer
  import akka.stream.scaladsl.Source

  // event source for the given topic
  def eventSource(topic: String): Source[ExampleEvent, Consumer.Control] = {
    Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
      .map(_.value)                                                    
  }

  val exampleEventSource: Source[ExampleEvent, Consumer.Control] =
    eventSource("example")
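
For illustration, the source can be materialized like this, keeping the Consumer.Control so consumption can be stopped later (the materializer setup is an assumption, not part of the original example).

  import akka.stream.ActorMaterializer
  import akka.stream.scaladsl.{Keep, Sink}

  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // print incoming events and keep the consumer control as materialized value
  val control: Consumer.Control =
    exampleEventSource
      .toMat(Sink.foreach(event => println(s"received $event")))(Keep.left)
      .run()

  // eventually stop consuming
  // control.shutdown()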

Example 2

  import akka.NotUsed
  import akka.kafka.scaladsl.Producer
  import akka.stream.scaladsl.{Flow, Sink}
  import org.apache.kafka.clients.producer.ProducerRecord

  def eventSink(topic: String): Sink[ExampleEvent, NotUsed] = {
    Flow[ExampleEvent]
      .map(e => new ProducerRecord(topic, e.aggregateId, e))
      .to(Producer.plainSink(producerSettings))             
  }

  val exampleEventSink: Sink[ExampleEvent, NotUsed] =
    eventSink("example")
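
A corresponding usage sketch, publishing a few hypothetical events to the sink (assumes an implicit materializer as in the Example 1 sketch).

  import akka.stream.scaladsl.Source

  // write two sample events to the "example" topic
  Source(List(
    ExampleEvent("e-1", "a-1", "payload-1"),
    ExampleEvent("e-2", "a-1", "payload-2")))
    .runWith(exampleEventSink)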

Example 3

  import akka.kafka.scaladsl.Consumer
  import akka.stream.scaladsl.Flow
  import com.rbmhtechnology.calliope.KafkaArchive
  import com.rbmhtechnology.calliope.KafkaEventLog

  val exampleArchive: KafkaArchive[String, ExampleEvent] = ...

  val exampleLog: Flow[ExampleEvent, ExampleEvent, Consumer.Control] =
    KafkaEventLog.topic(producerSettings, consumerSettings, topic = "example", exampleArchive)
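
The event log is an ordinary Flow and can be materialized directly. The following sketch assumes, as suggested by how the log is joined with the event sourcing stage in Example 5, that events flowing in are written to the topic and events flowing out are those consumed from it (materializer as in the Example 1 sketch).

  import akka.stream.scaladsl.{Keep, Sink, Source}

  // write one event to the log and print everything the log emits,
  // keeping the Consumer.Control of the underlying consumer
  val logControl: Consumer.Control =
    Source.single(ExampleEvent("e-1", "a-1", "payload-1"))
      .viaMat(exampleLog)(Keep.right)
      .toMat(Sink.foreach(println))(Keep.left)
      .run()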

Example 4

  import akka.NotUsed
  import akka.stream.scaladsl.BidiFlow
  import com.rbmhtechnology.calliope.{EventSourcing, EventSourcingLogic}

  // example event sourcing logic (sequentially executed like actors)
  def exampleLogic[S] = new EventSourcingLogic[ExampleEvent, ExampleRequest, ExampleResponse] {
    private var writeModel: S = _

    override def onRequest(r: ExampleRequest): (Seq[ExampleEvent], () => ExampleResponse) = {
      // validate request and return derived events plus response factory
    }

    override def onEvent(e: ExampleEvent): Unit = {
      // apply event to current state
    }
  }

  // create event sourcing stage (bidi flow) from example logic
  def exampleStage(logic: EventSourcingLogic[ExampleEvent, ExampleRequest, ExampleResponse])
      : BidiFlow[ExampleRequest, ExampleEvent, ExampleEvent, ExampleResponse, NotUsed] =
    EventSourcing(logic)
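
Joining the event sourcing stage with an event log, for example the exampleLog flow from Example 3, yields a request processor. Only the declared types are used in this sketch; the state type passed to exampleLogic is chosen arbitrarily.

  import akka.kafka.scaladsl.Consumer
  import akka.stream.scaladsl.{Flow, Keep}

  // emitted events travel down into the log, consumed events travel back up into the logic
  val requestProcessor: Flow[ExampleRequest, ExampleResponse, Consumer.Control] =
    exampleStage(exampleLogic[Any]).joinMat(exampleLog)(Keep.right)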

Example 5

  import akka.NotUsed
  import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
  import akka.kafka.scaladsl.Consumer
  import akka.stream.scaladsl.{BidiFlow, Flow, RunnableGraph}

  val eventLog: Flow[ExampleEvent, ExampleEvent, Consumer.Control] = ...

  val eventSourcingStage: BidiFlow[ExampleRequest, ExampleEvent, ExampleEvent, ExampleResponse, NotUsed] = ...

  val codec: BidiFlow[HttpRequest, ExampleRequest, ExampleResponse, HttpResponse, NotUsed] = ...

  val http: Flow[HttpResponse, HttpRequest, NotUsed] = ...

  // wire stages into a runnable application
  val application: RunnableGraph[NotUsed] =
    http.join(codec.atop(eventSourcingStage).join(eventLog))

  // run application
  application.run()
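
One way to obtain a Flow[HttpResponse, HttpRequest, NotUsed] is from Akka HTTP server connections, whose connection.flow has exactly this shape. The binding details below are an assumption for illustration, not part of the original example (materializer as in the earlier sketches).

  import akka.http.scaladsl.Http

  // run one application instance per incoming HTTP connection
  Http().bind(interface = "localhost", port = 8080).runForeach { connection =>
    // connection.flow is a Flow[HttpResponse, HttpRequest, NotUsed], i.e. the shape of http above
    connection.flow.join(codec.atop(eventSourcingStage).join(eventLog)).run()
  }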

Example 6

  import akka.NotUsed
  import akka.stream.scaladsl.{Flow, RunnableGraph}
  import com.rbmhtechnology.calliope.{EventSourcingLogic, KafkaEventHub, KafkaEvents, KafkaIndex}

  def eventSourcingLogic(aggregateId: String)
    : EventSourcingLogic[ExampleEvent, ExampleRequest, ExampleResponse] = ...

  val eventIndex: KafkaIndex[String, ExampleEvent] = ...

  val eventHub: KafkaEventHub[String, ExampleEvent] =
    KafkaEvents.hub(consumerSettings, producerSettings, topic = "example", eventIndex)

  val commandHub: Flow[ExampleRequest, ExampleResponse, NotUsed] =
    eventHub.requestProcessorN(eventSourcingLogic)

  val application: RunnableGraph[NotUsed] =
    http.join(codec.join(commandHub))

  application.run()
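
Since the command hub is an ordinary Flow[ExampleRequest, ExampleResponse, NotUsed], it can also be exercised directly, without the HTTP layer. The request value below is a hypothetical placeholder (materializer as in the earlier sketches).

  import akka.stream.scaladsl.{Sink, Source}
  import scala.concurrent.Future

  val someRequest: ExampleRequest = ...

  // send a single request through the command hub and collect the response
  val response: Future[ExampleResponse] =
    Source.single(someRequest)
      .via(commandHub)
      .runWith(Sink.head)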