Upgrade ZIO to 1.0.0-RC19 (#170)
iravid authored May 16, 2020
1 parent b89e05a commit ad51639
Showing 13 changed files with 107 additions and 104 deletions.
22 changes: 11 additions & 11 deletions .circleci/config.yml
@@ -2,12 +2,12 @@ version: 2.1
jobs:
lint:
docker:
- image: hseeberger/scala-sbt:8u222_1.3.4_2.13.1
- image: hseeberger/scala-sbt:8u252_1.3.10_2.13.2
steps:
- checkout
- restore_cache:
key: sbt-cache
- run: sbt ++2.13.1! check
- run: sbt ++2.13.2! check
- save_cache:
key: sbt-cache
paths:
@@ -16,12 +16,12 @@ jobs:
- "~/.m2"
test213_jdk8:
docker:
- image: hseeberger/scala-sbt:8u222_1.3.4_2.13.1
- image: hseeberger/scala-sbt:8u252_1.3.10_2.13.2
steps:
- checkout
- restore_cache:
key: sbt-cache
- run: sbt ++2.13.1! test packageDoc
- run: sbt ++2.13.2! test packageDoc
- save_cache:
key: sbt-cache
paths:
@@ -30,12 +30,12 @@ jobs:
- "~/.m2"
test213_jdk11:
docker:
- image: hseeberger/scala-sbt:11.0.4_1.3.4_2.13.1
- image: hseeberger/scala-sbt:11.0.7_1.3.10_2.13.2
steps:
- checkout
- restore_cache:
key: sbt-cache
- run: sbt ++2.13.1! test
- run: sbt ++2.13.2! test
- save_cache:
key: sbt-cache
paths:
@@ -44,12 +44,12 @@ jobs:
- "~/.m2"
test212_jdk8:
docker:
- image: hseeberger/scala-sbt:8u222_1.3.4_2.12.10
- image: hseeberger/scala-sbt:8u252_1.3.10_2.12.11
steps:
- checkout
- restore_cache:
key: sbt-cache
- run: sbt ++2.12.10! test packageDoc
- run: sbt ++2.12.11! test packageDoc
- save_cache:
key: sbt-cache
paths:
@@ -58,12 +58,12 @@ jobs:
- "~/.m2"
test212_jdk11:
docker:
- image: hseeberger/scala-sbt:11.0.4_1.3.4_2.12.10
- image: hseeberger/scala-sbt:11.0.7_1.3.10_2.12.11
steps:
- checkout
- restore_cache:
key: sbt-cache
- run: sbt ++2.12.10! test
- run: sbt ++2.12.11! test
- save_cache:
key: sbt-cache
paths:
@@ -128,7 +128,7 @@ jobs:

release:
docker:
- image: hseeberger/scala-sbt:8u222_1.3.4_2.13.1
- image: hseeberger/scala-sbt:8u252_1.3.10_2.13.2
steps:
- checkout
- run: git fetch --tags
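
A note on the sbt invocations above: the trailing `!` in `sbt ++2.13.2! test` forces sbt to switch to that exact Scala version even when it is not listed in `crossScalaVersions`; without it, `++` only accepts versions the build declares. A minimal build.sbt sketch of the cross-build setup these jobs assume (names mirror the build.sbt diff below):

```scala
// Minimal sketch: the cross-build axes the CI jobs pin with `++<version>!`.
lazy val scala212 = "2.12.11"
lazy val scala213 = "2.13.2"

lazy val root = (project in file("."))
  .settings(
    scalaVersion       := scala213,
    crossScalaVersions := Seq(scala212, scala213)
  )
```
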
8 changes: 4 additions & 4 deletions build.sbt
@@ -1,17 +1,17 @@
lazy val scala211 = "2.11.12"
lazy val scala212 = "2.12.10"
lazy val scala213 = "2.13.1"
lazy val scala212 = "2.12.11"
lazy val scala213 = "2.13.2"
lazy val mainScala = scala213
lazy val allScala = Seq(scala211, scala212, mainScala)

lazy val zioVersion = "1.0.0-RC18-2"
lazy val zioVersion = "1.0.0-RC19"
lazy val kafkaVersion = "2.4.1"

// Allows to silence scalac compilation warnings selectively by code block or file path
// This is only compile time dependency, therefore it does not affect the generated bytecode
// https://github.com/ghik/silencer
lazy val silencer = {
val Version = "1.4.4"
val Version = "1.7.0"
Seq(
compilerPlugin("com.github.ghik" % "silencer-plugin" % Version cross CrossVersion.full),
"com.github.ghik" % "silencer-lib" % Version % Provided cross CrossVersion.full
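
The silencer bump matters because the plugin is cross-published against the full Scala version (note the `cross CrossVersion.full` above), so moving to 2.13.2 needs a silencer release built for it. Purely as an illustration of what the plugin provides (this usage is not part of the diff):

```scala
import com.github.ghik.silencer.silent

// Suppress a matching warning for one definition instead of build-wide;
// the argument is a regex filter over the warning message. Compile-time
// only: the annotation leaves no trace in the generated bytecode.
@silent("deprecated")
def callsDeprecatedApi(): Unit = ()
```
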
14 changes: 7 additions & 7 deletions src/main/scala/zio/kafka/consumer/SubscribedConsumer.scala
@@ -4,7 +4,7 @@ import org.apache.kafka.common.TopicPartition
import zio.RIO
import zio.blocking.Blocking
import zio.clock.Clock
import zio.stream.{ ZStream, ZStreamChunk }
import zio.stream.ZStream
import zio.kafka.serde.Deserializer

class SubscribedConsumer(
@@ -14,15 +14,15 @@ class SubscribedConsumer(
def partitionedStream[R, K, V](keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V]): ZStream[
Clock with Blocking,
Throwable,
(TopicPartition, ZStreamChunk[R, Throwable, CommittableRecord[K, V]])
(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])
] =
ZStream.fromEffect(underlying).flatMap(_.partitionedStream(keyDeserializer, valueDeserializer))

def plainStream[R, K, V](
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V]
): ZStreamChunk[R with Clock with Blocking, Throwable, CommittableRecord[K, V]] =
ZStreamChunk(partitionedStream(keyDeserializer, valueDeserializer).flatMapPar(n = Int.MaxValue)(_._2.chunks))
): ZStream[R with Clock with Blocking, Throwable, CommittableRecord[K, V]] =
partitionedStream(keyDeserializer, valueDeserializer).flatMapPar(n = Int.MaxValue)(_._2)
}

class SubscribedConsumerFromEnvironment(
@@ -32,13 +32,13 @@ class SubscribedConsumerFromEnvironment(
def partitionedStream[R, K, V](keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V]): ZStream[
Clock with Blocking with Consumer,
Throwable,
(TopicPartition, ZStreamChunk[R, Throwable, CommittableRecord[K, V]])
(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])
] =
ZStream.fromEffect(underlying).flatMap(_.partitionedStream(keyDeserializer, valueDeserializer))

def plainStream[R, K, V](
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V]
): ZStreamChunk[R with Clock with Blocking with Consumer, Throwable, CommittableRecord[K, V]] =
ZStreamChunk(partitionedStream(keyDeserializer, valueDeserializer).flatMapPar(n = Int.MaxValue)(_._2.chunks))
): ZStream[R with Clock with Blocking with Consumer, Throwable, CommittableRecord[K, V]] =
partitionedStream(keyDeserializer, valueDeserializer).flatMapPar(n = Int.MaxValue)(_._2)
}
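
Context for the change above: ZIO 1.0.0-RC19 removed `ZStreamChunk`, as `ZStream` now carries `Chunk`s internally; the per-partition streams therefore become plain `ZStream`s and the `.chunks` round-trips disappear. A sketch of a call site after the upgrade (the topic name and printing are hypothetical, and the accessor style assumes the `Consumer` module as defined later in this diff):

```scala
import zio._
import zio.kafka.consumer._
import zio.kafka.serde.Serde

object PartitionedExample {
  // Each partition now arrives as a plain ZStream of committable records,
  // so ordinary stream operators apply without ZStreamChunk unwrapping.
  val perPartition =
    Consumer
      .subscribeAnd(Subscription.topics("my-topic"))
      .partitionedStream(Serde.string, Serde.string)
      .flatMapPar(Int.MaxValue) {
        case (tp, partition) =>
          partition.tap(r => UIO(println(s"$tp: ${r.record.value()}")))
      }
      .runDrain
}
```
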
17 changes: 8 additions & 9 deletions src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -24,7 +24,7 @@ private[consumer] final class Runloop(
pollTimeout: Duration,
requestQueue: RequestBuffer,
commitQueue: Queue[Command.Commit],
val partitions: Queue[Take[Throwable, (TopicPartition, ZStreamChunk[Any, Throwable, ByteArrayCommittableRecord])]],
val partitions: Queue[Exit[Option[Throwable], (TopicPartition, Stream[Throwable, ByteArrayCommittableRecord])]],
rebalancingRef: Ref[Boolean],
diagnostics: Diagnostics,
shutdownRef: Ref[Boolean],
@@ -34,7 +34,7 @@
private val isShutdown = shutdownRef.get

def newPartitionStream(tp: TopicPartition) = {
val stream = ZStreamChunk {
val stream =
ZStream {
ZManaged.succeed {
for {
@@ -45,15 +45,14 @@
} yield result
}
}
}

partitions.offer(Take.Value(tp -> stream)).unit
partitions.offer(Exit.succeed(tp -> stream)).unit
}

def gracefulShutdown: UIO[Unit] =
for {
shutdown <- shutdownRef.modify((_, true))
_ <- partitions.offer(Take.End).when(!shutdown)
_ <- partitions.offer(Exit.fail(None)).when(!shutdown)
} yield ()

val rebalanceListener = {
@@ -392,7 +391,7 @@
else handleOperational(state, cmd)
}
}
.onError(cause => partitions.offer(Take.Fail(cause)))
.onError(cause => partitions.offer(Exit.halt(cause.map(Some(_)))))
.unit
.toManaged_
.fork
@@ -424,12 +423,12 @@ private[consumer] object Runloop {
commitQueue <- Queue.unbounded[Command.Commit].toManaged(_.shutdown)
partitions <- Queue
.unbounded[
Take[Throwable, (TopicPartition, ZStreamChunk[Any, Throwable, ByteArrayCommittableRecord])]
Exit[Option[Throwable], (TopicPartition, Stream[Throwable, ByteArrayCommittableRecord])]
]
.map { queue =>
queue.mapM {
case Take.End => queue.shutdown.as(Take.End)
case x => ZIO.succeed(x)
case e @ Exit.Failure(cause) if cause.contains(Cause.fail(None)) => queue.shutdown.as(e)
case x => ZIO.succeed(x)
}
}
.toManaged(_.shutdown)
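
With `Take` gone in RC19, the run loop speaks the stream pull protocol directly: an element is `Exit.succeed`, graceful end-of-stream is `Exit.fail(None)`, and a real error is `Exit.halt(cause.map(Some(_)))`. That encoding is what lets the consumer side drain the queue with `ZStream.fromQueue(partitions).collectWhileSuccess` in the next file. A standalone sketch of the protocol, not the actual Runloop code:

```scala
import zio._

object PartitionSignals {
  // The signal vocabulary carried by the partitions queue after this change.
  type Signal[+A] = Exit[Option[Throwable], A]

  def element[A](a: A): Signal[A]         = Exit.succeed(a)    // next value
  def endOfStream[A]: Signal[A]           = Exit.fail(None)    // graceful end
  def failure[A](e: Throwable): Signal[A] = Exit.fail(Some(e)) // real error

  // Reading side: stop cleanly at end-of-stream, surface real errors.
  def describe[A](signal: Signal[A]): String = signal match {
    case Exit.Success(a)                                 => s"element: $a"
    case Exit.Failure(c) if c.contains(Cause.fail(None)) => "end of stream"
    case Exit.Failure(c)                                 => s"failed: $c"
  }
}
```
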
32 changes: 16 additions & 16 deletions src/main/scala/zio/kafka/consumer/package.scala
@@ -63,7 +63,7 @@ package object consumer {
): ZStream[
Clock with Blocking,
Throwable,
(TopicPartition, ZStreamChunk[R, Throwable, CommittableRecord[K, V]])
(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])
]

/**
@@ -77,7 +77,7 @@
def plainStream[R, K, V](
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V]
): ZStreamChunk[R with Clock with Blocking, Throwable, CommittableRecord[K, V]]
): ZStream[R with Clock with Blocking, Throwable, CommittableRecord[K, V]]

/**
* Stops consumption of data, drains buffered records, and ends the attached
@@ -173,16 +173,16 @@
): ZStream[
Clock with Blocking,
Throwable,
(TopicPartition, ZStreamChunk[R, Throwable, CommittableRecord[K, V]])
(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])
] =
ZStream
.fromQueue(runloop.partitions)
.unTake
.collectWhileSuccess
.map {
case (tp, partition) =>
val partitionStream =
if (settings.perPartitionChunkPrefetch <= 0) partition
else ZStreamChunk(partition.chunks.buffer(settings.perPartitionChunkPrefetch))
else partition.buffer(settings.perPartitionChunkPrefetch)

tp -> partitionStream.mapM(_.deserializeWith(keyDeserializer, valueDeserializer))
}
@@ -199,8 +199,8 @@
override def plainStream[R, K, V](
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V]
): ZStreamChunk[R with Clock with Blocking, Throwable, CommittableRecord[K, V]] =
ZStreamChunk(partitionedStream(keyDeserializer, valueDeserializer).flatMapPar(n = Int.MaxValue)(_._2.chunks))
): ZStream[R with Clock with Blocking, Throwable, CommittableRecord[K, V]] =
partitionedStream(keyDeserializer, valueDeserializer).flatMapPar(n = Int.MaxValue)(_._2)

override def subscribeAnd(subscription: Subscription): SubscribedConsumer =
new SubscribedConsumer(subscribe(subscription).as(this))
@@ -225,7 +225,7 @@
partitionStream.mapM {
case CommittableRecord(record, offset) =>
f(record.key(), record.value()).as(offset)
}.flattenChunks
}
}
}
.aggregateAsync(offsetBatches)
@@ -261,8 +261,8 @@
consumer.withConsumer(_.unsubscribe())
}

val offsetBatches: ZSink[Any, Nothing, Nothing, Offset, OffsetBatch] =
ZSink.foldLeft[Offset, OffsetBatch](OffsetBatch.empty)(_ merge _)
val offsetBatches: ZTransducer[Any, Nothing, Offset, OffsetBatch] =
ZTransducer.foldLeft[Offset, OffsetBatch](OffsetBatch.empty)(_ merge _)

def live: ZLayer[Clock with Blocking with Has[ConsumerSettings] with Has[Diagnostics], Throwable, Consumer] =
ZLayer.fromServicesManaged[ConsumerSettings, Diagnostics, Clock with Blocking, Throwable, Service] {
@@ -333,24 +333,24 @@
/**
* Accessor method for [[Service.partitionedStream]]
*/
def partitionedStream[R: Tagged, K: Tagged, V: Tagged](
def partitionedStream[R: Tag, K: Tag, V: Tag](
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V]
): ZStream[
Consumer with Clock with Blocking,
Throwable,
(TopicPartition, ZStreamChunk[R, Throwable, CommittableRecord[K, V]])
(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])
] =
ZStream.accessStream(_.get[Service].partitionedStream(keyDeserializer, valueDeserializer))

/**
* Accessor method for [[Service.plainStream]]
*/
def plainStream[R: Tagged, K: Tagged, V: Tagged](
def plainStream[R: Tag, K: Tag, V: Tag](
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V]
): ZStreamChunk[R with Consumer with Clock with Blocking, Throwable, CommittableRecord[K, V]] =
ZStreamChunk(ZStream.accessStream(_.get[Service].plainStream(keyDeserializer, valueDeserializer).chunks))
): ZStream[R with Consumer with Clock with Blocking, Throwable, CommittableRecord[K, V]] =
ZStream.accessStream(_.get[Service].plainStream(keyDeserializer, valueDeserializer))

/**
* Accessor method for [[Service.stopConsumption]]
@@ -400,7 +400,7 @@
* @tparam V Type of values (an implicit `Deserializer` should be in scope)
* @return Effect that completes with a unit value only when interrupted. May fail when the [[Consumer]] fails.
*/
def consumeWith[R, R1: Tagged, K: Tagged, V: Tagged](
def consumeWith[R, R1: Tag, K: Tag, V: Tag](
settings: ConsumerSettings,
subscription: Subscription,
keyDeserializer: Deserializer[R1, K],
…
(8 more changed files not shown)
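
Two RC19 renames round out this file: aggregation moves from `ZSink` to the new `ZTransducer` shape that `ZStream#aggregateAsync` expects, and the implicit evidence changes from `Tagged` to `Tag`. A sketch of batched offset commits built on `offsetBatches` (the topic name and the `Consumer.offsetBatches` access path are assumptions for illustration):

```scala
import zio.kafka.consumer._
import zio.kafka.serde.Serde

object BatchedCommitExample {
  // Fold per-record offsets into OffsetBatch values with the transducer,
  // then commit once per batch rather than once per record.
  val commitInBatches =
    Consumer
      .subscribeAnd(Subscription.topics("my-topic"))
      .plainStream(Serde.string, Serde.string)
      .map(_.offset)
      .aggregateAsync(Consumer.offsetBatches)
      .mapM(_.commit)
      .runDrain
}
```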
