Skip to content

Commit

Permalink
Update README.md
Browse files Browse the repository at this point in the history
  • Loading branch information
iravid committed May 16, 2020
1 parent ad51639 commit 41b58d8
Showing 1 changed file with 4 additions and 7 deletions.
11 changes: 4 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ client. It integrates effortlessly with ZIO and ZIO Streams.
Add the following dependencies to your `build.sbt` file:
```
libraryDependencies ++= Seq(
"dev.zio" %% "zio-streams" % "1.0.0-RC18-2",
"dev.zio" %% "zio-streams" % "1.0.0-RC19",
"dev.zio" %% "zio-kafka" % "<version>"
)
```
Expand Down Expand Up @@ -97,7 +97,6 @@ import zio.kafka.consumer._

Consumer.subscribeAnd(Subscription.topics("topic150"))
.plainStream(Serde.string, Serde.string)
.flattenChunks
.tap(cr => putStrLn(s"key: ${cr.record.key}, value: ${cr.record.value}"))
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
Expand All @@ -115,7 +114,7 @@ import zio.kafka.consumer._
Consumer.subscribeAnd(Subscription.topics("topic150"))
.partitionedStream(Serde.string, Serde.string)
.tap(tpAndStr => putStrLn(s"topic: ${tpAndStr._1.topic}, partition: ${tpAndStr._1.partition}"))
.flatMap(_._2.flattenChunks)
.flatMap(_._2)
.tap(cr => putStrLn(s"key: ${cr.record.key}, value: ${cr.record.value}"))
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
Expand Down Expand Up @@ -152,12 +151,11 @@ val consumeProduceStream = Consumer
val producerRecord: ProducerRecord[Int, String] = new ProducerRecord("my-output-topic", key, newValue)
(producerRecord, record.offset)
}
.chunks
.mapM { chunk =>
.mapChunksM { chunk =>
val records = chunk.map(_._1)
val offsetBatch = OffsetBatch(chunk.map(_._2).toSeq)

Producer.produceChunk[Any, Int, String](records) *> offsetBatch.commit
Producer.produceChunk[Any, Int, String](records) *> offsetBatch.commit.as(Chunk(()))
}
.runDrain
.provideSomeLayer(consumerAndProducer)
Expand Down Expand Up @@ -228,7 +226,6 @@ stream
ZIO.succeed(offset)
}
}
.flattenChunks
.aggregateAsync(Consumer.offsetBatches)
.mapM(_.commit)
.runDrain
Expand Down

0 comments on commit 41b58d8

Please sign in to comment.