Skip to content

Commit

Permalink
[server] Add store-aware partition-wise shared consumer assignment st…
Browse files Browse the repository at this point in the history
…rategy (linkedin#1261)

Add a new version of partition wise shared consumer assignment strategy.
We have been seeing subscriptions to the same topic / store be assigned to the same consumer, and for a particular store push's view (can be inc push / full push) it can be competing with each other and becomes the long-tail partition and slow down the overall progress.
Assuming the store/topic itself does not have data-skew, then we should try to assign these subscriptions to different consumers as even as possible. Especially for RT topics, backup / current / future version will share the same input volume, so we should not treat them differently within the same pool, but we can further optimize that in different level (Pool assignment strategy)
This PR adds the new strategy so when a new topic partition is looking for assignment, it will compute and sort all the consumer's load based on general load and the store-specific load. It will assign the new topic partition to the least loaded consumer based on the computed load.
  • Loading branch information
sixpluszero authored Oct 31, 2024
1 parent 8889ab3 commit 7e9aa79
Show file tree
Hide file tree
Showing 7 changed files with 278 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ protected KafkaConsumerService(
}

/** May be overridden to clean up state in sub-classes */
void handleUnsubscription(SharedKafkaConsumer consumer, PubSubTopicPartition topicPartition) {
void handleUnsubscription(
SharedKafkaConsumer consumer,
PubSubTopic versionTopic,
PubSubTopicPartition topicPartition) {
}

private String getUniqueClientId(String kafkaUrl, int suffix) {
Expand Down Expand Up @@ -563,7 +566,8 @@ private interface OffsetGetter {
*/
public enum ConsumerAssignmentStrategy {
TOPIC_WISE_SHARED_CONSUMER_ASSIGNMENT_STRATEGY(TopicWiseKafkaConsumerService::new),
PARTITION_WISE_SHARED_CONSUMER_ASSIGNMENT_STRATEGY(PartitionWiseKafkaConsumerService::new);
PARTITION_WISE_SHARED_CONSUMER_ASSIGNMENT_STRATEGY(PartitionWiseKafkaConsumerService::new),
STORE_AWARE_PARTITION_WISE_SHARED_CONSUMER_ASSIGNMENT_STRATEGY(StoreAwarePartitionWiseKafkaConsumerService::new);

final KCSConstructor constructor;

Expand All @@ -576,4 +580,8 @@ public enum ConsumerAssignmentStrategy {
public void setThreadFactory(RandomAccessDaemonThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}

IndexedMap<SharedKafkaConsumer, ConsumptionTask> getConsumerToConsumptionTask() {
return consumerToConsumptionTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class PartitionWiseKafkaConsumerService extends KafkaConsumerService {
* have same real-time topics, we should avoid same real-time topic partition from different version topics sharing
* the same consumer from consumer pool.
*/
private final Map<PubSubTopicPartition, Set<PubSubConsumerAdapter>> rtTopicPartitionToConsumerMap =
protected final Map<PubSubTopicPartition, Set<PubSubConsumerAdapter>> rtTopicPartitionToConsumerMap =
new VeniceConcurrentHashMap<>();

private final Logger LOGGER;
Expand All @@ -58,6 +58,48 @@ public class PartitionWiseKafkaConsumerService extends KafkaConsumerService {
final boolean isKafkaConsumerOffsetCollectionEnabled,
final ReadOnlyStoreRepository metadataRepository,
final boolean isUnregisterMetricForDeletedStoreEnabled) {
this(
poolType,
consumerFactory,
consumerProperties,
readCycleDelayMs,
numOfConsumersPerKafkaCluster,
ingestionThrottler,
kafkaClusterBasedRecordThrottler,
metricsRepository,
kafkaClusterAlias,
sharedConsumerNonExistingTopicCleanupDelayMS,
topicExistenceChecker,
liveConfigBasedKafkaThrottlingEnabled,
pubSubDeserializer,
time,
stats,
isKafkaConsumerOffsetCollectionEnabled,
metadataRepository,
isUnregisterMetricForDeletedStoreEnabled,
PartitionWiseKafkaConsumerService.class.toString());
}

PartitionWiseKafkaConsumerService(
final ConsumerPoolType poolType,
final PubSubConsumerAdapterFactory consumerFactory,
final Properties consumerProperties,
final long readCycleDelayMs,
final int numOfConsumersPerKafkaCluster,
final IngestionThrottler ingestionThrottler,
final KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler,
final MetricsRepository metricsRepository,
final String kafkaClusterAlias,
final long sharedConsumerNonExistingTopicCleanupDelayMS,
final TopicExistenceChecker topicExistenceChecker,
final boolean liveConfigBasedKafkaThrottlingEnabled,
final PubSubMessageDeserializer pubSubDeserializer,
final Time time,
final AggKafkaConsumerServiceStats stats,
final boolean isKafkaConsumerOffsetCollectionEnabled,
final ReadOnlyStoreRepository metadataRepository,
final boolean isUnregisterMetricForDeletedStoreEnabled,
final String loggerNamePrefix) {
super(
poolType,
consumerFactory,
Expand All @@ -77,7 +119,7 @@ public class PartitionWiseKafkaConsumerService extends KafkaConsumerService {
isKafkaConsumerOffsetCollectionEnabled,
metadataRepository,
isUnregisterMetricForDeletedStoreEnabled);
this.LOGGER = LogManager.getLogger(PartitionWiseKafkaConsumerService.class + " [" + kafkaUrlForLogger + "]");
this.LOGGER = LogManager.getLogger(loggerNamePrefix + " [" + kafkaUrlForLogger + "]");
}

@Override
Expand Down Expand Up @@ -137,20 +179,31 @@ protected synchronized SharedKafkaConsumer pickConsumerForPartition(
return consumer;
}

private boolean alreadySubscribedRealtimeTopicPartition(
protected boolean alreadySubscribedRealtimeTopicPartition(
SharedKafkaConsumer consumer,
PubSubTopicPartition topicPartition) {
Set<PubSubConsumerAdapter> consumers = rtTopicPartitionToConsumerMap.get(topicPartition);
Set<PubSubConsumerAdapter> consumers = getRtTopicPartitionToConsumerMap().get(topicPartition);
return consumers != null && consumers.contains(consumer);
}

@Override
void handleUnsubscription(SharedKafkaConsumer consumer, PubSubTopicPartition pubSubTopicPartition) {
void handleUnsubscription(
SharedKafkaConsumer consumer,
PubSubTopic versionTopic,
PubSubTopicPartition pubSubTopicPartition) {
if (pubSubTopicPartition.getPubSubTopic().isRealTime()) {
Set<PubSubConsumerAdapter> rtTopicConsumers = rtTopicPartitionToConsumerMap.get(pubSubTopicPartition);
if (rtTopicConsumers != null) {
rtTopicConsumers.remove(consumer);
}
}
}

Map<PubSubTopicPartition, Set<PubSubConsumerAdapter>> getRtTopicPartitionToConsumerMap() {
return rtTopicPartitionToConsumerMap;
}

Logger getLOGGER() {
return LOGGER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public SharedKafkaConsumer(
* Listeners may use this callback to clean up lingering state they may be holding about a consumer.
*/
interface UnsubscriptionListener {
void call(SharedKafkaConsumer consumer, PubSubTopicPartition pubSubTopicPartition);
void call(SharedKafkaConsumer consumer, PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition);
}

protected synchronized void updateCurrentAssignment(Set<PubSubTopicPartition> newAssignment) {
Expand Down Expand Up @@ -155,8 +155,8 @@ synchronized void subscribe(
public synchronized void unSubscribe(PubSubTopicPartition pubSubTopicPartition) {
unSubscribeAction(() -> {
this.delegate.unSubscribe(pubSubTopicPartition);
subscribedTopicPartitionToVersionTopic.remove(pubSubTopicPartition);
unsubscriptionListener.call(this, pubSubTopicPartition);
PubSubTopic versionTopic = subscribedTopicPartitionToVersionTopic.remove(pubSubTopicPartition);
unsubscriptionListener.call(this, versionTopic, pubSubTopicPartition);
return Collections.singleton(pubSubTopicPartition);
});
}
Expand All @@ -166,8 +166,8 @@ public synchronized void batchUnsubscribe(Set<PubSubTopicPartition> pubSubTopicP
unSubscribeAction(() -> {
this.delegate.batchUnsubscribe(pubSubTopicPartitionSet);
for (PubSubTopicPartition pubSubTopicPartition: pubSubTopicPartitionSet) {
subscribedTopicPartitionToVersionTopic.remove(pubSubTopicPartition);
unsubscriptionListener.call(this, pubSubTopicPartition);
PubSubTopic versionTopic = subscribedTopicPartitionToVersionTopic.remove(pubSubTopicPartition);
unsubscriptionListener.call(this, versionTopic, pubSubTopicPartition);
}
return pubSubTopicPartitionSet;
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.stats.AggKafkaConsumerServiceStats;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubConsumerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.utils.Time;
import io.tehuti.metrics.MetricsRepository;
import java.util.HashSet;
import java.util.Properties;


/**
* {@link StoreAwarePartitionWiseKafkaConsumerService} is used to allocate share consumer from consumer pool at partition
* granularity. One shared consumer may have multiple topics, and each topic may have multiple consumers.
* This is store-aware version of topic-wise shared consumer service. The topic partition assignment in this service has
* a heuristic that we should distribute the all the subscriptions related to a same store / version as even as possible.
* The load calculation for each consumer will be:
* Consumer assignment size + IMPOSSIBLE_MAX_PARTITION_COUNT_PER_CONSUMER * subscription count for the same store;
* and we will pick the least loaded consumer for a new topic partition request. If there is no eligible consumer, it
* will throw {@link IllegalStateException}
*/
public class StoreAwarePartitionWiseKafkaConsumerService extends PartitionWiseKafkaConsumerService {
// This constant makes sure the store subscription count will always be prioritized over consumer assignment count.
private static final int IMPOSSIBLE_MAX_PARTITION_COUNT_PER_CONSUMER = 10000;

StoreAwarePartitionWiseKafkaConsumerService(
final ConsumerPoolType poolType,
final PubSubConsumerAdapterFactory consumerFactory,
final Properties consumerProperties,
final long readCycleDelayMs,
final int numOfConsumersPerKafkaCluster,
final IngestionThrottler ingestionThrottler,
final KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler,
final MetricsRepository metricsRepository,
final String kafkaClusterAlias,
final long sharedConsumerNonExistingTopicCleanupDelayMS,
final TopicExistenceChecker topicExistenceChecker,
final boolean liveConfigBasedKafkaThrottlingEnabled,
final PubSubMessageDeserializer pubSubDeserializer,
final Time time,
final AggKafkaConsumerServiceStats stats,
final boolean isKafkaConsumerOffsetCollectionEnabled,
final ReadOnlyStoreRepository metadataRepository,
final boolean isUnregisterMetricForDeletedStoreEnabled) {
super(
poolType,
consumerFactory,
consumerProperties,
readCycleDelayMs,
numOfConsumersPerKafkaCluster,
ingestionThrottler,
kafkaClusterBasedRecordThrottler,
metricsRepository,
kafkaClusterAlias,
sharedConsumerNonExistingTopicCleanupDelayMS,
topicExistenceChecker,
liveConfigBasedKafkaThrottlingEnabled,
pubSubDeserializer,
time,
stats,
isKafkaConsumerOffsetCollectionEnabled,
metadataRepository,
isUnregisterMetricForDeletedStoreEnabled,
StoreAwarePartitionWiseKafkaConsumerService.class.toString());
}

@Override
protected synchronized SharedKafkaConsumer pickConsumerForPartition(
PubSubTopic versionTopic,
PubSubTopicPartition topicPartition) {
String storeName = versionTopic.getStoreName();
long minLoad = Long.MAX_VALUE;
SharedKafkaConsumer minLoadConsumer = null;
for (SharedKafkaConsumer consumer: getConsumerToConsumptionTask().keySet()) {
int index = getConsumerToConsumptionTask().indexOf(consumer);
if (topicPartition.getPubSubTopic().isRealTime()
&& alreadySubscribedRealtimeTopicPartition(consumer, topicPartition)) {
getLOGGER().info(
"Consumer id: {} has already subscribed the same real time topic-partition: {} and thus cannot be picked",
index,
topicPartition);
continue;
}
long overallLoad = getConsumerStoreLoad(consumer, storeName);
if (overallLoad < minLoad) {
minLoadConsumer = consumer;
minLoad = overallLoad;
}
}
if (minLoad == Long.MAX_VALUE) {
throw new IllegalStateException("Unable to find least loaded consumer entry.");
}

// Update RT topic partition consumer map.
if (topicPartition.getPubSubTopic().isRealTime()) {
getRtTopicPartitionToConsumerMap().computeIfAbsent(topicPartition, key -> new HashSet<>()).add(minLoadConsumer);
}

getLOGGER().info(
"Picked consumer id: {}, assignment size: {}, computed load: {} for topic partition: {}, version topic: {}",
getConsumerToConsumptionTask().indexOf(minLoadConsumer),
minLoadConsumer.getAssignmentSize(),
minLoad,
topicPartition,
versionTopic);
return minLoadConsumer;
}

long getConsumerStoreLoad(SharedKafkaConsumer consumer, String storeName) {
long baseAssignmentCount = consumer.getAssignmentSize();
long storeSubscriptionCount = consumer.getAssignment()
.stream()
.filter(x -> Version.parseStoreFromKafkaTopicName(x.getTopicName()).equals(storeName))
.count();
return storeSubscriptionCount * IMPOSSIBLE_MAX_PARTITION_COUNT_PER_CONSUMER + baseAssignmentCount;
}
}
Loading

0 comments on commit 7e9aa79

Please sign in to comment.