diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 96aa47aa..7603a398 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -4,8 +4,6 @@ study.mongo.uri = ${mongo.uri} yolo.mongo.uri = ${mongo.uri} redis.uri = "redis://127.0.0.1" csrf.origin = "http://localhost:9663" -netty.native = true -netty.threads = 0 # auto cookie.name = "lila2" socialGraph { @@ -29,6 +27,16 @@ reactivemongo { } } +netty { + threads = 0 # auto + + flush { + step = 100 # target number of messages to flush per interval + interval = 1.millis # interval between flush cycles + max-delay = 500.millis # exceed step when this threshold is passed + } +} + storm.secret = "somethingElseInProd" oauth.secret = "somethingElseInProd" diff --git a/src/main/scala/Controller.scala b/src/main/scala/Controller.scala index 810769ef..e456737d 100644 --- a/src/main/scala/Controller.scala +++ b/src/main/scala/Controller.scala @@ -301,7 +301,8 @@ object Controller: val behavior: ClientEmit => ClientBehavior, val rateLimit: RateLimit, val header: RequestHeader, - val emitCounter: kamon.metric.Counter + val emitCounter: kamon.metric.Counter, + val name: String ) def endpoint( name: String, @@ -320,7 +321,8 @@ object Controller: name = name ), header, - Monitor.clientInCounter(name) + Monitor.clientInCounter(name), + name ) type ResponseSync = Either[HttpResponseStatus, Endpoint] diff --git a/src/main/scala/LilaWs.scala b/src/main/scala/LilaWs.scala index f006e551..9cd6d878 100644 --- a/src/main/scala/LilaWs.scala +++ b/src/main/scala/LilaWs.scala @@ -32,6 +32,7 @@ object LilaWs extends App: lazy val router = wire[Router] lazy val seenAt = wire[SeenAtUpdate] lazy val auth = wire[Auth] + lazy val workerLoop = wire[netty.WorkerLoop] lazy val nettyServer = wire[netty.NettyServer] lazy val monitor = wire[Monitor] diff --git a/src/main/scala/Monitor.scala b/src/main/scala/Monitor.scala index 4537af5f..bb386569 100644 --- 
a/src/main/scala/Monitor.scala +++ b/src/main/scala/Monitor.scala @@ -24,10 +24,9 @@ final class Monitor( val version = System.getProperty("java.version") val memory = Runtime.getRuntime.maxMemory() / 1024 / 1024 - val native = config.getBoolean("netty.native") val useKamon = config.getString("kamon.influxdb.hostname").nonEmpty - logger.info(s"lila-ws 3.0 netty native=$native kamon=$useKamon") + logger.info(s"lila-ws 3.0 netty kamon=$useKamon") logger.info(s"Java version: $version, memory: ${memory}MB") if useKamon then kamon.Kamon.init() diff --git a/src/main/scala/netty/ActorChannelConnector.scala b/src/main/scala/netty/ActorChannelConnector.scala index 3108e19c..053496f7 100644 --- a/src/main/scala/netty/ActorChannelConnector.scala +++ b/src/main/scala/netty/ActorChannelConnector.scala @@ -10,12 +10,15 @@ import org.apache.pekko.actor.typed.ActorRef import lila.ws.Controller.Endpoint import lila.ws.netty.ProtocolHandler.key -final private class ActorChannelConnector(clients: ActorRef[Clients.Control])(using Executor): +final private class ActorChannelConnector(clients: ActorRef[Clients.Control], loop: WorkerLoop)(using + Executor +): def apply(endpoint: Endpoint, channel: Channel): Unit = val clientPromise = Promise[Client]() channel.attr(key.client).set(clientPromise.future) - val channelEmit = emitToChannel(channel) + val channelEmit: ClientEmit = + emitToChannel(channel, withFlush = endpoint.name == "round/play") val monitoredEmit: ClientEmit = (msg: ipc.ClientIn) => endpoint.emitCounter.increment() channelEmit(msg) @@ -27,12 +30,19 @@ final private class ActorChannelConnector(clients: ActorRef[Clients.Control])(us clients ! 
Clients.Control.Stop(client) } - private def emitToChannel(channel: Channel): ClientEmit = - case ipc.ClientIn.Disconnect(reason) => - channel - .writeAndFlush(CloseWebSocketFrame(WebSocketCloseStatus(4010, reason))) - .addListener(ChannelFutureListener.CLOSE) - case ipc.ClientIn.RoundPingFrameNoFlush => - channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) } - case in => - channel.writeAndFlush(TextWebSocketFrame(in.write)) + private inline def emitDisconnect(inline channel: Channel, inline reason: String): Unit = + channel + .writeAndFlush(CloseWebSocketFrame(WebSocketCloseStatus(4010, reason))) + .addListener(ChannelFutureListener.CLOSE) + + private inline def emitPingFrame(inline channel: Channel): Unit = + channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) } + + private def emitToChannel(channel: Channel, withFlush: Boolean): ClientEmit = + msg => + msg.match + case ipc.ClientIn.Disconnect(reason) => emitDisconnect(channel, reason) + case ipc.ClientIn.RoundPingFrameNoFlush => emitPingFrame(channel) + case in => + if withFlush then channel.writeAndFlush(TextWebSocketFrame(in.write)) + else loop.writeShaped(channel, TextWebSocketFrame(in.write)) diff --git a/src/main/scala/netty/NettyServer.scala b/src/main/scala/netty/NettyServer.scala index cbfaa8ac..f78bf4e0 100644 --- a/src/main/scala/netty/NettyServer.scala +++ b/src/main/scala/netty/NettyServer.scala @@ -4,54 +4,28 @@ package netty import com.typesafe.config.Config import com.typesafe.scalalogging.Logger import io.netty.bootstrap.ServerBootstrap -import io.netty.channel.epoll.{ EpollEventLoopGroup, EpollServerSocketChannel } -import io.netty.channel.kqueue.{ KQueueEventLoopGroup, KQueueServerSocketChannel } -import io.netty.channel.nio.NioEventLoopGroup -import io.netty.channel.socket.nio.NioServerSocketChannel import io.netty.channel.{ Channel, ChannelInitializer } import io.netty.handler.codec.http.* final class NettyServer( clients: 
ClientSystem, router: Router, - config: Config + config: Config, + workerLoop: WorkerLoop )(using Executor): - private val connector = ActorChannelConnector(clients) - private val logger = Logger(getClass) + private val logger = Logger(getClass) def start(): Unit = logger.info("Start") - val port = config.getInt("http.port") - val workerThreads = config.getInt("netty.threads") - - val (bossGroup, workerGroup, channelClz) = - if !config.getBoolean("netty.native") then - ( - NioEventLoopGroup(1), - NioEventLoopGroup(workerThreads), - classOf[NioServerSocketChannel] - ) - else if System.getProperty("os.name").toLowerCase.startsWith("mac") then - ( - KQueueEventLoopGroup(1), - KQueueEventLoopGroup(workerThreads), - classOf[KQueueServerSocketChannel] - ) - else - ( - EpollEventLoopGroup(1), - EpollEventLoopGroup(workerThreads), - classOf[EpollServerSocketChannel] - ) - + val port = config.getInt("http.port") try val boot = new ServerBootstrap boot - .group(bossGroup, workerGroup) - .channel(channelClz) + .group(workerLoop.parentGroup, workerLoop.group) + .channel(workerLoop.channelClass) .childHandler( new ChannelInitializer[Channel]: override def initChannel(ch: Channel): Unit = @@ -59,7 +33,7 @@ final class NettyServer( pipeline.addLast(HttpServerCodec()) pipeline.addLast(HttpObjectAggregator(4096)) pipeline.addLast(RequestHandler(router)) - pipeline.addLast(ProtocolHandler(connector)) + pipeline.addLast(ProtocolHandler(ActorChannelConnector(clients, workerLoop))) pipeline.addLast(FrameHandler()) ) @@ -70,6 +44,4 @@ final class NettyServer( server.closeFuture().sync() logger.info(s"Closed $port") - finally - bossGroup.shutdownGracefully() - workerGroup.shutdownGracefully() + finally workerLoop.shutdown() diff --git a/src/main/scala/netty/WorkerLoop.scala b/src/main/scala/netty/WorkerLoop.scala new file mode 100644 index 00000000..2af3e64f --- /dev/null +++ b/src/main/scala/netty/WorkerLoop.scala @@ -0,0 +1,43 @@ +package lila.ws +package netty + +import 
com.typesafe.config.Config
+import com.typesafe.scalalogging.Logger
+import io.netty.channel.{ Channel, EventLoopGroup }
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
+import io.netty.channel.epoll.{ EpollEventLoopGroup, EpollServerSocketChannel }
+import io.netty.channel.kqueue.{ KQueueEventLoopGroup, KQueueServerSocketChannel }
+import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit }
+import java.util.concurrent.atomic.AtomicInteger
+import scala.annotation.tailrec
+import scala.util.control.NonFatal
+
+/** Owns the Netty event loop groups and rate-shapes channel flushes.
+  *
+  * Frames written through [[writeShaped]] are not flushed immediately: the channel is queued and a
+  * task scheduled on `group` flushes queued channels every `netty.flush.interval`.
+  *
+  * Each cycle flushes at least `netty.flush.step` queue entries. When the backlog grows, the cycle
+  * flushes `backlog * interval / max-delay` entries instead, so that a backlog is drained in
+  * roughly `netty.flush.max-delay`.
+  *
+  * @param config
+  *   application config; reads `netty.threads` and `netty.flush.{step,interval,max-delay}`
+  */
+final class WorkerLoop(config: Config)(using Executor):
+  private val logger                 = Logger(getClass)
+  private val isMacOS                = System.getProperty("os.name").toLowerCase.startsWith("mac")
+  private val step                   = config.getInt("netty.flush.step")
+  private val interval: Long         = config.getDuration("netty.flush.interval").toNanos
+  private val maxDelay: Long         = config.getDuration("netty.flush.max-delay").toNanos
+  private val maxDelayFactor: Double = interval.toDouble / maxDelay
+  private val flushQ                 = new ConcurrentLinkedQueue[Channel]()
+  // ConcurrentLinkedQueue.size traverses the whole queue, which is too costly to do on every
+  // flush cycle. The backlog is tracked here instead; it is incremented before the enqueue and
+  // decremented after a successful poll, so it never under-reports the queue length.
+  private val backlog = new AtomicInteger(0)
+
+  val channelClass = if isMacOS then classOf[KQueueServerSocketChannel] else classOf[EpollServerSocketChannel]
+  val parentGroup: EventLoopGroup = makeGroup(1)
+  val group: EventLoopGroup       = makeGroup(config.getInt("netty.threads"))
+
+  // First run after one second, then once per configured interval.
+  private val flushTask =
+    group.scheduleAtFixedRate(() => flush(), 1_000_000_000L, interval, TimeUnit.NANOSECONDS)
+
+  /** Writes `frame` to `channel` without flushing, and queues the channel for a later flush. */
+  def writeShaped(channel: Channel, frame: TextWebSocketFrame): Unit =
+    channel.write(frame)
+    backlog.incrementAndGet()
+    flushQ.add(channel)
+
+  /** Cancels the periodic flush task and shuts both event loop groups down gracefully. */
+  def shutdown(): Unit =
+    flushTask.cancel(false)
+    parentGroup.shutdownGracefully()
+    group.shutdownGracefully()
+
+  /** One flush cycle: flushes `step` queue entries, or more when the backlog is large. */
+  private def flush(): Unit =
+    drain(step.atLeast((backlog.get() * maxDelayFactor).toInt))
+
+  /** Polls and flushes up to `remaining` queued channels, stopping early once the queue is empty. */
+  @tailrec
+  private def drain(remaining: Int): Unit =
+    if remaining > 0 then
+      Option(flushQ.poll()) match
+        case Some(channel) =>
+          backlog.decrementAndGet()
+          flushQuietly(channel)
+          drain(remaining - 1)
+        case None => ()
+
+  /** Flushes one channel, logging instead of propagating non-fatal failures.
+    *
+    * An exception escaping a task scheduled with `scheduleAtFixedRate` suppresses all subsequent
+    * executions, which would stop shaped flushing for every channel.
+    */
+  private def flushQuietly(channel: Channel): Unit =
+    try channel.flush()
+    catch case NonFatal(e) => logger.warn(s"Failed to flush $channel", e)
+
+  private def makeGroup(n: Int): EventLoopGroup =
+    if isMacOS then new KQueueEventLoopGroup(n)
+    else new EpollEventLoopGroup(n)