Skip to content

Commit

Permalink
move utilities from lila, bump 10.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
ornicar committed Mar 30, 2024
1 parent 6127a9a commit b9471b2
Show file tree
Hide file tree
Showing 12 changed files with 470 additions and 9 deletions.
19 changes: 11 additions & 8 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
lazy val scalalib = Project("scalalib", file("."))
organization := "com.github.ornicar"
name := "scalalib"
version := "9.5.8"
scalaVersion := "3.4.1"
licenses += "MIT" -> url("https://opensource.org/licenses/MIT")
libraryDependencies += "org.typelevel" %% "cats-core" % "2.10.0"
libraryDependencies += "org.typelevel" %% "alleycats-core" % "2.10.0"
libraryDependencies += "com.lihaoyi" %% "pprint" % "0.7.0"
organization := "com.github.ornicar"
name := "scalalib"
version := "10.0.0"
scalaVersion := "3.4.1"
licenses += "MIT" -> url("https://opensource.org/licenses/MIT")
libraryDependencies += "org.typelevel" %% "cats-core" % "2.10.0"
libraryDependencies += "org.typelevel" %% "alleycats-core" % "2.10.0"
libraryDependencies += "com.lihaoyi" %% "pprint" % "0.7.0"
libraryDependencies += "com.github.ben-manes.caffeine" % "caffeine" % "3.1.8" % "compile"
libraryDependencies += "com.github.blemale" %% "scaffeine" % "5.2.1" % "compile"

