Skip to content

Commit

Permalink
Fixed bug leading to semaphore leak
Browse files Browse the repository at this point in the history
  • Loading branch information
szczygiel-m committed Aug 14, 2024
1 parent 4292174 commit ffcc75b
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ private void initializeMessageReceiver() {
rateLimiter,
loadRecorder,
metrics,
pendingOffsets::markAsProcessed
pendingOffsets::markAsFiltered
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

public enum MessageState {
INFLIGHT,
PROCESSED
PROCESSED,
FILTERED
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ public Set<SubscriptionPartitionOffset> calculateOffsetsToBeCommitted(Map<Subscr
try (HermesTimerContext ignored = timer.time()) {
List<SubscriptionPartitionOffset> processedOffsets = new ArrayList<>();
for (Map.Entry<SubscriptionPartitionOffset, MessageState> entry : offsets.entrySet()) {
if (entry.getValue() == MessageState.PROCESSED) {
// we consider filtered messages as processed
if (entry.getValue() != MessageState.INFLIGHT) {
processedOffsets.add(entry.getKey());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ public void markAsProcessed(SubscriptionPartitionOffset subscriptionPartitionOff
slots.put(subscriptionPartitionOffset, MessageState.PROCESSED);
}

public void markAsFiltered(SubscriptionPartitionOffset subscriptionPartitionOffset) {
slots.put(subscriptionPartitionOffset, MessageState.FILTERED);
}

public boolean tryAcquireSlot(Duration processingInterval) throws InterruptedException {
if (inflightSemaphore.tryAcquire(processingInterval.toMillis(), TimeUnit.MILLISECONDS)) {
if (maxPendingOffsetsSemaphore.tryAcquire(processingInterval.toMillis(), TimeUnit.MILLISECONDS)) {
Expand All @@ -72,6 +76,8 @@ public Map<SubscriptionPartitionOffset, MessageState> getOffsetsSnapshotAndRelea
if (entry.getValue() == MessageState.PROCESSED) {
slots.remove(entry.getKey());
permitsReleased++;
} else if (entry.getValue() == MessageState.FILTERED) {
slots.remove(entry.getKey());
}
}
maxPendingOffsetsSemaphore.release(permitsReleased);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,69 @@ public void shouldReportMetricForFilteredSubscription() {
);
}

@Test
public void shouldNotIncreaseInflightForFilteredSubscription() {
// given
TestSubscriber subscriber = subscribers.createSubscriber(503);
Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build());
final Subscription subscription = hermes.initHelper().createSubscription(
subscription(topic, "subscription")
.withEndpoint(subscriber.getEndpoint())
.withSubscriptionPolicy(
subscriptionPolicy()
.withMessageTtl(3600)
.withInflightSize(1)
.build()
)
.withFilter(filterMatchingHeaderByPattern("Trace-Id", "^vte.*"))
.build()
);
TestMessage unfiltered = TestMessage.of("msg", "unfiltered");
TestMessage filtered = TestMessage.of("msg", "filtered");

// when
hermes.api().publishUntilSuccess(topic.getQualifiedName(), filtered.body(), header("Trace-Id", "otherTraceId"));
hermes.api().publishUntilSuccess(topic.getQualifiedName(), filtered.body(), header("Trace-Id", "otherTraceId"));

// then
waitAtMost(Duration.ofSeconds(10)).untilAsserted(() ->
hermes.api().getConsumersMetrics()
.expectStatus()
.isOk()
.expectBody(String.class)
.value((body) -> assertThatMetrics(body)
.contains("hermes_consumers_subscription_inflight")
.withLabels(
"group", topic.getName().getGroupName(),
"subscription", subscription.getName(),
"topic", topic.getName().getName()
)
.withValue(0.0)
)
);

// when
hermes.api().publishUntilSuccess(topic.getQualifiedName(), unfiltered.body(), header("Trace-Id", "vte12"));
hermes.api().publishUntilSuccess(topic.getQualifiedName(), unfiltered.body(), header("Trace-Id", "vte12"));

// then
waitAtMost(Duration.ofSeconds(10)).untilAsserted(() ->
hermes.api().getConsumersMetrics()
.expectStatus()
.isOk()
.expectBody(String.class)
.value((body) -> assertThatMetrics(body)
.contains("hermes_consumers_subscription_inflight")
.withLabels(
"group", topic.getName().getGroupName(),
"subscription", subscription.getName(),
"topic", topic.getName().getName()
)
.withValue(1.0)
)
);
}

@Test
public void shouldReportMetricsForSuccessfulBatchDelivery() {
// given
Expand Down

0 comments on commit ffcc75b

Please sign in to comment.