Skip to content

Commit

Permalink
Update zio-streams, zio-test, zio-test-sbt to 1.0.0 (#207)
Browse files Browse the repository at this point in the history
* Update zio-streams, zio-test, zio-test-sbt to 1.0.0

* Fixes

Co-authored-by: Itamar Ravid <iravid@iravid.com>
  • Loading branch information
scala-steward and iravid authored Aug 7, 2020
1 parent 0b5911d commit a51e438
Show file tree
Hide file tree
Showing 8 changed files with 14 additions and 12 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ lazy val scala213 = "2.13.2"
lazy val mainScala = scala213
lazy val allScala = Seq(scala211, scala212, mainScala)

lazy val zioVersion = "1.0.0-RC21-2"
lazy val zioVersion = "1.0.0"
lazy val kafkaVersion = "2.4.1"

// Allows to silence scalac compilation warnings selectively by code block or file path
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/zio/kafka/admin/AdminClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -203,5 +203,5 @@ object AdminClient {
def make(settings: AdminClientSettings) =
ZManaged.make(
ZIO(JAdminClient.create(settings.driverSettings.asJava)).map(ac => AdminClient(ac))
)(client => ZIO.effectTotal(client.adminClient.close(settings.closeTimeout.asJava)))
)(client => ZIO.effectTotal(client.adminClient.close(settings.closeTimeout)))
}
7 changes: 4 additions & 3 deletions src/main/scala/zio/kafka/consumer/Offset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package zio.kafka.consumer
import org.apache.kafka.clients.consumer.RetriableCommitFailedException
import org.apache.kafka.common.TopicPartition
import zio.{ Schedule, Task, ZIO }
import zio.clock.Clock

sealed trait Offset {
def topicPartition: TopicPartition
Expand All @@ -14,17 +15,17 @@ sealed trait Offset {
* Attempts to commit and retries according to the given policy when the commit fails
* with a RetriableCommitFailedException
*/
def commitOrRetry[R](policy: Schedule[R, Throwable, Any]): ZIO[R, Throwable, Unit] =
def commitOrRetry[R](policy: Schedule[R, Throwable, Any]): ZIO[R with Clock, Throwable, Unit] =
Offset.commitOrRetry(commit, policy)
}

object Offset {
private[consumer] def commitOrRetry[R, B](
commit: Task[Unit],
policy: Schedule[R, Throwable, B]
): ZIO[R, Throwable, Unit] =
): ZIO[R with Clock, Throwable, Unit] =
commit.retry(
Schedule.doWhile[Throwable] {
Schedule.recurWhile[Throwable] {
case _: RetriableCommitFailedException => true
case _ => false
} && policy
Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/zio/kafka/consumer/OffsetBatch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package zio.kafka.consumer

import org.apache.kafka.common.TopicPartition
import zio.{ Schedule, Task, ZIO }
import zio.clock.Clock

sealed trait OffsetBatch {
def offsets: Map[TopicPartition, Long]
Expand All @@ -13,7 +14,7 @@ sealed trait OffsetBatch {
* Attempts to commit and retries according to the given policy when the commit fails
* with a RetriableCommitFailedException
*/
def commitOrRetry[R](policy: Schedule[R, Throwable, Any]): ZIO[R, Throwable, Unit] =
def commitOrRetry[R](policy: Schedule[R, Throwable, Any]): ZIO[R with Clock, Throwable, Unit] =
Offset.commitOrRetry(commit, policy)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ private[consumer] object ConsumerAccess {
new ByteArrayDeserializer()
)
}
}.toManaged(c => blocking(access.withPermit(UIO(c.close(settings.closeTimeout.asJava)))))
}.toManaged(c => blocking(access.withPermit(UIO(c.close(settings.closeTimeout)))))
} yield new ConsumerAccess(consumer, access)
}
2 changes: 1 addition & 1 deletion src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ private[consumer] final class Runloop(
offsetRetrieval match {
case OffsetRetrieval.Manual(getOffsets) =>
getOffsets(tps)
.flatMap(offsets => ZIO.foreach(offsets) { case (tp, offset) => ZIO(c.seek(tp, offset)) })
.flatMap(offsets => ZIO.foreach(offsets.toList) { case (tp, offset) => ZIO(c.seek(tp, offset)) })
.when(tps.nonEmpty)

case OffsetRetrieval.Auto(_) =>
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/zio/kafka/producer/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ package object producer {
value <- valueSerializer.serialize(r.topic, r.headers, r.value())
} yield new ProducerRecord(r.topic, r.partition(), r.timestamp(), key, value, r.headers)

private[producer] def close: UIO[Unit] = UIO(p.close(producerSettings.closeTimeout.asJava))
private[producer] def close: UIO[Unit] = UIO(p.close(producerSettings.closeTimeout))
}

def live[R: Tag, K: Tag, V: Tag]: ZLayer[Has[Serializer[R, K]] with Has[Serializer[R, V]] with Has[
Expand Down
6 changes: 3 additions & 3 deletions src/test/scala/zio/kafka/ConsumerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ object ConsumerSpec extends DefaultRunnableSpec {
}

// Consume messages
messagesReceived <- ZIO.foreach(0 until nrPartitions)(i => Ref.make[Int](0).map(i -> _)).map(_.toMap)
messagesReceived <- ZIO.foreach((0 until nrPartitions).toList)(i => Ref.make[Int](0).map(i -> _)).map(_.toMap)
subscription = Subscription.topics(topic)
fib <- Consumer
.subscribeAnd(subscription)
Expand Down Expand Up @@ -232,7 +232,7 @@ object ConsumerSpec extends DefaultRunnableSpec {
topic <- randomTopic
group <- randomGroup
keepProducing <- Ref.make(true)
_ <- (produceOne(topic, "key", "value") *> keepProducing.get).doWhile(b => b).fork
_ <- (produceOne(topic, "key", "value") *> keepProducing.get).repeatWhile(b => b).fork
_ <- Consumer
.subscribeAnd(Subscription.topics(topic))
.plainStream(Serde.string, Serde.string)
Expand Down Expand Up @@ -280,7 +280,7 @@ object ConsumerSpec extends DefaultRunnableSpec {
}

// Consume messages
messagesReceived <- ZIO.foreach(0 until nrPartitions)(i => Ref.make[Int](0).map(i -> _)).map(_.toMap)
messagesReceived <- ZIO.foreach((0 until nrPartitions).toList)(i => Ref.make[Int](0).map(i -> _)).map(_.toMap)
subscription = Subscription.topics(topic)
offsets <- (Consumer
.subscribeAnd(subscription)
Expand Down

0 comments on commit a51e438

Please sign in to comment.