From a51e4380365c108a990e70b89ed96d2a23886452 Mon Sep 17 00:00:00 2001 From: Scala Steward <43047562+scala-steward@users.noreply.github.com> Date: Fri, 7 Aug 2020 21:23:59 +0200 Subject: [PATCH] Update zio-streams, zio-test, zio-test-sbt to 1.0.0 (#207) * Update zio-streams, zio-test, zio-test-sbt to 1.0.0 * Fixes Co-authored-by: Itamar Ravid --- build.sbt | 2 +- src/main/scala/zio/kafka/admin/AdminClient.scala | 2 +- src/main/scala/zio/kafka/consumer/Offset.scala | 7 ++++--- src/main/scala/zio/kafka/consumer/OffsetBatch.scala | 3 ++- .../scala/zio/kafka/consumer/internal/ConsumerAccess.scala | 2 +- src/main/scala/zio/kafka/consumer/internal/Runloop.scala | 2 +- src/main/scala/zio/kafka/producer/package.scala | 2 +- src/test/scala/zio/kafka/ConsumerSpec.scala | 6 +++--- 8 files changed, 14 insertions(+), 12 deletions(-) diff --git a/build.sbt b/build.sbt index a5b90d308..0190ea106 100644 --- a/build.sbt +++ b/build.sbt @@ -4,7 +4,7 @@ lazy val scala213 = "2.13.2" lazy val mainScala = scala213 lazy val allScala = Seq(scala211, scala212, mainScala) -lazy val zioVersion = "1.0.0-RC21-2" +lazy val zioVersion = "1.0.0" lazy val kafkaVersion = "2.4.1" // Allows to silence scalac compilation warnings selectively by code block or file path diff --git a/src/main/scala/zio/kafka/admin/AdminClient.scala b/src/main/scala/zio/kafka/admin/AdminClient.scala index f18a00d20..3bdf289dc 100644 --- a/src/main/scala/zio/kafka/admin/AdminClient.scala +++ b/src/main/scala/zio/kafka/admin/AdminClient.scala @@ -203,5 +203,5 @@ object AdminClient { def make(settings: AdminClientSettings) = ZManaged.make( ZIO(JAdminClient.create(settings.driverSettings.asJava)).map(ac => AdminClient(ac)) - )(client => ZIO.effectTotal(client.adminClient.close(settings.closeTimeout.asJava))) + )(client => ZIO.effectTotal(client.adminClient.close(settings.closeTimeout))) } diff --git a/src/main/scala/zio/kafka/consumer/Offset.scala b/src/main/scala/zio/kafka/consumer/Offset.scala index d4a34047e..c7ccfc6ae 100644 --- a/src/main/scala/zio/kafka/consumer/Offset.scala +++ b/src/main/scala/zio/kafka/consumer/Offset.scala @@ -3,6 +3,7 @@ package zio.kafka.consumer import org.apache.kafka.clients.consumer.RetriableCommitFailedException import org.apache.kafka.common.TopicPartition import zio.{ Schedule, Task, ZIO } +import zio.clock.Clock sealed trait Offset { def topicPartition: TopicPartition @@ -14,7 +15,7 @@ sealed trait Offset { * Attempts to commit and retries according to the given policy when the commit fails * with a RetriableCommitFailedException */ - def commitOrRetry[R](policy: Schedule[R, Throwable, Any]): ZIO[R, Throwable, Unit] = + def commitOrRetry[R](policy: Schedule[R, Throwable, Any]): ZIO[R with Clock, Throwable, Unit] = Offset.commitOrRetry(commit, policy) } @@ -22,9 +23,9 @@ object Offset { private[consumer] def commitOrRetry[R, B]( commit: Task[Unit], policy: Schedule[R, Throwable, B] - ): ZIO[R, Throwable, Unit] = + ): ZIO[R with Clock, Throwable, Unit] = commit.retry( - Schedule.doWhile[Throwable] { + Schedule.recurWhile[Throwable] { case _: RetriableCommitFailedException => true case _ => false } && policy diff --git a/src/main/scala/zio/kafka/consumer/OffsetBatch.scala b/src/main/scala/zio/kafka/consumer/OffsetBatch.scala index e3286780a..83a7867a1 100644 --- a/src/main/scala/zio/kafka/consumer/OffsetBatch.scala +++ b/src/main/scala/zio/kafka/consumer/OffsetBatch.scala @@ -2,6 +2,7 @@ package zio.kafka.consumer import org.apache.kafka.common.TopicPartition import zio.{ Schedule, Task, ZIO } +import zio.clock.Clock sealed trait OffsetBatch { def offsets: Map[TopicPartition, Long] @@ -13,7 +14,7 @@ sealed trait OffsetBatch { * Attempts to commit and retries according to the given policy when the commit fails * with a RetriableCommitFailedException */ - def commitOrRetry[R](policy: Schedule[R, Throwable, Any]): ZIO[R, Throwable, Unit] = + def commitOrRetry[R](policy: Schedule[R, Throwable, Any]): ZIO[R with Clock, Throwable, Unit] = Offset.commitOrRetry(commit, policy) } diff --git a/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala b/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala index f9bbb86c8..0866c69a1 100644 --- a/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala +++ b/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala @@ -39,6 +39,6 @@ private[consumer] object ConsumerAccess { new ByteArrayDeserializer() ) } - }.toManaged(c => blocking(access.withPermit(UIO(c.close(settings.closeTimeout.asJava))))) + }.toManaged(c => blocking(access.withPermit(UIO(c.close(settings.closeTimeout))))) } yield new ConsumerAccess(consumer, access) } diff --git a/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 4e750e6c0..0452f19da 100644 --- a/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -223,7 +223,7 @@ private[consumer] final class Runloop( offsetRetrieval match { case OffsetRetrieval.Manual(getOffsets) => getOffsets(tps) - .flatMap(offsets => ZIO.foreach(offsets) { case (tp, offset) => ZIO(c.seek(tp, offset)) }) + .flatMap(offsets => ZIO.foreach(offsets.toList) { case (tp, offset) => ZIO(c.seek(tp, offset)) }) .when(tps.nonEmpty) case OffsetRetrieval.Auto(_) => diff --git a/src/main/scala/zio/kafka/producer/package.scala b/src/main/scala/zio/kafka/producer/package.scala index 72f1a9d49..d3a8af355 100644 --- a/src/main/scala/zio/kafka/producer/package.scala +++ b/src/main/scala/zio/kafka/producer/package.scala @@ -183,7 +183,7 @@ package object producer { value <- valueSerializer.serialize(r.topic, r.headers, r.value()) } yield new ProducerRecord(r.topic, r.partition(), r.timestamp(), key, value, r.headers) - private[producer] def close: UIO[Unit] = UIO(p.close(producerSettings.closeTimeout.asJava)) + private[producer] def close: UIO[Unit] = UIO(p.close(producerSettings.closeTimeout)) } def live[R: Tag, K: Tag, V: Tag]: ZLayer[Has[Serializer[R, K]] with Has[Serializer[R, V]] with Has[ diff --git a/src/test/scala/zio/kafka/ConsumerSpec.scala b/src/test/scala/zio/kafka/ConsumerSpec.scala index c196a9f2f..1f312c625 100644 --- a/src/test/scala/zio/kafka/ConsumerSpec.scala +++ b/src/test/scala/zio/kafka/ConsumerSpec.scala @@ -191,7 +191,7 @@ object ConsumerSpec extends DefaultRunnableSpec { } // Consume messages - messagesReceived <- ZIO.foreach(0 until nrPartitions)(i => Ref.make[Int](0).map(i -> _)).map(_.toMap) + messagesReceived <- ZIO.foreach((0 until nrPartitions).toList)(i => Ref.make[Int](0).map(i -> _)).map(_.toMap) subscription = Subscription.topics(topic) fib <- Consumer .subscribeAnd(subscription) @@ -232,7 +232,7 @@ object ConsumerSpec extends DefaultRunnableSpec { topic <- randomTopic group <- randomGroup keepProducing <- Ref.make(true) - _ <- (produceOne(topic, "key", "value") *> keepProducing.get).doWhile(b => b).fork + _ <- (produceOne(topic, "key", "value") *> keepProducing.get).repeatWhile(b => b).fork _ <- Consumer .subscribeAnd(Subscription.topics(topic)) .plainStream(Serde.string, Serde.string) @@ -280,7 +280,7 @@ object ConsumerSpec extends DefaultRunnableSpec { } // Consume messages - messagesReceived <- ZIO.foreach(0 until nrPartitions)(i => Ref.make[Int](0).map(i -> _)).map(_.toMap) + messagesReceived <- ZIO.foreach((0 until nrPartitions).toList)(i => Ref.make[Int](0).map(i -> _)).map(_.toMap) subscription = Subscription.topics(topic) offsets <- (Consumer .subscribeAnd(subscription)