Skip to content

Commit

Permalink
Upgrade to ZIO RC20 (#184)
Browse files Browse the repository at this point in the history
  • Loading branch information
iravid authored Jun 2, 2020
1 parent 570f9eb commit b431e7f
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 10 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ lazy val scala213 = "2.13.2"
lazy val mainScala = scala213
lazy val allScala = Seq(scala211, scala212, mainScala)

lazy val zioVersion = "1.0.0-RC19-2"
lazy val zioVersion = "1.0.0-RC20"
lazy val kafkaVersion = "2.4.1"

// Allows to silence scalac compilation warnings selectively by code block or file path
Expand Down
8 changes: 4 additions & 4 deletions src/test/scala/zio/kafka/Benchmarks.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ object PopulateTopic extends App {
.take(length)
.chunkN(500)

def run(args: List[String]): ZIO[zio.ZEnv, Nothing, Int] =
def run(args: List[String]): ZIO[zio.ZEnv, Nothing, ExitCode] =
dataStream(872000).map {
case (k, v) => new ProducerRecord("inputs-topic", null, null, k, v)
}.mapChunksM(Producer.produceChunkAsync[Any, String, String](_).map(Chunk(_)))
Expand All @@ -35,7 +35,7 @@ object PopulateTopic extends App {
)
)
)
.fold(_ => 1, _ => 0)
.exitCode
}

object Plain {
Expand Down Expand Up @@ -79,7 +79,7 @@ object ZIOKafka extends App {
import zio.kafka.consumer._
import zio.duration._

def run(args: List[String]): ZIO[zio.ZEnv, Nothing, Int] = {
def run(args: List[String]): ZIO[zio.ZEnv, Nothing, ExitCode] = {
val expectedCount = 1000000
val settings = ConsumerSettings(List("localhost:9092"))
.withGroupId(s"zio-kafka-${scala.util.Random.nextInt}")
Expand Down Expand Up @@ -113,7 +113,7 @@ object ZIOKafka extends App {
}
})
.provideCustomLayer(ZLayer.fromManaged(Consumer.make(settings)))
.fold(_ => 1, _ => 0)
.exitCode

}
}
4 changes: 2 additions & 2 deletions src/test/scala/zio/kafka/ConsumerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ object ConsumerSpec extends DefaultRunnableSpec {
} yield assert(kvOut)(equalTo(kvs))
},
testM("Consuming+provideCustomLayer") {
val kvs = (1 to 10000).toList.map(i => (s"key$i", s"msg$i"))
val kvs = (1 to 100).toList.map(i => (s"key$i", s"msg$i"))
for {
_ <- produceMany("topic170", kvs)

records <- Consumer
.subscribeAnd(Subscription.Topics(Set("topic170")))
.plainStream(Serde.string, Serde.string)
.take(10000)
.take(100)
.runCollect
.provideSomeLayer[Kafka with Blocking with Clock](consumer("group170", "client170"))
kvOut = records.map(r => (r.record.key, r.record.value))
Expand Down
4 changes: 3 additions & 1 deletion src/test/scala/zio/kafka/KafkaFutureSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ object KafkaFutureSpec extends DefaultRunnableSpec {
testM("interrupted") {
withKafkaFuture.use { f =>
for {
fiber <- AdminClient.fromKafkaFuture(ZIO.effectTotal(f)).fork
latch <- Promise.make[Nothing, Unit]
fiber <- AdminClient.fromKafkaFuture(latch.succeed(()) *> ZIO.effectTotal(f)).fork
_ <- latch.await
result <- fiber.interrupt
} yield {
assert(result.interrupted)(equalTo(true) ?? "fiber was interrupted") &&
Expand Down
4 changes: 2 additions & 2 deletions src/test/scala/zio/kafka/ProducerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ object ProducerSpec extends DefaultRunnableSpec {
settings <- consumerSettings("testGroup", "testClient")
record1 <- withConsumer(Topics(Set(topic1)), settings).use { consumer =>
for {
messages <- consumer.take.flatMap(ZIO.done(_)).mapError(_.getOrElse(new NoSuchElementException))
messages <- consumer.take.flatMap(_.done).mapError(_.getOrElse(new NoSuchElementException))
record = messages
.filter(rec => rec.record.key == key1 && rec.record.value == value1)
.toSeq
} yield record
}
record2 <- withConsumer(Topics(Set(topic2)), settings).use { consumer =>
for {
messages <- consumer.take.flatMap(ZIO.done(_)).mapError(_.getOrElse(new NoSuchElementException))
messages <- consumer.take.flatMap(_.done).mapError(_.getOrElse(new NoSuchElementException))
record = messages.filter(rec => rec.record.key == key2 && rec.record.value == value2)
} yield record
}
Expand Down

0 comments on commit b431e7f

Please sign in to comment.