Skip to content
This repository has been archived by the owner on Jun 1, 2021. It is now read-only.

Commit

Permalink
Merge branch 'master' into r-0.9
Browse files Browse the repository at this point in the history
  • Loading branch information
volkerstampa committed Apr 3, 2017
2 parents 533e05f + 6495478 commit 9685476
Show file tree
Hide file tree
Showing 21 changed files with 558 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ object DurableEventSerializerSpec {
localLogId = "p3",
localSequenceNr = 17L,
deliveryId = Some("x"),
persistOnEventSequenceNr = Some(12L))
persistOnEventSequenceNr = Some(12L),
persistOnEventId = Some(EventId("p4", 0L)))
}

class DurableEventSerializerSpec extends WordSpec with Matchers with Inside with BeforeAndAfterAll {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ object SnapshotSerializerSpec {
DeliveryAttempt("4", payload, destination))

def persistOnEventRequests(payload: Any) = Vector(
PersistOnEventRequest(7L, Vector(PersistOnEventInvocation(payload, Set("a"))), 17),
PersistOnEventRequest(8L, Vector(PersistOnEventInvocation(payload, Set("b"))), 17))
PersistOnEventRequest(7L, None, Vector(PersistOnEventInvocation(payload, Set("a"))), 17),
PersistOnEventRequest(8L, Some(EventId("p-a", 3L)), Vector(PersistOnEventInvocation(payload, Set("b"))), 17))

