Pekko
Helenus integrates with Pekko through Pekko Connectors.
Include the library in your project definition:
libraryDependencies += "net.nmoncho" %% "helenus-pekko" % "1.0.0"
The integration with Pekko tries to be as seamless as possible. Marking a CassandraSession
implicit is the only thing required to use Helenus:
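import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.cassandra.CassandraSessionSettings
import org.apache.pekko.stream.connectors.cassandra.scaladsl.{CassandraSession, CassandraSessionRegistry}
given system: ActorSystem = ActorSystem()
given CassandraSession = CassandraSessionRegistry.get(system).sessionFor(CassandraSessionSettings())
import system.dispatcher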
If you are already using Pekko, this is probably already set up.
Cassandra queries, just like in Pekko, are mapped to Pekko Stream Sources.
With Helenus we can leverage any defined RowMapper to adapt a Source[Row] to a Source[A].
import net.nmoncho.helenus.*
import net.nmoncho.helenus.api.RowMapper
import net.nmoncho.helenus.api.cql.Adapter
import net.nmoncho.helenus.pekko.*
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.Source
case class Person(id: Int, name: String, city: String) derives RowMapper
val peopleQuery = "SELECT * FROM pekko_people WHERE id = ?".toCQLAsync
  .prepare[Int].as[Person]
val helenusSource: Source[Person, NotUsed] = peopleQuery.asReadSource(1)
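Conceptually, adapting a Source[Row] to a Source[A] is a map over the stream's elements. The sketch below illustrates that idea with plain Pekko Streams only. `FakeRow` and `personMapper` are hypothetical stand-ins introduced for illustration, not Helenus or driver types, and no Cassandra connection is involved.

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration.*

// Hypothetical stand-in for a driver Row: a map from column name to value
type FakeRow = Map[String, Any]

final case class Person(id: Int, name: String, city: String)

// Plays the role a RowMapper[Person] would play: turn one row into one value
val personMapper: FakeRow => Person = row =>
  Person(
    row("id").asInstanceOf[Int],
    row("name").asInstanceOf[String],
    row("city").asInstanceOf[String]
  )

given system: ActorSystem = ActorSystem("row-mapper-sketch")

val rows: Source[FakeRow, NotUsed] = Source(List(
  Map("id" -> 1, "name" -> "John", "city" -> "Rome")
))

// The adaptation itself: map every row through the mapper
val people: Source[Person, NotUsed] = rows.map(personMapper)

val first = Await.result(people.runWith(Sink.head), 5.seconds)
system.terminate()
```

With Helenus, the mapper is resolved from the RowMapper in scope, so this step does not have to be written by hand.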
The integration is intended to be used as follows:
- Transform your query with .toCQLAsync.
- Prepare the query with .prepare, providing the query parameter types.
- Define a query result, which uses an implicit RowMapper. If this isn't provided, Row will be used as the query result type.
- Transform your query into a source with asReadSource. This method follows the same principle as ScalaPreparedStatement. Use this method as a factory for Sources.
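Because asReadSource acts as a factory, one prepared query can produce a fresh Source per parameter value. The following is a minimal sketch of that idea, assuming a reachable Cassandra cluster with the pekko_people table and rows for the listed ids (the id values are illustrative). It only uses the Helenus calls shown in this section plus the standard Pekko Streams flatMapConcat operator.

```scala
import net.nmoncho.helenus.*
import net.nmoncho.helenus.api.RowMapper
import net.nmoncho.helenus.pekko.*
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.cassandra.CassandraSessionSettings
import org.apache.pekko.stream.connectors.cassandra.scaladsl.{CassandraSession, CassandraSessionRegistry}
import org.apache.pekko.stream.scaladsl.Source

given system: ActorSystem = ActorSystem()
given CassandraSession = CassandraSessionRegistry.get(system).sessionFor(CassandraSessionSettings())
import system.dispatcher

case class Person(id: Int, name: String, city: String) derives RowMapper

val peopleQuery = "SELECT * FROM pekko_people WHERE id = ?".toCQLAsync
  .prepare[Int].as[Person]

// Illustrative ids, assumed to exist in the table
val ids: Source[Int, NotUsed] = Source(List(1, 4, 5))

// Each id yields its own Source[Person, NotUsed]; flatMapConcat runs them
// one after another and concatenates their results in order
val people: Source[Person, NotUsed] =
  ids.flatMapConcat(id => peopleQuery.asReadSource(id))
```

Nothing is executed until the resulting source is materialized, for example with runWith.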
This last method isn't the only integration point with Pekko. CassandraSource
can also be used with Helenus:
import _root_.org.apache.pekko.stream.connectors.cassandra.scaladsl.CassandraSource
val pekkoSource: Source[Person, NotUsed] =
  CassandraSource.fromFuture(peopleQuery.map(query => query(1))).map(summon[RowMapper[Person]].apply)
We arrive at the same Source[Person, NotUsed]. Executing these queries is
performed as usual:
import org.apache.pekko.stream.scaladsl.Sink
import scala.concurrent.Await
import scala.concurrent.duration.*
Await.result(helenusSource.runWith(Sink.head), 5.seconds)
// res4: Person = Person(id = 1, name = "John", city = "Rome")
Await.result(pekkoSource.runWith(Sink.head), 5.seconds)
// res5: Person = Person(id = 1, name = "John", city = "Rome")
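Sink.head completes with the first element only. For queries that may return several rows, Sink.seq collects every element instead. The sketch below shows the difference with a stand-in in-memory source rather than a Helenus-backed one, so it runs without Cassandra. The sample rows are illustrative.

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.*

final case class Person(id: Int, name: String, city: String)

given system: ActorSystem = ActorSystem("run-sketch")

// Stand-in for a Source[Person, NotUsed] obtained from a query
val source: Source[Person, NotUsed] = Source(List(
  Person(1, "John", "Rome"),
  Person(4, "Jane", "Amsterdam")
))

// Completes with the first element and cancels the stream
val first: Future[Person] = source.runWith(Sink.head)

// Completes with all elements once the stream finishes
val all: Future[Seq[Person]] = source.runWith(Sink.seq)

val firstResult = Await.result(first, 5.seconds)
val allResult   = Await.result(all, 5.seconds)
system.terminate()
```

Await is used here only to keep the example short. In application code the returned Future would normally be composed rather than blocked on.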
We can use the same facilities when inserting into, or updating, Cassandra.
Unlike Pekko, Helenus offers both Sink and Flow for this purpose.
import org.apache.pekko.Done
import org.apache.pekko.stream.connectors.cassandra.CassandraWriteSettings
import scala.concurrent.Future
given Adapter[Person, (Int, String, String)] = Adapter[Person]
val sink: Sink[Person, Future[Done]] =
  "INSERT INTO pekko_people(id, name, city) VALUES (?, ?, ?)".toCQLAsync
    .prepare[Int, String, String]
    .from[Person]
    .asWriteSink(CassandraWriteSettings.defaults)
val people: Source[Person, NotUsed] = Source(List(
  Person(4, "Jane", "Amsterdam"),
  Person(5, "Lisa", "Paris"),
  Person(6, "Maria", "Madrid")
))
val insert = people.runWith(sink)
We can then query these data as shown above:
Await.result(
  peopleQuery.asReadSource(5).runWith(Sink.head),
  5.seconds
)
// res7: Person = Person(id = 5, name = "Lisa", city = "Paris")
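The section mentions a Flow alongside the Sink but only demonstrates the Sink. The following is a hedged sketch of the Flow variant. It assumes a method named asWriteFlow exists as the counterpart of asWriteSink, takes the same CassandraWriteSettings, and emits each element downstream after it has been written. That name and behavior are assumptions to check against the Helenus API documentation. The sample rows are illustrative, and a reachable Cassandra cluster with the pekko_people table is assumed.

```scala
import net.nmoncho.helenus.*
import net.nmoncho.helenus.api.RowMapper
import net.nmoncho.helenus.api.cql.Adapter
import net.nmoncho.helenus.pekko.*
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.cassandra.{CassandraSessionSettings, CassandraWriteSettings}
import org.apache.pekko.stream.connectors.cassandra.scaladsl.{CassandraSession, CassandraSessionRegistry}
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

given system: ActorSystem = ActorSystem()
given CassandraSession = CassandraSessionRegistry.get(system).sessionFor(CassandraSessionSettings())
import system.dispatcher

case class Person(id: Int, name: String, city: String) derives RowMapper
given Adapter[Person, (Int, String, String)] = Adapter[Person]

// ASSUMPTION: asWriteFlow mirrors asWriteSink and passes written elements on
val writeFlow =
  "INSERT INTO pekko_people(id, name, city) VALUES (?, ?, ?)".toCQLAsync
    .prepare[Int, String, String]
    .from[Person]
    .asWriteFlow(CassandraWriteSettings.defaults)

// Unlike a Sink, a Flow lets the stream continue after the write,
// here by collecting the elements that passed through it
val written: Future[Seq[Person]] =
  Source(List(Person(7, "Anna", "Lisbon"), Person(8, "Marco", "Milan")))
    .via(writeFlow)
    .runWith(Sink.seq)
```

A Flow is the better fit when something must happen after each write, such as acknowledging a message or chaining a further stage. A Sink is enough when the write is the last step.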