Skip to content

Commit

Permalink
Configuring persistence plugins at runtime for EventSourcedProvider (#…
Browse files Browse the repository at this point in the history
…225)

* Add runtime journal config to tag query of event sourced provider

* Add runtime journal config to slice query of event sourced provider

* Fix imports

* Add header

* Fix compile error

* Add logback config to test scope of eventsourced module

* Fix compilation on Scala 2.12

* Add java API for events by slices with runtime config

* test with pekko 1.1.3-RC1

* Update PekkoCoreDependency.scala

* Update build.sbt

---------

Co-authored-by: PJ Fanning <pjfanning@users.noreply.github.com>
  • Loading branch information
ptrdom and pjfanning authored Jan 9, 2025
1 parent 33c22a9 commit 3dc9f5a
Show file tree
Hide file tree
Showing 6 changed files with 319 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.function.Supplier

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import com.typesafe.config.Config
import org.apache.pekko
import pekko.NotUsed
import pekko.actor.typed.ActorSystem
Expand Down Expand Up @@ -56,6 +56,18 @@ object EventSourcedProvider {
new EventsByTagSourceProvider(system, eventsByTagQuery, tag)
}

def eventsByTag[Event](
system: ActorSystem[_],
readJournalPluginId: String,
readJournalConfig: Config,
tag: String): SourceProvider[Offset, EventEnvelope[Event]] = {

val eventsByTagQuery =
PersistenceQuery(system).getReadJournalFor(classOf[EventsByTagQuery], readJournalPluginId, readJournalConfig)

new EventsByTagSourceProvider(system, eventsByTagQuery, tag)
}

/**
* INTERNAL API
*/
Expand Down Expand Up @@ -105,6 +117,30 @@ object EventSourcedProvider {
new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType, minSlice, maxSlice, system)
}

def eventsBySlices[Event](
system: ActorSystem[_],
readJournalPluginId: String,
readJournalConfig: Config,
entityType: String,
minSlice: Int,
maxSlice: Int): SourceProvider[Offset, pekko.persistence.query.typed.EventEnvelope[Event]] = {

val eventsBySlicesQuery =
PersistenceQuery(system).getReadJournalFor(classOf[EventsBySliceQuery], readJournalPluginId, readJournalConfig)

if (!eventsBySlicesQuery.isInstanceOf[EventTimestampQuery])
throw new IllegalArgumentException(
s"[${eventsBySlicesQuery.getClass.getName}] with readJournalPluginId " +
s"[$readJournalPluginId] must implement [${classOf[EventTimestampQuery].getName}]")

if (!eventsBySlicesQuery.isInstanceOf[LoadEventQuery])
throw new IllegalArgumentException(
s"[${eventsBySlicesQuery.getClass.getName}] with readJournalPluginId " +
s"[$readJournalPluginId] must implement [${classOf[LoadEventQuery].getName}]")

new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType, minSlice, maxSlice, system)
}

def sliceForPersistenceId(system: ActorSystem[_], readJournalPluginId: String, persistenceId: String): Int =
PersistenceQuery(system)
.getReadJournalFor(classOf[EventsBySliceQuery], readJournalPluginId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import java.time.Instant
import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import com.typesafe.config.Config
import org.apache.pekko
import pekko.NotUsed
import pekko.actor.typed.ActorSystem
Expand Down Expand Up @@ -49,6 +49,18 @@ object EventSourcedProvider {
new EventsByTagSourceProvider(eventsByTagQuery, tag, system)
}

def eventsByTag[Event](
system: ActorSystem[_],
readJournalPluginId: String,
readJournalConfig: Config,
tag: String): SourceProvider[Offset, EventEnvelope[Event]] = {

val eventsByTagQuery =
PersistenceQuery(system).readJournalFor[EventsByTagQuery](readJournalPluginId, readJournalConfig)

new EventsByTagSourceProvider(eventsByTagQuery, tag, system)
}

private class EventsByTagSourceProvider[Event](
eventsByTagQuery: EventsByTagQuery,
tag: String,
Expand Down Expand Up @@ -81,6 +93,19 @@ object EventSourcedProvider {
new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType, minSlice, maxSlice, system)
}

def eventsBySlices[Event](
system: ActorSystem[_],
readJournalPluginId: String,
readJournalConfig: Config,
entityType: String,
minSlice: Int,
maxSlice: Int): SourceProvider[Offset, pekko.persistence.query.typed.EventEnvelope[Event]] = {
val eventsBySlicesQuery =
PersistenceQuery(system).readJournalFor[EventsBySliceQuery](readJournalPluginId, readJournalConfig)

new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType, minSlice, maxSlice, system)
}

