From 64ff2677dd74e178d6fb534221e7cf053fbc11b8 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Fri, 11 Sep 2020 16:17:01 +0100 Subject: [PATCH] Set metadata on events by persisence id query (#814) --- .../query/scaladsl/CassandraReadJournal.scala | 8 +++++++- .../cassandra/query/DirectWriting.scala | 18 ++++++++++++++++-- .../query/EventsByPersistenceIdSpec.scala | 8 ++++++++ 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala b/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala index 51093a9ed..937b655c5 100644 --- a/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala +++ b/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala @@ -663,7 +663,13 @@ class CassandraReadJournal protected ( private def toEventEnvelope(persistentRepr: PersistentRepr, offset: Offset): immutable.Iterable[EventEnvelope] = adaptFromJournal(persistentRepr).map { payload => - EventEnvelope(offset, persistentRepr.persistenceId, persistentRepr.sequenceNr, payload, timestampFrom(offset)) + EventEnvelope( + offset, + persistentRepr.persistenceId, + persistentRepr.sequenceNr, + payload, + timestampFrom(offset), + persistentRepr.metadata) } private def offsetToInternalOffset(offset: Offset): (UUID, Boolean) = diff --git a/core/src/test/scala/akka/persistence/cassandra/query/DirectWriting.scala b/core/src/test/scala/akka/persistence/cassandra/query/DirectWriting.scala index 141c6f245..08596d096 100644 --- a/core/src/test/scala/akka/persistence/cassandra/query/DirectWriting.scala +++ b/core/src/test/scala/akka/persistence/cassandra/query/DirectWriting.scala @@ -30,7 +30,7 @@ trait DirectWriting extends BeforeAndAfterAll { private lazy val writeStatements: CassandraJournalStatements = new CassandraJournalStatements(settings) - private lazy val preparedWriteMessage = cluster.prepare(writeStatements.writeMessage(withMeta = false)) + private lazy val preparedWriteMessage = cluster.prepare(writeStatements.writeMessage(withMeta = true)) private lazy val preparedDeleteMessage = cluster.prepare(writeStatements.deleteMessage) @@ -42,7 +42,7 @@ trait DirectWriting extends BeforeAndAfterAll { val now = Uuids.unixTimestamp(nowUuid) val serManifest = Serializers.manifestFor(serializer, persistent) - val bs = preparedWriteMessage + var bs = preparedWriteMessage .bind() .setString("persistence_id", persistent.persistenceId) .setLong("partition_nr", partitionNr) @@ -53,6 +53,20 @@ trait DirectWriting extends BeforeAndAfterAll { .setString("ser_manifest", serManifest) .setString("event_manifest", persistent.manifest) .setByteBuffer("event", serialized) + + bs = persistent.metadata match { + case Some(meta) => + val metaPayload = meta.asInstanceOf[AnyRef] + val metaSerializer = serialization.findSerializerFor(metaPayload) + val metaSerialized = ByteBuffer.wrap(serialization.serialize(metaPayload).get) + val metaSerializedManifest = Serializers.manifestFor(metaSerializer, metaPayload) + bs.setString("meta_ser_manifest", metaSerializedManifest) + .setInt("meta_ser_id", metaSerializer.identifier) + .setByteBuffer("meta", metaSerialized) + case _ => + bs + } + cluster.execute(bs) system.log.debug("Directly wrote payload [{}] for entity [{}]", persistent.payload, persistent.persistenceId) } diff --git a/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdSpec.scala b/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdSpec.scala index f300e7f9d..ef02f612e 100644 --- a/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdSpec.scala @@ -284,6 +284,14 @@ class EventsByPersistenceIdSpec extends CassandraSpec(EventsByPersistenceIdSpec. } } } + + "return metadata" in { + val meta = "cats" + val pr1 = PersistentRepr("e1", 1L, "with-meta", "").withMetadata(meta) + writeTestEvent(pr1) + val src = queries.currentEventsByPersistenceId("with-meta", 0L, Long.MaxValue) + src.map(_.eventMetadata).runWith(TestSink.probe[Any]).request(2).expectNext(Some(meta)).expectComplete() + } } "Cassandra live query EventsByPersistenceId" must {