diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Message.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Message.java index c0ed185669..46fd852618 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Message.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Message.java @@ -22,8 +22,6 @@ import java.util.Optional; import java.util.Set; -import static java.util.stream.Collectors.toList; - /** * Implementation note: this class is partially mutable and may be accessed from multiple * threads involved in message lifecycle, it must be thread safe. @@ -53,6 +51,8 @@ public class Message implements FilterableMessage { private long currentMessageBackoff = -1; + private boolean isFiltered = false; + public Message(String id, String topic, byte[] content, @@ -122,7 +122,7 @@ public boolean isTtlExceeded(long ttlMillis) { public synchronized void incrementRetryCounter(Collection succeededUris) { this.retryCounter++; - this.succeededUris.addAll(succeededUris.stream().map(URI::toString).collect(toList())); + this.succeededUris.addAll(succeededUris.stream().map(URI::toString).toList()); } public synchronized int getRetryCounter() { @@ -206,6 +206,14 @@ public String getSubscription() { return subscription; } + public synchronized boolean isFiltered() { + return isFiltered; + } + + public synchronized void setFiltered(boolean filtered) { + isFiltered = filtered; + } + public static class Builder { private String id; private PartitionOffset partitionOffset; diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/SerialConsumer.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/SerialConsumer.java index aa4bc636e2..bf4f4f2025 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/SerialConsumer.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/SerialConsumer.java @@ -124,15 +124,19 @@ public void consume(Runnable signalsInterrupt) { if (maybeMessage.isPresent()) { Message message = maybeMessage.get(); - if (logger.isDebugEnabled()) { - logger.debug( - "Read message {} partition {} offset {}", - message.getContentType(), message.getPartition(), message.getOffset() - ); + if (message.isFiltered()) { + profiler.flushMeasurements(ConsumerRun.FILTERED); + } else { + if (logger.isDebugEnabled()) { + logger.debug( + "Read message {} partition {} offset {}", + message.getContentType(), message.getPartition(), message.getOffset() + ); + } + + Message convertedMessage = messageConverterResolver.converterFor(message, subscription).convert(message, topic); + sendMessage(convertedMessage, profiler); } - - Message convertedMessage = messageConverterResolver.converterFor(message, subscription).convert(message, topic); - sendMessage(convertedMessage, profiler); } else { pendingOffsets.releaseSlot(); profiler.flushMeasurements(ConsumerRun.EMPTY); diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java index efbdc09f77..f85236ea36 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java @@ -109,12 +109,14 @@ private Optional readAndTransform(Subscription subscription, String bat if (maybeMessage.isPresent()) { Message message = maybeMessage.get(); - Message transformed = messageConverterResolver.converterFor(message, subscription).convert(message, topic); - transformed = message().fromMessage(transformed).withData(wrap(subscription, transformed)).build(); + if (!message.isFiltered()) { + Message transformed = messageConverterResolver.converterFor(message, subscription).convert(message, topic); + transformed = message().fromMessage(transformed).withData(wrap(subscription, transformed)).build(); - trackers.get(subscription).logInflight(toMessageMetadata(transformed, subscription, batchId)); + trackers.get(subscription).logInflight(toMessageMetadata(transformed, subscription, batchId)); - return Optional.of(transformed); + return Optional.of(transformed); + } } return Optional.empty(); } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/profiling/ConsumerRun.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/profiling/ConsumerRun.java index ea4e3dbfe1..41cce38a1a 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/profiling/ConsumerRun.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/profiling/ConsumerRun.java @@ -1,5 +1,5 @@ package pl.allegro.tech.hermes.consumers.consumer.profiling; public enum ConsumerRun { - EMPTY, DELIVERED, DISCARDED, RETRIED + EMPTY, DELIVERED, DISCARDED, RETRIED, FILTERED } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/MessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/MessageReceiver.java index 8ba64a8d3f..b875bd8b40 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/MessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/MessageReceiver.java @@ -10,6 +10,21 @@ public interface MessageReceiver { + /** + * Retrieves the next available message from the queue. + * + *

Depending on the context, the returned {@link Optional} can contain: + *

+ * + * @return an {@link Optional} containing the next {@link Message} if available; + * an {@link Optional} containing a filtered message if it should be skipped; + * or an empty {@link Optional} if there are no messages in the queue. + */ Optional next(); default void stop() {} diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/FilteringMessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/FilteringMessageReceiver.java index 430e426002..d2a3634458 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/FilteringMessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/FilteringMessageReceiver.java @@ -35,15 +35,14 @@ public FilteringMessageReceiver(MessageReceiver receiver, @Override public Optional next() { - return receiver.next().map(message -> - allow(message) ? message : null - ); + return receiver.next().map(this::filter); } - private boolean allow(Message message) { + private Message filter(Message message) { FilterResult result = filterChain.apply(message); filteredMessageHandler.handle(result, message, subscription); - return !result.isFiltered(); + message.setFiltered(result.isFiltered()); + return message; } @Override diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/BatchDeliveryTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/BatchDeliveryTest.java index 1a700ebe59..e3ead4bd1b 100644 --- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/BatchDeliveryTest.java +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/BatchDeliveryTest.java @@ -6,6 +6,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import pl.allegro.tech.hermes.api.BatchSubscriptionPolicy; import pl.allegro.tech.hermes.api.ContentType; +import pl.allegro.tech.hermes.api.MessageFilterSpecification; import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.Topic; import pl.allegro.tech.hermes.integrationtests.setup.HermesExtension; @@ -23,10 +24,14 @@ import static com.github.tomakehurst.wiremock.client.WireMock.post; import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED; +import static com.google.common.collect.ImmutableMap.of; import static java.util.Arrays.stream; +import static org.awaitility.Awaitility.waitAtMost; import static pl.allegro.tech.hermes.api.BatchSubscriptionPolicy.Builder.batchSubscriptionPolicy; import static pl.allegro.tech.hermes.api.TopicWithSchema.topicWithSchema; +import static pl.allegro.tech.hermes.integrationtests.assertions.HermesAssertions.assertThatMetrics; import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription; +import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscriptionWithRandomName; import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topicWithRandomName; public class BatchDeliveryTest { @@ -39,10 +44,19 @@ public class BatchDeliveryTest { @RegisterExtension public static final TestSubscribersExtension subscribers = new TestSubscribersExtension(); + static final AvroUser BOB = new AvroUser("Bob", 50, "blue"); + + static final AvroUser ALICE = new AvroUser("Alice", 20, "magenta"); + private static final TestMessage[] SMALL_BATCH = TestMessage.simpleMessages(2); private static final TestMessage SINGLE_MESSAGE = TestMessage.simple(); + private static final TestMessage SINGLE_MESSAGE_FILTERED = BOB.asTestMessage(); + + private static final MessageFilterSpecification MESSAGE_NAME_FILTER = + new MessageFilterSpecification(of("type", "jsonpath", "path", ".name", "matcher", "^Bob.*")); + @Test public void shouldDeliverMessagesInBatch() { // given @@ -67,6 +81,43 @@ public void shouldDeliverMessagesInBatch() { expectSingleBatch(subscriber, SMALL_BATCH); } + @Test + public void shouldFilterIncomingEventsForBatch() { + // given + TestSubscriber subscriber = subscribers.createSubscriber(); + Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); + final Subscription subscription = hermes.initHelper().createSubscription(subscriptionWithRandomName(topic.getName(), subscriber.getEndpoint()) + .withSubscriptionPolicy(buildBatchPolicy() + .withBatchSize(2) + .withBatchTime(3) + .withBatchVolume(1024) + .build()) + .withFilter(MESSAGE_NAME_FILTER) + .build()); + + // when + hermes.api().publishUntilSuccess(topic.getQualifiedName(), BOB.asJson()); + hermes.api().publishUntilSuccess(topic.getQualifiedName(), ALICE.asJson()); + + // then + expectSingleBatch(subscriber, SINGLE_MESSAGE_FILTERED); + waitAtMost(Duration.ofSeconds(10)).untilAsserted(() -> + hermes.api().getConsumersMetrics() + .expectStatus() + .isOk() + .expectBody(String.class) + .value((body) -> assertThatMetrics(body) + .contains("hermes_consumers_subscription_filtered_out_total") + .withLabels( + "group", topic.getName().getGroupName(), + "subscription", subscription.getName(), + "topic", topic.getName().getName() + ) + .withValue(1.0) + ) + ); + } + @Test public void shouldDeliverBatchInGivenTimePeriod() { // given diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/MetricsTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/MetricsTest.java index a10cec2ba3..5559711bde 100644 --- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/MetricsTest.java +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/MetricsTest.java @@ -257,6 +257,69 @@ public void shouldReportMetricForFilteredSubscription() { ); } + @Test + public void shouldNotIncreaseInflightForFilteredSubscription() { + // given + TestSubscriber subscriber = subscribers.createSubscriber(503); + Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); + final Subscription subscription = hermes.initHelper().createSubscription( + subscription(topic, "subscription") + .withEndpoint(subscriber.getEndpoint()) + .withSubscriptionPolicy( + subscriptionPolicy() + .withMessageTtl(3600) + .withInflightSize(1) + .build() + ) + .withFilter(filterMatchingHeaderByPattern("Trace-Id", "^vte.*")) + .build() + ); + TestMessage unfiltered = TestMessage.of("msg", "unfiltered"); + TestMessage filtered = TestMessage.of("msg", "filtered"); + + // when + hermes.api().publishUntilSuccess(topic.getQualifiedName(), filtered.body(), header("Trace-Id", "otherTraceId")); + hermes.api().publishUntilSuccess(topic.getQualifiedName(), filtered.body(), header("Trace-Id", "otherTraceId")); + + // then + waitAtMost(Duration.ofSeconds(10)).untilAsserted(() -> + hermes.api().getConsumersMetrics() + .expectStatus() + .isOk() + .expectBody(String.class) + .value((body) -> assertThatMetrics(body) + .contains("hermes_consumers_subscription_inflight") + .withLabels( + "group", topic.getName().getGroupName(), + "subscription", subscription.getName(), + "topic", topic.getName().getName() + ) + .withValue(0.0) + ) + ); + + // when + hermes.api().publishUntilSuccess(topic.getQualifiedName(), unfiltered.body(), header("Trace-Id", "vte12")); + hermes.api().publishUntilSuccess(topic.getQualifiedName(), unfiltered.body(), header("Trace-Id", "vte12")); + + // then + waitAtMost(Duration.ofSeconds(10)).untilAsserted(() -> + hermes.api().getConsumersMetrics() + .expectStatus() + .isOk() + .expectBody(String.class) + .value((body) -> assertThatMetrics(body) + .contains("hermes_consumers_subscription_inflight") + .withLabels( + "group", topic.getName().getGroupName(), + "subscription", subscription.getName(), + "topic", topic.getName().getName() + ) + .withValue(1.0) + ) + ); + } + @Test public void shouldReportMetricsForSuccessfulBatchDelivery() { // given