Skip to content

Commit

Permalink
Merge pull request #601 from schlawg/ws-toilet
Browse files Browse the repository at this point in the history
shape outbound traffic on endpoints other than round/play
  • Loading branch information
ornicar authored Sep 26, 2024
2 parents 69fce9d + fd233a1 commit aa78adb
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 53 deletions.
12 changes: 10 additions & 2 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ study.mongo.uri = ${mongo.uri}
yolo.mongo.uri = ${mongo.uri}
redis.uri = "redis://127.0.0.1"
csrf.origin = "http://localhost:9663"
netty.native = true
netty.threads = 0 # auto
cookie.name = "lila2"

socialGraph {
Expand All @@ -29,6 +27,16 @@ reactivemongo {
}
}

netty {
threads = 0 # auto

flush {
interval-millis = 1 # interval between flush cycles, set to 0 to disable flush queue
max-delay-millis = 500 # max flush step targets this threshold if passed
step = 100 # minimum number of channels to flush per interval
}
}

storm.secret = "somethingElseInProd"
oauth.secret = "somethingElseInProd"

Expand Down
12 changes: 8 additions & 4 deletions src/main/scala/Controller.scala
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ final class Controller(
) { Deps(emit, req, services) },
header,
credits = 100,
interval = 20.seconds
interval = 20.seconds,
alwaysFlush = true
)
case _ => notFound

Expand Down Expand Up @@ -301,14 +302,16 @@ object Controller:
val behavior: ClientEmit => ClientBehavior,
val rateLimit: RateLimit,
val header: RequestHeader,
val emitCounter: kamon.metric.Counter
val emitCounter: kamon.metric.Counter,
val alwaysFlush: Boolean
)
def endpoint(
name: String,
behavior: ClientEmit => ClientBehavior,
header: RequestHeader,
credits: Int,
interval: FiniteDuration
interval: FiniteDuration,
alwaysFlush: Boolean = false
): ResponseSync =
Monitor.connection.open(name)
Right:
Expand All @@ -320,7 +323,8 @@ object Controller:
name = name
),
header,
Monitor.clientInCounter(name)
Monitor.clientInCounter(name),
alwaysFlush
)

type ResponseSync = Either[HttpResponseStatus, Endpoint]
Expand Down
12 changes: 10 additions & 2 deletions src/main/scala/Monitor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ final class Monitor(

val version = System.getProperty("java.version")
val memory = Runtime.getRuntime.maxMemory() / 1024 / 1024
val native = config.getBoolean("netty.native")
val useKamon = config.getString("kamon.influxdb.hostname").nonEmpty

logger.info(s"lila-ws 3.0 netty native=$native kamon=$useKamon")
logger.info(s"lila-ws 3.0 netty kamon=$useKamon")
logger.info(s"Java version: $version, memory: ${memory}MB")

if useKamon then kamon.Kamon.init()
Expand Down Expand Up @@ -182,3 +181,12 @@ object Monitor:
val expirable = Kamon.gauge("evalCache.upgrade.expirable").withTag("style", key)
val single = Style("single")
val multi = Style("multi")

object connector:
object flush:
object config:
val step = Kamon.gauge("connector.flush.config.step").withoutTags()
val interval = Kamon.gauge("connector.flush.config.interval").withoutTags()
val maxDelay = Kamon.gauge("connector.flush.config.maxDelay").withoutTags()
val qSize = Kamon.histogram("connector.flush.qSize").withoutTags()
val channelsToFlush = Kamon.histogram("connector.flush.channelsToFlush").withoutTags()
64 changes: 55 additions & 9 deletions src/main/scala/netty/ActorChannelConnector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,80 @@ import io.netty.buffer.Unpooled
import io.netty.channel.*
import io.netty.handler.codec.http.websocketx.*
import io.netty.util.concurrent.{ Future as NettyFuture, GenericFutureListener }
import org.apache.pekko.actor.typed.ActorRef
import org.apache.pekko.actor.typed.{ ActorRef, Scheduler }

import lila.ws.Controller.Endpoint
import lila.ws.netty.ProtocolHandler.key

final private class ActorChannelConnector(clients: ActorRef[Clients.Control])(using Executor):
final private class ActorChannelConnector(
clients: ActorRef[Clients.Control],
staticConfig: com.typesafe.config.Config,
settings: util.SettingStore
)(using scheduler: Scheduler, ec: Executor):

private val flushQ = java.util.concurrent.ConcurrentLinkedQueue[Channel]()
private val monitor = Monitor.connector.flush

private object config:
private def int(key: String) = settings.makeSetting(key, staticConfig.getInt(key))
val step = int("netty.flush.step")
val interval = int("netty.flush.interval-millis")
val maxDelay = int("netty.flush.max-delay-millis")
inline def alwaysFlush() = interval.get() <= 0
scheduler.scheduleWithFixedDelay(1 minute, 1 minute): () =>
monitor.config.step.update(step.get())
monitor.config.interval.update(interval.get())
monitor.config.maxDelay.update(maxDelay.get())

scheduler.scheduleOnce(1 second, () => flush())

def apply(endpoint: Endpoint, channel: Channel): Unit =
val clientPromise = Promise[Client]()
channel.attr(key.client).set(clientPromise.future)
val channelEmit = emitToChannel(channel)
val monitoredEmit: ClientEmit = (msg: ipc.ClientIn) =>
endpoint.emitCounter.increment()
channelEmit(msg)
clients ! Clients.Control.Start(endpoint.behavior(monitoredEmit), clientPromise)
val channelEmit: ClientEmit =
val emitter = emitToChannel(channel, withFlush = endpoint.alwaysFlush)
(msg: ipc.ClientIn) =>
endpoint.emitCounter.increment()
emitter(msg)
clients ! Clients.Control.Start(endpoint.behavior(channelEmit), clientPromise)
channel.closeFuture.addListener:
new GenericFutureListener[NettyFuture[Void]]:
def operationComplete(f: NettyFuture[Void]): Unit =
channel.attr(key.client).get.foreach { client =>
clients ! Clients.Control.Stop(client)
}

private def emitToChannel(channel: Channel): ClientEmit =
private def emitToChannel(channel: Channel, withFlush: Boolean): ClientEmit =
case ipc.ClientIn.Disconnect(reason) =>
channel
.writeAndFlush(CloseWebSocketFrame(WebSocketCloseStatus(4010, reason)))
.addListener(ChannelFutureListener.CLOSE)
case ipc.ClientIn.RoundPingFrameNoFlush =>
channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) }
case in =>
case in if withFlush || config.alwaysFlush() =>
channel.writeAndFlush(TextWebSocketFrame(in.write))
case in =>
channel.write(TextWebSocketFrame(in.write))
flushQ.add(channel)