def snapshot(payload: Any, destination: ActorPath) =
Snapshot(payload, "x", last(payload), vectorTime(17, 18), event.localSequenceNr,
Expand Down
6 changes: 6 additions & 0 deletions eventuate-core/src/main/protobuf/DurableEventFormats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,10 @@ message DurableEventFormat {
optional int64 localSequenceNr = 9;
optional int64 persistOnEventSequenceNr = 10;
optional string deliveryId = 11;
optional EventIdFormat persistOnEventId = 12;
}

message EventIdFormat {
optional string processId = 1;
optional int64 sequenceNr = 2;
}
3 changes: 2 additions & 1 deletion eventuate-core/src/main/protobuf/SnapshotFormats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ message DeliveryAttemptFormat {
}

message PersistOnEventRequestFormat {
optional int64 persistOnEventSequenceNr = 1;
optional int64 sequenceNr = 1;
repeated PersistOnEventInvocationFormat invocation = 2;
optional int32 instanceId = 3;
optional EventIdFormat eventId = 4;
}

message PersistOnEventInvocationFormat {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ package com.rbmhtechnology.eventuate

import scala.collection.immutable.Seq

/**
* Unique id of a [[DurableEvent]].
*
* This is a stable id of an event across all replicated logs.
*
* @param processId the id of the event log the initially wrote the event.
* @param sequenceNr the initial sequence number in this log.
*/
case class EventId(processId: String, sequenceNr: Long)

/**
* Provider API.
*
Expand All @@ -43,7 +53,14 @@ import scala.collection.immutable.Seq
* @param deliveryId Delivery id chosen by an application that persisted this event with [[ConfirmedDelivery.persistConfirmation]].
* @param persistOnEventSequenceNr Sequence number of the event that caused the emission of this event in an event handler.
* Defined if an [[EventsourcedActor]] with a [[PersistOnEvent]] mixin emitted this event
* with `persistOnEvent`.
* with `persistOnEvent`. Actually superseded by `persistOnEventId`, but still
* has to be maintained for backwards compatibility. It is required for confirmation
* of old [[com.rbmhtechnology.eventuate.PersistOnEvent.PersistOnEventRequest]]s from
* a snapshot that do not have [[com.rbmhtechnology.eventuate.PersistOnEvent.PersistOnEventRequest.persistOnEventId]]
* set.
* @param persistOnEventId event id of the event that caused the emission of this event in an event handler.
* Defined if an [[EventsourcedActor]] with a [[PersistOnEvent]] mixin emitted this event
* with `persistOnEvent`.
*/
case class DurableEvent(
payload: Any,
Expand All @@ -56,15 +73,16 @@ case class DurableEvent(
localLogId: String = DurableEvent.UndefinedLogId,
localSequenceNr: Long = DurableEvent.UndefinedSequenceNr,
deliveryId: Option[String] = None,
persistOnEventSequenceNr: Option[Long] = None) {
persistOnEventSequenceNr: Option[Long] = None,
persistOnEventId: Option[EventId] = None) {

import DurableEvent._

/**
* Unique event identifier.
*/
def id: VectorTime =
vectorTimestamp
val id: EventId =
EventId(processId, vectorTimestamp.localTime(processId))

/**
* Returns `true` if this event did not happen before or at the given `vectorTime`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,12 @@ trait EventsourcedActor extends EventsourcedView with EventsourcedVersion {
messageStash.unstash()
}
}
case PersistOnEventRequest(persistOnEventSequenceNr: Long, invocations, iid) => if (iid == instanceId) {
case PersistOnEventRequest(persistOnEventSequenceNr, persistOnEventId, invocations, iid) => if (iid == instanceId) {
writeOrDelay {
writeHandlers = Vector.fill(invocations.length)(PersistOnEvent.DefaultHandler)
writeRequests = invocations.map {
case PersistOnEventInvocation(event, customDestinationAggregateIds) =>
durableEvent(event, customDestinationAggregateIds, None, Some(persistOnEventSequenceNr))
durableEvent(event, customDestinationAggregateIds, None, Some(persistOnEventSequenceNr), persistOnEventId)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,16 @@ trait EventsourcedVersion extends EventsourcedView {
* Internal API.
*/
private[eventuate] def durableEvent(payload: Any, customDestinationAggregateIds: Set[String],
deliveryId: Option[String] = None, persistOnEventSequenceNr: Option[Long] = None): DurableEvent =
deliveryId: Option[String] = None, persistOnEventSequenceNr: Option[Long] = None, persistOnEventId: Option[EventId] = None): DurableEvent =
DurableEvent(
payload = payload,
emitterId = id,
emitterAggregateId = aggregateId,
customDestinationAggregateIds = customDestinationAggregateIds,
vectorTimestamp = currentVersion,
deliveryId = deliveryId,
persistOnEventSequenceNr = persistOnEventSequenceNr)
persistOnEventSequenceNr = persistOnEventSequenceNr,
persistOnEventId = persistOnEventId)

/**
* Internal API.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,29 @@
package com.rbmhtechnology.eventuate

import scala.collection.immutable.SortedMap

import com.rbmhtechnology.eventuate.EventsourcedView.Handler

import scala.util._

object PersistOnEvent {
private[eventuate] object PersistOnEvent {
/**
* Records a `persistOnEvent` invocation.
*/
case class PersistOnEventInvocation(event: Any, customDestinationAggregateIds: Set[String])

/**
* A request sent by [[PersistOnEvent]] instances to `self` in order to persist events recorded by `invocations`.
* @param persistOnEventSequenceNr the sequence number of the event that caused this request.
* @param persistOnEventId [[EventId]] of the event that caused this request. This is optional for backwards
* compatibility, as old snapshots might contain `PersistOnEventRequest`s
* without this field being defined.
*/
case class PersistOnEventRequest(persistOnEventSequenceNr: Long, invocations: Vector[PersistOnEventInvocation], instanceId: Int)
case class PersistOnEventRequest(persistOnEventSequenceNr: Long, persistOnEventId: Option[EventId], invocations: Vector[PersistOnEventInvocation], instanceId: Int)

/**
* Default `persist` handler to use when processing [[PersistOnEventRequest]]s in [[EventsourcedActor]].
*/
private[eventuate] val DefaultHandler: Handler[Any] = {
val DefaultHandler: Handler[Any] = {
case Success(_) =>
case Failure(e) => throw new PersistOnEventException(e)
}
Expand All @@ -61,7 +64,22 @@ trait PersistOnEvent extends EventsourcedActor {
import PersistOnEvent._

private var invocations: Vector[PersistOnEventInvocation] = Vector.empty
private var requests: SortedMap[Long, PersistOnEventRequest] = SortedMap.empty
/**
* [[PersistOnEventRequest]] by sequence number of the event that caused the persist on event request.
*
* This map keeps the requests in the order they were submitted.
*/
private var requestsBySequenceNr: SortedMap[Long, PersistOnEventRequest] = SortedMap.empty

/**
* [[PersistOnEventRequest]] by [[EventId]] of the event that caused the persist on event request.
*
* This map ensures that requests can be confirmed properly even if the sequence number of the event
* that caused the request changed its local sequence number due to a disaster recovery.
*
* @see https://github.com/RBMHTechnology/eventuate/issues/385
*/
private var requestsByEventId: Map[EventId, PersistOnEventRequest] = Map.empty

/**
* Asynchronously persists the given `event`. Applications that want to handle the persisted event should define
Expand All @@ -77,13 +95,10 @@ trait PersistOnEvent extends EventsourcedActor {
*/
override private[eventuate] def receiveEvent(event: DurableEvent): Unit = {
super.receiveEvent(event)

event.persistOnEventSequenceNr.foreach { persistOnEventSequenceNr =>
if (event.emitterId == id) confirmRequest(persistOnEventSequenceNr)
}
if (event.emitterId == id) findPersistOnEventRequest(event).foreach(confirmRequest)

if (invocations.nonEmpty) {
deliverRequest(PersistOnEventRequest(lastSequenceNr, invocations, instanceId))
deliverRequest(PersistOnEventRequest(lastSequenceNr, Some(lastHandledEvent.id), invocations, instanceId))
invocations = Vector.empty
}
}
Expand All @@ -92,7 +107,7 @@ trait PersistOnEvent extends EventsourcedActor {
* Internal API.
*/
override private[eventuate] def snapshotCaptured(snapshot: Snapshot): Snapshot = {
requests.values.foldLeft(super.snapshotCaptured(snapshot)) {
requestsBySequenceNr.values.foldLeft(super.snapshotCaptured(snapshot)) {
case (s, pr) => s.addPersistOnEventRequest(pr)
}
}
Expand All @@ -103,7 +118,9 @@ trait PersistOnEvent extends EventsourcedActor {
override private[eventuate] def snapshotLoaded(snapshot: Snapshot): Unit = {
super.snapshotLoaded(snapshot)
snapshot.persistOnEventRequests.foreach { pr =>
requests = requests + (pr.persistOnEventSequenceNr -> pr.copy(instanceId = instanceId))
val requestWithUpdatedInstanceId = pr.copy(instanceId = instanceId)
requestsBySequenceNr += (pr.persistOnEventSequenceNr -> requestWithUpdatedInstanceId)
pr.persistOnEventId.foreach(requestsByEventId += _ -> requestWithUpdatedInstanceId)
}
}

Expand All @@ -119,18 +136,26 @@ trait PersistOnEvent extends EventsourcedActor {
* Internal API.
*/
private[eventuate] def unconfirmedRequests: Set[Long] =
requests.keySet
requestsBySequenceNr.keySet

private def deliverRequest(request: PersistOnEventRequest): Unit = {
requests = requests + (request.persistOnEventSequenceNr -> request)
requestsBySequenceNr += request.persistOnEventSequenceNr -> request
request.persistOnEventId.foreach(requestsByEventId += _ -> request)
if (!recovering) self ! request
}

private def confirmRequest(persistOnEventSequenceNr: Long): Unit = {
requests = requests - persistOnEventSequenceNr
private def confirmRequest(request: PersistOnEventRequest): Unit = {
request.persistOnEventId.foreach(requestsByEventId -= _)
requestsBySequenceNr -= request.persistOnEventSequenceNr
}

private def redeliverUnconfirmedRequests(): Unit = requests.foreach {
private def findPersistOnEventRequest(event: DurableEvent) =
event
.persistOnEventId.flatMap(requestsByEventId.get)
// Fallback for old events that have no persistOnEventId
.orElse(event.persistOnEventSequenceNr.flatMap(requestsBySequenceNr.get))

private def redeliverUnconfirmedRequests(): Unit = requestsBySequenceNr.foreach {
case (_, request) => self ! request
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ abstract class DurableEventSerializer(
durableEvent.persistOnEventSequenceNr.foreach { persistOnEventSequenceNr =>
builder.setPersistOnEventSequenceNr(persistOnEventSequenceNr)
}
durableEvent.persistOnEventId.foreach { persistOnEventId =>
builder.setPersistOnEventId(eventIdFormatBuilder(persistOnEventId))
}

durableEvent.emitterAggregateId.foreach { id =>
builder.setEmitterAggregateId(id)
Expand All @@ -114,6 +117,13 @@ abstract class DurableEventSerializer(
builder
}

def eventIdFormatBuilder(eventId: EventId) = {
val builder = EventIdFormat.newBuilder()
builder.setProcessId(eventId.processId)
builder.setSequenceNr(eventId.sequenceNr)
builder
}

// --------------------------------------------------------------------------------
// fromBinary helpers
// --------------------------------------------------------------------------------
Expand All @@ -128,6 +138,7 @@ abstract class DurableEventSerializer(

val deliveryId = if (durableEventFormat.hasDeliveryId) Some(durableEventFormat.getDeliveryId) else None
val persistOnEventSequenceNr = if (durableEventFormat.hasPersistOnEventSequenceNr) Some(durableEventFormat.getPersistOnEventSequenceNr) else None
val persistOnEventId = if (durableEventFormat.hasPersistOnEventId) Some(eventId(durableEventFormat.getPersistOnEventId)) else None

DurableEvent(
payload = payloadSerializer.payload(durableEventFormat.getPayload),
Expand All @@ -140,6 +151,10 @@ abstract class DurableEventSerializer(
localLogId = durableEventFormat.getLocalLogId,
localSequenceNr = durableEventFormat.getLocalSequenceNr,
deliveryId = deliveryId,
persistOnEventSequenceNr = persistOnEventSequenceNr)
persistOnEventSequenceNr = persistOnEventSequenceNr,
persistOnEventId = persistOnEventId)
}

def eventId(eventId: EventIdFormat) =
EventId(eventId.getProcessId, eventId.getSequenceNr)
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer {

private def persistOnEventRequestFormatBuilder(persistOnEventRequest: PersistOnEventRequest): PersistOnEventRequestFormat.Builder = {
val builder = PersistOnEventRequestFormat.newBuilder
builder.setPersistOnEventSequenceNr(persistOnEventRequest.persistOnEventSequenceNr)
builder.setSequenceNr(persistOnEventRequest.persistOnEventSequenceNr)
persistOnEventRequest.persistOnEventId.foreach { eventId =>
builder.setEventId(eventSerializer.eventIdFormatBuilder(eventId))
}
builder.setInstanceId(persistOnEventRequest.instanceId)

persistOnEventRequest.invocations.foreach { invocation =>
Expand Down Expand Up @@ -191,8 +194,11 @@ class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer {
invocationsBuilder += persistOnEventInvocation(pif)
}

val persistOnEventReference = if (persistOnEventRequestFormat.hasEventId) Some(eventSerializer.eventId(persistOnEventRequestFormat.getEventId)) else None

PersistOnEventRequest(
persistOnEventRequestFormat.getPersistOnEventSequenceNr,
persistOnEventRequestFormat.getSequenceNr,
persistOnEventReference,
invocationsBuilder.result(),
persistOnEventRequestFormat.getInstanceId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ protected DurableEvent createEvent(final Object payload, final long sequenceNr)
@SuppressWarnings("unchecked")
protected DurableEvent createEvent(final Object payload, final long sequenceNr, final String emitterId, final String logId, final VectorTime timestamp) {
return new DurableEvent(payload, emitterId, Option.empty(), Set$.MODULE$.<String>empty(), 0L,
timestamp, logId, logId, sequenceNr, Option.empty(), Option.empty());
timestamp, logId, logId, sequenceNr, Option.empty(), Option.empty(), Option.empty());
}

protected VectorTime timestamp(final long a, final long b) {
Expand Down
Loading

0 comments on commit 9685476

Please sign in to comment.