Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete Kafka consumer group on Hermes subscription deletion #1921

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,13 @@ public SubscriptionService subscriptionService(
@Bean
public SubscriptionRemover subscriptionRemover(
Auditor auditor,
MultiDCAwareService multiDCAwareService,
MultiDatacenterRepositoryCommandExecutor multiDatacenterRepositoryCommandExecutor,
SubscriptionOwnerCache subscriptionOwnerCache,
SubscriptionRepository subscriptionRepository) {
return new SubscriptionRemover(
auditor,
multiDCAwareService,
multiDatacenterRepositoryCommandExecutor,
subscriptionOwnerCache,
subscriptionRepository);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ MultiDCAwareService multiDCAwareService(
new OffsetsAvailableChecker(consumerPool, storage),
new LogEndOffsetChecker(consumerPool),
brokerAdminClient,
createConsumerGroupManager(kafkaProperties, kafkaNamesMapper),
createConsumerGroupManager(
kafkaProperties, kafkaNamesMapper, brokerAdminClient),
createKafkaConsumerManager(kafkaProperties, kafkaNamesMapper));
})
.collect(toList());
Expand All @@ -126,13 +127,16 @@ MultiDCAwareService multiDCAwareService(
}

private ConsumerGroupManager createConsumerGroupManager(
KafkaProperties kafkaProperties, KafkaNamesMapper kafkaNamesMapper) {
KafkaProperties kafkaProperties,
KafkaNamesMapper kafkaNamesMapper,
AdminClient kafkaAdminClient) {
return subscriptionProperties.isCreateConsumerGroupManuallyEnabled()
? new KafkaConsumerGroupManager(
kafkaNamesMapper,
kafkaProperties.getQualifiedClusterName(),
kafkaProperties.getBrokerList(),
kafkaProperties)
kafkaProperties,
kafkaAdminClient)
: new NoOpConsumerGroupManager();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@
public interface ConsumerGroupManager {

void createConsumerGroup(Topic topic, Subscription subscription);

void deleteConsumerGroup(Topic topic, Subscription subscription);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,39 @@
import pl.allegro.tech.hermes.management.domain.auth.RequestUser;
import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor;
import pl.allegro.tech.hermes.management.domain.subscription.commands.RemoveSubscriptionRepositoryCommand;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;

public class SubscriptionRemover {

private static final Logger logger = LoggerFactory.getLogger(SubscriptionRemover.class);
private final Auditor auditor;
private final MultiDCAwareService multiDCAwareService;
private final MultiDatacenterRepositoryCommandExecutor multiDcExecutor;
private final SubscriptionOwnerCache subscriptionOwnerCache;
private final SubscriptionRepository subscriptionRepository;

public SubscriptionRemover(
Auditor auditor,
MultiDCAwareService multiDCAwareService,
MultiDatacenterRepositoryCommandExecutor multiDcExecutor,
SubscriptionOwnerCache subscriptionOwnerCache,
SubscriptionRepository subscriptionRepository) {
this.auditor = auditor;
this.multiDCAwareService = multiDCAwareService;
this.multiDcExecutor = multiDcExecutor;
this.subscriptionOwnerCache = subscriptionOwnerCache;
this.subscriptionRepository = subscriptionRepository;
}

public void removeSubscription(
TopicName topicName, String subscriptionName, RequestUser removedBy) {
public void removeSubscription(Topic topic, Subscription subscription, RequestUser removedBy) {
auditor.beforeObjectRemoval(
removedBy.getUsername(), Subscription.class.getSimpleName(), subscriptionName);
Subscription subscription =
subscriptionRepository.getSubscriptionDetails(topicName, subscriptionName);
removedBy.getUsername(), Subscription.class.getSimpleName(), subscription.getName());
multiDCAwareService.deleteConsumerGroups(topic, subscription);
multiDcExecutor.executeByUser(
new RemoveSubscriptionRepositoryCommand(topicName, subscriptionName), removedBy);
new RemoveSubscriptionRepositoryCommand(topic.getName(), subscription.getName()),
removedBy);
auditor.objectRemoved(removedBy.getUsername(), subscription);
subscriptionOwnerCache.onRemovedSubscription(subscriptionName, topicName);
subscriptionOwnerCache.onRemovedSubscription(subscription.getName(), topic.getName());
}

public void removeSubscriptionRelatedToTopic(Topic topic, RequestUser removedBy) {
Expand All @@ -50,7 +53,7 @@ public void removeSubscriptionRelatedToTopic(Topic topic, RequestUser removedBy)
logger.info(
"Removing subscriptions of topic: {}, subscriptions: {}", topic.getName(), subscriptions);
long start = System.currentTimeMillis();
subscriptions.forEach(sub -> removeSubscription(topic.getName(), sub.getName(), removedBy));
subscriptions.forEach(sub -> removeSubscription(topic, sub, removedBy));
logger.info(
"Removed subscriptions of topic: {} in {} ms",
topic.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,10 @@ private Set<Subscription.State> loadSubscriptionStatesFromAllDc(

public void removeSubscription(
TopicName topicName, String subscriptionName, RequestUser removedBy) {
subscriptionRemover.removeSubscription(topicName, subscriptionName, removedBy);
Topic topic = topicService.getTopicDetails(topicName);
Subscription subscription =
subscriptionRepository.getSubscriptionDetails(topicName, subscriptionName);
subscriptionRemover.removeSubscription(topic, subscription, removedBy);
}

public void updateSubscription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ public void createConsumerGroups(Topic topic, Subscription subscription) {
clusters.forEach(clusterService -> clusterService.createConsumerGroup(topic, subscription));
}

public void deleteConsumerGroups(Topic topic, Subscription subscription) {
clusters.forEach(clusterService -> clusterService.deleteConsumerGroup(topic, subscription));
}

private void waitUntilOffsetsAreMoved(Topic topic, String subscriptionName) {
Instant abortAttemptsInstant = clock.instant().plus(offsetsMovedTimeout);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ public void createConsumerGroup(Topic topic, Subscription subscription) {
consumerGroupManager.createConsumerGroup(topic, subscription);
}

public void deleteConsumerGroup(Topic topic, Subscription subscription) {
consumerGroupManager.deleteConsumerGroup(topic, subscription);
}

public Optional<ConsumerGroup> describeConsumerGroup(Topic topic, String subscriptionName) {
return consumerGroupsDescriber.describeConsumerGroup(topic, subscriptionName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.management.config.kafka.KafkaProperties;
import pl.allegro.tech.hermes.management.domain.subscription.ConsumerGroupManager;
Expand All @@ -25,15 +28,18 @@ public class KafkaConsumerGroupManager implements ConsumerGroupManager {
private final KafkaNamesMapper kafkaNamesMapper;
private final String clusterName;
private final KafkaConsumerManager consumerManager;
private final AdminClient kafkaAdminClient;

public KafkaConsumerGroupManager(
KafkaNamesMapper kafkaNamesMapper,
String clusterName,
String brokerList,
KafkaProperties kafkaProperties) {
KafkaProperties kafkaProperties,
AdminClient kafkaAdminClient) {
this.kafkaNamesMapper = kafkaNamesMapper;
this.clusterName = clusterName;
this.consumerManager = new KafkaConsumerManager(kafkaProperties, kafkaNamesMapper, brokerList);
this.kafkaAdminClient = kafkaAdminClient;
}

@Override
Expand Down Expand Up @@ -80,4 +86,31 @@ public void createConsumerGroup(Topic topic, Subscription subscription) {
e);
}
}

@Override
public void deleteConsumerGroup(Topic topic, Subscription subscription) {
logger.info(
"Deleting consumer group for subscription {}, cluster: {}",
subscription.getQualifiedName(),
clusterName);

try {
ConsumerGroupId groupId = kafkaNamesMapper.toConsumerGroupId(subscription.getQualifiedName());
kafkaAdminClient
.deleteConsumerGroups(Collections.singletonList(groupId.asString()))
.all()
.get();

logger.info(
"Successfully deleted consumer group for subscription {}, cluster: {}",
subscription.getQualifiedName(),
clusterName);
} catch (Exception e) {
logger.error(
"Failed to delete consumer group for subscription {}, cluster: {}",
subscription.getQualifiedName(),
clusterName,
e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,9 @@ public class NoOpConsumerGroupManager implements ConsumerGroupManager {
public void createConsumerGroup(Topic topic, Subscription subscription) {
// no operation
}

@Override
public void deleteConsumerGroup(Topic topic, Subscription subscription) {
// no operation
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class KafkaConsumerGroupManagerSpec extends Specification {
}

def setup() {
consumerGroupManager = new KafkaConsumerGroupManager(kafkaNamesMapper, "primary", kafkaContainer.bootstrapServers, new KafkaProperties())
consumerGroupManager = new KafkaConsumerGroupManager(kafkaNamesMapper, "primary", kafkaContainer.bootstrapServers, new KafkaProperties(), adminClient)
}

def "should create consumer group with offset equal to last topic offset"() {
Expand Down Expand Up @@ -100,6 +100,9 @@ class KafkaConsumerGroupManagerSpec extends Specification {
and:
output.toString().contains 'Creating consumer group for subscription pl.allegro.test.Foo$test-subscription, cluster: primary'
output.toString().contains 'Successfully created consumer group for subscription pl.allegro.test.Foo$test-subscription, cluster: primary'

cleanup:
deleteKafkaTopic(kafkaTopicName)
}

def "should override existing consumer group using offsets from the old consumer group"() {
Expand Down Expand Up @@ -131,6 +134,9 @@ class KafkaConsumerGroupManagerSpec extends Specification {
topicPartitionOffsets.get(new TopicPartition(kafkaTopicName, 0)).offset() == 10
topicPartitionOffsets.get(new TopicPartition(kafkaTopicName, 1)).offset() == 20
topicPartitionOffsets.get(new TopicPartition(kafkaTopicName, 2)).offset() == 15

cleanup:
deleteKafkaTopic(kafkaTopicName)
}

def "should not create consumer group and log exception in case of request timeout"() {
Expand All @@ -151,6 +157,68 @@ class KafkaConsumerGroupManagerSpec extends Specification {
kafkaContainer.dockerClient.unpauseContainerCmd(containerId).exec()
}

def "Should delete specified consumer group and retain others"() {
given:
Topic topic = createAvroTopic("pl.allegro.test.DeletionOfConsumerGroup")
String kafkaTopicName = kafkaNamesMapper.toKafkaTopics(topic).primary.name().asString()
createTopicInKafka(kafkaTopicName, 3)

and:
Subscription subscription = createTestSubscription(topic, "test-subscription")
ConsumerGroupId consumerGroupId = kafkaNamesMapper.toConsumerGroupId(subscription.qualifiedName)
consumerGroupManager.createConsumerGroup(topic, subscription)

and:
Subscription subscriptionToDelete = createTestSubscription(topic, "test-subscription-to-delete")
consumerGroupManager.createConsumerGroup(topic, subscriptionToDelete)

when:
consumerGroupManager.deleteConsumerGroup(topic, subscriptionToDelete)

then:
adminClient.listConsumerGroups().all().get().collect { it.groupId() } == [consumerGroupId.asString()]

cleanup:
consumerGroupManager.deleteConsumerGroup(topic, subscription)
deleteKafkaTopic(kafkaTopicName)
}

def "Should handle deletion attempt for non-existing consumer group gracefully"() {
given:
Topic topic = createAvroTopic("pl.allegro.test.DeletionOfNonExistingConsumerGroup")
String kafkaTopicName = kafkaNamesMapper.toKafkaTopics(topic).primary.name().asString()
createTopicInKafka(kafkaTopicName, 3)

and:
Subscription subscription = createTestSubscription(topic, "test-subscription")
ConsumerGroupId consumerGroupId = kafkaNamesMapper.toConsumerGroupId(subscription.qualifiedName)
consumerGroupManager.createConsumerGroup(topic, subscription)

when:
consumerGroupManager.deleteConsumerGroup(topic, createTestSubscription(topic, "test-subscription-to-delete"))

then:
noExceptionThrown()

and:
adminClient.listConsumerGroups().all().get().collect { it.groupId() } == [consumerGroupId.asString()]

cleanup:
deleteKafkaTopic(kafkaTopicName)
}

def "Should handle deletion attempt for non-existing topic gracefully"() {
given:
Topic topic = createAvroTopic("pl.allegro.test.Foo")
Subscription subscription = createTestSubscription(topic, "test-subscription-to-delete")

when:
consumerGroupManager.deleteConsumerGroup(topic, subscription)

then:
noExceptionThrown()
}

private def publishOnPartition(String kafkaTopicName, int partition, int messages) {
CountDownLatch countDownLatch = new CountDownLatch(messages)
for (int i = 0; i < messages; i++) {
Expand Down Expand Up @@ -189,4 +257,8 @@ class KafkaConsumerGroupManagerSpec extends Specification {
.collect { it.partition() }
.containsAll(0..(partitionsNumber - 1))
}

private def deleteKafkaTopic(String kafkaTopicName) {
adminClient.deleteTopics([kafkaTopicName]).all().get()
}
}
Loading