Skip to content

Commit

Permalink
Implement Monitor trait (wip)
Browse files Browse the repository at this point in the history
  • Loading branch information
lenguyenthanh committed Nov 22, 2023
1 parent 783342e commit 17da26d
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 69 deletions.
3 changes: 2 additions & 1 deletion app/src/main/scala/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ object App extends IOApp.Simple:
_ <- Resource.eval(Logger[IO].info(s"Starting lila-fishnet with config: $config"))
res <- AppResources.instance(config.redis)
lilaClient = LilaClient(res.redisPubsub)
executor <- Resource.eval(Executor.instance(lilaClient))
monitor = Monitor.apply
executor <- Resource.eval(Executor.instance(lilaClient, monitor))
workListenerJob = RedisSubscriberJob(executor, res.redisPubsub)
cleanJob = CleanJob(executor)
httpApi = HttpApi(executor, HealthCheck())
Expand Down
39 changes: 9 additions & 30 deletions app/src/main/scala/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,10 @@ trait Executor:

object Executor:

def apply(using client: LilaClient): IO[Executor] =
instance(client)

val maxSize = 300
type State = Map[WorkId, Work.Move]

def instance(client: LilaClient): IO[Executor] =
def instance(client: LilaClient, monitor: Monitor): IO[Executor] =
Ref.of[IO, State](Map.empty).map: ref =>
new Executor:

