Skip to content

Commit

Permalink
Merge pull request #13 from ChristopherDavenport/robustToCancelation
Browse files Browse the repository at this point in the history
Attempt to Make the Reset Action Uncancellable
  • Loading branch information
ChristopherDavenport authored Mar 16, 2019
2 parents 3471af8 + 8055f54 commit a055412
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 10 deletions.
33 changes: 24 additions & 9 deletions core/src/main/scala/io/chrisdavenport/circuit/CircuitBreaker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ package io.chrisdavenport.circuit

import scala.concurrent.duration._

import cats.effect.{Clock, Sync}
import cats.effect.{Clock, Sync, ExitCase}
import cats.effect.concurrent.{Ref}
import cats.implicits._
import cats.effect.implicits._

import java.util.concurrent.TimeUnit

Expand Down Expand Up @@ -476,7 +477,7 @@ object CircuitBreaker {
ref.modify {
case Closed(failures) =>
val count = failures + 1
if (count >= maxFailures) (Open(now, resetTimeout), onOpen >> F.raiseError[A](err))
if (count >= maxFailures) (Open(now, resetTimeout), onOpen.attempt.void >> F.raiseError[A](err))
else (Closed(count), F.raiseError[A](err))
case open: Open => (open, F.raiseError[A](err))
case HalfOpen => (HalfOpen, F.raiseError[A](err))
Expand All @@ -495,24 +496,38 @@ object CircuitBreaker {
)
}

def tryReset[A](open:Open, fa: F[A]): F[A] = {
def tryReset[A](open: Open, fa: F[A]): F[A] = {
clock.monotonic(TimeUnit.MILLISECONDS).flatMap { now =>
if (open.startedAt + open.resetTimeout.toMillis >= now) onRejected >> F.raiseError(RejectedExecution(open))
else {
// This operation must succeed at setting backing to some other
// operable state. Otherwise we can get into a state where
// the Circuit Breaker is HalfOpen and all new requests are
// failed automatically.
def resetOnSuccess: F[A] = {
fa.attempt.flatMap {
case Left(err) => ref.set(backoff(open)) >> F.raiseError(err)
case Right(a) => onClosed >> ref.set(ClosedZero) as a
case Right(a) => onClosed.attempt.void >> ref.set(ClosedZero) as a
}
}
ref.modify {
case closed: Closed => (closed, openOnFail(fa))
case currentOpen: Open =>
if (currentOpen.startedAt == open.startedAt && currentOpen.resetTimeout == open.resetTimeout)
(HalfOpen, onHalfOpen >> resetOnSuccess)
else (currentOpen, onRejected >> F.raiseError[A](RejectedExecution(currentOpen)))
case HalfOpen => (HalfOpen, onRejected >> F.raiseError[A](RejectedExecution(HalfOpen)))
}.flatten
(HalfOpen, onHalfOpen.attempt.void >> resetOnSuccess)
else (currentOpen, onRejected.attempt.void >> F.raiseError[A](RejectedExecution(currentOpen)))
case HalfOpen => (HalfOpen, onRejected.attempt.void >> F.raiseError[A](RejectedExecution(HalfOpen)))
}.flatten.guaranteeCase{
// Handles the case of cancelation during this set of operations
// With autocancelable flatMap this guarantee might not hold.
case ExitCase.Canceled => ref.update{
case HalfOpen => open // We Don't leave this in a half-open state.
case closed: Closed => closed
case open: Open => open
}
case ExitCase.Error(_) => F.unit
case ExitCase.Completed => F.unit
}

}
}
Expand All @@ -522,7 +537,7 @@ object CircuitBreaker {
ref.modify {
case closed: Closed => (closed, openOnFail(fa))
case open: Open => (open, tryReset(open, fa))
case HalfOpen => (HalfOpen, onRejected >> F.raiseError[A](RejectedExecution(HalfOpen)))
case HalfOpen => (HalfOpen, onRejected.attempt.void >> F.raiseError[A](RejectedExecution(HalfOpen)))
}.flatten
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class CircuitBreakerTests extends AsyncFunSuite with Matchers {
// Testing half-open state
d <- Deferred[IO, Unit]
fiber <- circuitBreaker.protect(d.get).start
_ <- IO.sleep(10.millis)
_ <- IO.sleep(1.second)
_ = unsafeState() should matchPattern {
case CircuitBreaker.HalfOpen =>
}
Expand Down

0 comments on commit a055412

Please sign in to comment.