def sliceForPersistenceId(system: ActorSystem[_], readJournalPluginId: String, persistenceId: String): Int =
PersistenceQuery(system)
.readJournalFor[EventsBySliceQuery](readJournalPluginId)
Expand Down
20 changes: 20 additions & 0 deletions eventsourced/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<!-- Silence initial setup logging from Logback -->
<statusListener class="ch.qos.logback.core.status.NopStatusListener" />
<appender name="STDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>[%date{ISO8601}] [%level] [%logger] [%marker] [%thread] - %msg MDC: {%mdc}%n</pattern>
</encoder>
</appender>

<appender name="CapturingAppender" class="org.apache.pekko.actor.testkit.typed.internal.CapturingAppender" />

<logger name="org.apache.pekko.actor.testkit.typed.internal.CapturingAppenderDelegate" >
<appender-ref ref="STDOUT"/>
</logger>

<root level="DEBUG">
<appender-ref ref="CapturingAppender" />
</root>
</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.projection.eventsourced.scaldsl

import scala.collection.immutable.Seq
import scala.concurrent.Future
import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.Done
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.typed.ActorRef
import pekko.persistence.Persistence
import pekko.persistence.query.NoOffset
import pekko.persistence.testkit.PersistenceTestKitPlugin
import pekko.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.scaladsl.Effect
import pekko.persistence.typed.scaladsl.EventSourcedBehavior
import pekko.projection.eventsourced.scaladsl.EventSourcedProvider
import pekko.stream.testkit.scaladsl.TestSink
import org.scalatest.freespec.AnyFreeSpecLike

object EventSourcedProviderSpec {
private val config = PersistenceTestKitPlugin.config.withFallback(
ConfigFactory.parseString("""
pekko.loglevel = DEBUG
pekko.persistence.testkit.events.serialize = off
"""))

private case class Command(event: String, ack: ActorRef[Done])
private case class State()

private def testBehaviour(persistenceId: String, tags: Set[String], maybeJournal: Option[String]) = {
val behavior = EventSourcedBehavior[Command, String, State](
PersistenceId.ofUniqueId(persistenceId),
State(),
(_, command) =>
Effect.persist(command.event).thenRun { _ =>
command.ack ! Done
},
(state, _) => state)
.withTagger(_ => tags)
maybeJournal.fold(
behavior
)(journal =>
behavior
.withJournalPluginId(s"$journal.journal")
.withJournalPluginConfig(Some(journalConfig(journal))))
}

private def journalConfig(journal: String) = {
ConfigFactory.parseString(s"""
$journal {
journal.class = "${classOf[PersistenceTestKitPlugin].getName}"
query = $${pekko.persistence.testkit.query}
}
""").withFallback(ConfigFactory.load()).resolve()
}

private val entityType = "Test"
private def makeFullPersistenceId(persistenceId: String) = {
s"$entityType|$persistenceId"
}
}

