Skip to content

Commit

Permalink
Merge branch 'series/0.24' into update/series/0.24/http4s-core-0.23.27
Browse files Browse the repository at this point in the history
  • Loading branch information
rossabaker authored May 28, 2024
2 parents 3508117 + 607294a commit 5d0f1e2
Show file tree
Hide file tree
Showing 7 changed files with 833 additions and 112 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ ThisBuild / developers := List(
)

val Scala213 = "2.13.12"
ThisBuild / crossScalaVersions := Seq("2.12.18", Scala213, "3.3.1")
ThisBuild / crossScalaVersions := Seq("2.12.19", Scala213, "3.3.3")
ThisBuild / scalaVersion := Scala213

lazy val root = project.in(file(".")).aggregate(prometheusMetrics).enablePlugins(NoPublishPlugin)
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.9.9
sbt.version=1.10.0
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,11 @@

package org.http4s.metrics.prometheus

import cats.Applicative
import cats.data.NonEmptyList
import cats.effect.Resource
import cats.effect.Sync
import cats.syntax.apply._
import cats.syntax.flatMap._
import cats.syntax.functor._
import io.prometheus.client._
import cats.syntax.all.*
import io.prometheus.client.*
import org.http4s.Method
import org.http4s.Status
import org.http4s.metrics.MetricsOps
Expand All @@ -32,6 +29,8 @@ import org.http4s.metrics.TerminationType.Abnormal
import org.http4s.metrics.TerminationType.Canceled
import org.http4s.metrics.TerminationType.Error
import org.http4s.metrics.TerminationType.Timeout
import org.http4s.metrics.prometheus.Prometheus.registerCollector
import org.http4s.metrics.prometheus.Prometheus.toFlatArray