private def flush(): Unit =
val qSize = flushQ.size
val maxDelayFactor = config.interval.get().toDouble / config.maxDelay.get().atLeast(1)
var channelsToFlush = config.step.get().atLeast((qSize * maxDelayFactor).toInt)

monitor.qSize.record(qSize)
monitor.channelsToFlush.record(channelsToFlush)

while channelsToFlush > 0 do
Option(flushQ.poll()) match
case Some(channel) =>
if channel.isOpen then channel.eventLoop().execute(() => channel.flush())
channelsToFlush -= 1
case _ =>
channelsToFlush = 0

val nextInterval =
if !config.alwaysFlush() then config.interval.get().millis
else if flushQ.isEmpty then 1.second // hibernate
else 1.millis // interval is 0 but we still need to empty the queue
scheduler.scheduleOnce(nextInterval, () => flush())
49 changes: 15 additions & 34 deletions src/main/scala/netty/NettyServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,52 +6,33 @@ import com.typesafe.scalalogging.Logger
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.epoll.{ EpollEventLoopGroup, EpollServerSocketChannel }
import io.netty.channel.kqueue.{ KQueueEventLoopGroup, KQueueServerSocketChannel }
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.channel.{ Channel, ChannelInitializer }
import io.netty.handler.codec.http.*
import org.apache.pekko.actor.typed.Scheduler

final class NettyServer(
clients: ClientSystem,
router: Router,
config: Config
)(using Executor):

private val connector = ActorChannelConnector(clients)
config: Config,
settings: util.SettingStore
)(using Executor, Scheduler):
private val logger = Logger(getClass)
private val connector = ActorChannelConnector(clients, config, settings)

def start(): Unit =

logger.info("Start")

val port = config.getInt("http.port")
val workerThreads = config.getInt("netty.threads")

val (bossGroup, workerGroup, channelClz) =
if !config.getBoolean("netty.native") then
(
NioEventLoopGroup(1),
NioEventLoopGroup(workerThreads),
classOf[NioServerSocketChannel]
)
else if System.getProperty("os.name").toLowerCase.startsWith("mac") then
(
KQueueEventLoopGroup(1),
KQueueEventLoopGroup(workerThreads),
classOf[KQueueServerSocketChannel]
)
else
(
EpollEventLoopGroup(1),
EpollEventLoopGroup(workerThreads),
classOf[EpollServerSocketChannel]
)

val port = config.getInt("http.port")
val threads = config.getInt("netty.threads")
val (parent, workers, channelClass) =
if System.getProperty("os.name").toLowerCase.startsWith("mac") then
(new KQueueEventLoopGroup(1), new KQueueEventLoopGroup(threads), classOf[KQueueServerSocketChannel])
else (new EpollEventLoopGroup(1), new EpollEventLoopGroup(threads), classOf[EpollServerSocketChannel])
try
val boot = new ServerBootstrap
boot
.group(bossGroup, workerGroup)
.channel(channelClz)
.group(parent, workers)
.channel(channelClass)
.childHandler(
new ChannelInitializer[Channel]:
override def initChannel(ch: Channel): Unit =
Expand All @@ -71,5 +52,5 @@ final class NettyServer(

logger.info(s"Closed $port")
finally
bossGroup.shutdownGracefully()
workerGroup.shutdownGracefully()
parent.shutdownGracefully()
workers.shutdownGracefully()
4 changes: 2 additions & 2 deletions src/main/scala/util/SettingStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ case class Setting[A](default: A, ttl: FiniteDuration)(fetch: () => Future[Optio
):
private var value: A = default

def get: A = value
def get(): A = value

private def readFromDb(): Unit =
fetch().foreach: opt =>
Expand All @@ -33,7 +33,7 @@ case class Setting[A](default: A, ttl: FiniteDuration)(fetch: () => Future[Optio

final class SettingStore(mongo: Mongo)(using Executor, Scheduler):

def makeSetting[A: BSONReader](key: String, default: A, ttl: FiniteDuration = 10.seconds): Setting[A] =
def makeSetting[A: BSONReader](key: String, default: A, ttl: FiniteDuration = 30.seconds): Setting[A] =
Setting[A](default, ttl): () =>
mongo.settingColl.flatMap:
_.find(selector = BSONDocument("_id" -> key))
Expand Down

0 comments on commit aa78adb

Please sign in to comment.