From 9cc42ff2f6695667b2e5bc077d07f1509bb54ad3 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Sat, 16 Mar 2019 00:01:19 -0400 Subject: [PATCH 1/3] Attempt to Make the Reset Action Uncancellable --- .../scala/io/chrisdavenport/circuit/CircuitBreaker.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/io/chrisdavenport/circuit/CircuitBreaker.scala b/core/src/main/scala/io/chrisdavenport/circuit/CircuitBreaker.scala index 6aa83aa..df4706a 100644 --- a/core/src/main/scala/io/chrisdavenport/circuit/CircuitBreaker.scala +++ b/core/src/main/scala/io/chrisdavenport/circuit/CircuitBreaker.scala @@ -30,6 +30,7 @@ import scala.concurrent.duration._ import cats.effect.{Clock, Sync} import cats.effect.concurrent.{Ref} import cats.implicits._ +import cats.effect.implicits._ import java.util.concurrent.TimeUnit @@ -499,6 +500,10 @@ object CircuitBreaker { clock.monotonic(TimeUnit.MILLISECONDS).flatMap { now => if (open.startedAt + open.resetTimeout.toMillis >= now) onRejected >> F.raiseError(RejectedExecution(open)) else { + // This operation must succeed at setting backing to some other + // operable state. Otherwise we can get into a state where + // the Circuit Breaker is HalfOpen and all new requests are + // failed automatically. def resetOnSuccess: F[A] = { fa.attempt.flatMap { case Left(err) => ref.set(backoff(open)) >> F.raiseError(err) @@ -512,7 +517,7 @@ object CircuitBreaker { (HalfOpen, onHalfOpen >> resetOnSuccess) else (currentOpen, onRejected >> F.raiseError[A](RejectedExecution(currentOpen))) case HalfOpen => (HalfOpen, onRejected >> F.raiseError[A](RejectedExecution(HalfOpen))) - }.flatten + }.flatten.uncancelable } } From 839ec357b82e9860b0522b85ca2bec8bc2816a44 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Sat, 16 Mar 2019 00:19:31 -0400 Subject: [PATCH 2/3] Better Reset Mechanism --- .../circuit/CircuitBreaker.scala | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/io/chrisdavenport/circuit/CircuitBreaker.scala b/core/src/main/scala/io/chrisdavenport/circuit/CircuitBreaker.scala index df4706a..640e51a 100644 --- a/core/src/main/scala/io/chrisdavenport/circuit/CircuitBreaker.scala +++ b/core/src/main/scala/io/chrisdavenport/circuit/CircuitBreaker.scala @@ -27,7 +27,7 @@ package io.chrisdavenport.circuit import scala.concurrent.duration._ -import cats.effect.{Clock, Sync} +import cats.effect.{Clock, Sync, ExitCase} import cats.effect.concurrent.{Ref} import cats.implicits._ import cats.effect.implicits._ @@ -477,7 +477,7 @@ object CircuitBreaker { ref.modify { case Closed(failures) => val count = failures + 1 - if (count >= maxFailures) (Open(now, resetTimeout), onOpen >> F.raiseError[A](err)) + if (count >= maxFailures) (Open(now, resetTimeout), onOpen.attempt.void >> F.raiseError[A](err)) else (Closed(count), F.raiseError[A](err)) case open: Open => (open, F.raiseError[A](err)) case HalfOpen => (HalfOpen, F.raiseError[A](err)) @@ -496,7 +496,7 @@ object CircuitBreaker { ) } - def tryReset[A](open:Open, fa: F[A]): F[A] = { + def tryReset[A](open: Open, fa: F[A]): F[A] = { clock.monotonic(TimeUnit.MILLISECONDS).flatMap { now => if (open.startedAt + open.resetTimeout.toMillis >= now) onRejected >> F.raiseError(RejectedExecution(open)) else { @@ -507,17 +507,27 @@ object CircuitBreaker { def resetOnSuccess: F[A] = { fa.attempt.flatMap { case Left(err) => ref.set(backoff(open)) >> F.raiseError(err) - case Right(a) => onClosed >> ref.set(ClosedZero) as a + case Right(a) => onClosed.attempt.void >> ref.set(ClosedZero) as a } } ref.modify { case closed: Closed => (closed, openOnFail(fa)) case currentOpen: Open => if (currentOpen.startedAt == open.startedAt && currentOpen.resetTimeout == open.resetTimeout) - (HalfOpen, onHalfOpen >> resetOnSuccess) - else (currentOpen, onRejected >> F.raiseError[A](RejectedExecution(currentOpen))) - case HalfOpen => (HalfOpen, onRejected >> F.raiseError[A](RejectedExecution(HalfOpen))) - }.flatten.uncancelable + (HalfOpen, onHalfOpen.attempt.void >> resetOnSuccess) + else (currentOpen, onRejected.attempt.void >> F.raiseError[A](RejectedExecution(currentOpen))) + case HalfOpen => (HalfOpen, onRejected.attempt.void >> F.raiseError[A](RejectedExecution(HalfOpen))) + }.flatten.guaranteeCase{ + // Handles the case of cancelation during this set of operations + // With autocancelable flatMap this guarantee might not hold. + case ExitCase.Canceled => ref.update{ + case HalfOpen => open // We Don't leave this in a half-open state. + case closed: Closed => closed + case open: Open => open + } + case ExitCase.Error(_) => F.unit + case ExitCase.Completed => F.unit + } } } @@ -527,7 +537,7 @@ object CircuitBreaker { ref.modify { case closed: Closed => (closed, openOnFail(fa)) case open: Open => (open, tryReset(open, fa)) - case HalfOpen => (HalfOpen, onRejected >> F.raiseError[A](RejectedExecution(HalfOpen))) + case HalfOpen => (HalfOpen, onRejected.attempt.void >> F.raiseError[A](RejectedExecution(HalfOpen))) }.flatten } } From 8055f5459752d020752f32d16d6351a3922890a5 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Sat, 16 Mar 2019 10:27:23 -0400 Subject: [PATCH 3/3] Test Time Certainty --- .../scala/io/chrisdavenport/circuit/CircuitBreakerTests.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/io/chrisdavenport/circuit/CircuitBreakerTests.scala b/core/src/test/scala/io/chrisdavenport/circuit/CircuitBreakerTests.scala index ab166f3..377531b 100644 --- a/core/src/test/scala/io/chrisdavenport/circuit/CircuitBreakerTests.scala +++ b/core/src/test/scala/io/chrisdavenport/circuit/CircuitBreakerTests.scala @@ -194,7 +194,7 @@ class CircuitBreakerTests extends AsyncFunSuite with Matchers { // Testing half-open state d <- Deferred[IO, Unit] fiber <- circuitBreaker.protect(d.get).start - _ <- IO.sleep(10.millis) + _ <- IO.sleep(1.second) _ = unsafeState() should matchPattern { case CircuitBreaker.HalfOpen => }