Skip to content

Commit

Permalink
[controller][compat] Controller part change for supporting separate r…
Browse files Browse the repository at this point in the history
…eal-time topic functionality for hybrid stores. (#1172)

This change add store version level config separateRealTimeTopicEnabled to allow creating separate real-time topic for isolate bulk real-time traffic in short time range from incremental push. There is cluster level config for enabling this config: ENABLE_SEPARATE_REAL_TIME_TOPIC_FOR_STORE_WITH_INCREMENTAL_PUSH for all newly converted hybrid store with incremental push enabled.

Creation: it will be automatically created by checking whether incremental push and separateRealTimeTopicEnabled is enabled when hybrid enabled store version is added and normal real-time topic is created.
Deletion: separate real time topic will be cleaned up when normal real-time topic is deleted due to all hybrid store version has been deleted.
Added Integration test to see if incremental push job would send expected traffic to the separate real-time topic after new store is turned this feature.

PS: skip the flaky test: testDaVinciMemoryLimitShouldFailLargeDataPushAndResumeHybridStore.

Co-authored-by: Hao Xu <xhao@xhao-mn3.linkedin.biz>
  • Loading branch information
haoxu07 and Hao Xu committed Sep 25, 2024
1 parent 1df15f8 commit 939d655
Show file tree
Hide file tree
Showing 27 changed files with 1,867 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,7 @@ static UpdateStoreQueryParams getUpdateStoreQueryParams(CommandLine cmd) {
integerParam(cmd, Arg.BATCH_GET_LIMIT, p -> params.setBatchGetLimit(p), argSet);
integerParam(cmd, Arg.NUM_VERSIONS_TO_PRESERVE, p -> params.setNumVersionsToPreserve(p), argSet);
booleanParam(cmd, Arg.INCREMENTAL_PUSH_ENABLED, p -> params.setIncrementalPushEnabled(p), argSet);
booleanParam(cmd, Arg.SEPARATE_REALTIME_TOPIC_ENABLED, p -> params.setSeparateRealTimeTopicEnabled(p), argSet);
booleanParam(cmd, Arg.WRITE_COMPUTATION_ENABLED, p -> params.setWriteComputationEnabled(p), argSet);
booleanParam(cmd, Arg.READ_COMPUTATION_ENABLED, p -> params.setReadComputationEnabled(p), argSet);
integerParam(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ public enum Arg {
),
INCREMENTAL_PUSH_ENABLED(
"incremental-push-enabled", "ipe", true, "a flag to see if the store supports incremental push or not"
),
SEPARATE_REALTIME_TOPIC_ENABLED(
"separate-realtime-topic-enabled", "srte", true,
"a flag to see if the store supports separate real-time topic or not"
), BATCH_GET_LIMIT("batch-get-limit", "bgl", true, "Key number limit inside one batch-get request"),
NUM_VERSIONS_TO_PRESERVE("num-versions-to-preserve", "nvp", true, "Number of version that store should preserve."),
KAFKA_BOOTSTRAP_SERVERS("kafka-bootstrap-servers", "kbs", true, "Kafka bootstrap server URL(s)"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import static com.linkedin.venice.Arg.REPLICATION_FACTOR;
import static com.linkedin.venice.Arg.RETRY;
import static com.linkedin.venice.Arg.RMD_CHUNKING_ENABLED;
import static com.linkedin.venice.Arg.SEPARATE_REALTIME_TOPIC_ENABLED;
import static com.linkedin.venice.Arg.SERVER_KAFKA_FETCH_QUOTA_RECORDS_PER_SECOND;
import static com.linkedin.venice.Arg.SERVER_URL;
import static com.linkedin.venice.Arg.SKIP_DIV;
Expand Down Expand Up @@ -267,7 +268,7 @@ public enum Command {
ACTIVE_ACTIVE_REPLICATION_ENABLED, REGIONS_FILTER, DISABLE_META_STORE, DISABLE_DAVINCI_PUSH_STATUS_STORE,
STORAGE_PERSONA, STORE_VIEW_CONFIGS, LATEST_SUPERSET_SCHEMA_ID, MIN_COMPACTION_LAG_SECONDS,
MAX_COMPACTION_LAG_SECONDS, MAX_RECORD_SIZE_BYTES, MAX_NEARLINE_RECORD_SIZE_BYTES,
UNUSED_SCHEMA_DELETION_ENABLED, BLOB_TRANSFER_ENABLED }
UNUSED_SCHEMA_DELETION_ENABLED, BLOB_TRANSFER_ENABLED, SEPARATE_REALTIME_TOPIC_ENABLED }
),
UPDATE_CLUSTER_CONFIG(
"update-cluster-config", "Update live cluster configs", new Arg[] { URL, CLUSTER },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,14 @@ private ConfigKeys() {
*/
public static final String ENABLE_INCREMENTAL_PUSH_FOR_HYBRID_ACTIVE_ACTIVE_USER_STORES =
"enable.incremental.push.for.hybrid.active.active.user.stores";

/**
* We will use this config to determine whether we should enable separate real-time topic for incremental push enabled stores.
* If this config is set to true, we will enable separate real-time topic for incremental push enabled stores.
*/
public static final String ENABLE_SEPARATE_REAL_TIME_TOPIC_FOR_STORE_WITH_INCREMENTAL_PUSH =
"enable.separate.real.time.topic.for.store.with.incremental.push";

/**
* We will use this config to determine whether we should enable partial update for hybrid active-active user stores.
* If this config is set to true, we will enable partial update for hybrid active-active user stores whose latest value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class ControllerApiConstants {
public static final String CHUNKING_ENABLED = "chunking_enabled";
public static final String RMD_CHUNKING_ENABLED = "rmd_chunking_enabled";
public static final String INCREMENTAL_PUSH_ENABLED = "incremental_push_enabled";
public static final String SEPARATE_REAL_TIME_TOPIC_ENABLED = "separate_realtime_topic_enabled";
public static final String SINGLE_GET_ROUTER_CACHE_ENABLED = "single_get_router_cache_enabled";
public static final String BATCH_GET_ROUTER_CACHE_ENABLED = "batch_get_router_cache_enabled";
public static final String BATCH_GET_LIMIT = "batch_get_limit";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.REPLICATION_METADATA_PROTOCOL_VERSION_ID;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.REWIND_TIME_IN_SECONDS;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.RMD_CHUNKING_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.SEPARATE_REAL_TIME_TOPIC_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORAGE_NODE_READ_QUOTA_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORAGE_QUOTA_IN_BYTE;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_MIGRATION;
Expand Down Expand Up @@ -406,6 +407,14 @@ public Optional<Boolean> getIncrementalPushEnabled() {
return getBoolean(INCREMENTAL_PUSH_ENABLED);
}

public UpdateStoreQueryParams setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled) {
return putBoolean(SEPARATE_REAL_TIME_TOPIC_ENABLED, separateRealTimeTopicEnabled);
}

public Optional<Boolean> getSeparateRealTimeTopicEnabled() {
return getBoolean(SEPARATE_REAL_TIME_TOPIC_ENABLED);
}

public UpdateStoreQueryParams setBatchGetLimit(int batchGetLimit) {
return putInteger(BATCH_GET_LIMIT, batchGetLimit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ private void addVersion(Version version, boolean checkDisableWrite, boolean isCl

version.setIncrementalPushEnabled(isIncrementalPushEnabled());

version.setSeparateRealTimeTopicEnabled(isSeparateRealTimeTopicEnabled());

version.setBlobTransferEnabled(isBlobTransferEnabled());

version.setUseVersionLevelIncrementalPushEnabled(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,16 @@ public void setIncrementalPushEnabled(boolean incrementalPushEnabled) {
throw new UnsupportedOperationException();
}

@Override
public boolean isSeparateRealTimeTopicEnabled() {
return this.delegate.isSeparateRealTimeTopicEnabled();
}

@Override
public void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled) {
throw new UnsupportedOperationException();
}

@Override
public boolean isBlobTransferEnabled() {
return this.delegate.isBlobTransferEnabled();
Expand Down Expand Up @@ -955,6 +965,16 @@ public void setIncrementalPushEnabled(boolean incrementalPushEnabled) {
throw new UnsupportedOperationException();
}

@Override
public boolean isSeparateRealTimeTopicEnabled() {
return this.delegate.isSeparateRealTimeTopicEnabled();
}

@Override
public void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled) {
throw new UnsupportedOperationException();
}

@Override
public boolean isAccessControlled() {
return this.delegate.isAccessControlled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ static boolean isSystemStore(String storeName) {

void setIncrementalPushEnabled(boolean incrementalPushEnabled);

boolean isSeparateRealTimeTopicEnabled();

void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled);

boolean isAccessControlled();

void setAccessControlled(boolean accessControlled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,16 @@ public void setIncrementalPushEnabled(boolean incrementalPushEnabled) {
throwUnsupportedOperationException("setIncrementalPushEnabled");
}

@Override
public boolean isSeparateRealTimeTopicEnabled() {
return zkSharedStore.isSeparateRealTimeTopicEnabled();
}

@Override
public void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled) {
throwUnsupportedOperationException("setSeparateRealTimeTopicEnabled");
}

@Override
public boolean isAccessControlled() {
return zkSharedStore.isAccessControlled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public interface Version extends Comparable<Version>, DataModelBackedStructure<S
String VERSION_SEPARATOR = "_v";
String REAL_TIME_TOPIC_SUFFIX = "_rt";
String STREAM_REPROCESSING_TOPIC_SUFFIX = "_sr";

String SEPARATE_REAL_TIME_TOPIC_SUFFIX = "_rt_sep";
/**
* Special number indicating no replication metadata version is set.
*/
Expand Down Expand Up @@ -162,6 +162,10 @@ default void setLeaderFollowerModelEnabled(boolean leaderFollowerModelEnabled) {

void setIncrementalPushEnabled(boolean incrementalPushEnabled);

boolean isSeparateRealTimeTopicEnabled();

void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled);

boolean isBlobTransferEnabled();

void setBlobTransferEnabled(boolean blobTransferEnabled);
Expand Down Expand Up @@ -289,6 +293,10 @@ static String composeRealTimeTopic(String storeName) {
return storeName + REAL_TIME_TOPIC_SUFFIX;
}

static String composeSeparateRealTimeTopic(String storeName) {
return storeName + SEPARATE_REAL_TIME_TOPIC_SUFFIX;
}

static String composeStreamReprocessingTopic(String storeName, int versionNumber) {
return composeKafkaTopic(storeName, versionNumber) + STREAM_REPROCESSING_TOPIC_SUFFIX;
}
Expand All @@ -308,7 +316,10 @@ static String parseStoreFromRealTimeTopic(String kafkaTopic) {
if (!isRealTimeTopic(kafkaTopic)) {
throw new VeniceException("Kafka topic: " + kafkaTopic + " is not a real-time topic");
}
return kafkaTopic.substring(0, kafkaTopic.length() - REAL_TIME_TOPIC_SUFFIX.length());
if (kafkaTopic.endsWith(REAL_TIME_TOPIC_SUFFIX)) {
return kafkaTopic.substring(0, kafkaTopic.length() - REAL_TIME_TOPIC_SUFFIX.length());
}
return kafkaTopic.substring(0, kafkaTopic.length() - SEPARATE_REAL_TIME_TOPIC_SUFFIX.length());
}

static String parseStoreFromStreamReprocessingTopic(String kafkaTopic) {
Expand Down Expand Up @@ -337,7 +348,7 @@ static String parseStoreFromKafkaTopicName(String kafkaTopic) {
}

static boolean isRealTimeTopic(String kafkaTopic) {
return kafkaTopic.endsWith(REAL_TIME_TOPIC_SUFFIX);
return kafkaTopic.endsWith(REAL_TIME_TOPIC_SUFFIX) || kafkaTopic.endsWith(SEPARATE_REAL_TIME_TOPIC_SUFFIX);
}

static boolean isStreamReprocessingTopic(String kafkaTopic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,16 @@ public void setIncrementalPushEnabled(boolean incrementalPushEnabled) {
this.storeVersion.incrementalPushEnabled = incrementalPushEnabled;
}

@Override
public boolean isSeparateRealTimeTopicEnabled() {
return storeVersion.separateRealTimeTopicEnabled;
}

@Override
public void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled) {
this.storeVersion.setSeparateRealTimeTopicEnabled(separateRealTimeTopicEnabled);
}

@Override
public boolean isBlobTransferEnabled() {
return this.storeVersion.blobTransferEnabled;
Expand Down Expand Up @@ -450,6 +460,7 @@ public Version cloneVersion() {
clonedVersion.setReplicationFactor(getReplicationFactor());
clonedVersion.setNativeReplicationSourceFabric(getNativeReplicationSourceFabric());
clonedVersion.setIncrementalPushEnabled(isIncrementalPushEnabled());
clonedVersion.setSeparateRealTimeTopicEnabled(isSeparateRealTimeTopicEnabled());
clonedVersion.setUseVersionLevelIncrementalPushEnabled(isUseVersionLevelIncrementalPushEnabled());
clonedVersion.setHybridStoreConfig(getHybridStoreConfig());
clonedVersion.setUseVersionLevelHybridConfig(isUseVersionLevelHybridConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ public ZKStore(Store store) {
setBatchGetLimit(store.getBatchGetLimit());
setNumVersionsToPreserve(store.getNumVersionsToPreserve());
setIncrementalPushEnabled(store.isIncrementalPushEnabled());
setSeparateRealTimeTopicEnabled(store.isSeparateRealTimeTopicEnabled());
setLargestUsedVersionNumber(store.getLargestUsedVersionNumber());
setMigrating(store.isMigrating());
setWriteComputationEnabled(store.isWriteComputationEnabled());
Expand Down Expand Up @@ -544,6 +545,16 @@ public void setIncrementalPushEnabled(boolean incrementalPushEnabled) {
this.storeProperties.incrementalPushEnabled = incrementalPushEnabled;
}

@Override
public boolean isSeparateRealTimeTopicEnabled() {
return this.storeProperties.separateRealTimeTopicEnabled;
}

@Override
public void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled) {
this.storeProperties.separateRealTimeTopicEnabled = separateRealTimeTopicEnabled;
}

/**
* @deprecated The store level accessControlled flag is no longer valid to be used to skip ACL checks.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public enum AvroProtocolDefinition {
*
* TODO: Move AdminOperation to venice-common module so that we can properly reference it here.
*/
ADMIN_OPERATION(80, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"),
ADMIN_OPERATION(81, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"),

/**
* Single chunk of a large multi-chunk value. Just a bunch of bytes.
Expand Down Expand Up @@ -143,7 +143,7 @@ public enum AvroProtocolDefinition {
/**
* Value schema for metadata system store.
*/
METADATA_SYSTEM_SCHEMA_STORE(23, StoreMetaValue.class),
METADATA_SYSTEM_SCHEMA_STORE(24, StoreMetaValue.class),

/**
* Key schema for push status system store.
Expand Down
Loading

0 comments on commit 939d655

Please sign in to comment.