/** [[MetricsOps]] algebra capable of recording Prometheus metrics
*
Expand Down Expand Up @@ -81,62 +80,67 @@ import org.http4s.metrics.TerminationType.Timeout
*
* termination_type: Enumeration
* values: abnormal, error, timeout
*
* custom labels: custom labels, provided by customLabelsAndValues.map(_._1)
* values: custom label values, provided by customLabelsAndValues.map(_._2)
*/
object Prometheus {
def collectorRegistry[F[_]](implicit F: Sync[F]): Resource[F, CollectorRegistry] =
Resource.make(F.delay(new CollectorRegistry()))(cr => F.blocking(cr.clear()))
final class Prometheus[F[_]: Sync] private (
private val prefix: String,
private val registry: CollectorRegistry,
private val sampleExemplar: F[Option[Map[String, String]]],
private val customLabelsAndValues: List[(String, String)],
private val responseDurationSecondsHistogramBuckets: NonEmptyList[Double],
) { self =>
private def copy(
prefix: String = self.prefix,
registry: CollectorRegistry = self.registry,
sampleExemplar: F[Option[Map[String, String]]] = self.sampleExemplar,
customLabelsAndValues: List[(String, String)] = self.customLabelsAndValues,
responseDurationSecondsHistogramBuckets: NonEmptyList[Double] =
self.responseDurationSecondsHistogramBuckets,
): Prometheus[F] =
new Prometheus[F](
prefix = prefix,
registry = registry,
sampleExemplar = sampleExemplar,
customLabelsAndValues = customLabelsAndValues,
responseDurationSecondsHistogramBuckets = responseDurationSecondsHistogramBuckets,
)

/** Creates a [[MetricsOps]] that supports Prometheus metrics
*
* @param registry a metrics collector registry
* @param prefix a prefix that will be added to all metrics
*/
def metricsOps[F[_]: Sync](
registry: CollectorRegistry,
prefix: String = "org_http4s_server",
responseDurationSecondsHistogramBuckets: NonEmptyList[Double] = DefaultHistogramBuckets,
): Resource[F, MetricsOps[F]] =
for {
metrics <- createMetricsCollection(registry, prefix, responseDurationSecondsHistogramBuckets)
} yield createMetricsOps(metrics, Applicative[F].pure(None))
def withPrefix(prefix: String): Prometheus[F] = copy(prefix = prefix)
def withRegister(registry: CollectorRegistry): Prometheus[F] = copy(registry = registry)

def withSampleExemplar(sampleExemplar: F[Option[Map[String, String]]]): Prometheus[F] =
copy(sampleExemplar = sampleExemplar)

def withCustomLabelsAndValues(
customLabelsAndValues: List[(String, String)]
): Prometheus[F] = copy(customLabelsAndValues = customLabelsAndValues)

def withResponseDurationSecondsHistogramBuckets(
responseDurationSecondsHistogramBuckets: NonEmptyList[Double]
): Prometheus[F] =
copy(responseDurationSecondsHistogramBuckets = responseDurationSecondsHistogramBuckets)

/** Build a [[MetricsOps]] that supports Prometheus metrics */
def build: Resource[F, MetricsOps[F]] = createMetricsCollection.map(createMetricsOps)

private def createMetricsOps(metrics: MetricsCollection): MetricsOps[F] = {
val customLabelValues: List[String] = customLabelsAndValues.map(_._2)
val exemplarLabels: F[Option[Array[String]]] = sampleExemplar.map(_.map(toFlatArray))

/** Creates a [[MetricsOps]] that supports Prometheus metrics and records exemplars.
*
* Warning: The sampler effect is responsible for producing exemplar labels that are valid for the underlying
* implementation as errors happening during metric recording will not be handled! For Prometheus version < 1.0,
* this means the combined length of keys and values may not exceed 128 characters and the parts must adhere
* to the label regex Prometheus defines.
*
* @param registry a metrics collector registry
* @param sampleExemplar an effect that returns the corresponding exemplar labels
* @param prefix a prefix that will be added to all metrics
*/
def metricsOpsWithExemplars[F[_]: Sync](
registry: CollectorRegistry,
sampleExemplar: F[Option[Map[String, String]]],
prefix: String = "org_http4s_server",
responseDurationSecondsHistogramBuckets: NonEmptyList[Double] = DefaultHistogramBuckets,
): Resource[F, MetricsOps[F]] =
for {
metrics <- createMetricsCollection(registry, prefix, responseDurationSecondsHistogramBuckets)
} yield createMetricsOps(metrics, sampleExemplar.map(_.map(toFlatArray)))

private def createMetricsOps[F[_]](
metrics: MetricsCollection,
exemplarLabels: F[Option[Array[String]]],
)(implicit F: Sync[F]): MetricsOps[F] =
new MetricsOps[F] {
override def increaseActiveRequests(classifier: Option[String]): F[Unit] =
F.delay {
Sync[F].delay {
metrics.activeRequests
.labels(label(classifier))
.labels(label(classifier) +: customLabelValues: _*)
.inc()
}

override def decreaseActiveRequests(classifier: Option[String]): F[Unit] =
F.delay {
Sync[F].delay {
metrics.activeRequests
.labels(label(classifier))
.labels(label(classifier) +: customLabelValues: _*)
.dec()
}

Expand All @@ -146,9 +150,14 @@ object Prometheus {
classifier: Option[String],
): F[Unit] =
exemplarLabels.flatMap { exemplarOpt =>
F.delay {
Sync[F].delay {
metrics.responseDuration
.labels(label(classifier), reportMethod(method), Phase.report(Phase.Headers))
.labels(
label(classifier) +:
reportMethod(method) +:
Phase.report(Phase.Headers) +:
customLabelValues: _*
)
.observeWithExemplar(
SimpleTimer.elapsedSecondsFromNanos(0, elapsed),
exemplarOpt.orNull: _*
Expand All @@ -163,15 +172,25 @@ object Prometheus {
classifier: Option[String],
): F[Unit] =
exemplarLabels.flatMap { exemplarOpt =>
F.delay {
Sync[F].delay {
metrics.responseDuration
.labels(label(classifier), reportMethod(method), Phase.report(Phase.Body))
.labels(
label(classifier) +:
reportMethod(method) +:
Phase.report(Phase.Body) +:
customLabelValues: _*
)
.observeWithExemplar(
SimpleTimer.elapsedSecondsFromNanos(0, elapsed),
exemplarOpt.orNull: _*
)
metrics.requests
.labels(label(classifier), reportMethod(method), reportStatus(status))
.labels(
label(classifier) +:
reportMethod(method) +:
reportStatus(status) +:
customLabelValues: _*
)
.incWithExemplar(exemplarOpt.orNull: _*)
}
}
Expand All @@ -190,12 +209,13 @@ object Prometheus {

private def recordCanceled(elapsed: Long, classifier: Option[String]): F[Unit] =
exemplarLabels.flatMap { exemplarOpt =>
F.delay {
Sync[F].delay {
metrics.abnormalTerminations
.labels(
label(classifier),
AbnormalTermination.report(AbnormalTermination.Canceled),
label(Option.empty),
label(classifier) +:
AbnormalTermination.report(AbnormalTermination.Canceled) +:
label(Option.empty) +:
customLabelValues: _*
)
.observeWithExemplar(
SimpleTimer.elapsedSecondsFromNanos(0, elapsed),
Expand All @@ -210,12 +230,13 @@ object Prometheus {
cause: Throwable,
): F[Unit] =
exemplarLabels.flatMap { exemplarOpt =>
F.delay {
Sync[F].delay {
metrics.abnormalTerminations
.labels(
label(classifier),
AbnormalTermination.report(AbnormalTermination.Abnormal),
label(Option(cause.getClass.getName)),
label(classifier) +:
AbnormalTermination.report(AbnormalTermination.Abnormal) +:
label(Option(cause.getClass.getName)) +:
customLabelValues: _*
)
.observeWithExemplar(
SimpleTimer.elapsedSecondsFromNanos(0, elapsed),
Expand All @@ -230,12 +251,13 @@ object Prometheus {
cause: Throwable,
): F[Unit] =
exemplarLabels.flatMap { exemplarOpt =>
F.delay {
Sync[F].delay {
metrics.abnormalTerminations
.labels(
label(classifier),
AbnormalTermination.report(AbnormalTermination.Error),
label(Option(cause.getClass.getName)),
label(classifier) +:
AbnormalTermination.report(AbnormalTermination.Error) +:
label(Option(cause.getClass.getName)) +:
customLabelValues: _*
)
.observeWithExemplar(
SimpleTimer.elapsedSecondsFromNanos(0, elapsed),
Expand All @@ -246,12 +268,13 @@ object Prometheus {

private def recordTimeout(elapsed: Long, classifier: Option[String]): F[Unit] =
exemplarLabels.flatMap { exemplarOpt =>
F.delay {
Sync[F].delay {
metrics.abnormalTerminations
.labels(
label(classifier),
AbnormalTermination.report(AbnormalTermination.Timeout),
label(Option.empty),
label(classifier) +:
AbnormalTermination.report(AbnormalTermination.Timeout) +:
label(Option.empty) +:
customLabelValues: _*
)
.observeWithExemplar(
SimpleTimer.elapsedSecondsFromNanos(0, elapsed),
Expand Down Expand Up @@ -286,19 +309,18 @@ object Prometheus {
case _ => "other"
}
}
}

private def createMetricsCollection: Resource[F, MetricsCollection] = {
val customLabels: List[String] = customLabelsAndValues.map(_._1)

private def createMetricsCollection[F[_]: Sync](
registry: CollectorRegistry,
prefix: String,
responseDurationSecondsHistogramBuckets: NonEmptyList[Double],
): Resource[F, MetricsCollection] = {
val responseDuration: Resource[F, Histogram] = registerCollector(
Histogram
.build()
.buckets(responseDurationSecondsHistogramBuckets.toList: _*)
.name(prefix + "_" + "response_duration_seconds")
.help("Response Duration in seconds.")
.labelNames("classifier", "method", "phase")
.labelNames("classifier" +: "method" +: "phase" +: customLabels: _*)
.create(),
registry,
)
Expand All @@ -308,7 +330,7 @@ object Prometheus {
.build()
.name(prefix + "_" + "active_request_count")
.help("Total Active Requests.")
.labelNames("classifier")
.labelNames("classifier" +: customLabels: _*)
.create(),
registry,
)
Expand All @@ -318,7 +340,7 @@ object Prometheus {
.build()
.name(prefix + "_" + "request_count")
.help("Total Requests.")
.labelNames("classifier", "method", "status")
.labelNames("classifier" +: "method" +: "status" +: customLabels: _*)
.create(),
registry,
)
Expand All @@ -328,13 +350,58 @@ object Prometheus {
.build()
.name(prefix + "_" + "abnormal_terminations")
.help("Total Abnormal Terminations.")
.labelNames("classifier", "termination_type", "cause")
.labelNames("classifier" +: "termination_type" +: "cause" +: customLabels: _*)
.create(),
registry,
)

(responseDuration, activeRequests, requests, abnormalTerminations).mapN(MetricsCollection.apply)
}
}

object Prometheus {
def collectorRegistry[F[_]](implicit F: Sync[F]): Resource[F, CollectorRegistry] =
Resource.make(F.delay(new CollectorRegistry()))(cr => F.blocking(cr.clear()))

/** Creates a [[MetricsOps]] that supports Prometheus metrics
*
* @param registry a metrics collector registry
* @param prefix a prefix that will be added to all metrics
*/
def metricsOps[F[_]: Sync](
registry: CollectorRegistry,
prefix: String = "org_http4s_server",
responseDurationSecondsHistogramBuckets: NonEmptyList[Double] = DefaultHistogramBuckets,
): Resource[F, MetricsOps[F]] =
Prometheus
.default(registry)
.withPrefix(prefix)
.withResponseDurationSecondsHistogramBuckets(responseDurationSecondsHistogramBuckets)
.build

/** Creates a [[MetricsOps]] that supports Prometheus metrics and records exemplars.
*
* Warning: The sampler effect is responsible for producing exemplar labels that are valid for the underlying
* implementation as errors happening during metric recording will not be handled! For Prometheus version < 1.0,
* this means the combined length of keys and values may not exceed 128 characters and the parts must adhere
* to the label regex Prometheus defines.
*
* @param registry a metrics collector registry
* @param sampleExemplar an effect that returns the corresponding exemplar labels
* @param prefix a prefix that will be added to all metrics
*/
def metricsOpsWithExemplars[F[_]: Sync](
registry: CollectorRegistry,
sampleExemplar: F[Option[Map[String, String]]],
prefix: String = "org_http4s_server",
responseDurationSecondsHistogramBuckets: NonEmptyList[Double] = DefaultHistogramBuckets,
): Resource[F, MetricsOps[F]] =
Prometheus
.default[F](registry)
.withPrefix(prefix)
.withSampleExemplar(sampleExemplar)
.withResponseDurationSecondsHistogramBuckets(responseDurationSecondsHistogramBuckets)
.build

private[prometheus] def registerCollector[F[_], C <: Collector](
collector: C,
Expand All @@ -345,7 +412,7 @@ object Prometheus {
)

// https://github.com/prometheus/client_java/blob/parent-0.6.0/simpleclient/src/main/java/io/prometheus/client/Histogram.java#L73
private val DefaultHistogramBuckets: NonEmptyList[Double] =
val DefaultHistogramBuckets: NonEmptyList[Double] =
NonEmptyList(.005, List(.01, .025, .05, .075, .1, .25, .5, .75, 1, 2.5, 5, 7.5, 10))

// Prometheus expects exemplars as alternating key-value strings: k1, v1, k2, v2, ...
Expand All @@ -359,6 +426,16 @@ object Prometheus {
}
arr
}

def default[F[_]: Sync](registry: CollectorRegistry) =
new Prometheus[F](
prefix = "org_http4s_server",
registry = registry,
sampleExemplar = Option.empty[Map[String, String]].pure,
customLabelsAndValues = List.empty,
responseDurationSecondsHistogramBuckets = DefaultHistogramBuckets,
)

}

final case class MetricsCollection(
Expand Down
Loading

0 comments on commit 5d0f1e2

Please sign in to comment.