diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 7603a398..6e597d6b 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -31,9 +31,9 @@ netty { threads = 0 # auto flush { - step = 100 # target number of messages to flush per interval - interval = 1.millis # interval between flush cycles - max-delay = 500.millis # exceed step when this threshold is passed + step = 100 # minimum number of channels to flush per interval + interval-millis = 1 # interval between flush cycles + max-delay-millis = 500 # max flush step targets this threshold if passed } } diff --git a/src/main/scala/LilaWs.scala b/src/main/scala/LilaWs.scala index 2187a02c..3473bba2 100644 --- a/src/main/scala/LilaWs.scala +++ b/src/main/scala/LilaWs.scala @@ -33,7 +33,6 @@ object LilaWs extends App: lazy val router = wire[Router] lazy val seenAt = wire[SeenAtUpdate] lazy val auth = wire[Auth] - lazy val workerLoop = wire[netty.WorkerLoop] lazy val nettyServer = wire[netty.NettyServer] lazy val monitor = wire[Monitor] diff --git a/src/main/scala/netty/ActorChannelConnector.scala b/src/main/scala/netty/ActorChannelConnector.scala index 3901049a..d33b8d01 100644 --- a/src/main/scala/netty/ActorChannelConnector.scala +++ b/src/main/scala/netty/ActorChannelConnector.scala @@ -5,14 +5,29 @@ import io.netty.buffer.Unpooled import io.netty.channel.* import io.netty.handler.codec.http.websocketx.* import io.netty.util.concurrent.{ Future as NettyFuture, GenericFutureListener } -import org.apache.pekko.actor.typed.ActorRef +import org.apache.pekko.actor.typed.{ ActorRef, Scheduler } import lila.ws.Controller.Endpoint import lila.ws.netty.ProtocolHandler.key -final private class ActorChannelConnector(clients: ActorRef[Clients.Control], loop: WorkerLoop)(using - Executor +final private class ActorChannelConnector( + clients: ActorRef[Clients.Control], + config: com.typesafe.config.Config, + settings: util.SettingStore +)(using + scheduler: Scheduler, + ec: Executor ): + private val step = settings.makeSetting("netty.flush.step", config.getInt("netty.flush.step")) + private val interval = + settings.makeSetting("netty.flush.interval-millis", config.getInt("netty.flush.interval-millis")) + private val maxDelay = + settings.makeSetting("netty.flush.max-delay-millis", config.getInt("netty.flush.max-delay-millis")) + + private val flushQ = new java.util.concurrent.ConcurrentLinkedQueue[Channel]() + + private var future = + scheduler.scheduleOnce(1 second, () => flush()) def apply(endpoint: Endpoint, channel: Channel): Unit = val clientPromise = Promise[Client]() @@ -31,14 +46,29 @@ final private class ActorChannelConnector(clients: ActorRef[Clients.Control], lo } private def emitToChannel(channel: Channel, withFlush: Boolean): ClientEmit = - msg => - msg.match - case ipc.ClientIn.Disconnect(reason) => - channel - .writeAndFlush(CloseWebSocketFrame(WebSocketCloseStatus(4010, reason))) - .addListener(ChannelFutureListener.CLOSE) - case ipc.ClientIn.RoundPingFrameNoFlush => - channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) } - case in => - if withFlush then channel.writeAndFlush(TextWebSocketFrame(in.write)) - else loop.writeShaped(channel, TextWebSocketFrame(in.write)) + case ipc.ClientIn.Disconnect(reason) => + channel + .writeAndFlush(CloseWebSocketFrame(WebSocketCloseStatus(4010, reason))) + .addListener(ChannelFutureListener.CLOSE) + case ipc.ClientIn.RoundPingFrameNoFlush => + channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) } + case in if withFlush || interval.get <= 0 => + channel.writeAndFlush(TextWebSocketFrame(in.write)) + case in => + channel.write(TextWebSocketFrame(in.write)) + flushQ.add(channel) + + private def maxDelayFactor: Double = interval.get.toDouble / maxDelay.get + + private def flush(): Unit = + var channelsToFlush = step.get.atLeast((flushQ.size * maxDelayFactor).toInt) + + while channelsToFlush > 0 do + Option(flushQ.poll()) match + case Some(channel) => + if channel.isOpen then channel.eventLoop().execute(() => channel.flush()) + channelsToFlush -= 1 + case _ => + channelsToFlush = 0 + + if !future.isCancelled then future = scheduler.scheduleOnce(interval.get.millis, () => flush()) diff --git a/src/main/scala/netty/NettyServer.scala b/src/main/scala/netty/NettyServer.scala index f78bf4e0..59de5527 100644 --- a/src/main/scala/netty/NettyServer.scala +++ b/src/main/scala/netty/NettyServer.scala @@ -4,28 +4,35 @@ package netty import com.typesafe.config.Config import com.typesafe.scalalogging.Logger import io.netty.bootstrap.ServerBootstrap +import io.netty.channel.epoll.{ EpollEventLoopGroup, EpollServerSocketChannel } +import io.netty.channel.kqueue.{ KQueueEventLoopGroup, KQueueServerSocketChannel } import io.netty.channel.{ Channel, ChannelInitializer } import io.netty.handler.codec.http.* +import org.apache.pekko.actor.typed.Scheduler final class NettyServer( clients: ClientSystem, router: Router, config: Config, - workerLoop: WorkerLoop -)(using Executor): - + settings: util.SettingStore +)(using Executor, Scheduler): private val logger = Logger(getClass) def start(): Unit = logger.info("Start") - val port = config.getInt("http.port") + val port = config.getInt("http.port") + val threads = config.getInt("netty.threads") + val (parent, workers, channelClass) = + if System.getProperty("os.name").toLowerCase.startsWith("mac") then + (new KQueueEventLoopGroup(1), new KQueueEventLoopGroup(threads), classOf[KQueueServerSocketChannel]) + else (new EpollEventLoopGroup(1), new EpollEventLoopGroup(threads), classOf[EpollServerSocketChannel]) try val boot = new ServerBootstrap boot - .group(workerLoop.parentGroup, workerLoop.group) - .channel(workerLoop.channelClass) + .group(parent, workers) + .channel(channelClass) .childHandler( new ChannelInitializer[Channel]: override def initChannel(ch: Channel): Unit = @@ -33,7 +40,7 @@ final class NettyServer( pipeline.addLast(HttpServerCodec()) pipeline.addLast(HttpObjectAggregator(4096)) pipeline.addLast(RequestHandler(router)) - pipeline.addLast(ProtocolHandler(ActorChannelConnector(clients, workerLoop))) + pipeline.addLast(ProtocolHandler(ActorChannelConnector(clients, config, settings))) pipeline.addLast(FrameHandler()) ) @@ -44,4 +51,6 @@ final class NettyServer( server.closeFuture().sync() logger.info(s"Closed $port") - finally workerLoop.shutdown() + finally + parent.shutdownGracefully() + workers.shutdownGracefully() diff --git a/src/main/scala/netty/WorkerLoop.scala b/src/main/scala/netty/WorkerLoop.scala deleted file mode 100644 index 866dff02..00000000 --- a/src/main/scala/netty/WorkerLoop.scala +++ /dev/null @@ -1,48 +0,0 @@ -package lila.ws -package netty - -import com.typesafe.config.Config -import io.netty.channel.epoll.{ EpollEventLoopGroup, EpollServerSocketChannel } -import io.netty.channel.kqueue.{ KQueueEventLoopGroup, KQueueServerSocketChannel } -import io.netty.channel.{ Channel, EventLoopGroup } -import io.netty.handler.codec.http.websocketx.TextWebSocketFrame - -import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit } - -final class WorkerLoop(config: Config)(using Executor): - private val isMacOS = System.getProperty("os.name").toLowerCase.startsWith("mac") - private val step = config.getInt("netty.flush.step") - private val interval: Long = config.getDuration("netty.flush.interval").toNanos - private val maxDelay: Long = config.getDuration("netty.flush.max-delay").toNanos - private val maxDelayFactor: Double = interval.toDouble / maxDelay - private val flushQ = new ConcurrentLinkedQueue[Channel]() - - val channelClass = if isMacOS then classOf[KQueueServerSocketChannel] else classOf[EpollServerSocketChannel] - val parentGroup = makeGroup(1) - val group = makeGroup(config.getInt("netty.threads")) - - private val f = group.scheduleAtFixedRate(() => flush(), 1_000_000_000L, interval, TimeUnit.NANOSECONDS) - - def writeShaped(channel: Channel, frame: TextWebSocketFrame): Unit = - channel.write(frame) - flushQ.add(channel) - - def shutdown(): Unit = - f.cancel(false) - parentGroup.shutdownGracefully() - group.shutdownGracefully() - - private def flush(): Unit = - var channelsToFlush = step.atLeast((flushQ.size * maxDelayFactor).toInt) - - while channelsToFlush > 0 do - Option(flushQ.poll()) match - case Some(channel) => - if channel.isOpen then channel.eventLoop().execute(() => channel.flush()) - channelsToFlush -= 1 - case _ => - channelsToFlush = 0 - - private def makeGroup(n: Int): EventLoopGroup = - if isMacOS then new KQueueEventLoopGroup(n) - else new EpollEventLoopGroup(n)