From 5be6ee0160cdb9210d629a7eb7f41ed2d5fd387e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 10 Sep 2020 10:37:35 +0200 Subject: [PATCH] Replicated event sourcing support (#811) * Add support for meta data for Replicated Event Sourcing * Remove the old commercial metadata support for events --- .../cassandra/EventWithMetaData.scala | 28 ------- .../persistence/cassandra/Extractors.scala | 23 ++++-- .../cassandra/SnapshotWithMetaData.scala | 28 ------- .../cassandra/journal/CassandraJournal.scala | 47 ++++++----- .../akka/persistence/cassandra/package.scala | 20 ++--- .../query/scaladsl/CassandraReadJournal.scala | 30 ++++--- .../snapshot/CassandraSnapshotStore.scala | 82 +++++++++---------- .../cassandra/EventsByTagMigrationSpec.scala | 8 +- .../journal/CassandraJournalSpec.scala | 8 ++ .../journal/CassandraSerializationSpec.scala | 37 +-------- .../cassandra/query/EventsByTagSpec.scala | 25 +----- .../cassandra/query/TestActor.scala | 5 -- .../snapshot/CassandraSnapshotStoreSpec.scala | 8 +- project/Dependencies.scala | 2 +- 14 files changed, 123 insertions(+), 228 deletions(-) delete mode 100644 core/src/main/scala/akka/persistence/cassandra/EventWithMetaData.scala delete mode 100644 core/src/main/scala/akka/persistence/cassandra/SnapshotWithMetaData.scala diff --git a/core/src/main/scala/akka/persistence/cassandra/EventWithMetaData.scala b/core/src/main/scala/akka/persistence/cassandra/EventWithMetaData.scala deleted file mode 100644 index ced2a1fef..000000000 --- a/core/src/main/scala/akka/persistence/cassandra/EventWithMetaData.scala +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (C) 2016-2020 Lightbend Inc. - */ - -package akka.persistence.cassandra - -object EventWithMetaData { - - /** - * If meta data could not be deserialized it will not fail the replay/query. - * The "invalid" meta data is represented with this `UnknownMetaData` and - * it and the event will be wrapped in `EventWithMetaData`. - * - * The reason for not failing the replay/query is that meta data should be - * optional, e.g. the tool that wrote the meta data has been removed. This - * is typically because the serializer for the meta data has been removed - * from the class path (or configuration). - */ - final case class UnknownMetaData(serializerId: Int, manifest: String) -} - -/** - * If the event is wrapped in this class the `metaData` will - * be serialized and stored in the `meta` column. This can be used by event - * adapters or other tools to store additional meta data without altering - * the actual domain event. - */ -final case class EventWithMetaData(event: Any, metaData: Any) diff --git a/core/src/main/scala/akka/persistence/cassandra/Extractors.scala b/core/src/main/scala/akka/persistence/cassandra/Extractors.scala index 721640c11..a8272954a 100644 --- a/core/src/main/scala/akka/persistence/cassandra/Extractors.scala +++ b/core/src/main/scala/akka/persistence/cassandra/Extractors.scala @@ -174,15 +174,20 @@ import akka.persistence.query.TimeBasedUUID implicit ec: ExecutionContext): Future[PersistentRepr] = { def deserializeEvent(): Future[PersistentRepr] = { - ed.deserializeEvent(row, async).map { payload => - PersistentRepr( - payload, - sequenceNr = row.getLong("sequence_nr"), - persistenceId = row.getString("persistence_id"), - manifest = row.getString("event_manifest"), // manifest for event adapters - deleted = false, - sender = null, - writerUuid = row.getString("writer_uuid")) + ed.deserializeEvent(row, async).map { + case DeserializedEvent(payload, metadata) => + val repr = PersistentRepr( + payload, + sequenceNr = row.getLong("sequence_nr"), + persistenceId = row.getString("persistence_id"), + manifest = row.getString("event_manifest"), // manifest for event adapters + deleted = false, + sender = null, + writerUuid = row.getString("writer_uuid")) + metadata match { + case OptionVal.None => repr + case OptionVal.Some(m) => repr.withMetadata(m) + } } } diff --git a/core/src/main/scala/akka/persistence/cassandra/SnapshotWithMetaData.scala b/core/src/main/scala/akka/persistence/cassandra/SnapshotWithMetaData.scala deleted file mode 100644 index 0be1b7c1c..000000000 --- a/core/src/main/scala/akka/persistence/cassandra/SnapshotWithMetaData.scala +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (C) 2016-2020 Lightbend Inc. - */ - -package akka.persistence.cassandra - -object SnapshotWithMetaData { - - /** - * If meta data could not be deserialized it will not fail the query. - * The "invalid" meta data is represented with this `UnknownMetaData` and - * it and the event will be wrapped in `SnapshotWithMetaData`. - * - * The reason for not failing the query is that meta data should be - * optional, e.g. the tool that wrote the meta data has been removed. This - * is typically because the serializer for the meta data has been removed - * from the class path (or configuration). - */ - final case class UnknownMetaData(serializerId: Int, manifest: String) -} - -/** - * If the event is wrapped in this class the `metaData` will - * be serialized and stored in the `meta` column. This can be used by event - * adapters or other tools to store additional meta data without altering - * the actual domain event. - */ -final case class SnapshotWithMetaData(event: Any, metaData: Any) diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala index ae205413f..12e0a405b 100644 --- a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala +++ b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala @@ -15,7 +15,6 @@ import akka.annotation.InternalApi import akka.event.{ Logging, LoggingAdapter } import akka.pattern.pipe import akka.persistence._ -import akka.persistence.cassandra.EventWithMetaData.UnknownMetaData import akka.persistence.cassandra._ import akka.persistence.cassandra.Extractors import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal @@ -26,6 +25,7 @@ import akka.persistence.cassandra.journal.TagWriter.TagProgress import akka.serialization.{ AsyncSerializer, Serialization, SerializationExtension } import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessionRegistry } import akka.stream.scaladsl.Sink +import akka.dispatch.ExecutionContexts import akka.util.OptionVal import com.datastax.oss.driver.api.core.cql._ import com.typesafe.config.Config @@ -847,12 +847,16 @@ import akka.stream.scaladsl.Source case object HealthCheckQuery extends HealthCheck case object HealthCheckResponse extends HealthCheck + final case class DeserializedEvent(event: Any, meta: OptionVal[Any]) + class EventDeserializer(system: ActorSystem) { + private val log = Logging(system, this.getClass) + private val serialization = SerializationExtension(system) val columnDefinitionCache = new ColumnDefinitionCache - def deserializeEvent(row: Row, async: Boolean)(implicit ec: ExecutionContext): Future[Any] = + def deserializeEvent(row: Row, async: Boolean)(implicit ec: ExecutionContext): Future[DeserializedEvent] = try { def meta: OptionVal[AnyRef] = { @@ -864,14 +868,19 @@ import akka.stream.scaladsl.Source // has meta data, wrap in EventWithMetaData val metaSerId = row.getInt("meta_ser_id") val metaSerManifest = row.getString("meta_ser_manifest") - val meta = serialization.deserialize(Bytes.getArray(metaBytes), metaSerId, metaSerManifest) match { - case Success(m) => m - case Failure(_) => - // don't fail replay/query because of deserialization problem with meta data - // see motivation in UnknownMetaData - UnknownMetaData(metaSerId, metaSerManifest) + serialization.deserialize(Bytes.getArray(metaBytes), metaSerId, metaSerManifest) match { + case Success(m) => OptionVal.Some(m) + case Failure(ex) => + log.warning( + "Deserialization of event metadata failed (pid: [{}], seq_nr: [{}], meta_ser_id: [{}], meta_ser_manifest: [{}], ignoring metadata content. Exception: {}", + Array( + row.getString("persistence_id"), + row.getLong("sequence_nr"), + metaSerId, + metaSerManifest, + ex.toString)) + OptionVal.None } - OptionVal.Some(meta) } } else { // for backwards compatibility, when table was not altered, meta columns not added @@ -883,30 +892,20 @@ import akka.stream.scaladsl.Source val serId = row.getInt("ser_id") val manifest = row.getString("ser_manifest") - serialization.serializerByIdentity.get(serId) match { + (serialization.serializerByIdentity.get(serId) match { case Some(asyncSerializer: AsyncSerializer) => Serialization.withTransportInformation(system.asInstanceOf[ExtendedActorSystem]) { () => - asyncSerializer.fromBinaryAsync(bytes, manifest).map { event => - meta match { - case OptionVal.None => event - case OptionVal.Some(m) => EventWithMetaData(event, m) - } - } + asyncSerializer.fromBinaryAsync(bytes, manifest) } case _ => - def deserializedEvent: AnyRef = { + def deserializedEvent: AnyRef = // Serialization.deserialize adds transport info - val event = serialization.deserialize(bytes, serId, manifest).get - meta match { - case OptionVal.None => event - case OptionVal.Some(m) => EventWithMetaData(event, m) - } - } + serialization.deserialize(bytes, serId, manifest).get if (async) Future(deserializedEvent) else Future.successful(deserializedEvent) - } + }).map(event => DeserializedEvent(event, meta))(ExecutionContexts.parasitic) } catch { case NonFatal(e) => Future.failed(e) diff --git a/core/src/main/scala/akka/persistence/cassandra/package.scala b/core/src/main/scala/akka/persistence/cassandra/package.scala index 089fd7159..d27320964 100644 --- a/core/src/main/scala/akka/persistence/cassandra/package.scala +++ b/core/src/main/scala/akka/persistence/cassandra/package.scala @@ -62,21 +62,15 @@ package object cassandra { def serializeMeta(): Option[SerializedMeta] = // meta data, if any - p.payload match { - case EventWithMetaData(_, m) => - val m2 = m.asInstanceOf[AnyRef] - val serializer = serialization.findSerializerFor(m2) - val serManifest = Serializers.manifestFor(serializer, m2) - val metaBuf = ByteBuffer.wrap(serialization.serialize(m2).get) - Some(SerializedMeta(metaBuf, serManifest, serializer.identifier)) - case _ => None + p.metadata.map { m => + val m2 = m.asInstanceOf[AnyRef] + val serializer = serialization.findSerializerFor(m2) + val serManifest = Serializers.manifestFor(serializer, m2) + val metaBuf = ByteBuffer.wrap(serialization.serialize(m2).get) + SerializedMeta(metaBuf, serManifest, serializer.identifier) } - val event: AnyRef = (p.payload match { - case EventWithMetaData(evt, _) => evt // unwrap - case evt => evt - }).asInstanceOf[AnyRef] - + val event: AnyRef = p.payload.asInstanceOf[AnyRef] val serializer = serialization.findSerializerFor(event) val serManifest = Serializers.manifestFor(serializer, event) diff --git a/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala b/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala index afe510186..51093a9ed 100644 --- a/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala +++ b/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala @@ -27,17 +27,20 @@ import akka.stream.ActorAttributes import akka.util.ByteString import com.datastax.oss.driver.api.core.cql._ import com.typesafe.config.Config + import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.{ Failure, Success } import scala.util.control.NonFatal - import akka.persistence.cassandra.PluginSettings import akka.persistence.cassandra.CassandraStatements +import akka.persistence.cassandra.journal.CassandraJournal +import akka.persistence.cassandra.journal.CassandraJournal.DeserializedEvent import akka.serialization.SerializationExtension import akka.stream.alpakka.cassandra.CassandraSessionSettings import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessionRegistry } +import akka.util.OptionVal import com.datastax.oss.driver.api.core.CqlSession import com.datastax.oss.driver.api.core.uuid.Uuids @@ -345,16 +348,21 @@ class CassandraReadJournal protected ( Flow[EventsByTagStage.UUIDRow] .mapAsync(querySettings.deserializationParallelism) { uuidRow => val row = uuidRow.row - eventsByTagDeserializer.deserializeEvent(row, deserializeEventAsync).map { payload => - val repr = mapEvent(PersistentRepr( - payload, - sequenceNr = uuidRow.sequenceNr, - persistenceId = uuidRow.persistenceId, - manifest = row.getString("event_manifest"), - deleted = false, - sender = null, - writerUuid = row.getString("writer_uuid"))) - UUIDPersistentRepr(uuidRow.offset, uuidRow.tagPidSequenceNr, repr) + eventsByTagDeserializer.deserializeEvent(row, deserializeEventAsync).map { + case DeserializedEvent(payload, metadata) => + val repr = mapEvent(PersistentRepr( + payload, + sequenceNr = uuidRow.sequenceNr, + persistenceId = uuidRow.persistenceId, + manifest = row.getString("event_manifest"), + deleted = false, + sender = null, + writerUuid = row.getString("writer_uuid"))) + val reprWithMeta = metadata match { + case OptionVal.None => repr + case OptionVal.Some(metadata) => repr.withMetadata(metadata) + } + UUIDPersistentRepr(uuidRow.offset, uuidRow.tagPidSequenceNr, reprWithMeta) } } .withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher)) diff --git a/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala b/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala index 19ee70b7a..3bd5b45e5 100644 --- a/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala +++ b/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala @@ -16,7 +16,6 @@ import scala.concurrent.Future import scala.util.Failure import scala.util.Success import scala.util.control.NonFatal - import akka.Done import akka.actor._ import akka.annotation.InternalApi @@ -37,6 +36,8 @@ import com.datastax.oss.protocol.internal.util.Bytes import com.typesafe.config.Config import akka.Done import akka.annotation.InternalApi +import akka.dispatch.ExecutionContexts +import akka.event.Logging import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessionRegistry } /** @@ -125,7 +126,10 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi case md +: mds => load1Async(md) .map { - case Snapshot(s) => Some(SelectedSnapshot(md, s)) + case DeserializedSnapshot(payload, OptionVal.Some(snapshotMeta)) => + Some(SelectedSnapshot(md.withMetadata(snapshotMeta), payload)) + case DeserializedSnapshot(payload, OptionVal.None) => + Some(SelectedSnapshot(md, payload)) } .recoverWith { case _: NoSuchElementException if metadata.size == 1 => @@ -154,7 +158,7 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi } } - private def load1Async(metadata: SnapshotMetadata): Future[Snapshot] = { + private def load1Async(metadata: SnapshotMetadata): Future[DeserializedSnapshot] = { val boundSelectSnapshot = preparedSelectSnapshot.map( _.bind(metadata.persistenceId, metadata.sequenceNr: JLong).setExecutionProfileName(snapshotSettings.readProfile)) boundSelectSnapshot.flatMap(session.selectOne).flatMap { @@ -167,16 +171,17 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi case Some(row) => row.getByteBuffer("snapshot") match { case null => - snapshotDeserializer.deserializeSnapshot(row).map(Snapshot.apply) + snapshotDeserializer.deserializeSnapshot(row) case bytes => // for backwards compatibility - Future.successful(serialization.deserialize(Bytes.getArray(bytes), classOf[Snapshot]).get) + val payload = serialization.deserialize(Bytes.getArray(bytes), classOf[Snapshot]).get.data + Future.successful(DeserializedSnapshot(payload, OptionVal.None)) } } } override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = - serialize(snapshot).flatMap { ser => + serialize(snapshot, metadata.metadata).flatMap { ser => // using two separate statements with or without the meta data columns because // then users doesn't have to alter table and add the new columns if they don't use // the meta data feature @@ -262,24 +267,18 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi session.underlying().flatMap(_.executeAsync(batch.build()).toScala).map(_ => ()) } - private def serialize(payload: Any): Future[Serialized] = + private def serialize(payload: Any, meta: Option[Any]): Future[Serialized] = try { def serializeMeta(): Option[SerializedMeta] = - // meta data, if any - payload match { - case SnapshotWithMetaData(_, m) => - val m2 = m.asInstanceOf[AnyRef] - val serializer = serialization.findSerializerFor(m2) - val serManifest = Serializers.manifestFor(serializer, m2) - val metaBuf = ByteBuffer.wrap(serialization.serialize(m2).get) - Some(SerializedMeta(metaBuf, serManifest, serializer.identifier)) - case _ => None + meta.map { m => + val m2 = m.asInstanceOf[AnyRef] + val serializer = serialization.findSerializerFor(m2) + val serManifest = Serializers.manifestFor(serializer, m2) + val metaBuf = ByteBuffer.wrap(serialization.serialize(m2).get) + SerializedMeta(metaBuf, serManifest, serializer.identifier) } - val p: AnyRef = (payload match { - case SnapshotWithMetaData(snap, _) => snap // unwrap - case snap => snap - }).asInstanceOf[AnyRef] + val p: AnyRef = payload.asInstanceOf[AnyRef] val serializer = serialization.findSerializerFor(p) val serManifest = Serializers.manifestFor(serializer, p) serializer match { @@ -350,8 +349,12 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi private case class SerializedMeta(serialized: ByteBuffer, serManifest: String, serId: Int) + final case class DeserializedSnapshot(payload: Any, meta: OptionVal[Any]) + class SnapshotDeserializer(system: ActorSystem) { + private val log = Logging(system, this.getClass) + private val serialization = SerializationExtension(system) // cache to avoid repeated check via ColumnDefinitions @@ -365,7 +368,7 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi b } - def deserializeSnapshot(row: Row)(implicit ec: ExecutionContext): Future[Any] = + def deserializeSnapshot(row: Row)(implicit ec: ExecutionContext): Future[DeserializedSnapshot] = try { def meta: OptionVal[AnyRef] = @@ -377,14 +380,19 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi // has meta data, wrap in EventWithMetaData val metaSerId = row.getInt("meta_ser_id") val metaSerManifest = row.getString("meta_ser_manifest") - val meta = serialization.deserialize(Bytes.getArray(metaBytes), metaSerId, metaSerManifest) match { - case Success(m) => m - case Failure(_) => - // don't fail query because of deserialization problem with meta data - // see motivation in UnknownMetaData - SnapshotWithMetaData.UnknownMetaData(metaSerId, metaSerManifest) + serialization.deserialize(Bytes.getArray(metaBytes), metaSerId, metaSerManifest) match { + case Success(m) => OptionVal.Some(m) + case Failure(ex) => + log.warning( + "Deserialization of snapshot metadata failed (pid: [{}], seq_nr: [{}], meta_ser_id: [{}], meta_ser_manifest: [{}], ignoring metadata content. Exception: {}", + Array( + row.getString("persistence_id"), + row.getLong("sequence_nr"), + metaSerId, + metaSerManifest, + ex.toString)) + OptionVal.None } - OptionVal.Some(meta) } } else { // for backwards compatibility, when table was not altered, meta columns not added @@ -394,28 +402,18 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi val bytes = Bytes.getArray(row.getByteBuffer("snapshot_data")) val serId = row.getInt("ser_id") val manifest = row.getString("ser_manifest") - serialization.serializerByIdentity.get(serId) match { + (serialization.serializerByIdentity.get(serId) match { case Some(asyncSerializer: AsyncSerializer) => Serialization.withTransportInformation(system.asInstanceOf[ExtendedActorSystem]) { () => - asyncSerializer.fromBinaryAsync(bytes, manifest).map { payload => - meta match { - case OptionVal.None => payload - case OptionVal.Some(m) => SnapshotWithMetaData(payload, m) - } - } + asyncSerializer.fromBinaryAsync(bytes, manifest) } case _ => Future.successful { // Serialization.deserialize adds transport info - val payload = - serialization.deserialize(bytes, serId, manifest).get - meta match { - case OptionVal.None => payload - case OptionVal.Some(m) => SnapshotWithMetaData(payload, m) - } + serialization.deserialize(bytes, serId, manifest).get } - } + }).map(payload => DeserializedSnapshot(payload, meta))(ExecutionContexts.parasitic) } catch { case NonFatal(e) => Future.failed(e) diff --git a/core/src/test/scala/akka/persistence/cassandra/EventsByTagMigrationSpec.scala b/core/src/test/scala/akka/persistence/cassandra/EventsByTagMigrationSpec.scala index 19afecd4f..17f4adb09 100644 --- a/core/src/test/scala/akka/persistence/cassandra/EventsByTagMigrationSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/EventsByTagMigrationSpec.scala @@ -125,7 +125,7 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec { writeOldTestEventWithTags( PersistentRepr("g-1", 1L, pidWithMeta), Set("blue"), - Some("This is the best event ever")) + Some("Meta: This is the best event ever")) // These events have been snapshotted writeOldTestEventWithTags(PersistentRepr("h-1", 10L, pidWithSnapshot), Set("red")) @@ -153,7 +153,7 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec { writeOldTestEventWithTags( PersistentRepr("g-1", 1L, pidWithMeta), Set("blue"), - Some("This is the best event ever")) + Some("Meta: This is the best event ever")) // since we are writing the events directly the all_persistence_ids table must also be updated reconciler.rebuildAllPersistenceIds().futureValue @@ -185,7 +185,7 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec { blueProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 4, "e-4") => } blueProbe.expectNextPF { case EventEnvelope(_, `pidTwo`, 2, "f-2") => } blueProbe.expectNextPF { - case EventEnvelope(_, `pidWithMeta`, 1, EventWithMetaData("g-1", "This is the best event ever")) => + case EventEnvelope(_, `pidWithMeta`, 1, "g-1") => } blueProbe.expectNoMessage(waitTime) blueProbe.cancel() @@ -281,7 +281,7 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec { blueProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 4, "e-4") => } blueProbe.expectNextPF { case EventEnvelope(_, `pidTwo`, 2, "f-2") => } blueProbe.expectNextPF { - case EventEnvelope(_, `pidWithMeta`, 1, EventWithMetaData("g-1", "This is the best event ever")) => + case EventEnvelope(_, `pidWithMeta`, 1, "g-1") => } blueProbe.expectNextPF { case EventEnvelope(_, `pidTwo`, 4, "f-4") => } blueProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 5, "new-event-1") => } diff --git a/core/src/test/scala/akka/persistence/cassandra/journal/CassandraJournalSpec.scala b/core/src/test/scala/akka/persistence/cassandra/journal/CassandraJournalSpec.scala index b0bdda191..ee32c8c0c 100644 --- a/core/src/test/scala/akka/persistence/cassandra/journal/CassandraJournalSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/journal/CassandraJournalSpec.scala @@ -5,6 +5,7 @@ package akka.persistence.cassandra.journal import akka.actor.Actor +import akka.persistence.CapabilityFlag import akka.persistence.{ AtomicWrite, PersistentRepr } import akka.persistence.JournalProtocol.{ ReplayMessages, WriteMessageFailure, WriteMessages, WriteMessagesFailed } @@ -76,6 +77,13 @@ class CassandraJournalSpec extends JournalSpec(CassandraJournalConfiguration.con } } +class CassandraJournalMetaSpec extends JournalSpec(CassandraJournalConfiguration.config) with CassandraLifecycle { + override def systemName: String = "CassandraJournalSpec" + + override def supportsRejectingNonSerializableObjects = false + protected override def supportsMetadata: CapabilityFlag = true +} + class CassandraJournalPerfSpec extends JournalPerfSpec(CassandraJournalConfiguration.perfConfig) with CassandraLifecycle { diff --git a/core/src/test/scala/akka/persistence/cassandra/journal/CassandraSerializationSpec.scala b/core/src/test/scala/akka/persistence/cassandra/journal/CassandraSerializationSpec.scala index 961b2de32..3fcff6d22 100644 --- a/core/src/test/scala/akka/persistence/cassandra/journal/CassandraSerializationSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/journal/CassandraSerializationSpec.scala @@ -6,8 +6,7 @@ package akka.persistence.cassandra.journal import akka.actor.{ ExtendedActorSystem, Props } import akka.persistence.RecoveryCompleted -import akka.persistence.cassandra.EventWithMetaData.UnknownMetaData -import akka.persistence.cassandra.{ CassandraLifecycle, CassandraSpec, EventWithMetaData, Persister } +import akka.persistence.cassandra.{ CassandraLifecycle, CassandraSpec, Persister } import akka.serialization.BaseSerializer import akka.testkit.TestProbe import com.typesafe.config.ConfigFactory @@ -64,40 +63,6 @@ class CassandraSerializationSpec extends CassandraSpec(CassandraSerializationSpe incarnation2 } - "be able to store meta data" in { - val probe = TestProbe() - val incarnation1 = system.actorOf(Props(new Persister("id2", probe.ref))) - probe.expectMsgType[RecoveryCompleted] - - val eventWithMeta = EventWithMetaData("TheActualEvent", "TheAdditionalMetaData") - incarnation1 ! eventWithMeta - probe.expectMsg(eventWithMeta) - - probe.watch(incarnation1) - system.stop(incarnation1) - probe.expectTerminated(incarnation1) - - system.actorOf(Props(new Persister("id2", probe.ref))) - probe.expectMsg(eventWithMeta) // from replay - } - - "not fail replay due to deserialization problem of meta data" in { - val probe = TestProbe() - val incarnation1 = system.actorOf(Props(new Persister("id3", probe.ref))) - probe.expectMsgType[RecoveryCompleted] - - val eventWithMeta = EventWithMetaData("TheActualEvent", CrapEvent(13)) - incarnation1 ! eventWithMeta - probe.expectMsg(eventWithMeta) - - probe.watch(incarnation1) - system.stop(incarnation1) - probe.expectTerminated(incarnation1) - - system.actorOf(Props(new Persister("id3", probe.ref))) - probe.expectMsg(EventWithMetaData("TheActualEvent", UnknownMetaData(666, ""))) // from replay, no meta - } - } } diff --git a/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala b/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala index 09555dcb2..35630a0f0 100644 --- a/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala @@ -12,7 +12,7 @@ import java.util.UUID import akka.actor.{ PoisonPill, Props } import akka.event.Logging.Warning import akka.persistence.cassandra.journal.CassandraJournalStatements -import akka.persistence.cassandra.{ CassandraLifecycle, CassandraSpec, Day, EventWithMetaData } +import akka.persistence.cassandra.{ CassandraLifecycle, CassandraSpec, Day } import akka.persistence.journal.{ Tagged, WriteEventAdapter } import akka.persistence.query.scaladsl.{ CurrentEventsByTagQuery, EventsByTagQuery } import akka.persistence.query.{ EventEnvelope, NoOffset, Offset, TimeBasedUUID } @@ -51,12 +51,10 @@ object EventsByTagSpec { event-adapters { color-tagger = akka.persistence.cassandra.query.ColorFruitTagger - metadata-tagger = akka.persistence.cassandra.query.EventWithMetaDataTagger } event-adapter-bindings = { "java.lang.String" = color-tagger - "akka.persistence.cassandra.EventWithMetaData" = metadata-tagger } } @@ -116,15 +114,6 @@ object EventsByTagSpec { """).withFallback(config) } -class EventWithMetaDataTagger extends WriteEventAdapter { - override def manifest(event: Any) = "" - override def toJournal(event: Any) = event match { - case evm: EventWithMetaData => - Tagged(evm, Set("gotmeta")) - case _ => event - } -} - class ColorFruitTagger extends WriteEventAdapter { val colors = Set("green", "black", "blue", "yellow", "orange") val fruits = Set("apple", "banana") @@ -536,18 +525,6 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) { }) } - "return events with their metadata" in { - val w1 = system.actorOf(TestActor.props("W1")) - withProbe(queries.eventsByTag(tag = "gotmeta", offset = NoOffset).runWith(TestSink.probe[Any]), probe => { - w1 ! EventWithMetaData("e1", "this is such a good message") - probe.request(2) - probe.expectNextPF { - case EventEnvelope(_, "W1", 1L, EventWithMetaData("e1", "this is such a good message")) => - } - probe.expectNoMessage(waitTime) - }) - } - "not complete for empty query" in { val probe = queries.eventsByTag(tag = "empty", offset = NoOffset).runWith(TestSink.probe[Any]) probe.request(2) diff --git a/core/src/test/scala/akka/persistence/cassandra/query/TestActor.scala b/core/src/test/scala/akka/persistence/cassandra/query/TestActor.scala index 6d954db89..7aaecf5f3 100644 --- a/core/src/test/scala/akka/persistence/cassandra/query/TestActor.scala +++ b/core/src/test/scala/akka/persistence/cassandra/query/TestActor.scala @@ -9,7 +9,6 @@ import akka.actor.Props import akka.persistence.PersistentActor import akka.actor.ActorRef import akka.persistence.DeleteMessagesSuccess -import akka.persistence.cassandra.EventWithMetaData import akka.persistence.journal.Tagged object TestActor { @@ -33,10 +32,6 @@ class TestActor(override val persistenceId: String, override val journalPluginId persist(cmd) { evt => sender() ! evt + "-done" } - case cmd: EventWithMetaData => - persist(cmd) { evt => - sender() ! s"$evt-done" - } case cmd: Tagged => persist(cmd) { evt => val msg = s"${evt.payload}-done" diff --git a/core/src/test/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStoreSpec.scala b/core/src/test/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStoreSpec.scala index e6e7ec9ea..bbbba594b 100644 --- a/core/src/test/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStoreSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStoreSpec.scala @@ -11,13 +11,13 @@ import java.nio.ByteBuffer import akka.persistence.SnapshotProtocol._ import akka.persistence._ import akka.persistence.cassandra.CassandraLifecycle -import akka.persistence.cassandra.SnapshotWithMetaData import akka.persistence.snapshot.SnapshotStoreSpec import akka.stream.alpakka.cassandra.CassandraMetricsRegistry import akka.testkit.TestProbe import com.datastax.oss.driver.api.core.cql.SimpleStatement import com.typesafe.config.ConfigFactory +import scala.annotation.meta import scala.collection.immutable.Seq object CassandraSnapshotStoreConfiguration { @@ -37,6 +37,8 @@ class CassandraSnapshotStoreSpec extends SnapshotStoreSpec(CassandraSnapshotStoreConfiguration.config) with CassandraLifecycle { + protected override def supportsMetadata: CapabilityFlag = true + val snapshotSettings = new SnapshotSettings(system, system.settings.config.getConfig("akka.persistence.cassandra")) @@ -149,14 +151,14 @@ class CassandraSnapshotStoreSpec // Somewhat confusing that two things are called meta data, SnapshotMetadata and SnapshotWithMetaData. // However, user facing is only SnapshotWithMetaData, and we can't change SnapshotMetadata because that // is in akka-persistence - snapshotStore.tell(SaveSnapshot(SnapshotMetadata(pid, 100), SnapshotWithMetaData("snap", "meta")), probe.ref) + snapshotStore.tell(SaveSnapshot(SnapshotMetadata(pid, 100).withMetadata("meta"), "snap"), probe.ref) probe.expectMsgType[SaveSnapshotSuccess] // load most recent snapshot snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria.Latest, Long.MaxValue), probe.ref) // get most recent snapshot val loaded = probe.expectMsgPF() { case LoadSnapshotResult(Some(snapshot), _) => snapshot } - loaded.snapshot should equal(SnapshotWithMetaData("snap", "meta")) + loaded.metadata.metadata should equal(Some("meta")) } "delete all snapshots matching upper sequence number and no timestamp bounds" in { diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 92e807c40..9dbd44eaf 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -6,7 +6,7 @@ object Dependencies { val Scala213 = "2.13.1" val ScalaVersions = Seq(Scala212, Scala213) - val AkkaVersion = System.getProperty("override.akka.version", "2.6.4") + val AkkaVersion = System.getProperty("override.akka.version", "2.6.9") val AkkaVersionInDocs = AkkaVersion.take(3) val CassandraVersionInDocs = "4.0" // Should be sync with the version of the driver in Alpakka Cassandra