-
Notifications
You must be signed in to change notification settings - Fork 1
Akka
Helenus integrates with Akka through Alpakka.
Note: We only provide integration against Scala 2.13, due to the Alpakka's latest Apache License release.
Include the library into you project definition:
libraryDependencies += "net.nmoncho" %% "helenus-akka" % "1.6.1"
Include the library into you project definition:
libraryDependencies += "net.nmoncho" %% "helenus-akka-busl" % "1.6.1"
The integration with Alpakka tries to be as seamless as possible. Marking a CassandraSession
implicit is the only thing required to use Helenus:
implicit val system: ActorSystem = ActorSystem()
val sessionSettings = CassandraSessionSettings()
import system.dispatcher
implicit val cassandraSession: CassandraSession =
CassandraSessionRegistry.get(system).sessionFor(sessionSettings)
If you are already using Alpakka, this is probably already setup.
Cassandra queries, just like in Alpakka,
are mapped to Akka Stream Source
s. With Helenus we can leverage any defined
RowMapper
to adapt a Source[Row]
to a Source[A]
.
import net.nmoncho.helenus._
import net.nmoncho.helenus.api.RowMapper
import net.nmoncho.helenus.api.cql.Adapter
import net.nmoncho.helenus.akka._
case class Person(id: Int, name: String, city: String)
implicit val rowMapper: RowMapper[Person] = RowMapper[Person]
val peopleQuery = "SELECT * FROM akka_people WHERE id = ?".toCQLAsync
.prepare[Int].as[Person]
val helenusSource: Source[Person, NotUsed] = peopleQuery.asReadSource(1)
The integration usage is inteded as follows:
- Transform your query with
.toCQLAsync
. - Prepare the query with
.prepare
, providing the query parameter types. - Define a query result, which uses an implicit
RowMapper
. If this isn't provided,Row
will be used as query result type. - Transform your query into a source with
asReadSource
. This method follows the same principle asScalaPreparedStatement
. Use this method as a factory forSource
s.
This last method isn't the only integration point with Alpakka.
CassandraSource
can also be used with Helenus:
import _root_.akka.stream.alpakka.cassandra.scaladsl.CassandraSource
val alpakkaSource: Source[Person, NotUsed] =
CassandraSource.fromFuture(peopleQuery.map(query => query(1))).map(rowMapper.apply)
We arrive at the same Source[Person, NotUsed]
. Executing these queries
is performed as usual:
Await.result(helenusSource.runWith(Sink.head), 5.seconds)
// res4: Person = Person(id = 1, name = "John", city = "Rome")
Await.result(alpakkaSource.runWith(Sink.head), 5.seconds)
// res5: Person = Person(id = 1, name = "John", city = "Rome")
We can use the same facilities when inserting, or updating, to Cassandra.
Unlike Alpakka, Helenus offers both Sink
and Flow
for this purpose.
implicit val personAdapter: Adapter[Person, (Int, String, String)] = Adapter[Person]
val sink: Sink[Person, Future[Done]] =
"INSERT INTO akka_people(id, name, city) VALUES (?, ?, ?)".toCQLAsync
.prepare[Int, String, String]
.from[Person]
.asWriteSink(CassandraWriteSettings.defaults)
val people: Source[Person, NotUsed] = Source(List(
Person(4, "Jane", "Amsterdam"),
Person(5, "Lisa", "Paris"),
Person(6, "Maria", "Madrid")
))
val insert = people.runWith(sink)
We can then query these data as shown above:
Await.result(
peopleQuery.asReadSource(5).runWith(Sink.head),
5.seconds
)
// res7: Person = Person(id = 5, name = "Lisa", city = "Paris")