use mongo store for netty flush settings
schlawg committed Sep 24, 2024
1 parent ace0faf commit b63ed80
Showing 5 changed files with 64 additions and 74 deletions.
6 changes: 3 additions & 3 deletions src/main/resources/application.conf
@@ -31,9 +31,9 @@ netty {
   threads = 0 # auto

   flush {
-    step = 100 # target number of messages to flush per interval
-    interval = 1.millis # interval between flush cycles
-    max-delay = 500.millis # exceed step when this threshold is passed
+    step = 100 # minimum number of channels to flush per interval
+    interval-millis = 1 # interval between flush cycles
+    max-delay-millis = 500 # the step grows with queue size so no channel waits much longer than this
   }
 }
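The renamed keys are plain integers because they now feed lila-ws's util.SettingStore (the "mongo store" of the commit title), so the flush parameters can be changed at runtime instead of only at startup. SettingStore itself is not part of this diff; what follows is a hypothetical in-memory sketch of the interface the connector code further down assumes, with the mongo persistence omitted:

    // Hypothetical sketch, not lila-ws source: makeSetting registers a key with
    // a default and hands back a handle whose .get always returns the latest
    // value (in lila-ws, kept in sync with a mongodb-backed settings collection).
    final class Setting[A](default: A):
      @volatile private var value: A = default
      def get: A = value
      def set(v: A): Unit = value = v // invoked when the stored value changes

    final class SettingStore:
      private val all = scala.collection.concurrent.TrieMap.empty[String, Setting[?]]
      def makeSetting[A](key: String, default: A): Setting[A] =
        all.getOrElseUpdate(key, Setting(default)).asInstanceOf[Setting[A]]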

1 change: 0 additions & 1 deletion src/main/scala/LilaWs.scala
@@ -33,7 +33,6 @@ object LilaWs extends App:
   lazy val router = wire[Router]
   lazy val seenAt = wire[SeenAtUpdate]
   lazy val auth = wire[Auth]
-  lazy val workerLoop = wire[netty.WorkerLoop]
   lazy val nettyServer = wire[netty.NettyServer]
   lazy val monitor = wire[Monitor]
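Dropping the workerLoop val is the whole change here because macwire's wire[...] macro fills constructor parameters by type from vals in the enclosing scope. A hedged sketch of the wiring this relies on (the surrounding givens and vals are assumptions for illustration, not lines from this diff):

    // wire[netty.NettyServer] now matches the new (clients, router, config,
    // settings) constructor; its (using Executor, Scheduler) clause is
    // satisfied from givens in scope (names below assumed):
    import com.softwaremill.macwire.*

    given Executor  = clientSystem.executionContext
    given Scheduler = clientSystem.scheduler
    lazy val settings    = wire[util.SettingStore]
    lazy val nettyServer = wire[netty.NettyServer]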

58 changes: 44 additions & 14 deletions src/main/scala/netty/ActorChannelConnector.scala
@@ -5,14 +5,29 @@ import io.netty.buffer.Unpooled
 import io.netty.channel.*
 import io.netty.handler.codec.http.websocketx.*
 import io.netty.util.concurrent.{ Future as NettyFuture, GenericFutureListener }
-import org.apache.pekko.actor.typed.ActorRef
+import org.apache.pekko.actor.typed.{ ActorRef, Scheduler }

 import lila.ws.Controller.Endpoint
 import lila.ws.netty.ProtocolHandler.key

-final private class ActorChannelConnector(clients: ActorRef[Clients.Control], loop: WorkerLoop)(using
-    Executor
+final private class ActorChannelConnector(
+    clients: ActorRef[Clients.Control],
+    config: com.typesafe.config.Config,
+    settings: util.SettingStore
+)(using
+    scheduler: Scheduler,
+    ec: Executor
 ):
+  private val step = settings.makeSetting("netty.flush.step", config.getInt("netty.flush.step"))
+  private val interval =
+    settings.makeSetting("netty.flush.interval-millis", config.getInt("netty.flush.interval-millis"))
+  private val maxDelay =
+    settings.makeSetting("netty.flush.max-delay-millis", config.getInt("netty.flush.max-delay-millis"))
+
+  private val flushQ = new java.util.concurrent.ConcurrentLinkedQueue[Channel]()
+
+  private var future =
+    scheduler.scheduleOnce(1.second, () => flush())

   def apply(endpoint: Endpoint, channel: Channel): Unit =
     val clientPromise = Promise[Client]()
@@ -31,14 +46,29 @@ final private class ActorChannelConnector(clients: ActorRef[Clients.Control], loop: WorkerLoop)(using
     }

   private def emitToChannel(channel: Channel, withFlush: Boolean): ClientEmit =
     msg =>
       msg.match
-        case ipc.ClientIn.Disconnect(reason) =>
-          channel
-            .writeAndFlush(CloseWebSocketFrame(WebSocketCloseStatus(4010, reason)))
-            .addListener(ChannelFutureListener.CLOSE)
-        case ipc.ClientIn.RoundPingFrameNoFlush =>
-          channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) }
-        case in =>
-          if withFlush then channel.writeAndFlush(TextWebSocketFrame(in.write))
-          else loop.writeShaped(channel, TextWebSocketFrame(in.write))
+        case ipc.ClientIn.Disconnect(reason) =>
+          channel
+            .writeAndFlush(CloseWebSocketFrame(WebSocketCloseStatus(4010, reason)))
+            .addListener(ChannelFutureListener.CLOSE)
+        case ipc.ClientIn.RoundPingFrameNoFlush =>
+          channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) }
+        case in if withFlush || interval.get <= 0 =>
+          channel.writeAndFlush(TextWebSocketFrame(in.write))
+        case in =>
+          channel.write(TextWebSocketFrame(in.write))
+          flushQ.add(channel)
+
+  private def maxDelayFactor: Double = interval.get.toDouble / maxDelay.get
+
+  private def flush(): Unit =
+    var channelsToFlush = step.get.atLeast((flushQ.size * maxDelayFactor).toInt)
+
+    while channelsToFlush > 0 do
+      Option(flushQ.poll()) match
+        case Some(channel) =>
+          if channel.isOpen then channel.eventLoop().execute(() => channel.flush())
+          channelsToFlush -= 1
+        case _ =>
+          channelsToFlush = 0
+
+    if !future.isCancelled then future = scheduler.scheduleOnce(interval.get.millis, () => flush())
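Two properties of the new flush loop are worth spelling out. The timer re-arms itself with scheduleOnce after each pass rather than running at a fixed rate, so a slow pass pushes back the next tick instead of letting ticks pile up. And each tick's budget is the larger of step and flushQ.size × interval / max-delay, so the drain rate scales with the backlog: under steady load, a channel queued behind the current backlog waits roughly max-delay at worst. A worked example with the defaults from application.conf (numbers illustrative):

    // step = 100, interval-millis = 1, max-delay-millis = 500
    val maxDelayFactor = 1.0 / 500                               // 0.002
    val small = math.max(100, (2_000 * maxDelayFactor).toInt)    // 100: step wins
    val large = math.max(100, (150_000 * maxDelayFactor).toInt)  // 300: factor wins
    // At 300 channels per 1ms tick, a channel queued behind 150_000
    // others is reached after about 150_000 / 300 = 500 ticks, i.e. ~500ms.

Note also the interval.get <= 0 guard in emitToChannel: setting netty.flush.interval-millis to zero at runtime disables batching entirely, and every message is flushed as it is written.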
25 changes: 17 additions & 8 deletions src/main/scala/netty/NettyServer.scala
@@ -4,36 +4,43 @@ package netty
 import com.typesafe.config.Config
 import com.typesafe.scalalogging.Logger
 import io.netty.bootstrap.ServerBootstrap
+import io.netty.channel.epoll.{ EpollEventLoopGroup, EpollServerSocketChannel }
+import io.netty.channel.kqueue.{ KQueueEventLoopGroup, KQueueServerSocketChannel }
 import io.netty.channel.{ Channel, ChannelInitializer }
 import io.netty.handler.codec.http.*
+import org.apache.pekko.actor.typed.Scheduler

 final class NettyServer(
     clients: ClientSystem,
     router: Router,
     config: Config,
-    workerLoop: WorkerLoop
-)(using Executor):
-
+    settings: util.SettingStore
+)(using Executor, Scheduler):
   private val logger = Logger(getClass)

   def start(): Unit =

     logger.info("Start")

-    val port = config.getInt("http.port")
+    val port    = config.getInt("http.port")
+    val threads = config.getInt("netty.threads")
+    val (parent, workers, channelClass) =
+      if System.getProperty("os.name").toLowerCase.startsWith("mac") then
+        (new KQueueEventLoopGroup(1), new KQueueEventLoopGroup(threads), classOf[KQueueServerSocketChannel])
+      else (new EpollEventLoopGroup(1), new EpollEventLoopGroup(threads), classOf[EpollServerSocketChannel])
     try
       val boot = new ServerBootstrap
       boot
-        .group(workerLoop.parentGroup, workerLoop.group)
-        .channel(workerLoop.channelClass)
+        .group(parent, workers)
+        .channel(channelClass)
         .childHandler(
           new ChannelInitializer[Channel]:
             override def initChannel(ch: Channel): Unit =
               val pipeline = ch.pipeline()
               pipeline.addLast(HttpServerCodec())
               pipeline.addLast(HttpObjectAggregator(4096))
               pipeline.addLast(RequestHandler(router))
-              pipeline.addLast(ProtocolHandler(ActorChannelConnector(clients, workerLoop)))
+              pipeline.addLast(ProtocolHandler(ActorChannelConnector(clients, config, settings)))
               pipeline.addLast(FrameHandler())
         )

@@ -44,4 +51,6 @@ final class NettyServer(
       server.closeFuture().sync()

       logger.info(s"Closed $port")
-    finally workerLoop.shutdown()
+    finally
+      parent.shutdownGracefully()
+      workers.shutdownGracefully()
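With WorkerLoop gone, NettyServer owns its event-loop groups directly and shuts them down gracefully itself. The transport choice assumes every non-macOS host runs Linux (epoll). A hedged sketch of the same selection with a plain-NIO fallback for other platforms, in case that assumption ever breaks (not part of this commit):

    import io.netty.channel.{ MultithreadEventLoopGroup, ServerChannel }
    import io.netty.channel.nio.NioEventLoopGroup
    import io.netty.channel.socket.nio.NioServerSocketChannel

    // Same shape as the tuple in the diff, plus a portable fallback branch.
    def transport(threads: Int): (MultithreadEventLoopGroup, MultithreadEventLoopGroup, Class[? <: ServerChannel]) =
      val os = System.getProperty("os.name").toLowerCase
      if os.startsWith("mac") then
        (new KQueueEventLoopGroup(1), new KQueueEventLoopGroup(threads), classOf[KQueueServerSocketChannel])
      else if os.contains("linux") then
        (new EpollEventLoopGroup(1), new EpollEventLoopGroup(threads), classOf[EpollServerSocketChannel])
      else (new NioEventLoopGroup(1), new NioEventLoopGroup(threads), classOf[NioServerSocketChannel])

boot.group(parent, workers) and boot.channel(channelClass) work unchanged with any of the three.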
48 changes: 0 additions & 48 deletions src/main/scala/netty/WorkerLoop.scala

This file was deleted. Its two responsibilities move into the files above: write batching into ActorChannelConnector, and event-loop group ownership into NettyServer.
