Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support scala3 with slick #167

Merged
merged 7 commits into from
Jun 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ lazy val core =
Project(id = "core", base = file("core"))
.configs(IntegrationTest)
.enablePlugins(ReproducibleBuildsPlugin)
.settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(headerSettings(IntegrationTest))
.settings(Defaults.itSettings)
.settings(Dependencies.core)
Expand All @@ -31,7 +30,6 @@ lazy val core =
lazy val coreTest =
Project(id = "core-test", base = file("core-test"))
.configs(IntegrationTest)
.settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(headerSettings(IntegrationTest))
.disablePlugins(MimaPlugin)
.settings(Defaults.itSettings)
Expand All @@ -46,7 +44,6 @@ lazy val testkit =
Project(id = "testkit", base = file("testkit"))
.configs(IntegrationTest)
.enablePlugins(ReproducibleBuildsPlugin)
.settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(headerSettings(IntegrationTest))
.settings(Defaults.itSettings)
.settings(Dependencies.testKit)
Expand All @@ -59,7 +56,6 @@ lazy val jdbc =
Project(id = "jdbc", base = file("jdbc"))
.configs(IntegrationTest.extend(Test))
.enablePlugins(ReproducibleBuildsPlugin)
.settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(headerSettings(IntegrationTest))
.settings(Defaults.itSettings)
.settings(Dependencies.jdbc)
Expand All @@ -74,7 +70,6 @@ lazy val slick =
Project(id = "slick", base = file("slick"))
.configs(IntegrationTest.extend(Test))
.enablePlugins(ReproducibleBuildsPlugin)
.settings(crossScalaVersions := Dependencies.Scala2Versions)
.settings(headerSettings(IntegrationTest))
.settings(Defaults.itSettings)
.settings(Dependencies.slick)
Expand All @@ -92,7 +87,6 @@ lazy val cassandra =
Project(id = "cassandra", base = file("cassandra"))
.configs(IntegrationTest)
.enablePlugins(ReproducibleBuildsPlugin)
.settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(headerSettings(IntegrationTest))
.settings(Defaults.itSettings)
.settings(Dependencies.cassandra)
Expand All @@ -109,7 +103,6 @@ lazy val cassandra =
lazy val eventsourced =
Project(id = "eventsourced", base = file("eventsourced"))
.enablePlugins(ReproducibleBuildsPlugin)
.settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(Dependencies.eventsourced)
.settings(AutomaticModuleName.settings("pekko.projection.eventsourced"))
.settings(name := "pekko-projection-eventsourced")
Expand All @@ -120,7 +113,6 @@ lazy val eventsourced =
lazy val kafka =
Project(id = "kafka", base = file("kafka"))
.enablePlugins(ReproducibleBuildsPlugin)
.settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(Dependencies.kafka)
.settings(AutomaticModuleName.settings("pekko.projection.kafka"))
.settings(name := "pekko-projection-kafka")
Expand All @@ -146,7 +138,6 @@ lazy val `durable-state` =
Project(id = "durable-state", base = file("durable-state"))
.configs(IntegrationTest)
.enablePlugins(ReproducibleBuildsPlugin)
.settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(Dependencies.state)
.settings(AutomaticModuleName.settings("pekko.projection.durable-state"))
.settings(name := "pekko-projection-durable-state")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory
* INTERNAL API
*/
@InternalApi
private[projection] class JdbcOffsetStore[S <: JdbcSession](
class JdbcOffsetStore[S <: JdbcSession](
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is an example test class in the jdocs package that Scala 3 won't allow to access this package private class

system: ActorSystem[_],
settings: JdbcSettings,
jdbcSessionFactory: () => S,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import com.typesafe.config.ConfigValueType
* INTERNAL API
*/
@InternalApi
private[projection] case class JdbcSettings(config: Config, executionContext: ExecutionContext) {
case class JdbcSettings(config: Config, executionContext: ExecutionContext) {

val schema: Option[String] =
Option(config.getString("offset-store.schema")).filterNot(_.trim.isEmpty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class KafkaToSlickIntegrationSpec extends KafkaSpecBase(ConfigFactory.load().wit
PatienceConfig(timeout = Span(30, Seconds), interval = Span(500, Milliseconds))

val dbConfig: DatabaseConfig[H2Profile] = DatabaseConfig.forConfig(SlickSettings.configPath, config)
val offsetStore = new SlickOffsetStore(system.toTyped, dbConfig.db, dbConfig.profile, SlickSettings(system.toTyped))
val offsetStore = new SlickOffsetStore(system.toTyped, dbConfig, SlickSettings(system.toTyped))
val repository = new EventTypeCountRepository(dbConfig)

override protected def beforeAll(): Unit = {
Expand Down
2 changes: 1 addition & 1 deletion project/Common.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object Common extends AutoPlugin {
override lazy val projectSettings = Seq(
projectInfoVersion := (if (isSnapshot.value) "snapshot" else version.value),
crossVersion := CrossVersion.binary,
crossScalaVersions := Dependencies.Scala2Versions,
crossScalaVersions := Dependencies.ScalaVersions,
scalaVersion := Dependencies.Scala213,
javacOptions ++= List("-Xlint:unchecked", "-Xlint:deprecation"),
Compile / doc / scalacOptions := scalacOptions.value ++ Seq(
Expand Down
7 changes: 3 additions & 4 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@ object Dependencies {
val Scala213 = "2.13.14"
val Scala212 = "2.12.19"
val Scala3 = "3.3.3"
val Scala2Versions = Seq(Scala213, Scala212)
val Scala2And3Versions = Scala2Versions.+:(Scala3)
val ScalaVersions = Seq(Scala213, Scala212, Scala3)

val PekkoVersionInDocs = "1.0"
val PekkoVersionInDocs = "1.1"
val ConnectorsVersionInDocs = "1.0"
val ConnectorsKafkaVersionInDocs = "1.0"

object Versions {
val pekko = PekkoCoreDependency.version
val pekkoPersistenceJdbc = "1.0.0"
val pekkoPersistenceJdbc = "1.1.0-M1"
val pekkoPersistenceCassandra = "1.0.0"
val connectors = PekkoConnectorsDependency.version
val connectorsKafka = PekkoConnectorsKafkaDependency.version
Expand Down
2 changes: 1 addition & 1 deletion project/PekkoCoreDependency.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
object PekkoCoreDependency extends PekkoDependency {
override val checkProject: String = "pekko-cluster-sharding-typed"
override val module: Option[String] = None
override val currentVersion: String = "1.0.2"
override val currentVersion: String = "1.1.0-M1"
}
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ object SlickProjection {

private def createOffsetStore[P <: JdbcProfile: ClassTag](databaseConfig: DatabaseConfig[P])(
implicit system: ActorSystem[_]) =
new SlickOffsetStore(system, databaseConfig.db, databaseConfig.profile, SlickSettings(system))
new SlickOffsetStore(system, databaseConfig, SlickSettings(system))
}

@ApiMayChange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,26 @@ import pekko.projection.jdbc.internal.MySQLDialect
import pekko.projection.jdbc.internal.OracleDialect
import pekko.projection.jdbc.internal.PostgresDialect
import pekko.util.Helpers.toRootLowerCase
import slick.basic.DatabaseConfig
import slick.jdbc.JdbcProfile

/**
* INTERNAL API
*/
@InternalApi private[projection] class SlickOffsetStore[P <: JdbcProfile](
system: ActorSystem[_],
val db: P#Backend#Database,
val profile: P,
databaseConfig: DatabaseConfig[P],
slickSettings: SlickSettings,
clock: Clock) {

def this(system: ActorSystem[_], databaseConfig: DatabaseConfig[P], slickSettings: SlickSettings) =
this(system, databaseConfig, slickSettings, Clock.systemUTC())

private[projection] val profile: P = databaseConfig.profile

import profile.api._
import OffsetSerialization.MultipleOffsets
import OffsetSerialization.SingleOffset
import profile.api._

def this(system: ActorSystem[_], db: P#Backend#Database, profile: P, slickSettings: SlickSettings) =
this(system, db, profile, slickSettings, Clock.systemUTC())

val (dialect, useLowerCase): (Dialect, Boolean) = {

Expand Down Expand Up @@ -88,7 +91,7 @@ import slick.jdbc.JdbcProfile
SingleOffset(ProjectionId(projectionId.name, row.projectionKey), row.manifest, row.offsetStr, row.mergeable))
}

val results = db.run(action)
val results = databaseConfig.db.run(action)

results.map {
case Nil => None
Expand Down Expand Up @@ -177,7 +180,8 @@ import slick.jdbc.JdbcProfile
stmt.execute(sql)
})
}
db.run(DBIO.seq(prepareSchemaDBIO, prepareManagementSchemaDBIO)).map(_ => Done)(ExecutionContexts.parasitic)
databaseConfig.db.run(DBIO.seq(prepareSchemaDBIO, prepareManagementSchemaDBIO))
.map(_ => Done)(ExecutionContexts.parasitic)
}

def dropIfExists(): Future[Done] = {
Expand All @@ -193,7 +197,8 @@ import slick.jdbc.JdbcProfile
stmt.execute(dialect.dropManagementTableStatement)
}
}
db.run(DBIO.seq(prepareSchemaDBIO, prepareManagementSchemaDBIO)).map(_ => Done)(ExecutionContexts.parasitic)
databaseConfig.db.run(DBIO.seq(prepareSchemaDBIO, prepareManagementSchemaDBIO))
.map(_ => Done)(ExecutionContexts.parasitic)
}

def readManagementState(projectionId: ProjectionId)(
Expand All @@ -206,14 +211,14 @@ import slick.jdbc.JdbcProfile
maybeRow.map(row => ManagementState(row.paused))
}

db.run(action)
databaseConfig.db.run(action)
}

def savePaused(projectionId: ProjectionId, paused: Boolean): Future[Done] = {
val millisSinceEpoch = clock.instant().toEpochMilli
val action =
managementTable.insertOrUpdate(ManagementStateRow(projectionId.name, projectionId.key, paused, millisSinceEpoch))

db.run(action).map(_ => Done)(ExecutionContexts.parasitic)
databaseConfig.db.run(action).map(_ => Done)(ExecutionContexts.parasitic)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ private[projection] class SlickProjectionImpl[Offset, Envelope, P <: JdbcProfile
settings) {

implicit val executionContext: ExecutionContext = system.executionContext
override val logger: LoggingAdapter = Logging(system.classicSystem, this.getClass)
override val logger: LoggingAdapter = Logging(system.classicSystem, classOf[SlickInternalProjectionState])

override def readPaused(): Future[Boolean] =
offsetStore.readManagementState(projectionId).map(_.exists(_.paused))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ abstract class SlickOffsetStoreSpec(specConfig: SlickSpecConfig)
private val clock = new TestClock

private val offsetStore =
new SlickOffsetStore(system, dbConfig.db, dbConfig.profile, SlickSettings(slickConfig), clock)
new SlickOffsetStore(system, dbConfig, SlickSettings(slickConfig), clock)

override protected def beforeAll(): Unit = {
// create offset table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ class SlickProjectionSpec

val dbConfig: DatabaseConfig[H2Profile] = DatabaseConfig.forConfig(SlickSettings.configPath, config)

val offsetStore = new SlickOffsetStore(system, dbConfig.db, dbConfig.profile, SlickSettings(system))
val offsetStore = new SlickOffsetStore(system, dbConfig, SlickSettings(system))

val projectionTestKit = ProjectionTestKit(system)

Expand Down
Loading