Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Faster topic removal in MultiDC setup #1884

Merged
merged 13 commits into from
Aug 5, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -18,6 +20,7 @@
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

public abstract class ZookeeperBasedRepository {

Expand Down Expand Up @@ -75,6 +78,13 @@ protected List<String> childrenOf(String path) {
}
}

protected List<String> childrenPathsOf(String path) {
List<String> childNodes = childrenOf(path);
return childNodes.stream()
.map(child -> ZKPaths.makePath(path, child))
.collect(Collectors.toList());
}

@SuppressWarnings("unchecked")
protected byte[] readFrom(String path) {
return readWithStatFrom(path, bytes -> bytes, (t, stat) -> {}, false).get();
Expand Down Expand Up @@ -156,6 +166,20 @@ protected void createInTransaction(String path, Object value, String childPath)
.commit();
}

protected void deleteInTransaction(List<String> paths) throws Exception {
if (paths.isEmpty()) {
throw new InternalProcessingException("Attempting to remove empty set of paths from ZK");
}
ensureConnected();
CuratorTransactionFinal transaction = zookeeper.inTransaction().delete().forPath(paths.get(0)).and();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: ZooKeeper API mentions that inTransaction method is deprecated.

/**
 * Start a transaction builder
 *
 * @return builder object
 * @deprecated use {@link #transaction()} instead
 */
public CuratorTransaction inTransaction();

Can we use transaction() method here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will be resolved here: #1886


for (int i = 1; i < paths.size(); i++) {
transaction = transaction.delete().forPath(paths.get(i)).and();
}

transaction.commit();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: Here is my proposition for refactor:

protected void deleteInTransaction(List<String> paths) throws Exception {
    if (paths.isEmpty()) {
        throw new InternalProcessingException("Attempting to remove empty set of paths from ZK");
    }
    ensureConnected();
    zookeeper.transaction().forOperations(
            paths.stream().map(x -> {
                try {
                    return zookeeper.transactionOp().delete().forPath(x);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }).collect(Collectors.toList())
    );
}

Feel free to tweak with it :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will be resolved here: #1886


protected void create(String path, Object value) throws Exception {
ensureConnected();
zookeeper.create().forPath(path, mapper.writeValueAsBytes(value));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Group;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
import pl.allegro.tech.hermes.domain.group.GroupAlreadyExistsException;
import pl.allegro.tech.hermes.domain.group.GroupNotEmptyException;
Expand Down Expand Up @@ -65,14 +66,23 @@ public void updateGroup(Group group) {
}
}

/**
* Atomic removal of <code>group</code> and <code>group/topics</code>
* nodes is required to prevent lengthy loop during removal, see:
* {@link pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperTopicRepository#removeTopic(TopicName)}.
*/
@Override
public void removeGroup(String groupName) {
ensureGroupExists(groupName);
ensureGroupIsEmpty(groupName);

logger.info("Removing group: {}", groupName);
List<String> pathsToDelete = List.of(
paths.topicsPath(groupName),
paths.groupPath(groupName)
);
try {
remove(paths.groupPath(groupName));
deleteInTransaction(pathsToDelete);
} catch (Exception e) {
throw new InternalProcessingException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public ZookeeperMessagePreviewRepository(CuratorFramework zookeeper, ObjectMappe
@Override
public List<MessagePreview> loadPreview(TopicName topicName) {
try {
return Optional.of(paths.topicPath(topicName, ZookeeperPaths.PREVIEW_PATH))
return Optional.of(paths.topicPreviewPath(topicName))
.filter(this::pathExists)
.flatMap(p -> readFrom(p, new TypeReference<List<MessagePreview>>() {}, true))
.orElseGet(ArrayList::new);
Expand All @@ -50,7 +50,7 @@ private void persistMessage(TopicName topic, List<MessagePreview> messages) {
logger.debug("Persisting {} messages for preview of topic: {}", messages.size(), topic.qualifiedName());
try {
if (pathExists(paths.topicPath(topic))) {
String previewPath = paths.topicPath(topic, ZookeeperPaths.PREVIEW_PATH);
String previewPath = paths.topicPreviewPath(topic);
ensurePathExists(previewPath);
overwrite(previewPath, messages);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ public String topicPath(TopicName topicName, String... tail) {
return Joiner.on(URL_SEPARATOR).join(topicsPath(topicName.getGroupName()), topicName.getName(), (Object[]) tail);
}

public String topicPreviewPath(TopicName topicName) {
return topicPath(topicName, ZookeeperPaths.PREVIEW_PATH);
}

public String topicMetricsPath(TopicName topicName) {
return topicPath(topicName, METRICS_PATH);
}

public String subscriptionPath(TopicName topicName, String subscriptionName, String... tail) {
return Joiner.on(URL_SEPARATOR).join(subscriptionsPath(topicName), subscriptionName, (Object[]) tail);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import pl.allegro.tech.hermes.domain.topic.TopicNotExistsException;
import pl.allegro.tech.hermes.domain.topic.TopicRepository;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -77,12 +78,67 @@ public void createTopic(Topic topic) {
}
}

/**
* To remove topic node, we must remove topic node and its children. The tree looks like this:
* <ul>
* <li>- topic
* <li>----- /subscriptions (required)
* <li>----- /preview (optional)
* <li>----- /metrics (optional)
* <li>--------------- /volume
* <li>--------------- /published
* </ul>
*
* <p>One way to remove the whole tree for topic that would be to use <code>deletingChildrenIfNeeded()</code>:
* e.g. <code>zookeeper.delete().deletingChildrenIfNeeded().forPath(topicPath)</code>.
* However, <code>deletingChildrenIfNeeded</code> is not atomic. It first tries to remove the node <code>topic</code>
* and upon receiving <code>KeeperException.NotEmptyException</code> it tries to remove children recursively
* and then retries the node removal. This means that there is a potentially large time gap between
* removal of <code>topic/subscriptions</code> node and <code>topic</code> node, especially when topic removal is being done
* in remote DC.
*
* <p>It turns out that <code>PathChildrenCache</code> used by <code>HierarchicalCacheLevel</code> in
* Consumers and Frontend listens for <code>topics/subscriptions</code> changes and recreates that node when deleted.
* If the recreation happens between the <code>topic/subscriptions</code> and <code>topic</code> node removal
* than the whole removal process must be repeated resulting in a lengthy loop that may even result in <code>StackOverflowException</code>.
* Example of that scenario would be
* <ol>
* <li> DELETE <code>topic</code> - issued by management, fails with KeeperException.NotEmptyException
* <li> DELETE <code>topic/subscriptions</code> - issued by management, succeeds
* <li> CREATE <code>topic/subscriptions</code> - issued by frontend, succeeds
* <li> DELETE <code>topic</code> - issued by management, fails with KeeperException.NotEmptyException
* <li> [...]
* </ol>
*
* <p>To solve this we must remove <code>topic</code> and <code>topic/subscriptions</code> atomically. However, we must also remove
* other <code>topic</code> children. Transaction API does not allow for optional deletes so we:
* <ol>
* <li> find all children paths
* <li> delete all children in one transaction
* </ol>
*/
@Override
public void removeTopic(TopicName topicName) {
ensureTopicExists(topicName);
logger.info("Removing topic: " + topicName);

List<String> pathsForRemoval = new ArrayList<>();
String topicMetricsPath = paths.topicMetricsPath(topicName);
if (pathExists(topicMetricsPath)) {
pathsForRemoval.addAll(childrenPathsOf(topicMetricsPath));
pathsForRemoval.add(topicMetricsPath);
}

String topicPreviewPath = paths.topicPreviewPath(topicName);
if (pathExists(topicPreviewPath)) {
pathsForRemoval.add(topicPreviewPath);
}

pathsForRemoval.add(paths.subscriptionsPath(topicName));
pathsForRemoval.add(paths.topicPath(topicName));

try {
remove(paths.topicPath(topicName));
deleteInTransaction(pathsForRemoval);
Copy link
Contributor

@faderskd faderskd Aug 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also use this method in other places ? because now we have two different methods for removing paths🤔 for paths without children it will be no-brainer (just use a simple remove), but for those with children we now have a choice

Copy link
Collaborator Author

@moscicky moscicky Aug 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Transactional removal should only be used when needed - it has a higher overhead + it is required to supply all the paths. Most of the time we remove nodes with prefix e.g. delete all nodes for topic ((like /hermes/groups/group/topic/*) and this requires deleteChildrenIfNeeded which is not available in transactions. So imo these two methods have different usage scenarios

} catch (Exception e) {
throw new InternalProcessingException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,17 +185,16 @@ class ZookeeperTopicRepositoryTest extends IntegrationTest {
!repository.topicExists(new TopicName(GROUP, 'remove'))
}

def "should remove topic with metrics but without subscriptions"() {
def "should remove topic with metrics and without preview"() {
given:
def topicName = "topicWithMetrics"

repository.createTopic(topic(GROUP, topicName).build())
wait.untilTopicCreated(GROUP, topicName)

def path = pathsCompiler.compile(BASE_ZOOKEEPER_PATH + ZookeeperCounterStorage.SUBSCRIPTION_DELIVERED, pathContext()
def path = pathsCompiler.compile(BASE_ZOOKEEPER_PATH + ZookeeperCounterStorage.TOPIC_VOLUME_COUNTER, pathContext()
.withGroup(GROUP)
.withTopic(topicName)
.withSubscription("sample")
.build())
zookeeper().create().creatingParentsIfNeeded().forPath(path, '1'.bytes)
wait.untilZookeeperPathIsCreated(path)
Expand All @@ -207,6 +206,29 @@ class ZookeeperTopicRepositoryTest extends IntegrationTest {
!repository.topicExists(new TopicName(GROUP, topicName))
}

def "should remove topic with metrics and preview"() {
given: "a topic"
Topic topic = topic(GROUP, "topicWithMetricsAndPreview").build()
repository.createTopic(topic)
wait.untilTopicCreated(GROUP, topic.getName().getName())

and: "volume metric in zk for that topic"
String metricPath = paths.topicMetricPath(topic.getName(), "volume")
zookeeper().create().creatingParentsIfNeeded().forPath(metricPath, '1'.bytes)
wait.untilZookeeperPathIsCreated(metricPath)

and: "preview in zk for that topic"
String previewPath = paths.topicPreviewPath(topic.getName())
zookeeper().create().creatingParentsIfNeeded().forPath(previewPath , '1'.bytes)
wait.untilZookeeperPathIsCreated(previewPath)

when:
repository.removeTopic(topic.getName())

then:
!repository.topicExists(topic.getName())
}

def "should not throw exception on malformed topic when reading list of all topics"() {
given:
zookeeper().create().forPath(paths.topicPath(new TopicName(GROUP, 'malformed')), ''.bytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ MultiDCAwareService multiDCAwareService(KafkaNamesMappers kafkaNamesMappers, Sch
AdminClient brokerAdminClient = brokerAdminClient(kafkaProperties);
BrokerStorage storage = brokersStorage(brokerAdminClient);
BrokerTopicManagement brokerTopicManagement =
new KafkaBrokerTopicManagement(topicProperties, brokerAdminClient, kafkaNamesMapper);
new KafkaBrokerTopicManagement(topicProperties, brokerAdminClient, kafkaNamesMapper, kafkaProperties.getDatacenter());
KafkaConsumerPool consumerPool = kafkaConsumersPool(kafkaProperties, storage, kafkaProperties.getBrokerList());
KafkaRawMessageReader kafkaRawMessageReader =
new KafkaRawMessageReader(consumerPool, kafkaProperties.getKafkaConsumer().getPollTimeoutMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ private <T> void execute(RepositoryCommand<T> command, boolean isRollbackEnabled
List<DatacenterBoundRepositoryHolder<T>> executedRepoHolders = new ArrayList<>();

for (DatacenterBoundRepositoryHolder<T> repoHolder : repoHolders) {
long start = System.currentTimeMillis();
try {
executedRepoHolders.add(repoHolder);
logger.info("Executing repository command: {} in ZK dc: {}", command, repoHolder.getDatacenterName());
command.execute(repoHolder);
logger.info("Successfully executed repository command: {} in ZK dc: {} in: {} ms", command, repoHolder.getDatacenterName(), System.currentTimeMillis() - start);
} catch (RepositoryNotAvailableException e) {
logger.warn("Execute failed with an RepositoryNotAvailableException error", e);
if (isRollbackEnabled) {
Expand All @@ -58,7 +61,7 @@ private <T> void execute(RepositoryCommand<T> command, boolean isRollbackEnabled
throw ExceptionWrapper.wrapInInternalProcessingExceptionIfNeeded(e, command.toString(), repoHolder.getDatacenterName());
}
} catch (Exception e) {
logger.warn("Execute failed with an error", e);
logger.warn("Failed to execute repository command: {} in ZK dc: {} in: {} ms", command, repoHolder.getDatacenterName(), System.currentTimeMillis() - start, e);
if (isRollbackEnabled) {
rollback(executedRepoHolders, command);
}
Expand All @@ -68,9 +71,12 @@ private <T> void execute(RepositoryCommand<T> command, boolean isRollbackEnabled
}

private <T> void rollback(List<DatacenterBoundRepositoryHolder<T>> repoHolders, RepositoryCommand<T> command) {
long start = System.currentTimeMillis();
for (DatacenterBoundRepositoryHolder<T> repoHolder : repoHolders) {
logger.info("Executing rollback of repository command: {} in ZK dc: {}", command, repoHolder.getDatacenterName());
try {
command.rollback(repoHolder);
logger.info("Successfully executed rollback of repository command: {} in ZK dc: {} in: {} ms", command, repoHolder.getDatacenterName(), System.currentTimeMillis() - start);
} catch (Exception e) {
logger.error("Rollback procedure failed for command {} on DC {}", command, repoHolder.getDatacenterName(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ public void removeSubscription(TopicName topicName, String subscriptionName, Req

public void removeSubscriptionRelatedToTopic(Topic topic, RequestUser removedBy) {
List<Subscription> subscriptions = subscriptionRepository.listSubscriptions(topic.getName());

ensureSubscriptionsHaveAutoRemove(subscriptions, topic.getName());
logger.info("Removing subscriptions of topic: {}, subscriptions: {}", topic.getName(), subscriptions);
long start = System.currentTimeMillis();
subscriptions.forEach(sub -> removeSubscription(topic.getName(), sub.getName(), removedBy));
logger.info("Removed subscriptions of topic: {} in {} ms", topic.getName(), System.currentTimeMillis() - start);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,14 @@ private void removeSchema(Topic topic) {
}

private void removeTopic(Topic topic, RequestUser removedBy) {
logger.info("Removing topic: {} from ZK clusters", topic.getQualifiedName());
long start = System.currentTimeMillis();
multiDcExecutor.executeByUser(new RemoveTopicRepositoryCommand(topic.getName()), removedBy);
logger.info("Removed topic: {} from ZK clusters in: {} ms", topic.getQualifiedName(), System.currentTimeMillis() - start);
logger.info("Removing topic: {} from Kafka clusters", topic.getQualifiedName());
start = System.currentTimeMillis();
multiDCAwareService.manageTopic(brokerTopicManagement -> brokerTopicManagement.removeTopic(topic));
logger.info("Removed topic: {} from Kafka clusters in: {} ms", topic.getQualifiedName(), System.currentTimeMillis() - start);
auditor.objectRemoved(removedBy.getUsername(), topic);
topicOwnerCache.onRemovedTopic(topic);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package pl.allegro.tech.hermes.management.domain.topic.schema;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pl.allegro.tech.hermes.api.RawSchema;
Expand All @@ -24,6 +26,8 @@ public class SchemaService {
private final SchemaValidatorProvider validatorProvider;
private final TopicProperties topicProperties;

private static final Logger logger = LoggerFactory.getLogger(SchemaService.class);

@Autowired
public SchemaService(RawSchemaClient rawSchemaClient,
SchemaValidatorProvider validatorProvider,
Expand Down Expand Up @@ -68,7 +72,10 @@ public void deleteAllSchemaVersions(String qualifiedTopicName) {
if (!topicProperties.isRemoveSchema()) {
throw new SchemaRemovalDisabledException();
}
logger.info("Removing all schema versions for topic: {}", qualifiedTopicName);
long start = System.currentTimeMillis();
rawSchemaClient.deleteAllSchemaVersions(fromQualifiedName(qualifiedTopicName));
logger.info("Removed all schema versions for topic: {} in {} ms", qualifiedTopicName, System.currentTimeMillis() - start);
}

public void validateSchema(Topic topic, String schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.KafkaTopic;
Expand All @@ -32,10 +34,16 @@ public class KafkaBrokerTopicManagement implements BrokerTopicManagement {

private final KafkaNamesMapper kafkaNamesMapper;

public KafkaBrokerTopicManagement(TopicProperties topicProperties, AdminClient kafkaAdminClient, KafkaNamesMapper kafkaNamesMapper) {
private final String datacenterName;

private static final Logger logger = LoggerFactory.getLogger(KafkaBrokerTopicManagement.class);


public KafkaBrokerTopicManagement(TopicProperties topicProperties, AdminClient kafkaAdminClient, KafkaNamesMapper kafkaNamesMapper, String datacenterName) {
this.topicProperties = topicProperties;
this.kafkaAdminClient = kafkaAdminClient;
this.kafkaNamesMapper = kafkaNamesMapper;
this.datacenterName = datacenterName;
}

@Override
Expand All @@ -59,7 +67,12 @@ public void removeTopic(Topic topic) {
kafkaNamesMapper.toKafkaTopics(topic).stream()
.map(k -> kafkaAdminClient.deleteTopics(Collections.singletonList(k.name().asString())))
.map(DeleteTopicsResult::all)
.forEach(this::waitForKafkaFuture);
.forEach(future -> {
logger.info("Removing topic: {} from Kafka dc: {}", topic, datacenterName);
long start = System.currentTimeMillis();
waitForKafkaFuture(future);
logger.info("Removed topic: {} from Kafka dc: {} in {} ms", topic, datacenterName, System.currentTimeMillis() - start);
});
}

@Override
Expand Down
Loading