From 939d655324d757c790092d3158dbcbabbb73a528 Mon Sep 17 00:00:00 2001
From: Hao Xu
Date: Tue, 24 Sep 2024 18:41:25 -0700
Subject: [PATCH] [controller][compat] Controller part change for supporting
 separate real-time topic functionality for hybrid stores. (#1172)

This change adds a store- and version-level config, separateRealTimeTopicEnabled,
which allows creating a separate real-time topic that isolates the short bursts of
bulk real-time traffic produced by incremental pushes. There is a cluster-level
config, ENABLE_SEPARATE_REAL_TIME_TOPIC_FOR_STORE_WITH_INCREMENTAL_PUSH, that
enables this behavior for all newly converted hybrid stores with incremental push
enabled.

Creation: the separate real-time topic is created automatically when a
hybrid-enabled store version is added and the normal real-time topic is created,
provided both incremental push and separateRealTimeTopicEnabled are enabled.

Deletion: the separate real-time topic is cleaned up when the normal real-time
topic is deleted, i.e. once all hybrid store versions have been deleted.

Added an integration test to verify that an incremental push job sends the
expected traffic to the separate real-time topic once the feature is turned on for
a new store.

PS: skip the flaky test testDaVinciMemoryLimitShouldFailLargeDataPushAndResumeHybridStore.

Co-authored-by: Hao Xu
---
 .../java/com/linkedin/venice/AdminTool.java   |    1 +
 .../main/java/com/linkedin/venice/Arg.java    |    4 +
 .../java/com/linkedin/venice/Command.java     |    3 +-
 .../java/com/linkedin/venice/ConfigKeys.java  |    8 +
 .../controllerapi/ControllerApiConstants.java |    1 +
 .../controllerapi/UpdateStoreQueryParams.java |    9 +
 .../linkedin/venice/meta/AbstractStore.java   |    2 +
 .../linkedin/venice/meta/ReadOnlyStore.java   |   20 +
 .../java/com/linkedin/venice/meta/Store.java  |    4 +
 .../com/linkedin/venice/meta/SystemStore.java |   10 +
 .../com/linkedin/venice/meta/Version.java     |   17 +-
 .../com/linkedin/venice/meta/VersionImpl.java |   11 +
 .../com/linkedin/venice/meta/ZKStore.java     |   11 +
 .../avro/AvroProtocolDefinition.java          |    4 +-
 .../StoreMetaValue/v24/StoreMetaValue.avsc    |  399 ++++++
 ...VeniceHelixAdminWithSharedEnvironment.java |   12 +-
 .../DaVinciClientMemoryLimitTest.java         |    1 -
 ...TestActiveActiveReplicationForIncPush.java |  127 +-
 .../com/linkedin/venice/controller/Admin.java |    2 +
 .../VeniceControllerClusterConfig.java        |   13 +
 .../venice/controller/VeniceHelixAdmin.java   |   52 +-
 .../controller/VeniceParentHelixAdmin.java    |   18 +
 .../kafka/consumer/AdminExecutionTask.java    |    1 +
 .../controller/server/CreateVersion.java      |    7 +-
 .../control/RealTimeTopicSwitcher.java        |   36 +-
 .../AdminOperation/v81/AdminOperation.avsc    | 1134 +++++++++++++++++
 .../controller/server/CreateVersionTest.java  |   11 +-
 27 files changed, 1867 insertions(+), 51 deletions(-)
 create mode 100644 internal/venice-common/src/main/resources/avro/StoreMetaValue/v24/StoreMetaValue.avsc
 create mode 100644 services/venice-controller/src/main/resources/avro/AdminOperation/v81/AdminOperation.avsc

diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java
index 31abdd397b..49a3fda801 100644
--- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java
+++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java
@@ -1116,6 +1116,7 @@ static UpdateStoreQueryParams getUpdateStoreQueryParams(CommandLine cmd) {
     integerParam(cmd, Arg.BATCH_GET_LIMIT, p -> params.setBatchGetLimit(p), argSet);
     integerParam(cmd, Arg.NUM_VERSIONS_TO_PRESERVE, p -> params.setNumVersionsToPreserve(p), argSet);
booleanParam(cmd, Arg.INCREMENTAL_PUSH_ENABLED, p -> params.setIncrementalPushEnabled(p), argSet); + booleanParam(cmd, Arg.SEPARATE_REALTIME_TOPIC_ENABLED, p -> params.setSeparateRealTimeTopicEnabled(p), argSet); booleanParam(cmd, Arg.WRITE_COMPUTATION_ENABLED, p -> params.setWriteComputationEnabled(p), argSet); booleanParam(cmd, Arg.READ_COMPUTATION_ENABLED, p -> params.setReadComputationEnabled(p), argSet); integerParam( diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java index 89e48d6f4a..c9b7c0940d 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java @@ -100,6 +100,10 @@ public enum Arg { ), INCREMENTAL_PUSH_ENABLED( "incremental-push-enabled", "ipe", true, "a flag to see if the store supports incremental push or not" + ), + SEPARATE_REALTIME_TOPIC_ENABLED( + "separate-realtime-topic-enabled", "srte", true, + "a flag to see if the store supports separate real-time topic or not" ), BATCH_GET_LIMIT("batch-get-limit", "bgl", true, "Key number limit inside one batch-get request"), NUM_VERSIONS_TO_PRESERVE("num-versions-to-preserve", "nvp", true, "Number of version that store should preserve."), KAFKA_BOOTSTRAP_SERVERS("kafka-bootstrap-servers", "kbs", true, "Kafka bootstrap server URL(s)"), diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java index b22a9fc3c7..cc10836b08 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java @@ -99,6 +99,7 @@ import static com.linkedin.venice.Arg.REPLICATION_FACTOR; import static com.linkedin.venice.Arg.RETRY; import static com.linkedin.venice.Arg.RMD_CHUNKING_ENABLED; +import static com.linkedin.venice.Arg.SEPARATE_REALTIME_TOPIC_ENABLED; import static com.linkedin.venice.Arg.SERVER_KAFKA_FETCH_QUOTA_RECORDS_PER_SECOND; import static com.linkedin.venice.Arg.SERVER_URL; import static com.linkedin.venice.Arg.SKIP_DIV; @@ -267,7 +268,7 @@ public enum Command { ACTIVE_ACTIVE_REPLICATION_ENABLED, REGIONS_FILTER, DISABLE_META_STORE, DISABLE_DAVINCI_PUSH_STATUS_STORE, STORAGE_PERSONA, STORE_VIEW_CONFIGS, LATEST_SUPERSET_SCHEMA_ID, MIN_COMPACTION_LAG_SECONDS, MAX_COMPACTION_LAG_SECONDS, MAX_RECORD_SIZE_BYTES, MAX_NEARLINE_RECORD_SIZE_BYTES, - UNUSED_SCHEMA_DELETION_ENABLED, BLOB_TRANSFER_ENABLED } + UNUSED_SCHEMA_DELETION_ENABLED, BLOB_TRANSFER_ENABLED, SEPARATE_REALTIME_TOPIC_ENABLED } ), UPDATE_CLUSTER_CONFIG( "update-cluster-config", "Update live cluster configs", new Arg[] { URL, CLUSTER }, diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index e25fc8fa0e..9960a61097 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -1192,6 +1192,14 @@ private ConfigKeys() { */ public static final String ENABLE_INCREMENTAL_PUSH_FOR_HYBRID_ACTIVE_ACTIVE_USER_STORES = "enable.incremental.push.for.hybrid.active.active.user.stores"; + + /** + * We will use this config to determine whether we should enable separate real-time topic for incremental push enabled stores. 
+ * If this config is set to true, we will enable separate real-time topic for incremental push enabled stores. + */ + public static final String ENABLE_SEPARATE_REAL_TIME_TOPIC_FOR_STORE_WITH_INCREMENTAL_PUSH = + "enable.separate.real.time.topic.for.store.with.incremental.push"; + /** * We will use this config to determine whether we should enable partial update for hybrid active-active user stores. * If this config is set to true, we will enable partial update for hybrid active-active user stores whose latest value diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java index d6caeed561..e647ddd3e7 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java @@ -56,6 +56,7 @@ public class ControllerApiConstants { public static final String CHUNKING_ENABLED = "chunking_enabled"; public static final String RMD_CHUNKING_ENABLED = "rmd_chunking_enabled"; public static final String INCREMENTAL_PUSH_ENABLED = "incremental_push_enabled"; + public static final String SEPARATE_REAL_TIME_TOPIC_ENABLED = "separate_realtime_topic_enabled"; public static final String SINGLE_GET_ROUTER_CACHE_ENABLED = "single_get_router_cache_enabled"; public static final String BATCH_GET_ROUTER_CACHE_ENABLED = "batch_get_router_cache_enabled"; public static final String BATCH_GET_LIMIT = "batch_get_limit"; diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java index 71573848dd..168d2ee27e 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java @@ -50,6 +50,7 @@ import static com.linkedin.venice.controllerapi.ControllerApiConstants.REPLICATION_METADATA_PROTOCOL_VERSION_ID; import static com.linkedin.venice.controllerapi.ControllerApiConstants.REWIND_TIME_IN_SECONDS; import static com.linkedin.venice.controllerapi.ControllerApiConstants.RMD_CHUNKING_ENABLED; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.SEPARATE_REAL_TIME_TOPIC_ENABLED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORAGE_NODE_READ_QUOTA_ENABLED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORAGE_QUOTA_IN_BYTE; import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_MIGRATION; @@ -406,6 +407,14 @@ public Optional getIncrementalPushEnabled() { return getBoolean(INCREMENTAL_PUSH_ENABLED); } + public UpdateStoreQueryParams setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled) { + return putBoolean(SEPARATE_REAL_TIME_TOPIC_ENABLED, separateRealTimeTopicEnabled); + } + + public Optional getSeparateRealTimeTopicEnabled() { + return getBoolean(SEPARATE_REAL_TIME_TOPIC_ENABLED); + } + public UpdateStoreQueryParams setBatchGetLimit(int batchGetLimit) { return putInteger(BATCH_GET_LIMIT, batchGetLimit); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java index d3c0a21934..62c318899d 100644 --- 
a/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java @@ -152,6 +152,8 @@ private void addVersion(Version version, boolean checkDisableWrite, boolean isCl version.setIncrementalPushEnabled(isIncrementalPushEnabled()); + version.setSeparateRealTimeTopicEnabled(isSeparateRealTimeTopicEnabled()); + version.setBlobTransferEnabled(isBlobTransferEnabled()); version.setUseVersionLevelIncrementalPushEnabled(true); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java index f393701010..1679bb53b7 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java @@ -483,6 +483,16 @@ public void setIncrementalPushEnabled(boolean incrementalPushEnabled) { throw new UnsupportedOperationException(); } + @Override + public boolean isSeparateRealTimeTopicEnabled() { + return this.delegate.isSeparateRealTimeTopicEnabled(); + } + + @Override + public void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled) { + throw new UnsupportedOperationException(); + } + @Override public boolean isBlobTransferEnabled() { return this.delegate.isBlobTransferEnabled(); @@ -955,6 +965,16 @@ public void setIncrementalPushEnabled(boolean incrementalPushEnabled) { throw new UnsupportedOperationException(); } + @Override + public boolean isSeparateRealTimeTopicEnabled() { + return this.delegate.isSeparateRealTimeTopicEnabled(); + } + + @Override + public void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled) { + throw new UnsupportedOperationException(); + } + @Override public boolean isAccessControlled() { return this.delegate.isAccessControlled(); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java index af27977609..75eded6878 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java @@ -150,6 +150,10 @@ static boolean isSystemStore(String storeName) { void setIncrementalPushEnabled(boolean incrementalPushEnabled); + boolean isSeparateRealTimeTopicEnabled(); + + void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled); + boolean isAccessControlled(); void setAccessControlled(boolean accessControlled); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java index c56691e058..ace64de3c1 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java @@ -339,6 +339,16 @@ public void setIncrementalPushEnabled(boolean incrementalPushEnabled) { throwUnsupportedOperationException("setIncrementalPushEnabled"); } + @Override + public boolean isSeparateRealTimeTopicEnabled() { + return zkSharedStore.isSeparateRealTimeTopicEnabled(); + } + + @Override + public void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled) { + throwUnsupportedOperationException("setSeparateRealTimeTopicEnabled"); + } + @Override public boolean isAccessControlled() { return 
zkSharedStore.isAccessControlled(); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java index e23d5a1466..0b4ba9f5f0 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java @@ -25,7 +25,7 @@ public interface Version extends Comparable, DataModelBackedStructure REWIND_FROM_EOP (replay from 'EOP - rewindTimeInSeconds'), 1 => REWIND_FROM_SOP (replay from 'SOP - rewindTimeInSeconds')", + "default": 0 + } + ] + } + ], + "default": null + }, + { + "name": "views", + "doc": "A map of views which describe and configure a downstream view of a venice store. Keys in this map are for convenience of managing configs.", + "type": { + "type":"map", + "values": { + "name": "StoreViewConfig", + "type": "record", + "doc": "A configuration for a particular view. This config should inform Venice leaders how to transform and transmit data to destination views.", + "fields": [ + { + "name": "viewClassName", + "type": "string", + "doc": "This informs what kind of view we are materializing. This then informs what kind of parameters are passed to parse this input. This is expected to be a fully formed class path name for materialization.", + "default": "" + }, + { + "name": "viewParameters", + "doc": "Optional parameters to be passed to the given view config.", + "type": ["null", + { + "type": "map", + "java-key-class": "java.lang.String", + "avro.java.string": "String", + "values": { "type": "string", "avro.java.string": "String" } + } + ], + "default": null + } + ] + } + }, + "default": {} + }, + {"name": "accessControlled", "type": "boolean", "default": true, "doc": "Store-level ACL switch. When disabled, Venice Router should accept every request."}, + {"name": "compressionStrategy", "type": "int", "default": 0, "doc": "Strategy used to compress/decompress Record's value, and default is 'NO_OP'"}, + {"name": "clientDecompressionEnabled", "type": "boolean", "default": true, "doc": "le/Disable client-side record decompression (default: true)"}, + {"name": "chunkingEnabled", "type": "boolean", "default": false, "doc": "Whether current store supports large value (typically more than 1MB). By default, the chunking feature is disabled."}, + {"name": "rmdChunkingEnabled", "type": "boolean", "default": false, "doc": "Whether current store supports large replication metadata (typically more than 1MB). By default, the chunking feature is disabled."}, + {"name": "batchGetLimit", "type": "int", "default": -1, "doc": "Batch get key number limit, and Venice will use cluster-level config if it is not positive."}, + {"name": "numVersionsToPreserve", "type": "int", "default": 0, "doc": "How many versions this store preserve at most. 
By default it's 0 means we use the cluster level config to determine how many version is preserved."}, + {"name": "incrementalPushEnabled", "type": "boolean", "default": false, "doc": "Flag to see if the store supports incremental push or not"}, + {"name": "separateRealTimeTopicEnabled", "type": "boolean", "default": false, "doc": "Flag to see if the store supports separate real-time topic for incremental push."}, + {"name": "migrating", "type": "boolean", "default": false, "doc": "Whether or not the store is in the process of migration."}, + {"name": "writeComputationEnabled", "type": "boolean", "default": false, "doc": "Whether or not write-path computation feature is enabled for this store."}, + {"name": "readComputationEnabled", "type": "boolean", "default": false, "doc": "Whether read-path computation is enabled for this store."}, + {"name": "bootstrapToOnlineTimeoutInHours", "type": "int", "default": 24, "doc": "Maximum number of hours allowed for the store to transition from bootstrap to online state."}, + {"name": "leaderFollowerModelEnabled", "type": "boolean", "default": false, "doc": "Whether or not to use leader follower state transition model for upcoming version."}, + {"name": "nativeReplicationEnabled", "type": "boolean", "default": false, "doc": "Whether or not native should be enabled for this store. Will only successfully apply if leaderFollowerModelEnabled is also true either in this update or a previous version of the store."}, + {"name": "replicationMetadataVersionID", "type": "int", "default": -1, "doc": "RMD (Replication metadata) version ID on the store-level. Default -1 means NOT_SET and the cluster-level RMD version ID should be used for stores."}, + {"name": "pushStreamSourceAddress", "type": "string", "default": "", "doc": "Address to the kafka broker which holds the source of truth topic for this store version."}, + {"name": "backupStrategy", "type": "int", "default": 1, "doc": "Strategies to store backup versions, and default is 'DELETE_ON_NEW_PUSH_START'"}, + {"name": "schemaAutoRegisteFromPushJobEnabled", "type": "boolean", "default": false, "doc": "Whether or not value schema auto registration enabled from push job for this store."}, + {"name": "latestSuperSetValueSchemaId", "type": "int", "default": -1, "doc": "For read compute stores with auto super-set schema enabled, stores the latest super-set value schema ID."}, + {"name": "hybridStoreDiskQuotaEnabled", "type": "boolean", "default": false, "doc": "Whether or not storage disk quota is enabled for a hybrid store. This store config cannot be enabled until the routers and servers in the corresponding cluster are upgraded to the right version: 0.2.249 or above for routers and servers."}, + {"name": "storeMetadataSystemStoreEnabled", "type": "boolean", "default": false, "doc": "Whether or not the store metadata system store is enabled for this store."}, + { + "name": "etlConfig", + "doc": "Properties related to ETL Store behavior.", + "type": [ + "null", + { + "name": "StoreETLConfig", + "type": "record", + "fields": [ + {"name": "etledUserProxyAccount", "type": "string", "doc": "If enabled regular ETL or future version ETL, this account name is part of path for where the ETLed snapshots will go. 
for example, for user account veniceetl001, snapshots will be published to HDFS /jobs/veniceetl001/storeName."}, + {"name": "regularVersionETLEnabled", "type": "boolean", "doc": "Whether or not enable regular version ETL for this store."}, + {"name": "futureVersionETLEnabled", "type": "boolean", "doc": "Whether or not enable future version ETL - the version that might come online in future - for this store."} + ] + } + ], + "default": null + }, + { + "name": "partitionerConfig", + "doc": "", + "type": [ + "null", + { + "name": "StorePartitionerConfig", + "type": "record", + "fields": [ + {"name": "partitionerClass", "type": "string"}, + {"name": "partitionerParams", "type": {"type": "map", "values": "string"}}, + {"name": "amplificationFactor", "type": "int"} + ] + } + ], + "default": null + }, + {"name": "incrementalPushPolicy", "type": "int", "default": 0, "doc": "Incremental Push Policy to reconcile with real time pushes, and default is 'PUSH_TO_VERSION_TOPIC'"}, + {"name": "latestVersionPromoteToCurrentTimestamp", "type": "long", "default": -1, "doc": "This is used to track the time when a new version is promoted to current version. For now, it is mostly to decide whether a backup version can be removed or not based on retention. For the existing store before this code change, it will be set to be current timestamp."}, + {"name": "backupVersionRetentionMs", "type": "long", "default": -1, "doc": "Backup retention time, and if it is not set (-1), Venice Controller will use the default configured retention. {@link com.linkedin.venice.ConfigKeys#CONTROLLER_BACKUP_VERSION_DEFAULT_RETENTION_MS}."}, + {"name": "replicationFactor", "type": "int", "default": 3, "doc": "The number of replica each store version will keep."}, + {"name": "migrationDuplicateStore", "type": "boolean", "default": false, "doc": "Whether or not the store is a duplicate store in the process of migration."}, + {"name": "nativeReplicationSourceFabric", "type": "string", "default": "", "doc": "The source fabric name to be uses in native replication. Remote consumption will happen from kafka in this fabric."}, + {"name": "daVinciPushStatusStoreEnabled", "type": "boolean", "default": false, "doc": "Whether or not davinci push status store is enabled."}, + {"name": "storeMetaSystemStoreEnabled", "type": "boolean", "default": false, "doc": "Whether or not the store meta system store is enabled for this store."}, + {"name": "activeActiveReplicationEnabled", "type": "boolean", "default": false, "doc": "Whether or not active/active replication is enabled for hybrid stores; eventually this config will replace native replication flag, when all stores are on A/A"}, + {"name": "applyTargetVersionFilterForIncPush", "type": "boolean", "default": false, "doc": "Whether or not the target version field in Kafka messages will be used in increment push to RT policy"}, + {"name": "minCompactionLagSeconds", "type": "long", "default": -1, "doc": "Store level min compaction lag config and if not specified, it will use the global config for version topics"}, + {"name": "maxCompactionLagSeconds", "type": "long", "default": -1, "doc": "Store level max compaction lag config and if not specified, 'max.compaction.lag.ms' config won't be setup in the corresponding version topics"}, + {"name": "maxRecordSizeBytes", "type": "int", "default": -1, "doc": "Store-level max record size in bytes. 
If not specified (-1), the controller config 'default.max.record.size.bytes' (100MB default) will be backfilled"}, + {"name": "maxNearlineRecordSizeBytes", "type": "int", "default": -1, "doc": "Store-level max record size in bytes for nearline jobs with partial updates. If not specified (-1), the server config 'default.max.record.size.bytes' (100MB default) will be backfilled. This may converge with maxRecordSizeBytes in the future"}, + {"name": "unusedSchemaDeletionEnabled", "type": "boolean", "default": false, "doc": "Store level config to indicate whether unused schema deletion is enabled or not."}, + { + "name": "versions", + "doc": "List of non-retired versions. It's currently sorted and there is code run under the assumption that the last element in the list is the largest. Check out {VeniceHelixAdmin#getIncrementalPushVersion}, and please make it in mind if you want to change this logic", + "type": { + "type": "array", + "items": { + "name": "StoreVersion", + "type": "record", + "doc": "Type describes all the version attributes", + "fields": [ + {"name": "storeName", "type": "string", "doc": "Name of the store which this version belong to."}, + {"name": "number", "type": "int", "doc": "Version number."}, + {"name": "createdTime", "type": "long", "doc": "Time when this version was created."}, + {"name": "status", "type": "int", "default": 1, "doc": "Status of version, and default is 'STARTED'"}, + {"name": "pushJobId", "type": "string", "default": ""}, + {"name": "compressionStrategy", "type": "int", "default": 0, "doc": "strategies used to compress/decompress Record's value, and default is 'NO_OP'"}, + {"name": "leaderFollowerModelEnabled", "type": "boolean", "default": false, "doc": "Whether or not to use leader follower state transition."}, + {"name": "nativeReplicationEnabled", "type": "boolean", "default": false, "doc": "Whether or not native replication is enabled."}, + {"name": "pushStreamSourceAddress", "type": "string", "default": "", "doc": "Address to the kafka broker which holds the source of truth topic for this store version."}, + {"name": "bufferReplayEnabledForHybrid", "type": "boolean", "default": true, "doc": "Whether or not to enable buffer replay for hybrid."}, + {"name": "chunkingEnabled", "type": "boolean", "default": false, "doc": "Whether or not large values are supported (via chunking)."}, + {"name": "rmdChunkingEnabled", "type": "boolean", "default": false, "doc": "Whether or not large replication metadata are supported (via chunking)."}, + {"name": "pushType", "type": "int", "default": 0, "doc": "Producer type for this version, and default is 'BATCH'"}, + {"name": "partitionCount", "type": "int", "default": 0, "doc": "Partition count of this version."}, + { + "name": "partitionerConfig", + "type": [ + "null", + "com.linkedin.venice.systemstore.schemas.StorePartitionerConfig" + ], + "default": null, + "doc": "Config for custom partitioning." + }, + {"name": "incrementalPushPolicy", "type": "int", "default": 0, "doc": "Incremental Push Policy to reconcile with real time pushes., and default is 'PUSH_TO_VERSION_TOPIC'"}, + {"name": "replicationFactor", "type": "int", "default": 3, "doc": "The number of replica this store version is keeping."}, + {"name": "nativeReplicationSourceFabric", "type": "string", "default": "", "doc": "The source fabric name to be uses in native replication. 
Remote consumption will happen from kafka in this fabric."}, + {"name": "incrementalPushEnabled", "type": "boolean", "default": false, "doc": "Flag to see if the store supports incremental push or not"}, + {"name": "separateRealTimeTopicEnabled", "type": "boolean", "default": false, "doc": "Flag to see if the store supports separate real-time topic for incremental push."}, + {"name": "blobTransferEnabled", "type": "boolean", "default": false, "doc": "Flag to indicate if the blob transfer is allowed or not"}, + {"name": "useVersionLevelIncrementalPushEnabled", "type": "boolean", "default": false, "doc": "Flag to see if incrementalPushEnabled config at StoreVersion should be used. This is needed during migration of this config from Store level to Version level. We can deprecate this field later."}, + { + "name": "hybridConfig", + "type": [ + "null", + "com.linkedin.venice.systemstore.schemas.StoreHybridConfig" + ], + "default": null, + "doc": "Properties related to Hybrid Store behavior. If absent (null), then the store is not hybrid." + }, + {"name": "useVersionLevelHybridConfig", "type": "boolean", "default": false, "doc": "Flag to see if hybridConfig at StoreVersion should be used. This is needed during migration of this config from Store level to Version level. We can deprecate this field later."}, + {"name": "activeActiveReplicationEnabled", "type": "boolean", "default": false, "doc": "Whether or not active/active replication is enabled for hybrid stores; eventually this config will replace native replication flag, when all stores are on A/A"}, + {"name": "timestampMetadataVersionId", "type": "int", "default": -1, "doc": "The A/A timestamp metadata schema version ID that will be used to deserialize metadataPayload."}, + { + "name": "dataRecoveryConfig", + "type": [ + "null", + { + "name": "DataRecoveryConfig", + "type": "record", + "fields": [ + {"name": "dataRecoverySourceFabric", "type": "string", "doc": "The fabric name to be used as the source for data recovery."}, + {"name": "isDataRecoveryComplete", "type": "boolean", "doc": "Whether or not data recovery is complete."}, + {"name": "dataRecoverySourceVersionNumber", "type": "int", "default": 0, "doc": "The store version number to be used as the source for data recovery."} + ] + } + ], + "default": null, + "doc": "Properties related to data recovery mode behavior for this version. If absent (null), then the version never went go through data recovery." + }, + {"name": "deferVersionSwap", "type": "boolean", "default": false, "doc": "flag that informs venice controller to defer marking this version as the serving version after instances report ready to serve. 
This version must be marked manually as the current version in order to serve traffic from it."}, + { + "name": "views", + "doc": "A list of views which describe and configure a downstream view of a venice store.", + "type": { + "type": "map", + "java-key-class": "java.lang.String", + "avro.java.string": "String", + "values": "com.linkedin.venice.systemstore.schemas.StoreViewConfig" + }, + "default": {} + }, + {"name": "repushSourceVersion", "type": "int", "default": -1, "doc": "For store version created from repush, indicates the source store version its created from."} + ] + } + }, + "default": [] + }, + { + "name": "systemStores", + "doc": "This field is used to maintain a mapping between each type of system store and the corresponding distinct properties", + "type": { + "type": "map", + "values": { + "name": "SystemStoreProperties", + "type": "record", + "doc": "This type describes all the distinct properties", + "fields": [ + {"name": "largestUsedVersionNumber", "type": "int", "default": 0}, + {"name": "currentVersion", "type": "int", "default": 0}, + {"name": "latestVersionPromoteToCurrentTimestamp", "type": "long", "default": -1}, + {"name": "versions", "type": {"type": "array", "items": "com.linkedin.venice.systemstore.schemas.StoreVersion"}, "default": []} + ] + } + }, + "default": {} + }, + {"name": "storageNodeReadQuotaEnabled", "type": "boolean", "default": false, "doc": "Controls the storage node read quota enforcement for the given Venice store"}, + {"name": "blobTransferEnabled", "type": "boolean", "default": false, "doc": "Flag to indicate if the blob transfer is allowed or not"} + ] + } + ], + "default": null + }, + { + "name": "storeKeySchemas", + "doc": "", + "type": [ + "null", + { + "name": "StoreKeySchemas", + "doc": "This type describes the key schemas of the store", + "type": "record", + "fields": [ + { + "name": "keySchemaMap", + "doc": "A string to string map representing the mapping from id to key schema.", + "type": { + "type": "map", + "values": "string" + } + } + ] + } + ], + "default": null + }, + { + "name": "storeValueSchemas", + "doc": "", + "type": [ + "null", + { + "name": "StoreValueSchemas", + "doc": "This type describes the value schemas of the store.", + "type": "record", + "fields": [ + { + "name": "valueSchemaMap", + "doc": "A string to string map representing the mapping from schema id to value schema string. 
The value could be an empty string indicating the value schema is stored in another field.", + "type": { + "type": "map", + "values": "string" + } + } + ] + } + ], + "default": null + }, + { + "name": "storeValueSchema", + "doc": "", + "type": [ + "null", + { + "name": "StoreValueSchema", + "doc": "This type describes a single version of the value schema of the store.", + "type": "record", + "fields": [ + { + "name": "valueSchema", + "doc": "Store value schema string.", + "type": "string", + "default": "" + } + ] + } + ], + "default": null + }, + { + "name": "storeReplicaStatuses", + "doc": "This field describes the replica statuses per version per partition, and the mapping is 'host_port' -> 'replica status'", + "type": [ + "null", + { + "type": "map", + "values": { + "name": "StoreReplicaStatus", + "type": "record", + "doc": "This structure will contain all kinds of info related to one replica", + "fields": [ + {"name": "status", "type": "int", "doc": "replica status"} + ] + } + } + ], + "default": null + }, + { + "name": "storeValueSchemaIdsWrittenPerStoreVersion", + "doc": "This field described the set of value schemas id written by a store version.", + "type": [ + "null", + { + "name": "StoreValueSchemaIdsWrittenPerStoreVersion", + "doc": "This type describes value schema IDs written by the store version.", + "type": "array", + "items": "int" + } + ], + "default": null + }, + { + "name": "storeClusterConfig", + "doc": "This is the Zk's StoreConfig equivalent which contains various Venice cluster information", + "type": [ + "null", + { + "name": "StoreClusterConfig", + "doc": "This type describes the various Venice cluster information for a store", + "type": "record", + "fields": [ + {"name": "cluster", "type": "string", "default": "", "doc": "The Venice cluster of the store."}, + {"name": "deleting", "type": "boolean", "default": false, "doc": "Is the store undergoing deletion."}, + {"name": "migrationDestCluster", "type": ["null", "string"], "default": null, "doc": "The destination cluster for store migration"}, + {"name": "migrationSrcCluster", "type": ["null", "string"], "default": null, "doc": "The source cluster for store migration"}, + {"name": "storeName", "type": "string", "default": "", "doc": "The name of the store"} + ] + } + ], + "default": null + } + ] +} diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithSharedEnvironment.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithSharedEnvironment.java index 40dc8d98e6..1fabff7a5a 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithSharedEnvironment.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithSharedEnvironment.java @@ -1845,14 +1845,22 @@ public void testHybridStoreToBatchOnly() { veniceAdmin.updateStore( clusterName, storeName, - new UpdateStoreQueryParams().setHybridOffsetLagThreshold(1).setHybridRewindSeconds(1)); + new UpdateStoreQueryParams().setHybridOffsetLagThreshold(1) + .setHybridRewindSeconds(1) + .setSeparateRealTimeTopicEnabled(true)); veniceAdmin.incrementVersionIdempotent(clusterName, storeName, Version.guidBasedDummyPushId(), 1, 1); TestUtils.waitForNonDeterministicCompletion( TOTAL_TIMEOUT_FOR_SHORT_TEST_MS, TimeUnit.MILLISECONDS, () -> veniceAdmin.getCurrentVersion(clusterName, storeName) == 1); + 
Assert.assertTrue(veniceAdmin.getStore(clusterName, storeName).isHybrid()); + Assert.assertTrue(veniceAdmin.getStore(clusterName, storeName).isSeparateRealTimeTopicEnabled()); + Assert.assertTrue(veniceAdmin.getStore(clusterName, storeName).getVersion(1).isSeparateRealTimeTopicEnabled()); + String rtTopic = veniceAdmin.getRealTimeTopic(clusterName, storeName); + String incrementalPushRealTimeTopic = veniceAdmin.getSeparateRealTimeTopic(clusterName, storeName); Assert.assertFalse(veniceAdmin.isTopicTruncated(rtTopic)); + Assert.assertFalse(veniceAdmin.isTopicTruncated(incrementalPushRealTimeTopic)); veniceAdmin.updateStore( clusterName, storeName, @@ -1860,6 +1868,7 @@ public void testHybridStoreToBatchOnly() { .setHybridRewindSeconds(-1) .setHybridTimeLagThreshold(-1)); Assert.assertFalse(veniceAdmin.isTopicTruncated(rtTopic)); + Assert.assertFalse(veniceAdmin.isTopicTruncated(incrementalPushRealTimeTopic)); // Perform two new pushes and the RT should be deleted upon the completion of the new pushes. veniceAdmin.incrementVersionIdempotent(clusterName, storeName, Version.guidBasedDummyPushId(), 1, 1); TestUtils.waitForNonDeterministicCompletion( @@ -1872,6 +1881,7 @@ public void testHybridStoreToBatchOnly() { TimeUnit.MILLISECONDS, () -> veniceAdmin.getCurrentVersion(clusterName, storeName) == 3); Assert.assertTrue(veniceAdmin.isTopicTruncated(rtTopic)); + Assert.assertTrue(veniceAdmin.isTopicTruncated(incrementalPushRealTimeTopic)); } @Test(timeOut = TOTAL_TIMEOUT_FOR_LONG_TEST_MS) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientMemoryLimitTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientMemoryLimitTest.java index 1d7f7a0959..b8ab2c13ef 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientMemoryLimitTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientMemoryLimitTest.java @@ -257,7 +257,6 @@ public void testDaVinciMemoryLimitShouldFailLargeDataPush( } } - @Test(timeOut = TEST_TIMEOUT, dataProviderClass = DataProviderUtils.class, dataProvider = "Two-True-and-False") public void testDaVinciMemoryLimitShouldFailLargeDataPushAndResumeHybridStore( boolean ingestionIsolationEnabledInDaVinci, boolean useDaVinciSpecificExecutionStatusForError) throws Exception { diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveReplicationForIncPush.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveReplicationForIncPush.java index 430a28dff2..a4216b42b7 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveReplicationForIncPush.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveReplicationForIncPush.java @@ -8,6 +8,7 @@ import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE; import static com.linkedin.venice.ConfigKeys.SERVER_DEDICATED_CONSUMER_POOL_FOR_AA_WC_LEADER_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS; +import static com.linkedin.venice.pubsub.PubSubConstants.PUBSUB_OPERATION_TIMEOUT_MS_DEFAULT_VALUE; import static com.linkedin.venice.utils.TestUtils.assertCommand; import static 
com.linkedin.venice.utils.TestUtils.waitForNonDeterministicAssertion; import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory; @@ -33,15 +34,22 @@ import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreInfo; import com.linkedin.venice.meta.Version; +import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; +import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.api.PubSubTopicPartition; +import com.linkedin.venice.pubsub.manager.TopicManager; +import com.linkedin.venice.utils.DataProviderUtils; import com.linkedin.venice.utils.IntegrationTestPushUtils; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.TestWriteUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import java.io.File; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; import org.apache.avro.Schema; @@ -63,9 +71,10 @@ public class TestActiveActiveReplicationForIncPush { private String[] clusterNames; private String parentRegionName; private String[] dcNames; - + private String clusterName; private List childDatacenters; private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper; + private static final PubSubTopicRepository PUB_SUB_TOPIC_REPOSITORY = new PubSubTopicRepository(); PubSubBrokerWrapper veniceParentDefaultKafka; @@ -102,9 +111,9 @@ public void setUp() { false); childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); clusterNames = multiRegionMultiClusterWrapper.getClusterNames(); + clusterName = this.clusterNames[0]; parentRegionName = multiRegionMultiClusterWrapper.getParentRegionName(); dcNames = multiRegionMultiClusterWrapper.getChildRegionNames().toArray(new String[0]); - veniceParentDefaultKafka = multiRegionMultiClusterWrapper.getParentKafkaBrokerWrapper(); } @@ -117,9 +126,8 @@ public void cleanUp() { * The purpose of this test is to verify that incremental push with RT policy succeeds when A/A is enabled in all * regions. And also incremental push can push to the closes kafka cluster from the grid using the SOURCE_GRID_CONFIG. */ - @Test(timeOut = TEST_TIMEOUT) - public void testAAReplicationForIncrementalPushToRT() throws Exception { - String clusterName = this.clusterNames[0]; + @Test(timeOut = TEST_TIMEOUT, dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class) + public void testAAReplicationForIncrementalPushToRT(Boolean isSeparateRealTimeTopicEnabled) throws Exception { File inputDirBatch = getTempDataDirectory(); File inputDirInc1 = getTempDataDirectory(); File inputDirInc2 = getTempDataDirectory(); @@ -174,7 +182,8 @@ public void testAAReplicationForIncrementalPushToRT() throws Exception { .setPartitionCount(1) .setHybridOffsetLagThreshold(TEST_TIMEOUT / 2) .setHybridRewindSeconds(2L) - .setNativeReplicationSourceFabric("dc-2"); + .setNativeReplicationSourceFabric("dc-2") + .setSeparateRealTimeTopicEnabled(isSeparateRealTimeTopicEnabled); TestUtils.assertCommand(parentControllerClient.updateStore(storeName, updateStoreParams)); @@ -208,29 +217,97 @@ public void testAAReplicationForIncrementalPushToRT() throws Exception { job.run(); Assert.assertEquals(job.getKafkaUrl(), childDatacenters.get(2).getKafkaBrokerWrapper().getAddress()); } - // Run inc push with source fabric preference taking effect. 
- try (VenicePushJob job = new VenicePushJob("Test push job incremental with NR + A/A from dc-2", propsInc1)) { - job.run(); - Assert.assertEquals(job.getKafkaUrl(), childDatacenters.get(2).getKafkaBrokerWrapper().getAddress()); + if (isSeparateRealTimeTopicEnabled) { + verifyForSeparateIncrementalPushTopic(storeName, propsInc1, 2); + } else { + verifyForRealTimeIncrementalPushTopic(storeName, propsInc1, propsInc2); } + } } - // Verify - for (int i = 0; i < childDatacenters.size(); i++) { - VeniceMultiClusterWrapper childDataCenter = childDatacenters.get(i); - // Verify the current version should be 1. - Version version = - childDataCenter.getRandomController().getVeniceAdmin().getStore(clusterName, storeName).getVersion(1); - Assert.assertNotNull(version, "Version 1 is not present for DC: " + dcNames[i]); - } - NativeReplicationTestUtils.verifyIncrementalPushData(childDatacenters, clusterName, storeName, 150, 2); + private void verifyForSeparateIncrementalPushTopic( + String storeName, + Properties propsInc1, + int dcIndexForSourceRegion) { + // Prepare TopicManagers + List<TopicManager> topicManagers = new ArrayList<>(); + for (VeniceMultiClusterWrapper childDataCenter: childDatacenters) { + PubSubTopicRepository pubSubTopicRepository = + childDataCenter.getClusters().get(clusterNames[0]).getPubSubTopicRepository(); + topicManagers.add( + IntegrationTestPushUtils + .getTopicManagerRepo( + PUBSUB_OPERATION_TIMEOUT_MS_DEFAULT_VALUE, + 100, + 0L, + childDataCenter.getKafkaBrokerWrapper(), + pubSubTopicRepository) + .getLocalTopicManager()); + } + // Run inc push with source fabric preference taking effect. + PubSubTopicPartition separateRealTimeTopicPartition = new PubSubTopicPartitionImpl( + PUB_SUB_TOPIC_REPOSITORY.getTopic(Version.composeSeparateRealTimeTopic(storeName)), + 0); + PubSubTopicPartition realTimeTopicPartition = + new PubSubTopicPartitionImpl(PUB_SUB_TOPIC_REPOSITORY.getTopic(Version.composeRealTimeTopic(storeName)), 0); + try (VenicePushJob job = new VenicePushJob("Test push job incremental with NR + A/A from dc-2", propsInc1)) { + // TODO: Once the server-side separate topic ingestion logic is ready, we should avoid runAsync here and add an + // extra check + CompletableFuture.runAsync(job::run); + TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, () -> { + Assert.assertEquals( + job.getKafkaUrl(), + childDatacenters.get(dcIndexForSourceRegion).getKafkaBrokerWrapper().getAddress()); + for (int dcIndex = 0; dcIndex < childDatacenters.size(); dcIndex++) { + long separateTopicOffset = + topicManagers.get(dcIndex).getLatestOffsetWithRetries(separateRealTimeTopicPartition, 3); + long realTimeTopicOffset = topicManagers.get(dcIndex).getLatestOffsetWithRetries(realTimeTopicPartition, 3); + // Real-time topic will have heartbeat messages, so the offset will be non-zero but smaller than the record + // count. + // The DC-2 separate real-time topic should get enough data.
+ if (dcIndex == dcIndexForSourceRegion) { + Assert.assertTrue( + separateTopicOffset > TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT, + "Records # is not enough: " + separateTopicOffset); + Assert.assertTrue( + realTimeTopicOffset < TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT / 10, + "Records # is more than expected: " + realTimeTopicOffset); + } else { + assertEquals(separateTopicOffset, 0, "Records # is not enough: " + separateTopicOffset); + Assert.assertTrue( + realTimeTopicOffset < TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT / 10, + "Records # is more than expected: " + realTimeTopicOffset); + } + } + }); + job.cancel(); + } + } - // Run another inc push with a different source fabric preference taking effect. - try (VenicePushJob job = new VenicePushJob("Test push job incremental with NR + A/A from dc-1", propsInc2)) { - job.run(); - Assert.assertEquals(job.getKafkaUrl(), childDatacenters.get(1).getKafkaBrokerWrapper().getAddress()); - } - NativeReplicationTestUtils.verifyIncrementalPushData(childDatacenters, clusterName, storeName, 200, 3); + private void verifyForRealTimeIncrementalPushTopic(String storeName, Properties propsInc1, Properties propsInc2) + throws Exception { + // Run inc push with source fabric preference taking effect. + try (VenicePushJob job = new VenicePushJob("Test push job incremental with NR + A/A from dc-2", propsInc1)) { + job.run(); + Assert.assertEquals(job.getKafkaUrl(), childDatacenters.get(2).getKafkaBrokerWrapper().getAddress()); + } + + // Verify + for (int i = 0; i < childDatacenters.size(); i++) { + VeniceMultiClusterWrapper childDataCenter = childDatacenters.get(i); + // Verify the current version should be 1. + Version version = + childDataCenter.getRandomController().getVeniceAdmin().getStore(clusterName, storeName).getVersion(1); + Assert.assertNotNull(version, "Version 1 is not present for DC: " + dcNames[i]); + } + NativeReplicationTestUtils.verifyIncrementalPushData(childDatacenters, clusterName, storeName, 150, 2); + + // Run another inc push with a different source fabric preference taking effect. + try (VenicePushJob job = new VenicePushJob("Test push job incremental with NR + A/A from dc-1", propsInc2)) { + job.run(); + Assert.assertEquals(job.getKafkaUrl(), childDatacenters.get(1).getKafkaBrokerWrapper().getAddress()); } + NativeReplicationTestUtils.verifyIncrementalPushData(childDatacenters, clusterName, storeName, 200, 3); } public static void verifyHybridAndIncPushConfig( diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java index 6547752ad2..7269235874 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java @@ -290,6 +290,8 @@ Version incrementVersionIdempotent( String getRealTimeTopic(String clusterName, String storeName); + String getSeparateRealTimeTopic(String clusterName, String storeName); + /** * Right now, it will return the latest version recorded in parent controller. There are a couple of edge cases. * 1. 
If a push fails in some colos, the version will be inconsistent among colos diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java index 50bdfaec33..e0c1881e05 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java @@ -99,6 +99,7 @@ import static com.linkedin.venice.ConfigKeys.ENABLE_PARTIAL_UPDATE_FOR_HYBRID_ACTIVE_ACTIVE_USER_STORES; import static com.linkedin.venice.ConfigKeys.ENABLE_PARTIAL_UPDATE_FOR_HYBRID_NON_ACTIVE_ACTIVE_USER_STORES; import static com.linkedin.venice.ConfigKeys.ENABLE_PARTITION_COUNT_ROUND_UP; +import static com.linkedin.venice.ConfigKeys.ENABLE_SEPARATE_REAL_TIME_TOPIC_FOR_STORE_WITH_INCREMENTAL_PUSH; import static com.linkedin.venice.ConfigKeys.ERROR_PARTITION_AUTO_RESET_LIMIT; import static com.linkedin.venice.ConfigKeys.ERROR_PARTITION_PROCESSING_CYCLE_DELAY; import static com.linkedin.venice.ConfigKeys.FATAL_DATA_VALIDATION_FAILURE_TOPIC_RETENTION_MS; @@ -439,6 +440,12 @@ public class VeniceControllerClusterConfig { */ private final boolean enabledIncrementalPushForHybridActiveActiveUserStores; + /** + * When the following option is enabled, new user hybrid stores with incremental push enabled will automatically + * have the separate real-time topic feature enabled. + */ + private final boolean enabledSeparateRealTimeTopicForStoreWithIncrementalPush; + private final boolean enablePartialUpdateForHybridActiveActiveUserStores; private final boolean enablePartialUpdateForHybridNonActiveActiveUserStores; @@ -571,6 +578,8 @@ public VeniceControllerClusterConfig(VeniceProperties props) { this.controllerSchemaValidationEnabled = props.getBoolean(CONTROLLER_SCHEMA_VALIDATION_ENABLED, true); this.enabledIncrementalPushForHybridActiveActiveUserStores = props.getBoolean(ENABLE_INCREMENTAL_PUSH_FOR_HYBRID_ACTIVE_ACTIVE_USER_STORES, false); + this.enabledSeparateRealTimeTopicForStoreWithIncrementalPush = + props.getBoolean(ENABLE_SEPARATE_REAL_TIME_TOPIC_FOR_STORE_WITH_INCREMENTAL_PUSH, false); this.enablePartialUpdateForHybridActiveActiveUserStores = props.getBoolean(ENABLE_PARTIAL_UPDATE_FOR_HYBRID_ACTIVE_ACTIVE_USER_STORES, false); this.enablePartialUpdateForHybridNonActiveActiveUserStores = @@ -1141,6 +1150,10 @@ public boolean enabledIncrementalPushForHybridActiveActiveUserStores() { return enabledIncrementalPushForHybridActiveActiveUserStores; } + public boolean enabledSeparateRealTimeTopicForStoreWithIncrementalPush() { + return enabledSeparateRealTimeTopicForStoreWithIncrementalPush; + } + public boolean isEnablePartialUpdateForHybridActiveActiveUserStores() { return enablePartialUpdateForHybridActiveActiveUserStores; } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 55f03147a6..fe8170c501 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -2740,6 +2740,17 @@ private Pair addVersion( // Note: do not enable RT compaction!
Might make jobs in Online/Offline model stuck clusterConfig.getMinInSyncReplicasRealTimeTopics(), false); + if (version.isSeparateRealTimeTopicEnabled()) { + getTopicManager().createTopic( + pubSubTopicRepository.getTopic(Version.composeSeparateRealTimeTopic(storeName)), + numberOfPartitions, + clusterConfig.getKafkaReplicationFactorRTTopics(), + StoreUtils.getExpectedRetentionTimeInMs(store, store.getHybridStoreConfig()), + false, + // Note: do not enable RT compaction! Might make jobs in Online/Offline model stuck + clusterConfig.getMinInSyncReplicasRealTimeTopics(), + false); + } } else { // If real-time topic already exists, check whether its retention time is correct. PubSubTopicConfiguration pubSubTopicConfiguration = @@ -3103,14 +3114,29 @@ private Optional getVersionWithPushId(String clusterName, String storeN @Override public String getRealTimeTopic(String clusterName, String storeName) { checkControllerLeadershipFor(clusterName); - TopicManager topicManager = getTopicManager(); PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName)); + ensureRealTimeTopicIsReady(clusterName, realTimeTopic); + return realTimeTopic.getName(); + } + + @Override + public String getSeparateRealTimeTopic(String clusterName, String storeName) { + checkControllerLeadershipFor(clusterName); + PubSubTopic incrementalPushRealTimeTopic = + pubSubTopicRepository.getTopic(Version.composeSeparateRealTimeTopic(storeName)); + ensureRealTimeTopicIsReady(clusterName, incrementalPushRealTimeTopic); + return incrementalPushRealTimeTopic.getName(); + } + + private void ensureRealTimeTopicIsReady(String clusterName, PubSubTopic realTimeTopic) { + TopicManager topicManager = getTopicManager(); + String storeName = realTimeTopic.getStoreName(); if (!topicManager.containsTopic(realTimeTopic)) { HelixVeniceClusterResources resources = getHelixVeniceClusterResources(clusterName); try (AutoCloseableLock ignore = resources.getClusterLockManager().createStoreWriteLock(storeName)) { // The topic might be created by another thread already. Check before creating. if (topicManager.containsTopic(realTimeTopic)) { - return realTimeTopic.getName(); + return; } ReadWriteStoreRepository repository = resources.getStoreMetadataRepository(); Store store = repository.getStore(storeName); @@ -3154,7 +3180,6 @@ public String getRealTimeTopic(String clusterName, String storeName) { storeName); } } - return realTimeTopic.getName(); } /** @@ -3513,6 +3538,15 @@ private void safeDeleteRTTopic(String clusterName, String storeName, Store store for (ControllerClient controllerClient: controllerClientMap.values()) { controllerClient.deleteKafkaTopic(rtTopicToDelete); } + // Check whether a separate incremental push real-time topic exists. If it does, truncate it and tell the other + // controllers to delete it.
+ String incrementalPushRTTopicToDelete = Version.composeSeparateRealTimeTopic(storeName); + if (getTopicManager().containsTopic(pubSubTopicRepository.getTopic(incrementalPushRTTopicToDelete))) { + truncateKafkaTopic(incrementalPushRTTopicToDelete); + for (ControllerClient controllerClient: controllerClientMap.values()) { + controllerClient.deleteKafkaTopic(incrementalPushRTTopicToDelete); + } + } } } @@ -4420,6 +4454,13 @@ void setIncrementalPushEnabled(String clusterName, String storeName, boolean inc }); } + void setSeparateRealTimeTopicEnabled(String clusterName, String storeName, boolean separateRealTimeTopicEnabled) { + storeMetadataUpdate(clusterName, storeName, store -> { + store.setSeparateRealTimeTopicEnabled(separateRealTimeTopicEnabled); + return store; + }); + } + private void setReplicationFactor(String clusterName, String storeName, int replicaFactor) { storeMetadataUpdate(clusterName, storeName, store -> { store.setReplicationFactor(replicaFactor); @@ -4699,6 +4740,7 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto Optional batchGetLimit = params.getBatchGetLimit(); Optional numVersionsToPreserve = params.getNumVersionsToPreserve(); Optional incrementalPushEnabled = params.getIncrementalPushEnabled(); + Optional separateRealTimeTopicEnabled = params.getSeparateRealTimeTopicEnabled(); Optional storeMigration = params.getStoreMigration(); Optional writeComputationEnabled = params.getWriteComputationEnabled(); Optional replicationMetadataVersionID = params.getReplicationMetadataVersionID(); @@ -4887,6 +4929,10 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto setIncrementalPushEnabled(clusterName, storeName, incrementalPushEnabled.get()); } + if (separateRealTimeTopicEnabled.isPresent()) { + setSeparateRealTimeTopicEnabled(clusterName, storeName, separateRealTimeTopicEnabled.get()); + } + if (replicationFactor.isPresent()) { setReplicationFactor(clusterName, storeName, replicationFactor.get()); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 8610718213..aaf3fa1ed8 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -49,6 +49,7 @@ import static com.linkedin.venice.controllerapi.ControllerApiConstants.REPLICATION_METADATA_PROTOCOL_VERSION_ID; import static com.linkedin.venice.controllerapi.ControllerApiConstants.REWIND_TIME_IN_SECONDS; import static com.linkedin.venice.controllerapi.ControllerApiConstants.RMD_CHUNKING_ENABLED; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.SEPARATE_REAL_TIME_TOPIC_ENABLED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORAGE_NODE_READ_QUOTA_ENABLED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORAGE_QUOTA_IN_BYTE; import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_MIGRATION; @@ -1704,6 +1705,11 @@ public String getRealTimeTopic(String clusterName, String storeName) { return getVeniceHelixAdmin().getRealTimeTopic(clusterName, storeName); } + @Override + public String getSeparateRealTimeTopic(String clusterName, String storeName) { + return getVeniceHelixAdmin().getSeparateRealTimeTopic(clusterName, storeName); + } + /** * A couple of 
   * A couple of extra checks are needed in parent controller
   * 1. check batch job statuses across child controllers. (We cannot only check the version status
@@ -2229,6 +2235,7 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
     Optional<Integer> batchGetLimit = params.getBatchGetLimit();
     Optional<Integer> numVersionsToPreserve = params.getNumVersionsToPreserve();
     Optional<Boolean> incrementalPushEnabled = params.getIncrementalPushEnabled();
+    Optional<Boolean> separateRealTimeTopicEnabled = params.getSeparateRealTimeTopicEnabled();
     Optional<Boolean> storeMigration = params.getStoreMigration();
     Optional<Boolean> writeComputationEnabled = params.getWriteComputationEnabled();
     Optional<Integer> replicationMetadataVersionID = params.getReplicationMetadataVersionID();
@@ -2460,6 +2467,13 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
           setStore.incrementalPushEnabled = true;
           updatedConfigsList.add(INCREMENTAL_PUSH_ENABLED);
         }
+        // Enable separate real-time topic automatically when incremental push is enabled and cluster config allows it.
+        if (setStore.incrementalPushEnabled
+            && controllerConfig.enabledSeparateRealTimeTopicForStoreWithIncrementalPush()) {
+          setStore.separateRealTimeTopicEnabled = true;
+          updatedConfigsList.add(SEPARATE_REAL_TIME_TOPIC_ENABLED);
+        }
+
         // When turning off hybrid store, we will also turn off incremental store config.
         if (storeBeingConvertedToBatch && setStore.incrementalPushEnabled) {
           setStore.incrementalPushEnabled = false;
@@ -2584,6 +2598,10 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
               .map(addToUpdatedConfigList(updatedConfigsList, BLOB_TRANSFER_ENABLED))
              .orElseGet(currStore::isBlobTransferEnabled);

+      setStore.separateRealTimeTopicEnabled =
+          separateRealTimeTopicEnabled.map(addToUpdatedConfigList(updatedConfigsList, SEPARATE_REAL_TIME_TOPIC_ENABLED))
+              .orElseGet(currStore::isSeparateRealTimeTopicEnabled);
+
       // Check whether the passed param is valid or not
       if (latestSupersetSchemaId.isPresent()) {
         if (latestSupersetSchemaId.get() != SchemaData.INVALID_VALUE_SCHEMA_ID) {
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java
index d2e3094a58..f0188c257b 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java
@@ -486,6 +486,7 @@ private void handleSetStore(UpdateStore message) {
         .setBatchGetLimit(message.batchGetLimit)
         .setNumVersionsToPreserve(message.numVersionsToPreserve)
         .setIncrementalPushEnabled(message.incrementalPushEnabled)
+        .setSeparateRealTimeTopicEnabled(message.separateRealTimeTopicEnabled)
         .setStoreMigration(message.isMigrating)
         .setWriteComputationEnabled(message.writeComputationEnabled)
         .setReadComputationEnabled(message.readComputationEnabled)
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/CreateVersion.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/CreateVersion.java
index 44afaa81fb..c22d6c3b3e 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/CreateVersion.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/CreateVersion.java
@@ -306,7 +306,12 @@ public Route requestTopicForPushing(Admin admin) {
           responseTopic = Version.composeStreamReprocessingTopic(storeName, version.getNumber());
         } else if (pushType.isIncremental()) {
           isTopicRT = true;
-          responseTopic = Version.composeRealTimeTopic(storeName);
+          if (version.isSeparateRealTimeTopicEnabled()) {
+            admin.getSeparateRealTimeTopic(clusterName, storeName);
+            responseTopic = Version.composeSeparateRealTimeTopic(storeName);
+          } else {
+            responseTopic = Version.composeRealTimeTopic(storeName);
+          }

           // disable amplificationFactor logic on real-time topic
           responseObject.setAmplificationFactor(1);
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/ingestion/control/RealTimeTopicSwitcher.java b/services/venice-controller/src/main/java/com/linkedin/venice/ingestion/control/RealTimeTopicSwitcher.java
index f12180fb7d..f20209b903 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/ingestion/control/RealTimeTopicSwitcher.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/ingestion/control/RealTimeTopicSwitcher.java
@@ -131,6 +131,8 @@ void ensurePreconditions(
     if (!hybridStoreConfig.isPresent()) {
       throw new VeniceException("Topic switching is only supported for Hybrid Stores.");
     }
+    Version version =
+        store.getVersion(Version.parseVersionFromKafkaTopicName(topicWhereToSendTheTopicSwitch.getName()));
     /**
      * TopicReplicator is used in child fabrics to create real-time (RT) topic when a child fabric
      * is ready to start buffer replay but RT topic doesn't exist. This scenario could happen for a
@@ -146,35 +148,47 @@ void ensurePreconditions(
      * doesn't have any existing version or a correct storage quota, we cannot decide the partition
      * number for it.
      */
-    if (!getTopicManager().containsTopicAndAllPartitionsAreOnline(srcTopicName)) {
+    createRealTimeTopicIfNeeded(store, version, srcTopicName, hybridStoreConfig.get());
+    if (version != null && version.isSeparateRealTimeTopicEnabled()) {
+      PubSubTopic separateRealTimeTopic =
+          pubSubTopicRepository.getTopic(Version.composeSeparateRealTimeTopic(store.getName()));
+      createRealTimeTopicIfNeeded(store, version, separateRealTimeTopic, hybridStoreConfig.get());
+    }
+  }
+
+  void createRealTimeTopicIfNeeded(
+      Store store,
+      Version version,
+      PubSubTopic realTimeTopic,
+      HybridStoreConfig hybridStoreConfig) {
+    if (!getTopicManager().containsTopicAndAllPartitionsAreOnline(realTimeTopic)) {
       int partitionCount;
-      Version version =
-          store.getVersion(Version.parseVersionFromKafkaTopicName(topicWhereToSendTheTopicSwitch.getName()));
       if (version != null) {
         partitionCount = version.getPartitionCount();
       } else {
         partitionCount = store.getPartitionCount();
       }
-      int replicationFactor = srcTopicName.isRealTime() ? kafkaReplicationFactorForRTTopics : kafkaReplicationFactor;
-      Optional<Integer> minISR = srcTopicName.isRealTime() ? minSyncReplicasForRTTopics : Optional.empty();
+      int replicationFactor = realTimeTopic.isRealTime() ? kafkaReplicationFactorForRTTopics : kafkaReplicationFactor;
+      Optional<Integer> minISR = realTimeTopic.isRealTime() ? minSyncReplicasForRTTopics : Optional.empty();
       getTopicManager().createTopic(
-          srcTopicName,
+          realTimeTopic,
           partitionCount,
           replicationFactor,
-          StoreUtils.getExpectedRetentionTimeInMs(store, hybridStoreConfig.get()),
-          false, // Note: do not enable RT compaction! Might make jobs in Online/Offline model stuck
+          StoreUtils.getExpectedRetentionTimeInMs(store, hybridStoreConfig),
+          false,
          minISR,
          false);
     } else {
       /**
        * If real-time topic already exists, check whether its retention time is correct.
        */
-      long topicRetentionTimeInMs = getTopicManager().getTopicRetention(srcTopicName);
-      long expectedRetentionTimeMs = StoreUtils.getExpectedRetentionTimeInMs(store, hybridStoreConfig.get());
+      long topicRetentionTimeInMs = getTopicManager().getTopicRetention(realTimeTopic);
+      long expectedRetentionTimeMs = StoreUtils.getExpectedRetentionTimeInMs(store, hybridStoreConfig);
       if (topicRetentionTimeInMs != expectedRetentionTimeMs) {
-        getTopicManager().updateTopicRetention(srcTopicName, expectedRetentionTimeMs);
+        getTopicManager().updateTopicRetention(realTimeTopic, expectedRetentionTimeMs);
       }
     }
+  }

   long getRewindStartTime(
diff --git a/services/venice-controller/src/main/resources/avro/AdminOperation/v81/AdminOperation.avsc b/services/venice-controller/src/main/resources/avro/AdminOperation/v81/AdminOperation.avsc
new file mode 100644
index 0000000000..0987529807
--- /dev/null
+++ b/services/venice-controller/src/main/resources/avro/AdminOperation/v81/AdminOperation.avsc
@@ -0,0 +1,1134 @@
+{
+  "name": "AdminOperation",
+  "namespace": "com.linkedin.venice.controller.kafka.protocol.admin",
+  "type": "record",
+  "fields": [
+    {
+      "name": "operationType",
+      "doc": "0 => StoreCreation, 1 => ValueSchemaCreation, 2 => PauseStore, 3 => ResumeStore, 4 => KillOfflinePushJob, 5 => DisableStoreRead, 6 => EnableStoreRead, 7=> DeleteAllVersions, 8=> SetStoreOwner, 9=> SetStorePartitionCount, 10=> SetStoreCurrentVersion, 11=> UpdateStore, 12=> DeleteStore, 13=> DeleteOldVersion, 14=> MigrateStore, 15=> AbortMigration, 16=>AddVersion, 17=> DerivedSchemaCreation, 18=>SupersetSchemaCreation, 19=>EnableNativeReplicationForCluster, 20=>MetadataSchemaCreation, 21=>EnableActiveActiveReplicationForCluster, 25=>CreatePersona, 26=>DeletePersona, 27=>UpdatePersona, 28=>RollbackCurrentVersion, 29=>RollforwardCurrentVersion",
+      "type": "int"
+    }, {
+      "name": "executionId",
+      "doc": "ID of a command execution which is used to query the status of this command.",
+      "type": "long",
+      "default": 0
+    }, {
+      "name": "payloadUnion",
+      "doc": "This contains the main payload of the admin operation",
+      "type": [
+        {
+          "name": "StoreCreation",
+          "type": "record",
+          "fields": [
+            {
+              "name": "clusterName",
+              "type": "string"
+            },
+            {
+              "name": "storeName",
+              "type": "string"
+            },
+            {
+              "name": "owner",
+              "type": "string"
+            },
+            {
+              "name": "keySchema",
+              "type": {
+                "type": "record",
+                "name": "SchemaMeta",
+                "fields": [
+                  {"name": "schemaType", "type": "int", "doc": "0 => Avro-1.4, and we can add more if necessary"},
+                  {"name": "definition", "type": "string"}
+                ]
+              }
+            },
+            {
+              "name": "valueSchema",
+              "type": "SchemaMeta"
+            }
+          ]
+        },
+        {
+          "name": "ValueSchemaCreation",
+          "type": "record",
+          "fields": [
+            {
+              "name": "clusterName",
+              "type": "string"
+            },
+            {
+              "name": "storeName",
+              "type": "string"
+            },
+            {
+              "name": "schema",
+              "type": "SchemaMeta"
+            },
+            {
+              "name": "schemaId",
+              "type": "int"
+            },
+            {
+              "name": "doUpdateSupersetSchemaID",
+              "type": "boolean",
+              "doc": "Whether this superset schema ID should be updated to be the value schema ID for this store.",
+              "default": false
+            }
+          ]
+        },
+        {
+          "name": "PauseStore",
+          "type": "record",
+          "fields": [
+            {
+              "name": "clusterName",
+              "type": "string"
+            },
+            {
+              "name": "storeName",
+              "type": "string"
+            }
+          ]
+        },
+        {
+          "name": "ResumeStore",
+          "type": "record",
+          "fields": [
+            {
+              "name": "clusterName",
+              "type": "string"
+            },
+            {
+              "name": "storeName",
+              "type": "string"
+            }
+          ]
+        },
+        {
+          "name": "KillOfflinePushJob",
+          "type": "record",
+          "fields": [
+            {
"clusterName", + "type": "string" + }, + { + "name": "kafkaTopic", + "type": "string" + } + ] + }, + { + "name": "DisableStoreRead", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, + { + "name": "EnableStoreRead", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, + { + "name": "DeleteAllVersions", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, + { + "name": "SetStoreOwner", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "owner", + "type": "string" + } + ] + }, + { + "name": "SetStorePartitionCount", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "partitionNum", + "type": "int" + } + ] + }, + { + "name": "SetStoreCurrentVersion", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "currentVersion", + "type": "int" + } + ] + }, + { + "name": "UpdateStore", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "owner", + "type": "string" + }, + { + "name": "partitionNum", + "type": "int" + }, + { + "name": "currentVersion", + "type": "int" + }, + { + "name": "enableReads", + "type": "boolean" + }, + { + "name": "enableWrites", + "type": "boolean" + }, + { + "name": "storageQuotaInByte", + "type": "long", + "default": 21474836480 + }, + { + "name": "readQuotaInCU", + "type": "long", + "default": 1800 + }, + { + "name": "hybridStoreConfig", + "type": [ + "null", + { + "name": "HybridStoreConfigRecord", + "type": "record", + "fields": [ + { + "name": "rewindTimeInSeconds", + "type": "long" + }, + { + "name": "offsetLagThresholdToGoOnline", + "type": "long" + }, + { + "name": "producerTimestampLagThresholdToGoOnlineInSeconds", + "type": "long", + "default": -1 + }, + { + "name": "dataReplicationPolicy", + "doc": "Real-time Samza job data replication policy. Using int because Avro Enums are not evolvable 0 => NON_AGGREGATE, 1 => AGGREGATE, 2 => NONE, 3 => ACTIVE_ACTIVE", + "type": "int", + "default": 0 + }, + { + "name": "bufferReplayPolicy", + "type": "int", + "doc": "Policy that will be used during buffer replay. rewindTimeInSeconds defines the delta. 
+                      "doc": "Policy that will be used during buffer replay. rewindTimeInSeconds defines the delta. 0 => REWIND_FROM_EOP (replay from 'EOP - rewindTimeInSeconds'), 1 => REWIND_FROM_SOP (replay from 'SOP - rewindTimeInSeconds')",
+                      "default": 0
+                    }
+                  ]
+                }
+              ],
+              "default": null
+            },
+            {
+              "name": "accessControlled",
+              "type": "boolean",
+              "default": false
+            },
+            {
+              "name": "compressionStrategy",
+              "doc": "Using int because Avro Enums are not evolvable",
+              "type": "int",
+              "default": 0
+            },
+            {
+              "name": "chunkingEnabled",
+              "type": "boolean",
+              "default": false
+            },
+            {
+              "name": "rmdChunkingEnabled",
+              "type": "boolean",
+              "default": false
+            },
+            {
+              "name": "singleGetRouterCacheEnabled",
+              "aliases": ["routerCacheEnabled"],
+              "type": "boolean",
+              "default": false
+            },
+            {
+              "name": "batchGetRouterCacheEnabled",
+              "type": "boolean",
+              "default": false
+            },
+            {
+              "name": "batchGetLimit",
+              "doc": "The max key number allowed in batch get request, and Venice will use cluster-level config if the limit (not positive) is not valid",
+              "type": "int",
+              "default": -1
+            },
+            {
+              "name": "numVersionsToPreserve",
+              "doc": "The max number of versions the store should preserve. Venice will use cluster-level config if the number is 0 here.",
+              "type": "int",
+              "default": 0
+            },
+            {
+              "name": "incrementalPushEnabled",
+              "doc": "a flag to see if the store supports incremental push or not",
+              "type": "boolean",
+              "default": false
+            },
+            {
+              "name": "separateRealTimeTopicEnabled",
+              "doc": "Flag to see if the store supports separate real-time topic for incremental push.",
+              "type": "boolean",
+              "default": false
+            },
+            {
+              "name": "isMigrating",
+              "doc": "Whether or not the store is in the process of migration",
+              "type": "boolean",
+              "default": false
+            },
+            {
+              "name": "writeComputationEnabled",
+              "doc": "Whether write-path computation feature is enabled for this store",
+              "type": "boolean",
+              "default": false
+            },
+            {
+              "name": "replicationMetadataVersionID",
+              "doc": "RMD (Replication metadata) version ID on the store-level. Default -1 means NOT_SET and the cluster-level RMD version ID should be used for stores.",
+              "type": "int",
+              "default": -1
+            },
+            {
+              "name": "readComputationEnabled",
+              "doc": "Whether read-path computation feature is enabled for this store",
+              "type": "boolean",
+              "default": false
+            },
+            {
+              "name": "bootstrapToOnlineTimeoutInHours",
+              "doc": "Maximum number of hours allowed for the store to transition from bootstrap to online state",
+              "type": "int",
+              "default": 24
+            },
+            {
+              "name": "leaderFollowerModelEnabled",
+              "doc": "Whether or not to use leader follower state transition model for upcoming version",
+              "type": "boolean",
+              "default": false
+            },
+            {
+              "name": "backupStrategy",
+              "doc": "Strategies to store backup versions.",
+              "type": "int",
+              "default": 0
+            },
+            {
+              "name": "clientDecompressionEnabled",
+              "type": "boolean",
+              "default": true
+            },
+            {
+              "name": "schemaAutoRegisterFromPushJobEnabled",
+              "type": "boolean",
+              "default": false
+            },
+            {
+              "name": "hybridStoreOverheadBypass",
+              "type": "boolean",
+              "default": false
+            },
+            {
+              "name": "hybridStoreDiskQuotaEnabled",
+              "doc": "Whether or not to enable disk storage quota for a hybrid store",
+              "type": "boolean",
+              "default": false
+            },
+            {
+              "name": "ETLStoreConfig",
+              "type": [
+                "null",
+                {
+                  "name": "ETLStoreConfigRecord",
+                  "type": "record",
+                  "fields": [
+                    {
+                      "name": "etledUserProxyAccount",
+                      "type": ["null", "string"]
+                    },
+                    {
+                      "name": "regularVersionETLEnabled",
+                      "type": "boolean"
+                    },
+                    {
+                      "name": "futureVersionETLEnabled",
+                      "type": "boolean"
+                    }
+                  ]
+                }
+              ],
+              "default": null
+            },
+            {
+              "name": "partitionerConfig",
+              "type": [
+                "null",
+                {
+                  "name": "PartitionerConfigRecord",
+                  "type": "record",
+                  "fields": [
+                    {
+                      "name": "partitionerClass",
+                      "type": "string"
+                    },
+                    {
+                      "name": "partitionerParams",
+                      "type": {
+                        "type": "map",
+                        "values": "string"
+                      }
+                    },
+                    {
+                      "name": "amplificationFactor",
+                      "type": "int"
+                    }
+                  ]
+                }
+              ],
+              "default": null
+            },
+            {
+              "name": "nativeReplicationEnabled",
+              "type": "boolean",
+              "default": false
+            },
+            {
+              "name": "pushStreamSourceAddress",
+              "type": ["null", "string"],
+              "default": null
+            },
+            {
+              "name": "largestUsedVersionNumber",
+              "type": ["null", "int"],
+              "default": null
+            },
+            {
+              "name": "incrementalPushPolicy",
+              "doc": "Incremental Push Policy to reconcile with real time pushes. Using int because Avro Enums are not evolvable 0 => PUSH_TO_VERSION_TOPIC, 1 => INCREMENTAL_PUSH_SAME_AS_REAL_TIME",
+              "type": "int",
+              "default": 0
+            },
+            {
+              "name": "backupVersionRetentionMs",
+              "type": "long",
+              "doc": "Backup version retention time after a new version is promoted to the current version, if not specified, Venice will use the configured retention as the default policy",
+              "default": -1
+            },
+            {
+              "name": "replicationFactor",
+              "doc": "number of replica each store version will have",
+              "type": "int",
+              "default": 3
+            },
+            {
+              "name": "migrationDuplicateStore",
+              "doc": "Whether or not the store is a duplicate store in the process of migration",
+              "type": "boolean",
+              "default": false
+            },
+            {
+              "name": "nativeReplicationSourceFabric",
+              "doc": "The source fabric to be used when the store is running in Native Replication mode.",
+              "type": ["null", "string"],
+              "default": null
+            },
+            {
+              "name": "activeActiveReplicationEnabled",
+              "doc": "A command option to enable/disable Active/Active replication feature for a store",
+              "type": "boolean",
+              "default": false
+            },
+            {
+              "name": "disableMetaStore",
+              "doc": "An UpdateStore command option to disable the companion meta system store",
+              "type": "boolean",
+              "default": false
+            },
+            {
+              "name": "disableDavinciPushStatusStore",
+              "doc": "An UpdateStore command option to disable the companion davinci push status store",
+              "type": "boolean",
+              "default": false
+            },
+            {
+              "name": "applyTargetVersionFilterForIncPush",
+              "doc": "An UpdateStore command option to enable/disable applying the target version filter for incremental pushes",
+              "type": "boolean",
+              "default": false
+            },
+            {
+              "name": "updatedConfigsList",
+              "doc": "The list that contains all updated configs by the UpdateStore command. Most of the fields in UpdateStore are not optional, and changing those fields to Optional (Union) is not a backward compatible change, so we have to add an additional array field to record all updated configs in parent controller.",
+              "type": {
+                "type": "array",
+                "items": "string"
+              },
+              "default": []
+            },
+            {
+              "name": "replicateAllConfigs",
+              "doc": "A flag to indicate whether all store configs in parent cluster will be replicated to child clusters; true by default, so that existing UpdateStore messages in Admin topic will behave the same as before.",
+              "type": "boolean",
+              "default": true
+            },
+            {
+              "name": "regionsFilter",
+              "doc": "A list of regions that will be impacted by the UpdateStore command",
+              "type": ["null", "string"],
+              "default": null
+            },
+            {
+              "name": "storagePersona",
+              "doc": "The name of the StoragePersona to add to the store",
+              "type": ["null", "string"],
+              "default": null
+            },
+            {
+              "name": "views",
+              "doc": "A map of views which describe and configure a downstream view of a venice store. Keys in this map are for convenience of managing configs.",
+              "type": ["null",
+                {
+                  "type": "map",
+                  "java-key-class": "java.lang.String",
+                  "avro.java.string": "String",
+                  "values": {
+                    "name": "StoreViewConfigRecord",
+                    "type": "record",
+                    "doc": "A configuration for a particular view. This config should inform Venice leaders how to transform and transmit data to destination views.",
+                    "fields": [
+                      {
+                        "name": "viewClassName",
+                        "type": "string",
+                        "doc": "This informs what kind of view we are materializing. This then informs what kind of parameters are passed to parse this input. This is expected to be a fully formed class path name for materialization.",
+                        "default": ""
+                      },
+                      {
+                        "name": "viewParameters",
+                        "doc": "Optional parameters to be passed to the given view config.",
+                        "type": ["null",
+                          {
+                            "type": "map",
+                            "java-key-class": "java.lang.String",
+                            "avro.java.string": "String",
+                            "values": { "type": "string", "avro.java.string": "String" }
+                          }
+                        ],
+                        "default": null
+                      }
+                    ]
+                  }
+                }],
+              "default": null
+            },
+            {
+              "name": "latestSuperSetValueSchemaId",
+              "doc": "The schema id for the latest superset schema",
+              "type": "int",
+              "default": -1
+            },
+            {
+              "name": "storageNodeReadQuotaEnabled",
+              "doc": "Whether storage node read quota is enabled for this store",
+              "type": "boolean",
+              "default": false
+            },
+            {
+              "name": "minCompactionLagSeconds",
+              "doc": "Store-level version topic min compaction lag",
+              "type": "long",
+              "default": -1
+            },
+            {
+              "name": "maxCompactionLagSeconds",
+              "doc": "Store-level version topic max compaction lag",
+              "type": "long",
+              "default": -1
+            },
+            {
+              "name": "maxRecordSizeBytes",
+              "doc": "Store-level maximum size of any record in bytes for batch push jobs",
+              "type": "int",
+              "default": -1
+            },
+            {
+              "name": "maxNearlineRecordSizeBytes",
+              "doc": "Store-level maximum size of any record in bytes for nearline jobs with partial updates",
+              "type": "int",
+              "default": -1
+            },
+            {
+              "name": "unusedSchemaDeletionEnabled",
+              "doc": "Whether unused schema deletion is enabled or not.",
+              "type": "boolean",
+              "default": false
+            },
+            {
+              "name": "blobTransferEnabled",
+              "doc": "Flag to indicate if the blob transfer is allowed or not",
+              "type": "boolean",
+              "default": false
+            }
+          ]
+        },
+        {
+          "name": "DeleteStore",
+          "type": "record",
+          "fields": [
+            {
+              "name": "clusterName",
+              "type": "string"
+            },
+            {
+              "name": "storeName",
+              "type": "string"
+            },
+            {
+              "name": "largestUsedVersionNumber",
+              "type": "int"
+            }
+          ]
+        },
+        {
+          "name": "DeleteOldVersion",
+          "type": "record",
+          "fields": [
+            {
+              "name": "clusterName",
+              "type": "string"
+            },
+            {
+              "name": "storeName",
+              "type": "string"
+            },
+            {
+              "name": "versionNum",
+              "type": "int"
+            }
+          ]
+        },
+        {
+          "name": "MigrateStore",
+          "type": "record",
+          "fields": [
+            {
+              "name": "srcClusterName",
+              "type": "string"
+            },
+            {
+              "name": "destClusterName",
+              "type": "string"
+            },
+            {
+              "name": "storeName",
+              "type": "string"
+            }
+          ]
+        },
+        {
+          "name": "AbortMigration",
+          "type": "record",
+          "fields": [
+            {
+              "name": "srcClusterName",
+              "type": "string"
+            },
+            {
+              "name": "destClusterName",
+              "type": "string"
+            },
+            {
+              "name": "storeName",
+              "type": "string"
+            }
+          ]
+        },
+        {
+          "name": "AddVersion",
+          "type": "record",
+          "fields": [
+            {
+              "name": "clusterName",
+              "type": "string"
+            },
+            {
+              "name": "storeName",
+              "type": "string"
+            },
+            {
+              "name": "pushJobId",
+              "type": "string"
+            },
+            {
+              "name": "versionNum",
+              "type": "int"
+            },
+            {
+              "name": "numberOfPartitions",
+              "type": "int"
+            },
+            {
+              "name": "pushType",
+              "doc": "The push type of the new version, 0 => BATCH, 1 => STREAM_REPROCESSING. Previous add version messages will default to BATCH and this is safe because they were created when BATCH was the only version type",
+              "type": "int",
+              "default": 0
+            },
+            {
+              "name": "pushStreamSourceAddress",
+              "type": ["null", "string"],
+              "default": null
+            },
+            {
+              "name": "rewindTimeInSecondsOverride",
+              "doc": "The overridable rewind time config for this specific version of a hybrid store, and if it is not specified, the new version will use the store-level rewind time config",
+              "type": "long",
+              "default": -1
+            },
+            {
+              "name": "timestampMetadataVersionId",
+              "doc": "The A/A metadata schema version ID that will be used to deserialize metadataPayload.",
+              "type": "int",
+              "default": -1
+            },
+            {
+              "name": "versionSwapDeferred",
+              "doc": "Indicates if swapping this version to current version after push completion should be initiated or not",
+              "type": "boolean",
+              "default": false
+            },
+            {
+              "name": "targetedRegions",
+              "doc": "The list of regions that is separated by comma for targeted region push. If set, this admin message should only be consumed by the targeted regions",
+              "type": [
+                "null",
+                {
+                  "type": "array",
+                  "items": "string"
+                }
+              ],
+              "default": null
+            },
+            {
+              "name": "repushSourceVersion",
+              "doc": "Indicates the source version from which a repush version is created",
+              "type": "int",
+              "default": -1
+            }
+          ]
+        },
+        {
+          "name": "DerivedSchemaCreation",
+          "type": "record",
+          "fields": [
+            {
+              "name": "clusterName",
+              "type": "string"
+            },
+            {
+              "name": "storeName",
+              "type": "string"
+            },
+            {
+              "name": "schema",
+              "type": "SchemaMeta"
+            },
+            {
+              "name": "valueSchemaId",
+              "type": "int"
+            },
+            {
+              "name": "derivedSchemaId",
+              "type": "int"
+            }
+          ]
+        },
+        {
+          "name": "SupersetSchemaCreation",
+          "type": "record",
+          "fields": [
+            {
+              "name": "clusterName",
+              "type": "string"
+            },
+            {
+              "name": "storeName",
+              "type": "string"
+            },
+            {
+              "name": "valueSchema",
+              "type": "SchemaMeta"
+            },
+            {
+              "name": "valueSchemaId",
+              "type": "int"
+            },
+            {
+              "name": "supersetSchema",
+              "type": "SchemaMeta"
+            },
+            {
+              "name": "supersetSchemaId",
+              "type": "int"
+            }
+          ]
+        },
+        {
+          "name": "ConfigureNativeReplicationForCluster",
+          "type": "record",
+          "fields": [
+            {
+              "name": "clusterName",
+              "type": "string"
+            },
+            {
+              "name": "storeType",
+              "type": "string"
+            },
+            {
+              "name": "enabled",
+              "type": "boolean"
+            },
+            {
+              "name": "nativeReplicationSourceRegion",
+              "doc": "The source region to be used when the store is running in Native Replication mode.",
+              "type": ["null", "string"],
+              "default": null
+            },
+            {
+              "name": "regionsFilter",
+              "type": ["null", "string"],
+              "default": null
+            }
+          ]
+        },
+        {
+          "name": "MetadataSchemaCreation",
+          "type": "record",
+          "fields": [
+            {
+              "name": "clusterName",
+              "type": "string"
+            },
+            {
+              "name": "storeName",
+              "type": "string"
+            },
+            {
+              "name": "valueSchemaId",
+              "type": "int"
+            },
+            {
+              "name": "metadataSchema",
+              "type": "SchemaMeta"
+            },
+            {
+              "name": "timestampMetadataVersionId",
+              "type": "int",
+              "aliases": ["metadataVersionId"],
+              "default": -1
+            }
+          ]
+        },
+        {
+          "name": "ConfigureActiveActiveReplicationForCluster",
+          "type": "record",
+          "fields": [
+            {
+              "name": "clusterName",
+              "type": "string"
+            },
+            {
+              "name": "storeType",
+              "type": "string"
+            },
+            {
+              "name": "enabled",
+              "type": "boolean"
+            },
+            {
+              "name": "regionsFilter",
+              "type": ["null", "string"],
+              "default": null
+            }
+          ]
+        }, {
+          "name": "ConfigureIncrementalPushForCluster",
policy.", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "incrementalPushPolicyToFilter", + "doc": "If this batch update command is trying to configure existing incremental push store type, their incremental push policy should also match this filter before the batch update command applies any change to them. Default value is -1, meaning there is no filter.", + "type": "int", + "default": -1 + }, + { + "name": "incrementalPushPolicyToApply", + "doc": "This field will determine what incremental push policy will be applied to the selected stores. Default value is 1, which is the INCREMENTAL_PUSH_SAME_AS_REAL_TIME policy", + "type": "int", + "default": 1 + }, + { + "name": "regionsFilter", + "type": ["null", "string"], + "default": null + } + ] + }, { + "name": "MetaSystemStoreAutoCreationValidation", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, { + "name": "PushStatusSystemStoreAutoCreationValidation", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, { + "name": "CreateStoragePersona", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "name", + "type": "string" + }, + { + "name": "quotaNumber", + "type": "long" + }, + { + "name": "storesToEnforce", + "type": { + "type": "array", + "items": "string", + "default": [] + } + }, + { + "name": "owners", + "type": { + "type": "array", + "items": "string", + "default": [] + } + } + ] + }, { + "name": "DeleteStoragePersona", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "name", + "type": "string" + } + ] + }, { + "name": "UpdateStoragePersona", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, { + "name": "name", + "type": "string" + }, { + "name": "quotaNumber", + "type": ["null","long"], + "default": null + }, { + "name": "storesToEnforce", + "type": [ + "null", + { + "type": "array", + "items": "string" + } + ], + "default": null + }, { + "name": "owners", + "type": [ + "null", + { + "type": "array", + "items": "string" + } + ], + "default": null + } + ] + }, + { + "name": "DeleteUnusedValueSchemas", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "schemaIds", + "type": { + "type": "array", + "items": "int", + "default": [] + } + } + ] + }, + { + "name": "RollbackCurrentVersion", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "regionsFilter", + "doc": "A list of regions that will be impacted by the RollbackCurrentVersion command", + "type": ["null", "string"], + "default": null + } + ] + }, + { + "name": "RollForwardCurrentVersion", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "regionsFilter", + "doc": "A list of regions that will be impacted by the RollForwardCurrentVersion command", + "type": ["null", "string"], + "default": null + } + ] + } + ] + } + ] +} diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/CreateVersionTest.java 
diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/CreateVersionTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/CreateVersionTest.java
index a0776fb802..8220f19f0c 100644
--- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/CreateVersionTest.java
+++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/CreateVersionTest.java
@@ -145,8 +145,8 @@ public void testCreateVersionWithACL(boolean checkReadMethod) throws Exception {
     }
   }

-  @Test(description = "requestTopicForPushing should return an RT topic when store is hybrid and inc-push is enabled")
-  public void testRequestTopicForHybridIncPushEnabled() throws Exception {
+  @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, description = "requestTopicForPushing should return an RT topic when store is hybrid and inc-push is enabled")
+  public void testRequestTopicForHybridIncPushEnabled(boolean isSeparateTopicEnabled) throws Exception {
     doReturn(true).when(admin).whetherEnableBatchPushFromAdmin(STORE_NAME);
     doCallRealMethod().when(request).queryParamOrDefault(any(), any());
     doReturn(true).when(accessClient).isAllowlistUsers(certificate, STORE_NAME, HTTP_GET);
@@ -156,6 +156,7 @@ public void testRequestTopicForHybridIncPushEnabled() throws Exception {
     doReturn(store).when(admin).getStore(CLUSTER_NAME, STORE_NAME);

     Version version = new VersionImpl(STORE_NAME, 1, JOB_ID);
+    version.setSeparateRealTimeTopicEnabled(isSeparateTopicEnabled);
     doReturn(version).when(admin)
         .incrementVersionIdempotent(
             CLUSTER_NAME,
@@ -186,7 +187,11 @@ public void testRequestTopicForHybridIncPushEnabled() throws Exception {
     assertNotNull(result);
     VersionCreationResponse versionCreateResponse =
         OBJECT_MAPPER.readValue(result.toString(), VersionCreationResponse.class);
-    assertEquals(versionCreateResponse.getKafkaTopic(), "test_store_rt");
+    if (isSeparateTopicEnabled) {
+      assertEquals(versionCreateResponse.getKafkaTopic(), Version.composeSeparateRealTimeTopic(STORE_NAME));
+    } else {
+      assertEquals(versionCreateResponse.getKafkaTopic(), Version.composeRealTimeTopic(STORE_NAME));
+    }
   }

   // A store should never end up in the state where inc-push is enabled but hybrid configs are not set, nevertheless
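
For operators, once this change is deployed the flag can be set per store through the normal update-store path. A minimal sketch (the ControllerClient constructor shape and variable names here are assumptions for illustration; only setSeparateRealTimeTopicEnabled and the cluster-level config come from this change):

    // Sketch: enable incremental push plus the separate RT topic for one store.
    // When the cluster-level config ENABLE_SEPARATE_REAL_TIME_TOPIC_FOR_STORE_WITH_INCREMENTAL_PUSH
    // is on, the separate-RT flag is applied automatically as incremental push is enabled,
    // so the explicit setter below is optional in that case.
    try (ControllerClient controllerClient = new ControllerClient(clusterName, parentControllerUrl)) {
      UpdateStoreQueryParams params = new UpdateStoreQueryParams()
          .setIncrementalPushEnabled(true)
          .setSeparateRealTimeTopicEnabled(true);
      controllerClient.updateStore(storeName, params);
    }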