Expand Down Expand Up @@ -60,33 +57,31 @@ object Executor:
ref.flatModify: coll =>
coll get workId match
case None =>
coll -> notFound(workId, apikey)
coll -> monitor.notFound(workId, apikey)
case Some(work) if work.isAcquiredBy(apikey) =>
move.uci match
case Some(uci) =>
coll - work.id -> (success(work) >> client.send(Lila.Move(
coll - work.id -> (monitor.success(work) >> client.send(Lila.Move(
work.request.id,
work.request.moves,
uci,
)))
case _ =>
updateOrGiveUp(coll, work.invalid) ->
failure(work, apikey, new Exception("Missing move"))
monitor.failure(work, apikey, new Exception("Missing move"))

case Some(move) =>
coll -> notAcquired(move, apikey)
coll -> monitor.notAcquired(move, apikey)

def clean(since: Instant): IO[Unit] =
ref.update: coll =>
ref.updateAndGet: coll =>
val timedOut = coll.values.filter(_.acquiredBefore(since))
// if (timedOut.nonEmpty)
// logger.debug(s"cleaning ${timedOut.size} of ${coll.size} moves")
timedOut.foldLeft(coll) { (coll, m) =>
timedOut.foldLeft(coll): (coll, m) =>
// logger.info(s"Timeout move $m")
updateOrGiveUp(coll, m.timeout)
}
// monitor.dbSize.update(coll.size.toDouble)
// monitor.dbQueued.update(coll.count(_._2.nonAcquired).toDouble)
// monitor.dbAcquired.update(coll.count(_._2.isAcquired).toDouble)
.flatMap(monitor.updateSize)

def clearIfFull(coll: State): State =
if coll.size > maxSize then
Expand All @@ -102,22 +97,6 @@ object Executor:
else
newState + (move.id -> move)

// report not found
def notFound(id: WorkId, key: ClientKey): IO[Unit] =
IO.println(s"not found $id, $key")

// report not acquired
def notAcquired(work: Work.Move, key: ClientKey): IO[Unit] =
IO.println(s"not acquired $work, $key")

// success
def success(move: Work.Move): IO[Unit] =
IO.println(s"success $move")

// failure
def failure(move: Work.Move, client: ClientKey, ex: Throwable): IO[Unit] =
IO.println(s"failure $move, $client, $ex")

def fromRequest(req: Lila.Request): IO[Move] =
(IO.delay(Work.makeId), IO.realTimeInstant).mapN: (id, now) =>
Move(
Expand Down
80 changes: 49 additions & 31 deletions app/src/main/scala/Monitor.scala
Original file line number Diff line number Diff line change
@@ -1,35 +1,53 @@
package lila.fishnet

import kamon.Kamon
import cats.effect.IO
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit
import kamon.Kamon
import kamon.metric.Timer
import org.typelevel.log4cats.Logger
import java.time.Instant

trait Monitor:
def success(work: Work.Move): IO[Unit]
def failure(work: Work.Move, clientKey: ClientKey, e: Exception): IO[Unit]
def notFound(id: WorkId, clientKey: ClientKey): IO[Unit]
def notAcquired(work: Work.Move, clientKey: ClientKey): IO[Unit]
def updateSize(map: Map[WorkId, Work.Move]): IO[Unit]

object Monitor:

val dbSize = Kamon.gauge("db.size").withoutTags()
val dbQueued = Kamon.gauge("db.queued").withoutTags()
val dbAcquired = Kamon.gauge("db.acquired").withoutTags()
val lvl8AcquiredTimeRequest: Timer = Kamon.timer("move.acquired.lvl8").withoutTags()
val lvl1FullTimeRequest = Kamon.timer("move.full.lvl1").withoutTags()

def apply(using Logger[IO]): Monitor =
new Monitor:
def success(work: Work.Move): IO[Unit] =
IO.realTimeInstant.map: now =>
if work.request.level == 8 then
work.acquiredAt.foreach(at => record(lvl8AcquiredTimeRequest, at, now))
if work.request.level == 1 then
record(lvl1FullTimeRequest, work.createdAt, now)

def failure(work: Work.Move, clientKey: ClientKey, e: Exception) =
Logger[IO].warn(e)(s"Received invalid move ${work.id} for ${work.request.id} by $clientKey")

def notFound(id: WorkId, clientKey: ClientKey) =
Logger[IO].info(s"Received unknown work $id by $clientKey")

def notAcquired(work: Work.Move, clientKey: ClientKey) =
Logger[IO].info(
s"Received unacquired move ${work.id} for ${work.request.id} by $clientKey. Work current tries: ${work.tries} acquired: ${work.acquired}"
)

def updateSize(map: Map[WorkId, Work.Move]): IO[Unit] =
IO.delay(dbSize.update(map.size.toDouble)) *>
IO.delay(dbQueued.update(map.count(_._2.nonAcquired).toDouble)) *>
IO.delay(dbAcquired.update(map.count(_._2.isAcquired).toDouble)).void

class Monitor:

val dbSize = Kamon.gauge("db.size").withoutTags()
val dbQueued = Kamon.gauge("db.queued").withoutTags()
val dbAcquired = Kamon.gauge("db.acquired").withoutTags()
val lvl8AcquiredTimeRequest = Kamon.timer("move.acquired.lvl8").withoutTags()
val lvl1FullTimeRequest = Kamon.timer("move.full.lvl1").withoutTags()

def success(work: Work.Move) =
val now = System.currentTimeMillis
if work.request.level == 8 then
work.acquiredAt foreach { acquiredAt =>
lvl8AcquiredTimeRequest.record(now - acquiredAt.toEpochMilli(), TimeUnit.MILLISECONDS)
}
if work.request.level == 1 then
lvl1FullTimeRequest.record(now - work.createdAt.toEpochMilli(), TimeUnit.MILLISECONDS)

def failure(work: Work.Move, clientKey: ClientKey, e: Exception) = {
// logger.warn(s"Received invalid move ${work.id} for ${work.game.id} by $clientKey", e)
}

def notFound(id: WorkId, clientKey: ClientKey) = {
// logger.info(s"Received unknown work $id by $clientKey")
}

def notAcquired(work: Work.Move, clientKey: ClientKey) = {
// logger.info(
// s"Received unacquired move ${work.id} for ${work.game.id} by $clientKey. Work current tries: ${work.tries} acquired: ${work.acquired}"
// )
}
private def record(timer: Timer, start: Instant, end: Instant): Unit =
timer.record(start.until(end, ChronoUnit.MILLIS), TimeUnit.MILLISECONDS)
()
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ lazy val app = project
kamonCore,
kamonInflux,
kamonSystemMetrics,
// kamonHttp4s,
log4Cats,
logback,
redis,
Expand Down
16 changes: 9 additions & 7 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ object Dependencies {
val lilaMaven = "lila-maven" at "https://raw.githubusercontent.com/lichess-org/lila-maven/master"

object V {
val fs2 = "3.9.3"
val circe = "0.14.6"
val http4s = "0.23.23"
val ciris = "3.4.0"
val kamon = "2.5.11"
val chess = "15.6.11"
val munit = "1.0.0-M8"
val fs2 = "3.9.3"
val circe = "0.14.6"
val http4s = "0.23.23"
val ciris = "3.4.0"
val kamon = "2.5.11"
val kamonHttp4s = "2.6.6"
val chess = "15.6.11"
val munit = "1.0.0-M8"
}

def http4s(artifact: String) = "org.http4s" %% s"http4s-$artifact" % V.http4s
Expand All @@ -38,6 +39,7 @@ object Dependencies {
val kamonCore = "io.kamon" %% "kamon-core" % V.kamon
val kamonInflux = "io.kamon" %% "kamon-influxdb" % V.kamon
val kamonSystemMetrics = "io.kamon" %% "kamon-system-metrics" % V.kamon
val kamonHttp4s = "io.kamon" %% "kamon-http4s" % V.kamonHttp4s

val http4sDsl = http4s("dsl")
val http4sServer = http4s("ember-server")
Expand Down

0 comments on commit 17da26d

Please sign in to comment.