scalacOptions := Seq(
"-encoding",
"utf-8",
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/Random.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ final class RandomApi(impl: java.util.Random):

private val chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
private inline def nextAlphanumeric(): Char =
chars charAt nextInt(chars.length) // Constant time
chars `charAt` nextInt(chars.length) // Constant time

def nextString(len: Int): String =
val sb = StringBuilder(len)
Expand Down
58 changes: 58 additions & 0 deletions src/main/scala/actor/AsyncActor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package ornicar.scalalib
package actor

import java.util.concurrent.atomic.AtomicReference
import java.util.function.UnaryOperator
import scala.collection.immutable.Queue
import scala.concurrent.{ ExecutionContext, Future, Promise }

/*
* Sequential like an actor, but for async functions,
* and using an atomic backend instead of akka actor.
*/
abstract class AsyncActor(monitor: AsyncActor.Monitor)(using ExecutionContext):

import AsyncActor.*

// implement async behaviour here
protected val process: ReceiveAsync

def !(msg: Matchable): Unit =
if stateRef.getAndUpdate(state => Some(state.fold(Queue.empty[Matchable])(_.enqueue(msg)))).isEmpty then
run(msg)

def ask[A](makeMsg: Promise[A] => Matchable): Future[A] =
val promise = Promise[A]()
this ! makeMsg(promise)
promise.future

/*
* Idle: None
* Busy: Some(Queue.empty)
* Busy with backlog: Some(Queue.nonEmpty)
*/
private val stateRef: AtomicReference[State] = new AtomicReference(None)

private def run(msg: Matchable): Unit =
process.applyOrElse(msg, fallback).onComplete(postRun)

private val postRun = (_: Matchable) =>
stateRef.getAndUpdate(postRunUpdate).flatMap(_.headOption).foreach(run)

private val fallback = (msg: Matchable) =>
monitor.unhandled(msg)
Future.unit

object AsyncActor:

type ReceiveAsync = PartialFunction[Matchable, Future[Matchable]]

case class Monitor(unhandled: Any => Unit)
// lila.log("asyncActor").warn(s"unhandled msg: $msg")

private type State = Option[Queue[Matchable]]

private val postRunUpdate = new UnaryOperator[State]:
override def apply(state: State): State =
state.flatMap: q =>
if q.isEmpty then None else Some(q.tail)
90 changes: 90 additions & 0 deletions src/main/scala/actor/AsyncActorBounded.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package ornicar.scalalib
package actor

import java.util.concurrent.atomic.AtomicReference
import java.util.function.UnaryOperator
import scala.collection.immutable.Queue
import scala.concurrent.{ ExecutionContext, Future, Promise }

import ornicar.scalalib.model.Max

/*
* Sequential like an actor, but for async functions,
* and using an atomic backend instead of akka actor.
*/
final class AsyncActorBounded(
maxSize: Max,
name: String,
monitor: AsyncActorBounded.Monitor
)(
process: AsyncActor.ReceiveAsync
)(using ExecutionContext):

import AsyncActorBounded.*

def !(msg: Matchable): Boolean =
stateRef
.getAndUpdate: state =>
Some:
state.fold(emptyQueue): q =>
if q.size >= maxSize.value then q
else q.enqueue(msg)
.match
case None => // previous state was idle, we can run immediately
run(msg)
true
case Some(q) =>
val success = q.size < maxSize.value
if !success then monitor.overflow(name)
else if q.size >= monitorQueueSize then monitor.queueSize(name, q.size)
success

def ask[A](makeMsg: Promise[A] => Matchable): Future[A] =
val promise = Promise[A]()
val success = this ! makeMsg(promise)
if !success then promise.failure(new EnqueueException(s"The $name asyncActor queue is full ($maxSize)"))
promise.future

def queueSize = stateRef.get().fold(0)(_.size + 1)

private val monitorQueueSize = maxSize.value / 4

/*
* Idle: None
* Busy: Some(Queue.empty)
* Busy with backlog: Some(Queue.nonEmpty)
*/
private val stateRef: AtomicReference[State] = new AtomicReference(None)

private def run(msg: Matchable): Unit =
process.applyOrElse(msg, fallback).onComplete(postRun)

private val postRun = (_: Matchable) =>
stateRef.getAndUpdate(postRunUpdate).flatMap(_.headOption).foreach(run)

private lazy val fallback = (msg: Any) =>
monitor.unhandled(name, msg)
Future.unit

object AsyncActorBounded:

case class Monitor(
overflow: String => Unit,
queueSize: (String, Int) => Unit,
unhandled: (String, Any) => Unit
)

final class EnqueueException(msg: String) extends Exception(msg)

private case class SizedQueue(queue: Queue[Matchable], size: Int):
def enqueue(a: Matchable) = SizedQueue(queue.enqueue(a), size + 1)
def isEmpty = size == 0
def nonEmpty = !isEmpty
def tailOption = Option.when(nonEmpty)(SizedQueue(queue.tail, size - 1))
def headOption = queue.headOption
private val emptyQueue = SizedQueue(Queue.empty, 0)

private type State = Option[SizedQueue]

private val postRunUpdate = new UnaryOperator[State]:
override def apply(state: State): State = state.flatMap(_.tailOption)
77 changes: 77 additions & 0 deletions src/main/scala/actor/AsyncActorConcMap.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package ornicar.scalalib
package actor

import alleycats.Zero
import scala.concurrent.{ ExecutionContext, Future, Promise }
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Function
import scala.jdk.CollectionConverters.*
import ornicar.scalalib.extensions.*
import ornicar.scalalib.future.FutureExtension.*

trait TellMap[Id]:
def tell(id: Id, msg: Matchable): Unit

final class AsyncActorConcMap[Id, D <: AsyncActor](
mkAsyncActor: Id => D,
initialCapacity: Int
) extends TellMap[Id]:

def tell(id: Id, msg: Matchable): Unit = getOrMake(id) ! msg

def getOrMake(id: Id): D = asyncActors.computeIfAbsent(id, loadFunction)

def getIfPresent(id: Id): Option[D] = Option(asyncActors.get(id))

def tellIfPresent(id: Id, msg: => Matchable): Unit = getIfPresent(id).foreach(_ ! msg)

def tellAll(msg: Matchable) = asyncActors.forEachValue(16, _ ! msg)

def tellIds(ids: Seq[Id], msg: Matchable): Unit = ids.foreach { tell(_, msg) }

def ask[A](id: Id)(makeMsg: Promise[A] => Matchable): Future[A] = getOrMake(id).ask(makeMsg)

def askIfPresent[A](id: Id)(makeMsg: Promise[A] => Matchable): Future[Option[A]] =
getIfPresent(id).soFu:
_.ask(makeMsg)

def askIfPresentOrZero[A: Zero](id: Id)(makeMsg: Promise[A] => Matchable): Future[A] =
askIfPresent(id)(makeMsg).dmap(_.orZero)

def exists(id: Id): Boolean = asyncActors.get(id) != null

def foreachKey(f: Id => Unit): Unit =
asyncActors.forEachKey(16, f(_))

def tellAllWithAck(makeMsg: Promise[Unit] => Matchable)(using ExecutionContext): Future[Int] =
Future
.sequence(asyncActors.values.asScala.map(_.ask(makeMsg)))
.map(_.size)

def size: Int = asyncActors.size()

def loadOrTell(id: Id, load: () => D, tell: D => Unit): Unit =
asyncActors.compute(
id,
(_, a) =>
Option(a).fold(load()) { present =>
tell(present)
present
}
)

def terminate(id: Id, lastWill: AsyncActor => Unit): Unit =
asyncActors.computeIfPresent(
id,
(_, d) =>
lastWill(d)
nullD
)

private val asyncActors = ConcurrentHashMap[Id, D](initialCapacity)

private val loadFunction = new Function[Id, D]:
def apply(k: Id) = mkAsyncActor(k)

// used to remove entries
var nullD: D = scala.compiletime.uninitialized
51 changes: 51 additions & 0 deletions src/main/scala/actor/AsyncActorSequencer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package ornicar.scalalib
package actor

import com.github.blemale.scaffeine.{ LoadingCache, Scaffeine }
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Future, Promise }

