Skip to content

Commit

Permalink
share the same table config object (apache#12463)
Browse files Browse the repository at this point in the history
  • Loading branch information
klsince authored Feb 22, 2024
1 parent 133ad9e commit 6b0cfeb
Showing 1 changed file with 5 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,7 @@ protected void doInit() {
// Set up dedup/upsert metadata manager
// NOTE: Dedup/upsert has to be set up when starting the server. Changing the table config without restarting the
// server won't enable/disable them on the fly.
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", _tableNameWithType);

DedupConfig dedupConfig = tableConfig.getDedupConfig();
DedupConfig dedupConfig = _tableConfig.getDedupConfig();
boolean dedupEnabled = dedupConfig != null && dedupConfig.isDedupEnabled();
if (dedupEnabled) {
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
Expand All @@ -197,10 +194,10 @@ protected void doInit() {
List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
Preconditions.checkState(!CollectionUtils.isEmpty(primaryKeyColumns),
"Primary key columns must be configured for dedup");
_tableDedupMetadataManager = TableDedupMetadataManagerFactory.create(tableConfig, schema, this, _serverMetrics);
_tableDedupMetadataManager = TableDedupMetadataManagerFactory.create(_tableConfig, schema, this, _serverMetrics);
}

UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
UpsertConfig upsertConfig = _tableConfig.getUpsertConfig();
if (upsertConfig != null && upsertConfig.getMode() != UpsertConfig.Mode.NONE) {
Preconditions.checkState(!dedupEnabled, "Dedup and upsert cannot be both enabled for table: %s",
_tableUpsertMetadataManager);
Expand All @@ -209,8 +206,8 @@ protected void doInit() {
// NOTE: Set _tableUpsertMetadataManager before initializing it because when preloading is enabled, we need to
// load segments into it
_tableUpsertMetadataManager =
TableUpsertMetadataManagerFactory.create(tableConfig, _instanceDataManagerConfig.getUpsertConfig());
_tableUpsertMetadataManager.init(tableConfig, schema, this, _helixManager, _segmentPreloadExecutor);
TableUpsertMetadataManagerFactory.create(_tableConfig, _instanceDataManagerConfig.getUpsertConfig());
_tableUpsertMetadataManager.init(_tableConfig, schema, this, _helixManager, _segmentPreloadExecutor);
}

// For dedup and partial-upsert, need to wait for all segments loaded before starting consuming data
Expand Down

0 comments on commit 6b0cfeb

Please sign in to comment.