From 41b58d8000bd4284e7778d4eaefcb998489b3691 Mon Sep 17 00:00:00 2001
From: Itamar Ravid
Date: Sat, 16 May 2020 14:26:57 +0300
Subject: [PATCH] Update README.md

---
 README.md | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)

diff --git a/README.md b/README.md
index b3041b402..d33bc0fed 100644
--- a/README.md
+++ b/README.md
@@ -22,7 +22,7 @@ client. It integrates effortlessly with ZIO and ZIO Streams.
 Add the following dependencies to your `build.sbt` file:
 ```
 libraryDependencies ++= Seq(
-  "dev.zio" %% "zio-streams" % "1.0.0-RC18-2",
+  "dev.zio" %% "zio-streams" % "1.0.0-RC19",
   "dev.zio" %% "zio-kafka"   % "<version>"
 )
 ```
@@ -97,7 +97,6 @@ import zio.kafka.consumer._
 
 Consumer.subscribeAnd(Subscription.topics("topic150"))
   .plainStream(Serde.string, Serde.string)
-  .flattenChunks
   .tap(cr => putStrLn(s"key: ${cr.record.key}, value: ${cr.record.value}"))
   .map(_.offset)
   .aggregateAsync(Consumer.offsetBatches)
@@ -115,7 +114,7 @@ import zio.kafka.consumer._
 Consumer.subscribeAnd(Subscription.topics("topic150"))
   .partitionedStream(Serde.string, Serde.string)
   .tap(tpAndStr => putStrLn(s"topic: ${tpAndStr._1.topic}, partition: ${tpAndStr._1.partition}"))
-  .flatMap(_._2.flattenChunks)
+  .flatMap(_._2)
   .tap(cr => putStrLn(s"key: ${cr.record.key}, value: ${cr.record.value}"))
   .map(_.offset)
   .aggregateAsync(Consumer.offsetBatches)
@@ -152,12 +151,11 @@ val consumeProduceStream = Consumer
     val producerRecord: ProducerRecord[Int, String] = new ProducerRecord("my-output-topic", key, newValue)
     (producerRecord, record.offset)
   }
-  .chunks
-  .mapM { chunk =>
+  .mapChunksM { chunk =>
     val records     = chunk.map(_._1)
     val offsetBatch = OffsetBatch(chunk.map(_._2).toSeq)
 
-    Producer.produceChunk[Any, Int, String](records) *> offsetBatch.commit
+    Producer.produceChunk[Any, Int, String](records) *> offsetBatch.commit.as(Chunk(()))
   }
   .runDrain
   .provideSomeLayer(consumerAndProducer)
@@ -228,7 +226,6 @@ stream
         ZIO.succeed(offset)
       }
   }
-  .flattenChunks
   .aggregateAsync(Consumer.offsetBatches)
   .mapM(_.commit)
   .runDrain
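For reference, below is the plain-stream consumer pipeline from the patched README assembled into a single sketch. It assumes zio-streams 1.0.0-RC19, where `ZStream` is chunked internally, which is why the `.flattenChunks` calls are removed in this patch. The imports and the final `provideSomeLayer` wiring are assumptions added for illustration and are not part of the patch itself.

```scala
import zio._
import zio.console._
import zio.kafka.consumer._
import zio.kafka.serde.Serde

// Sketch of the post-RC19 consumer pipeline: `plainStream` now emits
// records directly (no `.flattenChunks` step), offsets are batched
// with `Consumer.offsetBatches` and committed per batch.
val consume =
  Consumer.subscribeAnd(Subscription.topics("topic150"))
    .plainStream(Serde.string, Serde.string)
    .tap(cr => putStrLn(s"key: ${cr.record.key}, value: ${cr.record.value}"))
    .map(_.offset)
    .aggregateAsync(Consumer.offsetBatches)
    .mapM(_.commit)
    .runDrain
// To actually run this, a Consumer layer still has to be supplied,
// e.g. `consume.provideSomeLayer(consumerLayer)` where `consumerLayer`
// is a hypothetical name for a layer built from ConsumerSettings.
```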