diff --git a/build.sbt b/build.sbt index a807aac8..84a94f23 100644 --- a/build.sbt +++ b/build.sbt @@ -20,7 +20,6 @@ lazy val core = Project(id = "core", base = file("core")) .configs(IntegrationTest) .enablePlugins(ReproducibleBuildsPlugin) - .settings(crossScalaVersions := Dependencies.Scala2And3Versions) .settings(headerSettings(IntegrationTest)) .settings(Defaults.itSettings) .settings(Dependencies.core) @@ -31,7 +30,6 @@ lazy val core = lazy val coreTest = Project(id = "core-test", base = file("core-test")) .configs(IntegrationTest) - .settings(crossScalaVersions := Dependencies.Scala2And3Versions) .settings(headerSettings(IntegrationTest)) .disablePlugins(MimaPlugin) .settings(Defaults.itSettings) @@ -46,7 +44,6 @@ lazy val testkit = Project(id = "testkit", base = file("testkit")) .configs(IntegrationTest) .enablePlugins(ReproducibleBuildsPlugin) - .settings(crossScalaVersions := Dependencies.Scala2And3Versions) .settings(headerSettings(IntegrationTest)) .settings(Defaults.itSettings) .settings(Dependencies.testKit) @@ -59,7 +56,6 @@ lazy val jdbc = Project(id = "jdbc", base = file("jdbc")) .configs(IntegrationTest.extend(Test)) .enablePlugins(ReproducibleBuildsPlugin) - .settings(crossScalaVersions := Dependencies.Scala2And3Versions) .settings(headerSettings(IntegrationTest)) .settings(Defaults.itSettings) .settings(Dependencies.jdbc) @@ -74,7 +70,6 @@ lazy val slick = Project(id = "slick", base = file("slick")) .configs(IntegrationTest.extend(Test)) .enablePlugins(ReproducibleBuildsPlugin) - .settings(crossScalaVersions := Dependencies.Scala2Versions) .settings(headerSettings(IntegrationTest)) .settings(Defaults.itSettings) .settings(Dependencies.slick) @@ -92,7 +87,6 @@ lazy val cassandra = Project(id = "cassandra", base = file("cassandra")) .configs(IntegrationTest) .enablePlugins(ReproducibleBuildsPlugin) - .settings(crossScalaVersions := Dependencies.Scala2And3Versions) .settings(headerSettings(IntegrationTest)) .settings(Defaults.itSettings) .settings(Dependencies.cassandra) @@ -109,7 +103,6 @@ lazy val cassandra = lazy val eventsourced = Project(id = "eventsourced", base = file("eventsourced")) .enablePlugins(ReproducibleBuildsPlugin) - .settings(crossScalaVersions := Dependencies.Scala2And3Versions) .settings(Dependencies.eventsourced) .settings(AutomaticModuleName.settings("pekko.projection.eventsourced")) .settings(name := "pekko-projection-eventsourced") @@ -120,7 +113,6 @@ lazy val eventsourced = lazy val kafka = Project(id = "kafka", base = file("kafka")) .enablePlugins(ReproducibleBuildsPlugin) - .settings(crossScalaVersions := Dependencies.Scala2And3Versions) .settings(Dependencies.kafka) .settings(AutomaticModuleName.settings("pekko.projection.kafka")) .settings(name := "pekko-projection-kafka") @@ -146,7 +138,6 @@ lazy val `durable-state` = Project(id = "durable-state", base = file("durable-state")) .configs(IntegrationTest) .enablePlugins(ReproducibleBuildsPlugin) - .settings(crossScalaVersions := Dependencies.Scala2And3Versions) .settings(Dependencies.state) .settings(AutomaticModuleName.settings("pekko.projection.durable-state")) .settings(name := "pekko-projection-durable-state") diff --git a/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcOffsetStore.scala b/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcOffsetStore.scala index 5ede283d..7bb448b1 100644 --- a/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcOffsetStore.scala +++ b/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcOffsetStore.scala @@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory * INTERNAL API */ @InternalApi -private[projection] class JdbcOffsetStore[S <: JdbcSession]( +class JdbcOffsetStore[S <: JdbcSession]( system: ActorSystem[_], settings: JdbcSettings, jdbcSessionFactory: () => S, diff --git a/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcSettings.scala b/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcSettings.scala index 35912e0e..31db11fd 100644 --- a/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcSettings.scala +++ b/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcSettings.scala @@ -27,7 +27,7 @@ import com.typesafe.config.ConfigValueType * INTERNAL API */ @InternalApi -private[projection] case class JdbcSettings(config: Config, executionContext: ExecutionContext) { +case class JdbcSettings(config: Config, executionContext: ExecutionContext) { val schema: Option[String] = Option(config.getString("offset-store.schema")).filterNot(_.trim.isEmpty) diff --git a/kafka-test/src/it/scala/org/apache/pekko/projection/kafka/integration/KafkaToSlickIntegrationSpec.scala b/kafka-test/src/it/scala/org/apache/pekko/projection/kafka/integration/KafkaToSlickIntegrationSpec.scala index b4bb21b7..da5e046d 100644 --- a/kafka-test/src/it/scala/org/apache/pekko/projection/kafka/integration/KafkaToSlickIntegrationSpec.scala +++ b/kafka-test/src/it/scala/org/apache/pekko/projection/kafka/integration/KafkaToSlickIntegrationSpec.scala @@ -131,7 +131,7 @@ class KafkaToSlickIntegrationSpec extends KafkaSpecBase(ConfigFactory.load().wit PatienceConfig(timeout = Span(30, Seconds), interval = Span(500, Milliseconds)) val dbConfig: DatabaseConfig[H2Profile] = DatabaseConfig.forConfig(SlickSettings.configPath, config) - val offsetStore = new SlickOffsetStore(system.toTyped, dbConfig.db, dbConfig.profile, SlickSettings(system.toTyped)) + val offsetStore = new SlickOffsetStore(system.toTyped, dbConfig, SlickSettings(system.toTyped)) val repository = new EventTypeCountRepository(dbConfig) override protected def beforeAll(): Unit = { diff --git a/project/Common.scala b/project/Common.scala index e1f79321..0d0962be 100644 --- a/project/Common.scala +++ b/project/Common.scala @@ -43,7 +43,7 @@ object Common extends AutoPlugin { override lazy val projectSettings = Seq( projectInfoVersion := (if (isSnapshot.value) "snapshot" else version.value), crossVersion := CrossVersion.binary, - crossScalaVersions := Dependencies.Scala2Versions, + crossScalaVersions := Dependencies.ScalaVersions, scalaVersion := Dependencies.Scala213, javacOptions ++= List("-Xlint:unchecked", "-Xlint:deprecation"), Compile / doc / scalacOptions := scalacOptions.value ++ Seq( diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 02dd0ace..163af5e5 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -17,16 +17,15 @@ object Dependencies { val Scala213 = "2.13.14" val Scala212 = "2.12.19" val Scala3 = "3.3.3" - val Scala2Versions = Seq(Scala213, Scala212) - val Scala2And3Versions = Scala2Versions.+:(Scala3) + val ScalaVersions = Seq(Scala213, Scala212, Scala3) - val PekkoVersionInDocs = "1.0" + val PekkoVersionInDocs = "1.1" val ConnectorsVersionInDocs = "1.0" val ConnectorsKafkaVersionInDocs = "1.0" object Versions { val pekko = PekkoCoreDependency.version - val pekkoPersistenceJdbc = "1.0.0" + val pekkoPersistenceJdbc = "1.1.0-M1" val pekkoPersistenceCassandra = "1.0.0" val connectors = PekkoConnectorsDependency.version val connectorsKafka = PekkoConnectorsKafkaDependency.version diff --git a/project/PekkoCoreDependency.scala b/project/PekkoCoreDependency.scala index 5fd56786..155a8b4b 100644 --- a/project/PekkoCoreDependency.scala +++ b/project/PekkoCoreDependency.scala @@ -21,5 +21,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency object PekkoCoreDependency extends PekkoDependency { override val checkProject: String = "pekko-cluster-sharding-typed" override val module: Option[String] = None - override val currentVersion: String = "1.0.2" + override val currentVersion: String = "1.1.0-M1" } diff --git a/slick/src/main/scala/org/apache/pekko/projection/slick/SlickProjection.scala b/slick/src/main/scala/org/apache/pekko/projection/slick/SlickProjection.scala index 999e9e7c..10e88e16 100644 --- a/slick/src/main/scala/org/apache/pekko/projection/slick/SlickProjection.scala +++ b/slick/src/main/scala/org/apache/pekko/projection/slick/SlickProjection.scala @@ -395,7 +395,7 @@ object SlickProjection { private def createOffsetStore[P <: JdbcProfile: ClassTag](databaseConfig: DatabaseConfig[P])( implicit system: ActorSystem[_]) = - new SlickOffsetStore(system, databaseConfig.db, databaseConfig.profile, SlickSettings(system)) + new SlickOffsetStore(system, databaseConfig, SlickSettings(system)) } @ApiMayChange diff --git a/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala b/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala index 36a98a71..b61d1bef 100644 --- a/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala +++ b/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala @@ -35,6 +35,7 @@ import pekko.projection.jdbc.internal.MySQLDialect import pekko.projection.jdbc.internal.OracleDialect import pekko.projection.jdbc.internal.PostgresDialect import pekko.util.Helpers.toRootLowerCase +import slick.basic.DatabaseConfig import slick.jdbc.JdbcProfile /** @@ -42,16 +43,18 @@ import slick.jdbc.JdbcProfile */ @InternalApi private[projection] class SlickOffsetStore[P <: JdbcProfile]( system: ActorSystem[_], - val db: P#Backend#Database, - val profile: P, + databaseConfig: DatabaseConfig[P], slickSettings: SlickSettings, clock: Clock) { + + def this(system: ActorSystem[_], databaseConfig: DatabaseConfig[P], slickSettings: SlickSettings) = + this(system, databaseConfig, slickSettings, Clock.systemUTC()) + + private[projection] val profile: P = databaseConfig.profile + + import profile.api._ import OffsetSerialization.MultipleOffsets import OffsetSerialization.SingleOffset - import profile.api._ - - def this(system: ActorSystem[_], db: P#Backend#Database, profile: P, slickSettings: SlickSettings) = - this(system, db, profile, slickSettings, Clock.systemUTC()) val (dialect, useLowerCase): (Dialect, Boolean) = { @@ -88,7 +91,7 @@ import slick.jdbc.JdbcProfile SingleOffset(ProjectionId(projectionId.name, row.projectionKey), row.manifest, row.offsetStr, row.mergeable)) } - val results = db.run(action) + val results = databaseConfig.db.run(action) results.map { case Nil => None @@ -177,7 +180,8 @@ import slick.jdbc.JdbcProfile stmt.execute(sql) }) } - db.run(DBIO.seq(prepareSchemaDBIO, prepareManagementSchemaDBIO)).map(_ => Done)(ExecutionContexts.parasitic) + databaseConfig.db.run(DBIO.seq(prepareSchemaDBIO, prepareManagementSchemaDBIO)) + .map(_ => Done)(ExecutionContexts.parasitic) } def dropIfExists(): Future[Done] = { @@ -193,7 +197,8 @@ import slick.jdbc.JdbcProfile stmt.execute(dialect.dropManagementTableStatement) } } - db.run(DBIO.seq(prepareSchemaDBIO, prepareManagementSchemaDBIO)).map(_ => Done)(ExecutionContexts.parasitic) + databaseConfig.db.run(DBIO.seq(prepareSchemaDBIO, prepareManagementSchemaDBIO)) + .map(_ => Done)(ExecutionContexts.parasitic) } def readManagementState(projectionId: ProjectionId)( @@ -206,7 +211,7 @@ import slick.jdbc.JdbcProfile maybeRow.map(row => ManagementState(row.paused)) } - db.run(action) + databaseConfig.db.run(action) } def savePaused(projectionId: ProjectionId, paused: Boolean): Future[Done] = { @@ -214,6 +219,6 @@ import slick.jdbc.JdbcProfile val action = managementTable.insertOrUpdate(ManagementStateRow(projectionId.name, projectionId.key, paused, millisSinceEpoch)) - db.run(action).map(_ => Done)(ExecutionContexts.parasitic) + databaseConfig.db.run(action).map(_ => Done)(ExecutionContexts.parasitic) } } diff --git a/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickProjectionImpl.scala b/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickProjectionImpl.scala index 311add3a..27c46711 100644 --- a/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickProjectionImpl.scala +++ b/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickProjectionImpl.scala @@ -186,7 +186,7 @@ private[projection] class SlickProjectionImpl[Offset, Envelope, P <: JdbcProfile settings) { implicit val executionContext: ExecutionContext = system.executionContext - override val logger: LoggingAdapter = Logging(system.classicSystem, this.getClass) + override val logger: LoggingAdapter = Logging(system.classicSystem, classOf[SlickInternalProjectionState]) override def readPaused(): Future[Boolean] = offsetStore.readManagementState(projectionId).map(_.exists(_.paused)) diff --git a/slick/src/test/scala/org/apache/pekko/projection/slick/SlickOffsetStoreSpec.scala b/slick/src/test/scala/org/apache/pekko/projection/slick/SlickOffsetStoreSpec.scala index 7ea91fd5..15b5e55d 100644 --- a/slick/src/test/scala/org/apache/pekko/projection/slick/SlickOffsetStoreSpec.scala +++ b/slick/src/test/scala/org/apache/pekko/projection/slick/SlickOffsetStoreSpec.scala @@ -107,7 +107,7 @@ abstract class SlickOffsetStoreSpec(specConfig: SlickSpecConfig) private val clock = new TestClock private val offsetStore = - new SlickOffsetStore(system, dbConfig.db, dbConfig.profile, SlickSettings(slickConfig), clock) + new SlickOffsetStore(system, dbConfig, SlickSettings(slickConfig), clock) override protected def beforeAll(): Unit = { // create offset table diff --git a/slick/src/test/scala/org/apache/pekko/projection/slick/SlickProjectionSpec.scala b/slick/src/test/scala/org/apache/pekko/projection/slick/SlickProjectionSpec.scala index cadde1e4..2c84bf7e 100644 --- a/slick/src/test/scala/org/apache/pekko/projection/slick/SlickProjectionSpec.scala +++ b/slick/src/test/scala/org/apache/pekko/projection/slick/SlickProjectionSpec.scala @@ -172,7 +172,7 @@ class SlickProjectionSpec val dbConfig: DatabaseConfig[H2Profile] = DatabaseConfig.forConfig(SlickSettings.configPath, config) - val offsetStore = new SlickOffsetStore(system, dbConfig.db, dbConfig.profile, SlickSettings(system)) + val offsetStore = new SlickOffsetStore(system, dbConfig, SlickSettings(system)) val projectionTestKit = ProjectionTestKit(system)