From 7de46e3f940f4f4deca865f837d78c52ccf511b6 Mon Sep 17 00:00:00 2001 From: Jonathan Gamble Date: Mon, 23 Sep 2024 17:01:38 -0500 Subject: [PATCH 01/17] shape outbound traffic on endpoints other than round/play --- src/main/resources/application.conf | 12 ++++- src/main/scala/Controller.scala | 6 ++- src/main/scala/LilaWs.scala | 1 + src/main/scala/Monitor.scala | 3 +- .../scala/netty/ActorChannelConnector.scala | 32 +++++++++----- src/main/scala/netty/NettyServer.scala | 44 ++++--------------- src/main/scala/netty/WorkerLoop.scala | 43 ++++++++++++++++++ 7 files changed, 88 insertions(+), 53 deletions(-) create mode 100644 src/main/scala/netty/WorkerLoop.scala diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 96aa47aa..7603a398 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -4,8 +4,6 @@ study.mongo.uri = ${mongo.uri} yolo.mongo.uri = ${mongo.uri} redis.uri = "redis://127.0.0.1" csrf.origin = "http://localhost:9663" -netty.native = true -netty.threads = 0 # auto cookie.name = "lila2" socialGraph { @@ -29,6 +27,16 @@ reactivemongo { } } +netty { + threads = 0 # auto + + flush { + step = 100 # target number of messages to flush per interval + interval = 1.millis # interval between flush cycles + max-delay = 500.millis # exceed step when this threshold is passed + } +} + storm.secret = "somethingElseInProd" oauth.secret = "somethingElseInProd" diff --git a/src/main/scala/Controller.scala b/src/main/scala/Controller.scala index 810769ef..e456737d 100644 --- a/src/main/scala/Controller.scala +++ b/src/main/scala/Controller.scala @@ -301,7 +301,8 @@ object Controller: val behavior: ClientEmit => ClientBehavior, val rateLimit: RateLimit, val header: RequestHeader, - val emitCounter: kamon.metric.Counter + val emitCounter: kamon.metric.Counter, + val name: String ) def endpoint( name: String, @@ -320,7 +321,8 @@ object Controller: name = name ), header, - Monitor.clientInCounter(name) + Monitor.clientInCounter(name), + name ) type ResponseSync = Either[HttpResponseStatus, Endpoint] diff --git a/src/main/scala/LilaWs.scala b/src/main/scala/LilaWs.scala index f006e551..9cd6d878 100644 --- a/src/main/scala/LilaWs.scala +++ b/src/main/scala/LilaWs.scala @@ -32,6 +32,7 @@ object LilaWs extends App: lazy val router = wire[Router] lazy val seenAt = wire[SeenAtUpdate] lazy val auth = wire[Auth] + lazy val workerLoop = wire[netty.WorkerLoop] lazy val nettyServer = wire[netty.NettyServer] lazy val monitor = wire[Monitor] diff --git a/src/main/scala/Monitor.scala b/src/main/scala/Monitor.scala index 4537af5f..bb386569 100644 --- a/src/main/scala/Monitor.scala +++ b/src/main/scala/Monitor.scala @@ -24,10 +24,9 @@ final class Monitor( val version = System.getProperty("java.version") val memory = Runtime.getRuntime.maxMemory() / 1024 / 1024 - val native = config.getBoolean("netty.native") val useKamon = config.getString("kamon.influxdb.hostname").nonEmpty - logger.info(s"lila-ws 3.0 netty native=$native kamon=$useKamon") + logger.info(s"lila-ws 3.0 netty kamon=$useKamon") logger.info(s"Java version: $version, memory: ${memory}MB") if useKamon then kamon.Kamon.init() diff --git a/src/main/scala/netty/ActorChannelConnector.scala b/src/main/scala/netty/ActorChannelConnector.scala index 3108e19c..053496f7 100644 --- a/src/main/scala/netty/ActorChannelConnector.scala +++ b/src/main/scala/netty/ActorChannelConnector.scala @@ -10,12 +10,15 @@ import org.apache.pekko.actor.typed.ActorRef import lila.ws.Controller.Endpoint import lila.ws.netty.ProtocolHandler.key -final private class ActorChannelConnector(clients: ActorRef[Clients.Control])(using Executor): +final private class ActorChannelConnector(clients: ActorRef[Clients.Control], loop: WorkerLoop)(using + Executor +): def apply(endpoint: Endpoint, channel: Channel): Unit = val clientPromise = Promise[Client]() channel.attr(key.client).set(clientPromise.future) - val channelEmit = emitToChannel(channel) + val channelEmit: ClientEmit = + emitToChannel(channel, withFlush = endpoint.name == "round/play") val monitoredEmit: ClientEmit = (msg: ipc.ClientIn) => endpoint.emitCounter.increment() channelEmit(msg) @@ -27,12 +30,19 @@ final private class ActorChannelConnector(clients: ActorRef[Clients.Control])(us clients ! Clients.Control.Stop(client) } - private def emitToChannel(channel: Channel): ClientEmit = - case ipc.ClientIn.Disconnect(reason) => - channel - .writeAndFlush(CloseWebSocketFrame(WebSocketCloseStatus(4010, reason))) - .addListener(ChannelFutureListener.CLOSE) - case ipc.ClientIn.RoundPingFrameNoFlush => - channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) } - case in => - channel.writeAndFlush(TextWebSocketFrame(in.write)) + private inline def emitDisconnect(inline channel: Channel, inline reason: String): Unit = + channel + .writeAndFlush(CloseWebSocketFrame(WebSocketCloseStatus(4010, reason))) + .addListener(ChannelFutureListener.CLOSE) + + private inline def emitPingFrame(inline channel: Channel): Unit = + channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) } + + private def emitToChannel(channel: Channel, withFlush: Boolean): ClientEmit = + msg => + msg.match + case ipc.ClientIn.Disconnect(reason) => emitDisconnect(channel, reason) + case ipc.ClientIn.RoundPingFrameNoFlush => emitPingFrame(channel) + case in => + if withFlush then channel.writeAndFlush(TextWebSocketFrame(in.write)) + else loop.writeShaped(channel, TextWebSocketFrame(in.write)) diff --git a/src/main/scala/netty/NettyServer.scala b/src/main/scala/netty/NettyServer.scala index cbfaa8ac..f78bf4e0 100644 --- a/src/main/scala/netty/NettyServer.scala +++ b/src/main/scala/netty/NettyServer.scala @@ -4,54 +4,28 @@ package netty import com.typesafe.config.Config import com.typesafe.scalalogging.Logger import io.netty.bootstrap.ServerBootstrap -import io.netty.channel.epoll.{ EpollEventLoopGroup, EpollServerSocketChannel } -import io.netty.channel.kqueue.{ KQueueEventLoopGroup, KQueueServerSocketChannel } -import io.netty.channel.nio.NioEventLoopGroup -import io.netty.channel.socket.nio.NioServerSocketChannel import io.netty.channel.{ Channel, ChannelInitializer } import io.netty.handler.codec.http.* final class NettyServer( clients: ClientSystem, router: Router, - config: Config + config: Config, + workerLoop: WorkerLoop )(using Executor): - private val connector = ActorChannelConnector(clients) - private val logger = Logger(getClass) + private val logger = Logger(getClass) def start(): Unit = logger.info("Start") - val port = config.getInt("http.port") - val workerThreads = config.getInt("netty.threads") - - val (bossGroup, workerGroup, channelClz) = - if !config.getBoolean("netty.native") then - ( - NioEventLoopGroup(1), - NioEventLoopGroup(workerThreads), - classOf[NioServerSocketChannel] - ) - else if System.getProperty("os.name").toLowerCase.startsWith("mac") then - ( - KQueueEventLoopGroup(1), - KQueueEventLoopGroup(workerThreads), - classOf[KQueueServerSocketChannel] - ) - else - ( - EpollEventLoopGroup(1), - EpollEventLoopGroup(workerThreads), - classOf[EpollServerSocketChannel] - ) - + val port = config.getInt("http.port") try val boot = new ServerBootstrap boot - .group(bossGroup, workerGroup) - .channel(channelClz) + .group(workerLoop.parentGroup, workerLoop.group) + .channel(workerLoop.channelClass) .childHandler( new ChannelInitializer[Channel]: override def initChannel(ch: Channel): Unit = @@ -59,7 +33,7 @@ final class NettyServer( pipeline.addLast(HttpServerCodec()) pipeline.addLast(HttpObjectAggregator(4096)) pipeline.addLast(RequestHandler(router)) - pipeline.addLast(ProtocolHandler(connector)) + pipeline.addLast(ProtocolHandler(ActorChannelConnector(clients, workerLoop))) pipeline.addLast(FrameHandler()) ) @@ -70,6 +44,4 @@ final class NettyServer( server.closeFuture().sync() logger.info(s"Closed $port") - finally - bossGroup.shutdownGracefully() - workerGroup.shutdownGracefully() + finally workerLoop.shutdown() diff --git a/src/main/scala/netty/WorkerLoop.scala b/src/main/scala/netty/WorkerLoop.scala new file mode 100644 index 00000000..2af3e64f --- /dev/null +++ b/src/main/scala/netty/WorkerLoop.scala @@ -0,0 +1,43 @@ +package lila.ws +package netty + +import com.typesafe.config.Config +import io.netty.channel.{ Channel, EventLoopGroup } +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame +import io.netty.channel.epoll.{ EpollEventLoopGroup, EpollServerSocketChannel } +import io.netty.channel.kqueue.{ KQueueEventLoopGroup, KQueueServerSocketChannel } +import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit } + +final class WorkerLoop(config: Config)(using Executor): + private val isMacOS = System.getProperty("os.name").toLowerCase.startsWith("mac") + private val step = config.getInt("netty.flush.step") + private val interval: Long = config.getDuration("netty.flush.interval").toNanos + private val maxDelay: Long = config.getDuration("netty.flush.max-delay").toNanos + private val maxDelayFactor: Double = interval.toDouble / maxDelay + private val flushQ = new ConcurrentLinkedQueue[Channel]() + + val channelClass = if isMacOS then classOf[KQueueServerSocketChannel] else classOf[EpollServerSocketChannel] + val parentGroup = makeGroup(1) + val group = makeGroup(config.getInt("netty.threads")) + + private val f = group.scheduleAtFixedRate(() => flush(), 1_000_000_000L, interval, TimeUnit.NANOSECONDS) + + def writeShaped(channel: Channel, frame: TextWebSocketFrame): Unit = + channel.write(frame) + flushQ.add(channel) + + def shutdown(): Unit = + f.cancel(false) + parentGroup.shutdownGracefully() + group.shutdownGracefully() + + private def flush(): Unit = + val channelsToFlush = step.atLeast((flushQ.size * maxDelayFactor).toInt) + val iterator = flushQ.iterator() + for count <- 0 until channelsToFlush if iterator.hasNext do + iterator.next().flush() + iterator.remove() + + private def makeGroup(n: Int): EventLoopGroup = + if isMacOS then new KQueueEventLoopGroup(n) + else new EpollEventLoopGroup(n) From 7befcad7103c571503aace3b04746e492125e905 Mon Sep 17 00:00:00 2001 From: Jonathan Gamble Date: Mon, 23 Sep 2024 17:41:27 -0500 Subject: [PATCH 02/17] scalafix wants things alphabetized --- src/main/scala/netty/WorkerLoop.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/scala/netty/WorkerLoop.scala b/src/main/scala/netty/WorkerLoop.scala index 2af3e64f..cf161530 100644 --- a/src/main/scala/netty/WorkerLoop.scala +++ b/src/main/scala/netty/WorkerLoop.scala @@ -2,10 +2,11 @@ package lila.ws package netty import com.typesafe.config.Config -import io.netty.channel.{ Channel, EventLoopGroup } -import io.netty.handler.codec.http.websocketx.TextWebSocketFrame import io.netty.channel.epoll.{ EpollEventLoopGroup, EpollServerSocketChannel } import io.netty.channel.kqueue.{ KQueueEventLoopGroup, KQueueServerSocketChannel } +import io.netty.channel.{ Channel, EventLoopGroup } +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame + import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit } final class WorkerLoop(config: Config)(using Executor): From 509a207dbd7b9b6445b41b0cbce3314404b32194 Mon Sep 17 00:00:00 2001 From: Jonathan Gamble Date: Mon, 23 Sep 2024 18:02:21 -0500 Subject: [PATCH 03/17] remove unnecessary methods --- src/main/scala/netty/ActorChannelConnector.scala | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/src/main/scala/netty/ActorChannelConnector.scala b/src/main/scala/netty/ActorChannelConnector.scala index 053496f7..bb9a1b43 100644 --- a/src/main/scala/netty/ActorChannelConnector.scala +++ b/src/main/scala/netty/ActorChannelConnector.scala @@ -30,19 +30,15 @@ final private class ActorChannelConnector(clients: ActorRef[Clients.Control], lo clients ! Clients.Control.Stop(client) } - private inline def emitDisconnect(inline channel: Channel, inline reason: String): Unit = - channel - .writeAndFlush(CloseWebSocketFrame(WebSocketCloseStatus(4010, reason))) - .addListener(ChannelFutureListener.CLOSE) - - private inline def emitPingFrame(inline channel: Channel): Unit = - channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) } - private def emitToChannel(channel: Channel, withFlush: Boolean): ClientEmit = msg => msg.match - case ipc.ClientIn.Disconnect(reason) => emitDisconnect(channel, reason) - case ipc.ClientIn.RoundPingFrameNoFlush => emitPingFrame(channel) + case ipc.ClientIn.Disconnect(reason) => + channel + .writeAndFlush(CloseWebSocketFrame(WebSocketCloseStatus(4010, reason))) + .addListener(ChannelFutureListener.CLOSE) + case ipc.ClientIn.RoundPingFrameNoFlush => + channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) } case in => if withFlush then channel.writeAndFlush(TextWebSocketFrame(in.write)) else loop.writeShaped(channel, TextWebSocketFrame(in.write)) From b1918f29ed14194b4b0d74af9bc3f4ce1d9203dd Mon Sep 17 00:00:00 2001 From: Jonathan Gamble Date: Tue, 24 Sep 2024 08:42:11 -0500 Subject: [PATCH 04/17] fix flush --- src/main/scala/netty/WorkerLoop.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/main/scala/netty/WorkerLoop.scala b/src/main/scala/netty/WorkerLoop.scala index cf161530..9304e9c4 100644 --- a/src/main/scala/netty/WorkerLoop.scala +++ b/src/main/scala/netty/WorkerLoop.scala @@ -33,11 +33,15 @@ final class WorkerLoop(config: Config)(using Executor): group.shutdownGracefully() private def flush(): Unit = - val channelsToFlush = step.atLeast((flushQ.size * maxDelayFactor).toInt) - val iterator = flushQ.iterator() - for count <- 0 until channelsToFlush if iterator.hasNext do - iterator.next().flush() - iterator.remove() + var channelsToFlush = step.atLeast((flushQ.size * maxDelayFactor).toInt) + + while channelsToFlush > 0 do + Option(flushQ.poll()) match + case Some(channel) => + channel.eventLoop().execute(() => channel.flush()) + channelsToFlush -= 1 + case _ => + channelsToFlush = 0 private def makeGroup(n: Int): EventLoopGroup = if isMacOS then new KQueueEventLoopGroup(n) From 24adaa7bd0c006e9ba93e8aa198418a55adbac51 Mon Sep 17 00:00:00 2001 From: Jonathan Gamble Date: Tue, 24 Sep 2024 08:50:19 -0500 Subject: [PATCH 05/17] keep endpoint urls out of lower level code --- src/main/scala/Controller.scala | 10 ++++++---- src/main/scala/netty/ActorChannelConnector.scala | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/main/scala/Controller.scala b/src/main/scala/Controller.scala index e456737d..7f767a4e 100644 --- a/src/main/scala/Controller.scala +++ b/src/main/scala/Controller.scala @@ -155,7 +155,8 @@ final class Controller( ) { Deps(emit, req, services) }, header, credits = 100, - interval = 20.seconds + interval = 20.seconds, + alwaysFlush = true ) case _ => notFound @@ -302,14 +303,15 @@ object Controller: val rateLimit: RateLimit, val header: RequestHeader, val emitCounter: kamon.metric.Counter, - val name: String + val alwaysFlush: Boolean ) def endpoint( name: String, behavior: ClientEmit => ClientBehavior, header: RequestHeader, credits: Int, - interval: FiniteDuration + interval: FiniteDuration, + alwaysFlush: Boolean = false ): ResponseSync = Monitor.connection.open(name) Right: @@ -322,7 +324,7 @@ object Controller: ), header, Monitor.clientInCounter(name), - name + alwaysFlush ) type ResponseSync = Either[HttpResponseStatus, Endpoint] diff --git a/src/main/scala/netty/ActorChannelConnector.scala b/src/main/scala/netty/ActorChannelConnector.scala index bb9a1b43..3901049a 100644 --- a/src/main/scala/netty/ActorChannelConnector.scala +++ b/src/main/scala/netty/ActorChannelConnector.scala @@ -18,7 +18,7 @@ final private class ActorChannelConnector(clients: ActorRef[Clients.Control], lo val clientPromise = Promise[Client]() channel.attr(key.client).set(clientPromise.future) val channelEmit: ClientEmit = - emitToChannel(channel, withFlush = endpoint.name == "round/play") + emitToChannel(channel, withFlush = endpoint.alwaysFlush) val monitoredEmit: ClientEmit = (msg: ipc.ClientIn) => endpoint.emitCounter.increment() channelEmit(msg) From ace0faf6a2201898768778e89143db6c141d95b5 Mon Sep 17 00:00:00 2001 From: Jonathan Gamble Date: Tue, 24 Sep 2024 09:51:32 -0500 Subject: [PATCH 06/17] flush() on a closed channel should noop but be safe --- src/main/scala/netty/WorkerLoop.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/netty/WorkerLoop.scala b/src/main/scala/netty/WorkerLoop.scala index 9304e9c4..866dff02 100644 --- a/src/main/scala/netty/WorkerLoop.scala +++ b/src/main/scala/netty/WorkerLoop.scala @@ -38,7 +38,7 @@ final class WorkerLoop(config: Config)(using Executor): while channelsToFlush > 0 do Option(flushQ.poll()) match case Some(channel) => - channel.eventLoop().execute(() => channel.flush()) + if channel.isOpen then channel.eventLoop().execute(() => channel.flush()) channelsToFlush -= 1 case _ => channelsToFlush = 0 From b63ed80bd351c3fb6aee0911fabb08152ce03533 Mon Sep 17 00:00:00 2001 From: Jonathan Gamble Date: Tue, 24 Sep 2024 12:49:00 -0500 Subject: [PATCH 07/17] use mongo store for netty flush settings --- src/main/resources/application.conf | 6 +- src/main/scala/LilaWs.scala | 1 - .../scala/netty/ActorChannelConnector.scala | 58 ++++++++++++++----- src/main/scala/netty/NettyServer.scala | 25 +++++--- src/main/scala/netty/WorkerLoop.scala | 48 --------------- 5 files changed, 64 insertions(+), 74 deletions(-) delete mode 100644 src/main/scala/netty/WorkerLoop.scala diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 7603a398..6e597d6b 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -31,9 +31,9 @@ netty { threads = 0 # auto flush { - step = 100 # target number of messages to flush per interval - interval = 1.millis # interval between flush cycles - max-delay = 500.millis # exceed step when this threshold is passed + step = 100 # minimum number of channels to flush per interval + interval-millis = 1 # interval between flush cycles + max-delay-millis = 500 # max flush step targets this threshold if passed } } diff --git a/src/main/scala/LilaWs.scala b/src/main/scala/LilaWs.scala index 2187a02c..3473bba2 100644 --- a/src/main/scala/LilaWs.scala +++ b/src/main/scala/LilaWs.scala @@ -33,7 +33,6 @@ object LilaWs extends App: lazy val router = wire[Router] lazy val seenAt = wire[SeenAtUpdate] lazy val auth = wire[Auth] - lazy val workerLoop = wire[netty.WorkerLoop] lazy val nettyServer = wire[netty.NettyServer] lazy val monitor = wire[Monitor] diff --git a/src/main/scala/netty/ActorChannelConnector.scala b/src/main/scala/netty/ActorChannelConnector.scala index 3901049a..d33b8d01 100644 --- a/src/main/scala/netty/ActorChannelConnector.scala +++ b/src/main/scala/netty/ActorChannelConnector.scala @@ -5,14 +5,29 @@ import io.netty.buffer.Unpooled import io.netty.channel.* import io.netty.handler.codec.http.websocketx.* import io.netty.util.concurrent.{ Future as NettyFuture, GenericFutureListener } -import org.apache.pekko.actor.typed.ActorRef +import org.apache.pekko.actor.typed.{ ActorRef, Scheduler } import lila.ws.Controller.Endpoint import lila.ws.netty.ProtocolHandler.key -final private class ActorChannelConnector(clients: ActorRef[Clients.Control], loop: WorkerLoop)(using - Executor +final private class ActorChannelConnector( + clients: ActorRef[Clients.Control], + config: com.typesafe.config.Config, + settings: util.SettingStore +)(using + scheduler: Scheduler, + ec: Executor ): + private val step = settings.makeSetting("netty.flush.step", config.getInt("netty.flush.step")) + private val interval = + settings.makeSetting("netty.flush.interval-millis", config.getInt("netty.flush.interval-millis")) + private val maxDelay = + settings.makeSetting("netty.flush.max-delay-millis", config.getInt("netty.flush.max-delay-millis")) + + private val flushQ = new java.util.concurrent.ConcurrentLinkedQueue[Channel]() + + private var future = + scheduler.scheduleOnce(1 second, () => flush()) def apply(endpoint: Endpoint, channel: Channel): Unit = val clientPromise = Promise[Client]() @@ -31,14 +46,29 @@ final private class ActorChannelConnector(clients: ActorRef[Clients.Control], lo } private def emitToChannel(channel: Channel, withFlush: Boolean): ClientEmit = - msg => - msg.match - case ipc.ClientIn.Disconnect(reason) => - channel - .writeAndFlush(CloseWebSocketFrame(WebSocketCloseStatus(4010, reason))) - .addListener(ChannelFutureListener.CLOSE) - case ipc.ClientIn.RoundPingFrameNoFlush => - channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) } - case in => - if withFlush then channel.writeAndFlush(TextWebSocketFrame(in.write)) - else loop.writeShaped(channel, TextWebSocketFrame(in.write)) + case ipc.ClientIn.Disconnect(reason) => + channel + .writeAndFlush(CloseWebSocketFrame(WebSocketCloseStatus(4010, reason))) + .addListener(ChannelFutureListener.CLOSE) + case ipc.ClientIn.RoundPingFrameNoFlush => + channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) } + case in if withFlush || interval.get <= 0 => + channel.writeAndFlush(TextWebSocketFrame(in.write)) + case in => + channel.write(TextWebSocketFrame(in.write)) + flushQ.add(channel) + + private def maxDelayFactor: Double = interval.get.toDouble / maxDelay.get + + private def flush(): Unit = + var channelsToFlush = step.get.atLeast((flushQ.size * maxDelayFactor).toInt) + + while channelsToFlush > 0 do + Option(flushQ.poll()) match + case Some(channel) => + if channel.isOpen then channel.eventLoop().execute(() => channel.flush()) + channelsToFlush -= 1 + case _ => + channelsToFlush = 0 + + if !future.isCancelled then future = scheduler.scheduleOnce(interval.get.millis, () => flush()) diff --git a/src/main/scala/netty/NettyServer.scala b/src/main/scala/netty/NettyServer.scala index f78bf4e0..59de5527 100644 --- a/src/main/scala/netty/NettyServer.scala +++ b/src/main/scala/netty/NettyServer.scala @@ -4,28 +4,35 @@ package netty import com.typesafe.config.Config import com.typesafe.scalalogging.Logger import io.netty.bootstrap.ServerBootstrap +import io.netty.channel.epoll.{ EpollEventLoopGroup, EpollServerSocketChannel } +import io.netty.channel.kqueue.{ KQueueEventLoopGroup, KQueueServerSocketChannel } import io.netty.channel.{ Channel, ChannelInitializer } import io.netty.handler.codec.http.* +import org.apache.pekko.actor.typed.Scheduler final class NettyServer( clients: ClientSystem, router: Router, config: Config, - workerLoop: WorkerLoop -)(using Executor): - + settings: util.SettingStore +)(using Executor, Scheduler): private val logger = Logger(getClass) def start(): Unit = logger.info("Start") - val port = config.getInt("http.port") + val port = config.getInt("http.port") + val threads = config.getInt("netty.threads") + val (parent, workers, channelClass) = + if System.getProperty("os.name").toLowerCase.startsWith("mac") then + (new KQueueEventLoopGroup(1), new KQueueEventLoopGroup(threads), classOf[KQueueServerSocketChannel]) + else (new EpollEventLoopGroup(1), new EpollEventLoopGroup(threads), classOf[EpollServerSocketChannel]) try val boot = new ServerBootstrap boot - .group(workerLoop.parentGroup, workerLoop.group) - .channel(workerLoop.channelClass) + .group(parent, workers) + .channel(channelClass) .childHandler( new ChannelInitializer[Channel]: override def initChannel(ch: Channel): Unit = @@ -33,7 +40,7 @@ final class NettyServer( pipeline.addLast(HttpServerCodec()) pipeline.addLast(HttpObjectAggregator(4096)) pipeline.addLast(RequestHandler(router)) - pipeline.addLast(ProtocolHandler(ActorChannelConnector(clients, workerLoop))) + pipeline.addLast(ProtocolHandler(ActorChannelConnector(clients, config, settings))) pipeline.addLast(FrameHandler()) ) @@ -44,4 +51,6 @@ final class NettyServer( server.closeFuture().sync() logger.info(s"Closed $port") - finally workerLoop.shutdown() + finally + parent.shutdownGracefully() + workers.shutdownGracefully() diff --git a/src/main/scala/netty/WorkerLoop.scala b/src/main/scala/netty/WorkerLoop.scala deleted file mode 100644 index 866dff02..00000000 --- a/src/main/scala/netty/WorkerLoop.scala +++ /dev/null @@ -1,48 +0,0 @@ -package lila.ws -package netty - -import com.typesafe.config.Config -import io.netty.channel.epoll.{ EpollEventLoopGroup, EpollServerSocketChannel } -import io.netty.channel.kqueue.{ KQueueEventLoopGroup, KQueueServerSocketChannel } -import io.netty.channel.{ Channel, EventLoopGroup } -import io.netty.handler.codec.http.websocketx.TextWebSocketFrame - -import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit } - -final class WorkerLoop(config: Config)(using Executor): - private val isMacOS = System.getProperty("os.name").toLowerCase.startsWith("mac") - private val step = config.getInt("netty.flush.step") - private val interval: Long = config.getDuration("netty.flush.interval").toNanos - private val maxDelay: Long = config.getDuration("netty.flush.max-delay").toNanos - private val maxDelayFactor: Double = interval.toDouble / maxDelay - private val flushQ = new ConcurrentLinkedQueue[Channel]() - - val channelClass = if isMacOS then classOf[KQueueServerSocketChannel] else classOf[EpollServerSocketChannel] - val parentGroup = makeGroup(1) - val group = makeGroup(config.getInt("netty.threads")) - - private val f = group.scheduleAtFixedRate(() => flush(), 1_000_000_000L, interval, TimeUnit.NANOSECONDS) - - def writeShaped(channel: Channel, frame: TextWebSocketFrame): Unit = - channel.write(frame) - flushQ.add(channel) - - def shutdown(): Unit = - f.cancel(false) - parentGroup.shutdownGracefully() - group.shutdownGracefully() - - private def flush(): Unit = - var channelsToFlush = step.atLeast((flushQ.size * maxDelayFactor).toInt) - - while channelsToFlush > 0 do - Option(flushQ.poll()) match - case Some(channel) => - if channel.isOpen then channel.eventLoop().execute(() => channel.flush()) - channelsToFlush -= 1 - case _ => - channelsToFlush = 0 - - private def makeGroup(n: Int): EventLoopGroup = - if isMacOS then new KQueueEventLoopGroup(n) - else new EpollEventLoopGroup(n) From 56b90f981e701171864855265cd2c673431f6375 Mon Sep 17 00:00:00 2001 From: Jonathan Gamble Date: Tue, 24 Sep 2024 13:18:50 -0500 Subject: [PATCH 08/17] dont busy wait for interval to become non-zero --- src/main/scala/netty/ActorChannelConnector.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/scala/netty/ActorChannelConnector.scala b/src/main/scala/netty/ActorChannelConnector.scala index d33b8d01..0283f10c 100644 --- a/src/main/scala/netty/ActorChannelConnector.scala +++ b/src/main/scala/netty/ActorChannelConnector.scala @@ -26,8 +26,7 @@ final private class ActorChannelConnector( private val flushQ = new java.util.concurrent.ConcurrentLinkedQueue[Channel]() - private var future = - scheduler.scheduleOnce(1 second, () => flush()) + scheduler.scheduleOnce(1 second, () => flush()) def apply(endpoint: Endpoint, channel: Channel): Unit = val clientPromise = Promise[Client]() @@ -71,4 +70,5 @@ final private class ActorChannelConnector( case _ => channelsToFlush = 0 - if !future.isCancelled then future = scheduler.scheduleOnce(interval.get.millis, () => flush()) + val nextInterval = if interval.get <= 0 then 1.second else interval.get.millis + scheduler.scheduleOnce(nextInterval, () => flush()) From 614315ee56d8ea8a0e75e601de887ae988a1d140 Mon Sep 17 00:00:00 2001 From: Jonathan Gamble Date: Tue, 24 Sep 2024 16:01:47 -0500 Subject: [PATCH 09/17] always flush remaining messages when traffic shaping is disabled --- .../scala/netty/ActorChannelConnector.scala | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/main/scala/netty/ActorChannelConnector.scala b/src/main/scala/netty/ActorChannelConnector.scala index 0283f10c..d5c47469 100644 --- a/src/main/scala/netty/ActorChannelConnector.scala +++ b/src/main/scala/netty/ActorChannelConnector.scala @@ -18,13 +18,10 @@ final private class ActorChannelConnector( scheduler: Scheduler, ec: Executor ): - private val step = settings.makeSetting("netty.flush.step", config.getInt("netty.flush.step")) - private val interval = - settings.makeSetting("netty.flush.interval-millis", config.getInt("netty.flush.interval-millis")) - private val maxDelay = - settings.makeSetting("netty.flush.max-delay-millis", config.getInt("netty.flush.max-delay-millis")) - - private val flushQ = new java.util.concurrent.ConcurrentLinkedQueue[Channel]() + private val step = intSetting("netty.flush.step") + private val interval = intSetting("netty.flush.interval-millis") + private val maxDelay = intSetting("netty.flush.max-delay-millis") + private val flushQ = new java.util.concurrent.ConcurrentLinkedQueue[Channel]() scheduler.scheduleOnce(1 second, () => flush()) @@ -44,6 +41,9 @@ final private class ActorChannelConnector( clients ! Clients.Control.Stop(client) } + private def intSetting(key: String) = + settings.makeSetting(key, config.getInt(key)) + private def emitToChannel(channel: Channel, withFlush: Boolean): ClientEmit = case ipc.ClientIn.Disconnect(reason) => channel @@ -57,9 +57,8 @@ final private class ActorChannelConnector( channel.write(TextWebSocketFrame(in.write)) flushQ.add(channel) - private def maxDelayFactor: Double = interval.get.toDouble / maxDelay.get - private def flush(): Unit = + val maxDelayFactor = maxDelay.get.toDouble / interval.get var channelsToFlush = step.get.atLeast((flushQ.size * maxDelayFactor).toInt) while channelsToFlush > 0 do @@ -70,5 +69,8 @@ final private class ActorChannelConnector( case _ => channelsToFlush = 0 - val nextInterval = if interval.get <= 0 then 1.second else interval.get.millis + val nextInterval = + if interval.get > 0 then interval.get.millis + else if flushQ.isEmpty then 1.second // hibernate + else 1.millis // interval is 0 but we still need to empty the queue scheduler.scheduleOnce(nextInterval, () => flush()) From 73f5c5a9c623e70e19f9ba9dd4876c8966399ff9 Mon Sep 17 00:00:00 2001 From: Thibault Duplessis Date: Wed, 25 Sep 2024 09:46:30 +0200 Subject: [PATCH 10/17] monitor the connector flush config to make it easier to track the effects of changing these values --- src/main/scala/Monitor.scala | 7 +++++++ src/main/scala/netty/ActorChannelConnector.scala | 6 ++++++ 2 files changed, 13 insertions(+) diff --git a/src/main/scala/Monitor.scala b/src/main/scala/Monitor.scala index bb386569..68e612a2 100644 --- a/src/main/scala/Monitor.scala +++ b/src/main/scala/Monitor.scala @@ -181,3 +181,10 @@ object Monitor: val expirable = Kamon.gauge("evalCache.upgrade.expirable").withTag("style", key) val single = Style("single") val multi = Style("multi") + + object connector: + object flush: + object config: + val step = Kamon.gauge("connector.flush.config.step").withoutTags() + val interval = Kamon.gauge("connector.flush.config.interval").withoutTags() + val maxDelay = Kamon.gauge("connector.flush.config.maxDelay").withoutTags() diff --git a/src/main/scala/netty/ActorChannelConnector.scala b/src/main/scala/netty/ActorChannelConnector.scala index d5c47469..2a543e15 100644 --- a/src/main/scala/netty/ActorChannelConnector.scala +++ b/src/main/scala/netty/ActorChannelConnector.scala @@ -22,9 +22,15 @@ final private class ActorChannelConnector( private val interval = intSetting("netty.flush.interval-millis") private val maxDelay = intSetting("netty.flush.max-delay-millis") private val flushQ = new java.util.concurrent.ConcurrentLinkedQueue[Channel]() + private val monitor = Monitor.connector.flush scheduler.scheduleOnce(1 second, () => flush()) + scheduler.scheduleWithFixedDelay(1 minute, 1 minute): () => + monitor.config.step.update(step.get) + monitor.config.interval.update(interval.get) + monitor.config.maxDelay.update(maxDelay.get) + def apply(endpoint: Endpoint, channel: Channel): Unit = val clientPromise = Promise[Client]() channel.attr(key.client).set(clientPromise.future) From f6bcb1dd5ff77ac166ceae8a470410ffd3548614 Mon Sep 17 00:00:00 2001 From: Thibault Duplessis Date: Wed, 25 Sep 2024 09:47:00 +0200 Subject: [PATCH 11/17] increase live setting ttl as I don't think we'll be changing those very often --- src/main/scala/util/SettingStore.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/util/SettingStore.scala b/src/main/scala/util/SettingStore.scala index 609e9185..10a8a110 100644 --- a/src/main/scala/util/SettingStore.scala +++ b/src/main/scala/util/SettingStore.scala @@ -33,7 +33,7 @@ case class Setting[A](default: A, ttl: FiniteDuration)(fetch: () => Future[Optio final class SettingStore(mongo: Mongo)(using Executor, Scheduler): - def makeSetting[A: BSONReader](key: String, default: A, ttl: FiniteDuration = 10.seconds): Setting[A] = + def makeSetting[A: BSONReader](key: String, default: A, ttl: FiniteDuration = 30.seconds): Setting[A] = Setting[A](default, ttl): () => mongo.settingColl.flatMap: _.find(selector = BSONDocument("_id" -> key)) From 7af9d691ecaaf184c84076c361ae5e37e0c4c9f5 Mon Sep 17 00:00:00 2001 From: Thibault Duplessis Date: Wed, 25 Sep 2024 09:55:39 +0200 Subject: [PATCH 12/17] convention for side-effecting functions without arguments including functions that don't always return the same value --- src/main/scala/netty/ActorChannelConnector.scala | 14 +++++++------- src/main/scala/util/SettingStore.scala | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/main/scala/netty/ActorChannelConnector.scala b/src/main/scala/netty/ActorChannelConnector.scala index 2a543e15..d7a8cf6d 100644 --- a/src/main/scala/netty/ActorChannelConnector.scala +++ b/src/main/scala/netty/ActorChannelConnector.scala @@ -27,9 +27,9 @@ final private class ActorChannelConnector( scheduler.scheduleOnce(1 second, () => flush()) scheduler.scheduleWithFixedDelay(1 minute, 1 minute): () => - monitor.config.step.update(step.get) - monitor.config.interval.update(interval.get) - monitor.config.maxDelay.update(maxDelay.get) + monitor.config.step.update(step.get()) + monitor.config.interval.update(interval.get()) + monitor.config.maxDelay.update(maxDelay.get()) def apply(endpoint: Endpoint, channel: Channel): Unit = val clientPromise = Promise[Client]() @@ -57,15 +57,15 @@ final private class ActorChannelConnector( .addListener(ChannelFutureListener.CLOSE) case ipc.ClientIn.RoundPingFrameNoFlush => channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) } - case in if withFlush || interval.get <= 0 => + case in if withFlush || interval.get() <= 0 => channel.writeAndFlush(TextWebSocketFrame(in.write)) case in => channel.write(TextWebSocketFrame(in.write)) flushQ.add(channel) private def flush(): Unit = - val maxDelayFactor = maxDelay.get.toDouble / interval.get - var channelsToFlush = step.get.atLeast((flushQ.size * maxDelayFactor).toInt) + val maxDelayFactor = maxDelay.get().toDouble / interval.get() + var channelsToFlush = step.get().atLeast((flushQ.size * maxDelayFactor).toInt) while channelsToFlush > 0 do Option(flushQ.poll()) match @@ -76,7 +76,7 @@ final private class ActorChannelConnector( channelsToFlush = 0 val nextInterval = - if interval.get > 0 then interval.get.millis + if interval.get() > 0 then interval.get().millis else if flushQ.isEmpty then 1.second // hibernate else 1.millis // interval is 0 but we still need to empty the queue scheduler.scheduleOnce(nextInterval, () => flush()) diff --git a/src/main/scala/util/SettingStore.scala b/src/main/scala/util/SettingStore.scala index 10a8a110..5704032c 100644 --- a/src/main/scala/util/SettingStore.scala +++ b/src/main/scala/util/SettingStore.scala @@ -23,7 +23,7 @@ case class Setting[A](default: A, ttl: FiniteDuration)(fetch: () => Future[Optio ): private var value: A = default - def get: A = value + def get(): A = value private def readFromDb(): Unit = fetch().foreach: opt => From aea879fa9c883aa0ff5b2c72b0fe126bb70686ce Mon Sep 17 00:00:00 2001 From: Thibault Duplessis Date: Wed, 25 Sep 2024 09:58:25 +0200 Subject: [PATCH 13/17] code golf --- src/main/scala/netty/ActorChannelConnector.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/main/scala/netty/ActorChannelConnector.scala b/src/main/scala/netty/ActorChannelConnector.scala index d7a8cf6d..c0a6d47e 100644 --- a/src/main/scala/netty/ActorChannelConnector.scala +++ b/src/main/scala/netty/ActorChannelConnector.scala @@ -34,12 +34,10 @@ final private class ActorChannelConnector( def apply(endpoint: Endpoint, channel: Channel): Unit = val clientPromise = Promise[Client]() channel.attr(key.client).set(clientPromise.future) - val channelEmit: ClientEmit = - emitToChannel(channel, withFlush = endpoint.alwaysFlush) - val monitoredEmit: ClientEmit = (msg: ipc.ClientIn) => + val channelEmit: ClientEmit = (msg: ipc.ClientIn) => endpoint.emitCounter.increment() - channelEmit(msg) - clients ! Clients.Control.Start(endpoint.behavior(monitoredEmit), clientPromise) + emitToChannel(channel, withFlush = endpoint.alwaysFlush) + clients ! Clients.Control.Start(endpoint.behavior(channelEmit), clientPromise) channel.closeFuture.addListener: new GenericFutureListener[NettyFuture[Void]]: def operationComplete(f: NettyFuture[Void]): Unit = From 7ad53abda4ae361524d2f722a2e0f1ef974aab63 Mon Sep 17 00:00:00 2001 From: Thibault Duplessis Date: Wed, 25 Sep 2024 10:10:02 +0200 Subject: [PATCH 14/17] monitor traffic shaping --- src/main/scala/Monitor.scala | 2 ++ src/main/scala/netty/ActorChannelConnector.scala | 6 +++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/scala/Monitor.scala b/src/main/scala/Monitor.scala index 68e612a2..a228bb0b 100644 --- a/src/main/scala/Monitor.scala +++ b/src/main/scala/Monitor.scala @@ -188,3 +188,5 @@ object Monitor: val step = Kamon.gauge("connector.flush.config.step").withoutTags() val interval = Kamon.gauge("connector.flush.config.interval").withoutTags() val maxDelay = Kamon.gauge("connector.flush.config.maxDelay").withoutTags() + val qSize = Kamon.histogram("connector.flush.qSize").withoutTags() + val channelsToFlush = Kamon.histogram("connector.flush.channelsToFlush").withoutTags() diff --git a/src/main/scala/netty/ActorChannelConnector.scala b/src/main/scala/netty/ActorChannelConnector.scala index c0a6d47e..ed61c88e 100644 --- a/src/main/scala/netty/ActorChannelConnector.scala +++ b/src/main/scala/netty/ActorChannelConnector.scala @@ -62,8 +62,12 @@ final private class ActorChannelConnector( flushQ.add(channel) private def flush(): Unit = + val qSize = flushQ.size val maxDelayFactor = maxDelay.get().toDouble / interval.get() - var channelsToFlush = step.get().atLeast((flushQ.size * maxDelayFactor).toInt) + var channelsToFlush = step.get().atLeast((qSize * maxDelayFactor).toInt) + + monitor.qSize.record(qSize) + monitor.channelsToFlush.record(channelsToFlush) while channelsToFlush > 0 do Option(flushQ.poll()) match From 42bbe0b396f81c8243a63ad5a914de1da4eeca56 Mon Sep 17 00:00:00 2001 From: Jonathan Gamble Date: Wed, 25 Sep 2024 08:37:16 -0500 Subject: [PATCH 15/17] fix inverted multiply, divide by zero, and not passing msg to channelEmit --- src/main/resources/application.conf | 4 ++-- src/main/scala/netty/ActorChannelConnector.scala | 5 ++--- src/main/scala/netty/NettyServer.scala | 6 +++--- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 6e597d6b..70e87912 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -31,9 +31,9 @@ netty { threads = 0 # auto flush { - step = 100 # minimum number of channels to flush per interval - interval-millis = 1 # interval between flush cycles + interval-millis = 1 # interval between flush cycles, set to 0 to disable flush queue max-delay-millis = 500 # max flush step targets this threshold if passed + step = 100 # minimum number of channels to flush per interval } } diff --git a/src/main/scala/netty/ActorChannelConnector.scala b/src/main/scala/netty/ActorChannelConnector.scala index ed61c88e..6e71e427 100644 --- a/src/main/scala/netty/ActorChannelConnector.scala +++ b/src/main/scala/netty/ActorChannelConnector.scala @@ -25,7 +25,6 @@ final private class ActorChannelConnector( private val monitor = Monitor.connector.flush scheduler.scheduleOnce(1 second, () => flush()) - scheduler.scheduleWithFixedDelay(1 minute, 1 minute): () => monitor.config.step.update(step.get()) monitor.config.interval.update(interval.get()) @@ -36,7 +35,7 @@ final private class ActorChannelConnector( channel.attr(key.client).set(clientPromise.future) val channelEmit: ClientEmit = (msg: ipc.ClientIn) => endpoint.emitCounter.increment() - emitToChannel(channel, withFlush = endpoint.alwaysFlush) + emitToChannel(channel, withFlush = endpoint.alwaysFlush)(msg) clients ! Clients.Control.Start(endpoint.behavior(channelEmit), clientPromise) channel.closeFuture.addListener: new GenericFutureListener[NettyFuture[Void]]: @@ -63,7 +62,7 @@ final private class ActorChannelConnector( private def flush(): Unit = val qSize = flushQ.size - val maxDelayFactor = maxDelay.get().toDouble / interval.get() + val maxDelayFactor = interval.get().toDouble / maxDelay.get().atLeast(1) var channelsToFlush = step.get().atLeast((qSize * maxDelayFactor).toInt) monitor.qSize.record(qSize) diff --git a/src/main/scala/netty/NettyServer.scala b/src/main/scala/netty/NettyServer.scala index 59de5527..dc4a7c09 100644 --- a/src/main/scala/netty/NettyServer.scala +++ b/src/main/scala/netty/NettyServer.scala @@ -16,12 +16,12 @@ final class NettyServer( config: Config, settings: util.SettingStore )(using Executor, Scheduler): - private val logger = Logger(getClass) + private val logger = Logger(getClass) + private val connector = ActorChannelConnector(clients, config, settings) def start(): Unit = logger.info("Start") - val port = config.getInt("http.port") val threads = config.getInt("netty.threads") val (parent, workers, channelClass) = @@ -40,7 +40,7 @@ final class NettyServer( pipeline.addLast(HttpServerCodec()) pipeline.addLast(HttpObjectAggregator(4096)) pipeline.addLast(RequestHandler(router)) - pipeline.addLast(ProtocolHandler(ActorChannelConnector(clients, config, settings))) + pipeline.addLast(ProtocolHandler(connector)) pipeline.addLast(FrameHandler()) ) From c75b4002e9532c863a3f01cfa6834d163e41dd12 Mon Sep 17 00:00:00 2001 From: Thibault Duplessis Date: Wed, 25 Sep 2024 16:36:50 +0200 Subject: [PATCH 16/17] avoid calling emitToChannel on each message --- src/main/scala/netty/ActorChannelConnector.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/main/scala/netty/ActorChannelConnector.scala b/src/main/scala/netty/ActorChannelConnector.scala index 6e71e427..5b0c52b7 100644 --- a/src/main/scala/netty/ActorChannelConnector.scala +++ b/src/main/scala/netty/ActorChannelConnector.scala @@ -33,9 +33,11 @@ final private class ActorChannelConnector( def apply(endpoint: Endpoint, channel: Channel): Unit = val clientPromise = Promise[Client]() channel.attr(key.client).set(clientPromise.future) - val channelEmit: ClientEmit = (msg: ipc.ClientIn) => - endpoint.emitCounter.increment() - emitToChannel(channel, withFlush = endpoint.alwaysFlush)(msg) + val channelEmit: ClientEmit = + val emitter = emitToChannel(channel, withFlush = endpoint.alwaysFlush) + (msg: ipc.ClientIn) => + endpoint.emitCounter.increment() + emitter(msg) clients ! Clients.Control.Start(endpoint.behavior(channelEmit), clientPromise) channel.closeFuture.addListener: new GenericFutureListener[NettyFuture[Void]]: From fd233a108c97e92b29d2d0d73d55e82c10634ef1 Mon Sep 17 00:00:00 2001 From: Thibault Duplessis Date: Wed, 25 Sep 2024 20:30:48 +0200 Subject: [PATCH 17/17] refactor ActorChannelConnector, no functional change intended --- .../scala/netty/ActorChannelConnector.scala | 41 +++++++++---------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/src/main/scala/netty/ActorChannelConnector.scala b/src/main/scala/netty/ActorChannelConnector.scala index 5b0c52b7..094f19da 100644 --- a/src/main/scala/netty/ActorChannelConnector.scala +++ b/src/main/scala/netty/ActorChannelConnector.scala @@ -12,23 +12,25 @@ import lila.ws.netty.ProtocolHandler.key final private class ActorChannelConnector( clients: ActorRef[Clients.Control], - config: com.typesafe.config.Config, + staticConfig: com.typesafe.config.Config, settings: util.SettingStore -)(using - scheduler: Scheduler, - ec: Executor -): - private val step = intSetting("netty.flush.step") - private val interval = intSetting("netty.flush.interval-millis") - private val maxDelay = intSetting("netty.flush.max-delay-millis") - private val flushQ = new java.util.concurrent.ConcurrentLinkedQueue[Channel]() - private val monitor = Monitor.connector.flush +)(using scheduler: Scheduler, ec: Executor): + + private val flushQ = java.util.concurrent.ConcurrentLinkedQueue[Channel]() + private val monitor = Monitor.connector.flush + + private object config: + private def int(key: String) = settings.makeSetting(key, staticConfig.getInt(key)) + val step = int("netty.flush.step") + val interval = int("netty.flush.interval-millis") + val maxDelay = int("netty.flush.max-delay-millis") + inline def alwaysFlush() = interval.get() <= 0 + scheduler.scheduleWithFixedDelay(1 minute, 1 minute): () => + monitor.config.step.update(step.get()) + monitor.config.interval.update(interval.get()) + monitor.config.maxDelay.update(maxDelay.get()) scheduler.scheduleOnce(1 second, () => flush()) - scheduler.scheduleWithFixedDelay(1 minute, 1 minute): () => - monitor.config.step.update(step.get()) - monitor.config.interval.update(interval.get()) - monitor.config.maxDelay.update(maxDelay.get()) def apply(endpoint: Endpoint, channel: Channel): Unit = val clientPromise = Promise[Client]() @@ -46,9 +48,6 @@ final private class ActorChannelConnector( clients ! Clients.Control.Stop(client) } - private def intSetting(key: String) = - settings.makeSetting(key, config.getInt(key)) - private def emitToChannel(channel: Channel, withFlush: Boolean): ClientEmit = case ipc.ClientIn.Disconnect(reason) => channel @@ -56,7 +55,7 @@ final private class ActorChannelConnector( .addListener(ChannelFutureListener.CLOSE) case ipc.ClientIn.RoundPingFrameNoFlush => channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) } - case in if withFlush || interval.get() <= 0 => + case in if withFlush || config.alwaysFlush() => channel.writeAndFlush(TextWebSocketFrame(in.write)) case in => channel.write(TextWebSocketFrame(in.write)) @@ -64,8 +63,8 @@ final private class ActorChannelConnector( private def flush(): Unit = val qSize = flushQ.size - val maxDelayFactor = interval.get().toDouble / maxDelay.get().atLeast(1) - var channelsToFlush = step.get().atLeast((qSize * maxDelayFactor).toInt) + val maxDelayFactor = config.interval.get().toDouble / config.maxDelay.get().atLeast(1) + var channelsToFlush = config.step.get().atLeast((qSize * maxDelayFactor).toInt) monitor.qSize.record(qSize) monitor.channelsToFlush.record(channelsToFlush) @@ -79,7 +78,7 @@ final private class ActorChannelConnector( channelsToFlush = 0 val nextInterval = - if interval.get() > 0 then interval.get().millis + if !config.alwaysFlush() then config.interval.get().millis else if flushQ.isEmpty then 1.second // hibernate else 1.millis // interval is 0 but we still need to empty the queue scheduler.scheduleOnce(nextInterval, () => flush())