import ornicar.scalalib.model.Max
import ornicar.scalalib.future.FutureAfter
import ornicar.scalalib.future.FutureExtension.*

final class AsyncActorSequencer(
maxSize: Max,
timeout: FiniteDuration,
name: String,
monitor: AsyncActorBounded.Monitor
)(using ExecutionContext, FutureAfter):

import AsyncActorSequencer.*

def apply[A <: Matchable](fu: => Future[A]): Future[A] = run(() => fu)

def run[A <: Matchable](task: Task[A]): Future[A] = asyncActor.ask[A](TaskWithPromise(task, _))

private val asyncActor = AsyncActorBounded(maxSize, name, monitor):
case TaskWithPromise(task, promise) =>
promise.completeWith {
task().withTimeout(timeout, s"AsyncActorSequencer $name")
}.future

// Distributes tasks to many sequencers
final class AsyncActorSequencers[K](
maxSize: Max,
expiration: FiniteDuration,
timeout: FiniteDuration,
name: String,
monitor: AsyncActorBounded.Monitor
)(using ExecutionContext, FutureAfter):

def apply[A <: Matchable](key: K)(task: => Future[A]): Future[A] =
sequencers.get(key).run(() => task)

private val sequencers: LoadingCache[K, AsyncActorSequencer] =
cache.scaffeine
.expireAfterAccess(expiration)
.build(key => AsyncActorSequencer(maxSize, timeout, s"$name:$key", monitor))

object AsyncActorSequencer:

private type Task[A <: Matchable] = () => Future[A]
private case class TaskWithPromise[A <: Matchable](task: Task[A], promise: Promise[A])
66 changes: 66 additions & 0 deletions src/main/scala/actor/SyncActor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package ornicar.scalalib
package actor

import java.util.concurrent.atomic.AtomicReference
import java.util.function.UnaryOperator
import scala.collection.immutable.Queue
import scala.concurrent.{ ExecutionContext, Future, Promise }

/*
* Like an actor, but not an actor.
* Uses an Atomic Reference backend for sequentiality.
* Has an unbounded (!) Queue of messages.
*/
abstract class SyncActor(using ExecutionContext):

import SyncActor.*

// implement async behaviour here
protected val process: Receive

protected var isAlive = true

def getIsAlive = isAlive

def stop(): Unit =
isAlive = false

def !(msg: Matchable): Unit =
if isAlive && stateRef
.getAndUpdate(state => Some(state.fold(Queue.empty[Matchable])(_.enqueue(msg))))
.isEmpty
then run(msg)

def ask[A](makeMsg: Promise[A] => Matchable): Future[A] =
val promise = Promise[A]()
this ! makeMsg(promise)
promise.future

def queueSize = stateRef.get().fold(0)(_.size + 1)

/*
* Idle: None
* Busy: Some(Queue.empty)
* Busy with backlog: Some(Queue.nonEmpty)
*/
private val stateRef: AtomicReference[State] = new AtomicReference(None)

private def run(msg: Matchable): Unit =
Future {
process(msg)
}.onComplete(postRun)

private val postRun = (_: Matchable) =>
stateRef.getAndUpdate(postRunUpdate).flatMap(_.headOption).foreach(run)

object SyncActor:

type Receive = Matchable => Unit

private type State = Option[Queue[Matchable]]

private val postRunUpdate = new UnaryOperator[State]:
override def apply(state: State): State =
state.flatMap { q =>
if q.isEmpty then None else Some(q.tail)
}
Loading

0 comments on commit b9471b2

Please sign in to comment.