
[server][dvc] Drop partitions asynchronously #1310

Open · wants to merge 13 commits into main
@@ -252,6 +252,7 @@ public DaVinciBackend(
.map(objectCacheConfig -> new ObjectCacheBackend(clientConfig, objectCacheConfig, schemaRepository));

ingestionService = new KafkaStoreIngestionService(
+ storageService,
storageService.getStorageEngineRepository(),
configLoader,
storageMetadataService,
@@ -145,8 +145,7 @@ public void dropStoragePartitionGracefully(
final int waitIntervalInSecond = 1;
final int maxRetry = timeoutInSeconds / waitIntervalInSecond;
getStoreIngestionService().stopConsumptionAndWait(storeConfig, partition, waitIntervalInSecond, maxRetry, true);
- // Drops corresponding data partition from storage.
- this.storageService.dropStorePartition(storeConfig, partition, removeEmptyStorageEngine);
+ getStoreIngestionService().dropStoragePartitionGracefully(storeConfig, partition);
}

@Override
@@ -695,6 +695,7 @@ private void initializeIsolatedIngestionServer() {

// Create KafkaStoreIngestionService
storeIngestionService = new KafkaStoreIngestionService(
+ storageService,
storageService.getStorageEngineRepository(),
configLoader,
storageMetadataService,
@@ -15,6 +15,7 @@
import com.linkedin.davinci.replication.merge.RmdSerDe;
import com.linkedin.davinci.replication.merge.StringAnnotatedStoreSchemaCache;
import com.linkedin.davinci.stats.AggVersionedIngestionStats;
+ import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer;
import com.linkedin.davinci.storage.chunking.RawBytesChunkingAdapter;
import com.linkedin.davinci.storage.chunking.SingleGetChunkingAdapter;
@@ -103,6 +104,7 @@ private static class ReusableObjects {
private final ThreadLocal<ReusableObjects> threadLocalReusableObjects = ThreadLocal.withInitial(ReusableObjects::new);

public ActiveActiveStoreIngestionTask(
+ StorageService storageService,
StoreIngestionTaskFactory.Builder builder,
Store store,
Version version,
@@ -114,6 +116,7 @@ public ActiveActiveStoreIngestionTask(
Optional<ObjectCacheBackend> cacheBackend,
DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) {
super(
+ storageService,
builder,
store,
version,
@@ -4,7 +4,7 @@
* An Enum enumerating all valid types of {@link ConsumerAction}.
*/
public enum ConsumerActionType {
- SUBSCRIBE(1), UNSUBSCRIBE(1), RESET_OFFSET(1), PAUSE(1), RESUME(1),
+ SUBSCRIBE(1), UNSUBSCRIBE(1), RESET_OFFSET(1), PAUSE(1), RESUME(1), DROP_PARTITION(1),
/**
* KILL action has higher priority than others, so that once KILL action is added to the action queue,
* we will process it immediately to avoid doing throw-away works.
@@ -31,6 +31,7 @@
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.StorageMetadataService;
+ import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.view.VeniceViewWriterFactory;
import com.linkedin.venice.SSLConfig;
@@ -130,6 +131,7 @@ public class KafkaStoreIngestionService extends AbstractVeniceService implements
private static final String GROUP_ID_FORMAT = "%s_%s";

private static final Logger LOGGER = LogManager.getLogger(KafkaStoreIngestionService.class);
+ private final StorageService storageService;

private final VeniceConfigLoader veniceConfigLoader;

@@ -190,6 +192,7 @@ public class KafkaStoreIngestionService extends AbstractVeniceService implements
private final ExecutorService aaWCWorkLoadProcessingThreadPool;

public KafkaStoreIngestionService(
+ StorageService storageService,
StorageEngineRepository storageEngineRepository,
VeniceConfigLoader veniceConfigLoader,
StorageMetadataService storageMetadataService,
@@ -212,6 +215,7 @@ public KafkaStoreIngestionService(
PubSubClientsFactory pubSubClientsFactory,
Optional<SSLFactory> sslFactory,
HeartbeatMonitoringService heartbeatMonitoringService) {
+ this.storageService = storageService;
this.cacheBackend = cacheBackend;
this.recordTransformerFunction = recordTransformerFunction;
this.storageMetadataService = storageMetadataService;
@@ -519,6 +523,7 @@ private StoreIngestionTask createStoreIngestionTask(
};

return ingestionTaskFactory.getNewIngestionTask(
+ storageService,
store,
version,
getKafkaConsumerProperties(veniceStoreVersionConfig),
@@ -920,6 +925,37 @@ public void stopConsumptionAndWait(
}
}

+ /**
+  * Drops the corresponding Venice partition gracefully.
+  * This should only be called after {@link #stopConsumptionAndWait} has been called.
+  * @param veniceStore Venice store for the partition.
+  * @param partitionId Venice partition's id.
+  */
+ public void dropStoragePartitionGracefully(VeniceStoreVersionConfig veniceStore, int partitionId) {
+   final String topic = veniceStore.getStoreVersionName();
+
+   if (isPartitionConsuming(topic, partitionId)) {
+     throw new VeniceException("Tried to drop storage partition that is still consuming");
[Review thread]
lluwm (Contributor), Nov 15, 2024:
This exception could cause the ST to end up in the ERROR state, is that right? Reading stopConsumptionAndWait, today we simply log a warning message if consumption couldn't be stopped in time. This sounds like a behavior change in this PR, and we probably want to be careful about it.

Author reply:
STANDBY->OFFLINE issues an UNSUBSCRIBE message, and so will stopConsumptionAndWait. By the time the SIT processes the DROP_PARTITION message, it should already have been unsubscribed. I think it's safe to remove this check. What do you think?

+   }
+
+   try (AutoCloseableLock ignore = topicLockManager.getLockForResource(topic)) {
+     StoreIngestionTask ingestionTask = topicNameToIngestionTaskMap.get(topic);
+     if (ingestionTask != null && ingestionTask.isRunning()) {
[Review thread]
lluwm (Contributor), Nov 15, 2024:
I am thinking of a race condition: after DROP_PARTITION actions are added to the queue, the SIT could terminate due to an exception (as we see in several cases today) before executing the remaining actions in the queue, which could leak partitions. If this race is possible, we probably need logic in the SIT to ensure all DROP_PARTITION actions are executed before it terminates, or some other measure to avoid it.

+       LOGGER.info(
+           "Ingestion task is still running for Topic {}. Dropping partition {} asynchronously",
+           veniceStore.getStoreVersionName(),
+           partitionId);
+       ingestionTask.dropPartition(new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topic), partitionId));
+     } else {
+       LOGGER.info(
+           "Ingestion task isn't running for Topic {}. Dropping partition {} synchronously",
+           veniceStore.getStoreVersionName(),
+           partitionId);
+       this.storageService.dropStorePartition(veniceStore, partitionId, true);
+     }
+   }
+ }
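Taken together with the ingestion-backend hunk near the top of this diff, the intended caller sequence is stop-then-drop. A minimal sketch of that sequence (the wrapper class and import paths are assumptions; the two calls are taken from this PR's hunks):

```java
import com.linkedin.davinci.config.VeniceStoreVersionConfig; // path assumed
import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService; // path assumed

// Sketch of the caller-side flow introduced by this PR.
final class GracefulDropSketch {
  static void dropGracefully(
      KafkaStoreIngestionService ingestionService,
      VeniceStoreVersionConfig storeConfig,
      int partition,
      int timeoutInSeconds) {
    final int waitIntervalInSecond = 1;
    final int maxRetry = timeoutInSeconds / waitIntervalInSecond;
    // 1. Unsubscribe and wait until consumption has actually stopped.
    ingestionService.stopConsumptionAndWait(storeConfig, partition, waitIntervalInSecond, maxRetry, true);
    // 2. Delegate the drop: an async DROP_PARTITION action if the ingestion
    //    task is still running, a synchronous StorageService drop otherwise.
    ingestionService.dropStoragePartitionGracefully(storeConfig, partition);
  }
}
```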

/**
* This function will try to kill the ingestion tasks belonging to non-current versions.
* And this is mainly being used by memory limiter feature to free up resources when encountering memory
@@ -23,6 +23,7 @@
import com.linkedin.davinci.schema.merge.CollectionTimestampMergeRecordHelper;
import com.linkedin.davinci.schema.merge.MergeRecordHelper;
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
+ import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer;
import com.linkedin.davinci.storage.chunking.GenericRecordChunkingAdapter;
import com.linkedin.davinci.store.AbstractStorageEngine;
@@ -204,6 +205,7 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask {
private final Version version;

public LeaderFollowerStoreIngestionTask(
+ StorageService storageService,
StoreIngestionTaskFactory.Builder builder,
Store store,
Version version,
@@ -215,6 +217,7 @@ public LeaderFollowerStoreIngestionTask(
Optional<ObjectCacheBackend> cacheBackend,
DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) {
super(
+ storageService,
builder,
store,
version,
@@ -2,6 +2,7 @@

import static com.linkedin.davinci.ingestion.LagType.OFFSET_LAG;
import static com.linkedin.davinci.ingestion.LagType.TIME_LAG;
+ import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.DROP_PARTITION;
import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.RESET_OFFSET;
import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.SUBSCRIBE;
import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.UNSUBSCRIBE;
@@ -34,6 +35,7 @@
import com.linkedin.davinci.stats.HostLevelIngestionStats;
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.StorageMetadataService;
+ import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.StoragePartitionConfig;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
@@ -189,6 +191,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

/** storage destination for consumption */
+ protected final StorageService storageService;
protected final StorageEngineRepository storageEngineRepository;
protected final AbstractStorageEngine storageEngine;

@@ -344,6 +347,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
protected final ExecutorService parallelProcessingThreadPool;

public StoreIngestionTask(
+ StorageService storageService,
StoreIngestionTaskFactory.Builder builder,
Store store,
Version version,
@@ -361,6 +365,7 @@
this.databaseSyncBytesIntervalForTransactionalMode = storeConfig.getDatabaseSyncBytesIntervalForTransactionalMode();
this.databaseSyncBytesIntervalForDeferredWriteMode = storeConfig.getDatabaseSyncBytesIntervalForDeferredWriteMode();
this.kafkaProps = kafkaConsumerProperties;
+ this.storageService = storageService;
this.storageEngineRepository = builder.getStorageEngineRepository();
this.storageMetadataService = builder.getStorageMetadataService();
this.storeRepository = builder.getMetadataRepo();
@@ -630,6 +635,17 @@ public synchronized CompletableFuture<Void> unSubscribePartition(
return consumerAction.getFuture();
}

+ /**
+  * Adds an asynchronous partition drop request for the task.
+  * This is always a Helix-triggered action.
+  */
+ public synchronized void dropPartition(PubSubTopicPartition topicPartition) {
+   throwIfNotRunning();
+   ConsumerAction consumerAction = new ConsumerAction(DROP_PARTITION, topicPartition, nextSeqNum(), true);
+   consumerActionsQueue.add(consumerAction);
+ }

public boolean hasAnySubscription() {
return !partitionConsumptionStateMap.isEmpty();
}
@@ -2135,6 +2151,11 @@ protected void processCommonConsumerAction(ConsumerAction consumerAction) throws
LOGGER.info("Kill this consumer task for Topic: {}", topic);
// Throw the exception here to break the consumption loop, and then this task is marked as error status.
throw new VeniceIngestionTaskKilledException(KILLED_JOB_MESSAGE + topic);
+     case DROP_PARTITION:
+       LOGGER.info("{} Dropping partition: {}", ingestionTaskName, topicPartition);
+       this.storageService.dropStorePartition(storeConfig, partition, true);
+       LOGGER.info("{} Dropped partition: {}", ingestionTaskName, topicPartition);
+       break;
default:
throw new UnsupportedOperationException(operation.name() + " is not supported in " + getClass().getName());
}
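For orientation, the full asynchronous path is: dropStoragePartitionGracefully enqueues a DROP_PARTITION ConsumerAction via StoreIngestionTask#dropPartition, and the consumption loop later handles it in processCommonConsumerAction by calling StorageService#dropStorePartition. The toy model below imitates that handoff with a plain BlockingQueue (deliberately simplified; real ConsumerActions also carry sequence numbers and priorities):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified model of the enqueue/poll handoff used by this PR.
final class AsyncDropModel {
  enum ActionType { SUBSCRIBE, UNSUBSCRIBE, DROP_PARTITION }
  record Action(ActionType type, int partition) {}

  private final BlockingQueue<Action> queue = new LinkedBlockingQueue<>();

  // Producer side: KafkaStoreIngestionService#dropStoragePartitionGracefully.
  void dropPartition(int partition) {
    queue.add(new Action(ActionType.DROP_PARTITION, partition));
  }

  // Consumer side: the ingestion loop's processCommonConsumerAction.
  void processNextAction() throws InterruptedException {
    Action action = queue.take();
    if (action.type() == ActionType.DROP_PARTITION) {
      // Real code: storageService.dropStorePartition(storeConfig, partition, true);
      System.out.println("Dropped partition " + action.partition());
    }
  }

  public static void main(String[] args) throws InterruptedException {
    AsyncDropModel model = new AsyncDropModel();
    model.dropPartition(3);    // enqueue the drop request
    model.processNextAction(); // process it asynchronously
  }
}
```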
@@ -11,6 +11,7 @@
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.StorageMetadataService;
+ import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.view.VeniceViewWriterFactory;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
@@ -43,6 +44,7 @@ private StoreIngestionTaskFactory(Builder builder) {
}

public StoreIngestionTask getNewIngestionTask(
+ StorageService storageService,
Store store,
Version version,
Properties kafkaConsumerProperties,
@@ -54,6 +56,7 @@ public StoreIngestionTask getNewIngestionTask(
DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) {
if (version.isActiveActiveReplicationEnabled()) {
return new ActiveActiveStoreIngestionTask(
+ storageService,
builder,
store,
version,
@@ -66,6 +69,7 @@
recordTransformerFunction);
}
return new LeaderFollowerStoreIngestionTask(
+ storageService,
builder,
store,
version,
@@ -28,6 +28,7 @@
import com.linkedin.davinci.stats.AggVersionedIngestionStats;
import com.linkedin.davinci.stats.HostLevelIngestionStats;
import com.linkedin.davinci.storage.StorageEngineRepository;
+ import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer;
import com.linkedin.davinci.storage.chunking.ChunkingUtils;
import com.linkedin.davinci.store.AbstractStorageEngine;
@@ -231,6 +232,7 @@ public void testisReadyToServeAnnouncedWithRTLag() {
Version mockVersion = new VersionImpl(STORE_NAME, 1, PUSH_JOB_ID);
mockVersion.setHybridStoreConfig(hybridStoreConfig);

+ StorageService storageService = mock(StorageService.class);
Store store = new ZKStore(
STORE_NAME,
"Felix",
@@ -250,6 +252,7 @@
VeniceStoreVersionConfig storeVersionConfig =
new VeniceStoreVersionConfig(STORE_NAME + "_v1", new VeniceProperties(kafkaConsumerProperties));
ActiveActiveStoreIngestionTask ingestionTask = new ActiveActiveStoreIngestionTask(
+ storageService,
builder,
store,
mockVersion,