Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[venice-server] Dropping unassigned partitions #1196

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,10 @@ public Instance getInstance() {
return instance;
}

public SafeHelixManager getHelixManager() {
return helixManager;
}

public VeniceOfflinePushMonitorAccessor getVeniceOfflinePushMonitorAccessor() {
return veniceOfflinePushMonitorAccessor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class StorageService extends AbstractVeniceService {
* @param restoreDataPartitions indicates if store data needs to be restored.
* @param restoreMetadataPartitions indicates if meta data needs to be restored.
* @param checkWhetherStorageEngineShouldBeKeptOrNot check whether the local storage engine should be kept or not.
* @param checkWhetherStoragePartitionsShouldBeKeptOrNot check whether the partition is assigned and thus should be kept or not.
*/
StorageService(
VeniceConfigLoader configLoader,
Expand All @@ -88,6 +89,7 @@ public class StorageService extends AbstractVeniceService {
boolean restoreDataPartitions,
boolean restoreMetadataPartitions,
Function<String, Boolean> checkWhetherStorageEngineShouldBeKeptOrNot,
Function<AbstractStorageEngine, Void> checkWhetherStoragePartitionsShouldBeKeptOrNot,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we create an additional constructor that doesn't have checkWhetherStoragePartitionsShouldBeKeptOrNot, so if users don't want to use this, they wouldn't have to pass in a functional interface that does nothing.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, that sounds correct; checkWhetherStoragePartitionsShouldBeKeptOrNot is mainly being written in VeniceServer.

Optional<Map<PersistenceType, StorageEngineFactory>> persistenceTypeToStorageEngineFactoryMapOptional) {
String dataPath = configLoader.getVeniceServerConfig().getDataBasePath();
if (!Utils.directoryExists(dataPath)) {
Expand Down Expand Up @@ -122,10 +124,97 @@ public class StorageService extends AbstractVeniceService {
configLoader,
restoreDataPartitions,
restoreMetadataPartitions,
checkWhetherStorageEngineShouldBeKeptOrNot);
checkWhetherStorageEngineShouldBeKeptOrNot,
checkWhetherStoragePartitionsShouldBeKeptOrNot);
}
}

/**
* Allocates a new {@code StorageService} object.
* @param configLoader a config loader to load configs related to cluster and server.
* @param storageEngineStats storage engine related stats.
* @param rocksDBMemoryStats RocksDB memory consumption stats.
* @param storeVersionStateSerializer serializer for translating a store-version level state into avro-format.
* @param partitionStateSerializer serializer for translating a partition state into avro-format.
* @param storeRepository supports readonly operations to access stores
* @param restoreDataPartitions indicates if store data needs to be restored.
* @param restoreMetadataPartitions indicates if meta data needs to be restored.
* @param checkWhetherStorageEngineShouldBeKeptOrNot check whether the local storage engine should be kept or not.
*/
StorageService(
VeniceConfigLoader configLoader,
AggVersionedStorageEngineStats storageEngineStats,
RocksDBMemoryStats rocksDBMemoryStats,
InternalAvroSpecificSerializer<StoreVersionState> storeVersionStateSerializer,
InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer,
ReadOnlyStoreRepository storeRepository,
boolean restoreDataPartitions,
boolean restoreMetadataPartitions,
Function<String, Boolean> checkWhetherStorageEngineShouldBeKeptOrNot,
Optional<Map<PersistenceType, StorageEngineFactory>> persistenceTypeToStorageEngineFactoryMapOptional) {
String dataPath = configLoader.getVeniceServerConfig().getDataBasePath();
if (!Utils.directoryExists(dataPath)) {
if (!configLoader.getVeniceServerConfig().isAutoCreateDataPath()) {
throw new VeniceException(
"Data directory '" + dataPath + "' does not exist and " + ConfigKeys.AUTOCREATE_DATA_PATH
+ " is disabled.");
}

File dataDir = new File(dataPath);
LOGGER.info("Creating data directory {}", dataDir.getAbsolutePath());
dataDir.mkdirs();
}

this.configLoader = configLoader;
this.serverConfig = configLoader.getVeniceServerConfig();
this.storageEngineRepository = new StorageEngineRepository();

this.aggVersionedStorageEngineStats = storageEngineStats;
this.rocksDBMemoryStats = rocksDBMemoryStats;
this.storeVersionStateSerializer = storeVersionStateSerializer;
this.partitionStateSerializer = partitionStateSerializer;
this.storeRepository = storeRepository;
if (persistenceTypeToStorageEngineFactoryMapOptional.isPresent()) {
this.persistenceTypeToStorageEngineFactoryMap = persistenceTypeToStorageEngineFactoryMapOptional.get();
} else {
this.persistenceTypeToStorageEngineFactoryMap = new HashMap<>();
initInternalStorageEngineFactories();
}
if (restoreDataPartitions || restoreMetadataPartitions) {
restoreAllStores(
configLoader,
restoreDataPartitions,
restoreMetadataPartitions,
checkWhetherStorageEngineShouldBeKeptOrNot,
se -> null);
}
}

public StorageService(
VeniceConfigLoader configLoader,
AggVersionedStorageEngineStats storageEngineStats,
RocksDBMemoryStats rocksDBMemoryStats,
InternalAvroSpecificSerializer<StoreVersionState> storeVersionStateSerializer,
InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer,
ReadOnlyStoreRepository storeRepository,
boolean restoreDataPartitions,
boolean restoreMetadataPartitions,
Function<String, Boolean> checkWhetherStorageEngineShouldBeKeptOrNot,
Function<AbstractStorageEngine, Void> checkWhetherStoragePartitionsShouldBeKeptOrNot) {
this(
configLoader,
storageEngineStats,
rocksDBMemoryStats,
storeVersionStateSerializer,
partitionStateSerializer,
storeRepository,
restoreDataPartitions,
restoreMetadataPartitions,
checkWhetherStorageEngineShouldBeKeptOrNot,
checkWhetherStoragePartitionsShouldBeKeptOrNot,
Optional.empty());
}

public StorageService(
VeniceConfigLoader configLoader,
AggVersionedStorageEngineStats storageEngineStats,
Expand Down Expand Up @@ -233,7 +322,8 @@ private void restoreAllStores(
VeniceConfigLoader configLoader,
boolean restoreDataPartitions,
boolean restoreMetadataPartitions,
Function<String, Boolean> checkWhetherStorageEngineShouldBeKeptOrNot) {
Function<String, Boolean> checkWhetherStorageEngineShouldBeKeptOrNot,
Function<AbstractStorageEngine, Void> checkWhetherStoragePartitionsShouldBeKeptOrNot) {
LOGGER.info("Start restoring all the stores persisted previously");
for (Map.Entry<PersistenceType, StorageEngineFactory> entry: persistenceTypeToStorageEngineFactoryMap.entrySet()) {
PersistenceType pType = entry.getKey();
Expand All @@ -254,6 +344,7 @@ private void restoreAllStores(
if (checkWhetherStorageEngineShouldBeKeptOrNot.apply(storeName)) {
try {
storageEngine = openStore(storeConfig, () -> null);
checkWhetherStoragePartitionsShouldBeKeptOrNot.apply(storageEngine);
} catch (Exception e) {
if (ExceptionUtils.recursiveClassEquals(e, RocksDBException.class)) {
LOGGER.warn("Encountered RocksDB error while opening store: {}", storeName, e);
Expand Down Expand Up @@ -467,7 +558,7 @@ public synchronized void closeStorageEngine(String kafkaTopic) {
public void cleanupAllStores(VeniceConfigLoader configLoader) {
// Load local storage and delete them safely.
// TODO Just clean the data dir in case loading and deleting is too slow.
restoreAllStores(configLoader, true, true, s -> true);
restoreAllStores(configLoader, true, true, s -> true, se -> null);
LOGGER.info("Start cleaning up all the stores persisted previously");
storageEngineRepository.getAllLocalStorageEngines().stream().forEach(storageEngine -> {
String storeName = storageEngine.getStoreVersionName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public void testGetStoreAndUserPartitionsMapping() {
true,
true,
(s) -> true,
(se) -> null,
Optional.of(persistenceTypeToStorageEngineFactoryMap));

Map<String, Set<Integer>> expectedMapping = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.venice.acl.DynamicAccessController;
import com.linkedin.venice.acl.StaticAccessController;
import com.linkedin.venice.cleaner.BackupVersionOptimizationService;
Expand All @@ -42,10 +43,12 @@
import com.linkedin.venice.listener.ServerReadMetadataRepository;
import com.linkedin.venice.listener.ServerStoreAclHandler;
import com.linkedin.venice.listener.StoreValueSchemasCacheService;
import com.linkedin.venice.meta.IngestionMode;
import com.linkedin.venice.meta.ReadOnlyLiveClusterConfigRepository;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.StaticClusterInfoProvider;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubClientsFactory;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.security.SSLFactory;
Expand All @@ -66,11 +69,16 @@
import io.tehuti.metrics.MetricsRepository;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.IdealState;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -310,14 +318,21 @@ private List<AbstractVeniceService> createServices() {
? new RocksDBMemoryStats(metricsRepository, "RocksDBMemoryStats", plainTableEnabled)
: null;

boolean whetherToRestoreDataPartitions = !isIsolatedIngestion()
|| veniceConfigLoader.getVeniceServerConfig().freezeIngestionIfReadyToServeOrLocalDataExists();

// Create and add StorageService. storeRepository will be populated by StorageService
storageService = new StorageService(
veniceConfigLoader,
storageEngineStats,
rocksDBMemoryStats,
storeVersionStateSerializer,
partitionStateSerializer,
metadataRepo);
metadataRepo,
whetherToRestoreDataPartitions,
true,
functionToCheckWhetherStorageEngineShouldBeKeptOrNot(),
functionToCheckWhetherStoragePartitionsShouldBeKeptOrNot());
storageEngineMetadataService =
new StorageEngineMetadataService(storageService.getStorageEngineRepository(), partitionStateSerializer);
services.add(storageEngineMetadataService);
Expand Down Expand Up @@ -693,6 +708,40 @@ protected VeniceConfigLoader getConfigLoader() {
return veniceConfigLoader;
}

protected final boolean isIsolatedIngestion() {
return veniceConfigLoader.getVeniceServerConfig().getIngestionMode().equals(IngestionMode.ISOLATED);
}

private Function<String, Boolean> functionToCheckWhetherStorageEngineShouldBeKeptOrNot() {
return storageEngineName -> true;
}

private Function<AbstractStorageEngine, Void> functionToCheckWhetherStoragePartitionsShouldBeKeptOrNot() {
return storageEngine -> {
String storageEngineName = storageEngine.toString();
String storeName = Version.parseStoreFromKafkaTopicName(storageEngineName);
PropertyKey.Builder propertyKeyBuilder =
new PropertyKey.Builder(veniceConfigLoader.getVeniceClusterConfig().getClusterName());
IdealState idealState = getHelixParticipationService().getHelixManager()
.getHelixDataAccessor()
.getProperty(propertyKeyBuilder.idealStates(storeName));

Set<Integer> idealStatePartitionIds = new HashSet<>();
idealState.getPartitionSet().stream().forEach(partitionId -> {
idealStatePartitionIds.add(Integer.parseInt(partitionId));
});
Set<Integer> storageEnginePartitionIds = storageEngine.getPartitionIds();

for (Integer storageEnginePartitionId: storageEnginePartitionIds) {
if (idealStatePartitionIds.contains(storageEnginePartitionId)) {
continue;
}
storageEngine.dropPartition(storageEnginePartitionId);
}
return null;
};
}

public MetricsRepository getMetricsRepository() {
return metricsRepository;
}
Expand Down
Loading