Skip to content

Commit

Permalink
[server][test] Race condition fix for re-subscription of real-time to…
Browse files Browse the repository at this point in the history
…pic partitions. (#1263)

This improvement addresses a previously reverted #1192. The unit test in the prior fix triggered separate threads resubscribing to the same real-time topic partition, resulting in frequent resubscriptions. These frequent resubscriptions caused excessive metric emissions, leading to GC issues and causing other unit tests to fail in CI. To prevent metric emissions, we use a mocked AggKafkaConsumerServiceStats instead of passing null when instantiating KafkaConsumerService in the unit test, as KafkaConsumerService would otherwise create a real AggKafkaConsumerServiceStats when null is passed.

Previous #1192 commit message:

Recently we introduced re-subscription feature for different store versions. Leader and follower partitions from different version topics will experience re-subscription triggered by store version role change concurrently.

Problem:
Two StoreIngestionTask threads for different store versions (store version 1 and store version 2) try to do re-subscription (unsub and sub) for the same real-time topic partition concurrently. During re-subscription triggered by one store version, operation of consumer.unSubscribe to remove assignment of the topic partition and operation of consumerToConsumptionTask.get(consumer).removeDataReceiver(pubSubTopicPartition) are sequentially executed. It is possible that StoreIngestionTask thread for store version 2 got the same ConsumptionTask, but DataReceiver inside ConsumptionTask is still from store version 1). As each ConsumptionTask will only allow one DataReceiver for one particular real-time topic partition, the DataReceiver from store version 2 will not be able to be added to this ConsumptionTask.

Solution:
Using SharedKafkaConsumer level lock to protect KafkaConsumcerService for unSubscribe, SetDataReceiver, unSubscribeAll to guarantee there will one real-time partition from specific store version doing DataReceiver related assignment change.
Co-authored-by: Hao Xu <xhao@xhao-mn3.linkedin.biz>
  • Loading branch information
