diff --git a/core/src/main/scala/akka/persistence/cassandra/Extractors.scala b/core/src/main/scala/akka/persistence/cassandra/Extractors.scala
index a2ae39edf..721640c11 100644
--- a/core/src/main/scala/akka/persistence/cassandra/Extractors.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/Extractors.scala
@@ -7,17 +7,21 @@ package akka.persistence.cassandra
 import com.datastax.oss.driver.api.core.cql.Row
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
+
 import akka.persistence.PersistentRepr
 import akka.persistence.cassandra.journal._
 import akka.persistence.cassandra.journal.CassandraJournal._
 import akka.annotation.InternalApi
 import java.{ util => ju }
+
 import akka.util.OptionVal
 import akka.serialization.Serialization
 import scala.collection.JavaConverters._
 import java.nio.ByteBuffer
+
 import com.datastax.oss.protocol.internal.util.Bytes
 import akka.actor.ActorSystem
+import akka.persistence.query.TimeBasedUUID
 
 /**
  * An Extractor reads a row from the messages table. There are different extractors
@@ -49,10 +53,6 @@ import akka.actor.ActorSystem
 
   final case class SeqNrValue(sequenceNr: Long)
 
-  final case class PersistentReprValue(persistentRepr: PersistentRepr) {
-    def sequenceNr: Long = persistentRepr.sequenceNr
-  }
-
   private[akka] def deserializeRawEvent(
       system: ActorSystem,
       bucketSize: BucketSize,
@@ -124,10 +124,17 @@ import akka.actor.ActorSystem
     }
   }
 
-  def persistentRepr(e: EventDeserializer, s: Serialization): Extractor[PersistentReprValue] =
-    new Extractor[PersistentReprValue] {
-      override def extract(row: Row, async: Boolean)(implicit ec: ExecutionContext): Future[PersistentReprValue] =
-        extractPersistentRepr(row, e, s, async).map(PersistentReprValue.apply)
+  def persistentRepr(e: EventDeserializer, s: Serialization): Extractor[PersistentRepr] =
+    new Extractor[PersistentRepr] {
+      override def extract(row: Row, async: Boolean)(implicit ec: ExecutionContext): Future[PersistentRepr] =
+        extractPersistentRepr(row, e, s, async)
+    }
+
+  def persistentReprAndOffset(e: EventDeserializer, s: Serialization): Extractor[(PersistentRepr, TimeBasedUUID)] =
+    new Extractor[(PersistentRepr, TimeBasedUUID)] {
+      override def extract(row: Row, async: Boolean)(
+          implicit ec: ExecutionContext): Future[(PersistentRepr, TimeBasedUUID)] =
+        extractPersistentRepr(row, e, s, async).map(repr => repr -> TimeBasedUUID(row.getUuid("timestamp")))
     }
 
   def taggedPersistentRepr(ed: EventDeserializer, s: Serialization): Extractor[TaggedPersistentRepr] =
diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala
index 32eb46189..75f079127 100644
--- a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala
@@ -751,7 +751,7 @@ import akka.stream.scaladsl.Source
         settings.journalSettings.readProfile,
         "asyncReplayMessages",
         extractor = Extractors.persistentRepr(eventDeserializer, serialization))
-      .map(p => queries.mapEvent(p.persistentRepr))
+      .map(queries.mapEvent)
       .runForeach(replayCallback)
       .map(_ => ())
   }
diff --git a/core/src/main/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournal.scala b/core/src/main/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournal.scala
index e5cd11975..8b9019883 100644
--- a/core/src/main/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournal.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournal.scala
@@ -112,6 +112,8 @@ class CassandraReadJournal(scaladslReadJournal: akka.persistence.cassandra.query
    *
    * The offset of each event is provided in the streamed envelopes returned,
    * which makes it possible to resume the stream at a later point from a given offset.
+   * The `offset` parameter is exclusive, i.e. the event corresponding to the given `offset` parameter is not
+   * included in the stream. The `Offset` type is `akka.persistence.query.TimeBasedUUID`.
    *
    * For querying events that happened after a long unix timestamp you can use [[timeBasedUUIDFrom]]
    * to create the offset to use with this method.
@@ -160,19 +162,19 @@ class CassandraReadJournal(scaladslReadJournal: akka.persistence.cassandra.query
   /**
    * `eventsByPersistenceId` is used to retrieve a stream of events for a particular persistenceId.
    *
-   * In addition to the `offset` the `EventEnvelope` also provides `persistenceId` and `sequenceNr`
+   * The `EventEnvelope` contains the event and provides `persistenceId` and `sequenceNr`
    * for each event. The `sequenceNr` is the sequence number for the persistent actor with the
    * `persistenceId` that persisted the event. The `persistenceId` + `sequenceNr` is a unique
    * identifier for the event.
    *
-   * `sequenceNr` and `offset` are always the same for an event and they define ordering for events
-   * emitted by this query. Causality is guaranteed (`sequenceNr`s of events for a particular
-   * `persistenceId` are always ordered in a sequence monotonically increasing by one). Multiple
-   * executions of the same bounded stream are guaranteed to emit exactly the same stream of events.
-   *
    * `fromSequenceNr` and `toSequenceNr` can be specified to limit the set of returned events.
    * The `fromSequenceNr` and `toSequenceNr` are inclusive.
    *
+   * The `EventEnvelope` also provides an `offset`, which is the same kind of offset as is used in the
+   * `eventsByTag` query. The `Offset` type is `akka.persistence.query.TimeBasedUUID`.
+   *
+   * The returned event stream is ordered by `sequenceNr`.
+   *
    * Deleted events are also deleted from the event stream.
    *
    * The stream is not completed when it reaches the end of the currently stored events,
@@ -210,12 +212,6 @@ class CassandraReadJournal(scaladslReadJournal: akka.persistence.cassandra.query
    * but it continues to push new `persistenceId`s when new events are persisted.
    * Corresponding query that is completed when it reaches the end of the currently
    * known `persistenceId`s is provided by `currentPersistenceIds`.
-   *
-   * Note the query is inefficient, especially for large numbers of `persistenceId`s, because
-   * of limitation of current internal implementation providing no information supporting
-   * ordering/offset queries. The query uses Cassandra's `select distinct` capabilities.
-   * More importantly the live query has to repeatedly execute the query each `refresh-interval`,
-   * because order is not defined and new `persistenceId`s may appear anywhere in the query results.
    */
   override def persistenceIds(): Source[String, NotUsed] =
     scaladslReadJournal.persistenceIds().asJava
diff --git a/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala b/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala
index 6aa02a624..b58bea737 100644
--- a/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala
@@ -264,6 +264,8 @@ class CassandraReadJournal protected (
    *
    * The offset of each event is provided in the streamed envelopes returned,
    * which makes it possible to resume the stream at a later point from a given offset.
+   * The `offset` parameter is exclusive, i.e. the event corresponding to the given `offset` parameter is not
+   * included in the stream. The `Offset` type is `akka.persistence.query.TimeBasedUUID`.
    *
    * For querying events that happened after a long unix timestamp you can use [[timeBasedUUIDFrom]]
    * to create the offset to use with this method.
@@ -513,19 +515,19 @@ class CassandraReadJournal protected (
   /**
    * `eventsByPersistenceId` is used to retrieve a stream of events for a particular persistenceId.
    *
-   * In addition to the `offset` the `EventEnvelope` also provides `persistenceId` and `sequenceNr`
+   * The `EventEnvelope` contains the event and provides `persistenceId` and `sequenceNr`
    * for each event. The `sequenceNr` is the sequence number for the persistent actor with the
    * `persistenceId` that persisted the event. The `persistenceId` + `sequenceNr` is a unique
    * identifier for the event.
    *
-   * `sequenceNr` and `offset` are always the same for an event and they define ordering for events
-   * emitted by this query. Causality is guaranteed (`sequenceNr`s of events for a particular
-   * `persistenceId` are always ordered in a sequence monotonically increasing by one). Multiple
-   * executions of the same bounded stream are guaranteed to emit exactly the same stream of events.
-   *
    * `fromSequenceNr` and `toSequenceNr` can be specified to limit the set of returned events.
    * The `fromSequenceNr` and `toSequenceNr` are inclusive.
    *
+   * The `EventEnvelope` also provides an `offset`, which is the same kind of offset as is used in the
+   * `eventsByTag` query. The `Offset` type is `akka.persistence.query.TimeBasedUUID`.
+   *
+   * The returned event stream is ordered by `sequenceNr`.
+   *
    * Deleted events are also deleted from the event stream.
    *
    * The stream is not completed when it reaches the end of the currently stored events,
@@ -545,10 +547,11 @@ class CassandraReadJournal protected (
         Some(querySettings.refreshInterval),
         querySettings.readProfile,
         s"eventsByPersistenceId-$persistenceId",
-        extractor = Extractors.persistentRepr(eventsByPersistenceIdDeserializer, serialization))
+        extractor = Extractors.persistentReprAndOffset(eventsByPersistenceIdDeserializer, serialization))
+      .mapConcat {
+        case (persistentRepr, offset) => toEventEnvelope(mapEvent(persistentRepr), offset)
+      }
       .mapMaterializedValue(_ => NotUsed)
-      .map(p => mapEvent(p.persistentRepr))
-      .mapConcat(r => toEventEnvelopes(r, r.sequenceNr))
 
   /**
    * Same type of query as `eventsByPersistenceId` but the event stream
@@ -567,10 +570,11 @@ class CassandraReadJournal protected (
         None,
         querySettings.readProfile,
         s"currentEventsByPersistenceId-$persistenceId",
-        extractor = Extractors.persistentRepr(eventsByPersistenceIdDeserializer, serialization))
+        extractor = Extractors.persistentReprAndOffset(eventsByPersistenceIdDeserializer, serialization))
+      .mapConcat {
+        case (persistentRepr, offset) => toEventEnvelope(mapEvent(persistentRepr), offset)
+      }
       .mapMaterializedValue(_ => NotUsed)
-      .map(p => mapEvent(p.persistentRepr))
-      .mapConcat(r => toEventEnvelopes(r, r.sequenceNr))
 
   /**
    * INTERNAL API
@@ -589,8 +593,10 @@ class CassandraReadJournal protected (
       refreshInterval.orElse(Some(querySettings.refreshInterval)),
       settings.journalSettings.readProfile, // write journal read-profile
       s"eventsByPersistenceId-$persistenceId",
-      extractor = Extractors.persistentRepr(eventsByPersistenceIdDeserializer, serialization),
-      fastForwardEnabled = true).map(p => mapEvent(p.persistentRepr)).mapConcat(r => toEventEnvelopes(r, r.sequenceNr))
+      extractor = Extractors.persistentReprAndOffset(eventsByPersistenceIdDeserializer, serialization),
+      fastForwardEnabled = true).mapConcat {
+        case (persistentRepr, offset) => toEventEnvelope(mapEvent(persistentRepr), offset)
+      }
 
   /**
    * INTERNAL API: This is a low-level method that returns journal events as they are persisted.
@@ -644,11 +650,6 @@ class CassandraReadJournal protected (
   @InternalApi private[akka] def mapEvent(persistentRepr: PersistentRepr): PersistentRepr =
     persistentRepr
 
-  private def toEventEnvelopes(persistentRepr: PersistentRepr, offset: Long): immutable.Iterable[EventEnvelope] =
-    adaptFromJournal(persistentRepr).map { payload =>
-      EventEnvelope(Offset.sequence(offset), persistentRepr.persistenceId, persistentRepr.sequenceNr, payload, 0L)
-    }
-
   private def toEventEnvelope(persistentRepr: PersistentRepr, offset: Offset): immutable.Iterable[EventEnvelope] =
     adaptFromJournal(persistentRepr).map { payload =>
       EventEnvelope(offset, persistentRepr.persistenceId, persistentRepr.sequenceNr, payload, timestampFrom(offset))
@@ -682,12 +683,6 @@ class CassandraReadJournal protected (
    * but it continues to push new `persistenceId`s when new events are persisted.
    * Corresponding query that is completed when it reaches the end of the currently
    * known `persistenceId`s is provided by `currentPersistenceIds`.
-   *
-   * Note the query is inefficient, especially for large numbers of `persistenceId`s, because
-   * of limitation of current internal implementation providing no information supporting
-   * ordering/offset queries. The query uses Cassandra's `select distinct` capabilities.
-   * More importantly the live query has to repeatedly execute the query each `refresh-interval`,
-   * because order is not defined and new `persistenceId`s may appear anywhere in the query results.
    */
   override def persistenceIds(): Source[String, NotUsed] =
     persistenceIds(Some(querySettings.refreshInterval), "allPersistenceIds")
diff --git a/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdSpec.scala b/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdSpec.scala
index b89658acc..f300e7f9d 100644
--- a/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdSpec.scala
@@ -8,15 +8,17 @@ import java.util.UUID
 
 import akka.actor.ActorRef
 import akka.persistence.cassandra.{ CassandraLifecycle, CassandraSpec }
-import akka.persistence.query.Offset
 import akka.persistence.{ DeleteMessagesSuccess, PersistentRepr }
 import akka.stream.KillSwitches
 import akka.stream.scaladsl.Keep
 import akka.stream.testkit.scaladsl.TestSink
 import com.typesafe.config.ConfigFactory
-
 import scala.concurrent.duration._
+import akka.persistence.query.TimeBasedUUID
+import akka.stream.scaladsl.Sink
+import akka.util.UUIDComparator
+
 
 object EventsByPersistenceIdSpec {
   val config = ConfigFactory.parseString(s"""
     akka.persistence.cassandra.journal.target-partition-size = 15
@@ -111,18 +113,30 @@ class EventsByPersistenceIdSpec extends CassandraSpec(EventsByPersistenceIdSpec.
       src.runWith(TestSink.probe[Any]).request(2).expectComplete()
     }
 
-    "produce correct sequence of sequence numbers and offsets" in {
+    "produce correct sequence of sequence numbers" in {
       setup("h", 3)
 
       val src = queries.currentEventsByPersistenceId("h", 0L, Long.MaxValue)
       src
-        .map(x => (x.persistenceId, x.sequenceNr, x.offset))
+        .map(x => (x.persistenceId, x.sequenceNr))
         .runWith(TestSink.probe[Any])
         .request(4)
-        .expectNext(("h", 1, Offset.sequence(1)), ("h", 2, Offset.sequence(2)), ("h", 3, Offset.sequence(3)))
+        .expectNext(("h", 1), ("h", 2), ("h", 3))
         .expectComplete()
     }
 
+    "produce correct offsets" in {
+      setup("offset", 3)
+
+      val src = queries.currentEventsByPersistenceId("offset", 0L, Long.MaxValue)
+      val offsets =
+        src.map(x => x.offset).runWith(Sink.seq).futureValue
+
+      val timeUuids = offsets.map(_.asInstanceOf[TimeBasedUUID].value)
+      UUIDComparator.comparator.compare(timeUuids.head, timeUuids(1)) should be < 0
+      UUIDComparator.comparator.compare(timeUuids(1), timeUuids(2)) should be < 0
+    }
+
     "produce correct sequence of events across multiple partitions" in {
       setup("i", 20)
 
@@ -146,13 +160,18 @@ class EventsByPersistenceIdSpec extends CassandraSpec(EventsByPersistenceIdSpec.
       val (killKill, probe) = queries
         .eventsByPersistenceId("jo", 0L, Long.MaxValue)
         .viaMat(KillSwitches.single)(Keep.right)
-        .map(x => (x.persistenceId, x.sequenceNr, x.offset))
+        .map(x => (x.event, x.sequenceNr, x.offset))
        .toMat(TestSink.probe[Any])(Keep.both)
        .run()
 
       probe.request(5)
-      probe.expectNext(("jo", 1, Offset.sequence(1)), ("jo", 2, Offset.sequence(2)))
+      val (event1, seqNr1, TimeBasedUUID(uuid1)) = probe.requestNext()
+      event1 shouldBe "jo-1"
+      seqNr1 shouldBe 1
+      val (event2, seqNr2, TimeBasedUUID(uuid2)) = probe.requestNext()
+      event2 shouldBe "jo-2"
+      seqNr2 shouldBe 2
 
       system.log.debug("Saw evt 1 and 2")
 
       // 4 arrived out of order
@@ -169,7 +188,22 @@ class EventsByPersistenceIdSpec extends CassandraSpec(EventsByPersistenceIdSpec.
       writeTestEvent(PersistentRepr("jo-3", 3L, "jo"))
       system.log.debug("Wrote evt 3")
 
-      probe.expectNext(("jo", 3, Offset.sequence(3)), ("jo", 4, Offset.sequence(4)), ("jo", 5, Offset.sequence(5)))
+      val (event3, seqNr3, TimeBasedUUID(uuid3)) = probe.requestNext()
+      event3 shouldBe "jo-3"
+      seqNr3 shouldBe 3
+
+      val (event4, seqNr4, TimeBasedUUID(uuid4)) = probe.requestNext()
+      event4 shouldBe "jo-4"
+      seqNr4 shouldBe 4
+
+      val (event5, seqNr5, TimeBasedUUID(uuid5)) = probe.requestNext()
+      event5 shouldBe "jo-5"
+      seqNr5 shouldBe 5
+
+      UUIDComparator.comparator.compare(uuid1, uuid2) should be < 0
+      UUIDComparator.comparator.compare(uuid2, uuid4) should be < 0
+      UUIDComparator.comparator.compare(uuid4, uuid5) should be < 0
+      UUIDComparator.comparator.compare(uuid5, uuid3) should be < 0
 
       killKill.shutdown()
       probe.expectComplete()
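
Taken together, these changes replace the old `Offset.sequence(sequenceNr)` envelopes of `eventsByPersistenceId` and `currentEventsByPersistenceId` with time-based UUID offsets read from the `timestamp` column, the same offset type `eventsByTag` already uses. A minimal consumer sketch of the new behaviour, assuming a configured Cassandra journal; the object name, actor-system name, and persistence id "example-pid" are hypothetical, everything else is existing Akka / akka-persistence-cassandra API:

import akka.actor.ActorSystem
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.{ PersistenceQuery, TimeBasedUUID }
import akka.stream.scaladsl.Sink
import akka.util.UUIDComparator

// Sketch: read the current events for one persistence id and inspect the
// TimeBasedUUID offsets that eventsByPersistenceId now emits.
object OffsetConsumerSketch extends App {
  implicit val system: ActorSystem = ActorSystem("offset-consumer")
  import system.dispatcher

  val queries = PersistenceQuery(system)
    .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

  queries
    .currentEventsByPersistenceId("example-pid", 0L, Long.MaxValue) // hypothetical id
    .runWith(Sink.seq)
    .foreach { envelopes =>
      // Each envelope carries a TimeBasedUUID offset, usable with eventsByTag.
      val uuids = envelopes.map(_.offset).collect { case TimeBasedUUID(uuid) => uuid }
      // Time-based UUIDs are compared with UUIDComparator, not numerically.
      uuids.sliding(2).foreach {
        case Seq(earlier, later) =>
          println(s"compare = ${UUIDComparator.comparator.compare(earlier, later)}")
        case _ => // zero or one event, nothing to compare
      }
      system.terminate()
    }
}

Note that the stream itself stays ordered by `sequenceNr` while the offsets are ordered by write time, so adjacent offsets are only guaranteed to increase when the events were also written in sequence order; the out-of-order "jo" test above relies on exactly that distinction.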