Skip to content
This repository has been archived by the owner on Dec 19, 2023. It is now read-only.

Commit

Permalink
Swap Semaphore(1) + mutable.Map with juc.ConcurrentHashMap (#21)
Browse files Browse the repository at this point in the history
* Add Prometheus integration
* Example
* Update libraries

* Replace mutable Map + Semaphore(1) with a juc ConcurrentHashMap
  • Loading branch information
calvinlfer committed Jul 18, 2023
1 parent 485af5d commit 0cf4a49
Showing 1 changed file with 36 additions and 42 deletions.
78 changes: 36 additions & 42 deletions core/src/main/scala/com/ovoenergy/meters4s/Reporter.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package com.ovoenergy.meters4s

import cats.effect.{Async, Sync}
import cats.effect.Sync
import cats.implicits._
import cats.effect.implicits._
import cats.effect.std.Semaphore
import Reporter.{Counter, DistributionSummary, Gauge, Timer}
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import io.micrometer.core.instrument.{MeterRegistry, Tag}
import io.micrometer.core.{instrument => micrometer}

import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.collection.mutable
import java.util.concurrent.atomic.AtomicInteger

/**
Expand Down Expand Up @@ -236,7 +235,7 @@ object Reporter {
* @param c configuration for this reporter
* @return an effect evaluating to an instance of the wrapping reporter
*/
def createSimple[F[_]: Async](
def createSimple[F[_]: Sync](
c: MetricsConfig
): F[Reporter[F]] = {
fromRegistry[F](new SimpleMeterRegistry, c)
Expand All @@ -249,14 +248,18 @@ object Reporter {
* @param config configuration for this reporter
* @return an effect evaluating to an instance of the wrapping reporter
*/
def fromRegistry[F[_]: Async](
def fromRegistry[F[_]: Sync](
mx: MeterRegistry,
config: MetricsConfig = MetricsConfig()
): F[Reporter[F]] =
Semaphore[F](1).map { sem =>
new MeterRegistryReporter[F](mx, config, mutable.Map.empty, sem)
): F[Reporter[F]] = {
Sync[F].delay {
new MeterRegistryReporter[F](
mx,
config,
new ConcurrentHashMap[GaugeKey, AtomicInteger]
)
}

}
}

private class GaugeKey(
Expand All @@ -281,8 +284,7 @@ private class GaugeKey(
private class MeterRegistryReporter[F[_]](
mx: MeterRegistry,
config: MetricsConfig,
activeGauges: mutable.Map[GaugeKey, AtomicInteger],
gaugeSem: Semaphore[F]
activeGauges: ConcurrentMap[GaugeKey, AtomicInteger]
)(
implicit F: Sync[F]
) extends Reporter[F] {
Expand Down Expand Up @@ -321,10 +323,10 @@ private class MeterRegistryReporter[F[_]](
}
.map { t =>
new Timer[F] {
def record(d: FiniteDuration) =
def record(d: FiniteDuration): F[Unit] =
F.delay(t.record(d.toMillis, MILLISECONDS))

def start = F.delay {
def start: F[Sample[F]] = F.delay {
val internal = micrometer.Timer.start(mx)
new Sample[F] {
def stop: F[Long] = F.delay(internal.stop(t))
Expand All @@ -342,44 +344,37 @@ private class MeterRegistryReporter[F[_]](
}
}

private def registerGauge(
name: String,
tags: java.lang.Iterable[Tag],
initialValue: Int
): F[AtomicInteger] = F.delay(new AtomicInteger(initialValue)).flatTap {
gauge =>
F.blocking(
micrometer.Gauge
.builder(
name,
gauge, { (i: AtomicInteger) => i.doubleValue }
)
.tags(tags)
.register(mx)
)
}

def gauge(
name: String,
tags: Map[String, String],
initialValue: Int
): F[Gauge[F]] = {
val pname = metricName(name)
def registerGauge(
name: String,
tags: java.lang.Iterable[Tag],
initialValue: Int
): AtomicInteger = {
val gauge = new AtomicInteger(initialValue)
micrometer.Gauge
.builder(name, gauge, (i: AtomicInteger) => i.doubleValue)
.tags(tags)
.register(mx)
gauge
}

val pName = metricName(name)
val allTags = effectiveTags(tags)
val gaugeKey = new GaugeKey(pname, allTags)
val gaugeKey = new GaugeKey(pName, allTags)

val gaugeValue: F[AtomicInteger] = gaugeSem.permit.use(_ =>
activeGauges
.get(gaugeKey)
.fold {
registerGauge(pname, allTags, initialValue)
.flatTap(g => F.delay(activeGauges.put(gaugeKey, g)))
}(_.pure[F])
)
val gaugeValue: F[AtomicInteger] = F.delay {
activeGauges.computeIfAbsent(
gaugeKey,
_ => registerGauge(pName, allTags, initialValue)
)
}

gaugeValue.map { counter =>
new Gauge[F] {

def modify(f: Int => Int): F[Unit] =
F.delay {
counter.getAndUpdate(x => f(x))
Expand All @@ -390,7 +385,6 @@ private class MeterRegistryReporter[F[_]](
increment.bracket(_ => action)(_ => decrement)
}
}

}

def summary(
Expand Down

0 comments on commit 0cf4a49

Please sign in to comment.