class EventSourcedProviderSpec
extends ScalaTestWithActorTestKit(EventSourcedProviderSpec.config)
with LogCapturing
with AnyFreeSpecLike {
import EventSourcedProviderSpec._

implicit private val classic: pekko.actor.ActorSystem = system.classicSystem
private lazy val persistence = Persistence(system)
private lazy val numberOfSlices = persistence.numberOfSlices

private def setup(persistenceId: String, tags: Set[String] = Set.empty, maybeJournal: Option[String] = None) = {
val probe = createTestProbe[Done]()
val ref = spawn(testBehaviour(persistenceId, tags, maybeJournal))
def makeEvent(event: String) = {
Seq(Some(s"$persistenceId-$event"), maybeJournal).flatten.mkString("-")
}
ref ! Command(makeEvent("event-1"), probe.ref)
ref ! Command(makeEvent("event-2"), probe.ref)
ref ! Command(makeEvent("event-3"), probe.ref)
probe.expectMessage(Done)
}

private def assertTag(tag: String, expectedEvents: Seq[String], maybeJournal: Option[String] = None) = {
maybeJournal
.fold(
EventSourcedProvider
.eventsByTag[String](
system,
PersistenceTestKitReadJournal.Identifier,
tag
)
)(journal =>
EventSourcedProvider
.eventsByTag[String](
system,
s"$journal.query",
journalConfig(journal),
tag
))
.source(() => Future.successful(Some(NoOffset)))
.futureValue
.map(_.event)
.runWith(TestSink.probe)
.request(expectedEvents.size)
.expectNextN(expectedEvents)
.request(1)
.expectNoMessage()
}

private def assertSlices(minSlice: Int, maxSlice: Int, expectedEvents: Seq[String],
maybeJournal: Option[String] = None) = {
maybeJournal
.fold(
EventSourcedProvider
.eventsBySlices[String](
system,
PersistenceTestKitReadJournal.Identifier,
entityType,
minSlice,
maxSlice
)
)(journal =>
EventSourcedProvider
.eventsBySlices[String](
system,
s"$journal.query",
journalConfig(journal),
entityType,
minSlice,
maxSlice
))
.source(() => Future.successful(Some(NoOffset)))
.futureValue
.map(_.event)
.runWith(TestSink.probe)
.request(expectedEvents.size)
.expectNextN(expectedEvents)
.request(1)
.expectNoMessage()
}

"Should provide different events" - {
"by tags" - {
"for different tags" in {
val persistenceId1 = "a-id-1"
val persistenceId2 = "a-id-2"
val tag1 = "a-tag-1"
val tag2 = "a-tag-2"

setup(persistenceId1, Set(tag1))
setup(persistenceId2, Set(tag2))

assertTag(tag1, Seq(s"$persistenceId1-event-1", s"$persistenceId1-event-2", s"$persistenceId1-event-3"))
assertTag(tag2, Seq(s"$persistenceId2-event-1", s"$persistenceId2-event-2", s"$persistenceId2-event-3"))
}

"for different journals" in {
val persistenceId1 = "b-id-1"
val tag1 = "b-tag-1"
val journal1 = "b-journal-1"
val journal2 = "b-journal-2"

setup(persistenceId1, Set(tag1), Some(journal1))
setup(persistenceId1, Set(tag1), Some(journal2))

val expectedEvents = Seq(s"$persistenceId1-event-1", s"$persistenceId1-event-2", s"$persistenceId1-event-3")
assertTag(tag1, expectedEvents.map(_ + s"-$journal1"), Some(journal1))
assertTag(tag1, expectedEvents.map(_ + s"-$journal2"), Some(journal2))
}
}

"by slices" - {
"for different slices" in {
val persistenceId1 = makeFullPersistenceId("c-id-1")
val persistenceId2 = makeFullPersistenceId("c-id-2")

val slice1 = persistence.sliceForPersistenceId(persistenceId1)
val slice2 = persistence.sliceForPersistenceId(persistenceId2)
slice1 should not be slice2

setup(persistenceId1)
setup(persistenceId2)

val expectedEvents1 = Seq(s"$persistenceId1-event-1", s"$persistenceId1-event-2", s"$persistenceId1-event-3")
val expectedEvents2 = Seq(s"$persistenceId2-event-1", s"$persistenceId2-event-2", s"$persistenceId2-event-3")
assertSlices(0, numberOfSlices - 1, expectedEvents1 ++ expectedEvents2)
assertSlices(slice1, slice1, expectedEvents1)
assertSlices(slice2, slice2, expectedEvents2)
}

"for different journals" in {
val persistenceId1 = makeFullPersistenceId("d-id-1")
val journal1 = "d-journal-1"
val journal2 = "d-journal-2"

setup(persistenceId1, maybeJournal = Some(journal1))
setup(persistenceId1, maybeJournal = Some(journal2))

val expectedEvents = Seq(s"$persistenceId1-event-1", s"$persistenceId1-event-2", s"$persistenceId1-event-3")
assertSlices(0, numberOfSlices - 1, expectedEvents.map(_ + s"-$journal1"), maybeJournal = Some(journal1))
assertSlices(0, numberOfSlices - 1, expectedEvents.map(_ + s"-$journal2"), maybeJournal = Some(journal2))
}
}
}
}
10 changes: 9 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ object Dependencies {
val h2Driver = "com.h2database" % "h2" % Versions.h2Driver
}

object TestNonIt {
val persistenceTestkit = "org.apache.pekko" %% "pekko-persistence-testkit" % Versions.pekko % "test"

val scalatest = "org.scalatest" %% "scalatest" % Versions.scalaTest % "test"

val logback = "ch.qos.logback" % "logback-classic" % Versions.logback % "test"
}

object Test {
private val allTestConfig = "test,it"

Expand Down Expand Up @@ -140,7 +148,7 @@ object Dependencies {
Test.logback)

val eventsourced =
deps ++= Seq(Compile.pekkoPersistenceQuery)
deps ++= Seq(Compile.pekkoPersistenceQuery, TestNonIt.persistenceTestkit, TestNonIt.scalatest, TestNonIt.logback)

val state =
deps ++= Seq(Compile.pekkoPersistenceQuery, Test.persistenceTestkit, Test.pekkoStreamTestkit, Test.scalatest)
Expand Down
2 changes: 1 addition & 1 deletion project/PekkoCoreDependency.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
object PekkoCoreDependency extends PekkoDependency {
override val checkProject: String = "pekko-cluster-sharding-typed"
override val module: Option[String] = None
override val currentVersion: String = "1.1.2"
override val currentVersion: String = "1.1.3"
}

0 comments on commit 3dc9f5a

Please sign in to comment.