diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index df0cd8e..35f566c 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -1,13 +1,25 @@
-name: Test
+name: Continuous Integration
 
-on: [push, pull_request]
+on:
+  pull_request:
+    branches: ['**']
+  push:
+    branches: ['**']
 
 jobs:
-  openjdk13:
+  openjdk21:
     runs-on: ubuntu-latest
+    container: sbtscala/scala-sbt:eclipse-temurin-jammy-21.0.1_12_1.9.7_3.3.1
     steps:
-      - uses: actions/checkout@v2
-      - uses: actions/setup-java@v1
-        with:
-          java-version: 13
-      - run: sbt compile
+
+      - name: Checkout current branch
+        uses: actions/checkout@v4
+
+      - name: Compile
+        run: sbt compile
+
+      - name: Test
+        run: sbt test
+
+      - name: Check Formatting
+        run: sbt scalafmtCheckAll
diff --git a/.scalafmt.conf b/.scalafmt.conf
index ea5cb92..73954e3 100644
--- a/.scalafmt.conf
+++ b/.scalafmt.conf
@@ -1,6 +1,21 @@
-version = "2.6.3"
+version = "3.7.17"
+runner.dialect = scala3
+
 align.preset = more
 maxColumn = 110
 spaces.inImportCurlyBraces = true
-rewrite.rules = [SortImports, RedundantParens, SortModifiers]
+rewrite.rules = [SortModifiers]
 rewrite.redundantBraces.stringInterpolation = true
+
+rewrite.scala3.convertToNewSyntax = yes
+rewrite.scala3.removeOptionalBraces = yes
+
+fileOverride {
+  "glob:**/build.sbt" {
+    runner.dialect = scala213
+  }
+  "glob:**/project/**" {
+    runner.dialect = scala213
+  }
+}
+
diff --git a/README.md b/README.md
index 1b66f80..5009b96 100644
--- a/README.md
+++ b/README.md
@@ -1,36 +1,41 @@
+# Lila Fishnet
+
 Proxy between lila and fishnet move clients
 
 ```
 lila <-> redis <-> lila-fishnet <- http <- fishnet-clients
 ```
 
+## Development
+
 Start:
-```
+```sh
 sbt
 ```
 
-Start with custom port:
-```
-sbt -Dhttp.port=9665
+Start with default config:
+```sh
+sbt app/run
 ```
 
-Start with custom config file:
+Start with a custom config value (`redis.host`, for example):
 ```
-sbt -Dconfig.file=/path/to/my.conf
+sbt -Dredis.host=redis
 ```
 
-Custom config file example:
+For other config options, see [Config.scala](https://github.com/lichess-org/lila-fishnet/blob/master/app/src/main/scala/Config.scala)
+
+Run all tests (requires Docker for the integration tests):
 ```
-include "application"
-redis.uri = "redis://127.0.0.1"
+sbt app/test
 ```
 
-Code formatting
-###
-
-This repository uses [scalafmt](https://scalameta.org/scalafmt/).
-
-Please [install it for your code editor](https://scalameta.org/scalafmt/docs/installation.html)
-if you're going to contribute to this project.
+Run a single test:
+```
+sbt app/testOnly lila.fishnet.ExecutorTest
+```
-
-If you don't install it, please run `scalafmtAll` in the sbt console before committing.
+Format: +``` +sbt scalafmtAll +``` diff --git a/app/AppLoader.scala b/app/AppLoader.scala deleted file mode 100644 index 5f401b0..0000000 --- a/app/AppLoader.scala +++ /dev/null @@ -1,46 +0,0 @@ -package lila.app - -import play.api._ -import play.api.routing.{ Router, SimpleRouter } -import play.api.routing.sird._ -import akka.actor.ActorSystem - -final class AppLoader extends ApplicationLoader { - def load(ctx: ApplicationLoader.Context): Application = new LilaComponents(ctx).application -} - -final class LilaComponents(ctx: ApplicationLoader.Context) extends BuiltInComponentsFromContext(ctx) { - - LoggerConfigurator(ctx.environment.classLoader).foreach { - _.configure(ctx.environment, ctx.initialConfiguration, Map.empty) - } - - println { - val java = System.getProperty("java.version") - val mem = Runtime.getRuntime().maxMemory() / 1024 / 1024 - s"lila-fishnet ${ctx.environment.mode} / java ${java}, memory: ${mem}MB" - } - - override val httpFilters = Seq.empty - - import _root_.controllers._ - - implicit def system: ActorSystem = actorSystem - - lazy val moveDb = new lila.fishnet.MoveDb - lazy val redis = new lila.fishnet.Lila(moveDb, configuration) - lazy val controller = new FishnetController(configuration, redis, moveDb, controllerComponents) - - // eagerly wire up all controllers - val router: Router = new SimpleRouter { - def routes: Router.Routes = { - case POST(p"/fishnet/acquire") => controller.acquire - case POST(p"/fishnet/move/$workId") => controller.move(workId) - } - } - - if (configuration.get[Boolean]("kamon.enabled")) { - println("Kamon is enabled") - kamon.Kamon.loadModules() - } -} diff --git a/app/JsonApi.scala b/app/JsonApi.scala deleted file mode 100644 index 4b42936..0000000 --- a/app/JsonApi.scala +++ /dev/null @@ -1,87 +0,0 @@ -package lila.fishnet - -import play.api.libs.json._ - -import chess.format.{ FEN, Uci } -import chess.variant.Variant -import lila.fishnet.{ Work => W } - -object JsonApi { - - sealed trait Request { - val fishnet: Request.Fishnet - def clientKey = fishnet.apikey - } - - object Request { - - sealed trait Result - - case class Fishnet(apikey: ClientKey) - - case class Acquire(fishnet: Fishnet) extends Request - - case class PostMove(fishnet: Fishnet, move: MoveResult) extends Request with Result - - case class MoveResult(bestmove: String) { - def uci: Option[Uci] = Uci(bestmove) - } - } - - case class Game( - game_id: String, - position: FEN, - variant: Variant, - moves: String - ) - - def fromGame(g: W.Game) = - Game( - game_id = g.id, - position = g.initialFen | g.variant.initialFen, - variant = g.variant, - moves = g.moves - ) - - sealed trait Work { - val id: String - val game: Game - } - case class Move( - id: String, - level: Int, - game: Game, - clock: Option[Work.Clock] - ) extends Work - - def moveFromWork(m: Work.Move) = Move(m.id.value, m.level, fromGame(m.game), m.clock) - - object readers { - implicit val ClientKeyReads: Reads[ClientKey] = Reads.of[String].map(new ClientKey(_)) - implicit val FishnetReads: Reads[Request.Fishnet] = Json.reads[Request.Fishnet] - implicit val AcquireReads: Reads[Request.Acquire] = Json.reads[Request.Acquire] - implicit val MoveResultReads: Reads[Request.MoveResult] = Json.reads[Request.MoveResult] - implicit val PostMoveReads: Reads[Request.PostMove] = Json.reads[Request.PostMove] - } - - object writers { - implicit val VariantWrites: Writes[Variant] = Writes[Variant] { v => JsString(v.key) } - implicit val FENWrites: Writes[FEN] = Writes[FEN] { fen => JsString(fen.value) } - implicit val 
GameWrites: Writes[Game] = Json.writes[Game] - implicit val ClockWrites: Writes[Work.Clock] = Json.writes[Work.Clock] - implicit val WorkIdWrites: Writes[Work.Id] = Writes[Work.Id] { id => JsString(id.value) } - implicit val WorkWrites: OWrites[Work] = OWrites[Work] { work => - (work match { - case m: Move => - Json.obj( - "work" -> Json.obj( - "type" -> "move", - "id" -> m.id, - "level" -> m.level, - "clock" -> m.clock - ) - ) - }) ++ Json.toJson(work.game).as[JsObject] - } - } -} diff --git a/app/Lila.scala b/app/Lila.scala deleted file mode 100644 index a3e8cdf..0000000 --- a/app/Lila.scala +++ /dev/null @@ -1,82 +0,0 @@ -package lila.fishnet - -import chess.format.{ FEN, Uci } -import io.lettuce.core._ -import io.lettuce.core.pubsub._ -import org.joda.time.DateTime -import play.api.Configuration -import play.api.Logger - -final class Lila( - moveDb: MoveDb, - config: Configuration -) { - - private val logger = Logger(getClass) - private val redis = RedisClient create config.get[String]("redis.uri") - - def pubsub(chanIn: String, chanOut: String): Lila.Move => Unit = { - - val connIn = redis.connectPubSub() - val connOut = redis.connectPubSub() - - def send(move: Lila.Move): Unit = connIn.async.publish(chanIn, move.write) - - connOut.addListener(new RedisPubSubAdapter[String, String] { - override def message(chan: String, msg: String): Unit = - Lila.readMoveReq(msg) match { - case None => logger warn s"LilaOut invalid move $msg" - case Some(req) => moveDb add req - } - }) - - connOut.async.subscribe(chanOut) thenRun { () => connIn.async.publish(chanIn, "start") } - - send - } -} - -object Lila { - - case class Move(game: Work.Game, uci: Uci) { - def sign = game.moves.takeRight(20).replace(" ", "") - def write = s"${game.id} $sign ${uci.uci}" - } - - def readMoveReq(msg: String): Option[Work.Move] = - msg.split(";", 6) match { - case Array(gameId, levelS, clockS, variantS, initialFenS, moves) => - for { - level <- levelS.toIntOption - variant = chess.variant.Variant.orDefault(variantS) - initialFen = if (initialFenS.isEmpty) None else Some(FEN(initialFenS)) - clock = readClock(clockS) - } yield Work.Move( - _id = Work.makeId, - game = Work.Game( - id = gameId, - initialFen = initialFen, - variant = variant, - moves = moves - ), - level = level, - clock = clock, - tries = 0, - lastTryByKey = None, - acquired = None, - createdAt = DateTime.now - ) - case _ => None - } - - def readClock(s: String) = - s split ' ' match { - case Array(ws, bs, incs) => - for { - wtime <- ws.toIntOption - btime <- bs.toIntOption - inc <- incs.toIntOption - } yield Work.Clock(wtime, btime, inc) - case _ => None - } -} diff --git a/app/MoveDb.scala b/app/MoveDb.scala deleted file mode 100644 index d474cc0..0000000 --- a/app/MoveDb.scala +++ /dev/null @@ -1,158 +0,0 @@ -package lila.fishnet - -import akka.actor._ -import akka.pattern.ask -import java.util.concurrent.TimeUnit -import kamon.Kamon -import org.joda.time.DateTime -import play.api.Logger -import scala.concurrent.duration._ -import scala.concurrent.{ ExecutionContext, Future } - -final class MoveDb(implicit system: ActorSystem, ec: ExecutionContext) { - - import MoveDb._ - import Work.Move - - implicit private val timeout = new akka.util.Timeout(2.seconds) - - def add(move: Move) = actor ! Add(move) - - def acquire(clientKey: ClientKey): Future[Option[Move]] = { - actor ? Acquire(clientKey) mapTo manifest[Option[Move]] - } - - def postResult( - workId: Work.Id, - data: JsonApi.Request.PostMove - ): Future[Option[Lila.Move]] = { - actor ? 
PostResult(workId, data) mapTo manifest[Option[Lila.Move]] - } - - private val actor = system.actorOf(Props(new Actor { - - val coll = scala.collection.mutable.Map.empty[Work.Id, Move] - - val maxSize = 300 - - def receive = { - - case Add(move) if !coll.exists(_._2 similar move) => coll += (move.id -> move) - - case Add(move) => - clearIfFull() - coll += (move.id -> move) - - case Acquire(clientKey) => - sender() ! coll.values - .foldLeft[Option[Move]](None) { - case (found, m) if m.nonAcquired => - Some { - found.fold(m) { a => - if (m.canAcquire(clientKey) && m.createdAt.isBefore(a.createdAt)) m else a - } - } - case (found, _) => found - } - .map { m => - val move = m assignTo clientKey - coll += (move.id -> move) - move - } - - case PostResult(workId, data) => - coll get workId match { - case None => - monitor.notFound(workId, data.clientKey) - sender() ! None - case Some(move) if move isAcquiredBy data.clientKey => - data.move.uci match { - case Some(uci) => - sender() ! Some(Lila.Move(move.game, uci)) - coll -= move.id - monitor.success(move) - case _ => - sender() ! None - updateOrGiveUp(move.invalid) - monitor.failure(move, data.clientKey, new Exception("Missing move")) - } - case Some(move) => - sender() ! None - monitor.notAcquired(move, data.clientKey) - } - - case Clean => - val since = DateTime.now minusSeconds 3 - val timedOut = coll.values.filter(_ acquiredBefore since) - if (timedOut.nonEmpty) logger.debug(s"cleaning ${timedOut.size} of ${coll.size} moves") - timedOut.foreach { m => - logger.info(s"Timeout move $m") - updateOrGiveUp(m.timeout) - } - monitor.dbSize.update(coll.size.toDouble) - monitor.dbQueued.update(coll.count(_._2.nonAcquired).toDouble) - monitor.dbAcquired.update(coll.count(_._2.isAcquired).toDouble) - } - - def updateOrGiveUp(move: Move) = - if (move.isOutOfTries) { - logger.warn(s"Give up on move $move") - coll -= move.id - } else coll += (move.id -> move) - - def clearIfFull() = - if (coll.size > maxSize) { - logger.warn(s"MoveDB collection is full! maxSize=$maxSize. Dropping all now!") - coll.clear() - } - })) - - system.scheduler.scheduleWithFixedDelay(5.seconds, 3.seconds) { () => - actor ? 
Clean mapTo manifest[Iterable[Move]] map { moves => - moves foreach { move => logger.info(s"Timeout move $move") } - } - } - - private val logger = Logger(getClass) - private val monitor = new Monitor(logger) -} - -object MoveDb { - - private case class Add(move: Work.Move) - private case class Acquire(clientKey: ClientKey) - private case class PostResult(workId: Work.Id, data: JsonApi.Request.PostMove) - private object Clean - - final private class Monitor(logger: Logger) { - - val dbSize = Kamon.gauge("db.size").withoutTags() - val dbQueued = Kamon.gauge("db.queued").withoutTags() - val dbAcquired = Kamon.gauge("db.acquired").withoutTags() - val lvl8AcquiredTimeRequest = Kamon.timer("move.acquired.lvl8").withoutTags() - val lvl1FullTimeRequest = Kamon.timer("move.full.lvl1").withoutTags() - - def success(work: Work.Move) = { - val now = System.currentTimeMillis - if (work.level == 8) work.acquiredAt foreach { acquiredAt => - lvl8AcquiredTimeRequest.record(now - acquiredAt.getMillis, TimeUnit.MILLISECONDS) - } - if (work.level == 1) - lvl1FullTimeRequest.record(now - work.createdAt.getMillis, TimeUnit.MILLISECONDS) - } - - def failure(work: Work, clientKey: ClientKey, e: Exception) = { - logger.warn(s"Received invalid move ${work.id} for ${work.game.id} by $clientKey", e) - } - - def notFound(id: Work.Id, clientKey: ClientKey) = { - logger.info(s"Received unknown work $id by $clientKey") - } - - def notAcquired(work: Work, clientKey: ClientKey) = { - logger.info( - s"Received unacquired move ${work.id} for ${work.game.id} by $clientKey. Work current tries: ${work.tries} acquired: ${work.acquired}" - ) - } - } -} diff --git a/app/Work.scala b/app/Work.scala deleted file mode 100644 index 935bf2a..0000000 --- a/app/Work.scala +++ /dev/null @@ -1,85 +0,0 @@ -package lila.fishnet - -import org.joda.time.DateTime -import chess.variant.Variant -import chess.format.FEN - -sealed trait Work { - def _id: Work.Id - def game: Work.Game - def tries: Int - def lastTryByKey: Option[ClientKey] - def acquired: Option[Work.Acquired] - def createdAt: DateTime - - def id = _id - - def acquiredAt = acquired.map(_.date) - def acquiredByKey = acquired.map(_.clientKey) - def isAcquiredBy(clientKey: ClientKey) = acquiredByKey contains clientKey - def isAcquired = acquired.isDefined - def nonAcquired = !isAcquired - def canAcquire(clientKey: ClientKey) = lastTryByKey.fold(true)(clientKey.!=) - - def acquiredBefore(date: DateTime) = acquiredAt.fold(false)(_ isBefore date) -} - -object Work { - - case class Id(value: String) extends AnyVal with StringValue - - case class Acquired( - clientKey: ClientKey, - date: DateTime - ) { - - def ageInMillis = System.currentTimeMillis - date.getMillis - - override def toString = s"by $clientKey at $date" - } - - case class Game( - id: String, // can be a study chapter ID, if studyId is set - initialFen: Option[FEN], - variant: Variant, - moves: String - ) - - case class Clock(wtime: Int, btime: Int, inc: Int) - - case class Move( - _id: Work.Id, // random - game: Game, - level: Int, - clock: Option[Work.Clock], - tries: Int, - lastTryByKey: Option[ClientKey], - acquired: Option[Acquired], - createdAt: DateTime - ) extends Work { - - def assignTo(clientKey: ClientKey) = - copy( - acquired = Some( - Acquired( - clientKey = clientKey, - date = DateTime.now - ) - ), - lastTryByKey = Some(clientKey), - tries = tries + 1 - ) - - def timeout = copy(acquired = None) - def invalid = copy(acquired = None) - - def isOutOfTries = tries >= 3 - - def similar(to: Move) = game.id == 
to.game.id && game.moves == to.game.moves - - override def toString = - s"id:$id game:${game.id} variant:${game.variant.key} level:$level tries:$tries created:$createdAt acquired:$acquired" - } - - def makeId = Id(scala.util.Random.alphanumeric.take(8).mkString) -} diff --git a/app/controllers/FishnetController.scala b/app/controllers/FishnetController.scala deleted file mode 100644 index 7e54f59..0000000 --- a/app/controllers/FishnetController.scala +++ /dev/null @@ -1,62 +0,0 @@ -package controllers - -import play.api.Configuration -import play.api.libs.json._ -import play.api.mvc._ -import scala.concurrent.{ ExecutionContext, Future } - -import lila.fishnet._ - -class FishnetController( - config: Configuration, - lila: Lila, - moveDb: MoveDb, - val controllerComponents: ControllerComponents -)(implicit ec: ExecutionContext) - extends BaseController { - - val logger = play.api.Logger(getClass) - - val version = System.getProperty("java.version") - val memory = Runtime.getRuntime().maxMemory() / 1024 / 1024 - val useKamon = config.get[String]("kamon.influxdb.hostname").nonEmpty - - logger.info(s"lila-fishnet netty kamon=$useKamon") - logger.info(s"Java version: $version, memory: ${memory}MB") - - if (useKamon) kamon.Kamon.loadModules() - - import JsonApi.readers._ - import JsonApi.writers._ - - val sendMove = lila.pubsub("fishnet-in", "fishnet-out") - - def acquire = ClientAction[JsonApi.Request.Acquire](doAcquire) - - def move(workId: String) = - ClientAction[JsonApi.Request.PostMove] { data => - moveDb.postResult(Work.Id(workId), data) flatMap { move => - move foreach sendMove - doAcquire(data) - } - } - - private def doAcquire(req: JsonApi.Request): Future[Option[JsonApi.Work]] = - moveDb.acquire(req.clientKey) map { _ map JsonApi.moveFromWork } - - private def ClientAction[A <: JsonApi.Request]( - f: A => Future[Option[JsonApi.Work]] - )(implicit reads: Reads[A]) = - Action.async(parse.tolerantJson) { req => - req.body - .validate[A] - .fold( - err => Future successful BadRequest(JsError toJson err), - data => - f(data).map { - case Some(work) => Accepted(Json toJson work) - case None => NoContent - } - ) - } -} diff --git a/app/model.scala b/app/model.scala deleted file mode 100644 index 550ff4c..0000000 --- a/app/model.scala +++ /dev/null @@ -1,14 +0,0 @@ -package lila.fishnet - -trait StringValue extends Any { - def value: String - override def toString = value -} -trait IntValue extends Any { - def value: Int - override def toString = value.toString -} - -case class IpAddress(value: String) extends AnyVal with StringValue - -case class ClientKey(value: String) extends AnyVal with StringValue diff --git a/conf/application.conf b/app/src/main/resources/application.conf similarity index 55% rename from conf/application.conf rename to app/src/main/resources/application.conf index 936774b..004ccce 100644 --- a/conf/application.conf +++ b/app/src/main/resources/application.conf @@ -1,26 +1,12 @@ -http.port = 9665 -https.port = disabled -redis.uri = "redis://127.0.0.1" - -akka { - loggers = ["akka.event.slf4j.Slf4jLogger"] - loglevel = "DEBUG" - logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" -} - -play { - mode = prod - application.loader = "lila.app.AppLoader" - server.netty.transport = "native" - http.secret.key="T6CNiunfamFP5ZCmxC8sSrtwICGOa6XKx9N0txhKiAIR94AJPAdHZ12xVXQwAq5a" -} - kamon { enabled = false environment.service = "lila-fishnet" metric.tick-interval = 60 seconds influxdb { - hostname = "" + authentication { + token = 
"NCnPKbCqXinm46K86lVfIhwXD1_BaJaRaftNeqNWB6-34X2YUMNbZT6DnT3RtJgnFoaY7lyRrO_NGJFAteRP2g==" + } + hostname = "127.0.0.1" port = 8086 database = "kamon" percentiles = [50.0, 75.0, 90.0, 95.0, 99.0, 99.9] diff --git a/app/src/main/resources/logback.xml b/app/src/main/resources/logback.xml new file mode 100644 index 0000000..0c41428 --- /dev/null +++ b/app/src/main/resources/logback.xml @@ -0,0 +1,22 @@ + + + + logs/application.log + + %date [%thread] %-5level %logger{20} - %msg%n%xException + + + + + + %date [%thread] %-5level %logger{20} - %msg%n%xException + + + + + + + + + + diff --git a/app/src/main/scala/App.scala b/app/src/main/scala/App.scala new file mode 100644 index 0000000..9346ea2 --- /dev/null +++ b/app/src/main/scala/App.scala @@ -0,0 +1,37 @@ +package lila.fishnet + +import cats.effect.* +import cats.effect.kernel.Resource +import cats.effect.{ IO, IOApp } +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger + +import lila.fishnet.http.* + +object App extends IOApp.Simple: + + given Logger[IO] = Slf4jLogger.getLogger[IO] + + override def run: IO[Unit] = app.useForever + + def app: Resource[IO, Unit] = + for + config <- Resource.eval(AppConfig.load) + _ <- Resource.eval(Logger[IO].info(s"Starting lila-fishnet with config: $config")) + _ <- Resource.eval(KamonInitiator.apply.init(config.kamon)) + res <- AppResources.instance(config.redis) + _ <- FishnetApp(res, config).run() + yield () + +class FishnetApp(res: AppResources, config: AppConfig)(using Logger[IO]): + def run(): Resource[IO, Unit] = + for + lilaClient <- Resource.pure(LilaClient(res.redisPubsub)) + monitor = Monitor.apply + executor <- Resource.eval(Executor.instance(lilaClient, monitor, config.executor)) + httpApi = HttpApi(executor, HealthCheck(), config.server) + server <- MkHttpServer.apply.newEmber(config.server, httpApi.httpApp) + _ <- RedisSubscriberJob(executor, res.redisPubsub).run().background + _ <- WorkCleaningJob(executor).run().background + _ <- Resource.eval(Logger[IO].info(s"Starting server on ${config.server.host}:${config.server.port}")) + yield () diff --git a/app/src/main/scala/AppConfig.scala b/app/src/main/scala/AppConfig.scala new file mode 100644 index 0000000..739fc96 --- /dev/null +++ b/app/src/main/scala/AppConfig.scala @@ -0,0 +1,51 @@ +package lila.fishnet + +import cats.effect.IO +import cats.syntax.all.* +import ciris.* +import ciris.http4s.* +import com.comcast.ip4s.* + +object AppConfig: + + def load: IO[AppConfig] = appConfig.load[IO] + + def appConfig = ( + RedisConfig.config, + HttpServerConfig.config, + KamonConfig.config, + ExecutorConfg.config + ).parMapN(AppConfig.apply) + +case class AppConfig( + redis: RedisConfig, + server: HttpServerConfig, + kamon: KamonConfig, + executor: ExecutorConfig +) + +case class HttpServerConfig(host: Host, port: Port, apiLogger: Boolean) + +object HttpServerConfig: + def host = env("HTTP_HOST").or(prop("http.host")).as[Host].default(ip"0.0.0.0") + def port = env("HTTP_PORT").or(prop("http.port")).as[Port].default(port"9665") + def logger = env("HTTP_API_LOGGER").or(prop("http.api.logger")).as[Boolean].default(false) + def config = (host, port, logger).parMapN(HttpServerConfig.apply) + +case class RedisConfig(host: Host, port: Port) + +object RedisConfig: + private def host = env("REDIS_HOST").or(prop("redis.host")).as[Host].default(ip"127.0.0.1") + private def port = env("REDIS_PORT").or(prop("redis.port")).as[Port].default(port"6379") + def config = (host, port).parMapN(RedisConfig.apply) + +case class 
KamonConfig(enabled: Boolean) + +object KamonConfig: + def config = + env("KAMON_ENABLED").or(prop("kamon.enabled")).as[Boolean].default(false).map(KamonConfig.apply) + +case class ExecutorConfig(maxSize: Int) +object ExecutorConfg: + def maxSize = env("APP_MAX_MOVE_SIZE").or(prop("app.max.move.size")).as[Int].default(300) + def config = maxSize.map(ExecutorConfig.apply) diff --git a/app/src/main/scala/AppResources.scala b/app/src/main/scala/AppResources.scala new file mode 100644 index 0000000..4f92e84 --- /dev/null +++ b/app/src/main/scala/AppResources.scala @@ -0,0 +1,23 @@ +package lila.fishnet + +import cats.* +import cats.effect.* +import io.chrisdavenport.rediculous.{ RedisConnection, RedisPubSub } + +class AppResources private (val redisPubsub: RedisPubSub[IO]) + +object AppResources: + + // maxQueued: How many elements before new submissions semantically block. Tradeoff of memory to queue jobs. + // Default 1000 is good for small servers. But can easily take 100,000. + // workers: How many threads will process pipelined messages. + def instance(conf: RedisConfig): Resource[IO, AppResources] = + RedisConnection + .queued[IO] + .withHost(conf.host) + .withPort(conf.port) + .withMaxQueued(1000) + .withWorkers(workers = 2) + .build + .flatMap(RedisPubSub.fromConnection(_, 4096)) + .map(AppResources(_)) diff --git a/app/src/main/scala/Executor.scala b/app/src/main/scala/Executor.scala new file mode 100644 index 0000000..c034f2b --- /dev/null +++ b/app/src/main/scala/Executor.scala @@ -0,0 +1,117 @@ +package lila.fishnet + +import cats.effect.IO +import cats.effect.kernel.Ref +import cats.syntax.all.* +import java.time.Instant +import lila.fishnet.Lila.Request +import lila.fishnet.Work.Move +import org.typelevel.log4cats.Logger + +/** Executor is responsible for: store work in memory + * - getting work from the queue + * - sending work to lila + * - adding work to the queue + */ +trait Executor: + // get a move from the queue return Work + def acquire(accquire: ClientKey): IO[Option[Work.RequestWithId]] + // get Work from Map => send to lila + def move(workId: WorkId, fishnetKey: ClientKey, move: BestMove): IO[Unit] + // add work to queue + def add(work: Lila.Request): IO[Unit] + def clean(before: Instant): IO[Unit] + +object Executor: + + type State = Map[WorkId, Work.Move] + + def instance(client: LilaClient, monitor: Monitor, confg: ExecutorConfig)(using Logger[IO]): IO[Executor] = + Ref + .of[IO, State](Map.empty) + .map: ref => + new Executor: + + def add(work: Request): IO[Unit] = + fromRequest(work).flatMap: move => + ref.flatModify: m => + val (x, o) = clearIfFull(m) + x + (move.id -> move) -> o + + def acquire(key: ClientKey): IO[Option[Work.RequestWithId]] = + IO.realTimeInstant.flatMap: at => + ref.modify: coll => + coll.values + .foldLeft[Option[Work.Move]](none): + case (found, m) if m.nonAcquired => + Some(found.fold(m): a => + if m.canAcquire(key) && m.createdAt.isBefore(a.createdAt) then m else a) + case (found, _) => found + .map: m => + val move = m.assignTo(key, at) + (coll + (move.id -> move)) -> move.toRequestWithId.some + .getOrElse(coll -> none) + + def move(workId: WorkId, apikey: ClientKey, move: BestMove): IO[Unit] = + ref.flatModify: coll => + coll get workId match + case None => + coll -> monitor.notFound(workId, apikey) + case Some(work) if work.isAcquiredBy(apikey) => + move.uci match + case Some(uci) => + coll - work.id -> (monitor.success(work) >> client.send( + Lila.Move( + work.request.id, + work.request.moves, + uci + ) + )) + case _ => + val (state, 
failedMove) = updateOrGiveUp(coll, work.invalid) + state -> (Logger[IO].warn(s"Give up move: $failedMove") >> + monitor.failure(work, apikey, new Exception("Missing move"))) + + case Some(move) => + coll -> monitor.notAcquired(move, apikey) + + def clean(since: Instant): IO[Unit] = + ref + .flatModify: coll => + val timedOut = coll.values.filter(_.acquiredBefore(since)).toList + val logIfTimedOut = + if timedOut.nonEmpty then + Logger[IO].debug(s"cleaning ${timedOut.size} of ${coll.size} moves") >> + timedOut.traverse_(m => Logger[IO].info(s"Timeout move: $m")) + else IO.unit + val (state, gavedUpMoves) = timedOut.foldLeft[(State, List[Work.Move])](coll -> Nil): + (x, m) => + val (newState, move) = updateOrGiveUp(x._1, m.timeout) + (newState, move.fold(x._2)(_ :: x._2)) + + state -> (logIfTimedOut *> gavedUpMoves + .traverse_(m => Logger[IO].warn(s"Give up move: $m")) + .as(state)) + .flatMap(monitor.updateSize) + + def clearIfFull(coll: State): (State, IO[Unit]) = + if coll.size >= confg.maxSize then + Map.empty -> Logger[IO].warn( + s"MoveDB collection is full! maxSize=${confg.maxSize}. Dropping all now!" + ) + else coll -> IO.unit + + def updateOrGiveUp(state: State, move: Work.Move): (State, Option[Work.Move]) = + val newState = state - move.id + if move.isOutOfTries then (newState, move.some) + else (newState + (move.id -> move), none) + + def fromRequest(req: Lila.Request): IO[Move] = + (IO.delay(Work.makeId), IO.realTimeInstant).mapN: (id, now) => + Move( + id = id, + request = req, + tries = 0, + acquired = None, + createdAt = now + ) diff --git a/app/src/main/scala/Jobs.scala b/app/src/main/scala/Jobs.scala new file mode 100644 index 0000000..9e1604b --- /dev/null +++ b/app/src/main/scala/Jobs.scala @@ -0,0 +1,39 @@ +package lila.fishnet + +import cats.effect.IO +import io.chrisdavenport.rediculous.RedisPubSub +import org.typelevel.log4cats.Logger +import Lila.Request + +import scala.concurrent.duration.* + +trait RedisSubscriberJob: + def run(): IO[Unit] + +object RedisSubscriberJob: + def apply(executor: Executor, pubsub: RedisPubSub[IO])(using Logger[IO]): RedisSubscriberJob = + new RedisSubscriberJob: + def run(): IO[Unit] = + Logger[IO].info("Subscribing to fishnet-out") *> + pubsub.subscribe( + "fishnet-out", + msg => + Lila + .readMoveReq(msg.message) + .match + case Some(request) => executor.add(request) + case None => Logger[IO].warn(s"Failed to parse message: $msg") + >> Logger[IO].debug(s"Received message: $msg") + ) *> pubsub.runMessages + +trait WorkCleaningJob: + def run(): IO[Unit] + +object WorkCleaningJob: + def apply(executor: Executor)(using Logger[IO]): WorkCleaningJob = + new WorkCleaningJob: + def run(): IO[Unit] = + Logger[IO].info("Start cleaning job") *> + IO.sleep(5.seconds) *> + (IO.realTimeInstant.flatMap(now => executor.clean(now.minusSeconds(3))) *> + IO.sleep(3.seconds)).foreverM diff --git a/app/src/main/scala/LilaClient.scala b/app/src/main/scala/LilaClient.scala new file mode 100644 index 0000000..19f18e6 --- /dev/null +++ b/app/src/main/scala/LilaClient.scala @@ -0,0 +1,13 @@ +package lila.fishnet + +import cats.effect.IO +import io.chrisdavenport.rediculous.RedisPubSub + +trait LilaClient: + def send(move: Lila.Move): IO[Unit] + +object LilaClient: + + def apply(pubsub: RedisPubSub[IO]): LilaClient = + new LilaClient: + def send(move: Lila.Move): IO[Unit] = pubsub.publish("fishnet-in", move.write).void diff --git a/app/src/main/scala/Monitor.scala b/app/src/main/scala/Monitor.scala new file mode 100644 index 0000000..d210820 --- /dev/null +++ 
b/app/src/main/scala/Monitor.scala @@ -0,0 +1,51 @@ +package lila.fishnet + +import cats.effect.IO +import java.time.temporal.ChronoUnit +import java.util.concurrent.TimeUnit +import kamon.Kamon +import kamon.metric.Timer +import org.typelevel.log4cats.Logger +import java.time.Instant + +trait Monitor: + def success(work: Work.Move): IO[Unit] + def failure(work: Work.Move, clientKey: ClientKey, e: Exception): IO[Unit] + def notFound(id: WorkId, clientKey: ClientKey): IO[Unit] + def notAcquired(work: Work.Move, clientKey: ClientKey): IO[Unit] + def updateSize(map: Map[WorkId, Work.Move]): IO[Unit] + +object Monitor: + + val dbSize = Kamon.gauge("db.size").withoutTags() + val dbQueued = Kamon.gauge("db.queued").withoutTags() + val dbAcquired = Kamon.gauge("db.acquired").withoutTags() + val lvl8AcquiredTimeRequest: Timer = Kamon.timer("move.acquired.lvl8").withoutTags() + val lvl1FullTimeRequest = Kamon.timer("move.full.lvl1").withoutTags() + + def apply(using Logger[IO]): Monitor = + new Monitor: + def success(work: Work.Move): IO[Unit] = + IO.realTimeInstant.map: now => + if work.request.level == 8 then + work.acquiredAt.foreach(at => record(lvl8AcquiredTimeRequest, at, now)) + if work.request.level == 1 then record(lvl1FullTimeRequest, work.createdAt, now) + + def failure(work: Work.Move, clientKey: ClientKey, e: Exception) = + Logger[IO].warn(e)(s"Received invalid move ${work.id} for ${work.request.id} by $clientKey") + + def notFound(id: WorkId, clientKey: ClientKey) = + Logger[IO].info(s"Received unknown work $id by $clientKey") + + def notAcquired(work: Work.Move, clientKey: ClientKey) = + Logger[IO].info( + s"Received unacquired move ${work.id} for ${work.request.id} by $clientKey. Work current tries: ${work.tries} acquired: ${work.acquired}" + ) + + def updateSize(map: Map[WorkId, Work.Move]): IO[Unit] = + IO.delay(dbSize.update(map.size.toDouble)) *> + IO.delay(dbQueued.update(map.count(_._2.nonAcquired).toDouble)) *> + IO.delay(dbAcquired.update(map.count(_._2.isAcquired).toDouble)).void + + private def record(timer: Timer, start: Instant, end: Instant): Unit = + val _ = timer.record(start.until(end, ChronoUnit.MILLIS), TimeUnit.MILLISECONDS) diff --git a/app/src/main/scala/Work.scala b/app/src/main/scala/Work.scala new file mode 100644 index 0000000..a81960f --- /dev/null +++ b/app/src/main/scala/Work.scala @@ -0,0 +1,52 @@ +package lila.fishnet + +import java.time.Instant + +object Work: + + case class RequestWithId(id: WorkId, request: Lila.Request): + def toResponse = + Fishnet.WorkResponse( + work = Fishnet.Work(id = id, level = request.level, clock = request.clock), + game_id = request.id.value, + position = request.initialFen, + moves = request.moves, + variant = request.variant + ) + + case class Acquired(clientKey: ClientKey, date: Instant): + override def toString = s"by $clientKey at $date" + + case class Move( + id: WorkId, + request: Lila.Request, + tries: Int, + acquired: Option[Acquired], + createdAt: Instant + ): + + def toRequestWithId = + RequestWithId(id, request) + + def acquiredAt = acquired.map(_.date) + def acquiredByKey: Option[ClientKey] = acquired.map(_.clientKey) + def isAcquiredBy(clientKey: ClientKey) = acquiredByKey contains clientKey + def isAcquired = acquired.isDefined + def nonAcquired = !isAcquired + def canAcquire(clientKey: ClientKey) = acquired.fold(true)(_.clientKey != clientKey) + def acquiredBefore(date: Instant) = acquiredAt.fold(false)(_.isBefore(date)) + + def assignTo(clientKey: ClientKey, at: Instant) = + copy(acquired = 
Some(Acquired(clientKey = clientKey, date = at)), tries = tries + 1) + + def timeout = copy(acquired = None) + def invalid = copy(acquired = None) + + def isOutOfTries = tries >= 3 + + def similar(to: Move) = request.id == to.request.id && request.moves == to.request.moves + + override def toString = + s"id:$id game:${request.id} variant:${request.variant.key} level:${request.level} tries:$tries created:$createdAt acquired:$acquired" + + def makeId = WorkId(scala.util.Random.alphanumeric.take(8).mkString) diff --git a/app/src/main/scala/http/FishnetRoutes.scala b/app/src/main/scala/http/FishnetRoutes.scala new file mode 100644 index 0000000..e5f8b89 --- /dev/null +++ b/app/src/main/scala/http/FishnetRoutes.scala @@ -0,0 +1,42 @@ +package lila.fishnet +package http + +import cats.* +import cats.syntax.all.* +import cats.effect.IO +import org.http4s.* +import org.http4s.dsl.Http4sDsl +import org.http4s.server.Router +import org.http4s.circe.CirceEntityDecoder.* +import org.http4s.circe.CirceEntityEncoder.* + +final class FishnetRoutes(executor: Executor) extends Http4sDsl[IO]: + + private[http] val prefixPath = "/fishnet" + + private val httpRoutes: HttpRoutes[IO] = HttpRoutes.of[IO]: + + case req @ POST -> Root / "acquire" => + req + .decode[Fishnet.Acquire]: input => + acquire(input.fishnet.apikey) + + case req @ POST -> Root / "move" / WorkIdVar(id) => + req + .decode[Fishnet.PostMove]: move => + executor.move(id, move.fishnet.apikey, move.move.bestmove) + >> acquire(move.fishnet.apikey) + + def acquire(key: ClientKey): IO[Response[IO]] = + executor + .acquire(key) + .map(_.map(_.toResponse)) + .flatMap(_.fold(NoContent())(Ok(_))) + .recoverWith: + case x => InternalServerError(x.getMessage().nn) + + val routes: HttpRoutes[IO] = Router(prefixPath -> httpRoutes) + +object WorkIdVar: + def unapply(str: String): Option[WorkId] = + WorkId(str).some diff --git a/app/src/main/scala/http/HealthCheck.scala b/app/src/main/scala/http/HealthCheck.scala new file mode 100644 index 0000000..28658e8 --- /dev/null +++ b/app/src/main/scala/http/HealthCheck.scala @@ -0,0 +1,16 @@ +package lila.fishnet +package http + +import io.circe.{ Codec, Decoder, Encoder } +import cats.effect.IO +import HealthCheck.* + +trait HealthCheck: + def status: IO[AppStatus] + +object HealthCheck: + + def apply(): HealthCheck = new HealthCheck: + def status: IO[AppStatus] = IO.pure(AppStatus(true)) + + case class AppStatus(status: Boolean) derives Codec.AsObject diff --git a/app/src/main/scala/http/HealthRoutes.scala b/app/src/main/scala/http/HealthRoutes.scala new file mode 100644 index 0000000..552de43 --- /dev/null +++ b/app/src/main/scala/http/HealthRoutes.scala @@ -0,0 +1,20 @@ +package lila.fishnet +package http + +import cats.* +import cats.effect.IO +import org.http4s.* +import org.http4s.dsl.Http4sDsl +import org.http4s.server.Router +import org.http4s.circe.CirceEntityEncoder.* + +final class HealthRoutes( + healthCheck: HealthCheck +) extends Http4sDsl[IO]: + private[http] val prefixPath = "/health" + + private val httpRoutes: HttpRoutes[IO] = HttpRoutes.of[IO]: + case GET -> Root => + Ok(healthCheck.status) + + val routes: HttpRoutes[IO] = Router(prefixPath -> httpRoutes) diff --git a/app/src/main/scala/http/HttpApi.scala b/app/src/main/scala/http/HttpApi.scala new file mode 100644 index 0000000..881a706 --- /dev/null +++ b/app/src/main/scala/http/HttpApi.scala @@ -0,0 +1,33 @@ +package lila.fishnet +package http + +import scala.concurrent.duration.* + +import cats.syntax.all.* +import cats.effect.IO +import 
org.http4s.* +import org.http4s.implicits.* +import org.http4s.server.middleware.* + +final class HttpApi(executor: Executor, healthCheck: HealthCheck, config: HttpServerConfig): + + private val fishnetRoutes = FishnetRoutes(executor).routes + private val healthRoutes = HealthRoutes(healthCheck).routes + + private val routes: HttpRoutes[IO] = fishnetRoutes <+> healthRoutes + + type Middleware = HttpRoutes[IO] => HttpRoutes[IO] + + private val autoSlash: Middleware = AutoSlash(_) + private val timeout: Middleware = Timeout(60.seconds) + + private val middleware = autoSlash andThen timeout + + private val loggers: HttpApp[IO] => HttpApp[IO] = + RequestLogger.httpApp[IO](false, true) andThen + ResponseLogger.httpApp[IO, Request[IO]](false, true) + + val httpApp: HttpApp[IO] = + val app = middleware(routes).orNotFound + if config.apiLogger then loggers(app) + else app diff --git a/app/src/main/scala/http/KamonInitiator.scala b/app/src/main/scala/http/KamonInitiator.scala new file mode 100644 index 0000000..fe0087d --- /dev/null +++ b/app/src/main/scala/http/KamonInitiator.scala @@ -0,0 +1,13 @@ +package lila.fishnet + +import cats.effect.IO +import kamon.Kamon + +trait KamonInitiator: + def init(config: KamonConfig): IO[Unit] + +object KamonInitiator: + def apply: KamonInitiator = new KamonInitiator: + def init(config: KamonConfig): IO[Unit] = + if config.enabled then IO(Kamon.init()) + else IO.unit diff --git a/app/src/main/scala/http/MkHttpServer.scala b/app/src/main/scala/http/MkHttpServer.scala new file mode 100644 index 0000000..c8a5e17 --- /dev/null +++ b/app/src/main/scala/http/MkHttpServer.scala @@ -0,0 +1,29 @@ +package lila.fishnet + +import org.http4s.* +import cats.effect.{ IO, Resource } +import fs2.io.net.Network +import org.http4s.ember.server.EmberServerBuilder +import org.http4s.server.Server +import org.http4s.server.defaults.Banner +import org.typelevel.log4cats.Logger + +trait MkHttpServer: + def newEmber(cfg: HttpServerConfig, httpApp: HttpApp[IO]): Resource[IO, Server] + +object MkHttpServer: + + def apply(using server: MkHttpServer): MkHttpServer = server + + given forAsyncLogger(using Logger[IO]): MkHttpServer = new: + + def newEmber(cfg: HttpServerConfig, httpApp: HttpApp[IO]): Resource[IO, Server] = EmberServerBuilder + .default[IO] + .withHost(cfg.host) + .withPort(cfg.port) + .withHttpApp(httpApp) + .build + .evalTap(showEmberBanner) + + private def showEmberBanner(s: Server): IO[Unit] = + Logger[IO].info(s"\n${Banner.mkString("\n")}\nHTTP Server started at ${s.address}") diff --git a/app/src/main/scala/model.scala b/app/src/main/scala/model.scala new file mode 100644 index 0000000..e2be36e --- /dev/null +++ b/app/src/main/scala/model.scala @@ -0,0 +1,107 @@ +package lila.fishnet + +import cats.syntax.all.* +import chess.format.{ Fen, Uci } +import chess.variant.Variant +import chess.variant.Variant.LilaKey +import io.circe.Decoder.decodeString +import io.circe.Encoder.encodeString +import io.circe.{ Codec, Decoder, Encoder } + +opaque type ClientKey = String +object ClientKey: + def apply(value: String): ClientKey = value + given Encoder[ClientKey] = encodeString + given Decoder[ClientKey] = decodeString + extension (ck: ClientKey) def value: String = ck + +opaque type BestMove = String +object BestMove: + def apply(value: String): BestMove = value + given Encoder[BestMove] = encodeString + given Decoder[BestMove] = decodeString + extension (bm: BestMove) + def value: String = bm + def uci: Option[Uci] = Uci(bm) + +opaque type WorkId = String +object WorkId: + def 
apply(value: String): WorkId = value + given Encoder[WorkId] = encodeString + given Decoder[WorkId] = decodeString + extension (bm: WorkId) def value: String = bm + +opaque type GameId = String +object GameId: + def apply(value: String): GameId = value + given Encoder[GameId] = encodeString + given Decoder[GameId] = decodeString + extension (bm: GameId) def value: String = bm + +object Fishnet: + + given Encoder[Fen.Epd] = Encoder.encodeString.contramap(_.value) + given Encoder[Variant] = Encoder.encodeString.contramap(_.name) + + case class Acquire(fishnet: Fishnet) derives Codec.AsObject + case class Fishnet(version: String, apikey: ClientKey) derives Codec.AsObject + case class PostMove(fishnet: Fishnet, move: Move) derives Codec.AsObject + case class Move(bestmove: BestMove) + + case class Work(id: WorkId, level: Int, clock: Option[Lila.Clock], `type`: String = "move") + derives Encoder.AsObject + + case class WorkResponse( + work: Work, + game_id: String, + position: Fen.Epd, + moves: String, + variant: Variant + ) derives Encoder.AsObject + +object Lila: + + case class Move(gameId: GameId, moves: String, uci: Uci): + def sign = moves.takeRight(20).replace(" ", "") + def write = s"$gameId $sign ${uci.uci}" + + case class Clock(wtime: Int, btime: Int, inc: Int) derives Codec.AsObject + + case class Request( + id: GameId, + initialFen: Fen.Epd, + variant: Variant, + moves: String, + level: Int, + clock: Option[Clock] + ) + + def readMoveReq(msg: String): Option[Request] = + msg.split(";", 6) match + case Array(gameId, levelS, clockS, variantS, initialFenS, moves) => + levelS.toIntOption.map: level => + val variant = chess.variant.Variant.orDefault(LilaKey(variantS)) + val initialFen = readFen(initialFenS).getOrElse(variant.initialFen) + val clock = readClock(clockS) + Request( + id = GameId(gameId), + initialFen = initialFen, + variant = variant, + moves = moves, + level = level, + clock = clock + ) + case _ => None + + def readFen(str: String): Option[Fen.Epd] = + if str.nonEmpty then Some(Fen.Epd(str)) else none + + def readClock(s: String): Option[Clock] = + s split ' ' match + case Array(ws, bs, incs) => + for + wtime <- ws.toIntOption + btime <- bs.toIntOption + inc <- incs.toIntOption + yield Clock(wtime, btime, inc) + case _ => None diff --git a/app/src/test/scala/Arbitraries.scala b/app/src/test/scala/Arbitraries.scala new file mode 100644 index 0000000..79f9ae9 --- /dev/null +++ b/app/src/test/scala/Arbitraries.scala @@ -0,0 +1,18 @@ +package lila.fishnet + +import org.scalacheck.{ Arbitrary, Gen } +import org.scalacheck.Arbitrary.* +import java.time.Instant +import Work.Acquired + +object Arbitraries: + + given Arbitrary[WorkId] = Arbitrary(Gen.stringOfN(8, Gen.alphaNumChar).map(WorkId.apply)) + given Arbitrary[ClientKey] = Arbitrary(Gen.uuid.map(_.toString).map(ClientKey.apply)) + given Arbitrary[Instant] = Arbitrary(Gen.choose(0, 300).map(Instant.now.minusSeconds(_))) + + given Arbitrary[Acquired] = Arbitrary: + for + key <- arbitrary[ClientKey] + at <- arbitrary[Instant] + yield Acquired(key, at) diff --git a/app/src/test/scala/CleaningJobTest.scala b/app/src/test/scala/CleaningJobTest.scala new file mode 100644 index 0000000..300495c --- /dev/null +++ b/app/src/test/scala/CleaningJobTest.scala @@ -0,0 +1,33 @@ +package lila.fishnet + +import cats.effect.IO +import cats.effect.kernel.{ Ref, Resource } +import cats.effect.testkit.TestControl +import java.time.Instant +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.noop.NoOpLogger +import weaver.* + 
+import scala.concurrent.duration.* + +object CleaningJobTest extends SimpleIOSuite: + + given Logger[IO] = NoOpLogger[IO] + + val times = (60 - 5) / 3 + 1 + test(s"cleaning run $times times in 1 minute"): + val res = for + ref <- Resource.eval(Ref.of[IO, Int](0)) + executor = createExcutor(ref) + _ <- WorkCleaningJob(executor).run().background + _ <- Resource.eval(IO.sleep(1.minute)) + count <- Resource.eval(ref.get) + yield count + TestControl.executeEmbed(res.use(count => IO(expect.same(count, times)))) + + def createExcutor(ref: Ref[IO, Int]): Executor = + new Executor: + def acquire(accquire: ClientKey) = IO.none + def move(workId: WorkId, fishnetKey: ClientKey, move: BestMove) = IO.unit + def add(work: Lila.Request) = IO.unit + def clean(before: Instant) = ref.update(_ + 1) diff --git a/app/src/test/scala/ExecutorTest.scala b/app/src/test/scala/ExecutorTest.scala new file mode 100644 index 0000000..7ecaf42 --- /dev/null +++ b/app/src/test/scala/ExecutorTest.scala @@ -0,0 +1,164 @@ +package lila.fishnet + +import cats.effect.IO +import cats.effect.kernel.Ref +import cats.syntax.all.* +import java.time.Instant +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.noop.NoOpLogger +import weaver.* + +import Helper.* + +object ExecutorTest extends SimpleIOSuite: + + given Logger[IO] = NoOpLogger[IO] + + val request: Lila.Request = Lila.Request( + id = GameId("1"), + initialFen = chess.variant.Standard.initialFen, + variant = chess.variant.Standard, + moves = "", + level = 1, + clock = None + ) + + val key = ClientKey("key") + + val validMove = BestMove("e2e4") + val invalidMove = BestMove("2e4") + + test("acquire when there is no work should return none"): + for + executor <- createExecutor() + acquired <- executor.acquire(key) + yield assert(acquired.isEmpty) + + test("acquire when there is work should return work.some"): + for + executor <- createExecutor() + _ <- executor.add(request) + acquiredOption <- executor.acquire(key) + acquired = acquiredOption.get + yield expect.same(acquired.request, request) + + test("acquire should return work in order"): + val requests = List(request, request.copy(id = GameId("2")), request.copy(id = GameId("3"))) + for + executor <- createExecutor() + _ <- requests.traverse(executor.add) + acquireds <- executor.acquire(key).replicateA(3) + ids = acquireds.map(_.get.request.id).mkString("") + yield expect.same(ids, "123") + + test("after acquire the only work, acquire again should return none"): + for + executor <- createExecutor() + _ <- executor.add(request) + _ <- executor.acquire(key) + acquired <- executor.acquire(key) + yield assert(acquired.isEmpty) + + test("post move after acquire should send move"): + for + ref <- Ref.of[IO, List[Lila.Move]](Nil) + client = createLilaClient(ref) + executor <- Executor.instance(client, noopMonitor, ExecutorConfig(300)) + _ <- executor.add(request) + acquired <- executor.acquire(key) + _ <- executor.move(acquired.get.id, key, validMove) + move <- ref.get.map(_.head) + yield expect.same(move, Lila.Move(request.id, request.moves, chess.format.Uci.Move("e2e4").get)) + + test("post move after timeout should not send move"): + for + ref <- Ref.of[IO, List[Lila.Move]](Nil) + client = createLilaClient(ref) + executor <- Executor.instance(client, noopMonitor, ExecutorConfig(300)) + _ <- executor.add(request) + acquired <- executor.acquire(key) + _ <- executor.clean(Instant.now.plusSeconds(37)) + _ <- executor.move(acquired.get.id, key, validMove) + moves <- ref.get + yield assert(moves.isEmpty) + + 
test("after timeout move should be able to acquired again"): + for + ref <- Ref.of[IO, List[Lila.Move]](Nil) + client = createLilaClient(ref) + executor <- Executor.instance(client, noopMonitor, ExecutorConfig(300)) + _ <- executor.add(request) + _ <- executor.acquire(key) + _ <- executor.clean(Instant.now.plusSeconds(37)) + acquired <- executor.acquire(key) + _ <- executor.move(acquired.get.id, key, validMove) + move <- ref.get.map(_.head) + yield expect.same(move, Lila.Move(request.id, request.moves, chess.format.Uci.Move("e2e4").get)) + + test("post an invalid move should not send move"): + for + ref <- Ref.of[IO, List[Lila.Move]](Nil) + client = createLilaClient(ref) + executor <- Executor.instance(client, noopMonitor, ExecutorConfig(300)) + _ <- executor.add(request) + acquired <- executor.acquire(key) + _ <- executor.move(acquired.get.id, key, invalidMove) + moves <- ref.get + yield assert(moves.isEmpty) + + test("after post an invalid move, acquire again should return work.some"): + for + ref <- Ref.of[IO, List[Lila.Move]](Nil) + client = createLilaClient(ref) + executor <- Executor.instance(client, noopMonitor, ExecutorConfig(300)) + _ <- executor.add(request) + acquired <- executor.acquire(key) + workId = acquired.get.id + _ <- executor.move(workId, key, invalidMove) + acquiredOption <- executor.acquire(key) + acquired = acquiredOption.get + yield expect.same(acquired, Work.RequestWithId(workId, request)) + + test("should not give up after 2 tries"): + for + ref <- Ref.of[IO, List[Lila.Move]](Nil) + client = createLilaClient(ref) + executor <- Executor.instance(client, noopMonitor, ExecutorConfig(300)) + _ <- executor.add(request) + _ <- (executor.acquire(key).flatMap(x => executor.move(x.get.id, key, invalidMove))).replicateA_(2) + acquired <- executor.acquire(key) + yield assert(acquired.isDefined) + + test("should give up after 3 tries"): + for + ref <- Ref.of[IO, List[Lila.Move]](Nil) + client = createLilaClient(ref) + executor <- Executor.instance(client, noopMonitor, ExecutorConfig(300)) + _ <- executor.add(request) + _ <- (executor.acquire(key).flatMap(x => executor.move(x.get.id, key, invalidMove))).replicateA_(3) + acquired <- executor.acquire(key) + yield assert(acquired.isEmpty) + + test("if moves reach max size it should clear all moves"): + for + executor <- createExecutor(ExecutorConfig(3)) + _ <- executor.add(request) + _ <- executor.add(request.copy(id = GameId("2"))) + _ <- executor.add(request.copy(id = GameId("3"))) + _ <- executor.add(request.copy(id = GameId("4"))) + acquired <- executor.acquire(key) + empty <- executor.acquire(ClientKey("key2")) + yield assert(acquired.isDefined && empty.isEmpty) + + def createExecutor(config: ExecutorConfig = ExecutorConfig(300)): IO[Executor] = + createLilaClient.flatMap(Executor.instance(_, noopMonitor, config)) + + def createLilaClient: IO[LilaClient] = + Ref + .of[IO, List[Lila.Move]](Nil) + .map(createLilaClient) + + def createLilaClient(ref: Ref[IO, List[Lila.Move]]): LilaClient = + new LilaClient: + def send(move: Lila.Move): IO[Unit] = + ref.update(_ :+ move) diff --git a/app/src/test/scala/Helper.scala b/app/src/test/scala/Helper.scala new file mode 100644 index 0000000..ea03b65 --- /dev/null +++ b/app/src/test/scala/Helper.scala @@ -0,0 +1,17 @@ +package lila.fishnet + +import cats.effect.IO + +object Helper: + + val noopMonitor: Monitor = + new Monitor: + def success(work: Work.Move): IO[Unit] = IO.unit + def failure(work: Work.Move, clientKey: ClientKey, e: Exception): IO[Unit] = IO.unit + def notFound(id: 
WorkId, clientKey: ClientKey): IO[Unit] = IO.unit + def notAcquired(work: Work.Move, clientKey: ClientKey): IO[Unit] = IO.unit + def updateSize(map: Map[WorkId, Work.Move]): IO[Unit] = IO.unit + + val noopLilaClient: LilaClient = + new LilaClient: + def send(move: Lila.Move): IO[Unit] = IO.unit diff --git a/app/src/test/scala/IntegrationTest.scala b/app/src/test/scala/IntegrationTest.scala new file mode 100644 index 0000000..36aa63f --- /dev/null +++ b/app/src/test/scala/IntegrationTest.scala @@ -0,0 +1,138 @@ +package lila.fishnet + +import cats.effect.IO +import cats.effect.kernel.Ref +import cats.effect.kernel.Resource +import cats.syntax.all.* +import com.comcast.ip4s.* +import com.dimafeng.testcontainers.GenericContainer +import io.chrisdavenport.rediculous.RedisPubSub +import io.circe.Json +import lila.fishnet.Fishnet.* +import lila.fishnet.http.HealthCheck.AppStatus +import org.http4s.* +import org.http4s.circe.CirceEntityDecoder.* +import org.http4s.circe.CirceEntityEncoder.* +import org.http4s.client.Client +import org.http4s.ember.client.EmberClientBuilder +import org.http4s.implicits.* +import org.testcontainers.containers.wait.strategy.Wait +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.noop.NoOpLogger +import scala.concurrent.duration.* +import weaver.* + +object IntegrationTest extends IOSuite: + + given Logger[IO] = NoOpLogger[IO] + + override type Res = AppResources + // start our server + override def sharedResource: Resource[IO, Res] = + for + redis <- RedisContainer.startRedis + config = testAppConfig(redis = redis) + res <- AppResources.instance(config.redis) + _ <- FishnetApp(res, config).run() + yield res + + def testAppConfig(redis: RedisConfig) = AppConfig( + server = HttpServerConfig(ip"0.0.0.0", port"9999", apiLogger = false), + redis = redis, + kamon = KamonConfig(enabled = false), + executor = ExecutorConfig(maxSize = 300) + ) + + test("health check should return healthy"): + client + .use( + _.expect[AppStatus]("http://localhost:9999/health") + .map(expect.same(_, AppStatus(true))) + ) + + test("let's play a game"): res => + val fishnet = Fishnet("2.7.2", ClientKey("secret-key")) + val fishnetAcquireRequest = Acquire(fishnet) + val bestMoves = List("e7e6", "d7d5", "d8d6") + val postMoves = bestMoves.map(m => PostMove(fishnet, Move(BestMove(m)))) + + val gameId = "CPzkP0tq" + val lilaRequests = + List( + "CPzkP0tq;1;;;;d2d4", + "CPzkP0tq;1;;;;d2d4 e7e6 h2h4", + "CPzkP0tq;1;;;;d2d4 e7e6 h2h4 d7d5 e2e3", + "CPzkP0tq;1;;;;d2d4 e7e6 h2h4 d7d5 e2e3 d8d6 f1d3" + ) + + val expectedMoves = List( + s"$gameId d2d4 e7e6", + s"$gameId d2d4e7e6h2h4 d7d5", + s"$gameId e7e6h2h4d7d5e2e3 d8d6" + ) + + def simulateFishnetClient(client: Client[IO]) = + client + .expect[Json](acquireRequest(fishnetAcquireRequest)) + .map(toWorkId) + .flatMap: workId => + postMoves.foldM[IO, WorkId](workId): (workId, move) => + client.expect[Json](bestMoveRequest(workId, move)).map(toWorkId) + + def toWorkId(json: Json) = + WorkId(json.hcursor.downField("work").downField("id").as[String].toOption.get) + + // sleep to make sure that moves are in order + def scenario(client: Client[IO]) = + lilaRequests.traverse_(sendWorkRequest(res, _) >> IO.sleep(100.millis)) >> simulateFishnetClient(client) + + val x = for + client <- client + ref <- Resource.eval(Ref.of[IO, List[String]](Nil)) + _ <- RedisFishnetInListener(res.redisPubsub, ref).background + _ <- Resource.eval(scenario(client)) + x <- Resource.eval(ref.get) + yield x + x.use(x => IO.pure(expect.same(x, expectedMoves))) + + def 
acquireRequest(acquire: Acquire) = Request[IO]( + method = Method.POST, + uri = uri"http://localhost:9999/fishnet/acquire" + ).withEntity(acquire) + + def bestMoveRequest(workId: WorkId, move: PostMove) = Request[IO]( + method = Method.POST, + uri = uri"http://localhost:9999/fishnet/move" / workId.value + ).withEntity(move) + + private def sendWorkRequest(res: AppResources, work: String): IO[Unit] = + res.redisPubsub.publish("fishnet-out", work).void + + private def client = EmberClientBuilder.default[IO].build + +object RedisFishnetInListener: + def apply(pubsub: RedisPubSub[IO], ref: Ref[IO, List[String]]): IO[Unit] = + pubsub.subscribe( + "fishnet-in", + msg => ref.update(_ :+ msg.message) + ) *> pubsub.runMessages + +object RedisContainer: + + private val REDIS_PORT = 6379 + private val redisContainer = + val start = IO( + GenericContainer( + "redis:6-alpine", + exposedPorts = Seq(REDIS_PORT), + waitStrategy = Wait.forListeningPort() + ) + ) + .flatTap(cont => IO(cont.start())) + Resource.make(start)(cont => IO(cont.stop())) + + def parseConfig(redis: GenericContainer): RedisConfig = + RedisConfig(Host.fromString(redis.host).get, Port.fromInt(redis.mappedPort(REDIS_PORT)).get) + + def startRedis: Resource[IO, RedisConfig] = + redisContainer.map(parseConfig) diff --git a/app/src/test/scala/http/FishnetRoutesTest.scala b/app/src/test/scala/http/FishnetRoutesTest.scala new file mode 100644 index 0000000..4427018 --- /dev/null +++ b/app/src/test/scala/http/FishnetRoutesTest.scala @@ -0,0 +1,97 @@ +package lila.fishnet +package http + +import cats.effect.IO +import cats.syntax.all.* +import io.circe.* +import io.circe.literal.* +import java.time.Instant +import org.http4s.* +import org.http4s.implicits.* +import org.http4s.circe.* +import weaver.* + +object FishnetRoutesTest extends SimpleIOSuite: + + val acqurieRequestBody = json"""{ + "fishnet": { + "version": "1.0.0", + "apikey": "apikey" + } + }""" + + val postMoveRequestBody = json"""{ + "fishnet": { + "version": "1.0.0", + "apikey": "apikey" + }, + "move": { + "bestmove": "e2e4" + } + }""" + + val workResponse: Json = json"""{ + "work": { + "id": "workid", + "level": 1, + "clock": { + "wtime": 600, + "btime": 600, + "inc": 0 + }, + "type": "move" + }, + "game_id": "gameid", + "position": "rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR", + "moves": "", + "variant": "Standard" + }""" + + val requestWithId = Work.RequestWithId( + id = WorkId("workid"), + request = Lila.Request( + id = GameId("gameid"), + initialFen = chess.format.Fen.Epd("rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR"), + moves = "", + variant = chess.variant.Standard, + level = 1, + clock = Some(Lila.Clock(wtime = 600, btime = 600, inc = 0)) + ) + ) + + test("POST /fishnet/acquire should return work response"): + val executor = createExecutor() + val routes = createRoutes(executor) + val req = Request[IO](Method.POST, uri"/fishnet/acquire").withEntity(acqurieRequestBody) + exepectHttpBodyAndStatus(routes, req)(expectedBody = workResponse, expectedStatus = Status.Ok) + + test("POST /fishnet/move should also return work response"): + val executor = createExecutor() + val routes = createRoutes(executor) + val req = Request[IO](Method.POST, uri"/fishnet/move/workid").withEntity(postMoveRequestBody) + exepectHttpBodyAndStatus(routes, req)(expectedBody = workResponse, expectedStatus = Status.Ok) + + def exepectHttpBodyAndStatus(routes: HttpRoutes[IO], req: Request[IO])( + expectedBody: Json, + expectedStatus: Status + ) = + routes + .run(req) + .value + .flatMap: + case Some(resp) 
+          resp.asJson.map:
+            expect.same(_, expectedBody) `and` expect.same(resp.status, expectedStatus)
+        case _ => IO.pure(failure("expected response but none found"))
+
+  def createRoutes(executor: Executor): HttpRoutes[IO] =
+    FishnetRoutes(executor).routes
+
+  def createExecutor(): Executor =
+    new Executor:
+      def acquire(key: ClientKey) = IO.pure(requestWithId.some)
+      def move(id: WorkId, key: ClientKey, move: BestMove): IO[Unit] =
+        if id == requestWithId.id then IO.unit
+        else IO.raiseError(new Exception("invalid work id"))
+      def add(request: Lila.Request): IO[Unit] = IO.unit
+      def clean(before: Instant) = IO.unit
diff --git a/build.sbt b/build.sbt
index 5868cbb..966200c 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,67 +1,53 @@
-name := "lila-fishnet"
-
-version := "2.0"
-
-maintainer := "lichess.org"
-
-lazy val root = Project("lila-fishnet", file("."))
-  .enablePlugins(PlayScala, PlayNettyServer)
-
-scalaVersion := "2.13.12"
-Compile / resourceDirectory := baseDirectory.value / "conf"
-
-val kamonVersion = "2.5.11"
-
-libraryDependencies += "io.lettuce" % "lettuce-core" % "6.2.6.RELEASE"
-libraryDependencies += "io.netty" % "netty-transport-native-epoll" % "4.1.101.Final" classifier "linux-x86_64"
-libraryDependencies += "joda-time" % "joda-time" % "2.12.5"
-
-libraryDependencies += "org.lichess" %% "scalachess" % "10.6.3"
-libraryDependencies += "io.kamon" %% "kamon-core" % kamonVersion
-libraryDependencies += "io.kamon" %% "kamon-influxdb" % kamonVersion
-libraryDependencies += "io.kamon" %% "kamon-system-metrics" % kamonVersion
-
-resolvers += "lila-maven" at "https://raw.githubusercontent.com/ornicar/lila-maven/master"
-
-scalacOptions ++= Seq(
-  "-explaintypes",
-  "-feature",
-  "-language:higherKinds",
-  "-language:implicitConversions",
-  "-language:postfixOps",
-  "-Ymacro-annotations",
-  // Warnings as errors!
- // "-Xfatal-warnings", - // Linting options - "-unchecked", - "-Xcheckinit", - "-Xlint:adapted-args", - "-Xlint:constant", - "-Xlint:delayedinit-select", - "-Xlint:deprecation", - "-Xlint:inaccessible", - "-Xlint:infer-any", - "-Xlint:missing-interpolator", - "-Xlint:nullary-unit", - "-Xlint:option-implicit", - "-Xlint:package-object-classes", - "-Xlint:poly-implicit-overload", - "-Xlint:private-shadow", - "-Xlint:stars-align", - "-Xlint:type-parameter-shadow", - "-Wdead-code", - "-Wextra-implicit", - "-Wnumeric-widen", - "-Wunused:imports", - "-Wunused:locals", - "-Wunused:patvars", - "-Wunused:privates", - "-Wunused:implicits", - "-Wunused:params" - /* "-Wvalue-discard" */ +import Dependencies.* + +inThisBuild( + Seq( + scalaVersion := "3.3.1", + versionScheme := Some("early-semver"), + version := "3.0", + run / fork := true + ) ) -javaOptions ++= Seq("-Xms64m", "-Xmx128m") - -Compile / doc / sources := Seq.empty -Compile / packageDoc / publishArtifact := false +lazy val app = project + .in(file("app")) + .settings( + name := "lila-fishnet", + scalacOptions -= "-Xfatal-warnings", + scalacOptions ++= Seq("-source:future", "-rewrite", "-indent", "-explain", "-Wunused:all", "-release:21"), + resolvers ++= Seq(Dependencies.lilaMaven), + libraryDependencies ++= Seq( + catsCore, + catsEffect, + chess, + circeCore, + cirisCore, + cirisHtt4s, + http4sCirce, + http4sDsl, + http4sServer, + kamonCore, + kamonInflux, + kamonSystemMetrics, + log4Cats, + logback, + redis, + circeLiteral, + chessTestKit, + munit, + munitScalacheck, + weaver, + weaverScalaCheck, + testContainers, + log4CatsNoop, + http4sClient, + catsEffectTestKit, + ), + javaAgents += kamonAgent, + ) + .enablePlugins(JavaAppPackaging, JavaAgent) + + +lazy val root = project + .in(file(".")) + .aggregate(app) diff --git a/conf/logback.xml b/conf/logback.xml deleted file mode 100644 index b8b5f27..0000000 --- a/conf/logback.xml +++ /dev/null @@ -1,33 +0,0 @@ - - - - - - logs/application.log - - %date [%level] from %logger in %thread - %message%n%xException - - - - - - %coloredLevel %logger{15} - %message%n%xException{10} - - - - - - - - - - - - - - - - - - - diff --git a/deploy.sh b/deploy.sh index 92a009d..8515c9d 100755 --- a/deploy.sh +++ b/deploy.sh @@ -24,8 +24,8 @@ RSYNC_OPTIONS=" \ --exclude RUNNING_PID \ --exclude '.git/'" -stage="target/universal/stage" -include="$stage/bin $stage/lib $stage/conf" +stage="app/target/universal/stage" +include="$stage/bin $stage/kanela-agent $stage/lib" rsync_command="rsync $RSYNC_OPTIONS $include $REMOTE:$REMOTE_DIR" echo "$rsync_command" $rsync_command diff --git a/project/Dependencies.scala b/project/Dependencies.scala new file mode 100644 index 0000000..556dd77 --- /dev/null +++ b/project/Dependencies.scala @@ -0,0 +1,56 @@ +import sbt.* + +object Dependencies { + + val lilaMaven = "lila-maven" at "https://raw.githubusercontent.com/lichess-org/lila-maven/master" + + object V { + val circe = "0.14.6" + val http4s = "0.23.23" + val ciris = "3.4.0" + val kamon = "2.5.11" + val kamonAgent = "1.0.16" + val chess = "15.6.11" + val munit = "1.0.0-M8" + val catsEffect = "3.5.2" + } + + def http4s(artifact: String) = "org.http4s" %% s"http4s-$artifact" % V.http4s + def circe(artifact: String) = "io.circe" %% s"circe-$artifact" % V.circe + + val chess = "org.lichess" %% "scalachess" % V.chess + + val catsCore = "org.typelevel" %% "cats-core" % "2.10.0" + val catsEffect = "org.typelevel" %% "cats-effect" % V.catsEffect + + val circeCore = circe("core") + val circeLiteral = circe("literal") % Test + + 
+  val cirisCore = "is.cir" %% "ciris" % V.ciris
+  val cirisHtt4s = "is.cir" %% "ciris-http4s" % V.ciris
+  val cirisRefined = "is.cir" %% "ciris-refined" % V.ciris
+
+  val kamonCore = "io.kamon" %% "kamon-core" % V.kamon
+  val kamonInflux = "io.kamon" %% "kamon-influxdb" % V.kamon
+  val kamonSystemMetrics = "io.kamon" %% "kamon-system-metrics" % V.kamon
+  val kamonAgent = "io.kamon" % "kanela-agent" % V.kamonAgent
+
+  val http4sDsl = http4s("dsl")
+  val http4sServer = http4s("ember-server")
+  val http4sClient = http4s("ember-client") % Test
+  val http4sCirce = http4s("circe")
+
+  val log4Cats = "org.typelevel" %% "log4cats-slf4j" % "2.6.0"
+  val logback = "ch.qos.logback" % "logback-classic" % "1.4.11"
+
+  val redis = "io.chrisdavenport" %% "rediculous" % "0.5.1"
+
+  val chessTestKit = "org.lichess" %% "scalachess-test-kit" % V.chess % Test
+  val log4CatsNoop = "org.typelevel" %% "log4cats-noop" % "2.6.0" % Test
+  val munit = "org.scalameta" %% "munit" % V.munit % Test
+  val munitScalacheck = "org.scalameta" %% "munit-scalacheck" % V.munit % Test
+  val testContainers = "com.dimafeng" %% "testcontainers-scala-core" % "0.41.0" % Test
+  val weaver = "com.disneystreaming" %% "weaver-cats" % "0.8.3" % Test
+  val weaverScalaCheck = "com.disneystreaming" %% "weaver-scalacheck" % "0.8.3" % Test
+  val catsEffectTestKit = "org.typelevel" %% "cats-effect-testkit" % V.catsEffect % Test
+}
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 404a606..6119efe 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,6 +1,5 @@
-resolvers += Resolver.url(
-  "lila-maven-sbt",
-  url("https://raw.githubusercontent.com/ornicar/lila-maven/master")
-)(Resolver.ivyStylePatterns)
-addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.8.18-lila_1.21")
-addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2")
+addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.9.16")
+addSbtPlugin("com.github.sbt" % "sbt-javaagent" % "0.1.8")
+addSbtPlugin("io.kamon" % "sbt-kanela-runner" % "2.0.14")
+addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2")
+addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.0")
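For reference, a minimal sketch of how the `RedisContainer.startRedis` fixture introduced above could back an additional weaver suite. This is not part of the diff: the suite name is made up, it assumes the helper lives in the `lila.fishnet` test sources, and `config.port` is an assumed field name on `RedisConfig` (an ip4s `Port`).

```scala
package lila.fishnet

import cats.effect.{ IO, Resource }
import weaver.*

// Hypothetical smoke test: one throw-away Redis container shared across the suite,
// obtained from the RedisContainer helper added in this change.
object RedisContainerSmokeTest extends IOSuite:

  override type Res = RedisConfig
  override def sharedResource: Resource[IO, RedisConfig] =
    RedisContainer.startRedis

  test("the started container exposes a usable mapped port"): config =>
    // `config.port` is an assumed field name on RedisConfig
    IO.pure(expect(config.port.value > 0))
```

It deliberately avoids the `AppResources` and `Executor` wiring so it only exercises the container fixture; the full round trip is covered by the integration test above.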