Skip to content
Gustavo De Micheli edited this page Aug 30, 2024 · 2 revisions

Helenus integrates with Pekko through Pekko Connectors.

Installation

Include the library into you project definition:

libraryDependencies += "net.nmoncho" %% "helenus-pekko" % "1.0.0"

Setup

The integration with Pekko tries to be as seamless as possible. Marking a CassandraSession implicit is the only thing required to use Helenus:

given system: ActorSystem = ActorSystem()
given CassandraSession = CassandraSessionRegistry.get(system).sessionFor(CassandraSessionSettings())

import system.dispatcher

If you are already using Pekko, this is probably already setup.

Reading from Cassandra

Cassandra queries, just like in Pekko, are mapped to Pekko Stream Sources. With Helenus we can leverage any defined RowMapper to adapt a Source[Row] to a Source[A].

import net.nmoncho.helenus.*
import net.nmoncho.helenus.api.RowMapper
import net.nmoncho.helenus.api.cql.Adapter
import net.nmoncho.helenus.pekko.*

case class Person(id: Int, name: String, city: String) derives RowMapper

val peopleQuery = "SELECT * FROM pekko_people WHERE id = ?".toCQLAsync
        .prepare[Int].as[Person]

val helenusSource: Source[Person, NotUsed] = peopleQuery.asReadSource(1)

The integration usage is inteded as follows:

  • Transform your query with .toCQLAsync.
  • Prepare the query with .prepare, providing the query parameter types.
  • Define a query result, which uses an implicit RowMapper. If this isn't provided, Row will be used as query result type.
  • Transform your query into a source with asReadSource. This method follows the same principle as ScalaPreparedStatement. Use this method as a factory for Sources.

This last method isn't the only integration point with Pekko. CassandraSource can also be used with Helenus:

import _root_.org.apache.pekko.stream.connectors.cassandra.scaladsl.CassandraSource

val pekkoSource: Source[Person, NotUsed] =
    CassandraSource.fromFuture(peopleQuery.map(query => query(1))).map(summon[RowMapper[Person]].apply)

We arrive at the same Source[Person, NotUsed]. Executing these queries is performed as usual:

Await.result(helenusSource.runWith(Sink.head), 5.seconds)
// res4: Person = Person(id = 1, name = "John", city = "Rome")

Await.result(pekkoSource.runWith(Sink.head), 5.seconds)
// res5: Person = Person(id = 1, name = "John", city = "Rome")

Writing to Cassandra

We can use the same facilities when inserting, or updating, to Cassandra. Unlike Pekko, Helenus offers both Sink and Flow for this purpose.

given Adapter[Person, (Int, String, String)] = Adapter[Person]

val sink: Sink[Person, Future[Done]] =
    "INSERT INTO pekko_people(id, name, city) VALUES (?, ?, ?)".toCQLAsync
        .prepare[Int, String, String]
        .from[Person]
        .asWriteSink(CassandraWriteSettings.defaults)

val people: Source[Person, NotUsed] = Source(List(
  Person(4, "Jane", "Amsterdam"),
  Person(5, "Lisa", "Paris"),
  Person(6, "Maria", "Madrid")
))

val insert = people.runWith(sink)

We can then query these data as shown above:

Await.result(
    peopleQuery.asReadSource(5).runWith(Sink.head),
    5.seconds
)
// res7: Person = Person(id = 5, name = "Lisa", city = "Paris")
Clone this wiki locally