Gustavo De Micheli edited this page Aug 30, 2024 · 2 revisions

Helenus integrates with Akka through Alpakka.

Note: The integration is only provided for Scala 2.13, because of Alpakka's latest Apache-licensed release.

Installation - Apache 2 License

Include the library in your project definition:

libraryDependencies += "net.nmoncho" %% "helenus-akka" % "1.0.0"

Installation - Business Source License

Include the library in your project definition:

libraryDependencies += "net.nmoncho" %% "helenus-akka-busl" % "1.0.0"

Setup

The integration with Alpakka tries to be as seamless as possible. Having an implicit CassandraSession in scope is the only requirement for using Helenus:

given system: ActorSystem = ActorSystem()
given CassandraSession = CassandraSessionRegistry.get(system).sessionFor(CassandraSessionSettings())

import system.dispatcher

If you are already using Alpakka, this is probably already set up.

Reading from Cassandra

Cassandra queries, just like in Alpakka, are mapped to Akka Stream Sources. With Helenus we can leverage any defined RowMapper to adapt a Source[Row] to a Source[A].

import net.nmoncho.helenus.*
import net.nmoncho.helenus.api.RowMapper
import net.nmoncho.helenus.api.cql.Adapter
import net.nmoncho.helenus.akka.*

case class Person(id: Int, name: String, city: String) derives RowMapper

val peopleQuery = "SELECT * FROM akka_people WHERE id = ?".toCQLAsync
        .prepare[Int].as[Person]

val helenusSource: Source[Person, NotUsed] = peopleQuery.asReadSource(1)

The integration is intended to be used as follows:

  • Transform your query with .toCQLAsync.
  • Prepare the query with .prepare, providing the query parameter types.
  • Define a query result, which uses an implicit RowMapper. If this isn't provided, Row will be used as query result type.
  • Transform your query into a source with asReadSource. This method follows the same principle as ScalaPreparedStatement. Use this method as a factory for Sources.
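
Because asReadSource acts as a factory, the same prepared query can produce one Source per parameter value. The following is a minimal sketch of that idea, not part of Helenus itself: `peopleByIds` is a hypothetical helper name, and the snippet assumes `Person`, `peopleQuery`, and the given ActorSystem and CassandraSession defined above are in scope, with a reachable Cassandra instance containing the `akka_people` table. Only `asReadSource` comes from Helenus; `flatMapConcat` and `Sink.seq` are standard Akka Streams operators.

```scala
import scala.concurrent.Future

import _root_.akka.NotUsed
import _root_.akka.stream.scaladsl.{ Sink, Source }

// Hypothetical helper: builds one Source per id from the same prepared query
// and concatenates the results in the order of `ids`.
def peopleByIds(ids: List[Int]): Source[Person, NotUsed] =
  Source(ids).flatMapConcat(id => peopleQuery.asReadSource(id))

// Collects every row found for the given ids (ids without a row contribute nothing).
val found: Future[Seq[Person]] = peopleByIds(List(1, 2, 3)).runWith(Sink.seq)
```

Using flatMapConcat keeps the results ordered by id and runs one query at a time; flatMapMerge would be an alternative if ordering does not matter.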

This last method isn't the only integration point with Alpakka. CassandraSource can also be used with Helenus:

import _root_.akka.stream.alpakka.cassandra.scaladsl.CassandraSource

val alpakkaSource: Source[Person, NotUsed] =
    CassandraSource.fromFuture(peopleQuery.map(query => query(1))).map(summon[RowMapper[Person]].apply)

We arrive at the same Source[Person, NotUsed]. Executing these queries is performed as usual:

Await.result(helenusSource.runWith(Sink.head), 5.seconds)
// res4: Person = Person(id = 1, name = "John", city = "Rome")

Await.result(alpakkaSource.runWith(Sink.head), 5.seconds)
// res5: Person = Person(id = 1, name = "John", city = "Rome")
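
Note that Sink.head fails the resulting Future with a NoSuchElementException when the query returns no rows. A hedged sketch of a more defensive variant uses Akka Streams' Sink.headOption instead. It assumes `Person`, `peopleQuery`, and the given ActorSystem and CassandraSession from above are in scope; the id 42 is an arbitrary value assumed to have no matching row.

```scala
import scala.concurrent.Future

import _root_.akka.stream.scaladsl.Sink

// Yields None instead of failing when no row matches the (assumed missing) id.
val maybePerson: Future[Option[Person]] =
  peopleQuery.asReadSource(42).runWith(Sink.headOption)
```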

Writing to Cassandra

We can use the same facilities when inserting into, or updating, Cassandra. Unlike Alpakka, Helenus offers both Sink and Flow for this purpose.

given Adapter[Person, (Int, String, String)] = Adapter[Person]

val sink: Sink[Person, Future[Done]] =
    "INSERT INTO akka_people(id, name, city) VALUES (?, ?, ?)".toCQLAsync
        .prepare[Int, String, String]
        .from[Person]
        .asWriteSink(CassandraWriteSettings.defaults)

val people: Source[Person, NotUsed] = Source(List(
  Person(4, "Jane", "Amsterdam"),
  Person(5, "Lisa", "Paris"),
  Person(6, "Maria", "Madrid")
))

val insert = people.runWith(sink)

We can then query these data as shown above:

Await.result(
    peopleQuery.asReadSource(5).runWith(Sink.head),
    5.seconds
)
// res7: Person = Person(id = 5, name = "Lisa", city = "Paris")
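
Since running the sink is asynchronous, a read issued right after `people.runWith(sink)` may race with the writes. One way to make the ordering explicit, sketched here under the assumption that `insert`, `peopleQuery`, `Person`, the given ActorSystem and CassandraSession, and `import system.dispatcher` from the snippets above are in scope, is to chain the read onto the Future returned by the write:

```scala
import scala.concurrent.Future

import _root_.akka.stream.scaladsl.Sink

// `insert` is the Future[Done] materialized by the write sink above; it should
// complete once the write stream has finished, and only then is the read started.
val lisa: Future[Person] =
  for
    _      <- insert
    person <- peopleQuery.asReadSource(5).runWith(Sink.head)
  yield person
```

The for-comprehension needs an ExecutionContext, which `import system.dispatcher` provides.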