diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageService.java index f6a92a5add..9c449160af 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageService.java @@ -17,6 +17,8 @@ import com.linkedin.venice.ConfigKeys; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.exceptions.VeniceNoStoreException; +import com.linkedin.venice.helix.SafeHelixDataAccessor; +import com.linkedin.venice.helix.SafeHelixManager; import com.linkedin.venice.kafka.protocol.state.PartitionState; import com.linkedin.venice.kafka.protocol.state.StoreVersionState; import com.linkedin.venice.meta.PersistenceType; @@ -42,6 +44,8 @@ import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Supplier; +import org.apache.helix.PropertyKey; +import org.apache.helix.model.IdealState; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.rocksdb.RocksDBException; @@ -371,6 +375,42 @@ public synchronized AbstractStorageEngine openStore( return engine; } + public synchronized void checkWhetherStoragePartitionsShouldBeKeptOrNot(SafeHelixManager manager) { + if (manager == null) { + return; + } + for (AbstractStorageEngine storageEngine: getStorageEngineRepository().getAllLocalStorageEngines()) { + String storeName = storageEngine.getStoreVersionName(); + Set storageEnginePartitionIds = new HashSet<>(storageEngine.getPartitionIds()); + String instanceHostName = manager.getInstanceName(); + PropertyKey.Builder propertyKeyBuilder = + new PropertyKey.Builder(configLoader.getVeniceClusterConfig().getClusterName()); + SafeHelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor(); + IdealState idealState = helixDataAccessor.getProperty(propertyKeyBuilder.idealStates(storeName)); + + if (idealState != null) { + Map> mapFields = idealState.getRecord().getMapFields(); + for (Integer partitionId: storageEnginePartitionIds) { + if (storageEngine.isMetadataPartition(partitionId)) { + continue; + } + String partitionDbName = storeName + "_" + partitionId; + if (!mapFields.containsKey(partitionDbName) + || !mapFields.get(partitionDbName).containsKey(instanceHostName)) { + storageEngine.dropPartition(partitionId); + } + } + if (storageEngine.getPartitionIds().isEmpty()) { + removeStorageEngine(storeName); + } + } else { + // if (storageEngine.getPartitionIds().isEmpty()) { + // removeStorageEngine(storeName); + // } + } + } + } + /** * Drops the partition of the specified store version in the storage service. When all data partitions are dropped, * it will also drop the storage engine of the specific store version. diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/StorageServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/StorageServiceTest.java index a86a4879be..6f163afff5 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/StorageServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/StorageServiceTest.java @@ -1,11 +1,16 @@ package com.linkedin.davinci.storage; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.linkedin.davinci.config.VeniceClusterConfig; import com.linkedin.davinci.config.VeniceConfigLoader; import com.linkedin.davinci.config.VeniceServerConfig; import com.linkedin.davinci.config.VeniceStoreVersionConfig; @@ -14,6 +19,8 @@ import com.linkedin.davinci.store.AbstractStorageEngine; import com.linkedin.davinci.store.StorageEngineFactory; import com.linkedin.venice.exceptions.VeniceNoStoreException; +import com.linkedin.venice.helix.SafeHelixDataAccessor; +import com.linkedin.venice.helix.SafeHelixManager; import com.linkedin.venice.kafka.protocol.state.PartitionState; import com.linkedin.venice.kafka.protocol.state.StoreVersionState; import com.linkedin.venice.meta.PartitionerConfig; @@ -23,13 +30,21 @@ import com.linkedin.venice.meta.Version; import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer; import com.linkedin.venice.utils.Utils; +import java.lang.reflect.Field; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import org.apache.helix.PropertyKey; +import org.apache.helix.model.IdealState; +import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.mockito.internal.util.collections.Sets; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.Test; @@ -121,4 +136,74 @@ public void testGetStoreAndUserPartitionsMapping() { expectedMapping.put(resourceName, partitionSet); Assert.assertEquals(storageService.getStoreAndUserPartitionsMapping(), expectedMapping); } + + @Test + public void testCheckWhetherStoragePartitionsShouldBeKeptOrNot() throws NoSuchFieldException, IllegalAccessException { + StorageService mockStorageService = mock(StorageService.class); + SafeHelixManager manager = mock(SafeHelixManager.class); + StorageEngineRepository mockStorageEngineRepository = mock(StorageEngineRepository.class); + AbstractStorageEngine abstractStorageEngine = mock(AbstractStorageEngine.class); + mockStorageEngineRepository.addLocalStorageEngine(abstractStorageEngine); + + String resourceName = "test_store_v1"; + String storeName = "test_store"; + + when(abstractStorageEngine.getStoreVersionName()).thenReturn(resourceName); + abstractStorageEngine.addStoragePartition(0); + abstractStorageEngine.addStoragePartition(1); + + String clusterName = "test_cluster"; + VeniceConfigLoader mockVeniceConfigLoader = mock(VeniceConfigLoader.class); + VeniceClusterConfig mockClusterConfig = mock(VeniceClusterConfig.class); + when(mockVeniceConfigLoader.getVeniceClusterConfig()).thenReturn(mockClusterConfig); + when(mockVeniceConfigLoader.getVeniceClusterConfig().getClusterName()).thenReturn(clusterName); + + List localStorageEngines = new ArrayList<>(); + localStorageEngines.add(abstractStorageEngine); + + SafeHelixDataAccessor helixDataAccessor = mock(SafeHelixDataAccessor.class); + when(manager.getHelixDataAccessor()).thenReturn(helixDataAccessor); + IdealState idealState = mock(IdealState.class); + when(helixDataAccessor.getProperty((PropertyKey) any())).thenReturn(idealState); + ZNRecord record = new ZNRecord("testId"); + Map> mapFields = new HashMap<>(); + Map testPartitionZero = new HashMap<>(); + Map testPartitionOne = new HashMap<>(); + testPartitionZero.put("host_1430", "LEADER"); + testPartitionZero.put("host_1435", "STANDBY"); + testPartitionZero.put("host_1440", "STANDBY"); + testPartitionOne.put("host_1520", "LEADER"); + testPartitionOne.put("host_1525", "STANDBY"); + testPartitionOne.put("host_1530", "STANDBY"); + mapFields.put("test_store_v1_0", testPartitionZero); + mapFields.put("test_store_v1_1", testPartitionOne); + record.setMapFields(mapFields); + when(idealState.getRecord()).thenReturn(record); + when(manager.getInstanceName()).thenReturn("host_1520"); + + Set partitionSet = new HashSet<>(Arrays.asList(0, 1)); + when(abstractStorageEngine.getPartitionIds()).thenReturn(partitionSet); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + int partitionId = invocation.getArgument(0); + abstractStorageEngine.getPartitionIds().remove(partitionId); + return null; + } + }).when(abstractStorageEngine).dropPartition(anyInt()); + + Field storageEngineRepositoryField = StorageService.class.getDeclaredField("storageEngineRepository"); + storageEngineRepositoryField.setAccessible(true); + storageEngineRepositoryField.set(mockStorageService, mockStorageEngineRepository); + when(mockStorageService.getStorageEngineRepository()).thenReturn(mockStorageEngineRepository); + when(mockStorageService.getStorageEngineRepository().getAllLocalStorageEngines()).thenReturn(localStorageEngines); + Field configLoaderField = StorageService.class.getDeclaredField("configLoader"); + configLoaderField.setAccessible(true); + configLoaderField.set(mockStorageService, mockVeniceConfigLoader); + + doCallRealMethod().when(mockStorageService).checkWhetherStoragePartitionsShouldBeKeptOrNot(manager); + mockStorageService.checkWhetherStoragePartitionsShouldBeKeptOrNot(manager); + verify(abstractStorageEngine).dropPartition(0); + Assert.assertFalse(abstractStorageEngine.getPartitionIds().contains(0)); + } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java index 2e978c5087..b96499b250 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java @@ -9,6 +9,7 @@ import com.linkedin.d2.balancer.D2Client; import com.linkedin.davinci.listener.response.ServerCurrentVersionResponse; import com.linkedin.davinci.storage.StorageEngineRepository; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.r2.message.rest.RestRequest; import com.linkedin.r2.message.rest.RestRequestBuilder; import com.linkedin.r2.message.rest.RestResponse; @@ -182,6 +183,38 @@ public void testCheckBeforeJointClusterBeforeHelixInitializingCluster() throws E } } + @Test + public void testStartServerAndShutdownWithPartitionAssignmentVerification() { + try (VeniceClusterWrapper cluster = ServiceFactory.getVeniceCluster(1, 0, 0)) { + Properties featureProperties = new Properties(); + featureProperties.setProperty(SERVER_ENABLE_SERVER_ALLOW_LIST, Boolean.toString(true)); + featureProperties.setProperty(SERVER_IS_AUTO_JOIN, Boolean.toString(true)); + cluster.addVeniceServer(featureProperties, new Properties()); + VeniceServerWrapper server = cluster.getVeniceServers().get(0); + Assert.assertTrue(server.getVeniceServer().isStarted()); + StorageService storageService = server.getVeniceServer().getStorageService(); + StorageEngineRepository repository = storageService.getStorageEngineRepository(); + Assert + .assertTrue(repository.getAllLocalStorageEngines().isEmpty(), "New node should not have any storage engine."); + + // Create a storage engine. + String storeName = Version.composeKafkaTopic(cluster.createStore(1), 1); + Assert.assertEquals(repository.getAllLocalStorageEngines().size(), 1); + Assert.assertTrue(server.getVeniceServer().getHelixParticipationService().isRunning()); + Assert.assertEquals(storageService.getStorageEngine(storeName).getPartitionIds().size(), 3); + + cluster.stopVeniceServer(server.getPort()); + + // Create new servers so partition assignment is removed for the offline participant + cluster.addVeniceServer(featureProperties, new Properties()); + cluster.addVeniceServer(featureProperties, new Properties()); + + cluster.restartVeniceServer(server.getPort()); + repository = server.getVeniceServer().getStorageService().getStorageEngineRepository(); + Assert.assertEquals(repository.getAllLocalStorageEngines().size(), 0); + } + } + @Test public void testMetadataFetchRequest() throws ExecutionException, InterruptedException, IOException { Utils.thisIsLocalhost(); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 92a1c34713..27da62ead3 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -6331,8 +6331,10 @@ private void createClusterIfRequired(String clusterName) { helixClusterProperties .put(ClusterConfig.ClusterConfigProperty.DELAY_REBALANCE_TIME.name(), String.valueOf(delayedTime)); } - // Topology and fault zone type fields are used by CRUSH/CRUSHED/WAGED/etc alg. Helix would apply the constrains on - // these alg to choose proper instance to hold the replica. + helixClusterProperties + .put(ClusterConfig.ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.name(), String.valueOf(true)); + // Topology and fault zone type fields are used by CRUSH alg. Helix would apply the constrains on CRUSH alg to + // choose proper instance to hold the replica. helixClusterProperties .put(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name(), "/" + HelixUtils.TOPOLOGY_CONSTRAINT); helixClusterProperties diff --git a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java index d3a76cf791..2d80fd0930 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java @@ -355,6 +355,11 @@ private List createServices() { return helixData; }); + managerFuture.thenApply(manager -> { + storageService.checkWhetherStoragePartitionsShouldBeKeptOrNot(manager); + return true; + }); + heartbeatMonitoringService = new HeartbeatMonitoringService( metricsRepository, metadataRepo,