From 103948a7e0b6a5c6c1f80615b4e7cf1a9a3a1686 Mon Sep 17 00:00:00 2001 From: "maciej.moscicki" Date: Fri, 26 Jul 2024 12:18:42 +0200 Subject: [PATCH 01/13] add per dc logs for topic removal --- .../config/kafka/KafkaConfiguration.java | 2 +- .../subscription/SubscriptionRemover.java | 4 +++- .../RemoveSubscriptionRepositoryCommand.java | 7 +++++++ .../management/domain/topic/TopicService.java | 6 ++++++ .../commands/RemoveTopicRepositoryCommand.java | 7 +++++++ .../domain/topic/schema/SchemaService.java | 7 +++++++ .../service/KafkaBrokerTopicManagement.java | 17 +++++++++++++++-- 7 files changed, 46 insertions(+), 4 deletions(-) diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java index 45652d01b3..0c50ef5970 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java @@ -82,7 +82,7 @@ MultiDCAwareService multiDCAwareService(KafkaNamesMappers kafkaNamesMappers, Sch AdminClient brokerAdminClient = brokerAdminClient(kafkaProperties); BrokerStorage storage = brokersStorage(brokerAdminClient); BrokerTopicManagement brokerTopicManagement = - new KafkaBrokerTopicManagement(topicProperties, brokerAdminClient, kafkaNamesMapper); + new KafkaBrokerTopicManagement(topicProperties, brokerAdminClient, kafkaNamesMapper, kafkaProperties.getDatacenter()); KafkaConsumerPool consumerPool = kafkaConsumersPool(kafkaProperties, storage, kafkaProperties.getBrokerList()); KafkaRawMessageReader kafkaRawMessageReader = new KafkaRawMessageReader(consumerPool, kafkaProperties.getKafkaConsumer().getPollTimeoutMillis()); diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionRemover.java 
b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionRemover.java index 4e9a90edfd..016e18b35f 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionRemover.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionRemover.java @@ -43,9 +43,11 @@ public void removeSubscription(TopicName topicName, String subscriptionName, Req public void removeSubscriptionRelatedToTopic(Topic topic, RequestUser removedBy) { List subscriptions = subscriptionRepository.listSubscriptions(topic.getName()); - ensureSubscriptionsHaveAutoRemove(subscriptions, topic.getName()); + logger.info("Removing subscriptions of topic: {}, subscriptions: {}", topic.getName(), subscriptions); + long start = System.currentTimeMillis(); subscriptions.forEach(sub -> removeSubscription(topic.getName(), sub.getName(), removedBy)); + logger.info("Removed subscriptions of topic: {} in {} ms", topic.getName(), System.currentTimeMillis() - start); } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/commands/RemoveSubscriptionRepositoryCommand.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/commands/RemoveSubscriptionRepositoryCommand.java index 9d0c2d838d..b666935ddd 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/commands/RemoveSubscriptionRepositoryCommand.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/commands/RemoveSubscriptionRepositoryCommand.java @@ -1,5 +1,7 @@ package pl.allegro.tech.hermes.management.domain.subscription.commands; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.SubscriptionName; import pl.allegro.tech.hermes.api.TopicName; @@ -9,6 +11,8 @@ public 
class RemoveSubscriptionRepositoryCommand extends RepositoryCommand { + private static final Logger logger = LoggerFactory.getLogger(RemoveSubscriptionRepositoryCommand.class); + private final TopicName topicName; private final String subscriptionName; @@ -26,7 +30,10 @@ public void backup(DatacenterBoundRepositoryHolder holde @Override public void execute(DatacenterBoundRepositoryHolder holder) { + logger.info("Removing subscription: {} from topic: {} in ZK dc: {}", subscriptionName, topicName, holder.getDatacenterName()); + long start = System.currentTimeMillis(); holder.getRepository().removeSubscription(topicName, subscriptionName); + logger.info("Removed subscription: {} from topic: {} in ZK dc: {}, in {} ms", subscriptionName, topicName, holder.getDatacenterName(), System.currentTimeMillis() - start); } @Override diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicService.java index 220fbfd09b..598e7adc8d 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicService.java @@ -382,8 +382,14 @@ private void removeSchema(Topic topic) { } private void removeTopic(Topic topic, RequestUser removedBy) { + logger.info("Removing topic {} from ZK clusters", topic.getQualifiedName()); + long start = System.currentTimeMillis(); multiDcExecutor.executeByUser(new RemoveTopicRepositoryCommand(topic.getName()), removedBy); + logger.info("Removed topic {} from ZK clusters in: {}ms", topic.getQualifiedName(), System.currentTimeMillis() - start); + logger.info("Removing topic {} from Kafka clusters", topic.getQualifiedName()); + start = System.currentTimeMillis(); multiDCAwareService.manageTopic(brokerTopicManagement -> brokerTopicManagement.removeTopic(topic)); + logger.info("Removed topic {} 
from Kafka clusters in: {}ms", topic.getQualifiedName(), System.currentTimeMillis() - start); auditor.objectRemoved(removedBy.getUsername(), topic); topicOwnerCache.onRemovedTopic(topic); } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/commands/RemoveTopicRepositoryCommand.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/commands/RemoveTopicRepositoryCommand.java index d378548603..f0b3449ac1 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/commands/RemoveTopicRepositoryCommand.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/commands/RemoveTopicRepositoryCommand.java @@ -1,5 +1,7 @@ package pl.allegro.tech.hermes.management.domain.topic.commands; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.TopicName; import pl.allegro.tech.hermes.domain.topic.TopicRepository; import pl.allegro.tech.hermes.management.domain.dc.DatacenterBoundRepositoryHolder; @@ -8,6 +10,7 @@ public class RemoveTopicRepositoryCommand extends RepositoryCommand { private final TopicName topicName; + private static final Logger logger = LoggerFactory.getLogger(RemoveTopicRepositoryCommand.class); public RemoveTopicRepositoryCommand(TopicName topicName) { this.topicName = topicName; @@ -18,7 +21,11 @@ public void backup(DatacenterBoundRepositoryHolder holder) {} @Override public void execute(DatacenterBoundRepositoryHolder holder) { + logger.info("Removing topic: {} in ZK dc: {}", topicName, holder.getDatacenterName()); + long start = System.currentTimeMillis(); holder.getRepository().removeTopic(topicName); + logger.info("Removed topic: {} in ZK dc: {}, in {} ms", topicName, holder.getDatacenterName(), System.currentTimeMillis() - start); + } @Override diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/schema/SchemaService.java 
b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/schema/SchemaService.java index 315cdd4cb1..c0d3bcba2c 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/schema/SchemaService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/schema/SchemaService.java @@ -1,5 +1,7 @@ package pl.allegro.tech.hermes.management.domain.topic.schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import pl.allegro.tech.hermes.api.RawSchema; @@ -24,6 +26,8 @@ public class SchemaService { private final SchemaValidatorProvider validatorProvider; private final TopicProperties topicProperties; + private static final Logger logger = LoggerFactory.getLogger(SchemaService.class); + @Autowired public SchemaService(RawSchemaClient rawSchemaClient, SchemaValidatorProvider validatorProvider, @@ -68,7 +72,10 @@ public void deleteAllSchemaVersions(String qualifiedTopicName) { if (!topicProperties.isRemoveSchema()) { throw new SchemaRemovalDisabledException(); } + logger.info("Removing all schema versions for topic: {}", qualifiedTopicName); + long start = System.currentTimeMillis(); rawSchemaClient.deleteAllSchemaVersions(fromQualifiedName(qualifiedTopicName)); + logger.info("Removed all schema versions for topic: {} in {} ms", qualifiedTopicName, System.currentTimeMillis() - start); } public void validateSchema(Topic topic, String schema) { diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaBrokerTopicManagement.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaBrokerTopicManagement.java index 325d362f46..f712550812 100644 --- 
a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaBrokerTopicManagement.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaBrokerTopicManagement.java @@ -9,6 +9,8 @@ import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.TopicConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.Topic; import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper; import pl.allegro.tech.hermes.common.kafka.KafkaTopic; @@ -32,10 +34,16 @@ public class KafkaBrokerTopicManagement implements BrokerTopicManagement { private final KafkaNamesMapper kafkaNamesMapper; - public KafkaBrokerTopicManagement(TopicProperties topicProperties, AdminClient kafkaAdminClient, KafkaNamesMapper kafkaNamesMapper) { + private final String datacenterName; + + private static final Logger logger = LoggerFactory.getLogger(KafkaBrokerTopicManagement.class); + + + public KafkaBrokerTopicManagement(TopicProperties topicProperties, AdminClient kafkaAdminClient, KafkaNamesMapper kafkaNamesMapper, String datacenterName) { this.topicProperties = topicProperties; this.kafkaAdminClient = kafkaAdminClient; this.kafkaNamesMapper = kafkaNamesMapper; + this.datacenterName = datacenterName; } @Override @@ -59,7 +67,12 @@ public void removeTopic(Topic topic) { kafkaNamesMapper.toKafkaTopics(topic).stream() .map(k -> kafkaAdminClient.deleteTopics(Collections.singletonList(k.name().asString()))) .map(DeleteTopicsResult::all) - .forEach(this::waitForKafkaFuture); + .forEach(future -> { + logger.info("Removing topic {} from Kafka dc: {}", topic, datacenterName); + long start = System.currentTimeMillis(); + waitForKafkaFuture(future); + logger.info("Removed topic {} from Kafka dc: {} in {}ms", topic, datacenterName, System.currentTimeMillis() - start); + }); } @Override From 
c1f4c73e0ce04f96a52252e649ec9a0f9f846009 Mon Sep 17 00:00:00 2001 From: "maciej.moscicki" Date: Fri, 26 Jul 2024 13:03:39 +0200 Subject: [PATCH 02/13] log error when removing ZK topics --- .../topic/commands/RemoveTopicRepositoryCommand.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/commands/RemoveTopicRepositoryCommand.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/commands/RemoveTopicRepositoryCommand.java index f0b3449ac1..8e5a635e44 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/commands/RemoveTopicRepositoryCommand.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/commands/RemoveTopicRepositoryCommand.java @@ -3,6 +3,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.TopicName; +import pl.allegro.tech.hermes.common.exception.InternalProcessingException; import pl.allegro.tech.hermes.domain.topic.TopicRepository; import pl.allegro.tech.hermes.management.domain.dc.DatacenterBoundRepositoryHolder; import pl.allegro.tech.hermes.management.domain.dc.RepositoryCommand; @@ -23,7 +24,12 @@ public void backup(DatacenterBoundRepositoryHolder holder) {} public void execute(DatacenterBoundRepositoryHolder holder) { logger.info("Removing topic: {} in ZK dc: {}", topicName, holder.getDatacenterName()); long start = System.currentTimeMillis(); - holder.getRepository().removeTopic(topicName); + try { + holder.getRepository().removeTopic(topicName); + } catch (InternalProcessingException e) { + logger.error("Error while trying to remove topic {} in ZK dc: {}", topicName, holder.getDatacenterName(), e); + throw e; + } logger.info("Removed topic: {} in ZK dc: {}, in {} ms", topicName, holder.getDatacenterName(), System.currentTimeMillis() - start); } From c03cade58db4a57f2f26bebcaf7cba927f9bf94f Mon Sep 
17 00:00:00 2001 From: "maciej.moscicki" Date: Mon, 29 Jul 2024 09:33:22 +0200 Subject: [PATCH 03/13] measure path ZK path removal --- .../infrastructure/zookeeper/ZookeeperBasedRepository.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java index dcb958901c..75269dc4e1 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java @@ -173,8 +173,13 @@ protected void touch(String path) throws Exception { } protected void remove(String path) throws Exception { + logger.info("Removing path {} from ZK", path); + long start = System.currentTimeMillis(); ensureConnected(); + logger.info("Ensured connected to ZK in {} ms", System.currentTimeMillis() - start); + start = System.currentTimeMillis(); zookeeper.delete().guaranteed().deletingChildrenIfNeeded().forPath(path); + logger.info("Deleted path {} from ZK in {} ms", path, System.currentTimeMillis() - start); } private interface ThrowingReader { From 8a14299578db1db785893cfaa9aee4cc9b35f657 Mon Sep 17 00:00:00 2001 From: "maciej.moscicki" Date: Mon, 29 Jul 2024 16:07:13 +0200 Subject: [PATCH 04/13] delete subscriptions path in transaction with topic path --- .../zookeeper/ZookeeperBasedRepository.java | 10 ++++++++++ .../zookeeper/ZookeeperTopicRepository.java | 3 ++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java index 75269dc4e1..fed75ef83d 100644 --- 
a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java @@ -156,6 +156,16 @@ protected void createInTransaction(String path, Object value, String childPath) .commit(); } + protected void deleteInTransaction(String path, String childPath) throws Exception { + ensureConnected(); + zookeeper.inTransaction() + .delete().forPath(childPath) + .and() + .delete().forPath(path) + .and() + .commit(); + } + protected void create(String path, Object value) throws Exception { ensureConnected(); zookeeper.create().forPath(path, mapper.writeValueAsBytes(value)); diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java index 60dcb49bc5..b58593f75c 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java @@ -80,9 +80,10 @@ public void createTopic(Topic topic) { @Override public void removeTopic(TopicName topicName) { ensureTopicExists(topicName); + String topicPath = paths.topicPath(topicName); logger.info("Removing topic: " + topicName); try { - remove(paths.topicPath(topicName)); + deleteInTransaction(topicPath, paths.subscriptionsPath(topicName)); } catch (Exception e) { throw new InternalProcessingException(e); } From d32c40a1737ff95ec4930b30f3cc9234b0b602e4 Mon Sep 17 00:00:00 2001 From: "maciej.moscicki" Date: Tue, 30 Jul 2024 09:26:05 +0200 Subject: [PATCH 05/13] add logging to MultiDatacenterRepositoryCommandExecutor --- .../zookeeper/ZookeeperBasedRepository.java | 5 ----- .../MultiDatacenterRepositoryCommandExecutor.java | 8 +++++++- 
.../RemoveSubscriptionRepositoryCommand.java | 7 ------- .../management/domain/topic/TopicService.java | 8 ++++---- .../commands/RemoveTopicRepositoryCommand.java | 15 +-------------- .../kafka/service/KafkaBrokerTopicManagement.java | 4 ++-- 6 files changed, 14 insertions(+), 33 deletions(-) diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java index fed75ef83d..559bf3715e 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java @@ -183,13 +183,8 @@ protected void touch(String path) throws Exception { } protected void remove(String path) throws Exception { - logger.info("Removing path {} from ZK", path); - long start = System.currentTimeMillis(); ensureConnected(); - logger.info("Ensured connected to ZK in {} ms", System.currentTimeMillis() - start); - start = System.currentTimeMillis(); zookeeper.delete().guaranteed().deletingChildrenIfNeeded().forPath(path); - logger.info("Deleted path {} from ZK in {} ms", path, System.currentTimeMillis() - start); } private interface ThrowingReader { diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/dc/MultiDatacenterRepositoryCommandExecutor.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/dc/MultiDatacenterRepositoryCommandExecutor.java index 6e4c7cf003..7cbb3847c9 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/dc/MultiDatacenterRepositoryCommandExecutor.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/dc/MultiDatacenterRepositoryCommandExecutor.java @@ -46,9 +46,12 @@ private void execute(RepositoryCommand command, boolean isRollbackEnabled 
List> executedRepoHolders = new ArrayList<>(); for (DatacenterBoundRepositoryHolder repoHolder : repoHolders) { + long start = System.currentTimeMillis(); try { executedRepoHolders.add(repoHolder); + logger.info("Executing repository command: {} in ZK dc: {}", command, repoHolder.getDatacenterName()); command.execute(repoHolder); + logger.info("Successfully executed repository command: {} in ZK dc: {} in: {} ms", command, repoHolder.getDatacenterName(), System.currentTimeMillis() - start); } catch (RepositoryNotAvailableException e) { logger.warn("Execute failed with an RepositoryNotAvailableException error", e); if (isRollbackEnabled) { @@ -58,7 +61,7 @@ private void execute(RepositoryCommand command, boolean isRollbackEnabled throw ExceptionWrapper.wrapInInternalProcessingExceptionIfNeeded(e, command.toString(), repoHolder.getDatacenterName()); } } catch (Exception e) { - logger.warn("Execute failed with an error", e); + logger.warn("Failed to execute repository command: {} in ZK dc: {}", command, System.currentTimeMillis() - start, e); if (isRollbackEnabled) { rollback(executedRepoHolders, command); } @@ -68,9 +71,12 @@ private void execute(RepositoryCommand command, boolean isRollbackEnabled } private void rollback(List> repoHolders, RepositoryCommand command) { + long start = System.currentTimeMillis(); for (DatacenterBoundRepositoryHolder repoHolder : repoHolders) { + logger.info("Executing rollback of repository command: {} in ZK dc: {}", command, repoHolder.getDatacenterName()); try { command.rollback(repoHolder); + logger.info("Successfully executed rollback of repository command: {} in ZK dc: {} in: {} ms", command, repoHolder.getDatacenterName(), System.currentTimeMillis() - start); } catch (Exception e) { logger.error("Rollback procedure failed for command {} on DC {}", command, repoHolder.getDatacenterName(), e); } diff --git 
a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/commands/RemoveSubscriptionRepositoryCommand.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/commands/RemoveSubscriptionRepositoryCommand.java index b666935ddd..9d0c2d838d 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/commands/RemoveSubscriptionRepositoryCommand.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/commands/RemoveSubscriptionRepositoryCommand.java @@ -1,7 +1,5 @@ package pl.allegro.tech.hermes.management.domain.subscription.commands; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.SubscriptionName; import pl.allegro.tech.hermes.api.TopicName; @@ -11,8 +9,6 @@ public class RemoveSubscriptionRepositoryCommand extends RepositoryCommand { - private static final Logger logger = LoggerFactory.getLogger(RemoveSubscriptionRepositoryCommand.class); - private final TopicName topicName; private final String subscriptionName; @@ -30,10 +26,7 @@ public void backup(DatacenterBoundRepositoryHolder holde @Override public void execute(DatacenterBoundRepositoryHolder holder) { - logger.info("Removing subscription: {} from topic: {} in ZK dc: {}", subscriptionName, topicName, holder.getDatacenterName()); - long start = System.currentTimeMillis(); holder.getRepository().removeSubscription(topicName, subscriptionName); - logger.info("Removed subscription: {} from topic: {} in ZK dc: {}, in {} ms", subscriptionName, topicName, holder.getDatacenterName(), System.currentTimeMillis() - start); } @Override diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicService.java index 598e7adc8d..26fe3ae834 100644 --- 
a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicService.java @@ -382,14 +382,14 @@ private void removeSchema(Topic topic) { } private void removeTopic(Topic topic, RequestUser removedBy) { - logger.info("Removing topic {} from ZK clusters", topic.getQualifiedName()); + logger.info("Removing topic: {} from ZK clusters", topic.getQualifiedName()); long start = System.currentTimeMillis(); multiDcExecutor.executeByUser(new RemoveTopicRepositoryCommand(topic.getName()), removedBy); - logger.info("Removed topic {} from ZK clusters in: {}ms", topic.getQualifiedName(), System.currentTimeMillis() - start); - logger.info("Removing topic {} from Kafka clusters", topic.getQualifiedName()); + logger.info("Removed topic: {} from ZK clusters in: {} ms", topic.getQualifiedName(), System.currentTimeMillis() - start); + logger.info("Removing topic: {} from Kafka clusters", topic.getQualifiedName()); start = System.currentTimeMillis(); multiDCAwareService.manageTopic(brokerTopicManagement -> brokerTopicManagement.removeTopic(topic)); - logger.info("Removed topic {} from Kafka clusters in: {}ms", topic.getQualifiedName(), System.currentTimeMillis() - start); + logger.info("Removed topic: {} from Kafka clusters in: {} ms", topic.getQualifiedName(), System.currentTimeMillis() - start); auditor.objectRemoved(removedBy.getUsername(), topic); topicOwnerCache.onRemovedTopic(topic); } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/commands/RemoveTopicRepositoryCommand.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/commands/RemoveTopicRepositoryCommand.java index 8e5a635e44..d378548603 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/commands/RemoveTopicRepositoryCommand.java +++ 
b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/commands/RemoveTopicRepositoryCommand.java @@ -1,9 +1,6 @@ package pl.allegro.tech.hermes.management.domain.topic.commands; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.TopicName; -import pl.allegro.tech.hermes.common.exception.InternalProcessingException; import pl.allegro.tech.hermes.domain.topic.TopicRepository; import pl.allegro.tech.hermes.management.domain.dc.DatacenterBoundRepositoryHolder; import pl.allegro.tech.hermes.management.domain.dc.RepositoryCommand; @@ -11,7 +8,6 @@ public class RemoveTopicRepositoryCommand extends RepositoryCommand { private final TopicName topicName; - private static final Logger logger = LoggerFactory.getLogger(RemoveTopicRepositoryCommand.class); public RemoveTopicRepositoryCommand(TopicName topicName) { this.topicName = topicName; @@ -22,16 +18,7 @@ public void backup(DatacenterBoundRepositoryHolder holder) {} @Override public void execute(DatacenterBoundRepositoryHolder holder) { - logger.info("Removing topic: {} in ZK dc: {}", topicName, holder.getDatacenterName()); - long start = System.currentTimeMillis(); - try { - holder.getRepository().removeTopic(topicName); - } catch (InternalProcessingException e) { - logger.error("Error while trying to remove topic {} in ZK dc: {}", topicName, holder.getDatacenterName(), e); - throw e; - } - logger.info("Removed topic: {} in ZK dc: {}, in {} ms", topicName, holder.getDatacenterName(), System.currentTimeMillis() - start); - + holder.getRepository().removeTopic(topicName); } @Override diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaBrokerTopicManagement.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaBrokerTopicManagement.java index f712550812..f799c0b03c 100644 --- 
a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaBrokerTopicManagement.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaBrokerTopicManagement.java @@ -68,10 +68,10 @@ public void removeTopic(Topic topic) { .map(k -> kafkaAdminClient.deleteTopics(Collections.singletonList(k.name().asString()))) .map(DeleteTopicsResult::all) .forEach(future -> { - logger.info("Removing topic {} from Kafka dc: {}", topic, datacenterName); + logger.info("Removing topic: {} from Kafka dc: {}", topic, datacenterName); long start = System.currentTimeMillis(); waitForKafkaFuture(future); - logger.info("Removed topic {} from Kafka dc: {} in {}ms", topic, datacenterName, System.currentTimeMillis() - start); + logger.info("Removed topic: {} from Kafka dc: {} in {} ms", topic, datacenterName, System.currentTimeMillis() - start); }); } From 0bd5a7607faf259e5f3d97fd7caba4981fe125b2 Mon Sep 17 00:00:00 2001 From: "maciej.moscicki" Date: Tue, 30 Jul 2024 15:26:52 +0200 Subject: [PATCH 06/13] remove transaction children atomically --- .../zookeeper/ZookeeperBasedRepository.java | 33 ++++++++-- .../ZookeeperMessagePreviewRepository.java | 4 +- .../zookeeper/ZookeeperPaths.java | 8 +++ .../zookeeper/ZookeeperTopicRepository.java | 66 +++++++++++++++++-- .../ZookeeperTopicRepositoryTest.groovy | 28 +++++++- 5 files changed, 122 insertions(+), 17 deletions(-) diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java index 559bf3715e..2578467bfe 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java @@ -5,6 +5,8 @@ import 
com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.ArrayUtils; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.transaction.CuratorTransactionFinal; +import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,6 +20,7 @@ import java.util.List; import java.util.Optional; import java.util.function.BiConsumer; +import java.util.stream.Collectors; public abstract class ZookeeperBasedRepository { @@ -75,6 +78,13 @@ protected List childrenOf(String path) { } } + protected List childrenPathsOf(String path) { + List childNodes = childrenOf(path); + return childNodes.stream() + .map(child -> ZKPaths.makePath(path, child)) + .collect(Collectors.toList()); + } + @SuppressWarnings("unchecked") protected byte[] readFrom(String path) { return readWithStatFrom(path, bytes -> bytes, (t, stat) -> {}, false).get(); @@ -156,14 +166,18 @@ protected void createInTransaction(String path, Object value, String childPath) .commit(); } - protected void deleteInTransaction(String path, String childPath) throws Exception { + protected void deleteInTransaction(List paths) throws Exception { + if (paths.isEmpty()) { + throw new InternalProcessingException("Attempting to remove empty set of paths from ZK"); + } ensureConnected(); - zookeeper.inTransaction() - .delete().forPath(childPath) - .and() - .delete().forPath(path) - .and() - .commit(); + CuratorTransactionFinal transaction = zookeeper.inTransaction().delete().forPath(paths.get(0)).and(); + + for (int i = 1; i < paths.size(); i++) { + transaction = transaction.delete().forPath(paths.get(i)).and(); + } + + transaction.commit(); } protected void create(String path, Object value) throws Exception { @@ -182,6 +196,11 @@ protected void touch(String path) throws Exception { zookeeper.setData().forPath(path, oldData); } + protected void removeIfExists(String path) throws Exception { + 
ensureConnected(); + zookeeper.delete().quietly().guaranteed().deletingChildrenIfNeeded().forPath(path); + } + protected void remove(String path) throws Exception { ensureConnected(); zookeeper.delete().guaranteed().deletingChildrenIfNeeded().forPath(path); diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperMessagePreviewRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperMessagePreviewRepository.java index 0fd938b668..e80b574f02 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperMessagePreviewRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperMessagePreviewRepository.java @@ -28,7 +28,7 @@ public ZookeeperMessagePreviewRepository(CuratorFramework zookeeper, ObjectMappe @Override public List loadPreview(TopicName topicName) { try { - return Optional.of(paths.topicPath(topicName, ZookeeperPaths.PREVIEW_PATH)) + return Optional.of(paths.topicPreviewPath(topicName)) .filter(this::pathExists) .flatMap(p -> readFrom(p, new TypeReference>() {}, true)) .orElseGet(ArrayList::new); @@ -50,7 +50,7 @@ private void persistMessage(TopicName topic, List messages) { logger.debug("Persisting {} messages for preview of topic: {}", messages.size(), topic.qualifiedName()); try { if (pathExists(paths.topicPath(topic))) { - String previewPath = paths.topicPath(topic, ZookeeperPaths.PREVIEW_PATH); + String previewPath = paths.topicPreviewPath(topic); ensurePathExists(previewPath); overwrite(previewPath, messages); } diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperPaths.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperPaths.java index 9e04bcc64c..be3cb23393 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperPaths.java +++ 
b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperPaths.java @@ -77,6 +77,14 @@ public String topicPath(TopicName topicName, String... tail) { return Joiner.on(URL_SEPARATOR).join(topicsPath(topicName.getGroupName()), topicName.getName(), (Object[]) tail); } + public String topicPreviewPath(TopicName topicName) { + return topicPath(topicName, ZookeeperPaths.PREVIEW_PATH); + } + + public String topicMetricsPath(TopicName topicName) { + return topicPath(topicName, METRICS_PATH); + } + public String subscriptionPath(TopicName topicName, String subscriptionName, String... tail) { return Joiner.on(URL_SEPARATOR).join(subscriptionsPath(topicName), subscriptionName, (Object[]) tail); } diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java index b58593f75c..b04122f1ff 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java @@ -13,6 +13,7 @@ import pl.allegro.tech.hermes.domain.topic.TopicNotExistsException; import pl.allegro.tech.hermes.domain.topic.TopicRepository; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Optional; @@ -77,18 +78,58 @@ public void createTopic(Topic topic) { } } + /** + * To remove topic node, we must remove topic node and its children. The tree looks like this: + * - topic + * ----- /subscriptions (required) + * ----- /preview (optional) + * ----- /metrics (optional) + * --------------- /volume + * --------------- /published + *

+ * One way to remove the whole tree for topic that would be to use 'deletingChildrenIfNeeded()': + * e.g. zookeeper.delete().deletingChildrenIfNeeded().forPath(topicPath). + * However, deletingChildrenIfNeeded is not atomic. It first tries to remove the node ('topic') + * and upon receiving 'KeeperException.NotEmptyException' it tries to remove children recursively + * and then retries the node removal. This means that there is a potentially large time gap between + * removal of 'topic/subscriptions' node and 'topic' node, especially when topic removal is being done + * in remote DC. It turns out that 'PathChildrenCache' used for 'HierarchicalCacheLevel' in + * consumers and management recreates 'topic/subscriptions' node when deleted. If the recreation is faster + * than the removal of 'topic' node, than the whole removal process must be repeated resulting in a lengthy loop + * that may even result in StackOverflowException. + *

+ * To solve this we must remove 'topic' and 'topic/subscriptions' atomically. However, we must also remove + * other 'topic' children. Transaction API does not allow for 'optional' deletes so we: + * 1. find all children beforehand + * 2. delete all children in one transaction + */ @Override public void removeTopic(TopicName topicName) { ensureTopicExists(topicName); - String topicPath = paths.topicPath(topicName); - logger.info("Removing topic: " + topicName); + + List pathsForRemoval = new ArrayList<>(); + String topicMetricsPath = paths.topicMetricsPath(topicName); + if (pathExists(topicMetricsPath)) { + pathsForRemoval.addAll(childrenPathsOf(topicMetricsPath)); + pathsForRemoval.add(topicMetricsPath); + } + + String topicPreviewPath = paths.topicPreviewPath(topicName); + if (pathExists(topicPreviewPath)) { + pathsForRemoval.add(topicPreviewPath); + } + + pathsForRemoval.add(paths.subscriptionsPath(topicName)); + pathsForRemoval.add(paths.topicPath(topicName)); + try { - deleteInTransaction(topicPath, paths.subscriptionsPath(topicName)); - } catch (Exception e) { - throw new InternalProcessingException(e); + deleteInTransaction(pathsForRemoval); + } catch (Exception ex) { + throw new InternalProcessingException(ex); } } + @Override public void updateTopic(Topic topic) { ensureTopicExists(topic.getName()); @@ -106,6 +147,7 @@ public void touchTopic(TopicName topicName) { ensureTopicExists(topicName); logger.info("Touching topic: " + topicName.qualifiedName()); + removeTopicChildren(topicName); try { touch(paths.topicPath(topicName)); } catch (Exception ex) { @@ -113,6 +155,20 @@ public void touchTopic(TopicName topicName) { } } + private void removeTopicChildren(TopicName topicName) { + try { + removeIfExists(paths.topicPreviewPath(topicName)); + } catch (Exception e) { + throw new InternalProcessingException(e); + } + + try { + removeIfExists(paths.topicMetricsPath(topicName)); + } catch (Exception e) { + throw new InternalProcessingException(e); + } + } + 
@Override public Topic getTopicDetails(TopicName topicName) { return getTopicDetails(topicName, false).get(); diff --git a/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepositoryTest.groovy b/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepositoryTest.groovy index 71bcd15bb4..7ac1377790 100644 --- a/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepositoryTest.groovy +++ b/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepositoryTest.groovy @@ -185,17 +185,16 @@ class ZookeeperTopicRepositoryTest extends IntegrationTest { !repository.topicExists(new TopicName(GROUP, 'remove')) } - def "should remove topic with metrics but without subscriptions"() { + def "should remove topic with metrics and without preview"() { given: def topicName = "topicWithMetrics" repository.createTopic(topic(GROUP, topicName).build()) wait.untilTopicCreated(GROUP, topicName) - def path = pathsCompiler.compile(BASE_ZOOKEEPER_PATH + ZookeeperCounterStorage.SUBSCRIPTION_DELIVERED, pathContext() + def path = pathsCompiler.compile(BASE_ZOOKEEPER_PATH + ZookeeperCounterStorage.TOPIC_VOLUME_COUNTER, pathContext() .withGroup(GROUP) .withTopic(topicName) - .withSubscription("sample") .build()) zookeeper().create().creatingParentsIfNeeded().forPath(path, '1'.bytes) wait.untilZookeeperPathIsCreated(path) @@ -207,6 +206,29 @@ class ZookeeperTopicRepositoryTest extends IntegrationTest { !repository.topicExists(new TopicName(GROUP, topicName)) } + def "should remove topic with metrics and preview"() { + given: "a topic" + Topic topic = topic(GROUP, "topicWithMetricsAndPreview").build() + repository.createTopic(topic) + wait.untilTopicCreated(GROUP, topic.getName().getName()) + + and: "volume metric in zk for that topic" + String metricPath = paths.topicMetricPath(topic.getName(), "volume") + 
zookeeper().create().creatingParentsIfNeeded().forPath(metricPath, '1'.bytes) + wait.untilZookeeperPathIsCreated(metricPath) + + and: "preview in zk for that topic" + String previewPath = paths.topicPreviewPath(topic.getName()) + zookeeper().create().creatingParentsIfNeeded().forPath(previewPath , '1'.bytes) + wait.untilZookeeperPathIsCreated(previewPath) + + when: + repository.removeTopic(topic.getName()) + + then: + !repository.topicExists(topic.getName()) + } + def "should not throw exception on malformed topic when reading list of all topics"() { given: zookeeper().create().forPath(paths.topicPath(new TopicName(GROUP, 'malformed')), ''.bytes) From f88ea54f92888645a130edf17a83dbf43c7fc1c5 Mon Sep 17 00:00:00 2001 From: "maciej.moscicki" Date: Tue, 30 Jul 2024 15:29:10 +0200 Subject: [PATCH 07/13] remove transaction children atomically --- .../zookeeper/ZookeeperBasedRepository.java | 5 ----- .../zookeeper/ZookeeperTopicRepository.java | 20 ++----------------- 2 files changed, 2 insertions(+), 23 deletions(-) diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java index 2578467bfe..241fcb0260 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java @@ -196,11 +196,6 @@ protected void touch(String path) throws Exception { zookeeper.setData().forPath(path, oldData); } - protected void removeIfExists(String path) throws Exception { - ensureConnected(); - zookeeper.delete().quietly().guaranteed().deletingChildrenIfNeeded().forPath(path); - } - protected void remove(String path) throws Exception { ensureConnected(); zookeeper.delete().guaranteed().deletingChildrenIfNeeded().forPath(path); diff --git 
a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java index b04122f1ff..51cd7bc4eb 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java @@ -124,12 +124,11 @@ public void removeTopic(TopicName topicName) { try { deleteInTransaction(pathsForRemoval); - } catch (Exception ex) { - throw new InternalProcessingException(ex); + } catch (Exception e) { + throw new InternalProcessingException(e); } } - @Override public void updateTopic(Topic topic) { ensureTopicExists(topic.getName()); @@ -147,7 +146,6 @@ public void touchTopic(TopicName topicName) { ensureTopicExists(topicName); logger.info("Touching topic: " + topicName.qualifiedName()); - removeTopicChildren(topicName); try { touch(paths.topicPath(topicName)); } catch (Exception ex) { @@ -155,20 +153,6 @@ public void touchTopic(TopicName topicName) { } } - private void removeTopicChildren(TopicName topicName) { - try { - removeIfExists(paths.topicPreviewPath(topicName)); - } catch (Exception e) { - throw new InternalProcessingException(e); - } - - try { - removeIfExists(paths.topicMetricsPath(topicName)); - } catch (Exception e) { - throw new InternalProcessingException(e); - } - } - @Override public Topic getTopicDetails(TopicName topicName) { return getTopicDetails(topicName, false).get(); From 2d20bfb9eced51833f83cc68b3fa9dce96208fd2 Mon Sep 17 00:00:00 2001 From: "maciej.moscicki" Date: Tue, 30 Jul 2024 15:29:46 +0200 Subject: [PATCH 08/13] remove transaction children atomically --- .../infrastructure/zookeeper/ZookeeperTopicRepository.java | 1 + 1 file changed, 1 insertion(+) diff --git 
a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java index 51cd7bc4eb..cf0da2e965 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java @@ -106,6 +106,7 @@ public void createTopic(Topic topic) { @Override public void removeTopic(TopicName topicName) { ensureTopicExists(topicName); + logger.info("Removing topic: " + topicName); List pathsForRemoval = new ArrayList<>(); String topicMetricsPath = paths.topicMetricsPath(topicName); From 04bf62aeaa1354272f482ae09b32b5aefd9b1c95 Mon Sep 17 00:00:00 2001 From: "maciej.moscicki" Date: Tue, 30 Jul 2024 15:35:31 +0200 Subject: [PATCH 09/13] improve docs --- .../zookeeper/ZookeeperTopicRepository.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java index cf0da2e965..31c6368173 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java @@ -93,10 +93,18 @@ public void createTopic(Topic topic) { * and upon receiving 'KeeperException.NotEmptyException' it tries to remove children recursively * and then retries the node removal. This means that there is a potentially large time gap between * removal of 'topic/subscriptions' node and 'topic' node, especially when topic removal is being done - * in remote DC. 
It turns out that 'PathChildrenCache' used for 'HierarchicalCacheLevel' in - * consumers and management recreates 'topic/subscriptions' node when deleted. If the recreation is faster - * than the removal of 'topic' node, than the whole removal process must be repeated resulting in a lengthy loop - * that may even result in StackOverflowException. + * in remote DC. + *

+ * It turns out that 'PathChildrenCache' used by 'HierarchicalCacheLevel' in + * Consumers and Frontend listens for 'topics/subscriptions' changes and recreates that node when deleted. + * If the recreation happens between the 'topic/subscriptions' and 'topic' node removal + * than the whole removal process must be repeated resulting in a lengthy loop that may even result in StackOverflowException. + * Example of that scenario would be + * 1. DELETE 'topic' - issued by management, fails with KeeperException.NotEmptyException + * 2. DELETE 'topic/subscriptions' - issued by management, succeeds + * 3. CREATE 'topic/subscriptions' - issued by frontend, succeeds + * 4. DELETE 'topic' - issued by management, fails with KeeperException.NotEmptyException + * [...] *

* To solve this we must remove 'topic' and 'topic/subscriptions' atomically. However, we must also remove * other 'topic' children. Transaction API does not allow for 'optional' deletes so we: From 0f7e412c2a785ae8cada4118abf5d1c496091c7c Mon Sep 17 00:00:00 2001 From: "maciej.moscicki" Date: Tue, 30 Jul 2024 15:47:35 +0200 Subject: [PATCH 10/13] lint --- .../zookeeper/ZookeeperTopicRepository.java | 60 ++++++++++--------- 1 file changed, 33 insertions(+), 27 deletions(-) diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java index 31c6368173..3642677d37 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java @@ -80,36 +80,42 @@ public void createTopic(Topic topic) { /** * To remove topic node, we must remove topic node and its children. The tree looks like this: - * - topic - * ----- /subscriptions (required) - * ----- /preview (optional) - * ----- /metrics (optional) - * --------------- /volume - * --------------- /published - *

- * One way to remove the whole tree for topic that would be to use 'deletingChildrenIfNeeded()': - * e.g. zookeeper.delete().deletingChildrenIfNeeded().forPath(topicPath). - * However, deletingChildrenIfNeeded is not atomic. It first tries to remove the node ('topic') - * and upon receiving 'KeeperException.NotEmptyException' it tries to remove children recursively + *

    + *
  • - topic + *
  • ----- /subscriptions (required) + *
  • ----- /preview (optional) + *
  • ----- /metrics (optional) + *
  • --------------- /volume + *
  • --------------- /published + *
+ * + *

One way to remove the whole tree for a topic would be to use deletingChildrenIfNeeded(): + * e.g. zookeeper.delete().deletingChildrenIfNeeded().forPath(topicPath). + * However, deletingChildrenIfNeeded is not atomic. It first tries to remove the node topic + * and upon receiving KeeperException.NotEmptyException it tries to remove children recursively * and then retries the node removal. This means that there is a potentially large time gap between - * removal of 'topic/subscriptions' node and 'topic' node, especially when topic removal is being done + * removal of topic/subscriptions node and topic node, especially when topic removal is being done * in remote DC. - *

- * It turns out that 'PathChildrenCache' used by 'HierarchicalCacheLevel' in - * Consumers and Frontend listens for 'topics/subscriptions' changes and recreates that node when deleted. - * If the recreation happens between the 'topic/subscriptions' and 'topic' node removal - * than the whole removal process must be repeated resulting in a lengthy loop that may even result in StackOverflowException. + * + *

It turns out that PathChildrenCache used by HierarchicalCacheLevel in + * Consumers and Frontend listens for topic/subscriptions changes and recreates that node when deleted. + * If the recreation happens between the topic/subscriptions and topic node removal + * then the whole removal process must be repeated resulting in a lengthy loop that may even result in StackOverflowError. * Example of that scenario would be - * 1. DELETE 'topic' - issued by management, fails with KeeperException.NotEmptyException - * 2. DELETE 'topic/subscriptions' - issued by management, succeeds - * 3. CREATE 'topic/subscriptions' - issued by frontend, succeeds - * 4. DELETE 'topic' - issued by management, fails with KeeperException.NotEmptyException - * [...] - *

- * To solve this we must remove 'topic' and 'topic/subscriptions' atomically. However, we must also remove - * other 'topic' children. Transaction API does not allow for 'optional' deletes so we: - * 1. find all children beforehand - * 2. delete all children in one transaction + *

    + *
  1. DELETE topic - issued by management, fails with KeeperException.NotEmptyException + *
  2. DELETE topic/subscriptions - issued by management, succeeds + *
  3. CREATE topic/subscriptions - issued by frontend, succeeds + *
  4. DELETE topic - issued by management, fails with KeeperException.NotEmptyException + *
  5. [...] + *
+ * + *

To solve this we must remove topic and topic/subscriptions atomically. However, we must also remove + * other topic children. Transaction API does not allow for optional deletes so we: + *

    + *
  1. find all children paths + *
  2. delete all children in one transaction + *
*/ @Override public void removeTopic(TopicName topicName) { From bf7d5f6a23b849659c1df3a50d6677a63c2eb7f9 Mon Sep 17 00:00:00 2001 From: "maciej.moscicki" Date: Wed, 31 Jul 2024 14:16:22 +0200 Subject: [PATCH 11/13] remove group atomically with topics node --- .../zookeeper/ZookeeperGroupRepository.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperGroupRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperGroupRepository.java index d546ba6044..cc81d85e96 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperGroupRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperGroupRepository.java @@ -7,6 +7,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.Group; +import pl.allegro.tech.hermes.api.TopicName; import pl.allegro.tech.hermes.common.exception.InternalProcessingException; import pl.allegro.tech.hermes.domain.group.GroupAlreadyExistsException; import pl.allegro.tech.hermes.domain.group.GroupNotEmptyException; @@ -65,14 +66,23 @@ public void updateGroup(Group group) { } } + /** + * Atomic removal of group and group/topics + * nodes is required to prevent lengthy loop during removal, see: + * {@link pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperTopicRepository#removeTopic(TopicName)} + */ @Override public void removeGroup(String groupName) { ensureGroupExists(groupName); ensureGroupIsEmpty(groupName); logger.info("Removing group: {}", groupName); + List pathsToDelete = List.of( + paths.topicsPath(groupName), + paths.groupPath(groupName) + ); try { - remove(paths.groupPath(groupName)); + deleteInTransaction(pathsToDelete); } catch (Exception e) { throw new InternalProcessingException(e); } From bed84bc8d086dc80ee3c5eb35733482d2dc7a722 Mon Sep 17 
00:00:00 2001 From: "maciej.moscicki" Date: Wed, 31 Jul 2024 14:52:12 +0200 Subject: [PATCH 12/13] lint --- .../infrastructure/zookeeper/ZookeeperGroupRepository.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperGroupRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperGroupRepository.java index cc81d85e96..b9c745796d 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperGroupRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperGroupRepository.java @@ -69,7 +69,7 @@ public void updateGroup(Group group) { /** * Atomic removal of group and group/topics * nodes is required to prevent lengthy loop during removal, see: - * {@link pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperTopicRepository#removeTopic(TopicName)} + * {@link pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperTopicRepository#removeTopic(TopicName)}. 
*/ @Override public void removeGroup(String groupName) { From a75a7bb25c323a6b838401a2413e4fadaf5d231a Mon Sep 17 00:00:00 2001 From: "maciej.moscicki" Date: Fri, 2 Aug 2024 16:57:43 +0200 Subject: [PATCH 13/13] fix MultiDatacenterRepositoryCommandExecutor logging --- .../domain/dc/MultiDatacenterRepositoryCommandExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/dc/MultiDatacenterRepositoryCommandExecutor.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/dc/MultiDatacenterRepositoryCommandExecutor.java index 7cbb3847c9..e4baa9efec 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/dc/MultiDatacenterRepositoryCommandExecutor.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/dc/MultiDatacenterRepositoryCommandExecutor.java @@ -61,7 +61,7 @@ private void execute(RepositoryCommand command, boolean isRollbackEnabled throw ExceptionWrapper.wrapInInternalProcessingExceptionIfNeeded(e, command.toString(), repoHolder.getDatacenterName()); } } catch (Exception e) { - logger.warn("Failed to execute repository command: {} in ZK dc: {}", command, System.currentTimeMillis() - start, e); + logger.warn("Failed to execute repository command: {} in ZK dc: {} in: {} ms", command, repoHolder.getDatacenterName(), System.currentTimeMillis() - start, e); if (isRollbackEnabled) { rollback(executedRepoHolders, command); }