Commit 1da3efa
chore: akka to 2.10.0-M1, align with changes from upstream (#1772)
sebastian-alfers authored Sep 26, 2024
1 parent 5ee0a0a commit 1da3efa
Showing 34 changed files with 194 additions and 199 deletions.
1 change: 0 additions & 1 deletion .scala-steward.conf
@@ -10,7 +10,6 @@ updates.pin = [
   { groupId = "org.scalatest", artifactId = "scalatest", version = "3.1." }
   { groupId = "org.slf4j", artifactId = "log4j-over-slf4j", version = "1." }
   { groupId = "org.slf4j", artifactId = "jul-to-slf4j", version = "1." }
-  { groupId = "ch.qos.logback", artifactId = "logback-classic", version = "1.2." }
 ]
 
 commits.message = "bump: ${artifactName} ${nextVersion} (was ${currentVersion})"
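Dropping this pin lines up with the slf4j 2.x move in build.sbt below: logback-classic 1.2.x is the last line built against the slf4j 1.7 binder mechanism, so once the project is on slf4j 2.x, Scala Steward is free to propose logback 1.5.x.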
ReactiveKafkaConsumerBenchmarks.scala

@@ -8,7 +8,6 @@ package akka.kafka.benchmarks
 import java.util.concurrent.atomic.AtomicInteger
 
 import akka.actor.ActorSystem
-import akka.dispatch.ExecutionContexts
 import akka.kafka.ConsumerMessage.CommittableMessage
 import akka.kafka.benchmarks.InflightMetrics.{BrokerMetricRequest, ConsumerMetricRequest}
 import akka.kafka.scaladsl.Committer
@@ -163,7 +162,7 @@ object ReactiveKafkaConsumerBenchmarks extends LazyLogging with InflightMetrics
     val control = fixture.source
       .mapAsync(1) { m =>
         meter.mark()
-        m.committableOffset.commitInternal().map(_ => m)(ExecutionContexts.parasitic)
+        m.committableOffset.commitInternal().map(_ => m)(ExecutionContext.parasitic)
       }
       .toMat(Sink.foreach { msg =>
         if (msg.committableOffset.partitionOffset.offset >= fixture.msgCount - 1)
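This change swaps Akka's internal akka.dispatch.ExecutionContexts.parasitic for the standard library's ExecutionContext.parasitic (available since Scala 2.13), which runs the callback on the thread that completes the future instead of scheduling a dispatch. A minimal sketch of the pattern; the commitAndKeep helper is hypothetical, for illustration only:

    import scala.concurrent.{ExecutionContext, Future}

    // Hypothetical helper mirroring the benchmark's mapAsync stage: run the
    // commit, then map back to the original message. The transformation is
    // trivial, so the parasitic context avoids an extra thread hand-off.
    def commitAndKeep[M](msg: M)(commit: => Future[Unit]): Future[M] =
      commit.map(_ => msg)(ExecutionContext.parasitic)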
12 changes: 7 additions & 5 deletions build.sbt
@@ -16,8 +16,10 @@ val ScalaVersions = Seq(Scala213, Scala3)
 
 val Scala3Settings = Seq(crossScalaVersions := ScalaVersions)
 
-val AkkaBinaryVersionForDocs = "2.9"
-val akkaVersion = "2.9.3"
+val akkaVersion = "2.10.0-M1"
+val AkkaBinaryVersionForDocs = VersionNumber(akkaVersion).numbers match {
+  case Seq(major, minor, _*) => s"$major.$minor"
+}
 
 // Keep .scala-steward.conf pin in sync
 val kafkaVersion = "3.7.1"
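Instead of a hand-maintained "2.9" string, the docs binary version is now derived from akkaVersion, so the two cannot drift apart. A rough sketch of what the match evaluates to, assuming sbt's VersionNumber parser (auto-imported in build.sbt):

    import sbt.librarymanagement.VersionNumber

    // .numbers holds the dotted numeric prefix; a milestone qualifier such
    // as -M1 is parsed into .tags, so it cannot leak into the docs version.
    VersionNumber("2.10.0-M1").numbers match {
      case Seq(major, minor, _*) => s"$major.$minor" // "2.10"
    }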
@@ -26,7 +28,7 @@ val KafkaVersionForDocs = "37"
 // https://github.com/akka/akka/blob/main/project/Dependencies.scala#L44
 val scalatestVersion = "3.2.16"
 val testcontainersVersion = "1.20.1"
-val slf4jVersion = "1.7.36"
+val slf4jVersion = "2.0.16"
 // this depends on Kafka, and should be upgraded to such latest version
 // that depends on the same Kafka version, as is defined above
 // See https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer?repo=confluent-packages
@@ -290,7 +292,7 @@ lazy val tests = project
     name := "akka-stream-kafka-tests",
     libraryDependencies ++= Seq(
       "com.typesafe.akka" %% "akka-discovery" % akkaVersion,
-      "com.google.protobuf" % "protobuf-java" % "3.25.4", // use the same, or later, version as in scalapb
+      "com.google.protobuf" % "protobuf-java" % "3.25.5", // use the same, or later, version as in scalapb
       "io.confluent" % "kafka-avro-serializer" % confluentAvroSerializerVersion % Test excludeAll (confluentLibsExclusionRules: _*),
       // See https://github.com/sbt/sbt/issues/3618#issuecomment-448951808
       "javax.ws.rs" % "javax.ws.rs-api" % "2.1.1" artifacts Artifact("javax.ws.rs-api", "jar", "jar"),
@@ … @@
       "org.hamcrest" % "hamcrest" % "3.0" % Test,
       "net.aichler" % "jupiter-interface" % JupiterKeys.jupiterVersion.value % Test,
       "com.typesafe.akka" %% "akka-slf4j" % akkaVersion % Test,
-      "ch.qos.logback" % "logback-classic" % "1.2.13" % Test,
+      "ch.qos.logback" % "logback-classic" % "1.5.7" % Test,
       "org.slf4j" % "log4j-over-slf4j" % slf4jVersion % Test,
       // Schema registry uses Glassfish which uses java.util.logging
       "org.slf4j" % "jul-to-slf4j" % slf4jVersion % Test,
KafkaClusterSharding.scala

@@ -24,12 +24,11 @@ import org.apache.kafka.common.utils.Utils
 
 import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContextExecutor, Future}
+import scala.jdk.DurationConverters._
+import scala.jdk.FutureConverters._
 import scala.util.{Failure, Success}
-import akka.util.JavaDurationConverters._
 import org.slf4j.LoggerFactory
 
-import scala.compat.java8.FutureConverters._
-
 /**
  * API MAY CHANGE
  *
@@ -81,9 +80,9 @@ final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension
   def messageExtractor[M](topic: String,
                           timeout: java.time.Duration,
                           settings: ConsumerSettings[_, _]): CompletionStage[KafkaShardingMessageExtractor[M]] =
-    getPartitionCount(topic, timeout.asScala, settings)
+    getPartitionCount(topic, timeout.toScala, settings)
       .map(new KafkaShardingMessageExtractor[M](_))(system.dispatcher)
-      .toJava
+      .asJava
 
   /**
    * API MAY CHANGE
@@ -147,11 +146,11 @@ final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension
                                  entityIdExtractor: java.util.function.Function[M, String],
                                  settings: ConsumerSettings[_, _]
   ): CompletionStage[KafkaShardingNoEnvelopeExtractor[M]] =
-    getPartitionCount(topic, timeout.asScala, settings)
+    getPartitionCount(topic, timeout.toScala, settings)
       .map(partitions => new KafkaShardingNoEnvelopeExtractor[M](partitions, e => entityIdExtractor.apply(e)))(
         system.dispatcher
      )
-      .toJava
+      .asJava
 
   /**
    * API MAY CHANGE
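Scala 2.13's standard library ships its own Java interop conversions, which this commit adopts in place of both akka.util.JavaDurationConverters (asScala) and scala-java8-compat's FutureConverters (toJava). A small sketch of the two conversions used above; the value and method names here are illustrative, not from the source:

    import java.time.{Duration => JDuration}
    import java.util.concurrent.CompletionStage
    import scala.concurrent.duration.FiniteDuration
    import scala.concurrent.{ExecutionContext, Future}
    import scala.jdk.DurationConverters._ // adds java.time.Duration.toScala
    import scala.jdk.FutureConverters._   // adds Future.asJava

    // java.time.Duration -> FiniteDuration (was: timeout.asScala)
    val timeout: FiniteDuration = JDuration.ofSeconds(10).toScala

    // Future -> CompletionStage for a Java-facing API (was: .toJava)
    def partitionCount(implicit ec: ExecutionContext): Future[Int] = Future(32)
    def partitionCountJava(implicit ec: ExecutionContext): CompletionStage[Int] =
      partitionCount.asJava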
5 changes: 3 additions & 2 deletions core/src/main/scala/akka/kafka/CommitterSettings.scala
@@ -7,10 +7,11 @@ package akka.kafka
 import java.util.concurrent.TimeUnit
 
 import akka.annotation.ApiMayChange
-import akka.util.JavaDurationConverters._
+
 import com.typesafe.config.Config
 
 import scala.concurrent.duration._
+import scala.jdk.DurationConverters._
 
 @ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/882")
 sealed trait CommitDelivery
@@ -176,7 +177,7 @@ class CommitterSettings private (
     copy(maxInterval = maxInterval)
 
   def withMaxInterval(maxInterval: java.time.Duration): CommitterSettings =
-    copy(maxInterval = maxInterval.asScala)
+    copy(maxInterval = maxInterval.toScala)
 
   def withParallelism(parallelism: Int): CommitterSettings =
     copy(parallelism = parallelism)
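The Java-facing overload keeps its signature; only the internal conversion changes. A hedged usage sketch, assuming a classic ActorSystem and the default committer settings from configuration:

    import akka.actor.ActorSystem
    import akka.kafka.CommitterSettings

    val system = ActorSystem("example")
    // withMaxInterval still accepts java.time.Duration; internally it now
    // converts via scala.jdk.DurationConverters instead of the Akka helper.
    val settings =
      CommitterSettings(system).withMaxInterval(java.time.Duration.ofSeconds(5))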
ConnectionCheckerSettings.scala

@@ -5,10 +5,10 @@
 
 package akka.kafka
 
-import akka.util.JavaDurationConverters._
 import com.typesafe.config.Config
 
 import scala.concurrent.duration._
+import scala.jdk.DurationConverters._
 
 import java.time.{Duration => JDuration}
 
@@ -40,7 +40,7 @@ class ConnectionCheckerSettings private[kafka] (val enable: Boolean,
 
   /** Java API */
   def withCheckInterval(checkInterval: JDuration): ConnectionCheckerSettings =
-    copy(checkInterval = checkInterval.asScala)
+    copy(checkInterval = checkInterval.toScala)
 
   override def toString: String =
     s"akka.kafka.ConnectionCheckerSettings(" +
@@ -70,7 +70,7 @@ object ConnectionCheckerSettings {
     if (enable) {
       val retries = config.getInt("max-retries")
       val factor = config.getDouble("backoff-factor")
-      val checkInterval = config.getDuration("check-interval").asScala
+      val checkInterval = config.getDuration("check-interval").toScala
       apply(retries, checkInterval, factor)
     } else Disabled
 }
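The same swap applies where durations come from Typesafe Config, whose getDuration returns a java.time.Duration. A minimal sketch of the pattern in the factory method above, with an inline config string standing in for the real akka.kafka.connection-checker section:

    import com.typesafe.config.ConfigFactory
    import scala.concurrent.duration.FiniteDuration
    import scala.jdk.DurationConverters._

    val config = ConfigFactory.parseString("check-interval = 15s")
    // toScala (scala.jdk) replaces the asScala extension formerly
    // provided by akka.util.JavaDurationConverters.
    val checkInterval: FiniteDuration =
      config.getDuration("check-interval").toScala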
(remaining changed files not shown)
