Merge pull request #774 from akka/wip-eventsByPersistenceId-offset-patriknw

Change offset type in eventsByPersistenceId, #773
patriknw authored Apr 20, 2020
2 parents bc58942 + 32ea05b commit 23844fa
Showing 5 changed files with 86 additions and 54 deletions.
core/src/main/scala/akka/persistence/cassandra/Extractors.scala (15 additions & 8 deletions)
@@ -7,17 +7,21 @@ package akka.persistence.cassandra
import com.datastax.oss.driver.api.core.cql.Row
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import akka.persistence.PersistentRepr
import akka.persistence.cassandra.journal._
import akka.persistence.cassandra.journal.CassandraJournal._
import akka.annotation.InternalApi
import java.{ util => ju }

import akka.util.OptionVal
import akka.serialization.Serialization
import scala.collection.JavaConverters._
import java.nio.ByteBuffer

import com.datastax.oss.protocol.internal.util.Bytes
import akka.actor.ActorSystem
import akka.persistence.query.TimeBasedUUID

/**
* An Extractor reads a row from the messages table. There are different extractors
@@ -49,10 +53,6 @@ import akka.actor.ActorSystem

final case class SeqNrValue(sequenceNr: Long)

- final case class PersistentReprValue(persistentRepr: PersistentRepr) {
- def sequenceNr: Long = persistentRepr.sequenceNr
- }

private[akka] def deserializeRawEvent(
system: ActorSystem,
bucketSize: BucketSize,
@@ -124,10 +124,17 @@ import akka.actor.ActorSystem
}
}

- def persistentRepr(e: EventDeserializer, s: Serialization): Extractor[PersistentReprValue] =
- new Extractor[PersistentReprValue] {
- override def extract(row: Row, async: Boolean)(implicit ec: ExecutionContext): Future[PersistentReprValue] =
- extractPersistentRepr(row, e, s, async).map(PersistentReprValue.apply)
+ def persistentRepr(e: EventDeserializer, s: Serialization): Extractor[PersistentRepr] =
+ new Extractor[PersistentRepr] {
+ override def extract(row: Row, async: Boolean)(implicit ec: ExecutionContext): Future[PersistentRepr] =
+ extractPersistentRepr(row, e, s, async)
}

+ def persistentReprAndOffset(e: EventDeserializer, s: Serialization): Extractor[(PersistentRepr, TimeBasedUUID)] =
+ new Extractor[(PersistentRepr, TimeBasedUUID)] {
+ override def extract(row: Row, async: Boolean)(
+ implicit ec: ExecutionContext): Future[(PersistentRepr, TimeBasedUUID)] =
+ extractPersistentRepr(row, e, s, async).map(repr => repr -> TimeBasedUUID(row.getUuid("timestamp")))
+ }
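The `timestamp` column read by `persistentReprAndOffset` is a version 1 (time-based) UUID, so the `TimeBasedUUID` offset it emits also encodes a wall-clock time. A minimal sketch of recovering that time, assuming the DataStax driver's `Uuids` utility is on the classpath:

import java.util.UUID

import akka.persistence.query.TimeBasedUUID
import com.datastax.oss.driver.api.core.uuid.Uuids

object OffsetTimestamp {
  // A version 1 UUID embeds its creation instant; Uuids.unixTimestamp extracts it.
  def millisOf(offset: TimeBasedUUID): Long =
    Uuids.unixTimestamp(offset.value)

  def main(args: Array[String]): Unit = {
    val uuid: UUID = Uuids.timeBased() // fresh time-based UUID for "now"
    println(millisOf(TimeBasedUUID(uuid))) // approximately System.currentTimeMillis()
  }
}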

def taggedPersistentRepr(ed: EventDeserializer, s: Serialization): Extractor[TaggedPersistentRepr] =
core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala
@@ -751,7 +751,7 @@ import akka.stream.scaladsl.Source
settings.journalSettings.readProfile,
"asyncReplayMessages",
extractor = Extractors.persistentRepr(eventDeserializer, serialization))
- .map(p => queries.mapEvent(p.persistentRepr))
+ .map(queries.mapEvent)
.runForeach(replayCallback)
.map(_ => ())
}
core/src/main/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournal.scala
@@ -112,6 +112,8 @@ class CassandraReadJournal(scaladslReadJournal: akka.persistence.cassandra.query
*
* The offset of each event is provided in the streamed envelopes returned,
* which makes it possible to resume the stream at a later point from a given offset.
+ The `offset` parameter is exclusive, i.e. the event corresponding to the given `offset` parameter is not
+ included in the stream. The `Offset` type is `akka.persistence.query.TimeBasedUUID`.
*
* For querying events that happened after a long unix timestamp you can use [[timeBasedUUIDFrom]]
* to create the offset to use with this method.
@@ -160,19 +162,19 @@ class CassandraReadJournal(scaladslReadJournal: akka.persistence.cassandra.query
/**
* `eventsByPersistenceId` is used to retrieve a stream of events for a particular persistenceId.
*
- In addition to the `offset` the `EventEnvelope` also provides `persistenceId` and `sequenceNr`
+ The `EventEnvelope` contains the event and provides `persistenceId` and `sequenceNr`
* for each event. The `sequenceNr` is the sequence number for the persistent actor with the
`persistenceId` that persisted the event. The `persistenceId` + `sequenceNr` is a unique
* identifier for the event.
*
- `sequenceNr` and `offset` are always the same for an event and they define ordering for events
- emitted by this query. Causality is guaranteed (`sequenceNr`s of events for a particular
- `persistenceId` are always ordered in a sequence monotonically increasing by one). Multiple
- executions of the same bounded stream are guaranteed to emit exactly the same stream of events.
*
* `fromSequenceNr` and `toSequenceNr` can be specified to limit the set of returned events.
* The `fromSequenceNr` and `toSequenceNr` are inclusive.
*
+ The `EventEnvelope` also provides an `offset`, which is the same kind of offset as is used in the
+ `eventsByTag` query. The `Offset` type is `akka.persistence.query.TimeBasedUUID`.
+
+ The returned event stream is ordered by `sequenceNr`.
*
Deleted events are also omitted from the event stream.
*
* The stream is not completed when it reaches the end of the currently stored events,
@@ -210,12 +212,6 @@
* but it continues to push new `persistenceId`s when new events are persisted.
* Corresponding query that is completed when it reaches the end of the currently
* known `persistenceId`s is provided by `currentPersistenceIds`.
- *
- Note the query is inefficient, especially for large numbers of `persistenceId`s, because
- of limitation of current internal implementation providing no information supporting
- ordering/offset queries. The query uses Cassandra's `select distinct` capabilities.
- More importantly the live query has to repeatedly execute the query each `refresh-interval`,
- because order is not defined and new `persistenceId`s may appear anywhere in the query results.
*/
override def persistenceIds(): Source[String, NotUsed] =
scaladslReadJournal.persistenceIds().asJava
core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala
@@ -264,6 +264,8 @@ class CassandraReadJournal protected (
*
* The offset of each event is provided in the streamed envelopes returned,
* which makes it possible to resume the stream at a later point from a given offset.
+ The `offset` parameter is exclusive, i.e. the event corresponding to the given `offset` parameter is not
+ included in the stream. The `Offset` type is `akka.persistence.query.TimeBasedUUID`.
*
* For querying events that happened after a long unix timestamp you can use [[timeBasedUUIDFrom]]
* to create the offset to use with this method.
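A usage sketch of resuming from a timestamp, assuming `queries` is a `CassandraReadJournal` obtained from `PersistenceQuery`, an implicit `Materializer` is in scope, and the tag name is hypothetical:

import akka.persistence.query.Offset

// timeBasedUUIDFrom builds a UUID whose time component is the given unix
// timestamp; wrapped in Offset.timeBasedUUID it serves as the exclusive start.
val from = queries.timeBasedUUIDFrom(System.currentTimeMillis() - 3600 * 1000)
queries
  .eventsByTag("my-tag", Offset.timeBasedUUID(from))
  .runForeach(env => println(s"${env.offset}: ${env.event}"))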
@@ -513,19 +515,19 @@ class CassandraReadJournal protected (
/**
* `eventsByPersistenceId` is used to retrieve a stream of events for a particular persistenceId.
*
- In addition to the `offset` the `EventEnvelope` also provides `persistenceId` and `sequenceNr`
+ The `EventEnvelope` contains the event and provides `persistenceId` and `sequenceNr`
* for each event. The `sequenceNr` is the sequence number for the persistent actor with the
`persistenceId` that persisted the event. The `persistenceId` + `sequenceNr` is a unique
* identifier for the event.
*
- `sequenceNr` and `offset` are always the same for an event and they define ordering for events
- emitted by this query. Causality is guaranteed (`sequenceNr`s of events for a particular
- `persistenceId` are always ordered in a sequence monotonically increasing by one). Multiple
- executions of the same bounded stream are guaranteed to emit exactly the same stream of events.
*
* `fromSequenceNr` and `toSequenceNr` can be specified to limit the set of returned events.
* The `fromSequenceNr` and `toSequenceNr` are inclusive.
*
+ The `EventEnvelope` also provides an `offset`, which is the same kind of offset as is used in the
+ `eventsByTag` query. The `Offset` type is `akka.persistence.query.TimeBasedUUID`.
+
+ The returned event stream is ordered by `sequenceNr`.
*
Deleted events are also omitted from the event stream.
*
* The stream is not completed when it reaches the end of the currently stored events,
@@ -545,10 +547,11 @@
Some(querySettings.refreshInterval),
querySettings.readProfile,
s"eventsByPersistenceId-$persistenceId",
- extractor = Extractors.persistentRepr(eventsByPersistenceIdDeserializer, serialization))
+ extractor = Extractors.persistentReprAndOffset(eventsByPersistenceIdDeserializer, serialization))
+ .mapConcat {
+ case (persistentRepr, offset) => toEventEnvelope(mapEvent(persistentRepr), offset)
+ }
.mapMaterializedValue(_ => NotUsed)
- .map(p => mapEvent(p.persistentRepr))
- .mapConcat(r => toEventEnvelopes(r, r.sequenceNr))
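With this change the envelopes emitted by `eventsByPersistenceId` carry a `TimeBasedUUID` offset instead of `Offset.sequence(sequenceNr)`. A minimal consumer sketch, assuming a running `ActorSystem` named `system` and a hypothetical persistence id:

import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.{ PersistenceQuery, TimeBasedUUID }

val queries = PersistenceQuery(system)
  .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

queries
  .eventsByPersistenceId("account-1", 0L, Long.MaxValue)
  .runForeach { env =>
    env.offset match {
      // the offset is now a time-based UUID rather than the sequence number
      case TimeBasedUUID(uuid) =>
        println(s"seqNr=${env.sequenceNr} offset=$uuid event=${env.event}")
      case other =>
        println(s"unexpected offset type: $other")
    }
  }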

/**
* Same type of query as `eventsByPersistenceId` but the event stream
@@ -567,10 +570,11 @@
None,
querySettings.readProfile,
s"currentEventsByPersistenceId-$persistenceId",
- extractor = Extractors.persistentRepr(eventsByPersistenceIdDeserializer, serialization))
+ extractor = Extractors.persistentReprAndOffset(eventsByPersistenceIdDeserializer, serialization))
+ .mapConcat {
+ case (persistentRepr, offset) => toEventEnvelope(mapEvent(persistentRepr), offset)
+ }
.mapMaterializedValue(_ => NotUsed)
- .map(p => mapEvent(p.persistentRepr))
- .mapConcat(r => toEventEnvelopes(r, r.sequenceNr))

/**
* INTERNAL API
@@ -589,8 +593,10 @@
refreshInterval.orElse(Some(querySettings.refreshInterval)),
settings.journalSettings.readProfile, // write journal read-profile
s"eventsByPersistenceId-$persistenceId",
- extractor = Extractors.persistentRepr(eventsByPersistenceIdDeserializer, serialization),
- fastForwardEnabled = true).map(p => mapEvent(p.persistentRepr)).mapConcat(r => toEventEnvelopes(r, r.sequenceNr))
+ extractor = Extractors.persistentReprAndOffset(eventsByPersistenceIdDeserializer, serialization),
+ fastForwardEnabled = true).mapConcat {
+ case (persistentRepr, offset) => toEventEnvelope(mapEvent(persistentRepr), offset)
+ }
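The `mapConcat` is used because `adaptFromJournal` may fan one stored representation out into zero or more application events, each becoming its own `EventEnvelope` with the same offset. A hedged sketch of an event adapter with that behaviour (the comma-splitting rule is purely illustrative):

import akka.persistence.journal.{ EventAdapter, EventSeq }

class SplittingEventAdapter extends EventAdapter {
  override def manifest(event: Any): String = ""

  // Written as-is on the write side.
  override def toJournal(event: Any): Any = event

  // On the read side one stored string can become several events,
  // so a single row may yield several envelopes.
  override def fromJournal(event: Any, manifest: String): EventSeq =
    event match {
      case s: String => EventSeq(s.split(",").toIndexedSeq: _*)
      case other     => EventSeq.single(other)
    }
}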

/**
* INTERNAL API: This is a low-level method that returns journal events as they are persisted.
@@ -644,11 +650,6 @@
@InternalApi private[akka] def mapEvent(persistentRepr: PersistentRepr): PersistentRepr =
persistentRepr

- private def toEventEnvelopes(persistentRepr: PersistentRepr, offset: Long): immutable.Iterable[EventEnvelope] =
- adaptFromJournal(persistentRepr).map { payload =>
- EventEnvelope(Offset.sequence(offset), persistentRepr.persistenceId, persistentRepr.sequenceNr, payload, 0L)
- }

private def toEventEnvelope(persistentRepr: PersistentRepr, offset: Offset): immutable.Iterable[EventEnvelope] =
adaptFromJournal(persistentRepr).map { payload =>
EventEnvelope(offset, persistentRepr.persistenceId, persistentRepr.sequenceNr, payload, timestampFrom(offset))
@@ -682,12 +683,6 @@
* but it continues to push new `persistenceId`s when new events are persisted.
* Corresponding query that is completed when it reaches the end of the currently
* known `persistenceId`s is provided by `currentPersistenceIds`.
- *
- Note the query is inefficient, especially for large numbers of `persistenceId`s, because
- of limitation of current internal implementation providing no information supporting
- ordering/offset queries. The query uses Cassandra's `select distinct` capabilities.
- More importantly the live query has to repeatedly execute the query each `refresh-interval`,
- because order is not defined and new `persistenceId`s may appear anywhere in the query results.
*/
override def persistenceIds(): Source[String, NotUsed] =
persistenceIds(Some(querySettings.refreshInterval), "allPersistenceIds")
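A usage sketch of the two variants, assuming `queries` and an implicit `Materializer` as in the sketch above:

import scala.concurrent.Future
import akka.stream.scaladsl.Sink

// Live: keeps emitting ids as new persistent actors write their first event.
queries.persistenceIds().runForeach(println)

// Finite: completes once the currently known ids have been emitted.
val known: Future[Seq[String]] = queries.currentPersistenceIds().runWith(Sink.seq)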
core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdSpec.scala
@@ -8,15 +8,17 @@ import java.util.UUID

import akka.actor.ActorRef
import akka.persistence.cassandra.{ CassandraLifecycle, CassandraSpec }
- import akka.persistence.query.Offset
import akka.persistence.{ DeleteMessagesSuccess, PersistentRepr }
import akka.stream.KillSwitches
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.TestSink
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._

+ import akka.persistence.query.TimeBasedUUID
+ import akka.stream.scaladsl.Sink
+ import akka.util.UUIDComparator

object EventsByPersistenceIdSpec {
val config = ConfigFactory.parseString(s"""
akka.persistence.cassandra.journal.target-partition-size = 15
@@ -111,18 +113,30 @@ class EventsByPersistenceIdSpec extends CassandraSpec(EventsByPersistenceIdSpec.
src.runWith(TestSink.probe[Any]).request(2).expectComplete()
}

"produce correct sequence of sequence numbers and offsets" in {
"produce correct sequence of sequence numbers" in {
setup("h", 3)

val src = queries.currentEventsByPersistenceId("h", 0L, Long.MaxValue)
src
- .map(x => (x.persistenceId, x.sequenceNr, x.offset))
+ .map(x => (x.persistenceId, x.sequenceNr))
.runWith(TestSink.probe[Any])
.request(4)
.expectNext(("h", 1, Offset.sequence(1)), ("h", 2, Offset.sequence(2)), ("h", 3, Offset.sequence(3)))
.expectNext(("h", 1), ("h", 2), ("h", 3))
.expectComplete()
}

"produce correct offsets" in {
setup("offset", 3)

val src = queries.currentEventsByPersistenceId("offset", 0L, Long.MaxValue)
val offsets =
src.map(x => x.offset).runWith(Sink.seq).futureValue

val timeUuids = offsets.map(_.asInstanceOf[TimeBasedUUID].value)
UUIDComparator.comparator.compare(timeUuids.head, timeUuids(1)) should be < 0
UUIDComparator.comparator.compare(timeUuids(1), timeUuids(2)) should be < 0
}
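The comparisons go through `UUIDComparator` because `java.util.UUID.compareTo` does not order version 1 UUIDs by their embedded timestamp. A sketch of sorting offsets into the journal's timeline order:

import java.util.UUID
import akka.util.UUIDComparator

def sortOffsets(uuids: Seq[UUID]): Seq[UUID] =
  uuids.sortWith(UUIDComparator.comparator.compare(_, _) < 0)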

"produce correct sequence of events across multiple partitions" in {
setup("i", 20)

@@ -146,13 +160,18 @@
val (killKill, probe) = queries
.eventsByPersistenceId("jo", 0L, Long.MaxValue)
.viaMat(KillSwitches.single)(Keep.right)
- .map(x => (x.persistenceId, x.sequenceNr, x.offset))
+ .map(x => (x.event, x.sequenceNr, x.offset))
.toMat(TestSink.probe[Any])(Keep.both)
.run()

probe.request(5)

probe.expectNext(("jo", 1, Offset.sequence(1)), ("jo", 2, Offset.sequence(2)))
val (event1, seqNr1, TimeBasedUUID(uuid1)) = probe.requestNext()
event1 shouldBe "jo-1"
seqNr1 shouldBe 1
val (event2, seqNr2, TimeBasedUUID(uuid2)) = probe.requestNext()
event2 shouldBe "jo-2"
seqNr2 shouldBe 2
system.log.debug("Saw evt 1 and 2")

// 4 arrived out of order
@@ -169,7 +188,22 @@
writeTestEvent(PersistentRepr("jo-3", 3L, "jo"))
system.log.debug("Wrote evt 3")

probe.expectNext(("jo", 3, Offset.sequence(3)), ("jo", 4, Offset.sequence(4)), ("jo", 5, Offset.sequence(5)))
val (event3, seqNr3, TimeBasedUUID(uuid3)) = probe.requestNext()
event3 shouldBe "jo-3"
seqNr3 shouldBe 3

val (event4, seqNr4, TimeBasedUUID(uuid4)) = probe.requestNext()
event4 shouldBe "jo-4"
seqNr4 shouldBe 4

val (event5, seqNr5, TimeBasedUUID(uuid5)) = probe.requestNext()
event5 shouldBe "jo-5"
seqNr5 shouldBe 5

UUIDComparator.comparator.compare(uuid1, uuid2) should be < 0
UUIDComparator.comparator.compare(uuid2, uuid4) should be < 0
UUIDComparator.comparator.compare(uuid4, uuid5) should be < 0
UUIDComparator.comparator.compare(uuid5, uuid3) should be < 0

killKill.shutdown()
probe.expectComplete()
