From 4292174088b4c1f5bd317c6a5c5a1be6f60eef9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20F=C4=85derski?= Date: Thu, 1 Aug 2024 16:59:05 +0200 Subject: [PATCH] Rewrite victoriametrics client to pure promql (#1882) * Initial rewriting of victoriametrics client to the pure PromQl * Get rid of hacky code related to handling status codes query in one requests * Add few fixes * Add more tests for caching unavailable metrics * Debug reason of wiremock server timestous * Fix failing prometheus metrics tests * Add code refactor * Add metrics for prometheus requests * Improve creating prometheus client dependencies * Improve creating prometheus client dependencies * Fix handling errors when one of the requests fails * Remove unnecessary log * CR fixes * CR fixes * CR fixes --- build.gradle | 2 +- docs/docs/configuration/metrics.md | 4 +- .../tech/hermes/api/MetricDecimalValue.java | 13 ++ .../ExternalMonitoringClientProperties.java | 22 +- .../ExternalMonitoringConfiguration.java | 38 +++- .../subscription/SubscriptionService.java | 2 + .../metrics/MonitoringMetricsContainer.java | 12 +- .../prometheus/CachingPrometheusClient.java | 30 ++- .../prometheus/PrometheusClient.java | 39 +++- ...er.java => PrometheusMetricsProvider.java} | 90 ++++---- .../prometheus/PrometheusResponse.java | 26 --- .../RestTemplatePrometheusClient.java | 143 +++++++------ ...edSubscriptionMetricsRepositoryTest.groovy | 47 ++-- ...heusBasedTopicMetricsRepositoryTest.groovy | 19 +- .../CachingPrometheusClientTest.groovy | 40 +++- .../RestTemplatePrometheusClientTest.groovy | 200 ++++++++++++++---- .../__files/full_response.json | 156 -------------- .../__files/partial_response.json | 115 ---------- .../__files/prometheus_empty_response.json | 7 + ...scription_2xx_http_status_codes_total.json | 19 ++ ...scription_4xx_http_status_codes_total.json | 19 ++ ...scription_5xx_http_status_codes_total.json | 19 ++ .../__files/subscription_batches_total.json | 19 ++ .../__files/subscription_delivered_total.json | 19 ++ .../subscription_other_errors_total.json | 19 ++ .../__files/subscription_retries_total.json | 19 ++ .../subscription_throughput_bytes_total.json | 19 ++ .../__files/subscription_timeouts_total.json | 19 ++ .../prometheus/PrometheusExtension.java | 73 ++++--- .../prometheus/PrometheusResponse.java | 9 +- .../prometheus/SubscriptionMetrics.java | 32 ++- .../prometheus/TopicMetrics.java | 34 ++- .../management/QueryEndpointTest.java | 2 +- .../SubscriptionManagementTest.java | 5 +- 34 files changed, 741 insertions(+), 590 deletions(-) rename hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/{VictoriaMetricsMetricsProvider.java => PrometheusMetricsProvider.java} (50%) delete mode 100644 hermes-management/src/test/resources/prometheus-stubs/__files/full_response.json delete mode 100644 hermes-management/src/test/resources/prometheus-stubs/__files/partial_response.json create mode 100644 hermes-management/src/test/resources/prometheus-stubs/__files/prometheus_empty_response.json create mode 100644 hermes-management/src/test/resources/prometheus-stubs/__files/subscription_2xx_http_status_codes_total.json create mode 100644 hermes-management/src/test/resources/prometheus-stubs/__files/subscription_4xx_http_status_codes_total.json create mode 100644 hermes-management/src/test/resources/prometheus-stubs/__files/subscription_5xx_http_status_codes_total.json create mode 100644 hermes-management/src/test/resources/prometheus-stubs/__files/subscription_batches_total.json create mode 100644 hermes-management/src/test/resources/prometheus-stubs/__files/subscription_delivered_total.json create mode 100644 hermes-management/src/test/resources/prometheus-stubs/__files/subscription_other_errors_total.json create mode 100644 hermes-management/src/test/resources/prometheus-stubs/__files/subscription_retries_total.json create mode 100644 hermes-management/src/test/resources/prometheus-stubs/__files/subscription_throughput_bytes_total.json create mode 100644 hermes-management/src/test/resources/prometheus-stubs/__files/subscription_timeouts_total.json diff --git a/build.gradle b/build.gradle index 0c855d7a33..3d68efc4d1 100644 --- a/build.gradle +++ b/build.gradle @@ -56,7 +56,7 @@ allprojects { curator : '5.4.0', dropwizard_metrics: '4.2.25', micrometer_metrics: '1.12.5', - wiremock : '3.5.2', + wiremock : '3.9.0', spock : '2.4-M4-groovy-4.0', groovy : '4.0.21', avro : '1.11.3', diff --git a/docs/docs/configuration/metrics.md b/docs/docs/configuration/metrics.md index 86f78fe134..5e172d4ebf 100644 --- a/docs/docs/configuration/metrics.md +++ b/docs/docs/configuration/metrics.md @@ -12,9 +12,9 @@ Option | Description {modulePrefix}.metrics.prometheus.step | The step size to use in computing windowed statistics | 60s {modulePrefix}.metrics.prometheus.descriptions | If meter descriptions should be sent to Prometheus | true -In order to be able to access basic metrics via Management API, it needs to be configured to reach VictoriaMetrics API: +In order to be able to access basic metrics via Management API, it needs to be configured to reach Prometheus API: Option | Description | Default value ------------------------------------------|-----------------------------------------------| ------------- prometheus.client.enabled | Should fetch external metrics from Prometheus | true -prometheus.client.externalMonitoringUrl | URI to VictoriaMetrics HTTP API | http://localhost:18090 +prometheus.client.externalMonitoringUrl | URI to Prometheus HTTP API | http://localhost:18090 diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/MetricDecimalValue.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/MetricDecimalValue.java index ff033e775a..c412b1c3a4 100644 --- a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/MetricDecimalValue.java +++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/MetricDecimalValue.java @@ -10,6 +10,7 @@ public class MetricDecimalValue { private static final String UNAVAILABLE_STRING = "unavailable"; private static final MetricDecimalValue UNAVAILABLE = new MetricDecimalValue(false, "-1.0"); + private static final MetricDecimalValue DEFAULT_VALUE = new MetricDecimalValue(true, "0.0"); private final boolean available; private final String value; @@ -23,6 +24,10 @@ public static MetricDecimalValue unavailable() { return UNAVAILABLE; } + public static MetricDecimalValue defaultValue() { + return DEFAULT_VALUE; + } + public static MetricDecimalValue of(String value) { return new MetricDecimalValue(true, value); } @@ -65,4 +70,12 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(available, value); } + + @Override + public String toString() { + return "MetricDecimalValue{" + + "available=" + available + + ", value='" + value + '\'' + + '}'; + } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ExternalMonitoringClientProperties.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ExternalMonitoringClientProperties.java index 92f20ea63c..4b50c67e32 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ExternalMonitoringClientProperties.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ExternalMonitoringClientProperties.java @@ -8,14 +8,18 @@ public class ExternalMonitoringClientProperties { private int maxConnections = 100; - private int maxConnectionsPerRoute = 10; + private int maxConnectionsPerRoute = 100; private int cacheTtlSeconds = 55; private int cacheSize = 100_000; + private int fetchingTimeoutMillis = 5000; + private int fetchingThreads = 30; + private String externalMonitoringUrl = "http://localhost:18090"; + public int getConnectionTimeoutMillis() { return connectionTimeoutMillis; } @@ -71,4 +75,20 @@ public String getExternalMonitoringUrl() { public void setExternalMonitoringUrl(String externalMonitoringUrl) { this.externalMonitoringUrl = externalMonitoringUrl; } + + public int getFetchingThreads() { + return fetchingThreads; + } + + public void setFetchingThreads(int fetchingThreads) { + this.fetchingThreads = fetchingThreads; + } + + public int getFetchingTimeoutMillis() { + return fetchingTimeoutMillis; + } + + public void setFetchingTimeoutMillis(int fetchingTimeoutMillis) { + this.fetchingTimeoutMillis = fetchingTimeoutMillis; + } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ExternalMonitoringConfiguration.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ExternalMonitoringConfiguration.java index ed3148d5b3..96149ec69f 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ExternalMonitoringConfiguration.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ExternalMonitoringConfiguration.java @@ -1,5 +1,7 @@ package pl.allegro.tech.hermes.management.config; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.micrometer.core.instrument.MeterRegistry; import org.apache.hc.client5.http.classic.HttpClient; import org.apache.hc.client5.http.config.RequestConfig; import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; @@ -7,6 +9,7 @@ import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder; import org.apache.hc.core5.util.Timeout; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -15,31 +18,40 @@ import org.springframework.web.client.RestTemplate; import pl.allegro.tech.hermes.management.infrastructure.prometheus.CachingPrometheusClient; import pl.allegro.tech.hermes.management.infrastructure.prometheus.PrometheusClient; +import pl.allegro.tech.hermes.management.infrastructure.prometheus.PrometheusMetricsProvider; import pl.allegro.tech.hermes.management.infrastructure.prometheus.RestTemplatePrometheusClient; -import pl.allegro.tech.hermes.management.infrastructure.prometheus.VictoriaMetricsMetricsProvider; import java.net.URI; +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static com.google.common.base.Ticker.systemTicker; @Configuration +@ConditionalOnProperty(value = "prometheus.client.enabled", havingValue = "true") public class ExternalMonitoringConfiguration { @Bean - @ConditionalOnProperty(value = "prometheus.client.enabled", havingValue = "true") - public VictoriaMetricsMetricsProvider prometheusMetricsProvider(PrometheusClient prometheusClient, - PrometheusMonitoringClientProperties properties) { - return new VictoriaMetricsMetricsProvider(prometheusClient, + public PrometheusMetricsProvider prometheusMetricsProvider(PrometheusClient prometheusClient, + PrometheusMonitoringClientProperties properties) { + return new PrometheusMetricsProvider(prometheusClient, properties.getConsumersMetricsPrefix(), properties.getFrontendMetricsPrefix(), properties.getAdditionalFilters()); } @Bean - @ConditionalOnProperty(value = "prometheus.client.enabled", havingValue = "true") public PrometheusClient prometheusClient(@Qualifier("monitoringRestTemplate") RestTemplate monitoringRestTemplate, - PrometheusMonitoringClientProperties clientProperties) { + PrometheusMonitoringClientProperties clientProperties, + @Qualifier("prometheusFetcherExecutorService") ExecutorService executorService, + MeterRegistry meterRegistry) { RestTemplatePrometheusClient underlyingPrometheusClient = - new RestTemplatePrometheusClient(monitoringRestTemplate, URI.create(clientProperties.getExternalMonitoringUrl())); + new RestTemplatePrometheusClient( + monitoringRestTemplate, + URI.create(clientProperties.getExternalMonitoringUrl()), + executorService, + Duration.ofMillis(clientProperties.getFetchingTimeoutMillis()), + meterRegistry); return new CachingPrometheusClient( underlyingPrometheusClient, systemTicker(), @@ -49,6 +61,7 @@ public PrometheusClient prometheusClient(@Qualifier("monitoringRestTemplate") Re } @Bean("monitoringRestTemplate") + @ConditionalOnMissingBean(name = "monitoringRestTemplate") public RestTemplate restTemplate(ExternalMonitoringClientProperties clientProperties) { PoolingHttpClientConnectionManager connectionManager = PoolingHttpClientConnectionManagerBuilder.create() .setMaxConnTotal(clientProperties.getMaxConnections()) @@ -66,7 +79,14 @@ public RestTemplate restTemplate(ExternalMonitoringClientProperties clientProper .build(); ClientHttpRequestFactory clientHttpRequestFactory = new HttpComponentsClientHttpRequestFactory(client); - return new RestTemplate(clientHttpRequestFactory); } + + @Bean("prometheusFetcherExecutorService") + @ConditionalOnMissingBean(name = "prometheusFetcherExecutorService") + public ExecutorService executorService(ExternalMonitoringClientProperties clientProperties) { + return Executors.newFixedThreadPool(clientProperties.getFetchingThreads(), + new ThreadFactoryBuilder().setNameFormat("prometheus-metrics-fetcher-%d").build() + ); + } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java index 82b65febd6..4cbfeae6ec 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java @@ -344,8 +344,10 @@ private List getUnhealthyList(Collection()); } - public MonitoringMetricsContainer addMetricValue(String metricPath, MetricDecimalValue value) { + public MonitoringMetricsContainer addMetricValue(String query, MetricDecimalValue value) { if (!isAvailable) { throw new IllegalStateException("Adding value to unavailable metrics container"); } - this.metrics.put(metricPath, value); + this.metrics.put(query, value); return this; } - public MetricDecimalValue metricValue(String metricPath) { + public MetricDecimalValue metricValue(String query) { if (!isAvailable) { return MetricDecimalValue.unavailable(); } - return metrics.getOrDefault(metricPath, DEFAULT_VALUE); + return metrics.getOrDefault(query, DEFAULT_VALUE); + } + + public boolean hasUnavailableMetrics() { + return !isAvailable || metrics.entrySet().stream().anyMatch(e -> !e.getValue().isAvailable()); } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/CachingPrometheusClient.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/CachingPrometheusClient.java index 942beb67a7..36675b8b6f 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/CachingPrometheusClient.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/CachingPrometheusClient.java @@ -6,14 +6,25 @@ import com.google.common.cache.LoadingCache; import pl.allegro.tech.hermes.management.infrastructure.metrics.MonitoringMetricsContainer; +import java.util.List; import java.util.concurrent.ExecutionException; import static java.util.concurrent.TimeUnit.SECONDS; + public class CachingPrometheusClient implements PrometheusClient { private final PrometheusClient underlyingPrometheusClient; - private final LoadingCache prometheusMetricsCache; + /* + Metrics will always be requested in the context of a single subscription/topic. The single sub/topic will + always result in the same list of metrics queries. There is no overlapping between metrics used in the context of + topic or subscriptions. That's why it is safe to use a list of queries as a caching key. + + Maybe it will be worth to cache it per query except of queries when there will be too much overhead + of refreshing all sub/topic metrics if the single fetch fails (currently we invalidate whole metrics container + when one of the sub metric is unavailable) + */ + private final LoadingCache, MonitoringMetricsContainer> prometheusMetricsCache; public CachingPrometheusClient(PrometheusClient underlyingPrometheusClient, Ticker ticker, long cacheTtlInSeconds, long cacheSize) { @@ -26,19 +37,24 @@ public CachingPrometheusClient(PrometheusClient underlyingPrometheusClient, Tick } @Override - public MonitoringMetricsContainer readMetrics(String query) { + public MonitoringMetricsContainer readMetrics(List queries) { try { - return prometheusMetricsCache.get(query); + MonitoringMetricsContainer monitoringMetricsContainer = prometheusMetricsCache.get(List.copyOf(queries)); + if (monitoringMetricsContainer.hasUnavailableMetrics()) { + // try to reload the on the next fetch + prometheusMetricsCache.invalidate(queries); + } + return monitoringMetricsContainer; } catch (ExecutionException e) { - // should never happen because the loader does not throw any checked exceptions + // should never happen because the loader does not throw any exceptions throw new RuntimeException(e); } } - private class PrometheusMetricsCacheLoader extends CacheLoader { + private class PrometheusMetricsCacheLoader extends CacheLoader, MonitoringMetricsContainer> { @Override - public MonitoringMetricsContainer load(String query) { - return underlyingPrometheusClient.readMetrics(query); + public MonitoringMetricsContainer load(List queries) { + return underlyingPrometheusClient.readMetrics(queries); } } } \ No newline at end of file diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/PrometheusClient.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/PrometheusClient.java index 6c5c4a5493..33170fb9ee 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/PrometheusClient.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/PrometheusClient.java @@ -1,7 +1,44 @@ package pl.allegro.tech.hermes.management.infrastructure.prometheus; +import pl.allegro.tech.hermes.api.SubscriptionName; +import pl.allegro.tech.hermes.api.TopicName; import pl.allegro.tech.hermes.management.infrastructure.metrics.MonitoringMetricsContainer; +import java.util.List; + + public interface PrometheusClient { - MonitoringMetricsContainer readMetrics(String query); + String SUBSCRIPTION_QUERY_FORMAT = "sum by (group, topic, subscription)" + + " (irate({__name__='%s', group='%s', topic='%s', subscription='%s', %s}[1m]))"; + + String SUBSCRIPTION_QUERY_FORMAT_STATUS_CODE = "sum by (group, topic, subscription)" + + " (irate({__name__='%s', group='%s', topic='%s', subscription='%s', status_code=~'%s', %s}[1m]))"; + + String TOPIC_QUERY_FORMAT = "sum by (group, topic) (irate({__name__='%s', group='%s', " + + "topic='%s', %s}[1m]))"; + + default MonitoringMetricsContainer readMetrics(String... query) { + return readMetrics(List.of(query)); + } + + MonitoringMetricsContainer readMetrics(List queries); + + static String forSubscription(String name, SubscriptionName subscriptionName, String additionalFilters) { + return String.format(SUBSCRIPTION_QUERY_FORMAT, name, + subscriptionName.getTopicName().getGroupName(), subscriptionName.getTopicName().getName(), + subscriptionName.getName(), additionalFilters); + } + + static String forSubscriptionStatusCode(String name, SubscriptionName subscriptionName, + String regex, String additionalFilters) { + return String.format(SUBSCRIPTION_QUERY_FORMAT_STATUS_CODE, name, + subscriptionName.getTopicName().getGroupName(), subscriptionName.getTopicName().getName(), + subscriptionName.getName(), regex, additionalFilters); + } + + + static String forTopic(String name, TopicName topicName, String additionalFilters) { + return String.format(TOPIC_QUERY_FORMAT, name, + topicName.getGroupName(), topicName.getName(), additionalFilters); + } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/VictoriaMetricsMetricsProvider.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/PrometheusMetricsProvider.java similarity index 50% rename from hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/VictoriaMetricsMetricsProvider.java rename to hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/PrometheusMetricsProvider.java index 48bcd67600..7121281b5a 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/VictoriaMetricsMetricsProvider.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/PrometheusMetricsProvider.java @@ -6,11 +6,12 @@ import pl.allegro.tech.hermes.management.infrastructure.metrics.MonitoringSubscriptionMetricsProvider; import pl.allegro.tech.hermes.management.infrastructure.metrics.MonitoringTopicMetricsProvider; -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; +import static pl.allegro.tech.hermes.management.infrastructure.prometheus.PrometheusClient.forSubscription; +import static pl.allegro.tech.hermes.management.infrastructure.prometheus.PrometheusClient.forSubscriptionStatusCode; +import static pl.allegro.tech.hermes.management.infrastructure.prometheus.PrometheusClient.forTopic; -public class VictoriaMetricsMetricsProvider implements MonitoringSubscriptionMetricsProvider, MonitoringTopicMetricsProvider { + +public class PrometheusMetricsProvider implements MonitoringSubscriptionMetricsProvider, MonitoringTopicMetricsProvider { private static final String SUBSCRIPTION_DELIVERED = "subscription_delivered_total"; private static final String SUBSCRIPTION_TIMEOUTS = "subscription_timeouts_total"; @@ -18,9 +19,6 @@ public class VictoriaMetricsMetricsProvider implements MonitoringSubscriptionMet private static final String SUBSCRIPTION_OTHER_ERRORS = "subscription_other_errors_total"; private static final String SUBSCRIPTION_BATCHES = "subscription_batches_total"; private static final String SUBSCRIPTION_STATUS_CODES = "subscription_http_status_codes_total"; - private static final String SUBSCRIPTION_STATUS_CODES_2XX = SUBSCRIPTION_STATUS_CODES + "_2xx"; - private static final String SUBSCRIPTION_STATUS_CODES_4XX = SUBSCRIPTION_STATUS_CODES + "_4xx"; - private static final String SUBSCRIPTION_STATUS_CODES_5XX = SUBSCRIPTION_STATUS_CODES + "_5xx"; private static final String SUBSCRIPTION_RETRIES = "subscription_retries_total"; private static final String TOPIC_RATE = "topic_requests_total"; @@ -30,72 +28,62 @@ public class VictoriaMetricsMetricsProvider implements MonitoringSubscriptionMet private final String consumersMetricsPrefix; private final String frontendMetricsPrefix; private final String additionalFilters; - private final String subscriptionMetricsToQuery; - private final String topicMetricsToQuery; private final PrometheusClient prometheusClient; - public VictoriaMetricsMetricsProvider(PrometheusClient prometheusClient, String consumersMetricsPrefix, - String frontendMetricsPrefix, String additionalFilters) { + public PrometheusMetricsProvider(PrometheusClient prometheusClient, String consumersMetricsPrefix, + String frontendMetricsPrefix, String additionalFilters) { this.prometheusClient = prometheusClient; this.consumersMetricsPrefix = consumersMetricsPrefix.isEmpty() ? "" : consumersMetricsPrefix + "_"; this.frontendMetricsPrefix = frontendMetricsPrefix.isEmpty() ? "" : frontendMetricsPrefix + "_"; this.additionalFilters = additionalFilters; - this.subscriptionMetricsToQuery = Stream.of(SUBSCRIPTION_DELIVERED, SUBSCRIPTION_TIMEOUTS, SUBSCRIPTION_RETRIES, - SUBSCRIPTION_THROUGHPUT, SUBSCRIPTION_OTHER_ERRORS, SUBSCRIPTION_BATCHES, SUBSCRIPTION_STATUS_CODES) - .map(this::consumerMetricName) - .collect(Collectors.joining("|")); - this.topicMetricsToQuery = String.join("|", List.of( - frontendMetricName(TOPIC_RATE), - consumerMetricName(TOPIC_DELIVERY_RATE), - frontendMetricName(TOPIC_THROUGHPUT_RATE) - )); } @Override public MonitoringSubscriptionMetrics subscriptionMetrics(SubscriptionName subscriptionName) { - /* - The query is based on MetricsQL, available only in VictoriaMetrics - https://docs.victoriametrics.com/MetricsQL.html. Basic PromQL does not support `keep_metric_names` param. - */ - String queryFormat = "sum by (__name__, group, topic, subscription, status_code)" - + " (irate({__name__=~'%s', group='%s', topic='%s', subscription='%s', %s}[1m]) keep_metric_names)"; - String query = String.format(queryFormat, subscriptionMetricsToQuery, subscriptionName.getTopicName().getGroupName(), - subscriptionName.getTopicName().getName(), subscriptionName.getName(), additionalFilters); - MonitoringMetricsContainer prometheusMetricsContainer = prometheusClient.readMetrics(query); + String subscriptionDeliveredQuery = forSubscription(consumerMetricName(SUBSCRIPTION_DELIVERED), subscriptionName, additionalFilters); + String subscriptionTimeoutsQuery = forSubscription(consumerMetricName(SUBSCRIPTION_TIMEOUTS), subscriptionName, additionalFilters); + String subscriptionThroughputQuery = forSubscription(consumerMetricName(SUBSCRIPTION_THROUGHPUT), subscriptionName, additionalFilters); + String subscriptionOtherErrorsQuery = forSubscription(consumerMetricName(SUBSCRIPTION_OTHER_ERRORS), subscriptionName, additionalFilters); + String subscriptionBatchesQuery = forSubscription(consumerMetricName(SUBSCRIPTION_BATCHES), subscriptionName, additionalFilters); + String subscriptionRetriesQuery = forSubscription(consumerMetricName(SUBSCRIPTION_RETRIES), subscriptionName, additionalFilters); + String subscription2xx = forSubscriptionStatusCode(consumerMetricName(SUBSCRIPTION_STATUS_CODES), subscriptionName, "2.*", additionalFilters); + String subscription4xx = forSubscriptionStatusCode(consumerMetricName(SUBSCRIPTION_STATUS_CODES), subscriptionName, "4.*", additionalFilters); + String subscription5xx = forSubscriptionStatusCode(consumerMetricName(SUBSCRIPTION_STATUS_CODES), subscriptionName, "5.*", additionalFilters); + + MonitoringMetricsContainer prometheusMetricsContainer = prometheusClient.readMetrics( + subscriptionDeliveredQuery, subscriptionTimeoutsQuery, subscriptionRetriesQuery, subscriptionThroughputQuery, + subscriptionOtherErrorsQuery, subscriptionBatchesQuery, subscription2xx, subscription4xx, subscription5xx + ); return MonitoringSubscriptionMetricsProvider .metricsBuilder() - .withRate(prometheusMetricsContainer.metricValue(consumerMetricName(SUBSCRIPTION_DELIVERED))) - .withTimeouts(prometheusMetricsContainer.metricValue(consumerMetricName(SUBSCRIPTION_TIMEOUTS))) - .withThroughput(prometheusMetricsContainer.metricValue(consumerMetricName(SUBSCRIPTION_THROUGHPUT))) - .withOtherErrors(prometheusMetricsContainer.metricValue(consumerMetricName(SUBSCRIPTION_OTHER_ERRORS))) - .withMetricPathBatchRate(prometheusMetricsContainer.metricValue(consumerMetricName(SUBSCRIPTION_BATCHES))) - .withCodes2xx(prometheusMetricsContainer.metricValue(consumerMetricName(SUBSCRIPTION_STATUS_CODES_2XX))) - .withCode4xx(prometheusMetricsContainer.metricValue(consumerMetricName(SUBSCRIPTION_STATUS_CODES_4XX))) - .withCode5xx(prometheusMetricsContainer.metricValue(consumerMetricName(SUBSCRIPTION_STATUS_CODES_5XX))) - .withRetries(prometheusMetricsContainer.metricValue(consumerMetricName(SUBSCRIPTION_RETRIES))) + .withRate(prometheusMetricsContainer.metricValue(subscriptionDeliveredQuery)) + .withTimeouts(prometheusMetricsContainer.metricValue(subscriptionTimeoutsQuery)) + .withThroughput(prometheusMetricsContainer.metricValue(subscriptionThroughputQuery)) + .withOtherErrors(prometheusMetricsContainer.metricValue(subscriptionOtherErrorsQuery)) + .withMetricPathBatchRate(prometheusMetricsContainer.metricValue(subscriptionBatchesQuery)) + .withCodes2xx(prometheusMetricsContainer.metricValue(subscription2xx)) + .withCode4xx(prometheusMetricsContainer.metricValue(subscription4xx)) + .withCode5xx(prometheusMetricsContainer.metricValue(subscription5xx)) + .withRetries(prometheusMetricsContainer.metricValue(subscriptionRetriesQuery)) .build(); } @Override public MonitoringTopicMetrics topicMetrics(TopicName topicName) { - /* - The query is based on MetricsQL, available only in VictoriaMetrics - https://docs.victoriametrics.com/MetricsQL.html. Basic PromQL does not support `keep_metric_names` param. - */ - String queryFormat = "sum by (__name__, group, topic) (irate({__name__=~'%s', group='%s', " - + "topic='%s', %s}[1m]) keep_metric_names)"; - String query = String.format(queryFormat, topicMetricsToQuery, topicName.getGroupName(), topicName.getName(), - additionalFilters); - MonitoringMetricsContainer prometheusMetricsContainer = prometheusClient.readMetrics(query); + String topicRateQuery = forTopic(frontendMetricName(TOPIC_RATE), topicName, additionalFilters); + String topicDeliveryRateQuery = forTopic(consumerMetricName(TOPIC_DELIVERY_RATE), topicName, additionalFilters); + String topicThroughputQuery = forTopic(frontendMetricName(TOPIC_THROUGHPUT_RATE), topicName, additionalFilters); + + MonitoringMetricsContainer prometheusMetricsContainer = prometheusClient.readMetrics( + topicRateQuery, topicDeliveryRateQuery, topicThroughputQuery); return MonitoringTopicMetricsProvider .metricsBuilder() - .withRate(prometheusMetricsContainer.metricValue(frontendMetricName(TOPIC_RATE))) - .withDeliveryRate(prometheusMetricsContainer.metricValue(consumerMetricName(TOPIC_DELIVERY_RATE))) - .withThroughput(prometheusMetricsContainer.metricValue(frontendMetricName(TOPIC_THROUGHPUT_RATE))) + .withRate(prometheusMetricsContainer.metricValue(topicRateQuery)) + .withDeliveryRate(prometheusMetricsContainer.metricValue(topicDeliveryRateQuery)) + .withThroughput(prometheusMetricsContainer.metricValue(topicThroughputQuery)) .build(); } - private String consumerMetricName(String name) { return consumersMetricsPrefix + name; } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/PrometheusResponse.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/PrometheusResponse.java index bcb3156dc7..74c1c29c9e 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/PrometheusResponse.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/PrometheusResponse.java @@ -22,7 +22,6 @@ boolean isVector() { @JsonIgnoreProperties(ignoreUnknown = true) record VectorResult( - @JsonProperty("metric") MetricName metricName, @JsonProperty("value") List vector) { private static final int VALID_VECTOR_LENGTH = 2; @@ -34,31 +33,6 @@ Optional getValue() { } return Optional.of(Double.parseDouble(vector.get(SCALAR_INDEX_VALUE))); } - - VectorResult renameMetric(String newMetricName) { - return new VectorResult(new MetricName(newMetricName, metricName.statusCode), vector); - } - } - - @JsonIgnoreProperties(ignoreUnknown = true) - record MetricName( - @JsonProperty(value = "__name__") String name, - @JsonProperty(value = "status_code") Optional statusCode) { - boolean is2xxStatusCode() { - return hasStatusCode() && statusCode.get().startsWith("2"); - } - - boolean is4xxStatusCode() { - return hasStatusCode() && statusCode.get().startsWith("4"); - } - - boolean is5xxStatusCode() { - return hasStatusCode() && statusCode.get().startsWith("5"); - } - - private boolean hasStatusCode() { - return statusCode.isPresent(); - } } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/RestTemplatePrometheusClient.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/RestTemplatePrometheusClient.java index 3d5139e17a..d10e09e425 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/RestTemplatePrometheusClient.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/RestTemplatePrometheusClient.java @@ -1,22 +1,25 @@ package pl.allegro.tech.hermes.management.infrastructure.prometheus; import com.google.common.base.Preconditions; +import io.micrometer.core.instrument.MeterRegistry; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpEntity; import org.springframework.http.HttpMethod; -import org.springframework.http.ResponseEntity; +import org.springframework.web.client.HttpStatusCodeException; import org.springframework.web.client.RestTemplate; import pl.allegro.tech.hermes.api.MetricDecimalValue; import pl.allegro.tech.hermes.management.infrastructure.metrics.MonitoringMetricsContainer; import java.net.URI; +import java.time.Duration; import java.util.List; import java.util.Map; -import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import java.util.stream.Stream; import static java.net.URLEncoder.encode; import static java.nio.charset.StandardCharsets.UTF_8; @@ -28,94 +31,90 @@ public class RestTemplatePrometheusClient implements PrometheusClient { private final URI prometheusUri; private final RestTemplate restTemplate; + private final ExecutorService executorService; + private final Duration fetchingTimeout; + private final MeterRegistry meterRegistry; - public RestTemplatePrometheusClient(RestTemplate restTemplate, URI prometheusUri) { + + public RestTemplatePrometheusClient(RestTemplate restTemplate, + URI prometheusUri, + ExecutorService executorService, + Duration fetchingTimeoutMillis, + MeterRegistry meterRegistry) { this.restTemplate = restTemplate; this.prometheusUri = prometheusUri; + this.executorService = executorService; + this.fetchingTimeout = fetchingTimeoutMillis; + this.meterRegistry = meterRegistry; } @Override - public MonitoringMetricsContainer readMetrics(String query) { - try { - PrometheusResponse response = queryPrometheus(query); - Preconditions.checkNotNull(response, "Prometheus response is null"); - Preconditions.checkState(response.isSuccess(), "Prometheus response does not contain valid data"); + public MonitoringMetricsContainer readMetrics(List queries) { + return fetchInParallelFromPrometheus(queries); + } + + private MonitoringMetricsContainer fetchInParallelFromPrometheus(List queries) { + CompletableFuture> aggregatedFuture = getAggregatedCompletableFuture(queries); - Map> metricsGroupedByName = groupMetricsByName(response); - return produceMetricsContainer(metricsGroupedByName); - } catch (Exception exception) { - logger.warn("Unable to read from Prometheus...", exception); + try { + Map metrics = aggregatedFuture.get(fetchingTimeout.toMillis(), TimeUnit.MILLISECONDS); + return MonitoringMetricsContainer.initialized(metrics); + } catch (InterruptedException e) { + // possibly let know the caller that the thread was interrupted + Thread.currentThread().interrupt(); + logger.warn("Prometheus fetching thread was interrupted...", e); + return MonitoringMetricsContainer.unavailable(); + } catch (Exception ex) { + logger.warn("Unexpected exception during fetching metrics from prometheus...", ex); return MonitoringMetricsContainer.unavailable(); } } - private PrometheusResponse queryPrometheus(String query) { - URI queryUri = URI.create(prometheusUri.toString() + "/api/v1/query?query=" + encode(query, UTF_8)); - - ResponseEntity response = restTemplate.exchange(queryUri, - HttpMethod.GET, HttpEntity.EMPTY, PrometheusResponse.class); - return response.getBody(); + private CompletableFuture> getAggregatedCompletableFuture(List queries) { + // has to be collected to run in parallel + List>> futures = queries.stream() + .map(this::readSingleMetric) + .toList(); + + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .thenApply( + v -> futures.stream().map(CompletableFuture::join) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)) + ); } - private static Map> groupMetricsByName(PrometheusResponse response) { - return response.data().results().stream() - .map(RestTemplatePrometheusClient::renameStatusCodesMetricsNames) - .collect(Collectors.groupingBy(r -> r.metricName().name())); + private CompletableFuture> readSingleMetric(String query) { + return CompletableFuture.supplyAsync(() -> queryPrometheus(query), executorService); } - private static MonitoringMetricsContainer produceMetricsContainer( - Map> metricsGroupedByName) { - MonitoringMetricsContainer metricsContainer = MonitoringMetricsContainer.createEmpty(); - - Stream> metricsSummedByStatusCodeFamily = metricsGroupedByName.entrySet().stream() - .map(RestTemplatePrometheusClient::sumMetricsWithTheSameName); + private Pair queryPrometheus(String query) { + try { + URI queryUri = URI.create(prometheusUri.toString() + "/api/v1/query?query=" + encode(query, UTF_8)); + PrometheusResponse response = restTemplate.exchange(queryUri, + HttpMethod.GET, HttpEntity.EMPTY, PrometheusResponse.class).getBody(); - metricsSummedByStatusCodeFamily.forEach(pair -> metricsContainer.addMetricValue( - pair.getKey(), - MetricDecimalValue.of(pair.getValue().toString()))); - return metricsContainer; - } + Preconditions.checkNotNull(response, "Prometheus response is null"); + Preconditions.checkState(response.isSuccess(), "Prometheus response does not contain valid data"); - private static PrometheusResponse.VectorResult renameStatusCodesMetricsNames(PrometheusResponse.VectorResult r) { - /* - Renames any metric containing status_code tag to the _2xx/3xx/4xx/5xx> metric name. For example: - VectorResult( - metricName=MetricName( - name=hermes_consumers_subscription_http_status_codes_total, - statusCode=Optional[200]), - vector=[...] - ) - ----> - VectorResult( - metricName=MetricName( - name=hermes_consumers_subscription_http_status_codes_total_2xx, - statusCode=Optional[200]), - vector=[...] - ) - It allows then to sum metrics accordingly to the status code family. - */ - String suffix = ""; - if (r.metricName().is2xxStatusCode()) { - suffix = "_2xx"; - } else if (r.metricName().is4xxStatusCode()) { - suffix = "_4xx"; - } else if (r.metricName().is5xxStatusCode()) { - suffix = "_5xx"; + MetricDecimalValue result = parseResponse(response); + meterRegistry.counter("read-metric-from-prometheus.success").increment(); + return Pair.of(query, result); + } catch (HttpStatusCodeException ex) { + logger.warn("Unable to read from Prometheus. Query: {}, Status code: {}. Response body: {}", + query, ex.getStatusCode(), ex.getResponseBodyAsString(), ex); + return Pair.of(query, MetricDecimalValue.unavailable()); + } catch (Exception ex) { + logger.warn("Unable to read from Prometheus. Query: {}", query, ex); + meterRegistry.counter("read-metric-from-prometheus.error").increment(); + return Pair.of(query, MetricDecimalValue.unavailable()); } - return r.renameMetric(r.metricName().name() + suffix); } - /* - We have to sum some metrics on the client side because Prometheus does not support this kind of aggregation when using - query for multiple __name__ metrics. - */ - private static Pair sumMetricsWithTheSameName(Map.Entry> e) { - return Pair.of( - e.getKey(), - e.getValue().stream() - .map(PrometheusResponse.VectorResult::getValue) - .filter(Optional::isPresent) - .map(Optional::get) - .mapToDouble(d -> d).sum()); + private MetricDecimalValue parseResponse(PrometheusResponse response) { + return response.data().results().stream() + .findFirst() + .flatMap(PrometheusResponse.VectorResult::getValue) + .map(value -> MetricDecimalValue.of(value.toString())) + .orElse(MetricDecimalValue.defaultValue()); } } diff --git a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridPrometheusBasedSubscriptionMetricsRepositoryTest.groovy b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridPrometheusBasedSubscriptionMetricsRepositoryTest.groovy index 6c18b04c48..8ed2088bf3 100644 --- a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridPrometheusBasedSubscriptionMetricsRepositoryTest.groovy +++ b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridPrometheusBasedSubscriptionMetricsRepositoryTest.groovy @@ -7,7 +7,7 @@ import pl.allegro.tech.hermes.api.TopicName import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths import pl.allegro.tech.hermes.management.domain.subscription.SubscriptionLagSource import pl.allegro.tech.hermes.management.infrastructure.prometheus.PrometheusClient -import pl.allegro.tech.hermes.management.infrastructure.prometheus.VictoriaMetricsMetricsProvider +import pl.allegro.tech.hermes.management.infrastructure.prometheus.PrometheusMetricsProvider import spock.lang.Specification import static pl.allegro.tech.hermes.api.MetricDecimalValue.of @@ -22,29 +22,36 @@ class HybridPrometheusBasedSubscriptionMetricsRepositoryTest extends Specificati private SubscriptionLagSource lagSource = new NoOpSubscriptionLagSource() - private VictoriaMetricsMetricsProvider prometheusMetricsProvider = new VictoriaMetricsMetricsProvider( + private PrometheusMetricsProvider prometheusMetricsProvider = new PrometheusMetricsProvider( client, "hermes_consumers", "hermes_frontend", "service=~'hermes'") private HybridSubscriptionMetricsRepository repository = new HybridSubscriptionMetricsRepository(prometheusMetricsProvider, summedSharedCounter, zookeeperPaths, lagSource) - private static final String query = "sum by (__name__, group, topic, subscription, status_code) " + - "(irate({__name__=~'hermes_consumers_subscription_delivered_total" + - "|hermes_consumers_subscription_timeouts_total" + - "|hermes_consumers_subscription_retries_total" + - "|hermes_consumers_subscription_throughput_bytes_total" + - "|hermes_consumers_subscription_other_errors_total" + - "|hermes_consumers_subscription_batches_total" + - "|hermes_consumers_subscription_http_status_codes_total', " + - "group='group', topic='topic', subscription='subscription', service=~'hermes'}[1m]) keep_metric_names)" + private final static String subscriptionQuery = "sum by (group, topic, subscription) (irate({__name__='hermes_consumers_subscription_%s_total', group='group', topic='topic', subscription='subscription', service=~'hermes'}[1m]))"; + private final static String deliveredQuery = String.format(subscriptionQuery, "delivered") + private final static String timeoutsQuery = String.format(subscriptionQuery, "timeouts") + private final static String retriesQuery = String.format(subscriptionQuery, "retries") + private final static String throughputQuery = String.format(subscriptionQuery, "throughput_bytes") + private final static String otherErrorsQuery = String.format(subscriptionQuery, "other_errors") + private final static String batchesQuery = String.format(subscriptionQuery, "batches") + // these queries are different as they contains additional status code filters + private final static String status2xxQuery = "sum by (group, topic, subscription) (irate({__name__='hermes_consumers_subscription_http_status_codes_total', group='group', topic='topic', subscription='subscription', status_code=~'2.*', service=~'hermes'}[1m]))" + private final static String status4xxQuery = "sum by (group, topic, subscription) (irate({__name__='hermes_consumers_subscription_http_status_codes_total', group='group', topic='topic', subscription='subscription', status_code=~'4.*', service=~'hermes'}[1m]))" + private final static String status5xxQuery = "sum by (group, topic, subscription) (irate({__name__='hermes_consumers_subscription_http_status_codes_total', group='group', topic='topic', subscription='subscription', status_code=~'5.*', service=~'hermes'}[1m]))" + + private static final List queries = List.of( + deliveredQuery, timeoutsQuery, retriesQuery, throughputQuery, otherErrorsQuery, batchesQuery, + status2xxQuery, status4xxQuery, status5xxQuery + ) def "should read subscription metrics from multiple places"() { given: - client.readMetrics(query) >> MonitoringMetricsContainer.createEmpty() - .addMetricValue("hermes_consumers_subscription_delivered_total", of('10')) - .addMetricValue("hermes_consumers_subscription_timeouts_total", of('100')) - .addMetricValue("hermes_consumers_subscription_retries_total", of('20')) - .addMetricValue("hermes_consumers_subscription_other_errors_total", of('1000')) + client.readMetrics(queries) >> MonitoringMetricsContainer.createEmpty() + .addMetricValue(deliveredQuery, of('10')) + .addMetricValue(timeoutsQuery, of('100')) + .addMetricValue(retriesQuery, of('20')) + .addMetricValue(otherErrorsQuery, of('1000')) summedSharedCounter.getValue('/hermes/groups/group/topics/topic/subscriptions/subscription/metrics/delivered') >> 100 summedSharedCounter.getValue('/hermes/groups/group/topics/topic/subscriptions/subscription/metrics/discarded') >> 1 summedSharedCounter.getValue('/hermes/groups/group/topics/topic/subscriptions/subscription/metrics/volume') >> 16 @@ -66,10 +73,10 @@ class HybridPrometheusBasedSubscriptionMetricsRepositoryTest extends Specificati def "should read subscription metrics for all http status codes"() { given: - client.readMetrics(query) >> MonitoringMetricsContainer.createEmpty() - .addMetricValue("hermes_consumers_subscription_http_status_codes_total_2xx", of('2')) - .addMetricValue("hermes_consumers_subscription_http_status_codes_total_4xx", of('4')) - .addMetricValue("hermes_consumers_subscription_http_status_codes_total_5xx", of('5')) + client.readMetrics(queries) >> MonitoringMetricsContainer.createEmpty() + .addMetricValue(status2xxQuery, of('2')) + .addMetricValue(status4xxQuery, of('4')) + .addMetricValue(status5xxQuery, of('5')) when: SubscriptionMetrics metrics = repository.loadMetrics(new TopicName('group', 'topic'), 'subscription') diff --git a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridPrometheusBasedTopicMetricsRepositoryTest.groovy b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridPrometheusBasedTopicMetricsRepositoryTest.groovy index c3dcad72ad..c1dcd3cc38 100644 --- a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridPrometheusBasedTopicMetricsRepositoryTest.groovy +++ b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridPrometheusBasedTopicMetricsRepositoryTest.groovy @@ -5,7 +5,7 @@ import pl.allegro.tech.hermes.api.TopicName import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths import pl.allegro.tech.hermes.management.infrastructure.prometheus.PrometheusClient -import pl.allegro.tech.hermes.management.infrastructure.prometheus.VictoriaMetricsMetricsProvider +import pl.allegro.tech.hermes.management.infrastructure.prometheus.PrometheusMetricsProvider import spock.lang.Specification import static pl.allegro.tech.hermes.api.MetricDecimalValue.of @@ -20,23 +20,24 @@ class HybridPrometheusBasedTopicMetricsRepositoryTest extends Specification { private SubscriptionRepository subscriptionRepository = Mock(SubscriptionRepository) - private VictoriaMetricsMetricsProvider prometheusMetricsProvider = new VictoriaMetricsMetricsProvider(client, + private PrometheusMetricsProvider prometheusMetricsProvider = new PrometheusMetricsProvider(client, "hermes_consumers", "hermes_frontend", "service='hermes'") private HybridTopicMetricsRepository repository = new HybridTopicMetricsRepository(prometheusMetricsProvider, summedSharedCounter, zookeeperPaths, subscriptionRepository) + private String topicRequestsQuery = "sum by (group, topic) (irate({__name__='hermes_frontend_topic_requests_total', group='group', topic='topic', service='hermes'}[1m]))" + private String topicDeliveredQuery = "sum by (group, topic) (irate({__name__='hermes_consumers_subscription_delivered_total', group='group', topic='topic', service='hermes'}[1m]))" + private String topicThroughputQuery = "sum by (group, topic) (irate({__name__='hermes_frontend_topic_throughput_bytes_total', group='group', topic='topic', service='hermes'}[1m]))" + def "should load metrics from graphite and zookeeper"() { given: - String query = "sum by (__name__, group, topic) (irate({__name__=~'hermes_frontend_topic_requests_total" + - "|hermes_consumers_subscription_delivered_total" + - "|hermes_frontend_topic_throughput_bytes_total', group='group', " + - "topic='topic', service='hermes'}[1m]) keep_metric_names)" + List queries = List.of(topicRequestsQuery, topicDeliveredQuery, topicThroughputQuery) TopicName topic = new TopicName('group', 'topic') - client.readMetrics(query) >> MonitoringMetricsContainer.createEmpty() - .addMetricValue("hermes_frontend_topic_requests_total", of('10')) - .addMetricValue("hermes_consumers_subscription_delivered_total", of('20')) + client.readMetrics(queries) >> MonitoringMetricsContainer.createEmpty() + .addMetricValue(topicRequestsQuery, of('10')) + .addMetricValue(topicDeliveredQuery, of('20')) summedSharedCounter.getValue('/hermes/groups/group/topics/topic/metrics/published') >> 100 summedSharedCounter.getValue('/hermes/groups/group/topics/topic/metrics/volume') >> 1024 subscriptionRepository.listSubscriptionNames(topic) >> ["subscription1", "subscription2"] diff --git a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/prometheus/CachingPrometheusClientTest.groovy b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/prometheus/CachingPrometheusClientTest.groovy index 1345607d0f..07d52f64a5 100644 --- a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/prometheus/CachingPrometheusClientTest.groovy +++ b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/prometheus/CachingPrometheusClientTest.groovy @@ -8,6 +8,7 @@ import spock.lang.Subject import java.time.Duration import static pl.allegro.tech.hermes.api.MetricDecimalValue.of +import static pl.allegro.tech.hermes.api.MetricDecimalValue.unavailable class CachingPrometheusClientTest extends Specification { static final CACHE_TTL_IN_SECONDS = 30 @@ -16,16 +17,18 @@ class CachingPrometheusClientTest extends Specification { def underlyingClient = Mock(PrometheusClient) def ticker = new FakeTicker() + def queries = List.of("query") @Subject def cachingClient = new CachingPrometheusClient(underlyingClient, ticker, CACHE_TTL_IN_SECONDS, CACHE_SIZE) def "should return metrics from the underlying client"() { given: - underlyingClient.readMetrics("someQuery") >> MonitoringMetricsContainer.initialized([metric_1: of("1"), metric_2: of("2")]) + underlyingClient.readMetrics(queries) >> MonitoringMetricsContainer.initialized( + ["metric_1": of("1"), "metric_2": of("2")]) when: - def metrics = cachingClient.readMetrics("someQuery") + def metrics = cachingClient.readMetrics(queries) then: metrics.metricValue("metric_1") == of("1") @@ -34,21 +37,42 @@ class CachingPrometheusClientTest extends Specification { def "should return metrics from cache while TTL has not expired"() { when: - cachingClient.readMetrics("someQuery") + cachingClient.readMetrics(queries) ticker.advance(CACHE_TTL.minusSeconds(1)) - cachingClient.readMetrics("someQuery") + cachingClient.readMetrics(queries) then: - 1 * underlyingClient.readMetrics("someQuery") >> MonitoringMetricsContainer.initialized([metric_1: of("1"), metric_2: of("2")]) + 1 * underlyingClient.readMetrics(queries) >> MonitoringMetricsContainer.initialized( + ["metric_1": of("1"), "metric_2": of("2")]) } def "should get metrics from the underlying client after TTL expires"() { when: - cachingClient.readMetrics("someQuery") + cachingClient.readMetrics(queries) ticker.advance(CACHE_TTL.plusSeconds(1)) - cachingClient.readMetrics("someQuery") + cachingClient.readMetrics(queries) then: - 2 * underlyingClient.readMetrics("someQuery") >> MonitoringMetricsContainer.initialized([metric_1: of("1"), metric_2: of("2")]) + 2 * underlyingClient.readMetrics(queries) >> MonitoringMetricsContainer.initialized( + ["metric_1": of("1"), "metric_2": of("2")]) + } + + def "should invalidate partially unavailable data and retry fetch on the next client metrics read"() { + when: + cachingClient.readMetrics(queries) + cachingClient.readMetrics(queries) + + then: + 2 * underlyingClient.readMetrics(queries) >> MonitoringMetricsContainer.initialized( + ["metric_1": unavailable(), "metric_2": of("2")]) + } + + def "should invalidate completely unavailable data and retry fetch on the next client metrics read"() { + when: + cachingClient.readMetrics(queries) + cachingClient.readMetrics(queries) + + then: + 2 * underlyingClient.readMetrics(queries) >> MonitoringMetricsContainer.unavailable() } } diff --git a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/prometheus/RestTemplatePrometheusClientTest.groovy b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/prometheus/RestTemplatePrometheusClientTest.groovy index 6f68f65075..5d8c8024dd 100644 --- a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/prometheus/RestTemplatePrometheusClientTest.groovy +++ b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/prometheus/RestTemplatePrometheusClientTest.groovy @@ -1,88 +1,204 @@ package pl.allegro.tech.hermes.management.infrastructure.prometheus + import com.github.tomakehurst.wiremock.client.WireMock import com.github.tomakehurst.wiremock.junit.WireMockRule +import io.micrometer.core.instrument.simple.SimpleMeterRegistry import jakarta.ws.rs.core.MediaType +import org.apache.hc.client5.http.classic.HttpClient +import org.apache.hc.client5.http.config.RequestConfig +import org.apache.hc.client5.http.impl.classic.HttpClientBuilder +import org.apache.hc.core5.util.Timeout import org.junit.Rule +import org.springframework.http.client.ClientHttpRequestFactory +import org.springframework.http.client.HttpComponentsClientHttpRequestFactory import org.springframework.web.client.RestTemplate import pl.allegro.tech.hermes.management.infrastructure.metrics.MonitoringMetricsContainer import pl.allegro.tech.hermes.test.helper.util.Ports import spock.lang.Specification import java.nio.charset.StandardCharsets +import java.time.Duration +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig +import static pl.allegro.tech.hermes.api.MetricDecimalValue.defaultValue import static pl.allegro.tech.hermes.api.MetricDecimalValue.of +import static pl.allegro.tech.hermes.api.MetricDecimalValue.unavailable class RestTemplatePrometheusClientTest extends Specification { private static final int PROMETHEUS_HTTP_PORT = Ports.nextAvailable() - private static final String query = "sum by (__name__,group,topic,subscription,status_code)" + - "(irate({__name__=~'hermes_consumers_subscription_delivered_total" + - "|hermes_consumers_subscription_timeouts_total" + - "|hermes_consumers_subscription_retries_total" + - "|hermes_consumers_subscription_throughput_bytes_total" + - "|hermes_consumers_subscription_other_errors_total" + - "|hermes_consumers_subscription_batches_total" + - "|hermes_consumers_subscription_http_status_codes_total'," + - "group='pl.allegro.tech.hermes',topic='Monitor',subscription='consumer1'}[1m]) keep_metric_names)" + def subscriptionDeliveredQuery = "sum by (group, topic, subscription) (irate({__name__='hermes_consumers_subscription_delivered_total', group='pl.allegro.tech.hermes', topic='Monitor', subscription='consumer1' service=~'hermes'}[1m]))" + def subscriptionTimeoutsQuery = "sum by (group, topic, subscription) (irate({__name__='hermes_consumers_subscription_timeouts_total', group='pl.allegro.tech.hermes', topic='Monitor', subscription='consumer1', service=~'hermes'}[1m]))" + def subscriptionRetriesQuery = "sum by (group, topic, subscription) (irate({__name__='hermes_consumers_subscription_retries_total', group='pl.allegro.tech.hermes', topic='Monitor', subscription='consumer1', service=~'hermes'}[1m]))" + def subscriptionThroughputQuery = "sum by (group, topic, subscription) (irate({__name__='hermes_consumers_subscription_throughput_bytes_total', group='pl.allegro.tech.hermes', topic='Monitor', subscription='consumer1', service=~'hermes'}[1m]))" + def subscriptionErrorsQuery = "sum by (group, topic, subscription) (irate({__name__='hermes_consumers_subscription_other_errors_total', group='pl.allegro.tech.hermes', topic='Monitor', subscription='consumer1', service=~'hermes'}[1m]))" + def subscriptionBatchesQuery = "sum by (group, topic, subscription) (irate({__name__='hermes_consumers_subscription_batches_total', group='pl.allegro.tech.hermes', topic='Monitor', subscription='consumer1', service=~'hermes'}[1m]))" + def subscription2xxStatusCodesQuery = "sum by (group, topic, subscription) (irate({__name__='hermes_consumers_subscription_http_status_codes_total', group='pl.allegro.tech.hermes', topic='Monitor', subscription='consumer1', status_code=~'2.*', service=~'hermes'}[1m]))" + def subscription4xxStatusCodesQuery = "sum by (group, topic, subscription) (irate({__name__='hermes_consumers_subscription_http_status_codes_total', group='pl.allegro.tech.hermes', topic='Monitor', subscription='consumer1', status_code=~'4.*', service=~'hermes'}[1m]))" + def subscription5xxStatusCodesQuery = "sum by (group, topic, subscription) (irate({__name__='hermes_consumers_subscription_http_status_codes_total', group='pl.allegro.tech.hermes', topic='Monitor', subscription='consumer1', status_code=~'5.*', service=~'hermes'}[1m]))" + + def queries = List.of(subscriptionDeliveredQuery, subscriptionTimeoutsQuery, subscriptionRetriesQuery, subscriptionThroughputQuery, + subscriptionErrorsQuery, subscriptionBatchesQuery, subscription2xxStatusCodesQuery, subscription4xxStatusCodesQuery, + subscription5xxStatusCodesQuery + ) @Rule - WireMockRule wireMockRule = new WireMockRule( - wireMockConfig().port(PROMETHEUS_HTTP_PORT).usingFilesUnderClasspath("prometheus-stubs")) + WireMockRule wireMockServer = new WireMockRule( + wireMockConfig() + .port(PROMETHEUS_HTTP_PORT).usingFilesUnderClasspath("prometheus-stubs")) private RestTemplatePrometheusClient client void setup() { - RestTemplate restTemplate = new RestTemplate() - client = new RestTemplatePrometheusClient(restTemplate, URI.create("http://localhost:$PROMETHEUS_HTTP_PORT")) + ExecutorService executorService = Executors.newFixedThreadPool(10) + RestTemplate restTemplate = createRestTemplateWithTimeout(Duration.ofSeconds(1)) + client = new RestTemplatePrometheusClient(restTemplate, URI.create("http://localhost:$PROMETHEUS_HTTP_PORT"), + executorService, Duration.ofSeconds(5), new SimpleMeterRegistry()) + wireMockServer.resetAll() } def "should get metrics for path"() { given: - mockPrometheus(query, "full_response.json") + def queriesStubs = List.of( + new FileStub(subscriptionDeliveredQuery, "subscription_delivered_total.json"), + new FileStub(subscriptionTimeoutsQuery, "subscription_timeouts_total.json"), + new FileStub(subscriptionRetriesQuery, "subscription_retries_total.json"), + new FileStub(subscriptionThroughputQuery, "subscription_throughput_bytes_total.json"), + new FileStub(subscriptionErrorsQuery, "subscription_other_errors_total.json"), + new FileStub(subscriptionBatchesQuery, "subscription_batches_total.json"), + new FileStub(subscription2xxStatusCodesQuery, "subscription_2xx_http_status_codes_total.json"), + new FileStub(subscription4xxStatusCodesQuery, "subscription_4xx_http_status_codes_total.json"), + new FileStub(subscription5xxStatusCodesQuery, "subscription_5xx_http_status_codes_total.json"), + ) + mockPrometheus(queriesStubs) when: - MonitoringMetricsContainer metrics = client.readMetrics(query) + MonitoringMetricsContainer metrics = client.readMetrics(queries) then: - metrics.metricValue("hermes_consumers_subscription_delivered_total") == of("1.0") - metrics.metricValue("hermes_consumers_subscription_timeouts_total") == of("2.0") - metrics.metricValue("hermes_consumers_subscription_retries_total") == of("1.0") - metrics.metricValue("hermes_consumers_subscription_throughput_bytes_total") == of("3.0") - metrics.metricValue("hermes_consumers_subscription_other_errors_total") == of("4.0") - metrics.metricValue("hermes_consumers_subscription_batches_total") == of("5.0") - metrics.metricValue("hermes_consumers_subscription_http_status_codes_total_2xx") == of("2.0") - metrics.metricValue("hermes_consumers_subscription_http_status_codes_total_4xx") == of("1.0") - metrics.metricValue("hermes_consumers_subscription_http_status_codes_total_5xx") == of("2.0") + metrics.metricValue(subscriptionDeliveredQuery) == of("1.0") + metrics.metricValue(subscriptionTimeoutsQuery) == of("2.0") + metrics.metricValue(subscriptionRetriesQuery) == of("1.0") + metrics.metricValue(subscriptionThroughputQuery) == of("3.0") + metrics.metricValue(subscriptionErrorsQuery) == of("4.0") + metrics.metricValue(subscriptionBatchesQuery) == of("5.0") + metrics.metricValue(subscription2xxStatusCodesQuery) == of("2.0") + metrics.metricValue(subscription4xxStatusCodesQuery) == of("1.0") + metrics.metricValue(subscription5xxStatusCodesQuery) == of("2.0") } def "should return default value when metric has no value"() { given: - mockPrometheus(query, "partial_response.json") + def queriesStubs = List.of( + emptyStub(subscriptionDeliveredQuery), + new FileStub(subscriptionTimeoutsQuery, "subscription_timeouts_total.json"), + new FileStub(subscriptionRetriesQuery, "subscription_retries_total.json"), + emptyStub(subscriptionThroughputQuery), + emptyStub(subscriptionErrorsQuery), + emptyStub(subscriptionBatchesQuery), + emptyStub(subscription2xxStatusCodesQuery), + emptyStub(subscription4xxStatusCodesQuery), + emptyStub(subscription5xxStatusCodesQuery) + ) + mockPrometheus(queriesStubs) + + when: + MonitoringMetricsContainer metrics = client.readMetrics(queries) + + then: + metrics.metricValue(subscriptionDeliveredQuery) == defaultValue() + metrics.metricValue(subscriptionTimeoutsQuery) == of("2.0") + metrics.metricValue(subscriptionRetriesQuery) == of("1.0") + metrics.metricValue(subscriptionThroughputQuery) == defaultValue() + metrics.metricValue(subscriptionErrorsQuery) == defaultValue() + metrics.metricValue(subscriptionBatchesQuery) == defaultValue() + metrics.metricValue(subscription2xxStatusCodesQuery) == defaultValue() + metrics.metricValue(subscription4xxStatusCodesQuery) == defaultValue() + metrics.metricValue(subscription5xxStatusCodesQuery) == defaultValue() + } + + def "should return partial results when some of the requests fails"() { + given: + def queriesToFail = List.of( + subscriptionDeliveredQuery, + subscriptionThroughputQuery, + subscriptionErrorsQuery, + subscriptionBatchesQuery, + subscription2xxStatusCodesQuery, + subscription4xxStatusCodesQuery, + subscription5xxStatusCodesQuery, + ) + def queriesToSuccess = List.of( + new FileStub(subscriptionTimeoutsQuery, "subscription_timeouts_total.json"), + new FileStub(subscriptionRetriesQuery, "subscription_retries_total.json"), + ) + mockPrometheus(queriesToSuccess) + mockPrometheusTimeout(queriesToFail, Duration.ofSeconds(5)) when: - MonitoringMetricsContainer metrics = client.readMetrics(query) + MonitoringMetricsContainer metrics = client.readMetrics(queries) then: - metrics.metricValue("hermes_consumers_subscription_delivered_total") == of("0.0") - metrics.metricValue("hermes_consumers_subscription_timeouts_total") == of("2.0") - metrics.metricValue("hermes_consumers_subscription_retries_total") == of("1.0") - metrics.metricValue("hermes_consumers_subscription_throughput_bytes_total") == of("3.0") - metrics.metricValue("hermes_consumers_subscription_other_errors_total") == of("4.0") - metrics.metricValue("hermes_consumers_subscription_batches_total") == of("5.0") - metrics.metricValue("hermes_consumers_subscription_http_status_codes_total_2xx") == of("2.0") - metrics.metricValue("hermes_consumers_subscription_http_status_codes_total_4xx") == of("1.0") - metrics.metricValue("hermes_consumers_subscription_http_status_codes_total_5xx") == of("0.0") + metrics.metricValue(subscriptionDeliveredQuery) == unavailable() + metrics.metricValue(subscriptionTimeoutsQuery) == of("2.0") + metrics.metricValue(subscriptionRetriesQuery) == of("1.0") + metrics.metricValue(subscriptionThroughputQuery) == unavailable() + metrics.metricValue(subscriptionErrorsQuery) == unavailable() + metrics.metricValue(subscriptionBatchesQuery) == unavailable() + metrics.metricValue(subscription2xxStatusCodesQuery) == unavailable() + metrics.metricValue(subscription4xxStatusCodesQuery) == unavailable() + metrics.metricValue(subscription5xxStatusCodesQuery) == unavailable() + } + + private void mockPrometheus(List stubs) { + stubs.forEach { s -> + String encodedQuery = URLEncoder.encode(s.query, StandardCharsets.UTF_8) + wireMockServer.stubFor(WireMock.get(urlEqualTo(String.format("/api/v1/query?query=%s", encodedQuery))) + .willReturn(WireMock.aResponse() + .withStatus(200) + .withHeader("Content-Type", MediaType.APPLICATION_JSON) + .withBodyFile(s.fileName))) + } + } + + private void mockPrometheusTimeout(List queries, Duration delay) { + queries.forEach { q -> + String encodedQuery = URLEncoder.encode(q, StandardCharsets.UTF_8) + wireMockServer.stubFor(WireMock.get(urlEqualTo(String.format("/api/v1/query?query=%s", encodedQuery))) + .willReturn(WireMock.aResponse() + .withHeader("Content-Type", MediaType.APPLICATION_JSON) + .withFixedDelay(delay.toMillis() as Integer))); + } + } + + private RestTemplate createRestTemplateWithTimeout(Duration timeout) { + RequestConfig requestConfig = RequestConfig.custom() + .setConnectTimeout(Timeout.ofMilliseconds(timeout.toMillis())) + .setResponseTimeout(Timeout.ofMilliseconds(timeout.toMillis())) + .build(); + + HttpClient client = HttpClientBuilder.create() + .setDefaultRequestConfig(requestConfig) + .build(); + + ClientHttpRequestFactory clientHttpRequestFactory = new HttpComponentsClientHttpRequestFactory(client); + return new RestTemplate(clientHttpRequestFactory); + } + + static class FileStub { + FileStub(String query, String fileName) { + this.query = query + this.fileName = fileName + } + String query; + String fileName } - private void mockPrometheus(String query, String responseFile) { - String encodedQuery = URLEncoder.encode(query, StandardCharsets.UTF_8) - WireMock.stubFor(WireMock.get(urlEqualTo(String.format("/api/v1/query?query=%s", encodedQuery))) - .willReturn(WireMock.aResponse() - .withStatus(200) - .withHeader("Content-Type", MediaType.APPLICATION_JSON) - .withBodyFile(responseFile))) + FileStub emptyStub(String query) { + return new FileStub(query, "prometheus_empty_response.json") } } diff --git a/hermes-management/src/test/resources/prometheus-stubs/__files/full_response.json b/hermes-management/src/test/resources/prometheus-stubs/__files/full_response.json deleted file mode 100644 index 7f813819d0..0000000000 --- a/hermes-management/src/test/resources/prometheus-stubs/__files/full_response.json +++ /dev/null @@ -1,156 +0,0 @@ -{ - "status": "success", - "data": { - "resultType": "vector", - "result": [ - { - "metric": { - "__name__": "hermes_consumers_subscription_delivered_total", - "group": "pl.allegro.tech.hermes", - "subscription": "hermesSubscription", - "topic": "hermesTopic" - }, - "value": [ - 1692281425.609, - "1" - ], - "group": 1 - }, - { - "metric": { - "__name__": "hermes_consumers_subscription_timeouts_total", - "group": "pl.allegro.tech.hermes", - "subscription": "hermesSubscription", - "topic": "hermesTopic" - }, - "value": [ - 1692281425.609, - "2" - ], - "group": 1 - }, - { - "metric": { - "__name__": "hermes_consumers_subscription_retries_total", - "group": "pl.allegro.tech.hermes", - "subscription": "hermesSubscription", - "topic": "hermesTopic" - }, - "value": [ - 1692281425.609, - "1" - ], - "group": 1 - }, - { - "metric": { - "__name__": "hermes_consumers_subscription_throughput_bytes_total", - "group": "pl.allegro.tech.hermes", - "subscription": "hermesSubscription", - "topic": "hermesTopic" - }, - "value": [ - 1692281425.609, - "3" - ], - "group": 1 - }, - { - "metric": { - "__name__": "hermes_consumers_subscription_other_errors_total", - "group": "pl.allegro.tech.hermes", - "subscription": "hermesSubscription", - "topic": "hermesTopic" - }, - "value": [ - 1692281425.609, - "4" - ], - "group": 1 - }, - { - "metric": { - "__name__": "hermes_consumers_subscription_batches_total", - "group": "pl.allegro.tech.hermes", - "subscription": "hermesSubscription", - "topic": "hermesTopic" - }, - "value": [ - 1692281425.609, - "5" - ], - "group": 1 - }, - { - "metric": { - "__name__": "hermes_consumers_subscription_http_status_codes_total", - "status_code": "200", - "group": "pl.allegro.tech.hermes", - "subscription": "hermesSubscription", - "topic": "hermesTopic" - }, - "value": [ - 1692281425.609, - "1" - ], - "group": 1 - }, - { - "metric": { - "__name__": "hermes_consumers_subscription_http_status_codes_total", - "status_code": "201", - "group": "pl.allegro.tech.hermes", - "subscription": "hermesSubscription", - "topic": "hermesTopic" - }, - "value": [ - 1692281425.609, - "1" - ], - "group": 1 - }, - { - "metric": { - "__name__": "hermes_consumers_subscription_http_status_codes_total", - "status_code": "401", - "group": "pl.allegro.tech.hermes", - "subscription": "hermesSubscription", - "topic": "hermesTopic" - }, - "value": [ - 1692281425.609, - "1" - ], - "group": 1 - }, - { - "metric": { - "__name__": "hermes_consumers_subscription_http_status_codes_total", - "status_code": "503", - "group": "pl.allegro.tech.hermes", - "subscription": "hermesSubscription", - "topic": "hermesTopic" - }, - "value": [ - 1692281425.609, - "1" - ], - "group": 1 - }, - { - "metric": { - "__name__": "hermes_consumers_subscription_http_status_codes_total", - "status_code": "500", - "group": "pl.allegro.tech.hermes", - "subscription": "hermesSubscription", - "topic": "hermesTopic" - }, - "value": [ - 1692281425.609, - "1" - ], - "group": 1 - } - ] - } -} \ No newline at end of file diff --git a/hermes-management/src/test/resources/prometheus-stubs/__files/partial_response.json b/hermes-management/src/test/resources/prometheus-stubs/__files/partial_response.json deleted file mode 100644 index a2da55c2b5..0000000000 --- a/hermes-management/src/test/resources/prometheus-stubs/__files/partial_response.json +++ /dev/null @@ -1,115 +0,0 @@ -{ - "status": "success", - "data": { - "resultType": "vector", - "result": [ - { - "metric": { - "__name__": "hermes_consumers_subscription_timeouts_total", - "group": "pl.allegro.tech.hermes", - "subscription": "hermesSubscription", - "topic": "hermesTopic" - }, - "value": [ - 1692281425.609, - "2" - ], - "group": 1 - }, - { - "metric": { - "__name__": "hermes_consumers_subscription_retries_total", - "group": "pl.allegro.tech.hermes", - "subscription": "hermesSubscription", - "topic": "hermesTopic" - }, - "value": [ - 1692281425.609, - "1" - ], - "group": 1 - }, - { - "metric": { - "__name__": "hermes_consumers_subscription_throughput_bytes_total", - "group": "pl.allegro.tech.hermes", - "subscription": "hermesSubscription", - "topic": "hermesTopic" - }, - "value": [ - 1692281425.609, - "3" - ], - "group": 1 - }, - { - "metric": { - "__name__": "hermes_consumers_subscription_other_errors_total", - "group": "pl.allegro.tech.hermes", - "subscription": "hermesSubscription", - "topic": "hermesTopic" - }, - "value": [ - 1692281425.609, - "4" - ], - "group": 1 - }, - { - "metric": { - "__name__": "hermes_consumers_subscription_batches_total", - "group": "pl.allegro.tech.hermes", - "subscription": "hermesSubscription", - "topic": "hermesTopic" - }, - "value": [ - 1692281425.609, - "5" - ], - "group": 1 - }, - { - "metric": { - "__name__": "hermes_consumers_subscription_http_status_codes_total", - "status_code": "200", - "group": "pl.allegro.tech.hermes", - "subscription": "hermesSubscription", - "topic": "hermesTopic" - }, - "value": [ - 1692281425.609, - "1" - ], - "group": 1 - }, - { - "metric": { - "__name__": "hermes_consumers_subscription_http_status_codes_total", - "status_code": "201", - "group": "pl.allegro.tech.hermes", - "subscription": "hermesSubscription", - "topic": "hermesTopic" - }, - "value": [ - 1692281425.609, - "1" - ], - "group": 1 - }, - { - "metric": { - "__name__": "hermes_consumers_subscription_http_status_codes_total", - "status_code": "401", - "group": "pl.allegro.tech.hermes", - "subscription": "hermesSubscription", - "topic": "hermesTopic" - }, - "value": [ - 1692281425.609, - "1" - ], - "group": 1 - } - ] - } -} \ No newline at end of file diff --git a/hermes-management/src/test/resources/prometheus-stubs/__files/prometheus_empty_response.json b/hermes-management/src/test/resources/prometheus-stubs/__files/prometheus_empty_response.json new file mode 100644 index 0000000000..ef28156d55 --- /dev/null +++ b/hermes-management/src/test/resources/prometheus-stubs/__files/prometheus_empty_response.json @@ -0,0 +1,7 @@ +{ + "status": "success", + "data": { + "resultType": "vector", + "result": [] + } +} diff --git a/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_2xx_http_status_codes_total.json b/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_2xx_http_status_codes_total.json new file mode 100644 index 0000000000..7a0da2287c --- /dev/null +++ b/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_2xx_http_status_codes_total.json @@ -0,0 +1,19 @@ +{ + "status": "success", + "data": { + "resultType": "vector", + "result": [ + { + "metric": { + "group": "pl.allegro.tech.hermes", + "subscription": "hermesSubscription", + "topic": "hermesTopic" + }, + "value": [ + 1692281425.609, + "2" + ] + } + ] + } +} diff --git a/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_4xx_http_status_codes_total.json b/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_4xx_http_status_codes_total.json new file mode 100644 index 0000000000..b4ae98b299 --- /dev/null +++ b/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_4xx_http_status_codes_total.json @@ -0,0 +1,19 @@ +{ + "status": "success", + "data": { + "resultType": "vector", + "result": [ + { + "metric": { + "group": "pl.allegro.tech.hermes", + "subscription": "hermesSubscription", + "topic": "hermesTopic" + }, + "value": [ + 1692281425.609, + "1" + ] + } + ] + } +} diff --git a/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_5xx_http_status_codes_total.json b/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_5xx_http_status_codes_total.json new file mode 100644 index 0000000000..7a0da2287c --- /dev/null +++ b/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_5xx_http_status_codes_total.json @@ -0,0 +1,19 @@ +{ + "status": "success", + "data": { + "resultType": "vector", + "result": [ + { + "metric": { + "group": "pl.allegro.tech.hermes", + "subscription": "hermesSubscription", + "topic": "hermesTopic" + }, + "value": [ + 1692281425.609, + "2" + ] + } + ] + } +} diff --git a/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_batches_total.json b/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_batches_total.json new file mode 100644 index 0000000000..5a460603df --- /dev/null +++ b/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_batches_total.json @@ -0,0 +1,19 @@ +{ + "status": "success", + "data": { + "resultType": "vector", + "result": [ + { + "metric": { + "group": "pl.allegro.tech.hermes", + "subscription": "hermesSubscription", + "topic": "hermesTopic" + }, + "value": [ + 1692281425.609, + "5" + ] + } + ] + } +} diff --git a/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_delivered_total.json b/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_delivered_total.json new file mode 100644 index 0000000000..b4ae98b299 --- /dev/null +++ b/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_delivered_total.json @@ -0,0 +1,19 @@ +{ + "status": "success", + "data": { + "resultType": "vector", + "result": [ + { + "metric": { + "group": "pl.allegro.tech.hermes", + "subscription": "hermesSubscription", + "topic": "hermesTopic" + }, + "value": [ + 1692281425.609, + "1" + ] + } + ] + } +} diff --git a/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_other_errors_total.json b/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_other_errors_total.json new file mode 100644 index 0000000000..ac0dc498bb --- /dev/null +++ b/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_other_errors_total.json @@ -0,0 +1,19 @@ +{ + "status": "success", + "data": { + "resultType": "vector", + "result": [ + { + "metric": { + "group": "pl.allegro.tech.hermes", + "subscription": "hermesSubscription", + "topic": "hermesTopic" + }, + "value": [ + 1692281425.609, + "4" + ] + } + ] + } +} diff --git a/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_retries_total.json b/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_retries_total.json new file mode 100644 index 0000000000..b4ae98b299 --- /dev/null +++ b/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_retries_total.json @@ -0,0 +1,19 @@ +{ + "status": "success", + "data": { + "resultType": "vector", + "result": [ + { + "metric": { + "group": "pl.allegro.tech.hermes", + "subscription": "hermesSubscription", + "topic": "hermesTopic" + }, + "value": [ + 1692281425.609, + "1" + ] + } + ] + } +} diff --git a/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_throughput_bytes_total.json b/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_throughput_bytes_total.json new file mode 100644 index 0000000000..1b944f3d6b --- /dev/null +++ b/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_throughput_bytes_total.json @@ -0,0 +1,19 @@ +{ + "status": "success", + "data": { + "resultType": "vector", + "result": [ + { + "metric": { + "group": "pl.allegro.tech.hermes", + "subscription": "hermesSubscription", + "topic": "hermesTopic" + }, + "value": [ + 1692281425.609, + "3" + ] + } + ] + } +} diff --git a/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_timeouts_total.json b/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_timeouts_total.json new file mode 100644 index 0000000000..7a0da2287c --- /dev/null +++ b/hermes-management/src/test/resources/prometheus-stubs/__files/subscription_timeouts_total.json @@ -0,0 +1,19 @@ +{ + "status": "success", + "data": { + "resultType": "vector", + "result": [ + { + "metric": { + "group": "pl.allegro.tech.hermes", + "subscription": "hermesSubscription", + "topic": "hermesTopic" + }, + "value": [ + 1692281425.609, + "2" + ] + } + ] + } +} diff --git a/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/prometheus/PrometheusExtension.java b/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/prometheus/PrometheusExtension.java index 8daf23be19..b39b5c3f48 100644 --- a/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/prometheus/PrometheusExtension.java +++ b/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/prometheus/PrometheusExtension.java @@ -12,12 +12,14 @@ import java.time.Duration; import java.util.List; -import java.util.stream.Collectors; import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; import static com.github.tomakehurst.wiremock.client.WireMock.get; import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; +import static pl.allegro.tech.hermes.management.infrastructure.prometheus.PrometheusClient.forSubscription; +import static pl.allegro.tech.hermes.management.infrastructure.prometheus.PrometheusClient.forSubscriptionStatusCode; +import static pl.allegro.tech.hermes.management.infrastructure.prometheus.PrometheusClient.forTopic; public class PrometheusExtension implements AfterEachCallback, BeforeAllCallback, ExtensionContext.Store.CloseableResource { @@ -47,19 +49,31 @@ public String getEndpoint() { return "http://localhost:" + wiremock.port(); } + @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance") public void stubSubscriptionMetrics(SubscriptionMetrics metrics) { - SubscriptionName name = metrics.name(); - String query = """ - sum by (__name__, group, topic, subscription, status_code) ( - irate( - {__name__=~'hermes_consumers_subscription_delivered_total|hermes_consumers_subscription_timeouts_total|hermes_consumers_subscription_retries_total|hermes_consumers_subscription_throughput_bytes_total|hermes_consumers_subscription_other_errors_total|hermes_consumers_subscription_batches_total|hermes_consumers_subscription_http_status_codes_total', group='%s', topic='%s', subscription='%s', }[1m] - ) keep_metric_names - ) - """ - .formatted(name.getTopicName().getGroupName(), name.getTopicName().getName(), name.getName()) - .lines() - .map(String::stripLeading) - .collect(Collectors.joining()); + SubscriptionName subName = metrics.name(); + String deliveredQuery = forSubscription("hermes_consumers_subscription_delivered_total", subName, ""); + String timeoutsQuery = forSubscription("hermes_consumers_subscription_timeouts_total", subName, ""); + String retriesQuery = forSubscription("hermes_consumers_subscription_retries_total", subName, ""); + String throughputQuery = forSubscription("hermes_consumers_subscription_throughput_bytes_total", subName, ""); + String errorsQuery = forSubscription("hermes_consumers_subscription_other_errors_total", subName, ""); + String batchesQuery = forSubscription("hermes_consumers_subscription_batches_total", subName, ""); + String statusCodes2xxQuery = forSubscriptionStatusCode("hermes_consumers_subscription_http_status_codes_total", subName, "2.*", ""); + String statusCodes4xxQuery = forSubscriptionStatusCode("hermes_consumers_subscription_http_status_codes_total", subName, "4.*", ""); + String statusCodes5xxQuery = forSubscriptionStatusCode("hermes_consumers_subscription_http_status_codes_total", subName, "5.*", ""); + + stub(deliveredQuery, metrics.toPrometheusRateResponse()); + stub(timeoutsQuery, metrics.toPrometheusDefaultResponse()); + stub(retriesQuery, metrics.toPrometheusDefaultResponse()); + stub(throughputQuery, metrics.toPrometheusThroughputResponse()); + stub(errorsQuery, metrics.toPrometheusDefaultResponse()); + stub(batchesQuery, metrics.toPrometheusDefaultResponse()); + stub(statusCodes2xxQuery, metrics.toPrometheusStatusCodesResponse()); + stub(statusCodes4xxQuery, metrics.toPrometheusStatusCodesResponse()); + stub(statusCodes5xxQuery, metrics.toPrometheusStatusCodesResponse()); + } + + private void stub(String query, PrometheusResponse response) { wiremock.addStubMapping( get(urlPathEqualTo("/api/v1/query")) .withQueryParam("query", equalTo(query)) @@ -67,36 +81,21 @@ sum by (__name__, group, topic, subscription, status_code) ( aResponse() .withStatus(200) .withHeader("Content-Type", "application/json") - .withBody(writeValueAsString(metrics.toPrometheusResponse())) + .withBody(writeValueAsString(response)) ) .build() ); } public void stubTopicMetrics(TopicMetrics metrics) { - TopicName name = metrics.name(); - String query = """ - sum by (__name__, group, topic) ( - irate( - {__name__=~'hermes_frontend_topic_requests_total|hermes_consumers_subscription_delivered_total|hermes_frontend_topic_throughput_bytes_total', group='%s', topic='%s', }[1m] - ) keep_metric_names - ) - """ - .formatted(name.getGroupName(), name.getName()) - .lines() - .map(String::stripLeading) - .collect(Collectors.joining()); - wiremock.addStubMapping( - get(urlPathEqualTo("/api/v1/query")) - .withQueryParam("query", equalTo(query)) - .willReturn( - aResponse() - .withStatus(200) - .withHeader("Content-Type", "application/json") - .withBody(writeValueAsString(metrics.toPrometheusResponse())) - ) - .build() - ); + TopicName topicName = metrics.name(); + String requestsQuery = forTopic("hermes_frontend_topic_requests_total", topicName, ""); + String deliveredQuery = forTopic("hermes_consumers_subscription_delivered_total", topicName, ""); + String throughputQuery = forTopic("hermes_frontend_topic_throughput_bytes_total", topicName, ""); + + stub(requestsQuery, metrics.toPrometheusRequestsResponse()); + stub(deliveredQuery, metrics.toDeliveredResponse()); + stub(throughputQuery, metrics.toPrometheusThroughputResponse()); } public void stubDelay(Duration duration) { diff --git a/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/prometheus/PrometheusResponse.java b/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/prometheus/PrometheusResponse.java index f6aa84e197..df4bd8757d 100644 --- a/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/prometheus/PrometheusResponse.java +++ b/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/prometheus/PrometheusResponse.java @@ -1,6 +1,5 @@ package pl.allegro.tech.hermes.integrationtests.prometheus; -import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; @@ -12,12 +11,6 @@ record Data(@JsonProperty("resultType") String resultType, @JsonProperty("result") List results) { } - record Result(@JsonProperty("metric") MetricName metricName, - @JsonProperty("value") List values) { - } - - @JsonInclude(JsonInclude.Include.NON_NULL) - record MetricName(@JsonProperty(value = "__name__") String name, - @JsonProperty(value = "status_code") String statusCode) { + record Result(@JsonProperty("value") List values) { } } diff --git a/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/prometheus/SubscriptionMetrics.java b/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/prometheus/SubscriptionMetrics.java index fbe2efbd0c..438def3702 100644 --- a/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/prometheus/SubscriptionMetrics.java +++ b/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/prometheus/SubscriptionMetrics.java @@ -7,33 +7,38 @@ import java.util.List; import java.util.Map; -public record SubscriptionMetrics(SubscriptionName name, int rate, int throughput, Map ratePerStatusCode) { +public record SubscriptionMetrics(SubscriptionName name, int rate, int throughput, + Map ratePerStatusCode) { private static final String TIMESTAMP = "1396860420"; - private static final String SUBSCRIPTION_DELIVERED = "hermes_consumers_subscription_delivered_total"; - private static final String SUBSCRIPTION_THROUGHPUT = "hermes_consumers_subscription_throughput_bytes_total"; - private static final String SUBSCRIPTION_STATUS_CODES = "hermes_consumers_subscription_http_status_codes_total"; public static SubscriptionMetricsBuilder subscriptionMetrics(SubscriptionName name) { return new SubscriptionMetricsBuilder(name); } - PrometheusResponse toPrometheusResponse() { + PrometheusResponse toPrometheusRateResponse() { List results = new ArrayList<>(); results.add( new PrometheusResponse.Result( - new PrometheusResponse.MetricName(SUBSCRIPTION_DELIVERED, null), List.of(TIMESTAMP, String.valueOf(rate))) ); + return new PrometheusResponse("success", new PrometheusResponse.Data("vector", results)); + } + + PrometheusResponse toPrometheusThroughputResponse() { + List results = new ArrayList<>(); results.add( new PrometheusResponse.Result( - new PrometheusResponse.MetricName(SUBSCRIPTION_THROUGHPUT, null), List.of(TIMESTAMP, String.valueOf(throughput)) ) ); + return new PrometheusResponse("success", new PrometheusResponse.Data("vector", results)); + } + + PrometheusResponse toPrometheusStatusCodesResponse() { + List results = new ArrayList<>(); ratePerStatusCode.forEach((code, rate) -> results.add( new PrometheusResponse.Result( - new PrometheusResponse.MetricName(SUBSCRIPTION_STATUS_CODES, code), List.of(TIMESTAMP, String.valueOf(rate)) ) ) @@ -41,6 +46,17 @@ PrometheusResponse toPrometheusResponse() { return new PrometheusResponse("success", new PrometheusResponse.Data("vector", results)); } + PrometheusResponse toPrometheusDefaultResponse() { + List results = new ArrayList<>(); + ratePerStatusCode.forEach((code, rate) -> results.add( + new PrometheusResponse.Result( + List.of(TIMESTAMP, "0.0") + ) + ) + ); + return new PrometheusResponse("success", new PrometheusResponse.Data("vector", results)); + } + public static class SubscriptionMetricsBuilder { private final SubscriptionName name; private int rate = 0; diff --git a/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/prometheus/TopicMetrics.java b/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/prometheus/TopicMetrics.java index 06ec44cf6b..3481e953ea 100644 --- a/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/prometheus/TopicMetrics.java +++ b/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/prometheus/TopicMetrics.java @@ -7,30 +7,46 @@ public record TopicMetrics(TopicName name, int rate, int deliveryRate, int throughput) { private static final String TIMESTAMP = "1396860420"; - private static final String TOPIC_REQUESTS_TOTAL = "hermes_frontend_topic_requests_total"; - private static final String TOPIC_DELIVERED_TOTAL = "hermes_consumers_subscription_delivered_total"; - private static final String TOPIC_THROUGHPUT_TOTAL = "hermes_frontend_topic_throughput_bytes_total"; public static TopicMetricsBuilder topicMetrics(TopicName name) { return new TopicMetricsBuilder(name); } - PrometheusResponse toPrometheusResponse() { + PrometheusResponse toPrometheusRequestsResponse() { return new PrometheusResponse( "success", new PrometheusResponse.Data( "vector", List.of( new PrometheusResponse.Result( - new PrometheusResponse.MetricName(TOPIC_REQUESTS_TOTAL, null), List.of(TIMESTAMP, String.valueOf(rate)) - ), + ) + ) + ) + ); + } + + PrometheusResponse toDeliveredResponse() { + return new PrometheusResponse( + "success", + new PrometheusResponse.Data( + "vector", + List.of( new PrometheusResponse.Result( - new PrometheusResponse.MetricName(TOPIC_DELIVERED_TOTAL, null), List.of(TIMESTAMP, String.valueOf(deliveryRate)) - ), + ) + ) + ) + ); + } + + PrometheusResponse toPrometheusThroughputResponse() { + return new PrometheusResponse( + "success", + new PrometheusResponse.Data( + "vector", + List.of( new PrometheusResponse.Result( - new PrometheusResponse.MetricName(TOPIC_THROUGHPUT_TOTAL, null), List.of(TIMESTAMP, String.valueOf(throughput)) ) ) diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/QueryEndpointTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/QueryEndpointTest.java index 12f9badb8b..023c092dcb 100644 --- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/QueryEndpointTest.java +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/QueryEndpointTest.java @@ -527,7 +527,7 @@ public void shouldHandleUnavailableSubscriptionsMetrics() { ); String queryGetAllSubscriptionsMetrics = "{\"query\": {}}"; String queryGetSubscriptionsMetricsWithPositiveRate = "{\"query\": {\"rate\": {\"gt\": 0}}}"; - prometheus.stubDelay(ofMinutes(10)); + prometheus.stubDelay(Duration.ofMillis(3000)); waitAtMost(adjust(Duration.ofMinutes(1))).untilAsserted(() -> { // when diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/SubscriptionManagementTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/SubscriptionManagementTest.java index a5629282be..a1e3f1d475 100644 --- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/SubscriptionManagementTest.java +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/SubscriptionManagementTest.java @@ -332,7 +332,6 @@ public void shouldReturnHealthyStatusForAHealthySubscription() { WebTestClient.ResponseSpec response = hermes.api().getSubscriptionHealth(topic.getQualifiedName(), subscription.getName()); // then - assertThat(response.expectBody(SubscriptionHealth.class).returnResult().getResponseBody()) .isEqualTo(SubscriptionHealth.HEALTHY); } @@ -386,8 +385,8 @@ public void shouldNotAllowSubscriptionWithBatchDeliveryAndAvroContentType() { // given Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); Subscription subscription = subscriptionWithRandomName(topic.getName()) - .withDeliveryType(DeliveryType.BATCH) - .withContentType(ContentType.AVRO) + .withDeliveryType(DeliveryType.BATCH) + .withContentType(ContentType.AVRO) .build(); // when