From 9508e0cef8658919ec45bc57683cbb61946580bf Mon Sep 17 00:00:00 2001 From: Luis Rodero-Merino Date: Fri, 21 Jul 2023 21:36:33 +0200 Subject: [PATCH] Changed tutorial so consumers are not overwhelmed by producers --- docs/tutorial.md | 294 ++++++++++++++++++++++++++++------------------- 1 file changed, 173 insertions(+), 121 deletions(-) diff --git a/docs/tutorial.md b/docs/tutorial.md index 89d3ca77815..01409809efb 100644 --- a/docs/tutorial.md +++ b/docs/tutorial.md @@ -42,11 +42,13 @@ running the code snippets in this tutorial, it is recommended to use the same ```scala name := "cats-effect-tutorial" -version := "3.5.1" +val ceVersion = "3.5.1" -scalaVersion := "2.13.6" +version := ceVersion -libraryDependencies += "org.typelevel" %% "cats-effect" % "3.5.1" withSources() withJavadoc() +scalaVersion := "2.13.11" + +libraryDependencies += "org.typelevel" %% "cats-effect" % ceVersion withSources() withJavadoc() scalacOptions ++= Seq( "-feature", @@ -56,11 +58,11 @@ scalacOptions ++= Seq( ) ``` -Also make sure that you use a recent version of `sbt`, at least `1.4.2`. You can -set the `sbt` version in `project/build.properties` file: +Also make sure that you use a recent version of `sbt`, _e.g._ `1.9.3`. You can set +the `sbt` version in `project/build.properties` file: ```scala -sbt.version=1.4.2 +sbt.version=1.9.3 ``` Almost all code snippets in this tutorial can be pasted and compiled right in @@ -162,7 +164,7 @@ import cats.effect.{IO, Resource} import java.io.{File, FileInputStream} def inputStream(f: File): Resource[IO, FileInputStream] = - Resource.fromAutoCloseable(IO(new FileInputStream(f))) + Resource.fromAutoCloseable(IO.blocking(new FileInputStream(f))) ``` That code is way simpler, but with that code we would not have control over what @@ -181,11 +183,11 @@ import java.io._ def inputOutputStreams(in: File, out: File): Resource[IO, (InputStream, OutputStream)] = ??? // transfer will do the real work -def transfer(origin: InputStream, destination: OutputStream): IO[Long] = ??? +def transfer(origin: InputStream, destination: OutputStream, buffer: Array[Byte], acc: Long): IO[Long] = ??? def copy(origin: File, destination: File): IO[Long] = inputOutputStreams(origin, destination).use { case (in, out) => - transfer(in, out) + transfer(in, out, new Array[Byte](1024 * 10), 0) } ``` @@ -220,17 +222,17 @@ import java.io._ // function inputOutputStreams not needed // transfer will do the real work -def transfer(origin: InputStream, destination: OutputStream): IO[Long] = ??? +def transfer(origin: InputStream, destination: OutputStream, buffer: Array[Byte], acc: Long): IO[Long] = ??? def copy(origin: File, destination: File): IO[Long] = { - val inIO: IO[InputStream] = IO(new FileInputStream(origin)) - val outIO:IO[OutputStream] = IO(new FileOutputStream(destination)) + val inIO: IO[InputStream] = IO.blocking(new FileInputStream(origin)) + val outIO:IO[OutputStream] = IO.blocking(new FileOutputStream(destination)) (inIO, outIO) // Stage 1: Getting resources .tupled // From (IO[InputStream], IO[OutputStream]) to IO[(InputStream, OutputStream)] .bracket{ case (in, out) => - transfer(in, out) // Stage 2: Using resources (for copying data, in this case) + transfer(in, out, new Array[Byte](1024 * 10), 0L) // Stage 2: Using resources (for copying data, in this case) } { case (in, out) => // Stage 3: Freeing resources (IO(in.close()), IO(out.close())) @@ -261,28 +263,22 @@ Finally we have our streams ready to go! We have to focus now on coding `transfer`. That function will have to define a loop that at each iteration reads data from the input stream into a buffer, and then writes the buffer contents into the output stream. At the same time, the loop will keep a counter -of the bytes transferred. To reuse the same buffer we should define it outside -the main loop, and leave the actual transmission of data to another function -`transmit` that uses that loop. Something like: +of the bytes transferred. ```scala mdoc:compile-only import cats.effect.IO -import cats.syntax.all._ import java.io._ -def transmit(origin: InputStream, destination: OutputStream, buffer: Array[Byte], acc: Long): IO[Long] = +def transfer(origin: InputStream, destination: OutputStream, buffer: Array[Byte], acc: Long): IO[Long] = for { - amount <- IO.blocking(origin.read(buffer, 0, buffer.size)) - count <- if(amount > -1) IO.blocking(destination.write(buffer, 0, amount)) >> transmit(origin, destination, buffer, acc + amount) - else IO.pure(acc) // End of read stream reached (by java.io.InputStream contract), nothing to write - } yield count // Returns the actual amount of bytes transmitted - -def transfer(origin: InputStream, destination: OutputStream): IO[Long] = - transmit(origin, destination, new Array[Byte](1024 * 10), 0L) + amount <- IO.blocking(origin.read(buffer, 0, buffer.length)) + count <- if (amount > -1) IO.blocking(destination.write(buffer, 0, amount)) >> transfer(origin, destination, buffer, acc + amount) + else IO.pure(acc) // End of read stream reached (by java.io.InputStream contract), nothing to write + } yield count // Returns the actual amount of bytes transferred ``` -Take a look at `transmit`, observe that both input and output actions are -created by invoking `IO.blocking` which return the actions encapsulated in a +Take a look at `transfer`, observe that both input and output actions are +created by invoking `IO.blocking` which return the actions encapsulated in (suspended in) `IO`. We can also just embed the actions by calling `IO(action)`, but when dealing with input/output actions it is advised that you instead use `IO.blocking(action)`. This way we help cats-effect to better plan how to assign @@ -295,7 +291,7 @@ the call to `read()` does not return a negative value that would signal that the end of the stream has been reached. `>>` is a Cats operator to sequence two operations where the output of the first is not needed by the second (_i.e._ it is equivalent to `first.flatMap(_ => second)`). In the code above that means -that after each write operation we recursively call `transmit` again, but as +that after each write operation we recursively call `transfer` again, but as `IO` is stack safe we are not concerned about stack overflow issues. At each iteration we increase the counter `acc` with the amount of bytes read at that iteration. @@ -309,7 +305,7 @@ cancelation in the next section. ### Dealing with cancelation Cancelation is a powerful but non-trivial cats-effect feature. In cats-effect, -some `IO` instances can be canceled ( _e.g._ by other `IO` instances running +some `IO` instances can be canceled (_e.g._ by other `IO` instances running concurrently) meaning that their evaluation will be aborted. If the programmer is careful, an alternative `IO` task will be run under cancelation, for example to deal with potential cleaning up activities. @@ -318,7 +314,7 @@ Thankfully, `Resource` makes dealing with cancelation an easy task. If the `IO` inside a `Resource.use` is canceled, the release section of that resource is run. In our example this means the input and output streams will be properly closed. Also, cats-effect does not cancel code inside `IO.blocking` instances. -In the case of our `transmit` function this means the execution would be +In the case of our `transfer` function this means the execution would be interrupted only between two calls to `IO.blocking`. If we want the execution of an IO instance to be interrupted when canceled, without waiting for it to finish, we must instantiate it using `IO.interruptible`. @@ -351,8 +347,7 @@ object Main extends IOApp { override def run(args: List[String]): IO[ExitCode] = for { - _ <- if(args.length < 2) IO.raiseError(new IllegalArgumentException("Need origin and destination files")) - else IO.unit + _ <- IO.raiseWhen(args.length < 2)(new IllegalArgumentException("Need origin and destination files")) orig = new File(args(0)) dest = new File(args(1)) count <- copy(orig, dest) @@ -363,8 +358,9 @@ object Main extends IOApp { Heed how `run` verifies the `args` list passed. If there are fewer than two arguments, an error is raised. As `IO` implements `MonadError` we can at any -moment call to `IO.raiseError` to interrupt a sequence of `IO` operations. The log -message is printed by means of handy `IO.println` method. +moment call to `IO.raiseWhen` or `IO.raiseError` to interrupt a sequence of `IO` +operations. The log message is printed by means of the handy `IO.println` +method. #### Copy program code You can check the [final version of our copy program @@ -373,7 +369,7 @@ here](https://github.com/lrodero/cats-effect-tutorial/blob/series/3.x/src/main/s The program can be run from `sbt` just by issuing this call: ```scala -> runMain catseffecttutorial.CopyFile origin.txt destination.txt +> runMain catseffecttutorial.copyfile.CopyFile origin.txt destination.txt ``` It can be argued that using `IO{java.nio.file.Files.copy(...)}` would get an @@ -407,23 +403,22 @@ import cats.effect.Sync import cats.syntax.all._ import java.io._ -def transmit[F[_]: Sync](origin: InputStream, destination: OutputStream, buffer: Array[Byte], acc: Long): F[Long] = +def transfer[F[_]: Sync](origin: InputStream, destination: OutputStream, buffer: Array[Byte], acc: Long): F[Long] = for { amount <- Sync[F].blocking(origin.read(buffer, 0, buffer.length)) - count <- if(amount > -1) Sync[F].blocking(destination.write(buffer, 0, amount)) >> transmit(origin, destination, buffer, acc + amount) + count <- if(amount > -1) Sync[F].blocking(destination.write(buffer, 0, amount)) >> transfer(origin, destination, buffer, acc + amount) else Sync[F].pure(acc) // End of read stream reached (by java.io.InputStream contract), nothing to write - } yield count // Returns the actual amount of bytes transmitted + } yield count // Returns the actual amount of bytes transferred ``` We leave as an exercise to code the polymorphic versions of `inputStream`, -`outputStream`, `inputOutputStreams`, `transfer` and `copy` functions. +`outputStream`, `inputOutputStreams` and `copy` functions. ```scala mdoc:compile-only import cats.effect._ import java.io._ -def transmit[F[_]: Sync](origin: InputStream, destination: OutputStream, buffer: Array[Byte], acc: Long): F[Long] = ??? -def transfer[F[_]: Sync](origin: InputStream, destination: OutputStream): F[Long] = ??? +def transfer[F[_]: Sync](origin: InputStream, destination: OutputStream, buffer: Array[Byte], acc: Long): F[Long] = ??? def inputStream[F[_]: Sync](f: File): Resource[F, FileInputStream] = ??? def outputStream[F[_]: Sync](f: File): Resource[F, FileOutputStream] = ??? def inputOutputStreams[F[_]: Sync](in: File, out: File): Resource[F, (InputStream, OutputStream)] = ??? @@ -452,18 +447,16 @@ your IO-kungfu: 1. Modify the `IOApp` so it shows an error and abort the execution if the origin and destination files are the same, the origin file cannot be open for - reading or the destination file cannot be opened for writing. Also, if the + reading, or the destination file cannot be opened for writing. Also, if the destination file already exists, the program should ask for confirmation before overwriting that file. -2. Modify `transmit` so the buffer size is not hardcoded but passed as - parameter. -3. Test safe cancelation, checking that the streams are indeed being properly +2. Test safe cancelation, checking that the streams are indeed being properly closed. You can do that just by interrupting the program execution pressing `Ctrl-c`. To make sure you have the time to interrupt the program, introduce - a delay of a few seconds in the `transmit` function (see `IO.sleep`). And to + a delay of a few seconds in the `transfer` function (see `IO.sleep`). And to ensure that the release functionality in the `Resource`s is run you can add some log message there (see `IO.println`). -4. Create a new program able to copy folders. If the origin folder has +3. Create a new program able to copy folders. If the origin folder has subfolders, then their contents must be recursively copied too. Of course the copying must be safely cancelable at any moment. @@ -509,23 +502,26 @@ info as a hint to optimize `IO` scheduling. Another difference with threads is that fibers are very cheap entities. We can spawn millions of them at ease without impacting the performance. -A worthy note is that you do not have to explicitly shut down fibers. If you spawn -a fiber and it finishes actively running its `IO` it will get cleaned up by the -garbage collector unless there is some other active memory reference to it. So basically -you can treat a fiber as any other regular object, except that when the fiber is _running_ -(present tense), the cats-effect runtime itself keeps the fiber alive. - -This has some interesting implications as well. Like if you create an `IO.async` node and -register the callback with something, and you're in a Fiber which has no strong object -references anywhere else (i.e. you did some sort of fire-and-forget thing), then the callback -itself is the only strong reference to the fiber. Meaning if the registration fails or the -system you registered with throws it away, the fiber will just gracefully disappear. - -Cats-effect implements some concurrency primitives to coordinate concurrent -fibers: [Deferred](std/deferred.md), [Ref](std/ref.md), `Semaphore`... - -Way more detailed info about concurrency in cats-effect can be found in [this -other tutorial 'Concurrency in Scala with +A worthy note is that you do not have to explicitly shut down fibers. If you +spawn a fiber and it finishes actively running its `IO` it will get cleaned up +by the garbage collector unless there is some other active memory reference to +it. So basically you can treat a fiber as any other regular object, except that +when the fiber is _running_ (present tense), the cats-effect runtime itself +keeps the fiber alive. + +This has some interesting implications as well. Like if you create an `IO.async` +node and register the callback with something, and you're in a fiber which has +no strong object references anywhere else (i.e. you did some sort of +fire-and-forget thing), then the callback itself is the only strong reference to +the fiber. Meaning if the registration fails or the system you registered with +throws it away, the fiber will just gracefully disappear. + +And a final hint: as with threads, often you will need to coordinate the work of +concurrent fibers. Writing concurrent code is a difficult exercise, but +cats-effect implements some concurrency primitives such as +[Deferred](std/deferred.md), [Ref](std/ref.md), [Semaphore](std/semaphore.md)... +that will help you in that task. Way more detailed info about concurrency in +cats-effect can be found in [this other tutorial 'Concurrency in Scala with Cats-Effect'](https://github.com/slouc/concurrency-in-scala-with-ce). Ok, now we have briefly discussed fibers we can start working on our @@ -539,33 +535,34 @@ integers (`1`, `2`, `3`...), consumer will just read that sequence. Our shared queue will be an instance of an immutable `Queue[Int]`. Accesses to the queue can (and will!) be concurrent, thus we need some way to -protect the queue so only one fiber at a time is handling it. The best way to -ensure an ordered access to some shared data is [Ref](std/ref.md). A -`Ref` instance wraps some given data and implements methods to manipulate that -data in a safe manner. When some fiber is runnning one of those methods, any -other call to any method of the `Ref` instance will be blocked. +protect the queue so only one fiber at a time is handling it. A good way to +ensure an ordered access to some shared data is [Ref](std/ref.md). A `Ref` +instance wraps some given data and implements methods to manipulate that data in +a safe manner. The `Ref` wrapping our queue will be `Ref[F, Queue[Int]]` (for some `F[_]`). Now, our `producer` method will be: - ```scala mdoc:compile-only import cats.effect._ import cats.effect.std.Console import cats.syntax.all._ -import collection.immutable.Queue +import scala.collection.immutable.Queue def producer[F[_]: Sync: Console](queueR: Ref[F, Queue[Int]], counter: Int): F[Unit] = for { - _ <- if(counter % 10000 == 0) Console[F].println(s"Produced $counter items") else Sync[F].unit + _ <- Sync[F].whenA(counter % 10000 == 0)(Console[F].println(s"Produced $counter items")) _ <- queueR.getAndUpdate(_.enqueue(counter + 1)) _ <- producer(queueR, counter + 1) } yield () ``` -First line just prints some log message every `10000` items, so we know if it is -'alive'. It uses type class `Console[_]`, which brings the capacity to print -and read strings (`IO.println` just uses `Console[IO].println` underneath). +First line just prints some log message every `10000` items, so we know if our +producer is still 'alive'. We can as well do `if(cond) then Console... else +Sync[F].unit` but this approach is more idiomatic. To print logs the code uses +type class `Console[_]`, which brings the capacity to print and read strings +(the `IO.println` call we used before just invokes `Console[IO].println` under +the hood). Then our code calls `queueR.getAndUpdate` to add data into the queue. Note that `.getAndUpdate` provides the current queue, then we use `.enqueue` to @@ -580,14 +577,14 @@ queue but it must be aware that the queue can be empty: import cats.effect._ import cats.effect.std.Console import cats.syntax.all._ -import collection.immutable.Queue +import scala.collection.immutable.Queue def consumer[F[_]: Sync: Console](queueR: Ref[F, Queue[Int]]): F[Unit] = for { iO <- queueR.modify{ queue => queue.dequeueOption.fold((queue, Option.empty[Int])){case (i,queue) => (queue, Option(i))} } - _ <- if(iO.exists(_ % 10000 == 0)) Console[F].println(s"Consumed ${iO.get} items") else Sync[F].unit + _ <- Sync[F].whenA(iO.exists(_ % 10000 == 0))(Console[F].println(s"Consumed ${iO.get} items")) _ <- consumer(queueR) } yield () ``` @@ -605,7 +602,7 @@ We can now create a program that instantiates our `queueR` and runs both import cats.effect._ import cats.effect.std.Console import cats.syntax.all._ -import collection.immutable.Queue +import scala.collection.immutable.Queue object InefficientProducerConsumer extends IOApp { @@ -625,14 +622,11 @@ object InefficientProducerConsumer extends IOApp { } ``` -The full implementation of this naive producer consumer is available -[here](https://github.com/lrodero/cats-effect-tutorial/blob/series/3.x/src/main/scala/catseffecttutorial/producerconsumer/InefficientProducerConsumer.scala). - Our `run` function instantiates the shared queue wrapped in a `Ref` and boots the producer and consumer in parallel. To do to it uses `parMapN`, that creates and runs the fibers that will run the `IO`s passed as parameter. Then it takes the output of each fiber and applies a given function to them. In our case -both producer and consumer shall run forever until user presses CTRL-C which +both producer and consumer shall run forever until the user presses CTRL-C which will trigger a cancelation. Alternatively we could have used `start` method to explicitly create new @@ -641,7 +635,7 @@ wait for them to finish, something like: ```scala mdoc:compile-only import cats.effect._ -import collection.immutable.Queue +import scala.collection.immutable.Queue object InefficientProducerConsumer extends IOApp { @@ -668,7 +662,7 @@ happened. Cats Effect provides additional `joinWith` or `joinWithNever` methods to make sure at least that the error is raised with the usual `MonadError` semantics -(e.g., short-circuiting). Now that we are raising the error, we also need to +(_i.e._, short-circuiting). Now that we are raising the error, we also need to cancel the other running fibers. We can easily get ourselves trapped in a tangled mess of fibers to keep an eye on. On top of that the error raised by a fiber is not promoted until the call to `joinWith` or `.joinWithNever` is @@ -685,13 +679,65 @@ have some specific and unusual requirements you should prefer to use higher level commands such as `parMapN` or `parSequence` to work with fibers_. Ok, we stick to our implementation based on `.parMapN`. Are we done? Does it -Work? Well, it works... but it is far from ideal. If we run it we will find that -the producer runs faster than the consumer so the queue is constantly growing. -And, even if that was not the case, we must realize that the consumer will be -continually running regardless if there are elements in the queue, which is far -from ideal. We will try to improve it in the next section by using -[Deferred](std/deferred.md). Also we will use several consumers and -producers to balance production and consumption rate. +Work? Well, it works... but it is far from ideal. + +#### Issue 1: the producer outpaces the consumer +Now, if you run the program you will notice that almost no consumer logs are +shown, if any. This is a signal that the producer is running way faster than +the consumer. And why is that? Well, this is because how `Ref.modify` works. It +gets the current value, then it computes the update, and finally it tries to set +the new value if the current one has not been changed (by some other fiber), +otherwise it starts from the beginning. Unfortunately the producer is way faster +running its `queueR.getAndUpdate` call than the consumer is running its +`queueR.modify` call. So the consumer gets 'stuck' trying once and again to +update the `queueR` content. + +Can we alleviate this? Sure! There are a few options you can implement: +1. Making the producer artifically slower by introducing a call to `Async[F].sleep` + (_e.g._ for 1 microsecond). Truth is, in real world scenarios a producer will + not be as fast as in our example so this tweak is not that 'strange'. Note that + to be able to use `sleep` now `F` requires an implicit `Async[F]` instance. The + new producer will look like this: + ```scala mdoc:compile-only + import cats.effect._ + import cats.effect.std.Console + import cats.syntax.all._ + import scala.collection.immutable.Queue + import scala.concurrent.duration.DurationInt + + def producer[F[_]: Async: Console](queueR: Ref[F, Queue[Int]], counter: Int): F[Unit] = + for { + _ <- Async[F].whenA(counter % 10000 == 0)(Console[F].println(s"Produced $counter items")) + _ <- Async[F].sleep(1.microsecond) // To prevent overwhelming consumers + _ <- queueR.getAndUpdate(_.enqueue(counter + 1)) + _ <- producer(queueR, counter + 1) + } yield () + ``` +2. Replace `Ref` with `AtomicCell` to keep the `Queue` instance. `AtomicCell`, + as `Ref`, is a concurrent data structure to keep a reference to some data. + But unlike `Ref` it ensures that only one fiber can operate on that reference + at any given time. Thus the consumer won't have to try once and again to modify + its content. Otoh `AtomicCell` is slower than `Ref`. This is because `Ref` is + nonblocking while `AtomicCell` will block calling fibers to ensure only one + operates on its content. +3. Make the queue bound by size so producers are forced to wait for consumers to + extract data when the queue is full. We will do this later on in Section + [Producer consumer with bounded +queue](#producer-consumer-with-bounded-queue). + +By the way, you may be tempted to speed up the `queueR.modify` call in the +consumer by using a mutable `Queue` instance. Do not! `Ref` _must_ be used only +with immutable data. + +#### Issue 2: consumer runs even if there are no elements in the queue +The consumer will be continually running regardless if there are elements in the +queue, which is far from ideal. If we have several consumers competing for the +data the problem gets even worse. We will address this problem in the next +section by using [Deferred](std/deferred.md). + +The full implementation of the naive producer consumer we have just created in +this section is available +[here](https://github.com/lrodero/cats-effect-tutorial/blob/series/3.x/src/main/scala/catseffecttutorial/producerconsumer/InefficientProducerConsumer.scala). ### A more solid implementation of the producer/consumer problem In our producer/consumer code we already protect access to the queue (our shared @@ -701,13 +747,18 @@ somehow if queue is empty until some element can be returned. This will be done by creating and keeping instances of `Deferred`. A `Deferred[F, A]` instance can hold one single element of some type `A`. `Deferred` instances are created empty, and can be filled only once. If some fiber tries to read the element from -an empty `Deferred` then it will be semantically blocked until some other fiber -fills (completes) it. +an empty `Deferred` then it will wait until some other fiber fills (completes) +it. But recall that this waiting does not involve blocking any physical thread, +that's the beauty of fibers! + +Also, we will step up our code so we can handle several producers and consumers +in parallel. -Thus, alongside the queue of produced but not yet consumed elements, we have to -keep track of the `Deferred` instances created when the queue was empty that are -waiting for elements to be available. These instances will be kept in a new -queue `takers`. We will keep both queues in a new type `State`: +Ok, so, alongside the queue of produced but not yet consumed elements, we have +to keep track of the `Deferred` instances (created because consumers found an +emnpty queue) that are waiting for elements to be available. These instances +will be kept in a new queue `takers`. We will keep both queues in a new type +`State`: ```scala mdoc:compile-only import cats.effect.Deferred @@ -719,7 +770,7 @@ Both producer and consumer will access the same shared state instance, which will be carried and safely modified by an instance of `Ref`. Consumer shall work as follows: 1. If `queue` is not empty, it will extract and return its head. The new state - will keep the tail of the queue, not change on `takers` will be needed. + will keep the tail of the queue, no change on `takers` will be needed. 2. If `queue` is empty it will use a new `Deferred` instance as a new `taker`, add it to the `takers` queue, and 'block' the caller by invoking `taker.get` @@ -749,7 +800,7 @@ def consumer[F[_]: Async: Console](id: Int, stateR: Ref[F, State[F, Int]]): F[Un for { i <- take - _ <- if(i % 10000 == 0) Console[F].println(s"Consumer $id has reached $i items") else Async[F].unit + _ <- Async[F].whenA(i % 10000 == 0)(Console[F].println(s"Consumer $id has reached $i items")) _ <- consumer(id, stateR) } yield () } @@ -761,21 +812,22 @@ we will have now several producers and consumers running in parallel). The Note how it will block on `taker.get` when the queue is empty. The producer, for its part, will: -1. If there are waiting `takers`, it will take the first in the queue and offer - it the newly produced element (`taker.complete`). +1. If there are waiting `takers`, it will take the first one in the takers queue + and offer it the newly produced element (`taker.complete`). 2. If no `takers` are present, it will just enqueue the produced element. Thus the producer will look like: ```scala mdoc:compile-only -import cats.effect.{Deferred, Ref, Sync} +import cats.effect.{Async, Deferred, Ref} import cats.effect.std.Console import cats.syntax.all._ import scala.collection.immutable.Queue +import scala.concurrent.duration.DurationInt case class State[F[_], A](queue: Queue[A], takers: Queue[Deferred[F,A]]) -def producer[F[_]: Sync: Console](id: Int, counterR: Ref[F, Int], stateR: Ref[F, State[F,Int]]): F[Unit] = { +def producer[F[_]: Async: Console](id: Int, counterR: Ref[F, Int], stateR: Ref[F, State[F,Int]]): F[Unit] = { def offer(i: Int): F[Unit] = stateR.modify { @@ -783,18 +835,22 @@ def producer[F[_]: Sync: Console](id: Int, counterR: Ref[F, Int], stateR: Ref[F, val (taker, rest) = takers.dequeue State(queue, rest) -> taker.complete(i).void case State(queue, takers) => - State(queue.enqueue(i), takers) -> Sync[F].unit + State(queue.enqueue(i), takers) -> Async[F].unit }.flatten for { i <- counterR.getAndUpdate(_ + 1) _ <- offer(i) - _ <- if(i % 10000 == 0) Console[F].println(s"Producer $id has reached $i items") else Sync[F].unit + _ <- Async[F].whenA(i % 100000 == 0)(Console[F].println(s"Producer $id has reached $i items")) + _ <- Async[F].sleep(1.microsecond) // To prevent overwhelming consumers _ <- producer(id, counterR, stateR) } yield () } ``` +As in the previous section we introduce an artificial delay in order not to +overwhelm consumers. + Finally we modify our main program so it instantiates the counter and state `Ref`s. Also it will create several consumers and producers, 10 of each, and will start all of them in parallel: @@ -842,18 +898,12 @@ are started in their own fiber by the call to `parSequence`, which will wait for all of them to finish and then return the value passed as parameter. As in the previous example this program shall run forever until the user presses CTRL-C. -Having several consumers and producers improves the balance between consumers -and producers... but still, on the long run, queue tends to grow in size. To -fix this we will ensure the size of the queue is bounded, so whenever that max -size is reached producers will block as consumers do when the queue is empty. - - ### Producer consumer with bounded queue -Having a bounded queue implies that producers, when queue is full, will wait (be -'semantically blocked') until there is some empty bucket available to be filled. -So an implementation needs to keep track of these waiting producers. To do so we -will add a new queue `offerers` that will be added to the `State` alongside -`takers`. For each waiting producer the `offerers` queue will keep a +Having a bounded queue implies that producers, when the queue is full, will wait +(be 'semantically blocked') until there is some empty bucket available to be +filled. So an implementation needs to keep track of these waiting producers. To +do so we will add a new queue `offerers` that will be added to the `State` +alongside `takers`. For each waiting producer the `offerers` queue will keep a `Deferred[F, Unit]` that will be used to block the producer until the element it offers can be added to `queue` or directly passed to some consumer (`taker`). Alongside the `Deferred` instance we need to keep as well the actual element @@ -917,7 +967,7 @@ def consumer[F[_]: Async: Console](id: Int, stateR: Ref[F, State[F, Int]]): F[Un for { i <- take - _ <- if(i % 10000 == 0) Console[F].println(s"Consumer $id has reached $i items") else Async[F].unit + _ <- Async[F].whenA(i % 100000 == 0)(Console[F].println(s"Consumer $id has reached $i items")) _ <- consumer(id, stateR) } yield () } @@ -960,14 +1010,16 @@ def producer[F[_]: Async: Console](id: Int, counterR: Ref[F, Int], stateR: Ref[F for { i <- counterR.getAndUpdate(_ + 1) _ <- offer(i) - _ <- if(i % 10000 == 0) Console[F].println(s"Producer $id has reached $i items") else Async[F].unit + _ <- Async[F].whenA(i % 100000 == 0)(Console[F].println(s"Producer $id has reached $i items")) _ <- producer(id, counterR, stateR) } yield () } ``` As you see, producer and consumer are coded around the idea of keeping and -modifying state, just as with unbounded queues. +modifying state, just as with unbounded queues. Also we do not need to introduce +an artificial delay in producers, as soon as the queue gets full they will be +'blocked' thus giving a chance to consumers to read data. As the final step we must adapt the main program to use these new consumers and producers. Let's say we limit the queue size to `100`, then we have: @@ -1100,7 +1152,7 @@ def producer[F[_]: Async: Console](id: Int, counterR: Ref[F, Int], stateR: Ref[F for { i <- counterR.getAndUpdate(_ + 1) _ <- offer(i) - _ <- if(i % 10000 == 0) Console[F].println(s"Producer $id has reached $i items") else Async[F].unit + _ <- Async[F].whenA(i % 100000 == 0)(Console[F].println(s"Producer $id has reached $i items")) _ <- producer(id, counterR, stateR) } yield () } @@ -1146,7 +1198,7 @@ def consumer[F[_]: Async: Console](id: Int, stateR: Ref[F, State[F, Int]]): F[Un for { i <- take - _ <- if(i % 10000 == 0) Console[F].println(s"Consumer $id has reached $i items") else Async[F].unit + _ <- Async[F].whenA(i % 100000 == 0)(Console[F].println(s"Consumer $id has reached $i items")) _ <- consumer(id, stateR) } yield () }