Skip to content

Commit

Permalink
Set metadata on events by persisence id query (#814)
Browse files Browse the repository at this point in the history
  • Loading branch information
chbatey authored Sep 11, 2020
1 parent 5be6ee0 commit 64ff267
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,13 @@ class CassandraReadJournal protected (

private def toEventEnvelope(persistentRepr: PersistentRepr, offset: Offset): immutable.Iterable[EventEnvelope] =
adaptFromJournal(persistentRepr).map { payload =>
EventEnvelope(offset, persistentRepr.persistenceId, persistentRepr.sequenceNr, payload, timestampFrom(offset))
EventEnvelope(
offset,
persistentRepr.persistenceId,
persistentRepr.sequenceNr,
payload,
timestampFrom(offset),
persistentRepr.metadata)
}

private def offsetToInternalOffset(offset: Offset): (UUID, Boolean) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ trait DirectWriting extends BeforeAndAfterAll {

private lazy val writeStatements: CassandraJournalStatements = new CassandraJournalStatements(settings)

private lazy val preparedWriteMessage = cluster.prepare(writeStatements.writeMessage(withMeta = false))
private lazy val preparedWriteMessage = cluster.prepare(writeStatements.writeMessage(withMeta = true))

private lazy val preparedDeleteMessage = cluster.prepare(writeStatements.deleteMessage)

Expand All @@ -42,7 +42,7 @@ trait DirectWriting extends BeforeAndAfterAll {
val now = Uuids.unixTimestamp(nowUuid)
val serManifest = Serializers.manifestFor(serializer, persistent)

val bs = preparedWriteMessage
var bs = preparedWriteMessage
.bind()
.setString("persistence_id", persistent.persistenceId)
.setLong("partition_nr", partitionNr)
Expand All @@ -53,6 +53,20 @@ trait DirectWriting extends BeforeAndAfterAll {
.setString("ser_manifest", serManifest)
.setString("event_manifest", persistent.manifest)
.setByteBuffer("event", serialized)

bs = persistent.metadata match {
case Some(meta) =>
val metaPayload = meta.asInstanceOf[AnyRef]
val metaSerializer = serialization.findSerializerFor(metaPayload)
val metaSerialized = ByteBuffer.wrap(serialization.serialize(metaPayload).get)
val metaSerializedManifest = Serializers.manifestFor(metaSerializer, metaPayload)
bs.setString("meta_ser_manifest", metaSerializedManifest)
.setInt("meta_ser_id", metaSerializer.identifier)
.setByteBuffer("meta", metaSerialized)
case _ =>
bs
}

cluster.execute(bs)
system.log.debug("Directly wrote payload [{}] for entity [{}]", persistent.payload, persistent.persistenceId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,14 @@ class EventsByPersistenceIdSpec extends CassandraSpec(EventsByPersistenceIdSpec.
}
}
}

"return metadata" in {
val meta = "cats"
val pr1 = PersistentRepr("e1", 1L, "with-meta", "").withMetadata(meta)
writeTestEvent(pr1)
val src = queries.currentEventsByPersistenceId("with-meta", 0L, Long.MaxValue)
src.map(_.eventMetadata).runWith(TestSink.probe[Any]).request(2).expectNext(Some(meta)).expectComplete()
}
}

"Cassandra live query EventsByPersistenceId" must {
Expand Down

0 comments on commit 64ff267

Please sign in to comment.