Skip to content

Commit

Permalink
Replicated event sourcing support (#811)
Browse files Browse the repository at this point in the history
* Add support for meta data for Replicated Event Sourcing
* Remove the old commercial metadata support for events
  • Loading branch information
johanandren authored Sep 10, 2020
1 parent 3e97af4 commit 5be6ee0
Show file tree
Hide file tree
Showing 14 changed files with 123 additions and 228 deletions.

This file was deleted.

23 changes: 14 additions & 9 deletions core/src/main/scala/akka/persistence/cassandra/Extractors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,20 @@ import akka.persistence.query.TimeBasedUUID
implicit ec: ExecutionContext): Future[PersistentRepr] = {

def deserializeEvent(): Future[PersistentRepr] = {
ed.deserializeEvent(row, async).map { payload =>
PersistentRepr(
payload,
sequenceNr = row.getLong("sequence_nr"),
persistenceId = row.getString("persistence_id"),
manifest = row.getString("event_manifest"), // manifest for event adapters
deleted = false,
sender = null,
writerUuid = row.getString("writer_uuid"))
ed.deserializeEvent(row, async).map {
case DeserializedEvent(payload, metadata) =>
val repr = PersistentRepr(
payload,
sequenceNr = row.getLong("sequence_nr"),
persistenceId = row.getString("persistence_id"),
manifest = row.getString("event_manifest"), // manifest for event adapters
deleted = false,
sender = null,
writerUuid = row.getString("writer_uuid"))
metadata match {
case OptionVal.None => repr
case OptionVal.Some(m) => repr.withMetadata(m)
}
}
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import akka.annotation.InternalApi
import akka.event.{ Logging, LoggingAdapter }
import akka.pattern.pipe
import akka.persistence._
import akka.persistence.cassandra.EventWithMetaData.UnknownMetaData
import akka.persistence.cassandra._
import akka.persistence.cassandra.Extractors
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
Expand All @@ -26,6 +25,7 @@ import akka.persistence.cassandra.journal.TagWriter.TagProgress
import akka.serialization.{ AsyncSerializer, Serialization, SerializationExtension }
import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessionRegistry }
import akka.stream.scaladsl.Sink
import akka.dispatch.ExecutionContexts
import akka.util.OptionVal
import com.datastax.oss.driver.api.core.cql._
import com.typesafe.config.Config
Expand Down Expand Up @@ -847,12 +847,16 @@ import akka.stream.scaladsl.Source
case object HealthCheckQuery extends HealthCheck
case object HealthCheckResponse extends HealthCheck

final case class DeserializedEvent(event: Any, meta: OptionVal[Any])

class EventDeserializer(system: ActorSystem) {

private val log = Logging(system, this.getClass)

private val serialization = SerializationExtension(system)
val columnDefinitionCache = new ColumnDefinitionCache

def deserializeEvent(row: Row, async: Boolean)(implicit ec: ExecutionContext): Future[Any] =
def deserializeEvent(row: Row, async: Boolean)(implicit ec: ExecutionContext): Future[DeserializedEvent] =
try {

def meta: OptionVal[AnyRef] = {
Expand All @@ -864,14 +868,19 @@ import akka.stream.scaladsl.Source
// has meta data, wrap in EventWithMetaData
val metaSerId = row.getInt("meta_ser_id")
val metaSerManifest = row.getString("meta_ser_manifest")
val meta = serialization.deserialize(Bytes.getArray(metaBytes), metaSerId, metaSerManifest) match {
case Success(m) => m
case Failure(_) =>
// don't fail replay/query because of deserialization problem with meta data
// see motivation in UnknownMetaData
UnknownMetaData(metaSerId, metaSerManifest)
serialization.deserialize(Bytes.getArray(metaBytes), metaSerId, metaSerManifest) match {
case Success(m) => OptionVal.Some(m)
case Failure(ex) =>
log.warning(
"Deserialization of event metadata failed (pid: [{}], seq_nr: [{}], meta_ser_id: [{}], meta_ser_manifest: [{}], ignoring metadata content. Exception: {}",
Array(
row.getString("persistence_id"),
row.getLong("sequence_nr"),
metaSerId,
metaSerManifest,
ex.toString))
OptionVal.None
}
OptionVal.Some(meta)
}
} else {
// for backwards compatibility, when table was not altered, meta columns not added
Expand All @@ -883,30 +892,20 @@ import akka.stream.scaladsl.Source
val serId = row.getInt("ser_id")
val manifest = row.getString("ser_manifest")

serialization.serializerByIdentity.get(serId) match {
(serialization.serializerByIdentity.get(serId) match {
case Some(asyncSerializer: AsyncSerializer) =>
Serialization.withTransportInformation(system.asInstanceOf[ExtendedActorSystem]) { () =>
asyncSerializer.fromBinaryAsync(bytes, manifest).map { event =>
meta match {
case OptionVal.None => event
case OptionVal.Some(m) => EventWithMetaData(event, m)
}
}
asyncSerializer.fromBinaryAsync(bytes, manifest)
}

case _ =>
def deserializedEvent: AnyRef = {
def deserializedEvent: AnyRef =
// Serialization.deserialize adds transport info
val event = serialization.deserialize(bytes, serId, manifest).get
meta match {
case OptionVal.None => event
case OptionVal.Some(m) => EventWithMetaData(event, m)
}
}
serialization.deserialize(bytes, serId, manifest).get

if (async) Future(deserializedEvent)
else Future.successful(deserializedEvent)
}
}).map(event => DeserializedEvent(event, meta))(ExecutionContexts.parasitic)

} catch {
case NonFatal(e) => Future.failed(e)
Expand Down
20 changes: 7 additions & 13 deletions core/src/main/scala/akka/persistence/cassandra/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,15 @@ package object cassandra {

def serializeMeta(): Option[SerializedMeta] =
// meta data, if any
p.payload match {
case EventWithMetaData(_, m) =>
val m2 = m.asInstanceOf[AnyRef]
val serializer = serialization.findSerializerFor(m2)
val serManifest = Serializers.manifestFor(serializer, m2)
val metaBuf = ByteBuffer.wrap(serialization.serialize(m2).get)
Some(SerializedMeta(metaBuf, serManifest, serializer.identifier))
case _ => None
p.metadata.map { m =>
val m2 = m.asInstanceOf[AnyRef]
val serializer = serialization.findSerializerFor(m2)
val serManifest = Serializers.manifestFor(serializer, m2)
val metaBuf = ByteBuffer.wrap(serialization.serialize(m2).get)
SerializedMeta(metaBuf, serManifest, serializer.identifier)
}

val event: AnyRef = (p.payload match {
case EventWithMetaData(evt, _) => evt // unwrap
case evt => evt
}).asInstanceOf[AnyRef]

val event: AnyRef = p.payload.asInstanceOf[AnyRef]
val serializer = serialization.findSerializerFor(event)
val serManifest = Serializers.manifestFor(serializer, event)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,20 @@ import akka.stream.ActorAttributes
import akka.util.ByteString
import com.datastax.oss.driver.api.core.cql._
import com.typesafe.config.Config

import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{ Failure, Success }
import scala.util.control.NonFatal

import akka.persistence.cassandra.PluginSettings
import akka.persistence.cassandra.CassandraStatements
import akka.persistence.cassandra.journal.CassandraJournal
import akka.persistence.cassandra.journal.CassandraJournal.DeserializedEvent
import akka.serialization.SerializationExtension
import akka.stream.alpakka.cassandra.CassandraSessionSettings
import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessionRegistry }
import akka.util.OptionVal
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.uuid.Uuids

Expand Down Expand Up @@ -345,16 +348,21 @@ class CassandraReadJournal protected (
Flow[EventsByTagStage.UUIDRow]
.mapAsync(querySettings.deserializationParallelism) { uuidRow =>
val row = uuidRow.row
eventsByTagDeserializer.deserializeEvent(row, deserializeEventAsync).map { payload =>
val repr = mapEvent(PersistentRepr(
payload,
sequenceNr = uuidRow.sequenceNr,
persistenceId = uuidRow.persistenceId,
manifest = row.getString("event_manifest"),
deleted = false,
sender = null,
writerUuid = row.getString("writer_uuid")))
UUIDPersistentRepr(uuidRow.offset, uuidRow.tagPidSequenceNr, repr)
eventsByTagDeserializer.deserializeEvent(row, deserializeEventAsync).map {
case DeserializedEvent(payload, metadata) =>
val repr = mapEvent(PersistentRepr(
payload,
sequenceNr = uuidRow.sequenceNr,
persistenceId = uuidRow.persistenceId,
manifest = row.getString("event_manifest"),
deleted = false,
sender = null,
writerUuid = row.getString("writer_uuid")))
val reprWithMeta = metadata match {
case OptionVal.None => repr
case OptionVal.Some(metadata) => repr.withMetadata(metadata)
}
UUIDPersistentRepr(uuidRow.offset, uuidRow.tagPidSequenceNr, reprWithMeta)
}
}
.withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher))
Expand Down
Loading

0 comments on commit 5be6ee0

Please sign in to comment.