From 8fe95e684752a1a8d53856380235f070abfc6d50 Mon Sep 17 00:00:00 2001 From: Maksym Ochenashko Date: Wed, 25 Sep 2024 11:46:43 +0300 Subject: [PATCH] Implement async dropping queue --- .../effect/benchmarks/QueueBenchmark.scala | 16 ++ .../main/scala/cats/effect/std/Queue.scala | 237 +++++++++++------- .../scala/cats/effect/std/QueueSpec.scala | 6 +- 3 files changed, 163 insertions(+), 96 deletions(-) diff --git a/benchmarks/src/main/scala/cats/effect/benchmarks/QueueBenchmark.scala b/benchmarks/src/main/scala/cats/effect/benchmarks/QueueBenchmark.scala index 6a34c7fcca..cf932f6f80 100644 --- a/benchmarks/src/main/scala/cats/effect/benchmarks/QueueBenchmark.scala +++ b/benchmarks/src/main/scala/cats/effect/benchmarks/QueueBenchmark.scala @@ -115,6 +115,22 @@ class QueueBenchmark { def unboundedAsyncEnqueueDequeueContended(): Unit = Queue.unboundedForAsync[IO, Unit].flatMap(enqueueDequeueContended(_)).unsafeRunSync() + @Benchmark + def droppingConcurrentEnqueueDequeueOne(): Unit = + Queue.droppingForConcurrent[IO, Unit](size).flatMap(enqueueDequeueOne(_)).unsafeRunSync() + + @Benchmark + def droppingConcurrentEnqueueDequeueMany(): Unit = + Queue.droppingForConcurrent[IO, Unit](size).flatMap(enqueueDequeueMany(_)).unsafeRunSync() + + @Benchmark + def droppingAsyncEnqueueDequeueOne(): Unit = + Queue.droppingForAsync[IO, Unit](size).flatMap(enqueueDequeueOne(_)).unsafeRunSync() + + @Benchmark + def droppingAsyncEnqueueDequeueMany(): Unit = + Queue.droppingForAsync[IO, Unit](size).flatMap(enqueueDequeueMany(_)).unsafeRunSync() + private[this] def enqueueDequeueOne(q: Queue[IO, Unit]): IO[Unit] = { def loop(i: Int): IO[Unit] = if (i > 0) diff --git a/std/shared/src/main/scala/cats/effect/std/Queue.scala b/std/shared/src/main/scala/cats/effect/std/Queue.scala index 18e6ca312d..1ca998591d 100644 --- a/std/shared/src/main/scala/cats/effect/std/Queue.scala +++ b/std/shared/src/main/scala/cats/effect/std/Queue.scala @@ -105,6 +105,14 @@ object Queue { private[effect] def unboundedForAsync[F[_], A](implicit F: Async[F]): F[Queue[F, A]] = F.delay(new UnboundedAsyncQueue()) + private[effect] def droppingForConcurrent[F[_], A](capacity: Int)( + implicit F: GenConcurrent[F, _]): F[Queue[F, A]] = + F.ref(State.empty[F, A]).map(new DroppingQueue(capacity, _)) + + private[effect] def droppingForAsync[F[_], A](capacity: Int)( + implicit F: Async[F]): F[Queue[F, A]] = + F.delay(new DroppingAsyncQueue(capacity)) + /** * Creates a new `Queue` subject to some `capacity` bound which supports a side-effecting * `tryOffer` function, allowing impure code to directly add values to the queue without @@ -184,7 +192,18 @@ object Queue { */ def dropping[F[_], A](capacity: Int)(implicit F: GenConcurrent[F, _]): F[Queue[F, A]] = { assertPositive(capacity, "Dropping") - F.ref(State.empty[F, A]).map(new DroppingQueue(capacity, _)) + // async queue can't handle capacity == 1 and allocates eagerly, so cap at 64k + if (1 < capacity && capacity < Short.MaxValue.toInt * 2) { + F match { + case f0: Async[F] => + droppingForAsync[F, A](capacity)(f0) + + case _ => + droppingForConcurrent[F, A](capacity) + } + } else { + droppingForConcurrent[F, A](capacity) + } } /** @@ -573,108 +592,21 @@ object Queue { private val EitherUnit: Either[Nothing, Unit] = Right(()) - /* - * Does not correctly handle bound = 0 because take waiters are async[Unit] - */ - private final class BoundedAsyncQueue[F[_], A](capacity: Int)(implicit F: Async[F]) - extends Queue[F, A] - with unsafe.BoundedQueue[F, A] { + private abstract class BaseBoundedAsyncQueue[F[_], A](capacity: Int)(implicit F: Async[F]) + extends Queue[F, A] { require(capacity > 1) - private[this] val buffer = new UnsafeBounded[A](capacity) + protected[this] val buffer = new UnsafeBounded[A](capacity) - private[this] val takers = new UnsafeUnbounded[Either[Throwable, Unit] => Unit]() - private[this] val offerers = new UnsafeUnbounded[Either[Throwable, Unit] => Unit]() + protected[this] val takers = new UnsafeUnbounded[Either[Throwable, Unit] => Unit]() + protected[this] val offerers = new UnsafeUnbounded[Either[Throwable, Unit] => Unit]() - private[this] val FailureSignal = cats.effect.std.FailureSignal // prefetch + protected[this] val FailureSignal = cats.effect.std.FailureSignal // prefetch // private[this] val takers = new ConcurrentLinkedQueue[AtomicReference[Either[Throwable, Unit] => Unit]]() // private[this] val offerers = new ConcurrentLinkedQueue[AtomicReference[Either[Throwable, Unit] => Unit]]() - def offer(a: A): F[Unit] = - F uncancelable { poll => - F defer { - try { - // attempt to put into the buffer; if the buffer is full, it will raise an exception - buffer.put(a) - // println(s"offered: size = ${buffer.size()}") - - // we successfully put, if there are any takers, grab the first one and wake it up - notifyOne(takers) - F.unit - } catch { - case FailureSignal => - // capture whether or not we were successful in our retry - var succeeded = false - - // a latch blocking until some taker notifies us - val wait = F.async[Unit] { k => - F delay { - // add ourselves to the listeners queue - val clear = offerers.put(k) - - try { - // now that we're listening, re-attempt putting - buffer.put(a) - - // it worked! clear ourselves out of the queue - clear() - // our retry succeeded - succeeded = true - - // manually complete our own callback - // note that we could have a race condition here where we're already completed - // async will deduplicate these calls for us - // additionally, the continuation (below) is held until the registration completes - k(EitherUnit) - - // we *might* have negated a notification by succeeding here - // unnecessary wake-ups are mostly harmless (only slight fairness loss) - notifyOne(offerers) - - // technically it's possible to already have waiting takers. notify one of them - notifyOne(takers) - - // we're immediately complete, so no point in creating a finalizer - None - } catch { - case FailureSignal => - // our retry failed, meaning the queue is still full and we're listening, so suspend - // println(s"failed offer size = ${buffer.size()}") - Some(F.delay(clear())) - } - } - } - - val notifyAnyway = F delay { - // we might have been awakened and canceled simultaneously - // try waking up another offerer just in case - notifyOne(offerers) - } - - // suspend until the buffer put can succeed - // if succeeded is true then we've *already* put - // if it's false, then some taker woke us up, so race the retry with other offers - (poll(wait) *> F.defer(if (succeeded) F.unit else poll(offer(a)))) - .onCancel(notifyAnyway) - } - } - } - - def unsafeTryOffer(a: A): Boolean = { - try { - buffer.put(a) - notifyOne(takers) - true - } catch { - case FailureSignal => - false - } - } - - def tryOffer(a: A): F[Boolean] = F.delay(unsafeTryOffer(a)) - val size: F[Int] = F.delay(buffer.size()) val take: F[A] = @@ -808,7 +740,7 @@ object Queue { // TODO could optimize notifications by checking if buffer is completely empty on put @tailrec - private[this] def notifyOne( + protected[this] final def notifyOne( waiters: UnsafeUnbounded[Either[Throwable, Unit] => Unit]): Unit = { // capture whether or not we should loop (structured in this way to avoid nested try/catch, which has a performance cost) val retry = @@ -841,6 +773,98 @@ object Queue { } } + /* + * Does not correctly handle bound = 0 because take waiters are async[Unit] + */ + private final class BoundedAsyncQueue[F[_], A](capacity: Int)(implicit F: Async[F]) + extends BaseBoundedAsyncQueue[F, A](capacity) + with unsafe.BoundedQueue[F, A] { + + def offer(a: A): F[Unit] = + F uncancelable { poll => + F defer { + try { + // attempt to put into the buffer; if the buffer is full, it will raise an exception + buffer.put(a) + // println(s"offered: size = ${buffer.size()}") + + // we successfully put, if there are any takers, grab the first one and wake it up + notifyOne(takers) + F.unit + } catch { + case FailureSignal => + // capture whether or not we were successful in our retry + var succeeded = false + + // a latch blocking until some taker notifies us + val wait = F.async[Unit] { k => + F delay { + // add ourselves to the listeners queue + val clear = offerers.put(k) + + try { + // now that we're listening, re-attempt putting + buffer.put(a) + + // it worked! clear ourselves out of the queue + clear() + // our retry succeeded + succeeded = true + + // manually complete our own callback + // note that we could have a race condition here where we're already completed + // async will deduplicate these calls for us + // additionally, the continuation (below) is held until the registration completes + k(EitherUnit) + + // we *might* have negated a notification by succeeding here + // unnecessary wake-ups are mostly harmless (only slight fairness loss) + notifyOne(offerers) + + // technically it's possible to already have waiting takers. notify one of them + notifyOne(takers) + + // we're immediately complete, so no point in creating a finalizer + None + } catch { + case FailureSignal => + // our retry failed, meaning the queue is still full and we're listening, so suspend + // println(s"failed offer size = ${buffer.size()}") + Some(F.delay(clear())) + } + } + } + + val notifyAnyway = F delay { + // we might have been awakened and canceled simultaneously + // try waking up another offerer just in case + notifyOne(offerers) + } + + // suspend until the buffer put can succeed + // if succeeded is true then we've *already* put + // if it's false, then some taker woke us up, so race the retry with other offers + (poll(wait) *> F.defer(if (succeeded) F.unit else poll(offer(a)))) + .onCancel(notifyAnyway) + } + } + } + + def unsafeTryOffer(a: A): Boolean = { + try { + buffer.put(a) + notifyOne(takers) + true + } catch { + case FailureSignal => + false + } + } + + def tryOffer(a: A): F[Boolean] = F.delay(unsafeTryOffer(a)) + + } + private final class UnboundedAsyncQueue[F[_], A]()(implicit F: Async[F]) extends Queue[F, A] with unsafe.UnboundedQueue[F, A] { @@ -960,6 +984,29 @@ object Queue { } } + private final class DroppingAsyncQueue[F[_], A](capacity: Int)(implicit F: Async[F]) + extends BaseBoundedAsyncQueue[F, A](capacity) { + + def offer(a: A): F[Unit] = + F.delay { + tryOfferUnsafe(a) + () + } + + def tryOffer(a: A): F[Boolean] = + F.delay(tryOfferUnsafe(a)) + + private def tryOfferUnsafe(a: A): Boolean = + try { + buffer.put(a) + notifyOne(takers) + true + } catch { + case FailureSignal => + false + } + } + // ported with love from https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java private[effect] final class UnsafeBounded[A](bound: Int) { require(bound > 1) diff --git a/tests/shared/src/test/scala/cats/effect/std/QueueSpec.scala b/tests/shared/src/test/scala/cats/effect/std/QueueSpec.scala index 231e584088..c2dc7e6e8f 100644 --- a/tests/shared/src/test/scala/cats/effect/std/QueueSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/QueueSpec.scala @@ -418,7 +418,11 @@ class UnboundedQueueSpec extends BaseSpec with QueueTests[Queue] { class DroppingQueueSpec extends BaseSpec with QueueTests[Queue] { sequential - "DroppingQueue" should { + "DroppingQueue (concurrent)" should { + droppingQueueTests(i => if (i < 1) Queue.dropping(i) else Queue.droppingForConcurrent(i)) + } + + "DroppingQueue (async)" should { droppingQueueTests(Queue.dropping) }