Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix schema name in table config during controller startup #11574

Merged
merged 3 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,22 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
MAX_RECORDS_LAG("maxRecordsLag", false),

// Consumption availability lag in ms at a partition level
MAX_RECORD_AVAILABILITY_LAG_MS("maxRecordAvailabilityLagMs", false);
MAX_RECORD_AVAILABILITY_LAG_MS("maxRecordAvailabilityLagMs", false),

// Number of table schema got misconfigured
MISCONFIGURED_SCHEMA_TABLE_COUNT("misconfiguredSchemaTableCount", true),

// Number of table without schema
TABLE_WITHOUT_SCHEMA_COUNT("tableWithoutSchemaCount", true),

// Number of table schema got fixed
FIXED_SCHEMA_TABLE_COUNT("fixedSchemaTableCount", true),

// Number of tables that we want to fix but failed to copy schema from old schema name to new schema name
FAILED_TO_COPY_SCHEMA_COUNT("failedToCopySchemaCount", true),

// Number of tables that we want to fix but failed to update table config
FAILED_TO_UPDATE_TABLE_CONFIG_COUNT("failedToUpdateTableConfigCount", true);

private final String _gaugeName;
private final String _unit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
Expand All @@ -47,13 +49,16 @@
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.MasterSlaveSMD;
import org.apache.helix.model.Message;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.http.config.SocketConfig;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.function.FunctionRegistry;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
Expand All @@ -62,9 +67,11 @@
import org.apache.pinot.common.minion.TaskGeneratorMostRecentRunInfo;
import org.apache.pinot.common.minion.TaskManagerStatusCache;
import org.apache.pinot.common.utils.PinotAppConfigs;
import org.apache.pinot.common.utils.SchemaUtils;
import org.apache.pinot.common.utils.ServiceStartableUtils;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.TlsUtils;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.helix.LeadControllerUtils;
Expand Down Expand Up @@ -105,6 +112,7 @@
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.crypt.PinotCrypterFactory;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
Expand All @@ -117,7 +125,9 @@
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
import org.apache.zookeeper.data.Stat;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -525,6 +535,10 @@ protected void configure() {
throw new RuntimeException("Unable to start controller due to existing HLC tables!");
}

// One time job to fix schema name in all tables
// This method can be removed after the next major release.
fixSchemaNameInTableConfig();

