diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 494a73c0ab..141af20a36 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -252,7 +252,7 @@ public DaVinciBackend( .map(objectCacheConfig -> new ObjectCacheBackend(clientConfig, objectCacheConfig, schemaRepository)); ingestionService = new KafkaStoreIngestionService( - storageService.getStorageEngineRepository(), + storageService, configLoader, storageMetadataService, clusterInfoProvider, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java index a43ed1decf..79b8c9959f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java @@ -145,8 +145,7 @@ public void dropStoragePartitionGracefully( final int waitIntervalInSecond = 1; final int maxRetry = timeoutInSeconds / waitIntervalInSecond; getStoreIngestionService().stopConsumptionAndWait(storeConfig, partition, waitIntervalInSecond, maxRetry, true); - // Drops corresponding data partition from storage. 
- this.storageService.dropStorePartition(storeConfig, partition, removeEmptyStorageEngine); + getStoreIngestionService().dropStoragePartitionGracefully(storeConfig, partition); } @Override diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java index 2456fcfe31..b232b77fbd 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java @@ -695,7 +695,7 @@ private void initializeIsolatedIngestionServer() { // Create KafkaStoreIngestionService storeIngestionService = new KafkaStoreIngestionService( - storageService.getStorageEngineRepository(), + storageService, configLoader, storageMetadataService, clusterInfoProvider, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java index a4f1a17095..90bc6598bf 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java @@ -15,6 +15,7 @@ import com.linkedin.davinci.replication.merge.RmdSerDe; import com.linkedin.davinci.replication.merge.StringAnnotatedStoreSchemaCache; import com.linkedin.davinci.stats.AggVersionedIngestionStats; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer; import com.linkedin.davinci.storage.chunking.RawBytesChunkingAdapter; import com.linkedin.davinci.storage.chunking.SingleGetChunkingAdapter; @@ -103,6 +104,7 @@ private static class 
ReusableObjects { private final ThreadLocal threadLocalReusableObjects = ThreadLocal.withInitial(ReusableObjects::new); public ActiveActiveStoreIngestionTask( + StorageService storageService, StoreIngestionTaskFactory.Builder builder, Store store, Version version, @@ -114,6 +116,7 @@ public ActiveActiveStoreIngestionTask( Optional cacheBackend, DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) { super( + storageService, builder, store, version, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumerActionType.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumerActionType.java index 398191044d..8aaca3c07f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumerActionType.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumerActionType.java @@ -4,7 +4,7 @@ * An Enum enumerating all valid types of {@link ConsumerAction}. */ public enum ConsumerActionType { - SUBSCRIBE(1), UNSUBSCRIBE(1), RESET_OFFSET(1), PAUSE(1), RESUME(1), + SUBSCRIBE(1), UNSUBSCRIBE(1), RESET_OFFSET(1), PAUSE(1), RESUME(1), DROP_PARTITION(1), /** * KILL action has higher priority than others, so that once KILL action is added to the action queue, * we will process it immediately to avoid doing throw-away works. 
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java index ba3bb10c1a..4b54d17396 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java @@ -29,8 +29,8 @@ import com.linkedin.davinci.stats.AggVersionedIngestionStats; import com.linkedin.davinci.stats.ParticipantStoreConsumptionStats; import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService; -import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; import com.linkedin.davinci.store.view.VeniceViewWriterFactory; import com.linkedin.venice.SSLConfig; @@ -130,6 +130,7 @@ public class KafkaStoreIngestionService extends AbstractVeniceService implements private static final String GROUP_ID_FORMAT = "%s_%s"; private static final Logger LOGGER = LogManager.getLogger(KafkaStoreIngestionService.class); + private final StorageService storageService; private final VeniceConfigLoader veniceConfigLoader; @@ -190,7 +191,7 @@ public class KafkaStoreIngestionService extends AbstractVeniceService implements private final ExecutorService aaWCWorkLoadProcessingThreadPool; public KafkaStoreIngestionService( - StorageEngineRepository storageEngineRepository, + StorageService storageService, VeniceConfigLoader veniceConfigLoader, StorageMetadataService storageMetadataService, ClusterInfoProvider clusterInfoProvider, @@ -212,6 +213,7 @@ public KafkaStoreIngestionService( PubSubClientsFactory pubSubClientsFactory, Optional sslFactory, HeartbeatMonitoringService heartbeatMonitoringService) { + 
this.storageService = storageService;
     this.cacheBackend = cacheBackend;
     this.recordTransformerFunction = recordTransformerFunction;
     this.storageMetadataService = storageMetadataService;
@@ -448,7 +450,7 @@ public void handleStoreDeleted(Store store) {
 
     ingestionTaskFactory = StoreIngestionTaskFactory.builder()
         .setVeniceWriterFactory(veniceWriterFactory)
-        .setStorageEngineRepository(storageEngineRepository)
+        .setStorageEngineRepository(storageService.getStorageEngineRepository())
         .setStorageMetadataService(storageMetadataService)
         .setLeaderFollowerNotifiersQueue(leaderFollowerNotifiers)
         .setSchemaRepository(schemaRepo)
@@ -519,6 +521,7 @@ private StoreIngestionTask createStoreIngestionTask(
     };
 
     return ingestionTaskFactory.getNewIngestionTask(
+        storageService,
         store,
         version,
         getKafkaConsumerProperties(veniceStoreVersionConfig),
@@ -920,6 +923,41 @@ public void stopConsumptionAndWait(
     }
   }
 
+  /**
+   * Drops the corresponding Venice partition gracefully.
+   * This should only be called after {@link #stopConsumptionAndWait} has been called.
+   * If the ingestion task of the topic is still running, the drop is handed over to that task and happens
+   * asynchronously; otherwise the partition is dropped synchronously through the storage service.
+   * @param veniceStore Venice Store for the partition.
+   * @param partitionId Venice partition's id.
+   * @throws VeniceException if the partition is still being consumed.
+   */
+  public void dropStoragePartitionGracefully(VeniceStoreVersionConfig veniceStore, int partitionId) {
+    final String topic = veniceStore.getStoreVersionName();
+
+    if (isPartitionConsuming(topic, partitionId)) {
+      throw new VeniceException(
+          "Tried to drop storage partition that is still consuming. Topic: " + topic + ", partition: " + partitionId);
+    }
+
+    try (AutoCloseableLock ignore = topicLockManager.getLockForResource(topic)) {
+      StoreIngestionTask ingestionTask = topicNameToIngestionTaskMap.get(topic);
+      if (ingestionTask != null && ingestionTask.isRunning()) {
+        LOGGER.info(
+            "Ingestion task is still running for Topic {}. Dropping partition {} asynchronously",
+            topic,
+            partitionId);
+        ingestionTask.dropPartition(new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topic), partitionId));
+      } else {
+        LOGGER.info(
+            "Ingestion task isn't running for Topic {}. Dropping partition {} synchronously",
+            topic,
+            partitionId);
+        this.storageService.dropStorePartition(veniceStore, partitionId, true);
+      }
+    }
+  }
+
   /**
    * This function will try to kill the ingestion tasks belonging to non-current versions.
    * And this is mainly being used by memory limiter feature to free up resources when encountering memory
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
index a09a201c58..c3be01d5f6 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
@@ -23,6 +23,7 @@
 import com.linkedin.davinci.schema.merge.CollectionTimestampMergeRecordHelper;
 import com.linkedin.davinci.schema.merge.MergeRecordHelper;
 import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
+import com.linkedin.davinci.storage.StorageService;
 import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer;
 import com.linkedin.davinci.storage.chunking.GenericRecordChunkingAdapter;
 import com.linkedin.davinci.store.AbstractStorageEngine;
@@ -204,6 +205,7 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask {
   private final Version version;
 
   public LeaderFollowerStoreIngestionTask(
+      StorageService storageService,
       StoreIngestionTaskFactory.Builder builder,
       Store store,
       Version version,
@@ -215,6 +217,7 @@ public LeaderFollowerStoreIngestionTask(
Optional cacheBackend, DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) { super( + storageService, builder, store, version, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 8ac571b5d1..8467c21bb0 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -2,6 +2,7 @@ import static com.linkedin.davinci.ingestion.LagType.OFFSET_LAG; import static com.linkedin.davinci.ingestion.LagType.TIME_LAG; +import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.DROP_PARTITION; import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.RESET_OFFSET; import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.SUBSCRIBE; import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.UNSUBSCRIBE; @@ -34,6 +35,7 @@ import com.linkedin.davinci.stats.HostLevelIngestionStats; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.store.AbstractStorageEngine; import com.linkedin.davinci.store.StoragePartitionConfig; import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; @@ -189,6 +191,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); /** storage destination for consumption */ + protected final StorageService storageService; protected final StorageEngineRepository storageEngineRepository; protected final AbstractStorageEngine storageEngine; @@ -342,8 +345,10 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { protected 
final boolean batchReportIncPushStatusEnabled;
 
   protected final ExecutorService parallelProcessingThreadPool;
+  private final Set<Integer> pendingPartitionDrops = Collections.synchronizedSet(new HashSet<>());
 
   public StoreIngestionTask(
+      StorageService storageService,
       StoreIngestionTaskFactory.Builder builder,
       Store store,
       Version version,
@@ -361,6 +366,7 @@ public StoreIngestionTask(
     this.databaseSyncBytesIntervalForTransactionalMode = storeConfig.getDatabaseSyncBytesIntervalForTransactionalMode();
     this.databaseSyncBytesIntervalForDeferredWriteMode = storeConfig.getDatabaseSyncBytesIntervalForDeferredWriteMode();
     this.kafkaProps = kafkaConsumerProperties;
+    this.storageService = storageService;
     this.storageEngineRepository = builder.getStorageEngineRepository();
     this.storageMetadataService = builder.getStorageMetadataService();
     this.storeRepository = builder.getMetadataRepo();
@@ -630,6 +636,18 @@ public synchronized CompletableFuture unSubscribePartition(
     return consumerAction.getFuture();
   }
 
+  /**
+   * Adds an asynchronous partition drop request for the task. This is always a Helix triggered action.
+   * The partition is recorded in {@code pendingPartitionDrops} before the action is enqueued, so the
+   * thread that processes the action can never clear the entry before it has been added.
+   */
+  public synchronized void dropPartition(PubSubTopicPartition topicPartition) {
+    throwIfNotRunning();
+    ConsumerAction consumerAction = new ConsumerAction(DROP_PARTITION, topicPartition, nextSeqNum(), true);
+    pendingPartitionDrops.add(topicPartition.getPartitionNumber());
+    consumerActionsQueue.add(consumerAction);
+  }
+
   public boolean hasAnySubscription() {
     return !partitionConsumptionStateMap.isEmpty();
   }
@@ -2133,8 +2151,28 @@ protected void processCommonConsumerAction(ConsumerAction consumerAction) throws
         break;
       case KILL:
         LOGGER.info("Kill this consumer task for Topic: {}", topic);
+
+        if (!this.pendingPartitionDrops.isEmpty()) {
+          LOGGER.info(
+              "Partitions {} are pending to be dropped for Topic: {}. Dropping them before killing consumer task.",
+              this.pendingPartitionDrops,
+              topic);
+          synchronized (this.pendingPartitionDrops) {
+            for (Integer partitionToDrop: this.pendingPartitionDrops) {
+              this.storageService.dropStorePartition(storeConfig, partitionToDrop, true);
+            }
+            // Clear after the loop: removing entries while iterating would throw ConcurrentModificationException.
+            this.pendingPartitionDrops.clear();
+          }
+        }
         // Throw the exception here to break the consumption loop, and then this task is marked as error status.
         throw new VeniceIngestionTaskKilledException(KILLED_JOB_MESSAGE + topic);
+      case DROP_PARTITION:
+        LOGGER.info("{} Dropping partition: {}", ingestionTaskName, topicPartition);
+        this.storageService.dropStorePartition(storeConfig, partition, true);
+        this.pendingPartitionDrops.remove(partition);
+        LOGGER.info("{} Dropped partition: {}", ingestionTaskName, topicPartition);
+        break;
       default:
         throw new UnsupportedOperationException(operation.name() + " is not supported in " + getClass().getName());
     }
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java
index 6a48231c3c..4c1ef625fa 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java
@@ -11,6 +11,7 @@
 import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
 import com.linkedin.davinci.storage.StorageEngineRepository;
 import com.linkedin.davinci.storage.StorageMetadataService;
+import com.linkedin.davinci.storage.StorageService;
 import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
 import com.linkedin.davinci.store.view.VeniceViewWriterFactory;
 import com.linkedin.venice.kafka.protocol.state.PartitionState;
@@ -43,6 +44,7 @@ private StoreIngestionTaskFactory(Builder builder) {
   }
 
   public StoreIngestionTask
getNewIngestionTask( + StorageService storageService, Store store, Version version, Properties kafkaConsumerProperties, @@ -54,6 +56,7 @@ public StoreIngestionTask getNewIngestionTask( DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) { if (version.isActiveActiveReplicationEnabled()) { return new ActiveActiveStoreIngestionTask( + storageService, builder, store, version, @@ -66,6 +69,7 @@ public StoreIngestionTask getNewIngestionTask( recordTransformerFunction); } return new LeaderFollowerStoreIngestionTask( + storageService, builder, store, version, diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java index fa8e79576a..3311fc95c9 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java @@ -28,6 +28,7 @@ import com.linkedin.davinci.stats.AggVersionedIngestionStats; import com.linkedin.davinci.stats.HostLevelIngestionStats; import com.linkedin.davinci.storage.StorageEngineRepository; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer; import com.linkedin.davinci.storage.chunking.ChunkingUtils; import com.linkedin.davinci.store.AbstractStorageEngine; @@ -231,6 +232,7 @@ public void testisReadyToServeAnnouncedWithRTLag() { Version mockVersion = new VersionImpl(STORE_NAME, 1, PUSH_JOB_ID); mockVersion.setHybridStoreConfig(hybridStoreConfig); + StorageService storageService = mock(StorageService.class); Store store = new ZKStore( STORE_NAME, "Felix", @@ -250,6 +252,7 @@ public void testisReadyToServeAnnouncedWithRTLag() { VeniceStoreVersionConfig storeVersionConfig = new 
VeniceStoreVersionConfig(STORE_NAME + "_v1", new VeniceProperties(kafkaConsumerProperties)); ActiveActiveStoreIngestionTask ingestionTask = new ActiveActiveStoreIngestionTask( + storageService, builder, store, mockVersion, diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java index 65ca79333e..77202178cd 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java @@ -5,12 +5,14 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atMostOnce; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; @@ -22,6 +24,7 @@ import com.linkedin.davinci.config.VeniceStoreVersionConfig; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.store.AbstractStorageEngine; import com.linkedin.davinci.store.AbstractStorageEngineTest; import com.linkedin.venice.exceptions.VeniceNoStoreException; @@ -52,9 +55,11 @@ import com.linkedin.venice.utils.DataProviderUtils; import com.linkedin.venice.utils.Pair; import com.linkedin.venice.utils.VeniceProperties; +import 
com.linkedin.venice.utils.locks.ResourceAutoClosableLockManager; import io.tehuti.metrics.MetricsRepository; import it.unimi.dsi.fastutil.ints.Int2ObjectMaps; import it.unimi.dsi.fastutil.objects.Object2IntMaps; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; import java.util.NavigableMap; @@ -62,6 +67,8 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import org.apache.avro.Schema; import org.mockito.Mockito; @@ -72,6 +79,7 @@ @Test public abstract class KafkaStoreIngestionServiceTest { + private StorageService mockStorageService; private StorageEngineRepository mockStorageEngineRepository; private VeniceConfigLoader mockVeniceConfigLoader; private StorageMetadataService storageMetadataService; @@ -88,7 +96,9 @@ public abstract class KafkaStoreIngestionServiceTest { @BeforeClass public void setUp() { + mockStorageService = mock(StorageService.class); mockStorageEngineRepository = mock(StorageEngineRepository.class); + when(mockStorageService.getStorageEngineRepository()).thenReturn(mockStorageEngineRepository); doReturn(mock(AbstractStorageEngine.class)).when(mockStorageEngineRepository).getLocalStorageEngine(anyString()); storageMetadataService = mock(StorageMetadataService.class); mockClusterInfoProvider = mock(ClusterInfoProvider.class); @@ -149,7 +159,7 @@ private void setupMockConfig() { @Test public void testDisableMetricsEmission() { kafkaStoreIngestionService = new KafkaStoreIngestionService( - mockStorageEngineRepository, + mockStorageService, mockVeniceConfigLoader, storageMetadataService, mockClusterInfoProvider, @@ -233,7 +243,7 @@ public void testGetIngestingTopicsNotWithOnlineVersion() { // Without starting the ingestion service test getIngestingTopicsWithVersionStatusNotOnline would return the correct // topics under different 
scenarios.
     kafkaStoreIngestionService = new KafkaStoreIngestionService(
-        mockStorageEngineRepository,
+        mockStorageService,
         mockVeniceConfigLoader,
         storageMetadataService,
         mockClusterInfoProvider,
@@ -321,7 +331,7 @@ public void testGetIngestingTopicsNotWithOnlineVersion() {
   @Test
   public void testCloseStoreIngestionTask() {
     kafkaStoreIngestionService = new KafkaStoreIngestionService(
-        mockStorageEngineRepository,
+        mockStorageService,
         mockVeniceConfigLoader,
         storageMetadataService,
         mockClusterInfoProvider,
@@ -386,7 +396,7 @@ public void testCloseStoreIngestionTask() {
   @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
   public void testStoreIngestionTaskShutdownLastPartition(boolean isIsolatedIngestion) {
     kafkaStoreIngestionService = new KafkaStoreIngestionService(
-        mockStorageEngineRepository,
+        mockStorageService,
         mockVeniceConfigLoader,
         storageMetadataService,
         mockClusterInfoProvider,
@@ -489,4 +499,69 @@ public void testHasCurrentVersionBootstrapping() {
 
     assertTrue(KafkaStoreIngestionService.hasCurrentVersionBootstrapping(mapContainsCurrentBootstrappingTask));
   }
+
+  /**
+   * Verifies that a partition drop is delegated to the ingestion task while that task is running,
+   * and is executed directly through the storage service when it is not.
+   */
+  @Test
+  public void testDropStoragePartitionGracefully() throws NoSuchFieldException, IllegalAccessException {
+    kafkaStoreIngestionService = mock(KafkaStoreIngestionService.class);
+    String topicName = "test-store_v1";
+    int partitionId = 0;
+    VeniceProperties veniceProperties = AbstractStorageEngineTest.getServerProperties(PersistenceType.ROCKS_DB);
+    VeniceStoreVersionConfig config = new VeniceStoreVersionConfig(topicName, veniceProperties);
+    doCallRealMethod().when(kafkaStoreIngestionService).dropStoragePartitionGracefully(config, partitionId);
+
+    Field topicLockManagerField = KafkaStoreIngestionService.class.getDeclaredField("topicLockManager");
+    topicLockManagerField.setAccessible(true);
+    topicLockManagerField.set(kafkaStoreIngestionService, new ResourceAutoClosableLockManager<>(ReentrantLock::new));
+
+    NavigableMap topicNameToIngestionTaskMap = mock(NavigableMap.class);
+    Field topicNameToIngestionTaskMapField =
+        KafkaStoreIngestionService.class.getDeclaredField("topicNameToIngestionTaskMap");
+    topicNameToIngestionTaskMapField.setAccessible(true);
+    topicNameToIngestionTaskMapField.set(kafkaStoreIngestionService, topicNameToIngestionTaskMap);
+
+    PubSubTopicRepository pubSubTopicRepository = mock(PubSubTopicRepository.class);
+    Field pubSubTopicRepositoryField = KafkaStoreIngestionService.class.getDeclaredField("pubSubTopicRepository");
+    pubSubTopicRepositoryField.setAccessible(true);
+    pubSubTopicRepositoryField.set(kafkaStoreIngestionService, pubSubTopicRepository);
+
+    StorageService storageService = mock(StorageService.class);
+    Field storageServiceField = KafkaStoreIngestionService.class.getDeclaredField("storageService");
+    storageServiceField.setAccessible(true);
+    storageServiceField.set(kafkaStoreIngestionService, storageService);
+
+    StoreIngestionTask storeIngestionTask = mock(StoreIngestionTask.class);
+
+    PriorityBlockingQueue consumerActionsQueue = mock(PriorityBlockingQueue.class);
+    Field consumerActionsQueueField = StoreIngestionTask.class.getDeclaredField("consumerActionsQueue");
+    consumerActionsQueueField.setAccessible(true);
+    consumerActionsQueueField.set(storeIngestionTask, consumerActionsQueue);
+
+    Set pendingPartitionDrops = mock(Set.class);
+    Field pendingPartitionDropsField = StoreIngestionTask.class.getDeclaredField("pendingPartitionDrops");
+    pendingPartitionDropsField.setAccessible(true);
+    pendingPartitionDropsField.set(storeIngestionTask, pendingPartitionDrops);
+
+    when(topicNameToIngestionTaskMap.get(topicName)).thenReturn(storeIngestionTask);
+    doCallRealMethod().when(storeIngestionTask).dropPartition(any());
+
+    PubSubTopic pubSubTopic = mock(PubSubTopic.class);
+    when(pubSubTopicRepository.getTopic(topicName)).thenReturn(pubSubTopic);
+
+    // Verify that when the ingestion task is running, it drops the store partition asynchronously
+    when(storeIngestionTask.isRunning()).thenReturn(true);
+    kafkaStoreIngestionService.dropStoragePartitionGracefully(config, partitionId);
+    verify(storeIngestionTask).dropPartition(any());
+    verify(consumerActionsQueue).add(any());
+    verify(pendingPartitionDrops).add(any());
+    verify(storageService, never()).dropStorePartition(config, partitionId, true);
+
+    // Verify that when the ingestion task isn't running, it drops the store partition synchronously
+    when(storeIngestionTask.isRunning()).thenReturn(false);
+    kafkaStoreIngestionService.dropStoragePartitionGracefully(config, partitionId);
+    verify(storageService).dropStorePartition(config, partitionId, true);
+  }
 }
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PushTimeoutTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PushTimeoutTest.java
index 966ea0906e..9042bc2848 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PushTimeoutTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PushTimeoutTest.java
@@ -12,6 +12,7 @@
 import com.linkedin.davinci.stats.AggHostLevelIngestionStats;
 import com.linkedin.davinci.stats.HostLevelIngestionStats;
 import com.linkedin.davinci.storage.StorageMetadataService;
+import com.linkedin.davinci.storage.StorageService;
 import com.linkedin.venice.exceptions.VeniceTimeoutException;
 import com.linkedin.venice.meta.Store;
 import com.linkedin.venice.meta.Version;
@@ -55,6 +56,7 @@ public void testPushTimeoutForLeaderFollowerStores() {
         .setHostLevelIngestionStats(mockAggStoreIngestionStats)
         .setPubSubTopicRepository(pubSubTopicRepository);
 
+    StorageService storageService = mock(StorageService.class);
     Store mockStore = builder.getMetadataRepo().getStoreOrThrow(storeName);
     Version version = mockStore.getVersion(versionNumber);
 
@@ -66,6 +68,7 @@ public void testPushTimeoutForLeaderFollowerStores() {
     doReturn(versionTopic).when(mockVeniceStoreVersionConfig).getStoreVersionName();
 
     LeaderFollowerStoreIngestionTask leaderFollowerStoreIngestionTask = new
LeaderFollowerStoreIngestionTask( + storageService, builder, mockStore, version, @@ -113,6 +116,7 @@ public void testReportIfCatchUpBaseTopicOffsetRouteWillNotMakePushTimeout() { .setHostLevelIngestionStats(mockAggStoreIngestionStats) .setPubSubTopicRepository(pubSubTopicRepository); + StorageService storageService = mock(StorageService.class); Store mockStore = builder.getMetadataRepo().getStoreOrThrow(storeName); Version version = mockStore.getVersion(versionNumber); @@ -139,6 +143,7 @@ public void testReportIfCatchUpBaseTopicOffsetRouteWillNotMakePushTimeout() { doReturn(mockOffsetRecord).when(mockStorageMetadataService).getLastOffset(eq(versionTopic), eq(0)); LeaderFollowerStoreIngestionTask leaderFollowerStoreIngestionTask = new LeaderFollowerStoreIngestionTask( + storageService, builder, mockStore, version, diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 4966daf8dc..f18609dfe7 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -96,6 +96,7 @@ import com.linkedin.davinci.stats.KafkaConsumerServiceStats; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.store.AbstractStorageEngine; import com.linkedin.davinci.store.AbstractStorageIterator; import com.linkedin.davinci.store.AbstractStoragePartition; @@ -818,6 +819,7 @@ private void runTest( true, aaConfig, storeVersionConfigOverride); + StorageService storageService = mock(StorageService.class); Store mockStore = storeAndVersionConfigsUnderTest.store; Version version = storeAndVersionConfigsUnderTest.version; 
VeniceStoreVersionConfig storeConfig = storeAndVersionConfigsUnderTest.storeVersionConfig; @@ -835,6 +837,7 @@ private void runTest( storeIngestionTaskUnderTest = spy( ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, version, kafkaProps, @@ -2759,6 +2762,7 @@ public void testRecordsCanBeThrottledPerRegion() throws ExecutionException, Inte false, false, AA_ON); + StorageService storageService = mock(StorageService.class); Store mockStore = storeAndVersionConfigs.store; Version version = storeAndVersionConfigs.version; VeniceStoreVersionConfig storeConfig = storeAndVersionConfigs.storeVersionConfig; @@ -2778,6 +2782,7 @@ public void testRecordsCanBeThrottledPerRegion() throws ExecutionException, Inte kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); storeIngestionTaskUnderTest = ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, version, kafkaProps, @@ -2895,6 +2900,7 @@ public void testIsReadyToServe(NodeType nodeType, AAConfig aaConfig, DataReplica false, true, aaConfig); + StorageService storageService = mock(StorageService.class); Store mockStore = storeAndVersionConfigs.store; Version version = storeAndVersionConfigs.version; VeniceStoreVersionConfig storeConfig = storeAndVersionConfigs.storeVersionConfig; @@ -2926,6 +2932,7 @@ public void testIsReadyToServe(NodeType nodeType, AAConfig aaConfig, DataReplica kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); storeIngestionTaskUnderTest = ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, version, kafkaProps, @@ -3119,6 +3126,7 @@ public void testActiveActiveStoreIsReadyToServe(HybridConfig hybridConfig, NodeT false, true, AA_ON); + StorageService storageService = mock(StorageService.class); Store mockStore = storeAndVersionConfigs.store; Version version = storeAndVersionConfigs.version; VeniceStoreVersionConfig storeConfig = storeAndVersionConfigs.storeVersionConfig; 
@@ -3142,6 +3150,7 @@ public void testActiveActiveStoreIsReadyToServe(HybridConfig hybridConfig, NodeT Properties kafkaProps = new Properties(); kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); storeIngestionTaskUnderTest = ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, version, kafkaProps, @@ -3258,6 +3267,7 @@ public void testCheckAndLogIfLagIsAcceptableForHybridStore( false, true, aaConfig); + StorageService storageService = mock(StorageService.class); Store mockStore = storeAndVersionConfigs.store; Version version = storeAndVersionConfigs.version; VeniceStoreVersionConfig storeConfig = storeAndVersionConfigs.storeVersionConfig; @@ -3287,6 +3297,7 @@ public void testCheckAndLogIfLagIsAcceptableForHybridStore( Properties kafkaProps = new Properties(); kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); storeIngestionTaskUnderTest = ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, version, kafkaProps, @@ -3423,6 +3434,7 @@ public void testGetAndUpdateLeaderCompletedState(HybridConfig hybridConfig, Node false, true, AA_ON); + StorageService storageService = mock(StorageService.class); Store mockStore = storeAndVersionConfigs.store; Version version = storeAndVersionConfigs.version; VeniceStoreVersionConfig storeConfig = storeAndVersionConfigs.storeVersionConfig; @@ -3439,6 +3451,7 @@ public void testGetAndUpdateLeaderCompletedState(HybridConfig hybridConfig, Node kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); LeaderFollowerStoreIngestionTask ingestionTask = (LeaderFollowerStoreIngestionTask) ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, version, kafkaProps, @@ -3518,6 +3531,7 @@ public void testProcessTopicSwitch(NodeType nodeType) { new HybridStoreConfigImpl(100, 100, 100, DataReplicationPolicy.AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP); MockStoreVersionConfigs 
storeAndVersionConfigs = setupStoreAndVersionMocks(2, partitionerConfig, Optional.of(hybridStoreConfig), false, true, AA_OFF); + StorageService storageService = mock(StorageService.class); Store mockStore = storeAndVersionConfigs.store; Version version = storeAndVersionConfigs.version; VeniceStoreVersionConfig storeConfig = storeAndVersionConfigs.storeVersionConfig; @@ -3534,6 +3548,7 @@ public void testProcessTopicSwitch(NodeType nodeType) { kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); storeIngestionTaskUnderTest = ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, version, kafkaProps, @@ -3577,6 +3592,7 @@ public void testUpdateConsumedUpstreamRTOffsetMapDuringRTSubscription(AAConfig a doReturn(VersionStatus.STARTED).when(mockVersion).getStatus(); ReadOnlyStoreRepository mockReadOnlyStoreRepository = mock(ReadOnlyStoreRepository.class); + StorageService storageService = mock(StorageService.class); Store mockStore = mock(Store.class); doReturn(storeName).when(mockStore).getName(); doReturn(mockStore).when(mockReadOnlyStoreRepository).getStoreOrThrow(eq(storeName)); @@ -3606,6 +3622,7 @@ public void testUpdateConsumedUpstreamRTOffsetMapDuringRTSubscription(AAConfig a LeaderFollowerStoreIngestionTask ingestionTask = (LeaderFollowerStoreIngestionTask) ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, mockVersion, mockKafkaConsumerProperties, @@ -3711,6 +3728,7 @@ public void testLeaderShouldSubscribeToCorrectVTOffset() { doReturn("localhost").when(version).getPushStreamSourceAddress(); Store store = mock(Store.class); + StorageService storageService = mock(StorageService.class); doReturn(version).when(store).getVersion(eq(1)); String versionTopicName = "testStore_v1"; @@ -3719,6 +3737,7 @@ public void testLeaderShouldSubscribeToCorrectVTOffset() { doReturn(versionTopicName).when(storeConfig).getStoreVersionName(); LeaderFollowerStoreIngestionTask leaderFollowerStoreIngestionTask 
= spy( new LeaderFollowerStoreIngestionTask( + storageService, builder, store, version, @@ -4214,6 +4233,7 @@ public void testBatchOnlyStoreDataRecovery() { DataRecoveryVersionConfig dataRecoveryVersionConfig = new DataRecoveryVersionConfigImpl("dc-0", false, 1); doReturn(dataRecoveryVersionConfig).when(version).getDataRecoveryVersionConfig(); + StorageService storageService = mock(StorageService.class); Store store = mock(Store.class); doReturn(version).when(store).getVersion(eq(1)); @@ -4230,6 +4250,7 @@ public void testBatchOnlyStoreDataRecovery() { null).build(); doReturn(Version.parseStoreFromVersionTopic(topic)).when(store).getName(); storeIngestionTaskUnderTest = ingestionTaskFactory.getNewIngestionTask( + storageService, store, version, new Properties(), @@ -4289,6 +4310,7 @@ public void testMaybeSendIngestionHeartbeat( NodeType nodeType, HybridConfig hybridConfig) { String storeName = Utils.getUniqueString("store"); + StorageService storageService = mock(StorageService.class); Store mockStore = mock(Store.class); doReturn(storeName).when(mockStore).getName(); String versionTopic = Version.composeKafkaTopic(storeName, 1); @@ -4345,6 +4367,7 @@ public void testMaybeSendIngestionHeartbeat( .build(); LeaderFollowerStoreIngestionTask ingestionTask = (LeaderFollowerStoreIngestionTask) ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, mockVersion, mockKafkaConsumerProperties, @@ -4378,6 +4401,7 @@ public void testMaybeSendIngestionHeartbeat( @Test public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() throws InterruptedException { String storeName = Utils.getUniqueString("store"); + StorageService storageService = mock(StorageService.class); Store mockStore = mock(Store.class); doReturn(storeName).when(mockStore).getName(); String versionTopic = Version.composeKafkaTopic(storeName, 1); @@ -4432,6 +4456,7 @@ public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() throws Inter .build(); LeaderFollowerStoreIngestionTask 
ingestionTask = (LeaderFollowerStoreIngestionTask) ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, mockVersion, mockKafkaConsumerProperties, diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java index 2e978c5087..f1101f34b8 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java @@ -9,6 +9,7 @@ import com.linkedin.d2.balancer.D2Client; import com.linkedin.davinci.listener.response.ServerCurrentVersionResponse; import com.linkedin.davinci.storage.StorageEngineRepository; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.r2.message.rest.RestRequest; import com.linkedin.r2.message.rest.RestRequestBuilder; import com.linkedin.r2.message.rest.RestResponse; @@ -49,6 +50,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.Encoder; @@ -338,4 +340,73 @@ public void testStartServerWithSystemSchemaInitialization() { }); } } + + @Test + public void testDropStorePartitionAsynchronously() { + try (VeniceClusterWrapper cluster = ServiceFactory.getVeniceCluster(1, 1, 0)) { + Properties featureProperties = new Properties(); + featureProperties.setProperty(SERVER_ENABLE_SERVER_ALLOW_LIST, Boolean.toString(true)); + featureProperties.setProperty(SERVER_IS_AUTO_JOIN, Boolean.toString(true)); + VeniceServerWrapper server = cluster.getVeniceServers().get(0); + Assert.assertTrue(server.getVeniceServer().isStarted()); + + StorageService storageService = server.getVeniceServer().getStorageService(); + 
Assert.assertTrue(server.getVeniceServer().isStarted()); + final StorageEngineRepository repository = storageService.getStorageEngineRepository(); + Assert + .assertTrue(repository.getAllLocalStorageEngines().isEmpty(), "New node should not have any storage engine."); + + // Create a new store + String storeName = cluster.createStore(1); + String storeVersionName = Version.composeKafkaTopic(storeName, 1); + Assert.assertEquals(repository.getAllLocalStorageEngines().size(), 1); + Assert.assertTrue(server.getVeniceServer().getHelixParticipationService().isRunning()); + Assert.assertEquals(storageService.getStorageEngine(storeVersionName).getPartitionIds().size(), 3); + + // Add servers to trigger a rebalance, which will redistribute and drop partitions for the current participant + cluster.addVeniceServer(featureProperties, new Properties()); + cluster.addVeniceServer(featureProperties, new Properties()); + cluster.addVeniceServer(featureProperties, new Properties()); + + Assert.assertEquals(repository.getAllLocalStorageEngines().size(), 1); + + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> { + // Partitions should have been dropped asynchronously due to rebalancing + Assert.assertTrue(storageService.getStorageEngine(storeVersionName).getPartitionIds().size() < 3); + }); + } + } + + @Test + public void testDropStorePartitionSynchronously() { + try (VeniceClusterWrapper cluster = ServiceFactory.getVeniceCluster(1, 1, 0)) { + Properties featureProperties = new Properties(); + featureProperties.setProperty(SERVER_ENABLE_SERVER_ALLOW_LIST, Boolean.toString(true)); + featureProperties.setProperty(SERVER_IS_AUTO_JOIN, Boolean.toString(true)); + VeniceServerWrapper server = cluster.getVeniceServers().get(0); + Assert.assertTrue(server.getVeniceServer().isStarted()); + + StorageService storageService = server.getVeniceServer().getStorageService(); + Assert.assertTrue(server.getVeniceServer().isStarted()); + final StorageEngineRepository repository = 
storageService.getStorageEngineRepository(); + Assert + .assertTrue(repository.getAllLocalStorageEngines().isEmpty(), "New node should not have any storage engine."); + + // Create a new store + String storeName = cluster.createStore(1); + String storeVersionName = Version.composeKafkaTopic(storeName, 1); + Assert.assertEquals(repository.getAllLocalStorageEngines().size(), 1); + Assert.assertTrue(server.getVeniceServer().getHelixParticipationService().isRunning()); + Assert.assertEquals(storageService.getStorageEngine(storeVersionName).getPartitionIds().size(), 3); + + cluster.useControllerClient(controllerClient -> { + controllerClient.disableAndDeleteStore(storeName); + }); + + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> { + // All partitions should have been dropped synchronously + Assert.assertEquals(repository.getAllLocalStorageEngines().size(), 0); + }); + } + } } diff --git a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java index 2611f92dec..9db76f074c 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java @@ -365,7 +365,7 @@ private List createServices() { // create and add KafkaSimpleConsumerService this.kafkaStoreIngestionService = new KafkaStoreIngestionService( - storageService.getStorageEngineRepository(), + storageService, veniceConfigLoader, storageMetadataService, new StaticClusterInfoProvider(Collections.singleton(clusterConfig.getClusterName())),