diff --git a/app/src/main/scala/App.scala b/app/src/main/scala/App.scala index 4d782b2..7676f46 100644 --- a/app/src/main/scala/App.scala +++ b/app/src/main/scala/App.scala @@ -20,7 +20,8 @@ object App extends IOApp.Simple: _ <- Resource.eval(Logger[IO].info(s"Starting lila-fishnet with config: $config")) res <- AppResources.instance(config.redis) lilaClient = LilaClient(res.redisPubsub) - executor <- Resource.eval(Executor.instance(lilaClient)) + monitor = Monitor.apply + executor <- Resource.eval(Executor.instance(lilaClient, monitor)) workListenerJob = RedisSubscriberJob(executor, res.redisPubsub) cleanJob = CleanJob(executor) httpApi = HttpApi(executor, HealthCheck()) diff --git a/app/src/main/scala/Executor.scala b/app/src/main/scala/Executor.scala index f5e5da3..38ad1b7 100644 --- a/app/src/main/scala/Executor.scala +++ b/app/src/main/scala/Executor.scala @@ -25,13 +25,10 @@ trait Executor: object Executor: - def apply(using client: LilaClient): IO[Executor] = - instance(client) - val maxSize = 300 type State = Map[WorkId, Work.Move] - def instance(client: LilaClient): IO[Executor] = + def instance(client: LilaClient, monitor: Monitor): IO[Executor] = Ref.of[IO, State](Map.empty).map: ref => new Executor: @@ -60,33 +57,31 @@ object Executor: ref.flatModify: coll => coll get workId match case None => - coll -> notFound(workId, apikey) + coll -> monitor.notFound(workId, apikey) case Some(work) if work.isAcquiredBy(apikey) => move.uci match case Some(uci) => - coll - work.id -> (success(work) >> client.send(Lila.Move( + coll - work.id -> (monitor.success(work) >> client.send(Lila.Move( work.request.id, work.request.moves, uci, ))) case _ => updateOrGiveUp(coll, work.invalid) -> - failure(work, apikey, new Exception("Missing move")) + monitor.failure(work, apikey, new Exception("Missing move")) + case Some(move) => - coll -> notAcquired(move, apikey) + coll -> monitor.notAcquired(move, apikey) def clean(since: Instant): IO[Unit] = - ref.update: coll => + ref.updateAndGet: coll => val timedOut = coll.values.filter(_.acquiredBefore(since)) // if (timedOut.nonEmpty) // logger.debug(s"cleaning ${timedOut.size} of ${coll.size} moves") - timedOut.foldLeft(coll) { (coll, m) => + timedOut.foldLeft(coll): (coll, m) => // logger.info(s"Timeout move $m") updateOrGiveUp(coll, m.timeout) - } - // monitor.dbSize.update(coll.size.toDouble) - // monitor.dbQueued.update(coll.count(_._2.nonAcquired).toDouble) - // monitor.dbAcquired.update(coll.count(_._2.isAcquired).toDouble) + .flatMap(monitor.updateSize) def clearIfFull(coll: State): State = if coll.size > maxSize then @@ -102,22 +97,6 @@ object Executor: else newState + (move.id -> move) - // report not found - def notFound(id: WorkId, key: ClientKey): IO[Unit] = - IO.println(s"not found $id, $key") - - // report not acquired - def notAcquired(work: Work.Move, key: ClientKey): IO[Unit] = - IO.println(s"not acquired $work, $key") - - // success - def success(move: Work.Move): IO[Unit] = - IO.println(s"success $move") - - // failure - def failure(move: Work.Move, client: ClientKey, ex: Throwable): IO[Unit] = - IO.println(s"failure $move, $client, $ex") - def fromRequest(req: Lila.Request): IO[Move] = (IO.delay(Work.makeId), IO.realTimeInstant).mapN: (id, now) => Move( diff --git a/app/src/main/scala/Monitor.scala b/app/src/main/scala/Monitor.scala index a1ae9c7..3c554c1 100644 --- a/app/src/main/scala/Monitor.scala +++ b/app/src/main/scala/Monitor.scala @@ -1,35 +1,53 @@ package lila.fishnet -import kamon.Kamon +import cats.effect.IO +import java.time.temporal.ChronoUnit import java.util.concurrent.TimeUnit +import kamon.Kamon +import kamon.metric.Timer +import org.typelevel.log4cats.Logger +import java.time.Instant + +trait Monitor: + def success(work: Work.Move): IO[Unit] + def failure(work: Work.Move, clientKey: ClientKey, e: Exception): IO[Unit] + def notFound(id: WorkId, clientKey: ClientKey): IO[Unit] + def notAcquired(work: Work.Move, clientKey: ClientKey): IO[Unit] + def updateSize(map: Map[WorkId, Work.Move]): IO[Unit] + +object Monitor: + + val dbSize = Kamon.gauge("db.size").withoutTags() + val dbQueued = Kamon.gauge("db.queued").withoutTags() + val dbAcquired = Kamon.gauge("db.acquired").withoutTags() + val lvl8AcquiredTimeRequest: Timer = Kamon.timer("move.acquired.lvl8").withoutTags() + val lvl1FullTimeRequest = Kamon.timer("move.full.lvl1").withoutTags() + + def apply(using Logger[IO]): Monitor = + new Monitor: + def success(work: Work.Move): IO[Unit] = + IO.realTimeInstant.map: now => + if work.request.level == 8 then + work.acquiredAt.foreach(at => record(lvl8AcquiredTimeRequest, at, now)) + if work.request.level == 1 then + record(lvl1FullTimeRequest, work.createdAt, now) + + def failure(work: Work.Move, clientKey: ClientKey, e: Exception) = + Logger[IO].warn(e)(s"Received invalid move ${work.id} for ${work.request.id} by $clientKey") + + def notFound(id: WorkId, clientKey: ClientKey) = + Logger[IO].info(s"Received unknown work $id by $clientKey") + + def notAcquired(work: Work.Move, clientKey: ClientKey) = + Logger[IO].info( + s"Received unacquired move ${work.id} for ${work.request.id} by $clientKey. Work current tries: ${work.tries} acquired: ${work.acquired}" + ) + + def updateSize(map: Map[WorkId, Work.Move]): IO[Unit] = + IO.delay(dbSize.update(map.size.toDouble)) *> + IO.delay(dbQueued.update(map.count(_._2.nonAcquired).toDouble)) *> + IO.delay(dbAcquired.update(map.count(_._2.isAcquired).toDouble)).void -class Monitor: - - val dbSize = Kamon.gauge("db.size").withoutTags() - val dbQueued = Kamon.gauge("db.queued").withoutTags() - val dbAcquired = Kamon.gauge("db.acquired").withoutTags() - val lvl8AcquiredTimeRequest = Kamon.timer("move.acquired.lvl8").withoutTags() - val lvl1FullTimeRequest = Kamon.timer("move.full.lvl1").withoutTags() - - def success(work: Work.Move) = - val now = System.currentTimeMillis - if work.request.level == 8 then - work.acquiredAt foreach { acquiredAt => - lvl8AcquiredTimeRequest.record(now - acquiredAt.toEpochMilli(), TimeUnit.MILLISECONDS) - } - if work.request.level == 1 then - lvl1FullTimeRequest.record(now - work.createdAt.toEpochMilli(), TimeUnit.MILLISECONDS) - - def failure(work: Work.Move, clientKey: ClientKey, e: Exception) = { - // logger.warn(s"Received invalid move ${work.id} for ${work.game.id} by $clientKey", e) - } - - def notFound(id: WorkId, clientKey: ClientKey) = { - // logger.info(s"Received unknown work $id by $clientKey") - } - - def notAcquired(work: Work.Move, clientKey: ClientKey) = { - // logger.info( - // s"Received unacquired move ${work.id} for ${work.game.id} by $clientKey. Work current tries: ${work.tries} acquired: ${work.acquired}" - // ) - } + private def record(timer: Timer, start: Instant, end: Instant): Unit = + timer.record(start.until(end, ChronoUnit.MILLIS), TimeUnit.MILLISECONDS) + () diff --git a/build.sbt b/build.sbt index 5dc37cb..1bb9414 100644 --- a/build.sbt +++ b/build.sbt @@ -32,6 +32,7 @@ lazy val app = project kamonCore, kamonInflux, kamonSystemMetrics, + // kamonHttp4s, log4Cats, logback, redis, diff --git a/project/Dependencies.scala b/project/Dependencies.scala index ec3473d..4abc0b4 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -5,13 +5,14 @@ object Dependencies { val lilaMaven = "lila-maven" at "https://raw.githubusercontent.com/lichess-org/lila-maven/master" object V { - val fs2 = "3.9.3" - val circe = "0.14.6" - val http4s = "0.23.23" - val ciris = "3.4.0" - val kamon = "2.5.11" - val chess = "15.6.11" - val munit = "1.0.0-M8" + val fs2 = "3.9.3" + val circe = "0.14.6" + val http4s = "0.23.23" + val ciris = "3.4.0" + val kamon = "2.5.11" + val kamonHttp4s = "2.6.6" + val chess = "15.6.11" + val munit = "1.0.0-M8" } def http4s(artifact: String) = "org.http4s" %% s"http4s-$artifact" % V.http4s @@ -38,6 +39,7 @@ object Dependencies { val kamonCore = "io.kamon" %% "kamon-core" % V.kamon val kamonInflux = "io.kamon" %% "kamon-influxdb" % V.kamon val kamonSystemMetrics = "io.kamon" %% "kamon-system-metrics" % V.kamon + val kamonHttp4s = "io.kamon" %% "kamon-http4s" % V.kamonHttp4s val http4sDsl = http4s("dsl") val http4sServer = http4s("ember-server")