Skip to content

Commit

Permalink
[ConcurrentRateLimiter] Redis backend (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
nryanov committed May 23, 2021
1 parent 2e417a8 commit 0a4d68e
Show file tree
Hide file tree
Showing 99 changed files with 3,203 additions and 131 deletions.
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ Backend | Client |
Aerospike | [aerospike-client-java](https://github.com/aerospike/aerospike-client-java)
Redis | [jedis](https://github.com/redis/jedis) <br> [lettuce](https://github.com/lettuce-io/lettuce-core) <br> [redisson](https://github.com/redisson/redisson)

**RateLimiter**

Class | Effect |
------------ | -------------
`TryRateLimiter` | `scala.util.Try`
Expand All @@ -57,6 +59,30 @@ Class | Effect |
`RedissonZioRateLimiter` | `zio.Task`
`RedissonZioAsyncRateLimiter` | `zio.Task`

**ConcurrentRateLimiter**

Class | Effect |
------------ | -------------
`TryConcurrentRateLimiter` | `scala.util.Try`
`EitherConcurrentRateLimiter` | `Either`
`JedisSyncConcurrentRateLimiter` | None (`Identity`)
`JedisCatsConcurrentRateLimiter` | `F[_]: cats.effect.Sync: cats.effect.ContextShift`
`JedisZioConcurrentRateLimiter` | `zio.Task`
`LettuceSyncConcurrentRateLimiter` | None (`Identity`)
`LettuceAsyncConcurrentRateLimiter` | `scala.concurrent.Future`
`LettuceCatsConcurrentRateLimiter` | `F[_]: cats.effect.Sync: cats.effect.ContextShift`
`LettuceCatsAsyncConcurrentRateLimiter` | `F[_]: cats.effect.Concurrent`
`LettuceMonixAsyncConcurrentRateLimiter` | `monix.eval.Task`
`LettuceZioConcurrentRateLimiter` | `zio.Task`
`LettuceZioAsyncConcurrentRateLimiter` | `zio.Task`
`RedissonSyncConcurrentRateLimiter` | None (`Identity`)
`RedissonAsyncConcurrentRateLimiter` | `scala.concurrent.Future`
`RedissonCatsConcurrentRateLimiter` | `F[_]: cats.effect.Sync: cats.effect.ContextShift`
`RedissonCatsAsyncConcurrentRateLimiter` | `F[_]: cats.effect.Concurrent`
`RedissonMonixAsyncConcurrentRateLimiter` | `monix.eval.Task`
`RedissonZioConcurrentRateLimiter` | `zio.Task`
`RedissonZioAsyncConcurrentRateLimiter` | `zio.Task`

## Usage
```scala
import genkai._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,5 @@ abstract class AerospikeRateLimiter[F[_]](

override def close(): F[Unit] = monad.whenA(closeClient)(monad.eval(client.close()))

override protected def monadError: MonadError[F] = monad
override def monadError: MonadError[F] = monad
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package genkai.aerospike

import com.aerospike.client.AerospikeClient
import com.dimafeng.testcontainers.scalatest.TestContainerForAll
import genkai.BaseSpec
import genkai.RateLimiterBaseSpec

trait AerospikeSpecForAll[F[_]] extends BaseSpec[F] with TestContainerForAll {
trait AerospikeSpecForAll[F[_]] extends RateLimiterBaseSpec[F] with TestContainerForAll {
override val containerDef: AerospikeContainer.Def = AerospikeContainer.Def()

var aerospikeClient: AerospikeClient = _
Expand Down
40 changes: 40 additions & 0 deletions modules/core/src/main/scala/genkai/ConcurrentRateLimiter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package genkai

import java.time.Instant

/**
* @tparam F - effect type
*/
trait ConcurrentRateLimiter[F[_]] extends MonadErrorAware[F] {

final def use[A: Key, B](key: A)(f: => F[B]): F[Either[ConcurrentLimitExhausted[A], B]] =
use(key, Instant.now())(f)

private[genkai] def use[A: Key, B](key: A, instant: Instant)(
f: => F[B]
): F[Either[ConcurrentLimitExhausted[A], B]]

def reset[A: Key](key: A): F[Unit]

final def acquire[A: Key](key: A): F[Boolean] =
acquire(key, Instant.now())

private[genkai] def acquire[A: Key](
key: A,
instant: Instant
): F[Boolean]

final def release[A: Key](key: A): F[Boolean] =
release(key, Instant.now())

private[genkai] def release[A: Key](
key: A,
instant: Instant
): F[Boolean]

final def permissions[A: Key](key: A): F[Long] = permissions(key, Instant.now())

private[genkai] def permissions[A: Key](key: A, instant: Instant): F[Long]

def close(): F[Unit]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package genkai

sealed abstract class ConcurrentRateLimiterError(msg: String, cause: Throwable)
extends RuntimeException(msg, cause)

final case class ConcurrentLimitExhausted[A: Key](key: A)
extends ConcurrentRateLimiterError(s"No available slots for key: ${Key[A].convert(key)}", null)

final case class ConcurrentRateLimiterClientError(cause: Throwable)
extends ConcurrentRateLimiterError(cause.getLocalizedMessage, cause)
14 changes: 14 additions & 0 deletions modules/core/src/main/scala/genkai/ConcurrentStrategy.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package genkai

import scala.concurrent.duration.Duration

sealed trait ConcurrentStrategy

object ConcurrentStrategy {

/**
* @param slots - available slots for concurrent requests
* @param ttl - default ttl for automatic slot acquisition cleanup if manual cleanup did not succeed
*/
final case class Default(slots: Long, ttl: Duration) extends ConcurrentStrategy
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package genkai

import java.time.Instant
import genkai.monad.{EitherMonadError, MonadError}

class EitherConcurrentRateLimiter(concurrentRateLimiter: ConcurrentRateLimiter[Identity])
extends ConcurrentRateLimiter[Either[Throwable, *]] {
type ResultRight[A, B] = Either[ConcurrentLimitExhausted[A], B]
type Result[A, B] = Either[Throwable, ResultRight[A, B]]

override private[genkai] def use[A: Key, B](key: A, instant: Instant)(
f: => Either[Throwable, B]
): Result[A, B] =
monadError.eval(concurrentRateLimiter.use(key, instant)(f)).flatMap {
case Left(value) =>
Right[Throwable, ResultRight[A, B]](Left[ConcurrentLimitExhausted[A], B](value))
case Right(value) =>
value match {
case Left(value) => Left(value)
case Right(value) => Right(Right(value))
}
}

override def reset[A: Key](key: A): Either[Throwable, Unit] =
monadError.eval(concurrentRateLimiter.reset(key))

override private[genkai] def acquire[A: Key](
key: A,
instant: Instant
): Either[Throwable, Boolean] =
monadError.eval(concurrentRateLimiter.acquire(key, instant))

override private[genkai] def release[A: Key](
key: A,
instant: Instant
): Either[Throwable, Boolean] =
monadError.eval(concurrentRateLimiter.release(key, instant))

override private[genkai] def permissions[A: Key](
key: A,
instant: Instant
): Either[Throwable, Long] =
monadError.eval(concurrentRateLimiter.permissions(key, instant))

override def close(): Either[Throwable, Unit] = monadError.eval(concurrentRateLimiter.close())

override def monadError: MonadError[Either[Throwable, *]] = EitherMonadError
}
2 changes: 1 addition & 1 deletion modules/core/src/main/scala/genkai/EitherRateLimiter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ final class EitherRateLimiter(rateLimiter: RateLimiter[Identity])

override def close(): Either[Throwable, Unit] = monadError.eval(rateLimiter.close())

override protected def monadError: MonadError[Either[Throwable, *]] = EitherMonadError
override def monadError: MonadError[Either[Throwable, *]] = EitherMonadError
}
2 changes: 1 addition & 1 deletion modules/core/src/main/scala/genkai/Logging.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package genkai

import org.slf4j.{Logger, LoggerFactory}

trait Logging[F[_]] { self: RateLimiter[F] =>
trait Logging[F[_]] { self: MonadErrorAware[F] =>
protected val logger: Logger = LoggerFactory.getLogger(self.getClass)

def trace(msg: String): F[Unit] =
Expand Down
7 changes: 7 additions & 0 deletions modules/core/src/main/scala/genkai/MonadErrorAware.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package genkai

import genkai.monad.MonadError

trait MonadErrorAware[F[_]] {
def monadError: MonadError[F]
}
6 changes: 1 addition & 5 deletions modules/core/src/main/scala/genkai/RateLimiter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package genkai

import java.time.Instant

import genkai.monad.MonadError

/**
* @tparam F - effect type
*/
trait RateLimiter[F[_]] {
trait RateLimiter[F[_]] extends MonadErrorAware[F] {

/**
* @param key - ~ object id
Expand Down Expand Up @@ -76,6 +74,4 @@ trait RateLimiter[F[_]] {
* @return - unit if successfully closed or error wrapped in effect
*/
def close(): F[Unit]

protected def monadError: MonadError[F]
}
2 changes: 1 addition & 1 deletion modules/core/src/main/scala/genkai/RateLimiterError.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ package genkai
sealed abstract class RateLimiterError(msg: String, cause: Throwable)
extends RuntimeException(msg, cause)

final case class ClientError(cause: Throwable)
final case class RateLimiterClientError(cause: Throwable)
extends RateLimiterError(cause.getLocalizedMessage, cause)
43 changes: 43 additions & 0 deletions modules/core/src/main/scala/genkai/TryConcurrentRateLimiter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package genkai

import java.time.Instant

import genkai.monad.{MonadError, TryMonadError}

import scala.util.{Failure, Success, Try}

final class TryConcurrentRateLimiter(concurrentRateLimiter: ConcurrentRateLimiter[Identity])
extends ConcurrentRateLimiter[Try] {
override private[genkai] def use[A: Key, B](key: A, instant: Instant)(
f: => Try[B]
): Try[Either[ConcurrentLimitExhausted[A], B]] =
monadError.eval(concurrentRateLimiter.use(key, instant)(f)).flatMap {
case Left(value) => Success(Left(value))
case Right(value) =>
value match {
case Failure(exception) => Failure(exception)
case Success(value) => Success(Right(value))
}
}

override private[genkai] def acquire[A: Key](
key: A,
instant: Instant
): Try[Boolean] =
monadError.eval(concurrentRateLimiter.acquire(key, instant))

override def reset[A: Key](key: A): Try[Unit] = monadError.eval(concurrentRateLimiter.reset(key))

override private[genkai] def release[A: Key](
key: A,
instant: Instant
): Try[Boolean] =
monadError.eval(concurrentRateLimiter.release(key, instant))

override private[genkai] def permissions[A: Key](key: A, instant: Instant): Try[Long] =
monadError.eval(concurrentRateLimiter.permissions(key, instant))

override def close(): Try[Unit] = monadError.eval(concurrentRateLimiter.close())

override def monadError: MonadError[Try] = TryMonadError
}
2 changes: 1 addition & 1 deletion modules/core/src/main/scala/genkai/TryRateLimiter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ final class TryRateLimiter(

override def close(): Try[Unit] = monadError.eval(rateLimiter.close())

override protected def monadError: MonadError[Try] = TryMonadError
override def monadError: MonadError[Try] = TryMonadError
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object EitherMonadError extends MonadError[Either[Throwable, *]] {
case _ => fa
}

override def ifA[A](
override def ifM[A](
fcond: Either[Throwable, Boolean]
)(ifTrue: => Either[Throwable, A], ifFalse: => Either[Throwable, A]): Either[Throwable, A] =
fcond.flatMap { flag =>
Expand All @@ -66,7 +66,7 @@ object EitherMonadError extends MonadError[Either[Throwable, *]] {
override def eval[A](f: => A): Either[Throwable, A] = Try(f).toEither

override def guarantee[A](
f: Either[Throwable, A]
f: => Either[Throwable, A]
)(g: => Either[Throwable, Unit]): Either[Throwable, A] = {
def tryE = Try(g) match {
case Failure(exception) => Left(exception)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class FutureMonadAsyncError(implicit ec: ExecutionContext) extends MonadAsyncErr
): Future[A] =
fa.recoverWith(pf)

override def ifA[A](
override def ifM[A](
fcond: Future[Boolean]
)(ifTrue: => Future[A], ifFalse: => Future[A]): Future[A] =
fcond.flatMap { flag =>
Expand Down Expand Up @@ -73,7 +73,7 @@ class FutureMonadAsyncError(implicit ec: ExecutionContext) extends MonadAsyncErr
p.future
}

override def guarantee[A](f: Future[A])(g: => Future[Unit]): Future[A] = {
override def guarantee[A](f: => Future[A])(g: => Future[Unit]): Future[A] = {
val p = Promise[A]()

def tryF = Try(g) match {
Expand Down
4 changes: 2 additions & 2 deletions modules/core/src/main/scala/genkai/monad/IdMonadError.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object IdMonadError extends MonadError[Identity] {
pf: PartialFunction[Throwable, Identity[A]]
): Identity[A] = fa

override def ifA[A](
override def ifM[A](
fcond: Identity[Boolean]
)(ifTrue: => Identity[A], ifFalse: => Identity[A]): Identity[A] =
if (fcond) ifTrue
Expand All @@ -43,7 +43,7 @@ object IdMonadError extends MonadError[Identity] {

override def eval[A](f: => A): Identity[A] = f

override def guarantee[A](f: Identity[A])(g: => Identity[Unit]): Identity[A] =
override def guarantee[A](f: => Identity[A])(g: => Identity[Unit]): Identity[A] =
try f
finally g
}
4 changes: 2 additions & 2 deletions modules/core/src/main/scala/genkai/monad/MonadError.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ trait MonadError[F[_]] {

def handleErrorWith[A](fa: F[A])(pf: PartialFunction[Throwable, F[A]]): F[A]

def ifA[A](fcond: F[Boolean])(ifTrue: => F[A], ifFalse: => F[A]): F[A]
def ifM[A](fcond: F[Boolean])(ifTrue: => F[A], ifFalse: => F[A]): F[A]

def whenA[A](cond: Boolean)(f: => F[A]): F[Unit]

Expand All @@ -35,5 +35,5 @@ trait MonadError[F[_]] {

def flatten[A](fa: F[F[A]]): F[A] = flatMap(fa)(v => identity(v))

def guarantee[A](f: F[A])(g: => F[Unit]): F[A]
def guarantee[A](f: => F[A])(g: => F[Unit]): F[A]
}
4 changes: 2 additions & 2 deletions modules/core/src/main/scala/genkai/monad/TryMonadError.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ object TryMonadError extends MonadError[Try] {
case _ => fa
}

override def ifA[A](fcond: Try[Boolean])(ifTrue: => Try[A], ifFalse: => Try[A]): Try[A] =
override def ifM[A](fcond: Try[Boolean])(ifTrue: => Try[A], ifFalse: => Try[A]): Try[A] =
fcond.flatMap { flag =>
if (flag) ifTrue
else ifFalse
Expand All @@ -49,7 +49,7 @@ object TryMonadError extends MonadError[Try] {

override def eval[A](f: => A): Try[A] = Try(f)

override def guarantee[A](f: Try[A])(g: => Try[Unit]): Try[A] =
override def guarantee[A](f: => Try[A])(g: => Try[Unit]): Try[A] =
f match {
case Failure(exception) => suspend(g).flatMap(_ => Failure(exception))
case Success(value) => suspend(g).map(_ => value)
Expand Down
Loading

0 comments on commit 0a4d68e

Please sign in to comment.