Skip to content

Commit

Permalink
Merge pull request #131 from ybasket/record-exemplars
Browse files Browse the repository at this point in the history
Record Prometheus exemplars
  • Loading branch information
ybasket authored Oct 9, 2023
2 parents c41c42a + d78e521 commit 92ef84a
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 45 deletions.
5 changes: 5 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,8 @@ val prefixedClient: Resource[IO, Client[IO]] =
metrics <- Prometheus.metricsOps[IO](registry, "prefix")
} yield Metrics[IO](metrics, classifier)(httpClient)
```

## Exemplars

You can add Prometheus exemplars to most of the metrics (except gauges) recorded by `http4s-prometheus-metrics`
by using `Prometheus.metricsOpsWithExemplars` and passing an effect that captures the related exemplar labels.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@

package org.http4s.metrics.prometheus

import cats.Applicative
import cats.data.NonEmptyList
import cats.effect.Resource
import cats.effect.Sync
import cats.syntax.apply._
import cats.syntax.flatMap._
import cats.syntax.functor._
import io.prometheus.client._
import org.http4s.Method
import org.http4s.Status
Expand Down Expand Up @@ -95,10 +98,32 @@ object Prometheus {
): Resource[F, MetricsOps[F]] =
for {
metrics <- createMetricsCollection(registry, prefix, responseDurationSecondsHistogramBuckets)
} yield createMetricsOps(metrics)
} yield createMetricsOps(metrics, Applicative[F].pure(None))

/** Builds a Prometheus-backed [[MetricsOps]] that also records exemplars.
 *
 * Warning: errors raised while a metric is being recorded are not handled, so the sampler effect
 * must only ever produce exemplar labels that the underlying implementation accepts. For
 * Prometheus versions below 1.0 that means the combined length of all keys and values may not
 * exceed 128 characters, and every part must match the label regex Prometheus defines.
 *
 * @param registry a metrics collector registry
 * @param sampleExemplar an effect yielding the exemplar labels to attach, if any
 * @param prefix a prefix that will be added to all metrics
 * @param responseDurationSecondsHistogramBuckets the buckets handed to the metrics collection
 *   for the response duration histogram
 * @return a resource wrapping the exemplar-aware [[MetricsOps]]
 */
def metricsOpsWithExemplars[F[_]: Sync](
    registry: CollectorRegistry,
    sampleExemplar: F[Option[Map[String, String]]],
    prefix: String = "org_http4s_server",
    responseDurationSecondsHistogramBuckets: NonEmptyList[Double] = DefaultHistogramBuckets,
): Resource[F, MetricsOps[F]] =
  createMetricsCollection(registry, prefix, responseDurationSecondsHistogramBuckets)
    .map { collection =>
      // The recording side works on alternating key/value arrays, so flatten each sampled map.
      createMetricsOps(collection, sampleExemplar.map(labels => labels.map(toFlatArray)))
    }

private def createMetricsOps[F[_]](
metrics: MetricsCollection
metrics: MetricsCollection,
exemplarLabels: F[Option[Array[String]]],
)(implicit F: Sync[F]): MetricsOps[F] =
new MetricsOps[F] {
override def increaseActiveRequests(classifier: Option[String]): F[Unit] =
Expand All @@ -120,10 +145,15 @@ object Prometheus {
elapsed: Long,
classifier: Option[String],
): F[Unit] =
F.delay {
metrics.responseDuration
.labels(label(classifier), reportMethod(method), Phase.report(Phase.Headers))
.observe(SimpleTimer.elapsedSecondsFromNanos(0, elapsed))
exemplarLabels.flatMap { exemplarOpt =>
F.delay {
metrics.responseDuration
.labels(label(classifier), reportMethod(method), Phase.report(Phase.Headers))
.observeWithExemplar(
SimpleTimer.elapsedSecondsFromNanos(0, elapsed),
exemplarOpt.orNull: _*
)
}
}

override def recordTotalTime(
Expand All @@ -132,13 +162,18 @@ object Prometheus {
elapsed: Long,
classifier: Option[String],
): F[Unit] =
F.delay {
metrics.responseDuration
.labels(label(classifier), reportMethod(method), Phase.report(Phase.Body))
.observe(SimpleTimer.elapsedSecondsFromNanos(0, elapsed))
metrics.requests
.labels(label(classifier), reportMethod(method), reportStatus(status))
.inc()
exemplarLabels.flatMap { exemplarOpt =>
F.delay {
metrics.responseDuration
.labels(label(classifier), reportMethod(method), Phase.report(Phase.Body))
.observeWithExemplar(
SimpleTimer.elapsedSecondsFromNanos(0, elapsed),
exemplarOpt.orNull: _*
)
metrics.requests
.labels(label(classifier), reportMethod(method), reportStatus(status))
.incWithExemplar(exemplarOpt.orNull: _*)
}
}

override def recordAbnormalTermination(
Expand All @@ -154,55 +189,75 @@ object Prometheus {
}

private def recordCanceled(elapsed: Long, classifier: Option[String]): F[Unit] =
F.delay {
metrics.abnormalTerminations
.labels(
label(classifier),
AbnormalTermination.report(AbnormalTermination.Canceled),
label(Option.empty),
)
.observe(SimpleTimer.elapsedSecondsFromNanos(0, elapsed))
exemplarLabels.flatMap { exemplarOpt =>
F.delay {
metrics.abnormalTerminations
.labels(
label(classifier),
AbnormalTermination.report(AbnormalTermination.Canceled),
label(Option.empty),
)
.observeWithExemplar(
SimpleTimer.elapsedSecondsFromNanos(0, elapsed),
exemplarOpt.orNull: _*
)
}
}

private def recordAbnormal(
elapsed: Long,
classifier: Option[String],
cause: Throwable,
): F[Unit] =
F.delay {
metrics.abnormalTerminations
.labels(
label(classifier),
AbnormalTermination.report(AbnormalTermination.Abnormal),
label(Option(cause.getClass.getName)),
)
.observe(SimpleTimer.elapsedSecondsFromNanos(0, elapsed))
exemplarLabels.flatMap { exemplarOpt =>
F.delay {
metrics.abnormalTerminations
.labels(
label(classifier),
AbnormalTermination.report(AbnormalTermination.Abnormal),
label(Option(cause.getClass.getName)),
)
.observeWithExemplar(
SimpleTimer.elapsedSecondsFromNanos(0, elapsed),
exemplarOpt.orNull: _*
)
}
}

private def recordError(
elapsed: Long,
classifier: Option[String],
cause: Throwable,
): F[Unit] =
F.delay {
metrics.abnormalTerminations
.labels(
label(classifier),
AbnormalTermination.report(AbnormalTermination.Error),
label(Option(cause.getClass.getName)),
)
.observe(SimpleTimer.elapsedSecondsFromNanos(0, elapsed))
exemplarLabels.flatMap { exemplarOpt =>
F.delay {
metrics.abnormalTerminations
.labels(
label(classifier),
AbnormalTermination.report(AbnormalTermination.Error),
label(Option(cause.getClass.getName)),
)
.observeWithExemplar(
SimpleTimer.elapsedSecondsFromNanos(0, elapsed),
exemplarOpt.orNull: _*
)
}
}

private def recordTimeout(elapsed: Long, classifier: Option[String]): F[Unit] =
F.delay {
metrics.abnormalTerminations
.labels(
label(classifier),
AbnormalTermination.report(AbnormalTermination.Timeout),
label(Option.empty),
)
.observe(SimpleTimer.elapsedSecondsFromNanos(0, elapsed))
exemplarLabels.flatMap { exemplarOpt =>
F.delay {
metrics.abnormalTerminations
.labels(
label(classifier),
AbnormalTermination.report(AbnormalTermination.Timeout),
label(Option.empty),
)
.observeWithExemplar(
SimpleTimer.elapsedSecondsFromNanos(0, elapsed),
exemplarOpt.orNull: _*
)
}
}

private def label(value: Option[String]): String = value.getOrElse("")
Expand Down Expand Up @@ -292,6 +347,18 @@ object Prometheus {
// https://github.com/prometheus/client_java/blob/parent-0.6.0/simpleclient/src/main/java/io/prometheus/client/Histogram.java#L73
private val DefaultHistogramBuckets: NonEmptyList[Double] =
NonEmptyList(.005, List(.01, .025, .05, .075, .1, .25, .5, .75, 1, 2.5, 5, 7.5, 10))

// Prometheus takes exemplar labels as one flat sequence of alternating keys and values
// (k1, v1, k2, v2, ...), so every map entry contributes two consecutive array slots,
// emitted in the map's own iteration order.
private def toFlatArray(m: Map[String, String]): Array[String] =
  m.iterator.flatMap { case (name, labelValue) => Iterator(name, labelValue) }.toArray
}

final case class MetricsCollection(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2018 http4s.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.http4s.metrics.prometheus

import cats.effect.*
import io.prometheus.client.CollectorRegistry
import io.prometheus.client.exemplars.Exemplar
import munit.CatsEffectSuite
import org.http4s.HttpApp
import org.http4s.client.Client
import org.http4s.client.middleware.Metrics
import org.http4s.metrics.prometheus.util.*

/** Checks that a client wrapped in the Prometheus metrics middleware records the exemplar
 * produced by the sampler passed to `Prometheus.metricsOpsWithExemplars`.
 */
class PrometheusExemplarsSuite extends CatsEffectSuite {
  // Client that is served directly by the `stub` app.
  val client: Client[IO] = Client.fromHttpApp[IO](HttpApp[IO](stub))

  /** Fixture exposing a collector registry together with a metered client whose sampler always
   * returns `exemplar`.
   */
  def meteredClient(
      exemplar: Map[String, String]
  ): SyncIO[FunFixture[(CollectorRegistry, Client[IO])]] =
    ResourceFixture(buildMeteredClient(exemplar))

  /** Creates a registry, registers exemplar-aware metrics under the "exemplars" prefix and wraps
   * `client` with the metrics middleware.
   */
  private def buildMeteredClient(
      exemplar: Map[String, String]
  ): Resource[IO, (CollectorRegistry, Client[IO])] = {
    implicit val clock: Clock[IO] = FakeClock[IO]

    Prometheus.collectorRegistry[IO].flatMap { registry =>
      Prometheus
        .metricsOpsWithExemplars[IO](registry, IO.pure(Some(exemplar)), "exemplars")
        .map(ops => (registry, Metrics[IO](ops)(client)))
    }
  }

  meteredClient(exemplar = Map("trace_id" -> "123")).test(
    "A http client with a prometheus metrics middleware should sample an exemplar"
  ) { case (registry, meteredHttpClient) =>
    for {
      body <- meteredHttpClient.expect[String]("/ok")
    } yield {
      // Restrict the registry dump to the request counter recorded under the "exemplars" prefix.
      val metricNames = new java.util.HashSet[String]()
      metricNames.add("exemplars_request_count_total")

      val families = registry.filteredMetricFamilySamples(metricNames)
      val firstSample = families.nextElement().samples.get(0)
      val exemplar: Exemplar = firstSample.exemplar

      assertEquals(exemplar.getLabelName(0), "trace_id")
      assertEquals(exemplar.getLabelValue(0), "123")
      assertEquals(body, "200 OK")
    }
  }
}

0 comments on commit 92ef84a

Please sign in to comment.