diff --git a/build.sbt b/build.sbt index 1989072..68e649a 100644 --- a/build.sbt +++ b/build.sbt @@ -4,12 +4,12 @@ ThisBuild / developers := List( ) val Scala213 = "2.13.12" -ThisBuild / crossScalaVersions := Seq("2.12.18", Scala213, "3.3.1") +ThisBuild / crossScalaVersions := Seq("2.12.19", Scala213, "3.3.3") ThisBuild / scalaVersion := Scala213 lazy val root = project.in(file(".")).aggregate(prometheusMetrics).enablePlugins(NoPublishPlugin) -val http4sVersion = "0.23.25" +val http4sVersion = "0.23.26" val prometheusVersion = "0.16.0" val munitVersion = "0.7.29" val munitCatsEffectVersion = "1.0.7" diff --git a/project/build.properties b/project/build.properties index 04267b1..081fdbb 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.9.9 +sbt.version=1.10.0 diff --git a/prometheus-metrics/src/main/scala/org/http4s/metrics/prometheus/Prometheus.scala b/prometheus-metrics/src/main/scala/org/http4s/metrics/prometheus/Prometheus.scala index 8fe411c..62dc27f 100644 --- a/prometheus-metrics/src/main/scala/org/http4s/metrics/prometheus/Prometheus.scala +++ b/prometheus-metrics/src/main/scala/org/http4s/metrics/prometheus/Prometheus.scala @@ -16,14 +16,11 @@ package org.http4s.metrics.prometheus -import cats.Applicative import cats.data.NonEmptyList import cats.effect.Resource import cats.effect.Sync -import cats.syntax.apply._ -import cats.syntax.flatMap._ -import cats.syntax.functor._ -import io.prometheus.client._ +import cats.syntax.all.* +import io.prometheus.client.* import org.http4s.Method import org.http4s.Status import org.http4s.metrics.MetricsOps @@ -32,6 +29,8 @@ import org.http4s.metrics.TerminationType.Abnormal import org.http4s.metrics.TerminationType.Canceled import org.http4s.metrics.TerminationType.Error import org.http4s.metrics.TerminationType.Timeout +import org.http4s.metrics.prometheus.Prometheus.registerCollector +import org.http4s.metrics.prometheus.Prometheus.toFlatArray /** [[MetricsOps]] algebra 
capable of recording Prometheus metrics * @@ -81,62 +80,67 @@ import org.http4s.metrics.TerminationType.Timeout * * termination_type: Enumeration * values: abnormal, error, timeout + * + * custom labels: custom labels, provided by customLabelsAndValues.map(_._1) + * values: custom label values, provided by customLabelsAndValues.map(_._2) */ -object Prometheus { - def collectorRegistry[F[_]](implicit F: Sync[F]): Resource[F, CollectorRegistry] = - Resource.make(F.delay(new CollectorRegistry()))(cr => F.blocking(cr.clear())) +final class Prometheus[F[_]: Sync] private ( + private val prefix: String, + private val registry: CollectorRegistry, + private val sampleExemplar: F[Option[Map[String, String]]], + private val customLabelsAndValues: List[(String, String)], + private val responseDurationSecondsHistogramBuckets: NonEmptyList[Double], +) { self => + private def copy( + prefix: String = self.prefix, + registry: CollectorRegistry = self.registry, + sampleExemplar: F[Option[Map[String, String]]] = self.sampleExemplar, + customLabelsAndValues: List[(String, String)] = self.customLabelsAndValues, + responseDurationSecondsHistogramBuckets: NonEmptyList[Double] = + self.responseDurationSecondsHistogramBuckets, + ): Prometheus[F] = + new Prometheus[F]( + prefix = prefix, + registry = registry, + sampleExemplar = sampleExemplar, + customLabelsAndValues = customLabelsAndValues, + responseDurationSecondsHistogramBuckets = responseDurationSecondsHistogramBuckets, + ) - /** Creates a [[MetricsOps]] that supports Prometheus metrics - * - * @param registry a metrics collector registry - * @param prefix a prefix that will be added to all metrics - */ - def metricsOps[F[_]: Sync]( - registry: CollectorRegistry, - prefix: String = "org_http4s_server", - responseDurationSecondsHistogramBuckets: NonEmptyList[Double] = DefaultHistogramBuckets, - ): Resource[F, MetricsOps[F]] = - for { - metrics <- createMetricsCollection(registry, prefix, responseDurationSecondsHistogramBuckets) - } 
yield createMetricsOps(metrics, Applicative[F].pure(None)) + def withPrefix(prefix: String): Prometheus[F] = copy(prefix = prefix) + def withRegister(registry: CollectorRegistry): Prometheus[F] = copy(registry = registry) + + def withSampleExemplar(sampleExemplar: F[Option[Map[String, String]]]): Prometheus[F] = + copy(sampleExemplar = sampleExemplar) + + def withCustomLabelsAndValues( + customLabelsAndValues: List[(String, String)] + ): Prometheus[F] = copy(customLabelsAndValues = customLabelsAndValues) + + def withResponseDurationSecondsHistogramBuckets( + responseDurationSecondsHistogramBuckets: NonEmptyList[Double] + ): Prometheus[F] = + copy(responseDurationSecondsHistogramBuckets = responseDurationSecondsHistogramBuckets) + + /** Build a [[MetricsOps]] that supports Prometheus metrics */ + def build: Resource[F, MetricsOps[F]] = createMetricsCollection.map(createMetricsOps) + + private def createMetricsOps(metrics: MetricsCollection): MetricsOps[F] = { + val customLabelValues: List[String] = customLabelsAndValues.map(_._2) + val exemplarLabels: F[Option[Array[String]]] = sampleExemplar.map(_.map(toFlatArray)) - /** Creates a [[MetricsOps]] that supports Prometheus metrics and records exemplars. - * - * Warning: The sampler effect is responsible for producing exemplar labels that are valid for the underlying - * implementation as errors happening during metric recording will not be handled! For Prometheus version < 1.0, - * this means the combined length of keys and values may not exceed 128 characters and the parts must adhere - * to the label regex Prometheus defines. 
- * - * @param registry a metrics collector registry - * @param sampleExemplar an effect that returns the corresponding exemplar labels - * @param prefix a prefix that will be added to all metrics - */ - def metricsOpsWithExemplars[F[_]: Sync]( - registry: CollectorRegistry, - sampleExemplar: F[Option[Map[String, String]]], - prefix: String = "org_http4s_server", - responseDurationSecondsHistogramBuckets: NonEmptyList[Double] = DefaultHistogramBuckets, - ): Resource[F, MetricsOps[F]] = - for { - metrics <- createMetricsCollection(registry, prefix, responseDurationSecondsHistogramBuckets) - } yield createMetricsOps(metrics, sampleExemplar.map(_.map(toFlatArray))) - - private def createMetricsOps[F[_]]( - metrics: MetricsCollection, - exemplarLabels: F[Option[Array[String]]], - )(implicit F: Sync[F]): MetricsOps[F] = new MetricsOps[F] { override def increaseActiveRequests(classifier: Option[String]): F[Unit] = - F.delay { + Sync[F].delay { metrics.activeRequests - .labels(label(classifier)) + .labels(label(classifier) +: customLabelValues: _*) .inc() } override def decreaseActiveRequests(classifier: Option[String]): F[Unit] = - F.delay { + Sync[F].delay { metrics.activeRequests - .labels(label(classifier)) + .labels(label(classifier) +: customLabelValues: _*) .dec() } @@ -146,9 +150,14 @@ object Prometheus { classifier: Option[String], ): F[Unit] = exemplarLabels.flatMap { exemplarOpt => - F.delay { + Sync[F].delay { metrics.responseDuration - .labels(label(classifier), reportMethod(method), Phase.report(Phase.Headers)) + .labels( + label(classifier) +: + reportMethod(method) +: + Phase.report(Phase.Headers) +: + customLabelValues: _* + ) .observeWithExemplar( SimpleTimer.elapsedSecondsFromNanos(0, elapsed), exemplarOpt.orNull: _* @@ -163,15 +172,25 @@ object Prometheus { classifier: Option[String], ): F[Unit] = exemplarLabels.flatMap { exemplarOpt => - F.delay { + Sync[F].delay { metrics.responseDuration - .labels(label(classifier), reportMethod(method), 
Phase.report(Phase.Body)) + .labels( + label(classifier) +: + reportMethod(method) +: + Phase.report(Phase.Body) +: + customLabelValues: _* + ) .observeWithExemplar( SimpleTimer.elapsedSecondsFromNanos(0, elapsed), exemplarOpt.orNull: _* ) metrics.requests - .labels(label(classifier), reportMethod(method), reportStatus(status)) + .labels( + label(classifier) +: + reportMethod(method) +: + reportStatus(status) +: + customLabelValues: _* + ) .incWithExemplar(exemplarOpt.orNull: _*) } } @@ -190,12 +209,13 @@ object Prometheus { private def recordCanceled(elapsed: Long, classifier: Option[String]): F[Unit] = exemplarLabels.flatMap { exemplarOpt => - F.delay { + Sync[F].delay { metrics.abnormalTerminations .labels( - label(classifier), - AbnormalTermination.report(AbnormalTermination.Canceled), - label(Option.empty), + label(classifier) +: + AbnormalTermination.report(AbnormalTermination.Canceled) +: + label(Option.empty) +: + customLabelValues: _* ) .observeWithExemplar( SimpleTimer.elapsedSecondsFromNanos(0, elapsed), @@ -210,12 +230,13 @@ object Prometheus { cause: Throwable, ): F[Unit] = exemplarLabels.flatMap { exemplarOpt => - F.delay { + Sync[F].delay { metrics.abnormalTerminations .labels( - label(classifier), - AbnormalTermination.report(AbnormalTermination.Abnormal), - label(Option(cause.getClass.getName)), + label(classifier) +: + AbnormalTermination.report(AbnormalTermination.Abnormal) +: + label(Option(cause.getClass.getName)) +: + customLabelValues: _* ) .observeWithExemplar( SimpleTimer.elapsedSecondsFromNanos(0, elapsed), @@ -230,12 +251,13 @@ object Prometheus { cause: Throwable, ): F[Unit] = exemplarLabels.flatMap { exemplarOpt => - F.delay { + Sync[F].delay { metrics.abnormalTerminations .labels( - label(classifier), - AbnormalTermination.report(AbnormalTermination.Error), - label(Option(cause.getClass.getName)), + label(classifier) +: + AbnormalTermination.report(AbnormalTermination.Error) +: + label(Option(cause.getClass.getName)) +: + 
customLabelValues: _* ) .observeWithExemplar( SimpleTimer.elapsedSecondsFromNanos(0, elapsed), @@ -246,12 +268,13 @@ object Prometheus { private def recordTimeout(elapsed: Long, classifier: Option[String]): F[Unit] = exemplarLabels.flatMap { exemplarOpt => - F.delay { + Sync[F].delay { metrics.abnormalTerminations .labels( - label(classifier), - AbnormalTermination.report(AbnormalTermination.Timeout), - label(Option.empty), + label(classifier) +: + AbnormalTermination.report(AbnormalTermination.Timeout) +: + label(Option.empty) +: + customLabelValues: _* ) .observeWithExemplar( SimpleTimer.elapsedSecondsFromNanos(0, elapsed), @@ -286,19 +309,18 @@ object Prometheus { case _ => "other" } } + } + + private def createMetricsCollection: Resource[F, MetricsCollection] = { + val customLabels: List[String] = customLabelsAndValues.map(_._1) - private def createMetricsCollection[F[_]: Sync]( - registry: CollectorRegistry, - prefix: String, - responseDurationSecondsHistogramBuckets: NonEmptyList[Double], - ): Resource[F, MetricsCollection] = { val responseDuration: Resource[F, Histogram] = registerCollector( Histogram .build() .buckets(responseDurationSecondsHistogramBuckets.toList: _*) .name(prefix + "_" + "response_duration_seconds") .help("Response Duration in seconds.") - .labelNames("classifier", "method", "phase") + .labelNames("classifier" +: "method" +: "phase" +: customLabels: _*) .create(), registry, ) @@ -308,7 +330,7 @@ object Prometheus { .build() .name(prefix + "_" + "active_request_count") .help("Total Active Requests.") - .labelNames("classifier") + .labelNames("classifier" +: customLabels: _*) .create(), registry, ) @@ -318,7 +340,7 @@ object Prometheus { .build() .name(prefix + "_" + "request_count") .help("Total Requests.") - .labelNames("classifier", "method", "status") + .labelNames("classifier" +: "method" +: "status" +: customLabels: _*) .create(), registry, ) @@ -328,13 +350,58 @@ object Prometheus { .build() .name(prefix + "_" + 
"abnormal_terminations") .help("Total Abnormal Terminations.") - .labelNames("classifier", "termination_type", "cause") + .labelNames("classifier" +: "termination_type" +: "cause" +: customLabels: _*) .create(), registry, ) (responseDuration, activeRequests, requests, abnormalTerminations).mapN(MetricsCollection.apply) } +} + +object Prometheus { + def collectorRegistry[F[_]](implicit F: Sync[F]): Resource[F, CollectorRegistry] = + Resource.make(F.delay(new CollectorRegistry()))(cr => F.blocking(cr.clear())) + + /** Creates a [[MetricsOps]] that supports Prometheus metrics + * + * @param registry a metrics collector registry + * @param prefix a prefix that will be added to all metrics + */ + def metricsOps[F[_]: Sync]( + registry: CollectorRegistry, + prefix: String = "org_http4s_server", + responseDurationSecondsHistogramBuckets: NonEmptyList[Double] = DefaultHistogramBuckets, + ): Resource[F, MetricsOps[F]] = + Prometheus + .default(registry) + .withPrefix(prefix) + .withResponseDurationSecondsHistogramBuckets(responseDurationSecondsHistogramBuckets) + .build + + /** Creates a [[MetricsOps]] that supports Prometheus metrics and records exemplars. + * + * Warning: The sampler effect is responsible for producing exemplar labels that are valid for the underlying + * implementation as errors happening during metric recording will not be handled! For Prometheus version < 1.0, + * this means the combined length of keys and values may not exceed 128 characters and the parts must adhere + * to the label regex Prometheus defines. 
+ * + * @param registry a metrics collector registry + * @param sampleExemplar an effect that returns the corresponding exemplar labels + * @param prefix a prefix that will be added to all metrics + */ + def metricsOpsWithExemplars[F[_]: Sync]( + registry: CollectorRegistry, + sampleExemplar: F[Option[Map[String, String]]], + prefix: String = "org_http4s_server", + responseDurationSecondsHistogramBuckets: NonEmptyList[Double] = DefaultHistogramBuckets, + ): Resource[F, MetricsOps[F]] = + Prometheus + .default[F](registry) + .withPrefix(prefix) + .withSampleExemplar(sampleExemplar) + .withResponseDurationSecondsHistogramBuckets(responseDurationSecondsHistogramBuckets) + .build private[prometheus] def registerCollector[F[_], C <: Collector]( collector: C, @@ -345,7 +412,7 @@ object Prometheus { ) // https://github.com/prometheus/client_java/blob/parent-0.6.0/simpleclient/src/main/java/io/prometheus/client/Histogram.java#L73 - private val DefaultHistogramBuckets: NonEmptyList[Double] = + val DefaultHistogramBuckets: NonEmptyList[Double] = NonEmptyList(.005, List(.01, .025, .05, .075, .1, .25, .5, .75, 1, 2.5, 5, 7.5, 10)) // Prometheus expects exemplars as alternating key-value strings: k1, v1, k2, v2, ... 
@@ -359,6 +426,16 @@ object Prometheus { } arr } + + def default[F[_]: Sync](registry: CollectorRegistry) = + new Prometheus[F]( + prefix = "org_http4s_server", + registry = registry, + sampleExemplar = Option.empty[Map[String, String]].pure, + customLabelsAndValues = List.empty, + responseDurationSecondsHistogramBuckets = DefaultHistogramBuckets, + ) + } final case class MetricsCollection( diff --git a/prometheus-metrics/src/test/scala/org/http4s/metrics/prometheus/PrometheusClientMetricsCustomLabelsSuite.scala b/prometheus-metrics/src/test/scala/org/http4s/metrics/prometheus/PrometheusClientMetricsCustomLabelsSuite.scala new file mode 100644 index 0000000..6a67778 --- /dev/null +++ b/prometheus-metrics/src/test/scala/org/http4s/metrics/prometheus/PrometheusClientMetricsCustomLabelsSuite.scala @@ -0,0 +1,239 @@ +/* + * Copyright 2018 http4s.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.http4s.metrics.prometheus + +import cats.effect.* +import io.prometheus.client.CollectorRegistry +import munit.CatsEffectSuite +import org.http4s.HttpApp +import org.http4s.Request +import org.http4s.Status +import org.http4s.client.Client +import org.http4s.client.UnexpectedStatus +import org.http4s.client.middleware.Metrics +import org.http4s.dsl.io.* +import org.http4s.metrics.prometheus.util.* +import org.http4s.syntax.literals.* + +import java.io.IOException +import java.util.concurrent.TimeoutException + +class PrometheusClientMetricsCustomLabelsSuite extends CatsEffectSuite { + val client: Client[IO] = Client.fromHttpApp[IO](HttpApp[IO](stub)) + + meteredClient().test( + "A http client with a prometheus metrics middleware should register a 2xx response" + ) { case (registry, client) => + client.expect[String]("/ok").attempt.map { resp => + assertEquals(cntWithCustLbl(registry, "2xx_responses", "client")(custLblVals), 1.0) + assertEquals(cntWithCustLbl(registry, "active_requests", "client")(custLblVals), 0.0) + assertEquals(cntWithCustLbl(registry, "2xx_headers_duration", "client")(custLblVals), 0.05) + assertEquals(cntWithCustLbl(registry, "2xx_total_duration", "client")(custLblVals), 0.1) + assertEquals(resp, Right("200 OK")) + } + } + + meteredClient().test( + "A http client with a prometheus metrics middleware should register a 4xx response" + ) { case (registry, client) => + client.expect[String]("/bad-request").attempt.map { resp => + assertEquals(cntWithCustLbl(registry, "4xx_responses", "client")(custLblVals), 1.0) + assertEquals(cntWithCustLbl(registry, "active_requests", "client")(custLblVals), 0.0) + assertEquals(cntWithCustLbl(registry, "4xx_headers_duration", "client")(custLblVals), 0.05) + assertEquals(cntWithCustLbl(registry, "4xx_total_duration", "client")(custLblVals), 0.1) + resp match { + case Left(UnexpectedStatus(status, _, _)) => assertEquals(status, Status.BadRequest) + case other => fail(s"Unexpected response 
status: $other") + } + } + } + + meteredClient().test( + "A http client with a prometheus metrics middleware should register a 5xx response" + ) { case (registry, client) => + client.expect[String]("/internal-server-error").attempt.map { resp => + assertEquals(cntWithCustLbl(registry, "5xx_responses", "client")(custLblVals), 1.0) + assertEquals(cntWithCustLbl(registry, "active_requests", "client")(custLblVals), 0.0) + assertEquals(cntWithCustLbl(registry, "5xx_headers_duration", "client")(custLblVals), 0.05) + assertEquals(cntWithCustLbl(registry, "5xx_total_duration", "client")(custLblVals), 0.1) + resp match { + case Left(UnexpectedStatus(status, _, _)) => + assertEquals(status, Status.InternalServerError) + case other => fail(s"Unexpected response status: $other") + } + } + } + + meteredClient().test( + "A http client with a prometheus metrics middleware should register a GET request" + ) { case (registry, client) => + client.expect[String]("/ok").attempt.map { resp => + assertEquals(resp, Right("200 OK")) + assertEquals(cntWithCustLbl(registry, "2xx_responses", "client", "get")(custLblVals), 1.0) + assertEquals(cntWithCustLbl(registry, "active_requests", "client", "get")(custLblVals), 0.0) + assertEquals( + cntWithCustLbl(registry, "2xx_headers_duration", "client", "get")(custLblVals), + 0.05, + ) + assertEquals( + cntWithCustLbl(registry, "2xx_total_duration", "client", "get")(custLblVals), + 0.1, + ) + } + } + + meteredClient().test( + "A http client with a prometheus metrics middleware should register a POST request" + ) { case (registry, client) => + client.expect[String](Request[IO](POST, uri"/ok")).attempt.map { resp => + assertEquals(resp, Right("200 OK")) + assertEquals(cntWithCustLbl(registry, "2xx_responses", "client", "post")(custLblVals), 1.0) + assertEquals(cntWithCustLbl(registry, "active_requests", "client", "post")(custLblVals), 0.0) + assertEquals( + cntWithCustLbl(registry, "2xx_headers_duration", "client", "post")(custLblVals), + 0.05, + ) + 
assertEquals( + cntWithCustLbl(registry, "2xx_total_duration", "client", "post")(custLblVals), + 0.1, + ) + } + } + + meteredClient().test( + "A http client with a prometheus metrics middleware should register a PUT request" + ) { case (registry, client) => + client.expect[String](Request[IO](PUT, uri"/ok")).attempt.map { resp => + assertEquals(resp, Right("200 OK")) + + assertEquals(cntWithCustLbl(registry, "2xx_responses", "client", "put")(custLblVals), 1.0) + assertEquals(cntWithCustLbl(registry, "active_requests", "client", "put")(custLblVals), 0.0) + assertEquals( + cntWithCustLbl(registry, "2xx_headers_duration", "client", "put")(custLblVals), + 0.05, + ) + assertEquals( + cntWithCustLbl(registry, "2xx_total_duration", "client", "put")(custLblVals), + 0.1, + ) + } + } + + meteredClient().test( + "A http client with a prometheus metrics middleware should register a DELETE request" + ) { case (registry, client) => + client.expect[String](Request[IO](DELETE, uri"/ok")).attempt.map { resp => + assertEquals(resp, Right("200 OK")) + + assertEquals(cntWithCustLbl(registry, "2xx_responses", "client", "delete")(custLblVals), 1.0) + assertEquals( + cntWithCustLbl(registry, "active_requests", "client", "delete")(custLblVals), + 0.0, + ) + assertEquals( + cntWithCustLbl(registry, "2xx_headers_duration", "client", "delete")(custLblVals), + 0.05, + ) + assertEquals( + cntWithCustLbl(registry, "2xx_total_duration", "client", "delete")(custLblVals), + 0.1, + ) + } + } + + meteredClient().test( + "A http client with a prometheus metrics middleware should register an error" + ) { case (registry, client) => + client.expect[String]("/error").attempt.map { + case Left(_: IOException) => + assertEquals( + cntWithCustLbl(registry, "errors", "client", cause = "java.io.IOException")(custLblVals), + 1.0, + ) + assertEquals(cntWithCustLbl(registry, "active_requests", "client")(custLblVals), 0.0) + case other => fail(s"Expected an IOException, got: $other") + } + } + + 
meteredClient().test( + "A http client with a prometheus metrics middleware should register a timeout" + ) { case (registry, client) => + client.expect[String]("/timeout").attempt.map { + case Left(_: TimeoutException) => + assertEquals(cntWithCustLbl(registry, "timeouts", "client")(custLblVals), 1.0) + assertEquals(cntWithCustLbl(registry, "active_requests", "client")(custLblVals), 0.0) + case other => fail(s"Expected a TimeoutException, got: $other") + } + } + + private val classifier = (_: Request[IO]) => Some("classifier") + + meteredClient(classifier).test("use the provided request classifier") { case (registry, client) => + client.expect[String]("/ok").attempt.map { resp => + assertEquals(resp, Right("200 OK")) + + assertEquals( + cntWithCustLbl(registry, "2xx_responses", "client", "get", "classifier")(custLblVals), + 1.0, + ) + assertEquals( + cntWithCustLbl(registry, "active_requests", "client", "get", "classifier")(custLblVals), + 0.0, + ) + assertEquals( + cntWithCustLbl(registry, "2xx_headers_duration", "client", "get", "classifier")( + custLblVals + ), + 0.05, + ) + assertEquals( + cntWithCustLbl(registry, "2xx_total_duration", "client", "get", "classifier")(custLblVals), + 0.1, + ) + } + } + + // This tests can't be easily done in munit-cats-effect as it wants to test after the Resource is freed + meteredClient().test("unregister collectors".ignore) { case (cr, client) => + client.expect[String]("/ok").as(cr).map { registry => + assertEquals(cntWithCustLbl(registry, "2xx_responses", "client")(custLblVals), 0.0) + assertEquals(cntWithCustLbl(registry, "active_requests", "client")(custLblVals), 0.0) + assertEquals(cntWithCustLbl(registry, "2xx_headers_duration", "client")(custLblVals), 0.0) + assertEquals(cntWithCustLbl(registry, "2xx_total_duration", "client")(custLblVals), 0.0) + } + } + + private def buildMeteredClient( + classifier: Request[IO] => Option[String] + ): Resource[IO, (CollectorRegistry, Client[IO])] = { + implicit val clock: Clock[IO] = 
FakeClock[IO] + + for { + registry <- Prometheus.collectorRegistry[IO] + metrics <- Prometheus + .default[IO](registry) + .withPrefix("client") + .withCustomLabelsAndValues(custLblVals) + .build + } yield (registry, Metrics(metrics, classifier)(client)) + } + + def meteredClient( + classifier: Request[IO] => Option[String] = (_: Request[IO]) => None + ): SyncIO[FunFixture[(CollectorRegistry, Client[IO])]] = + ResourceFixture(buildMeteredClient(classifier)) +} diff --git a/prometheus-metrics/src/test/scala/org/http4s/metrics/prometheus/PrometheusExemplarsCustomLabelsSuite.scala b/prometheus-metrics/src/test/scala/org/http4s/metrics/prometheus/PrometheusExemplarsCustomLabelsSuite.scala new file mode 100644 index 0000000..16ed109 --- /dev/null +++ b/prometheus-metrics/src/test/scala/org/http4s/metrics/prometheus/PrometheusExemplarsCustomLabelsSuite.scala @@ -0,0 +1,72 @@ +/* + * Copyright 2018 http4s.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.http4s.metrics.prometheus + +import cats.effect.* +import io.prometheus.client.CollectorRegistry +import io.prometheus.client.exemplars.Exemplar +import munit.CatsEffectSuite +import org.http4s.HttpApp +import org.http4s.client.Client +import org.http4s.client.middleware.Metrics +import org.http4s.metrics.prometheus.Prometheus.DefaultHistogramBuckets +import org.http4s.metrics.prometheus.util.* + +class PrometheusExemplarsCustomLabelsSuite extends CatsEffectSuite { + val client: Client[IO] = Client.fromHttpApp[IO](HttpApp[IO](stub)) + + meteredClient(exemplar = Map("trace_id" -> "123")).test( + "A http client with a prometheus metrics middleware should sample an exemplar" + ) { case (registry, client) => + client.expect[String]("/ok").map { resp => + val filter = new java.util.HashSet[String]() + filter.add("exemplars_request_count_total") + val exemplar: Exemplar = registry + .filteredMetricFamilySamples(filter) + .nextElement() + .samples + .get(0) + .exemplar + + assertEquals(exemplar.getLabelName(0), "trace_id") + assertEquals(exemplar.getLabelValue(0), "123") + assertEquals(resp, "200 OK") + } + } + + private def buildMeteredClient( + exemplar: Map[String, String] + ): Resource[IO, (CollectorRegistry, Client[IO])] = { + implicit val clock: Clock[IO] = FakeClock[IO] + + for { + registry <- Prometheus.collectorRegistry[IO] + metrics <- Prometheus + .default[IO](registry) + .withPrefix("exemplars") + .withSampleExemplar(IO.pure(Some(exemplar))) + .withCustomLabelsAndValues(custLblVals) + .withResponseDurationSecondsHistogramBuckets(DefaultHistogramBuckets) + .build + } yield (registry, Metrics[IO](metrics)(client)) + } + + def meteredClient( + exemplar: Map[String, String] + ): SyncIO[FunFixture[(CollectorRegistry, Client[IO])]] = + ResourceFixture(buildMeteredClient(exemplar)) +} diff --git a/prometheus-metrics/src/test/scala/org/http4s/metrics/prometheus/PrometheusServerMetricsCustomLabelsSuite.scala 
b/prometheus-metrics/src/test/scala/org/http4s/metrics/prometheus/PrometheusServerMetricsCustomLabelsSuite.scala new file mode 100644 index 0000000..f54cc3a --- /dev/null +++ b/prometheus-metrics/src/test/scala/org/http4s/metrics/prometheus/PrometheusServerMetricsCustomLabelsSuite.scala @@ -0,0 +1,305 @@ +/* + * Copyright 2018 http4s.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.http4s.metrics.prometheus + +import cats.effect.* +import io.prometheus.client.CollectorRegistry +import munit.CatsEffectSuite +import org.http4s.HttpApp +import org.http4s.HttpRoutes +import org.http4s.Method.GET +import org.http4s.Request +import org.http4s.Status +import org.http4s.dsl.io.* +import org.http4s.metrics.prometheus.util.* +import org.http4s.server.middleware.Metrics +import org.http4s.syntax.all.* + +class PrometheusServerMetricsCustomLabelsSuite extends CatsEffectSuite { + + private val testRoutes = HttpRoutes.of[IO](stub) + + // "A http routes with a prometheus metrics middleware" should { + meteredRoutes().test( + "A http routes with a prometheus metrics middleware should register a 2xx response" + ) { case (registry, routes) => + val req = Request[IO](uri = uri"/ok") + + val resp = routes.run(req) + resp.flatMap { r => + r.as[String].map { b => + assertEquals(b, "200 OK") + assertEquals(r.status, Status.Ok) + assertEquals(cntWithCustLbl(registry, "2xx_responses", "server")(custLblVals), 1.0) + assertEquals(cntWithCustLbl(registry, 
"active_requests", "server")(custLblVals), 0.0) + assertEquals(cntWithCustLbl(registry, "2xx_headers_duration", "server")(custLblVals), 0.05) + assertEquals(cntWithCustLbl(registry, "2xx_total_duration", "server")(custLblVals), 0.1) + } + } + } + + meteredRoutes().test( + "A http routes with a prometheus metrics middleware should register a 4xx response" + ) { case (registry, routes) => + val req = Request[IO](uri = uri"/bad-request") + + routes.run(req).flatMap { r => + r.as[String].map { b => + assertEquals(r.status, Status.BadRequest) + assertEquals(b, "400 Bad Request") + + assertEquals(cntWithCustLbl(registry, "4xx_responses", "server")(custLblVals), 1.0) + assertEquals(cntWithCustLbl(registry, "active_requests", "server")(custLblVals), 0.0) + assertEquals(cntWithCustLbl(registry, "4xx_headers_duration", "server")(custLblVals), 0.05) + assertEquals(cntWithCustLbl(registry, "4xx_total_duration", "server")(custLblVals), 0.1) + } + } + } + + meteredRoutes().test( + "A http routes with a prometheus metrics middleware should register a 5xx response" + ) { case (registry, routes) => + val req = Request[IO](uri = uri"/internal-server-error") + + routes.run(req).flatMap { r => + r.as[String].map { b => + assertEquals(r.status, Status.InternalServerError) + assertEquals(b, "500 Internal Server Error") + + assertEquals(cntWithCustLbl(registry, "5xx_responses", "server")(custLblVals), 1.0) + assertEquals(cntWithCustLbl(registry, "active_requests", "server")(custLblVals), 0.0) + assertEquals(cntWithCustLbl(registry, "5xx_headers_duration", "server")(custLblVals), 0.05) + assertEquals(cntWithCustLbl(registry, "5xx_total_duration", "server")(custLblVals), 0.1) + } + } + } + + meteredRoutes().test( + "A http routes with a prometheus metrics middleware should register a GET request" + ) { case (registry, routes) => + val req = Request[IO](method = GET, uri = uri"/ok") + + routes.run(req).flatMap { r => + r.as[String].map { b => + assertEquals(r.status, Status.Ok) + 
assertEquals(b, "200 OK") + + assertEquals(cntWithCustLbl(registry, "2xx_responses", "server", "get")(custLblVals), 1.0) + assertEquals(cntWithCustLbl(registry, "active_requests", "server", "get")(custLblVals), 0.0) + assertEquals( + cntWithCustLbl(registry, "2xx_headers_duration", "server", "get")(custLblVals), + 0.05, + ) + assertEquals( + cntWithCustLbl(registry, "2xx_total_duration", "server", "get")(custLblVals), + 0.1, + ) + } + } + } + + meteredRoutes().test( + "A http routes with a prometheus metrics middleware should register a POST request" + ) { case (registry, routes) => + val req = Request[IO](method = POST, uri = uri"/ok") + + routes.run(req).flatMap { r => + r.as[String].map { b => + assertEquals(r.status, Status.Ok) + assertEquals(b, "200 OK") + + assertEquals(cntWithCustLbl(registry, "2xx_responses", "server", "post")(custLblVals), 1.0) + assertEquals( + cntWithCustLbl(registry, "active_requests", "server", "post")(custLblVals), + 0.0, + ) + assertEquals( + cntWithCustLbl(registry, "2xx_headers_duration", "server", "post")(custLblVals), + 0.05, + ) + assertEquals( + cntWithCustLbl(registry, "2xx_total_duration", "server", "post")(custLblVals), + 0.1, + ) + } + } + } + + meteredRoutes().test( + "A http routes with a prometheus metrics middleware should register a PUT request" + ) { case (registry, routes) => + val req = Request[IO](method = PUT, uri = uri"/ok") + + routes.run(req).flatMap { r => + r.as[String].map { b => + assertEquals(r.status, Status.Ok) + assertEquals(b, "200 OK") + + assertEquals(cntWithCustLbl(registry, "2xx_responses", "server", "put")(custLblVals), 1.0) + assertEquals(cntWithCustLbl(registry, "active_requests", "server", "put")(custLblVals), 0.0) + assertEquals( + cntWithCustLbl(registry, "2xx_headers_duration", "server", "put")(custLblVals), + 0.05, + ) + assertEquals( + cntWithCustLbl(registry, "2xx_total_duration", "server", "put")(custLblVals), + 0.1, + ) + } + } + } + + meteredRoutes().test( + "A http routes with a 
prometheus metrics middleware should register a DELETE request" + ) { case (registry, routes) => + val req = Request[IO](method = DELETE, uri = uri"/ok") + + routes.run(req).flatMap { r => + r.as[String].map { b => + assertEquals(r.status, Status.Ok) + assertEquals(b, "200 OK") + + assertEquals( + cntWithCustLbl(registry, "2xx_responses", "server", "delete")(custLblVals), + 1.0, + ) + assertEquals( + cntWithCustLbl(registry, "active_requests", "server", "delete")(custLblVals), + 0.0, + ) + assertEquals( + cntWithCustLbl(registry, "2xx_headers_duration", "server", "delete")(custLblVals), + 0.05, + ) + assertEquals( + cntWithCustLbl(registry, "2xx_total_duration", "server", "delete")(custLblVals), + 0.1, + ) + } + } + } + + meteredRoutes().test( + "A http routes with a prometheus metrics middleware should register an error" + ) { case (registry, routes) => + val req = Request[IO](method = GET, uri = uri"/error") + + routes.run(req).attempt.map { r => + assert(r.isLeft) + + assertEquals( + cntWithCustLbl(registry, "errors", "server", cause = "java.io.IOException")(custLblVals), + 1.0, + ) + assertEquals(cntWithCustLbl(registry, "active_requests", "server")(custLblVals), 0.0) + assertEquals(cntWithCustLbl(registry, "5xx_headers_duration", "server")(custLblVals), 0.05) + assertEquals(cntWithCustLbl(registry, "5xx_total_duration", "server")(custLblVals), 0.05) + } + } + + meteredRoutes().test( + "A http routes with a prometheus metrics middleware should register an abnormal termination" + ) { case (registry, routes) => + val req = Request[IO](method = GET, uri = uri"/abnormal-termination") + + routes.run(req).flatMap { r => + r.body.attempt.compile.lastOrError.map { b => + assertEquals(r.status, Status.Ok) + assert(b.isLeft) + + assertEquals( + cntWithCustLbl( + registry, + "abnormal_terminations", + "server", + cause = "java.lang.RuntimeException", + )(custLblVals), + 1.0, + ) + assertEquals(cntWithCustLbl(registry, "active_requests", "server")(custLblVals), 0.0) + 
assertEquals(cntWithCustLbl(registry, "2xx_headers_duration", "server")(custLblVals), 0.05) + assertEquals(cntWithCustLbl(registry, "2xx_total_duration", "server")(custLblVals), 0.1) + } + } + } + + private val classifierFunc = (_: Request[IO]) => Some("classifier") + + meteredRoutes(classifierFunc).test("use the provided request classifier") { + case (registry, routes) => + val req = Request[IO](uri = uri"/ok") + + routes.run(req).flatMap { r => + r.as[String].map { b => + assertEquals(r.status, Status.Ok) + assertEquals(b, "200 OK") + + assertEquals( + cntWithCustLbl(registry, "2xx_responses", "server", "get", "classifier")(custLblVals), + 1.0, + ) + assertEquals( + cntWithCustLbl(registry, "active_requests", "server", "get", "classifier")(custLblVals), + 0.0, + ) + assertEquals( + cntWithCustLbl(registry, "2xx_headers_duration", "server", "get", "classifier")( + custLblVals + ), + 0.05, + ) + assertEquals( + cntWithCustLbl(registry, "2xx_total_duration", "server", "get", "classifier")( + custLblVals + ), + 0.1, + ) + } + } + } + + // This test can't easily be done in munit-cats-effect, as it needs to assert after the Resource is freed + meteredRoutes().test("unregister collectors".ignore) { case (cr, routes) => + val req = Request[IO](uri = uri"/ok") + + routes.run(req).as(cr).map { registry => + assertEquals(cntWithCustLbl(registry, "2xx_responses", "server")(custLblVals), 0.0) + assertEquals(cntWithCustLbl(registry, "active_requests", "server")(custLblVals), 0.0) + assertEquals(cntWithCustLbl(registry, "2xx_headers_duration", "server")(custLblVals), 0.0) + assertEquals(cntWithCustLbl(registry, "2xx_total_duration", "server")(custLblVals), 0.0) + } + } + + def buildMeteredRoutes( + classifier: Request[IO] => Option[String] = (_: Request[IO]) => None + ): Resource[IO, (CollectorRegistry, HttpApp[IO])] = { + implicit val clock: Clock[IO] = FakeClock[IO] + for { + registry <- Prometheus.collectorRegistry[IO] + metrics <- Prometheus + .default[IO](registry) +
.withPrefix("server") + .withCustomLabelsAndValues(custLblVals) + .build + } yield (registry, Metrics(metrics, classifierF = classifier)(testRoutes).orNotFound) + } + + def meteredRoutes( + classifier: Request[IO] => Option[String] = (_: Request[IO]) => None + ): SyncIO[FunFixture[(CollectorRegistry, HttpApp[IO])]] = + ResourceFixture(buildMeteredRoutes(classifier)) +} diff --git a/prometheus-metrics/src/test/scala/org/http4s/metrics/prometheus/util.scala b/prometheus-metrics/src/test/scala/org/http4s/metrics/prometheus/util.scala index daf145f..9d225a7 100644 --- a/prometheus-metrics/src/test/scala/org/http4s/metrics/prometheus/util.scala +++ b/prometheus-metrics/src/test/scala/org/http4s/metrics/prometheus/util.scala @@ -24,7 +24,7 @@ import io.prometheus.client.CollectorRegistry import org.http4s.Method.GET import org.http4s.Request import org.http4s.Response -import org.http4s.dsl.io._ +import org.http4s.dsl.io.* import java.io.IOException import java.util.concurrent.TimeUnit @@ -32,6 +32,12 @@ import java.util.concurrent.TimeoutException import scala.concurrent.duration.FiniteDuration object util { + val custLblVals: List[(String, String)] = List( + "provider" -> "Comcast", + "customLabel2" -> "test-custom-label12", + "customLabel3" -> "test-custom-label13", + ) + def stub: PartialFunction[Request[IO], IO[Response[IO]]] = { case (GET | POST | PUT | DELETE) -> Root / "ok" => Ok("200 OK") @@ -61,95 +67,117 @@ object util { classifier: String = "", cause: String = "", ): Double = + cntWithCustLbl( + registry, + name, + prefix, + method, + classifier, + cause, + )(Seq.empty) + + def cntWithCustLbl( + registry: CollectorRegistry, + name: String, + prefix: String, + method: String = "get", + classifier: String = "", + cause: String = "", + )( + customLabelAndValues: Seq[(String, String)] + ): Double = { + val customLabels = customLabelAndValues.map(_._1) + val customValues: Seq[String] = customLabelAndValues.map(_._2) name match { case "active_requests" => 
registry.getSampleValue( s"${prefix}_active_request_count", - Array("classifier"), - Array(classifier), + Array("classifier") ++ customLabels, + Array(classifier) ++ customValues, ) case "2xx_responses" => registry .getSampleValue( s"${prefix}_request_count_total", - Array("classifier", "method", "status"), - Array(classifier, method, "2xx"), + Array("classifier", "method", "status") ++ customLabels, + Array(classifier, method, "2xx") ++ customValues, ) case "2xx_headers_duration" => registry.getSampleValue( s"${prefix}_response_duration_seconds_sum", - Array("classifier", "method", "phase"), - Array(classifier, method, "headers"), + Array("classifier", "method", "phase") ++ customLabels, + Array(classifier, method, "headers") ++ customValues, ) case "2xx_total_duration" => registry.getSampleValue( s"${prefix}_response_duration_seconds_sum", - Array("classifier", "method", "phase"), - Array(classifier, method, "body"), + Array("classifier", "method", "phase") ++ customLabels, + Array(classifier, method, "body") ++ customValues, ) case "4xx_responses" => registry .getSampleValue( s"${prefix}_request_count_total", - Array("classifier", "method", "status"), - Array(classifier, method, "4xx"), + Array("classifier", "method", "status") ++ customLabels, + Array(classifier, method, "4xx") ++ customValues, ) case "4xx_headers_duration" => registry.getSampleValue( s"${prefix}_response_duration_seconds_sum", - Array("classifier", "method", "phase"), - Array(classifier, method, "headers"), + Array("classifier", "method", "phase") ++ customLabels, + Array(classifier, method, "headers") ++ customValues, ) case "4xx_total_duration" => registry.getSampleValue( s"${prefix}_response_duration_seconds_sum", - Array("classifier", "method", "phase"), - Array(classifier, method, "body"), + Array("classifier", "method", "phase") ++ customLabels, + Array(classifier, method, "body") ++ customValues, ) case "5xx_responses" => registry .getSampleValue( s"${prefix}_request_count_total", - 
Array("classifier", "method", "status"), - Array(classifier, method, "5xx"), + Array("classifier", "method", "status") ++ customLabels, + Array(classifier, method, "5xx") ++ customValues, ) case "5xx_headers_duration" => registry.getSampleValue( s"${prefix}_response_duration_seconds_sum", - Array("classifier", "method", "phase"), - Array(classifier, method, "headers"), + Array("classifier", "method", "phase") ++ customLabels, + Array(classifier, method, "headers") ++ customValues, ) case "5xx_total_duration" => registry.getSampleValue( s"${prefix}_response_duration_seconds_sum", - Array("classifier", "method", "phase"), - Array(classifier, method, "body"), + Array("classifier", "method", "phase") ++ customLabels, + Array(classifier, method, "body") ++ customValues, ) case "errors" => registry.getSampleValue( s"${prefix}_abnormal_terminations_count", - Array("classifier", "termination_type", "cause"), - Array(classifier, "error", cause), + Array("classifier", "termination_type", "cause") ++ customLabels, + Array(classifier, "error", cause) ++ customValues, ) case "timeouts" => registry.getSampleValue( s"${prefix}_abnormal_terminations_count", - Array("classifier", "termination_type", "cause"), - Array(classifier, "timeout", cause), + Array("classifier", "termination_type", "cause") ++ customLabels, + Array(classifier, "timeout", cause) ++ customValues, ) case "abnormal_terminations" => registry.getSampleValue( s"${prefix}_abnormal_terminations_count", - Array("classifier", "termination_type", "cause"), - Array(classifier, "abnormal", cause), + Array("classifier", "termination_type", "cause") ++ customLabels, + Array(classifier, "abnormal", cause) ++ customValues, ) case "cancels" => registry.getSampleValue( s"${prefix}_abnormal_terminations_count", - Array("classifier", "termination_type", "cause"), - Array(classifier, "cancel", cause), + Array("classifier", "termination_type", "cause") ++ customLabels, + Array(classifier, "cancel", cause) ++ customValues, ) } + } 
object FakeClock { def apply[F[_]: Sync]: Clock[F] =