Merge pull request #4143 from iRevive/async-dropping-queue
Implement async dropping queue
djspiewak authored Oct 25, 2024 · 2 parents 28e774e + 8fe95e6 · commit 0600ea6
Showing 3 changed files with 163 additions and 96 deletions.
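In short, this change adds a lock-free, Async-based backing implementation for Queue.dropping alongside the existing Ref-based one, adds benchmarks covering both, and splits the dropping-queue spec so each path is exercised. The observable semantics are unchanged; the following sketch (not part of the diff, written against the public cats.effect.std.Queue API) illustrates them: once the buffer is full, offer still completes but discards the element, while tryOffer reports the failure.

import cats.effect.IO
import cats.effect.std.Queue

// Illustrative only: dropping semantics shared by both implementations.
val demo: IO[(Boolean, Option[Int])] =
  for {
    q    <- Queue.dropping[IO, Int](2) // under IO this now resolves to the async implementation
    _    <- q.offer(1)
    _    <- q.offer(2)
    _    <- q.offer(3)                 // buffer is full: 3 is silently dropped
    ok   <- q.tryOffer(4)              // false: still full
    _    <- q.take                     // 1
    next <- q.tryTake                  // Some(2); 3 and 4 never entered the queue
  } yield (ok, next)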
@@ -115,6 +115,22 @@ class QueueBenchmark {
def unboundedAsyncEnqueueDequeueContended(): Unit =
Queue.unboundedForAsync[IO, Unit].flatMap(enqueueDequeueContended(_)).unsafeRunSync()

@Benchmark
def droppingConcurrentEnqueueDequeueOne(): Unit =
Queue.droppingForConcurrent[IO, Unit](size).flatMap(enqueueDequeueOne(_)).unsafeRunSync()

@Benchmark
def droppingConcurrentEnqueueDequeueMany(): Unit =
Queue.droppingForConcurrent[IO, Unit](size).flatMap(enqueueDequeueMany(_)).unsafeRunSync()

@Benchmark
def droppingAsyncEnqueueDequeueOne(): Unit =
Queue.droppingForAsync[IO, Unit](size).flatMap(enqueueDequeueOne(_)).unsafeRunSync()

@Benchmark
def droppingAsyncEnqueueDequeueMany(): Unit =
Queue.droppingForAsync[IO, Unit](size).flatMap(enqueueDequeueMany(_)).unsafeRunSync()

private[this] def enqueueDequeueOne(q: Queue[IO, Unit]): IO[Unit] = {
def loop(i: Int): IO[Unit] =
if (i > 0)
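The enqueueDequeueOne helper is cut off at the hunk boundary above. As a purely hypothetical sketch (not a reconstruction of the actual benchmark code), a helper of this shape typically alternates a single offer and take per iteration, so the One variants measure single-element handoff while a Many variant would batch offers before takes:

import cats.effect.IO
import cats.effect.std.Queue

// Hypothetical sketch only; name, signature, and structure are assumptions.
def enqueueDequeueOneSketch(q: Queue[IO, Unit], iterations: Int): IO[Unit] = {
  def loop(i: Int): IO[Unit] =
    if (i > 0) q.offer(()) *> q.take *> loop(i - 1)
    else IO.unit
  loop(iterations)
}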
std/shared/src/main/scala/cats/effect/std/Queue.scala (237 changes: 142 additions & 95 deletions)
@@ -105,6 +105,14 @@ object Queue {
private[effect] def unboundedForAsync[F[_], A](implicit F: Async[F]): F[Queue[F, A]] =
F.delay(new UnboundedAsyncQueue())

private[effect] def droppingForConcurrent[F[_], A](capacity: Int)(
implicit F: GenConcurrent[F, _]): F[Queue[F, A]] =
F.ref(State.empty[F, A]).map(new DroppingQueue(capacity, _))

private[effect] def droppingForAsync[F[_], A](capacity: Int)(
implicit F: Async[F]): F[Queue[F, A]] =
F.delay(new DroppingAsyncQueue(capacity))

/**
* Creates a new `Queue` subject to some `capacity` bound which supports a side-effecting
* `tryOffer` function, allowing impure code to directly add values to the queue without
@@ -184,7 +192,18 @@ object Queue {
*/
def dropping[F[_], A](capacity: Int)(implicit F: GenConcurrent[F, _]): F[Queue[F, A]] = {
assertPositive(capacity, "Dropping")
F.ref(State.empty[F, A]).map(new DroppingQueue(capacity, _))
// async queue can't handle capacity == 1 and allocates eagerly, so cap at 64k
if (1 < capacity && capacity < Short.MaxValue.toInt * 2) {
F match {
case f0: Async[F] =>
droppingForAsync[F, A](capacity)(f0)

case _ =>
droppingForConcurrent[F, A](capacity)
}
} else {
droppingForConcurrent[F, A](capacity)
}
}
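The dispatch above only selects the new async implementation when the GenConcurrent instance is actually an Async and the capacity fits the eagerly allocated UnsafeBounded buffer, i.e. strictly between 1 and Short.MaxValue * 2 (65534); capacity 1 and very large capacities keep the Ref-based DroppingQueue. An illustrative sketch of which backing implementation Queue.dropping resolves to under IO (which has an Async instance):

import cats.effect.IO
import cats.effect.std.Queue

// Illustrative only; all three values have the same type and the same API.
val refBased   = Queue.dropping[IO, Int](1)      // capacity == 1: Ref-based DroppingQueue
val asyncBased = Queue.dropping[IO, Int](4096)   // 1 < capacity < 65534: new DroppingAsyncQueue
val largeRef   = Queue.dropping[IO, Int](100000) // too large for the eager buffer: Ref-based again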

/**
@@ -573,108 +592,21 @@ object Queue {

private val EitherUnit: Either[Nothing, Unit] = Right(())

/*
* Does not correctly handle bound = 0 because take waiters are async[Unit]
*/
private final class BoundedAsyncQueue[F[_], A](capacity: Int)(implicit F: Async[F])
extends Queue[F, A]
with unsafe.BoundedQueue[F, A] {
private abstract class BaseBoundedAsyncQueue[F[_], A](capacity: Int)(implicit F: Async[F])
extends Queue[F, A] {

require(capacity > 1)

private[this] val buffer = new UnsafeBounded[A](capacity)
protected[this] val buffer = new UnsafeBounded[A](capacity)

private[this] val takers = new UnsafeUnbounded[Either[Throwable, Unit] => Unit]()
private[this] val offerers = new UnsafeUnbounded[Either[Throwable, Unit] => Unit]()
protected[this] val takers = new UnsafeUnbounded[Either[Throwable, Unit] => Unit]()
protected[this] val offerers = new UnsafeUnbounded[Either[Throwable, Unit] => Unit]()

private[this] val FailureSignal = cats.effect.std.FailureSignal // prefetch
protected[this] val FailureSignal = cats.effect.std.FailureSignal // prefetch

// private[this] val takers = new ConcurrentLinkedQueue[AtomicReference[Either[Throwable, Unit] => Unit]]()
// private[this] val offerers = new ConcurrentLinkedQueue[AtomicReference[Either[Throwable, Unit] => Unit]]()

def offer(a: A): F[Unit] =
F uncancelable { poll =>
F defer {
try {
// attempt to put into the buffer; if the buffer is full, it will raise an exception
buffer.put(a)
// println(s"offered: size = ${buffer.size()}")

// we successfully put, if there are any takers, grab the first one and wake it up
notifyOne(takers)
F.unit
} catch {
case FailureSignal =>
// capture whether or not we were successful in our retry
var succeeded = false

// a latch blocking until some taker notifies us
val wait = F.async[Unit] { k =>
F delay {
// add ourselves to the listeners queue
val clear = offerers.put(k)

try {
// now that we're listening, re-attempt putting
buffer.put(a)

// it worked! clear ourselves out of the queue
clear()
// our retry succeeded
succeeded = true

// manually complete our own callback
// note that we could have a race condition here where we're already completed
// async will deduplicate these calls for us
// additionally, the continuation (below) is held until the registration completes
k(EitherUnit)

// we *might* have negated a notification by succeeding here
// unnecessary wake-ups are mostly harmless (only slight fairness loss)
notifyOne(offerers)

// technically it's possible to already have waiting takers. notify one of them
notifyOne(takers)

// we're immediately complete, so no point in creating a finalizer
None
} catch {
case FailureSignal =>
// our retry failed, meaning the queue is still full and we're listening, so suspend
// println(s"failed offer size = ${buffer.size()}")
Some(F.delay(clear()))
}
}
}

val notifyAnyway = F delay {
// we might have been awakened and canceled simultaneously
// try waking up another offerer just in case
notifyOne(offerers)
}

// suspend until the buffer put can succeed
// if succeeded is true then we've *already* put
// if it's false, then some taker woke us up, so race the retry with other offers
(poll(wait) *> F.defer(if (succeeded) F.unit else poll(offer(a))))
.onCancel(notifyAnyway)
}
}
}

def unsafeTryOffer(a: A): Boolean = {
try {
buffer.put(a)
notifyOne(takers)
true
} catch {
case FailureSignal =>
false
}
}

def tryOffer(a: A): F[Boolean] = F.delay(unsafeTryOffer(a))

val size: F[Int] = F.delay(buffer.size())

val take: F[A] =
@@ -808,7 +740,7 @@ object Queue {

// TODO could optimize notifications by checking if buffer is completely empty on put
@tailrec
private[this] def notifyOne(
protected[this] final def notifyOne(
waiters: UnsafeUnbounded[Either[Throwable, Unit] => Unit]): Unit = {
// capture whether or not we should loop (structured in this way to avoid nested try/catch, which has a performance cost)
val retry =
@@ -841,6 +773,98 @@
}
}

/*
* Does not correctly handle bound = 0 because take waiters are async[Unit]
*/
private final class BoundedAsyncQueue[F[_], A](capacity: Int)(implicit F: Async[F])
extends BaseBoundedAsyncQueue[F, A](capacity)
with unsafe.BoundedQueue[F, A] {

def offer(a: A): F[Unit] =
F uncancelable { poll =>
F defer {
try {
// attempt to put into the buffer; if the buffer is full, it will raise an exception
buffer.put(a)
// println(s"offered: size = ${buffer.size()}")

// we successfully put, if there are any takers, grab the first one and wake it up
notifyOne(takers)
F.unit
} catch {
case FailureSignal =>
// capture whether or not we were successful in our retry
var succeeded = false

// a latch blocking until some taker notifies us
val wait = F.async[Unit] { k =>
F delay {
// add ourselves to the listeners queue
val clear = offerers.put(k)

try {
// now that we're listening, re-attempt putting
buffer.put(a)

// it worked! clear ourselves out of the queue
clear()
// our retry succeeded
succeeded = true

// manually complete our own callback
// note that we could have a race condition here where we're already completed
// async will deduplicate these calls for us
// additionally, the continuation (below) is held until the registration completes
k(EitherUnit)

// we *might* have negated a notification by succeeding here
// unnecessary wake-ups are mostly harmless (only slight fairness loss)
notifyOne(offerers)

// technically it's possible to already have waiting takers. notify one of them
notifyOne(takers)

// we're immediately complete, so no point in creating a finalizer
None
} catch {
case FailureSignal =>
// our retry failed, meaning the queue is still full and we're listening, so suspend
// println(s"failed offer size = ${buffer.size()}")
Some(F.delay(clear()))
}
}
}

val notifyAnyway = F delay {
// we might have been awakened and canceled simultaneously
// try waking up another offerer just in case
notifyOne(offerers)
}

// suspend until the buffer put can succeed
// if succeeded is true then we've *already* put
// if it's false, then some taker woke us up, so race the retry with other offers
(poll(wait) *> F.defer(if (succeeded) F.unit else poll(offer(a))))
.onCancel(notifyAnyway)
}
}
}

def unsafeTryOffer(a: A): Boolean = {
try {
buffer.put(a)
notifyOne(takers)
true
} catch {
case FailureSignal =>
false
}
}

def tryOffer(a: A): F[Boolean] = F.delay(unsafeTryOffer(a))

}
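For contrast with the dropping variant below, BoundedAsyncQueue's offer suspends when the buffer is full and retries once a taker signals it. A usage sketch of that behaviour through the public API (assuming the standard cats-effect fiber operations):

import cats.effect.IO
import cats.effect.std.Queue

// The second offer suspends until take frees a slot.
val blockingOffer: IO[(Int, Int)] =
  for {
    q   <- Queue.bounded[IO, Int](1)
    _   <- q.offer(1)
    fib <- q.offer(2).start // suspends: buffer is full
    a   <- q.take           // 1; wakes the suspended offerer
    _   <- fib.joinWithNever
    b   <- q.take           // 2
  } yield (a, b)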

private final class UnboundedAsyncQueue[F[_], A]()(implicit F: Async[F])
extends Queue[F, A]
with unsafe.UnboundedQueue[F, A] {
@@ -960,6 +984,29 @@
}
}

private final class DroppingAsyncQueue[F[_], A](capacity: Int)(implicit F: Async[F])
extends BaseBoundedAsyncQueue[F, A](capacity) {

def offer(a: A): F[Unit] =
F.delay {
tryOfferUnsafe(a)
()
}

def tryOffer(a: A): F[Boolean] =
F.delay(tryOfferUnsafe(a))

private def tryOfferUnsafe(a: A): Boolean =
try {
buffer.put(a)
notifyOne(takers)
true
} catch {
case FailureSignal =>
false
}
}
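Both tryOfferUnsafe above and the bounded queue's offer rely on the same protocol: UnsafeBounded.put signals a full buffer by throwing the pre-allocated FailureSignal exception rather than returning a flag. A minimal, simplified model of that protocol (hypothetical names; the real UnsafeBounded is a lock-free ring buffer, not a mutable queue):

import scala.util.control.NoStackTrace

// Pre-allocated, stack-trace-free failure signal, analogous to FailureSignal.
object BufferFull extends RuntimeException with NoStackTrace

final class TinyBuffer[A](bound: Int) {
  private[this] val items = scala.collection.mutable.Queue.empty[A]

  // Throws instead of returning a Boolean, so callers that expect to succeed
  // pay nothing extra on the happy path.
  def put(a: A): Unit =
    if (items.size < bound) { items.enqueue(a); () }
    else throw BufferFull

  def tryPut(a: A): Boolean =
    try { put(a); true }
    catch { case BufferFull => false }
}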

// ported with love from https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java
private[effect] final class UnsafeBounded[A](bound: Int) {
require(bound > 1)
tests/shared/src/test/scala/cats/effect/std/QueueSpec.scala (6 changes: 5 additions & 1 deletion)
@@ -418,7 +418,11 @@ class UnboundedQueueSpec extends BaseSpec with QueueTests[Queue] {
class DroppingQueueSpec extends BaseSpec with QueueTests[Queue] {
sequential

"DroppingQueue" should {
"DroppingQueue (concurrent)" should {
droppingQueueTests(i => if (i < 1) Queue.dropping(i) else Queue.droppingForConcurrent(i))
}

"DroppingQueue (async)" should {
droppingQueueTests(Queue.dropping)
}
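The concurrent block routes non-positive capacities through the public Queue.dropping, presumably because the private droppingForConcurrent constructor skips the positivity assertion, while the async block goes through Queue.dropping directly, which under IO resolves to the async implementation for capacities greater than 1. A hypothetical cross-check (not part of the diff; the constructors are private[effect], so this only compiles from within the cats.effect package) that both implementations agree once the buffer is full:

import cats.effect.IO
import cats.effect.std.Queue

def rejectsWhenFull(mk: Int => IO[Queue[IO, Int]]): IO[Boolean] =
  for {
    q  <- mk(2)
    _  <- q.offer(1)
    _  <- q.offer(2)
    ok <- q.tryOffer(3)
  } yield !ok // expected true for both implementations

val bothReject: IO[Boolean] =
  for {
    a <- rejectsWhenFull(Queue.droppingForConcurrent[IO, Int](_))
    b <- rejectsWhenFull(Queue.droppingForAsync[IO, Int](_))
  } yield a && b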