haoxu07 and Hao Xu authored Oct 30, 2024
1 parent d0ee623 commit 416b7db
Show file tree
Hide file tree
Showing 3 changed files with 236 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,14 @@ public void unsubscribeAll(PubSubTopic versionTopic) {
versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> {
if (topicPartitionToConsumerMap != null) {
topicPartitionToConsumerMap.forEach((topicPartition, sharedConsumer) -> {
sharedConsumer.unSubscribe(topicPartition);
removeTopicPartitionFromConsumptionTask(sharedConsumer, topicPartition);
/**
* Refer {@link KafkaConsumerService#startConsumptionIntoDataReceiver} for avoiding race condition caused by
* setting data receiver and unsubscribing concurrently for the same topic partition on a shared consumer.
*/
synchronized (sharedConsumer) {
sharedConsumer.unSubscribe(topicPartition);
removeTopicPartitionFromConsumptionTask(sharedConsumer, topicPartition);
}
});
}
return null;
Expand All @@ -237,8 +243,14 @@ public void unsubscribeAll(PubSubTopic versionTopic) {
public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) {
PubSubConsumerAdapter consumer = getConsumerAssignedToVersionTopicPartition(versionTopic, pubSubTopicPartition);
if (consumer != null) {
consumer.unSubscribe(pubSubTopicPartition);
consumerToConsumptionTask.get(consumer).removeDataReceiver(pubSubTopicPartition);
/**
* Refer {@link KafkaConsumerService#startConsumptionIntoDataReceiver} for avoiding race condition caused by
* setting data receiver and unsubscribing concurrently for the same topic partition on a shared consumer.
*/
synchronized (consumer) {
consumer.unSubscribe(pubSubTopicPartition);
removeTopicPartitionFromConsumptionTask(consumer, pubSubTopicPartition);
}
versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> {
if (topicPartitionToConsumerMap != null) {
topicPartitionToConsumerMap.remove(pubSubTopicPartition);
Expand All @@ -265,20 +277,25 @@ public void batchUnsubscribe(PubSubTopic versionTopic, Set<PubSubTopicPartition>
/**
* Leverage {@link PubSubConsumerAdapter#batchUnsubscribe(Set)}.
*/
consumerUnSubTopicPartitionSet.forEach((c, tpSet) -> {
c.batchUnsubscribe(tpSet);
ConsumptionTask task = consumerToConsumptionTask.get(c);
tpSet.forEach(tp -> {
task.removeDataReceiver(tp);
versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> {
if (topicPartitionToConsumerMap != null) {
topicPartitionToConsumerMap.remove(tp);
return topicPartitionToConsumerMap.isEmpty() ? null : topicPartitionToConsumerMap;
} else {
return null;
}
});
});
consumerUnSubTopicPartitionSet.forEach((sharedConsumer, tpSet) -> {
ConsumptionTask task = consumerToConsumptionTask.get(sharedConsumer);
/**
* Refer {@link KafkaConsumerService#startConsumptionIntoDataReceiver} for avoiding race condition caused by
* setting data receiver and unsubscribing concurrently for the same topic partition on a shared consumer.
*/
synchronized (sharedConsumer) {
sharedConsumer.batchUnsubscribe(tpSet);
tpSet.forEach(task::removeDataReceiver);
}
tpSet.forEach(
tp -> versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> {
if (topicPartitionToConsumerMap != null) {
topicPartitionToConsumerMap.remove(tp);
return topicPartitionToConsumerMap.isEmpty() ? null : topicPartitionToConsumerMap;
} else {
return null;
}
}));
});
}

Expand Down Expand Up @@ -387,26 +404,32 @@ public void startConsumptionIntoDataReceiver(
PubSubTopic versionTopic = consumedDataReceiver.destinationIdentifier();
PubSubTopicPartition topicPartition = partitionReplicaIngestionContext.getPubSubTopicPartition();
SharedKafkaConsumer consumer = assignConsumerFor(versionTopic, topicPartition);

if (consumer == null) {
// Defensive code. Shouldn't happen except in case of a regression.
throw new VeniceException(
"Shared consumer must exist for version topic: " + versionTopic + " in Kafka cluster: " + kafkaUrl);
}

ConsumptionTask consumptionTask = consumerToConsumptionTask.get(consumer);
if (consumptionTask == null) {
// Defensive coding. Should never happen except in case of a regression.
throw new IllegalStateException(
"There should be a " + ConsumptionTask.class.getSimpleName() + " assigned for this "
+ SharedKafkaConsumer.class.getSimpleName());
}
/**
* N.B. it's important to set the {@link ConsumedDataReceiver} prior to subscribing, otherwise the
* {@link KafkaConsumerService.ConsumptionTask} will not be able to funnel the messages.
* It is possible that when one {@link StoreIngestionTask} thread finishes unsubscribing a topic partition but not
* finish removing data receiver, but the other {@link StoreIngestionTask} thread is setting data receiver for this
* topic partition before subscription. As {@link ConsumptionTask} does not allow 2 different data receivers for
* the same topic partition, it will throw exception.
*/
consumptionTask.setDataReceiver(topicPartition, consumedDataReceiver);
consumer.subscribe(consumedDataReceiver.destinationIdentifier(), topicPartition, lastReadOffset);
synchronized (consumer) {
ConsumptionTask consumptionTask = consumerToConsumptionTask.get(consumer);
if (consumptionTask == null) {
// Defensive coding. Should never happen except in case of a regression.
throw new IllegalStateException(
"There should be a " + ConsumptionTask.class.getSimpleName() + " assigned for this "
+ SharedKafkaConsumer.class.getSimpleName());
}
/**
* N.B. it's important to set the {@link ConsumedDataReceiver} prior to subscribing, otherwise the
* {@link KafkaConsumerService.ConsumptionTask} will not be able to funnel the messages.
*/
consumptionTask.setDataReceiver(topicPartition, consumedDataReceiver);
consumer.subscribe(consumedDataReceiver.destinationIdentifier(), topicPartition, lastReadOffset);
}
}

interface KCSConstructor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,9 @@ public synchronized void batchUnsubscribe(Set<PubSubTopicPartition> pubSubTopicP
*/
protected synchronized void unSubscribeAction(Supplier<Set<PubSubTopicPartition>> supplier) {
long currentPollTimes = pollTimes;
long startTime = System.currentTimeMillis();
Set<PubSubTopicPartition> topicPartitions = supplier.get();
long startTime = System.currentTimeMillis();
long elapsedTime = System.currentTimeMillis() - startTime;

LOGGER.info(
"Shared consumer {} unsubscribed {} partition(s): ({}) in {} ms",
this.getClass().getSimpleName(),
Expand Down Expand Up @@ -348,4 +347,9 @@ public Long endOffset(PubSubTopicPartition pubSubTopicPartition) {
public List<PubSubTopicPartitionInfo> partitionsFor(PubSubTopic topic) {
throw new UnsupportedOperationException("partitionsFor is not supported in SharedKafkaConsumer");
}

// Test only
public void setNextPollTimeOutSeconds(long seconds) {
this.nextPollTimeOutSeconds = seconds;
}
}
Original file line number Diff line number Diff line change
@@ -1,28 +1,54 @@
package com.linkedin.davinci.kafka.consumer;

import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.utils.TestUtils.waitForNonDeterministicAssertion;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.ingestion.consumption.ConsumedDataReceiver;
import com.linkedin.davinci.stats.AggKafkaConsumerServiceStats;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubConsumerAdapterFactory;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.pools.LandFillObjectPool;
import io.tehuti.metrics.MetricsRepository;
import io.tehuti.metrics.Sensor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -458,4 +484,154 @@ private void verifyConsumerServiceStartConsumptionIntoDataReceiver(
partitionReplicaIngestionContext.getVersionTopic(),
partitionReplicaIngestionContext.getPubSubTopicPartition());
}

/**
* This test simulates multiple threads resubscribing to the same real-time topic partition for different store
* versions. It verifies if the lock effectively protects the handoff between {@link ConsumptionTask} and
* {@link ConsumedDataReceiver} during the re-subscription process. Previously, an unprotected handoff led to an
* {@link IllegalStateException} being thrown within {@link ConsumptionTask#setDataReceiver}.
* To induce the race condition, we assume there are 5 store versions, with each version assigned to a dedicated
* thread that continuously resubscribes to the same real-time topic partition. This continues until either a race
* condition is encountered or the test times out (30 seconds).
*/
@Test
public void testKafkaConsumerServiceResubscriptionConcurrency() throws Exception {
ApacheKafkaConsumerAdapter consumer1 = mock(ApacheKafkaConsumerAdapter.class);
PubSubConsumerAdapterFactory factory = mock(PubSubConsumerAdapterFactory.class);
when(factory.create(any(), anyBoolean(), any(), any())).thenReturn(consumer1);

Properties properties = new Properties();
String testKafkaUrl = "test_kafka_url";
properties.put(KAFKA_BOOTSTRAP_SERVERS, testKafkaUrl);
MetricsRepository mockMetricsRepository = mock(MetricsRepository.class);
final Sensor mockSensor = mock(Sensor.class);
doReturn(mockSensor).when(mockMetricsRepository).sensor(anyString(), any());

int versionNum = 5;
PubSubMessageDeserializer pubSubDeserializer = new PubSubMessageDeserializer(
new OptimizedKafkaValueSerializer(),
new LandFillObjectPool<>(KafkaMessageEnvelope::new),
new LandFillObjectPool<>(KafkaMessageEnvelope::new));
KafkaConsumerService consumerService = new PartitionWiseKafkaConsumerService(
ConsumerPoolType.REGULAR_POOL,
factory,
properties,
1000l,
versionNum * 2, // To simulate real production cases: consumers # >> version # per store.
mock(IngestionThrottler.class),
mock(KafkaClusterBasedRecordThrottler.class),
mockMetricsRepository,
"test_kafka_cluster_alias",
TimeUnit.MINUTES.toMillis(1),
mock(TopicExistenceChecker.class),
false,
pubSubDeserializer,
SystemTime.INSTANCE,
mock(AggKafkaConsumerServiceStats.class),
false,
mock(ReadOnlyStoreRepository.class),
false);
String storeName = Utils.getUniqueString("test_consumer_service");

Function<String, Boolean> isAAWCStoreFunc = vt -> true;
KafkaConsumerServiceDelegator.KafkaConsumerServiceBuilder consumerServiceBuilder =
(ignored, poolType) -> consumerService;
VeniceServerConfig mockConfig = mock(VeniceServerConfig.class);
doReturn(false).when(mockConfig).isDedicatedConsumerPoolForAAWCLeaderEnabled();
doReturn(true).when(mockConfig).isResubscriptionTriggeredByVersionIngestionContextChangeEnabled();
doReturn(KafkaConsumerServiceDelegator.ConsumerPoolStrategyType.CURRENT_VERSION_PRIORITIZATION).when(mockConfig)
.getConsumerPoolStrategyType();
KafkaConsumerServiceDelegator delegator =
new KafkaConsumerServiceDelegator(mockConfig, consumerServiceBuilder, isAAWCStoreFunc);
PubSubTopicPartition realTimeTopicPartition =
new PubSubTopicPartitionImpl(TOPIC_REPOSITORY.getTopic(Version.composeRealTimeTopic(storeName)), 0);

CountDownLatch countDownLatch = new CountDownLatch(1);
List<Thread> infiniteSubUnSubThreads = new ArrayList<>();
for (int i = 0; i < versionNum; i++) {
PubSubTopic versionTopicForStoreName3 = TOPIC_REPOSITORY.getTopic(Version.composeKafkaTopic(storeName, i));
StoreIngestionTask task = mock(StoreIngestionTask.class);
when(task.getVersionTopic()).thenReturn(versionTopicForStoreName3);
when(task.isHybridMode()).thenReturn(true);

PartitionReplicaIngestionContext partitionReplicaIngestionContext = new PartitionReplicaIngestionContext(
versionTopicForStoreName3,
realTimeTopicPartition,
PartitionReplicaIngestionContext.VersionRole.CURRENT,
PartitionReplicaIngestionContext.WorkloadType.AA_OR_WRITE_COMPUTE);
ConsumedDataReceiver consumedDataReceiver = mock(ConsumedDataReceiver.class);
when(consumedDataReceiver.destinationIdentifier()).thenReturn(versionTopicForStoreName3);
Runnable infiniteSubUnSub = getResubscriptionRunnableFor(
delegator,
partitionReplicaIngestionContext,
consumedDataReceiver,
countDownLatch);
Thread infiniteSubUnSubThread = new Thread(infiniteSubUnSub, "infiniteResubscribe: " + versionTopicForStoreName3);
infiniteSubUnSubThread.start();
infiniteSubUnSubThreads.add(infiniteSubUnSubThread);
// Wait for the thread to start.
waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> {
assertTrue(
infiniteSubUnSubThread.getState().equals(Thread.State.WAITING)
|| infiniteSubUnSubThread.getState().equals(Thread.State.TIMED_WAITING)
|| infiniteSubUnSubThread.getState().equals(Thread.State.BLOCKED)
|| infiniteSubUnSubThread.getState().equals(Thread.State.RUNNABLE));
});
}
long currentTime = System.currentTimeMillis();
Boolean raceConditionFound = countDownLatch.await(30, TimeUnit.SECONDS);
long elapsedTime = System.currentTimeMillis() - currentTime;
for (Thread infiniteSubUnSubThread: infiniteSubUnSubThreads) {
assertTrue(
infiniteSubUnSubThread.getState().equals(Thread.State.WAITING)
|| infiniteSubUnSubThread.getState().equals(Thread.State.TIMED_WAITING)
|| infiniteSubUnSubThread.getState().equals(Thread.State.BLOCKED)
|| infiniteSubUnSubThread.getState().equals(Thread.State.RUNNABLE));
infiniteSubUnSubThread.interrupt();
infiniteSubUnSubThread.join();
assertEquals(Thread.State.TERMINATED, infiniteSubUnSubThread.getState());
}
Assert.assertFalse(
raceConditionFound,
"Found race condition in KafkaConsumerService with time passed in milliseconds: " + elapsedTime);
delegator.close();
}

private Runnable getResubscriptionRunnableFor(
KafkaConsumerServiceDelegator consumerServiceDelegator,
PartitionReplicaIngestionContext partitionReplicaIngestionContext,
ConsumedDataReceiver consumedDataReceiver,
CountDownLatch countDownLatch) {
PubSubTopic versionTopic = partitionReplicaIngestionContext.getVersionTopic();
PubSubTopicPartition pubSubTopicPartition = partitionReplicaIngestionContext.getPubSubTopicPartition();
return () -> {
try {
while (true) {
if (Thread.currentThread().isInterrupted()) {
consumerServiceDelegator.unSubscribe(versionTopic, pubSubTopicPartition);
break;
}
consumerServiceDelegator
.startConsumptionIntoDataReceiver(partitionReplicaIngestionContext, 0, consumedDataReceiver);
// Avoid wait time here to increase the chance for race condition.
consumerServiceDelegator.assignConsumerFor(versionTopic, pubSubTopicPartition).setNextPollTimeOutSeconds(0);
int versionNum =
Version.parseVersionFromKafkaTopicName(partitionReplicaIngestionContext.getVersionTopic().getName());
if (versionNum % 3 == 0) {
consumerServiceDelegator.unSubscribe(versionTopic, pubSubTopicPartition);
} else if (versionNum % 3 == 1) {
consumerServiceDelegator.unsubscribeAll(partitionReplicaIngestionContext.getVersionTopic());
} else {
consumerServiceDelegator.batchUnsubscribe(
partitionReplicaIngestionContext.getVersionTopic(),
Collections.singleton(partitionReplicaIngestionContext.getPubSubTopicPartition()));
}
}
} catch (Exception e) {
// If any thread encounter an exception, count down the latch to 0 to indicate main thread to catch the issue.
e.printStackTrace();
countDownLatch.countDown();
}
};
}
}

0 comments on commit 416b7db

Please sign in to comment.