From bef815346e49e33c570010389f9ec1fff3290c10 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 11 Jun 2024 12:17:43 +0100 Subject: [PATCH 1/7] try to support scala3 with slick --- build.sbt | 2 +- project/PekkoCoreDependency.scala | 2 +- .../pekko/projection/slick/SlickProjection.scala | 2 +- .../slick/internal/SlickOffsetStore.scala | 15 +++++++++------ .../slick/internal/SlickProjectionImpl.scala | 2 +- .../projection/slick/SlickOffsetStoreSpec.scala | 2 +- .../projection/slick/SlickProjectionSpec.scala | 2 +- 7 files changed, 15 insertions(+), 12 deletions(-) diff --git a/build.sbt b/build.sbt index a807aac8..e253948d 100644 --- a/build.sbt +++ b/build.sbt @@ -74,7 +74,7 @@ lazy val slick = Project(id = "slick", base = file("slick")) .configs(IntegrationTest.extend(Test)) .enablePlugins(ReproducibleBuildsPlugin) - .settings(crossScalaVersions := Dependencies.Scala2Versions) + .settings(crossScalaVersions := Dependencies.Scala2And3Versions) .settings(headerSettings(IntegrationTest)) .settings(Defaults.itSettings) .settings(Dependencies.slick) diff --git a/project/PekkoCoreDependency.scala b/project/PekkoCoreDependency.scala index 5fd56786..155a8b4b 100644 --- a/project/PekkoCoreDependency.scala +++ b/project/PekkoCoreDependency.scala @@ -21,5 +21,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency object PekkoCoreDependency extends PekkoDependency { override val checkProject: String = "pekko-cluster-sharding-typed" override val module: Option[String] = None - override val currentVersion: String = "1.0.2" + override val currentVersion: String = "1.1.0-M1" } diff --git a/slick/src/main/scala/org/apache/pekko/projection/slick/SlickProjection.scala b/slick/src/main/scala/org/apache/pekko/projection/slick/SlickProjection.scala index 999e9e7c..10e88e16 100644 --- a/slick/src/main/scala/org/apache/pekko/projection/slick/SlickProjection.scala +++ b/slick/src/main/scala/org/apache/pekko/projection/slick/SlickProjection.scala @@ -395,7 +395,7 @@ object SlickProjection { private def createOffsetStore[P <: JdbcProfile: ClassTag](databaseConfig: DatabaseConfig[P])( implicit system: ActorSystem[_]) = - new SlickOffsetStore(system, databaseConfig.db, databaseConfig.profile, SlickSettings(system)) + new SlickOffsetStore(system, databaseConfig, SlickSettings(system)) } @ApiMayChange diff --git a/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala b/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala index 36a98a71..c5e2c407 100644 --- a/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala +++ b/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala @@ -35,6 +35,7 @@ import pekko.projection.jdbc.internal.MySQLDialect import pekko.projection.jdbc.internal.OracleDialect import pekko.projection.jdbc.internal.PostgresDialect import pekko.util.Helpers.toRootLowerCase +import slick.basic.DatabaseConfig import slick.jdbc.JdbcProfile /** @@ -42,22 +43,24 @@ import slick.jdbc.JdbcProfile */ @InternalApi private[projection] class SlickOffsetStore[P <: JdbcProfile]( system: ActorSystem[_], - val db: P#Backend#Database, - val profile: P, + val databaseConfig: DatabaseConfig[P], slickSettings: SlickSettings, clock: Clock) { + + def this(system: ActorSystem[_], databaseConfig: DatabaseConfig[P], slickSettings: SlickSettings) = + this(system, databaseConfig, slickSettings, Clock.systemUTC()) + import OffsetSerialization.MultipleOffsets import OffsetSerialization.SingleOffset - import profile.api._ + import databaseConfig.profile.api._ - def this(system: ActorSystem[_], db: P#Backend#Database, profile: P, slickSettings: SlickSettings) = - this(system, db, profile, slickSettings, Clock.systemUTC()) + private val db = databaseConfig.db val (dialect, useLowerCase): (Dialect, Boolean) = { val useLowerCase = slickSettings.useLowerCase - profile match { + databaseConfig.profile match { case _: slick.jdbc.H2Profile => ( H2Dialect(slickSettings.schema, slickSettings.table, slickSettings.managementTable, useLowerCase), diff --git a/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickProjectionImpl.scala b/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickProjectionImpl.scala index 311add3a..27c46711 100644 --- a/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickProjectionImpl.scala +++ b/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickProjectionImpl.scala @@ -186,7 +186,7 @@ private[projection] class SlickProjectionImpl[Offset, Envelope, P <: JdbcProfile settings) { implicit val executionContext: ExecutionContext = system.executionContext - override val logger: LoggingAdapter = Logging(system.classicSystem, this.getClass) + override val logger: LoggingAdapter = Logging(system.classicSystem, classOf[SlickInternalProjectionState]) override def readPaused(): Future[Boolean] = offsetStore.readManagementState(projectionId).map(_.exists(_.paused)) diff --git a/slick/src/test/scala/org/apache/pekko/projection/slick/SlickOffsetStoreSpec.scala b/slick/src/test/scala/org/apache/pekko/projection/slick/SlickOffsetStoreSpec.scala index 7ea91fd5..15b5e55d 100644 --- a/slick/src/test/scala/org/apache/pekko/projection/slick/SlickOffsetStoreSpec.scala +++ b/slick/src/test/scala/org/apache/pekko/projection/slick/SlickOffsetStoreSpec.scala @@ -107,7 +107,7 @@ abstract class SlickOffsetStoreSpec(specConfig: SlickSpecConfig) private val clock = new TestClock private val offsetStore = - new SlickOffsetStore(system, dbConfig.db, dbConfig.profile, SlickSettings(slickConfig), clock) + new SlickOffsetStore(system, dbConfig, SlickSettings(slickConfig), clock) override protected def beforeAll(): Unit = { // create offset table diff --git a/slick/src/test/scala/org/apache/pekko/projection/slick/SlickProjectionSpec.scala b/slick/src/test/scala/org/apache/pekko/projection/slick/SlickProjectionSpec.scala index cadde1e4..2c84bf7e 100644 --- a/slick/src/test/scala/org/apache/pekko/projection/slick/SlickProjectionSpec.scala +++ b/slick/src/test/scala/org/apache/pekko/projection/slick/SlickProjectionSpec.scala @@ -172,7 +172,7 @@ class SlickProjectionSpec val dbConfig: DatabaseConfig[H2Profile] = DatabaseConfig.forConfig(SlickSettings.configPath, config) - val offsetStore = new SlickOffsetStore(system, dbConfig.db, dbConfig.profile, SlickSettings(system)) + val offsetStore = new SlickOffsetStore(system, dbConfig, SlickSettings(system)) val projectionTestKit = ProjectionTestKit(system) From f9922d664158c8e35191981800eb82720e848897 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 11 Jun 2024 12:25:18 +0100 Subject: [PATCH 2/7] further build changes --- build.sbt | 9 --------- .../kafka/integration/KafkaToSlickIntegrationSpec.scala | 2 +- project/Common.scala | 2 +- project/Dependencies.scala | 5 ++--- 4 files changed, 4 insertions(+), 14 deletions(-) diff --git a/build.sbt b/build.sbt index e253948d..84a94f23 100644 --- a/build.sbt +++ b/build.sbt @@ -20,7 +20,6 @@ lazy val core = Project(id = "core", base = file("core")) .configs(IntegrationTest) .enablePlugins(ReproducibleBuildsPlugin) - .settings(crossScalaVersions := Dependencies.Scala2And3Versions) .settings(headerSettings(IntegrationTest)) .settings(Defaults.itSettings) .settings(Dependencies.core) @@ -31,7 +30,6 @@ lazy val core = lazy val coreTest = Project(id = "core-test", base = file("core-test")) .configs(IntegrationTest) - .settings(crossScalaVersions := Dependencies.Scala2And3Versions) .settings(headerSettings(IntegrationTest)) .disablePlugins(MimaPlugin) .settings(Defaults.itSettings) @@ -46,7 +44,6 @@ lazy val testkit = Project(id = "testkit", base = file("testkit")) .configs(IntegrationTest) .enablePlugins(ReproducibleBuildsPlugin) - .settings(crossScalaVersions := Dependencies.Scala2And3Versions) .settings(headerSettings(IntegrationTest)) .settings(Defaults.itSettings) .settings(Dependencies.testKit) @@ -59,7 +56,6 @@ lazy val jdbc = Project(id = "jdbc", base = file("jdbc")) .configs(IntegrationTest.extend(Test)) .enablePlugins(ReproducibleBuildsPlugin) - .settings(crossScalaVersions := Dependencies.Scala2And3Versions) .settings(headerSettings(IntegrationTest)) .settings(Defaults.itSettings) .settings(Dependencies.jdbc) @@ -74,7 +70,6 @@ lazy val slick = Project(id = "slick", base = file("slick")) .configs(IntegrationTest.extend(Test)) .enablePlugins(ReproducibleBuildsPlugin) - .settings(crossScalaVersions := Dependencies.Scala2And3Versions) .settings(headerSettings(IntegrationTest)) .settings(Defaults.itSettings) .settings(Dependencies.slick) @@ -92,7 +87,6 @@ lazy val cassandra = Project(id = "cassandra", base = file("cassandra")) .configs(IntegrationTest) .enablePlugins(ReproducibleBuildsPlugin) - .settings(crossScalaVersions := Dependencies.Scala2And3Versions) .settings(headerSettings(IntegrationTest)) .settings(Defaults.itSettings) .settings(Dependencies.cassandra) @@ -109,7 +103,6 @@ lazy val cassandra = lazy val eventsourced = Project(id = "eventsourced", base = file("eventsourced")) .enablePlugins(ReproducibleBuildsPlugin) - .settings(crossScalaVersions := Dependencies.Scala2And3Versions) .settings(Dependencies.eventsourced) .settings(AutomaticModuleName.settings("pekko.projection.eventsourced")) .settings(name := "pekko-projection-eventsourced") @@ -120,7 +113,6 @@ lazy val eventsourced = lazy val kafka = Project(id = "kafka", base = file("kafka")) .enablePlugins(ReproducibleBuildsPlugin) - .settings(crossScalaVersions := Dependencies.Scala2And3Versions) .settings(Dependencies.kafka) .settings(AutomaticModuleName.settings("pekko.projection.kafka")) .settings(name := "pekko-projection-kafka") @@ -146,7 +138,6 @@ lazy val `durable-state` = Project(id = "durable-state", base = file("durable-state")) .configs(IntegrationTest) .enablePlugins(ReproducibleBuildsPlugin) - .settings(crossScalaVersions := Dependencies.Scala2And3Versions) .settings(Dependencies.state) .settings(AutomaticModuleName.settings("pekko.projection.durable-state")) .settings(name := "pekko-projection-durable-state") diff --git a/kafka-test/src/it/scala/org/apache/pekko/projection/kafka/integration/KafkaToSlickIntegrationSpec.scala b/kafka-test/src/it/scala/org/apache/pekko/projection/kafka/integration/KafkaToSlickIntegrationSpec.scala index b4bb21b7..da5e046d 100644 --- a/kafka-test/src/it/scala/org/apache/pekko/projection/kafka/integration/KafkaToSlickIntegrationSpec.scala +++ b/kafka-test/src/it/scala/org/apache/pekko/projection/kafka/integration/KafkaToSlickIntegrationSpec.scala @@ -131,7 +131,7 @@ class KafkaToSlickIntegrationSpec extends KafkaSpecBase(ConfigFactory.load().wit PatienceConfig(timeout = Span(30, Seconds), interval = Span(500, Milliseconds)) val dbConfig: DatabaseConfig[H2Profile] = DatabaseConfig.forConfig(SlickSettings.configPath, config) - val offsetStore = new SlickOffsetStore(system.toTyped, dbConfig.db, dbConfig.profile, SlickSettings(system.toTyped)) + val offsetStore = new SlickOffsetStore(system.toTyped, dbConfig, SlickSettings(system.toTyped)) val repository = new EventTypeCountRepository(dbConfig) override protected def beforeAll(): Unit = { diff --git a/project/Common.scala b/project/Common.scala index e1f79321..0d0962be 100644 --- a/project/Common.scala +++ b/project/Common.scala @@ -43,7 +43,7 @@ object Common extends AutoPlugin { override lazy val projectSettings = Seq( projectInfoVersion := (if (isSnapshot.value) "snapshot" else version.value), crossVersion := CrossVersion.binary, - crossScalaVersions := Dependencies.Scala2Versions, + crossScalaVersions := Dependencies.ScalaVersions, scalaVersion := Dependencies.Scala213, javacOptions ++= List("-Xlint:unchecked", "-Xlint:deprecation"), Compile / doc / scalacOptions := scalacOptions.value ++ Seq( diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 02dd0ace..31b8eb44 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -17,10 +17,9 @@ object Dependencies { val Scala213 = "2.13.14" val Scala212 = "2.12.19" val Scala3 = "3.3.3" - val Scala2Versions = Seq(Scala213, Scala212) - val Scala2And3Versions = Scala2Versions.+:(Scala3) + val ScalaVersions = Seq(Scala213, Scala212, Scala3) - val PekkoVersionInDocs = "1.0" + val PekkoVersionInDocs = "1.1" val ConnectorsVersionInDocs = "1.0" val ConnectorsKafkaVersionInDocs = "1.0" From 7cacaa386e6412fe90e21e19734d5ee06b6f0092 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 11 Jun 2024 12:31:15 +0100 Subject: [PATCH 3/7] Update Dependencies.scala --- project/Dependencies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 31b8eb44..163af5e5 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -25,7 +25,7 @@ object Dependencies { object Versions { val pekko = PekkoCoreDependency.version - val pekkoPersistenceJdbc = "1.0.0" + val pekkoPersistenceJdbc = "1.1.0-M1" val pekkoPersistenceCassandra = "1.0.0" val connectors = PekkoConnectorsDependency.version val connectorsKafka = PekkoConnectorsKafkaDependency.version From 2fd70ddd8051d2878be574b33c057ad2d48b5f31 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 11 Jun 2024 12:38:34 +0100 Subject: [PATCH 4/7] Scala 3 better enforces package private checks and there is a test in a non-pekko package --- .../apache/pekko/projection/jdbc/internal/JdbcOffsetStore.scala | 2 +- .../apache/pekko/projection/jdbc/internal/JdbcSettings.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcOffsetStore.scala b/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcOffsetStore.scala index 5ede283d..7bb448b1 100644 --- a/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcOffsetStore.scala +++ b/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcOffsetStore.scala @@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory * INTERNAL API */ @InternalApi -private[projection] class JdbcOffsetStore[S <: JdbcSession]( +class JdbcOffsetStore[S <: JdbcSession]( system: ActorSystem[_], settings: JdbcSettings, jdbcSessionFactory: () => S, diff --git a/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcSettings.scala b/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcSettings.scala index 35912e0e..31db11fd 100644 --- a/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcSettings.scala +++ b/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcSettings.scala @@ -27,7 +27,7 @@ import com.typesafe.config.ConfigValueType * INTERNAL API */ @InternalApi -private[projection] case class JdbcSettings(config: Config, executionContext: ExecutionContext) { +case class JdbcSettings(config: Config, executionContext: ExecutionContext) { val schema: Option[String] = Option(config.getString("offset-store.schema")).filterNot(_.trim.isEmpty) From 3a1de3bb75f9e054b27cab062b6e73433dabe09c Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Wed, 12 Jun 2024 08:51:15 +0100 Subject: [PATCH 5/7] Update SlickOffsetStore.scala --- .../projection/slick/internal/SlickOffsetStore.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala b/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala index c5e2c407..83ed3bc6 100644 --- a/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala +++ b/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala @@ -43,24 +43,26 @@ import slick.jdbc.JdbcProfile */ @InternalApi private[projection] class SlickOffsetStore[P <: JdbcProfile]( system: ActorSystem[_], - val databaseConfig: DatabaseConfig[P], + databaseConfig: DatabaseConfig[P], slickSettings: SlickSettings, clock: Clock) { def this(system: ActorSystem[_], databaseConfig: DatabaseConfig[P], slickSettings: SlickSettings) = this(system, databaseConfig, slickSettings, Clock.systemUTC()) + private[projection] val profile: P = databaseConfig.profile + private val db = databaseConfig.db + + import profile.api._ import OffsetSerialization.MultipleOffsets import OffsetSerialization.SingleOffset - import databaseConfig.profile.api._ - private val db = databaseConfig.db val (dialect, useLowerCase): (Dialect, Boolean) = { val useLowerCase = slickSettings.useLowerCase - databaseConfig.profile match { + profile match { case _: slick.jdbc.H2Profile => ( H2Dialect(slickSettings.schema, slickSettings.table, slickSettings.managementTable, useLowerCase), From aa6862f8346b215f388de7da716e10a0d10a022b Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Wed, 12 Jun 2024 08:58:38 +0100 Subject: [PATCH 6/7] Update SlickOffsetStore.scala --- .../pekko/projection/slick/internal/SlickOffsetStore.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala b/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala index 83ed3bc6..111057a2 100644 --- a/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala +++ b/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala @@ -57,7 +57,6 @@ import slick.jdbc.JdbcProfile import OffsetSerialization.MultipleOffsets import OffsetSerialization.SingleOffset - val (dialect, useLowerCase): (Dialect, Boolean) = { val useLowerCase = slickSettings.useLowerCase From 997d187b5cfcd8e4d7bca8705a79000fbb1e8b2a Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 13 Jun 2024 11:20:05 +0100 Subject: [PATCH 7/7] make access to db instance lazy --- .../slick/internal/SlickOffsetStore.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala b/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala index 111057a2..b61d1bef 100644 --- a/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala +++ b/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala @@ -51,7 +51,6 @@ import slick.jdbc.JdbcProfile this(system, databaseConfig, slickSettings, Clock.systemUTC()) private[projection] val profile: P = databaseConfig.profile - private val db = databaseConfig.db import profile.api._ import OffsetSerialization.MultipleOffsets @@ -92,7 +91,7 @@ import slick.jdbc.JdbcProfile SingleOffset(ProjectionId(projectionId.name, row.projectionKey), row.manifest, row.offsetStr, row.mergeable)) } - val results = db.run(action) + val results = databaseConfig.db.run(action) results.map { case Nil => None @@ -181,7 +180,8 @@ import slick.jdbc.JdbcProfile stmt.execute(sql) }) } - db.run(DBIO.seq(prepareSchemaDBIO, prepareManagementSchemaDBIO)).map(_ => Done)(ExecutionContexts.parasitic) + databaseConfig.db.run(DBIO.seq(prepareSchemaDBIO, prepareManagementSchemaDBIO)) + .map(_ => Done)(ExecutionContexts.parasitic) } def dropIfExists(): Future[Done] = { @@ -197,7 +197,8 @@ import slick.jdbc.JdbcProfile stmt.execute(dialect.dropManagementTableStatement) } } - db.run(DBIO.seq(prepareSchemaDBIO, prepareManagementSchemaDBIO)).map(_ => Done)(ExecutionContexts.parasitic) + databaseConfig.db.run(DBIO.seq(prepareSchemaDBIO, prepareManagementSchemaDBIO)) + .map(_ => Done)(ExecutionContexts.parasitic) } def readManagementState(projectionId: ProjectionId)( @@ -210,7 +211,7 @@ import slick.jdbc.JdbcProfile maybeRow.map(row => ManagementState(row.paused)) } - db.run(action) + databaseConfig.db.run(action) } def savePaused(projectionId: ProjectionId, paused: Boolean): Future[Done] = { @@ -218,6 +219,6 @@ import slick.jdbc.JdbcProfile val action = managementTable.insertOrUpdate(ManagementStateRow(projectionId.name, projectionId.key, paused, millisSinceEpoch)) - db.run(action).map(_ => Done)(ExecutionContexts.parasitic) + databaseConfig.db.run(action).map(_ => Done)(ExecutionContexts.parasitic) } }