_controllerMetrics.addCallbackGauge("dataDir.exists", () -> new File(_config.getDataDir()).exists() ? 1L : 0L);
_controllerMetrics.addCallbackGauge("dataDir.fileOpLatencyMs", () -> {
File dataDir = new File(_config.getDataDir());
Expand All @@ -549,6 +563,109 @@ protected void configure() {
_serviceStatusCallbackList.add(generateServiceStatusCallback(_helixParticipantManager));
}

/**
* This method is used to fix table/schema names.
* TODO: in the next release, maybe 2.0.0, we can remove this method. Meanwhile we can delete the orphan schemas
* that has been existed longer than a certain time period.
*
*/
@VisibleForTesting
public void fixSchemaNameInTableConfig() {
AtomicInteger misconfiguredTableCount = new AtomicInteger();
AtomicInteger tableWithoutSchemaCount = new AtomicInteger();
AtomicInteger fixedSchemaTableCount = new AtomicInteger();
AtomicInteger failedToCopySchemaCount = new AtomicInteger();
AtomicInteger failedToUpdateTableConfigCount = new AtomicInteger();
ZkHelixPropertyStore<ZNRecord> propertyStore = _helixResourceManager.getPropertyStore();

List<String> allTables = _helixResourceManager.getAllTables();
allTables.forEach(tableNameWithType -> {
String tableConfigPath = ZKMetadataProvider.constructPropertyStorePathForResourceConfig(tableNameWithType);
Stat tableConfigStat = new Stat();
ZNRecord tableConfigZNRecord = propertyStore.get(tableConfigPath, tableConfigStat, AccessOption.PERSISTENT);
if (tableConfigZNRecord == null) {
// This might due to table deletion, just log it here.
LOGGER.warn("Failed to find table config for table: {}, the table likely already got deleted",
tableNameWithType);
return;
}
TableConfig tableConfig;
try {
tableConfig = TableConfigUtils.fromZNRecord(tableConfigZNRecord);
} catch (Exception e) {
LOGGER.error("Caught exception constructing table config from ZNRecord for table: {}", tableNameWithType, e);
return;
}
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
String schemaPath = ZKMetadataProvider.constructPropertyStorePathForSchema(rawTableName);
boolean schemaExists = propertyStore.exists(schemaPath, AccessOption.PERSISTENT);
String existSchemaName = tableConfig.getValidationConfig().getSchemaName();
if (existSchemaName == null || existSchemaName.equals(rawTableName)) {
// Although the table config is valid, we still need to ensure the schema exists
if (!schemaExists) {
LOGGER.warn("Failed to find schema for table: {}", tableNameWithType);
tableWithoutSchemaCount.getAndIncrement();
return;
}
// Table config is already in good status
return;
}
misconfiguredTableCount.getAndIncrement();
if (schemaExists) {
// If a schema named `rawTableName` already exists, then likely this is a misconfiguration.
// Reset schema name in table config to null to let the table point to the existing schema.
LOGGER.warn("Schema: {} already exists, fix the schema name in table config from {} to null", rawTableName,
existSchemaName);
} else {
// Copy the schema current table referring to to `rawTableName` if it does not exist
Schema schema = _helixResourceManager.getSchema(existSchemaName);
if (schema == null) {
LOGGER.warn("Failed to find schema: {} for table: {}", existSchemaName, tableNameWithType);
tableWithoutSchemaCount.getAndIncrement();
return;
}
schema.setSchemaName(rawTableName);
if (propertyStore.create(schemaPath, SchemaUtils.toZNRecord(schema), AccessOption.PERSISTENT)) {
LOGGER.info("Copied schema: {} to {}", existSchemaName, rawTableName);
xiangfu0 marked this conversation as resolved.
Show resolved Hide resolved
} else {
LOGGER.warn("Failed to copy schema: {} to {}", existSchemaName, rawTableName);
failedToCopySchemaCount.getAndIncrement();
return;
}
}
// Update table config to remove schema name
tableConfig.getValidationConfig().setSchemaName(null);
try {
tableConfigZNRecord = TableConfigUtils.toZNRecord(tableConfig);
} catch (Exception e) {
LOGGER.error("Caught exception constructing ZNRecord from table config for table: {}", tableNameWithType, e);
return;
}
if (propertyStore.set(tableConfigPath, tableConfigZNRecord, tableConfigStat.getVersion(),
AccessOption.PERSISTENT)) {
LOGGER.info("Removed schema name from table config for table: {}", tableNameWithType);
fixedSchemaTableCount.getAndIncrement();
} else {
LOGGER.warn("Failed to update table config for table: {}", tableNameWithType);
failedToUpdateTableConfigCount.getAndIncrement();
}
});
LOGGER.info(
"Found {} tables misconfigured, {} tables without schema. Successfully fixed schema for {} tables, failed to "
+ "fix {} tables due to copy schema failure, failed to fix {} tables due to update table config failure.",
misconfiguredTableCount.get(), tableWithoutSchemaCount.get(), fixedSchemaTableCount.get(),
failedToCopySchemaCount.get(), failedToUpdateTableConfigCount.get());

_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT,
misconfiguredTableCount.get());
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT, tableWithoutSchemaCount.get());
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FIXED_SCHEMA_TABLE_COUNT, fixedSchemaTableCount.get());
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT,
failedToCopySchemaCount.get());
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT,
failedToUpdateTableConfigCount.get());
}

private ServiceStatus.ServiceStatusCallback generateServiceStatusCallback(HelixManager helixManager) {
return new ServiceStatus.ServiceStatusCallback() {
private volatile boolean _isStarted = false;
Expand Down
Loading
Loading