Skip to content

Commit

Permalink
monitor websocket messages sent per endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
ornicar committed Sep 20, 2024
1 parent 97c0a9e commit cd1179e
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 3 deletions.
6 changes: 4 additions & 2 deletions src/main/scala/Controller.scala
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,8 @@ object Controller:
final class Endpoint(
val behavior: ClientEmit => ClientBehavior,
val rateLimit: RateLimit,
val header: RequestHeader
val header: RequestHeader,
val emitCounter: kamon.metric.Counter
)
def endpoint(
name: String,
Expand All @@ -318,7 +319,8 @@ object Controller:
intervalMillis = interval.toMillis.toInt,
name = name
),
header
header,
Monitor.clientInCounter(name)
)

type ResponseSync = Either[HttpResponseStatus, Endpoint]
Expand Down
4 changes: 4 additions & 0 deletions src/main/scala/Monitor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import kamon.tag.TagSet
import java.util.concurrent.TimeUnit

import lila.ws.util.Domain
import kamon.metric.Counter

final class Monitor(
config: Config,
Expand Down Expand Up @@ -75,6 +76,9 @@ object Monitor:
.withTag("name", name)
.increment()

def clientInCounter(endpointName: String): Counter =
Kamon.counter("client.in").withTag("name", endpointName)

val historyRoomSize = Kamon.gauge("history.room.size").withoutTags()
val historyRoundSize = Kamon.gauge("history.round.size").withoutTags()

Expand Down
6 changes: 5 additions & 1 deletion src/main/scala/netty/ActorChannelConnector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ final private class ActorChannelConnector(clients: ActorRef[Clients.Control])(us
def apply(endpoint: Endpoint, channel: Channel): Unit =
val clientPromise = Promise[Client]()
channel.attr(key.client).set(clientPromise.future)
clients ! Clients.Control.Start(endpoint.behavior(emitToChannel(channel)), clientPromise)
val channelEmit = emitToChannel(channel)
val monitoredEmit: ClientEmit = (msg: ipc.ClientIn) =>
endpoint.emitCounter.increment()
channelEmit(msg)
clients ! Clients.Control.Start(endpoint.behavior(monitoredEmit), clientPromise)
channel.closeFuture.addListener:
new GenericFutureListener[NettyFuture[Void]]:
def operationComplete(f: NettyFuture[Void]): Unit =
Expand Down

0 comments on commit cd1179e

Please sign in to comment.