From 45e62ec3e1b6df5506f747bd9a6950abcaab225d Mon Sep 17 00:00:00 2001 From: Xiang Fu Date: Tue, 12 Sep 2023 17:24:53 +0800 Subject: [PATCH 1/3] Adding a controller task to update schema name --- .../pinot/common/metrics/ControllerGauge.java | 17 +- .../controller/BaseControllerStarter.java | 94 ++++++ .../SchemaCleanupTaskStatelessTest.java | 285 ++++++++++++++++++ 3 files changed, 395 insertions(+), 1 deletion(-) create mode 100644 pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/SchemaCleanupTaskStatelessTest.java diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java index 64ced0203c0..d08155e3f80 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java @@ -132,7 +132,22 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { MAX_RECORDS_LAG("maxRecordsLag", false), // Consumption availability lag in ms at a partition level - MAX_RECORD_AVAILABILITY_LAG_MS("maxRecordAvailabilityLagMs", false); + MAX_RECORD_AVAILABILITY_LAG_MS("maxRecordAvailabilityLagMs", false), + + // Number of table schema got fixed + FIXED_SCHEMA_TABLE_COUNT("FixedSchemaTableCount", true), + + // Number of table without schema + TABLE_WITHOUT_SCHEMA_COUNT("tableWithoutSchemaCount", true), + + // Number of tables that we want to fix but failed to copy schema from old schema name to new schema name + FAILED_TO_COPY_SCHEMA("failedToCopySchema", true), + + // Number of tables that we want to fix but failed to update table config + FAILED_TO_UPDATE_TABLE_CONFIG("failedToUpdateTableConfig", true), + + // Number of table schema got misconfigured + MISCONFIGURED_SCHEMA_TABLE_COUNT("misconfiguredSchemaTableCount", true); private final String _gaugeName; private final String _unit; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 240e8d662b0..65fd1e44a27 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -35,6 +35,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.helix.HelixManager; @@ -105,6 +106,7 @@ import org.apache.pinot.core.util.ListenerConfigUtil; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.crypt.PinotCrypterFactory; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.metrics.PinotMetricUtils; @@ -117,6 +119,7 @@ import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.InstanceTypeUtils; import org.apache.pinot.spi.utils.NetUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory; import org.glassfish.hk2.utilities.binding.AbstractBinder; import org.slf4j.Logger; @@ -525,6 +528,10 @@ protected void configure() { throw new RuntimeException("Unable to start controller due to existing HLC tables!"); } + // One time job to fix schema name in all tables + // This method can be removed after the next major release. + fixSchemaNameInTableConfig(); + _controllerMetrics.addCallbackGauge("dataDir.exists", () -> new File(_config.getDataDir()).exists() ? 1L : 0L); _controllerMetrics.addCallbackGauge("dataDir.fileOpLatencyMs", () -> { File dataDir = new File(_config.getDataDir()); @@ -549,6 +556,93 @@ protected void configure() { _serviceStatusCallbackList.add(generateServiceStatusCallback(_helixParticipantManager)); } + /** + * This method is used to fix table/schema names. + * TODO: in the next release, maybe 2.0.0, we can remove this method. Meanwhile we can delete the orphan schemas + * that has been existed longer than a certain time period. + * + */ + @VisibleForTesting + public void fixSchemaNameInTableConfig() { + AtomicInteger fixedSchemaTableCount = new AtomicInteger(); + AtomicInteger tableWithoutSchemaCount = new AtomicInteger(); + AtomicInteger failedToCopySchemaCount = new AtomicInteger(); + AtomicInteger failedToUpdateTableConfigCount = new AtomicInteger(); + AtomicInteger misConfiguredTableCount = new AtomicInteger(); + List allTables = _helixResourceManager.getAllTables(); + + allTables.forEach(table -> { + TableConfig tableConfig = _helixResourceManager.getTableConfig(table); + if ((tableConfig == null) || (tableConfig.getValidationConfig() == null)) { + // This might due to table deletion, just log it here. + LOGGER.warn("Failed to find table config for table: {}, the table likely already got deleted", table); + return; + } + String rawTableName = TableNameBuilder.extractRawTableName(tableConfig.getTableName()); + String existSchemaName = tableConfig.getValidationConfig().getSchemaName(); + if (existSchemaName == null || existSchemaName.equals(rawTableName)) { + // Although the table config is valid, we still need to ensure the schema exists + if (_helixResourceManager.getSchema(rawTableName) == null) { + LOGGER.warn("Failed to find schema for table: {}", rawTableName); + tableWithoutSchemaCount.getAndIncrement(); + return; + } + // Table config is already in good status + return; + } + if (_helixResourceManager.getSchema(rawTableName) != null) { + // If a schema named `rawTableName` already exists, then likely this is a misconfiguration. + // Reset schema name in table config to null to let the table point to the existing schema. + LOGGER.warn("Schema: {} already exists, fix the schema name in table config from {} to null", rawTableName, + existSchemaName); + misConfiguredTableCount.getAndIncrement(); + } else { + // Copy the schema current table referring to to `rawTableName` if it does not exist + Schema schema = _helixResourceManager.getSchema(existSchemaName); + if (schema == null) { + LOGGER.warn("Failed to find schema for schema name: {}, tale name: {}", existSchemaName, table); + misConfiguredTableCount.getAndIncrement(); + tableWithoutSchemaCount.getAndIncrement(); + return; + } + schema.setSchemaName(rawTableName); + try { + _helixResourceManager.addSchema(schema, false, false); + LOGGER.info("Copied schema: {} to {}", existSchemaName, rawTableName); + } catch (Exception e) { + LOGGER.error("Failed to copy schema: {} to {}", existSchemaName, rawTableName, e); + failedToCopySchemaCount.getAndIncrement(); + return; + } + } + // Update table config to remove schema name + tableConfig.getValidationConfig().setSchemaName(null); + try { + _helixResourceManager.updateTableConfig(tableConfig); + LOGGER.info("Removed schema name from table config for table: {}", table); + fixedSchemaTableCount.getAndIncrement(); + } catch (IOException e) { + failedToUpdateTableConfigCount.getAndIncrement(); + LOGGER.error("Failed to remove schema name from table config for: {}", tableConfig.getTableName(), e); + } + }); + LOGGER.info( + "Found {} tables are misconfigured, {} tables without schema. " + + "Successfully fixed schema for {} tables, " + + "failed to fix {} tables due to copy schema failure, " + + "failed to fix {} tables due to update table config failure.", + misConfiguredTableCount.get(), tableWithoutSchemaCount.get(), + fixedSchemaTableCount.get(), failedToCopySchemaCount.get(), failedToUpdateTableConfigCount.get()); + + _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT, + misConfiguredTableCount.get()); + _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT, tableWithoutSchemaCount.get()); + _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FIXED_SCHEMA_TABLE_COUNT, fixedSchemaTableCount.get()); + _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FAILED_TO_COPY_SCHEMA, failedToCopySchemaCount.get()); + _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG, + failedToUpdateTableConfigCount.get()); + } + private ServiceStatus.ServiceStatusCallback generateServiceStatusCallback(HelixManager helixManager) { return new ServiceStatus.ServiceStatusCallback() { private volatile boolean _isStarted = false; diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/SchemaCleanupTaskStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/SchemaCleanupTaskStatelessTest.java new file mode 100644 index 00000000000..4d58f4a53c7 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/SchemaCleanupTaskStatelessTest.java @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.cleanup; + +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.pinot.common.metrics.ControllerGauge; +import org.apache.pinot.common.metrics.MetricValueUtils; +import org.apache.pinot.common.utils.config.TagNameUtils; +import org.apache.pinot.controller.BaseControllerStarter; +import org.apache.pinot.controller.helix.ControllerTest; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.metrics.PinotMetricUtils; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.NetUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + + +/** + * This test can be deleted once {@link BaseControllerStarter#fixSchemaNameInTableConfig()} is deleted. Likely in 2.0.0. + */ +@Test(groups = "stateless") +public class SchemaCleanupTaskStatelessTest extends ControllerTest { + @BeforeClass + public void setup() + throws Exception { + startZk(); + startController(); + startFakeBroker(); + startFakeServer(); + } + + private void startFakeBroker() + throws Exception { + String brokerInstance = CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE + NetUtils.getHostAddress() + "_" + + CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT; + + // Create server instance with the fake server state model + HelixManager brokerHelixManager = + HelixManagerFactory.getZKHelixManager(getHelixClusterName(), brokerInstance, InstanceType.PARTICIPANT, + getZkUrl()); + brokerHelixManager.connect(); + + // Add Helix tag to the server + brokerHelixManager.getClusterManagmentTool().addInstanceTag(getHelixClusterName(), brokerInstance, + TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME)); + } + + private void startFakeServer() + throws Exception { + String serverInstance = CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE + NetUtils.getHostAddress() + "_" + + CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT; + + // Create server instance with the fake server state model + HelixManager serverHelixManager = HelixManagerFactory + .getZKHelixManager(getHelixClusterName(), serverInstance, InstanceType.PARTICIPANT, getZkUrl()); + serverHelixManager.connect(); + + // Add Helix tag to the server + serverHelixManager.getClusterManagmentTool().addInstanceTag(getHelixClusterName(), serverInstance, + TableNameBuilder.OFFLINE.tableNameWithType(TagNameUtils.DEFAULT_TENANT_NAME)); + } + + @AfterClass + public void teardown() { + stopController(); + stopZk(); + } + + @Test + public void testSchemaCleanupTask() + throws Exception { + PinotMetricUtils.cleanUp(); + // 1. Add a schema + addSchema(createDummySchema("t1")); + addSchema(createDummySchema("t2")); + addSchema(createDummySchema("t3")); + + // 2. Add a table with the schema name reference + addTableConfig(createDummyTableConfig("t1", "t1")); + addTableConfig(createDummyTableConfig("t2", "t2")); + addTableConfig(createDummyTableConfig("t3", "t3")); + + _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t1", "t2")); + _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t2", "t3")); + _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t3", "t1")); + + // 3. Fix table schema + _controllerStarter.fixSchemaNameInTableConfig(); + + // 4. validate + assertEquals(getHelixResourceManager().getAllTables().size(), 3); + assertEquals(getHelixResourceManager().getSchemaNames().size(), 3); + + assertNull(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName()); + assertNull(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName()); + assertNull(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName()); + + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT), 3); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 3); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT), 0); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.FAILED_TO_COPY_SCHEMA), 0); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG), 0); + + // 5. Clean up + for (String table : getHelixResourceManager().getAllOfflineTables()) { + getHelixResourceManager().deleteOfflineTable(table); + } + for (String schema : getHelixResourceManager().getSchemaNames()) { + getHelixResourceManager().deleteSchema(schema); + } + } + + @Test + public void testSchemaCleanupTaskNormalCase() + throws Exception { + PinotMetricUtils.cleanUp(); + // 1. Add a schema + addSchema(createDummySchema("t1")); + addSchema(createDummySchema("t2")); + addSchema(createDummySchema("t3")); + + assertEquals(getHelixResourceManager().getSchemaNames().size(), 3); + + // 2. Add a table with the schema name reference + addTableConfig(createDummyTableConfig("t1", "t1")); + addTableConfig(createDummyTableConfig("t2", "t2")); + addTableConfig(createDummyTableConfig("t3", "t3")); + + assertEquals(getHelixResourceManager().getAllTables().size(), 3); + + // 3. Create new schemas and update table to new schema + addSchema(createDummySchema("t11")); + addSchema(createDummySchema("t21")); + addSchema(createDummySchema("t31")); + _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t1", "t11")); + _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t2", "t21")); + _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t3", "t31")); + + assertEquals(getHelixResourceManager().getAllTables().size(), 3); + assertEquals(getHelixResourceManager().getSchemaNames().size(), 6); + assertEquals(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName(), "t11"); + assertEquals(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName(), "t21"); + assertEquals(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName(), "t31"); + + // 4. Delete schema t1, t2, t3, so we can check if those schemas are fixed later. + deleteSchema("t1"); + deleteSchema("t2"); + deleteSchema("t3"); + + assertEquals(getHelixResourceManager().getSchemaNames().size(), 3); + + // 5. Fix table schema + _controllerStarter.fixSchemaNameInTableConfig(); + + // 6. All tables will directly set schema. + assertEquals(getHelixResourceManager().getAllTables().size(), 3); + assertEquals(getHelixResourceManager().getSchemaNames().size(), 6); + assertTrue(getHelixResourceManager().getSchemaNames().contains("t1")); + assertTrue(getHelixResourceManager().getSchemaNames().contains("t2")); + assertTrue(getHelixResourceManager().getSchemaNames().contains("t3")); + + assertNull(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName()); + assertNull(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName()); + assertNull(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName()); + + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT), 0); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 3); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT), 0); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.FAILED_TO_COPY_SCHEMA), 0); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG), 0); + + // 7. Clean up + for (String table : getHelixResourceManager().getAllOfflineTables()) { + getHelixResourceManager().deleteOfflineTable(table); + } + for (String schema : getHelixResourceManager().getSchemaNames()) { + getHelixResourceManager().deleteSchema(schema); + } + } + + @Test + public void testMissingSchema() + throws Exception { + PinotMetricUtils.cleanUp(); + // 1. Add a schema + addSchema(createDummySchema("t1")); + addSchema(createDummySchema("t2")); + addSchema(createDummySchema("t3")); + + assertEquals(getHelixResourceManager().getSchemaNames().size(), 3); + + // 2. Add a table with the schema name reference + addTableConfig(createDummyTableConfig("t1")); + addTableConfig(createDummyTableConfig("t2")); + addTableConfig(createDummyTableConfig("t3")); + + assertEquals(getHelixResourceManager().getAllTables().size(), 3); + + // 4. Delete schema t1, t2, t3, so we can check if those schemas are fixed later. + deleteSchema("t1"); + deleteSchema("t2"); + deleteSchema("t3"); + + assertEquals(getHelixResourceManager().getSchemaNames().size(), 0); + + // 5. Fix table schema + _controllerStarter.fixSchemaNameInTableConfig(); + + // 6. We cannot fix schema + assertEquals(getHelixResourceManager().getAllTables().size(), 3); + assertEquals(getHelixResourceManager().getSchemaNames().size(), 0); + + assertNull(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName()); + assertNull(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName()); + assertNull(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName()); + + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT), 0); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 0); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT), 3); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.FAILED_TO_COPY_SCHEMA), 0); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG), 0); + + // 7. Clean up + for (String table : getHelixResourceManager().getAllOfflineTables()) { + getHelixResourceManager().deleteOfflineTable(table); + } + for (String schema : getHelixResourceManager().getSchemaNames()) { + getHelixResourceManager().deleteSchema(schema); + } + } + + private TableConfig createDummyTableConfig(String table) { + return new TableConfigBuilder(TableType.OFFLINE) + .setTableName(table) + .build(); + } + + private TableConfig createDummyTableConfig(String table, String schema) { + TableConfig tableConfig = createDummyTableConfig(table); + tableConfig.getValidationConfig().setSchemaName(schema); + return tableConfig; + } +} From 00493e2acef77c21c3ac82f97ae36c434476b4e0 Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" Date: Tue, 10 Oct 2023 11:16:05 -0700 Subject: [PATCH 2/3] Misc fixes --- .../pinot/common/metrics/ControllerGauge.java | 14 ++-- .../controller/BaseControllerStarter.java | 84 ++++++++++++------- .../SchemaCleanupTaskStatelessTest.java | 26 +++--- 3 files changed, 73 insertions(+), 51 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java index d08155e3f80..887b3421109 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java @@ -134,20 +134,20 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { // Consumption availability lag in ms at a partition level MAX_RECORD_AVAILABILITY_LAG_MS("maxRecordAvailabilityLagMs", false), - // Number of table schema got fixed - FIXED_SCHEMA_TABLE_COUNT("FixedSchemaTableCount", true), + // Number of table schema got misconfigured + MISCONFIGURED_SCHEMA_TABLE_COUNT("misconfiguredSchemaTableCount", true), // Number of table without schema TABLE_WITHOUT_SCHEMA_COUNT("tableWithoutSchemaCount", true), + // Number of table schema got fixed + FIXED_SCHEMA_TABLE_COUNT("fixedSchemaTableCount", true), + // Number of tables that we want to fix but failed to copy schema from old schema name to new schema name - FAILED_TO_COPY_SCHEMA("failedToCopySchema", true), + FAILED_TO_COPY_SCHEMA_COUNT("failedToCopySchemaCount", true), // Number of tables that we want to fix but failed to update table config - FAILED_TO_UPDATE_TABLE_CONFIG("failedToUpdateTableConfig", true), - - // Number of table schema got misconfigured - MISCONFIGURED_SCHEMA_TABLE_COUNT("misconfiguredSchemaTableCount", true); + FAILED_TO_UPDATE_TABLE_CONFIG_COUNT("failedToUpdateTableConfigCount", true); private final String _gaugeName; private final String _unit; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 65fd1e44a27..bb34f45618d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.helix.AccessOption; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; @@ -48,13 +49,16 @@ import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.MasterSlaveSMD; import org.apache.helix.model.Message; +import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.task.TaskDriver; +import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.http.config.SocketConfig; import org.apache.http.conn.HttpClientConnectionManager; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.pinot.common.Utils; import org.apache.pinot.common.config.TlsConfig; import org.apache.pinot.common.function.FunctionRegistry; +import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; @@ -63,9 +67,11 @@ import org.apache.pinot.common.minion.TaskGeneratorMostRecentRunInfo; import org.apache.pinot.common.minion.TaskManagerStatusCache; import org.apache.pinot.common.utils.PinotAppConfigs; +import org.apache.pinot.common.utils.SchemaUtils; import org.apache.pinot.common.utils.ServiceStartableUtils; import org.apache.pinot.common.utils.ServiceStatus; import org.apache.pinot.common.utils.TlsUtils; +import org.apache.pinot.common.utils.config.TableConfigUtils; import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.common.utils.helix.LeadControllerUtils; @@ -121,6 +127,7 @@ import org.apache.pinot.spi.utils.NetUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory; +import org.apache.zookeeper.data.Stat; import org.glassfish.hk2.utilities.binding.AbstractBinder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -564,53 +571,64 @@ protected void configure() { */ @VisibleForTesting public void fixSchemaNameInTableConfig() { - AtomicInteger fixedSchemaTableCount = new AtomicInteger(); + AtomicInteger misconfiguredTableCount = new AtomicInteger(); AtomicInteger tableWithoutSchemaCount = new AtomicInteger(); + AtomicInteger fixedSchemaTableCount = new AtomicInteger(); AtomicInteger failedToCopySchemaCount = new AtomicInteger(); AtomicInteger failedToUpdateTableConfigCount = new AtomicInteger(); - AtomicInteger misConfiguredTableCount = new AtomicInteger(); - List allTables = _helixResourceManager.getAllTables(); + ZkHelixPropertyStore propertyStore = _helixResourceManager.getPropertyStore(); - allTables.forEach(table -> { - TableConfig tableConfig = _helixResourceManager.getTableConfig(table); - if ((tableConfig == null) || (tableConfig.getValidationConfig() == null)) { + List allTables = _helixResourceManager.getAllTables(); + allTables.forEach(tableNameWithType -> { + String tableConfigPath = ZKMetadataProvider.constructPropertyStorePathForResourceConfig(tableNameWithType); + Stat tableConfigStat = new Stat(); + ZNRecord tableConfigZNRecord = propertyStore.get(tableConfigPath, tableConfigStat, AccessOption.PERSISTENT); + if (tableConfigZNRecord == null) { // This might due to table deletion, just log it here. - LOGGER.warn("Failed to find table config for table: {}, the table likely already got deleted", table); + LOGGER.warn("Failed to find table config for table: {}, the table likely already got deleted", + tableNameWithType); + return; + } + TableConfig tableConfig; + try { + tableConfig = TableConfigUtils.fromZNRecord(tableConfigZNRecord); + } catch (Exception e) { + LOGGER.error("Caught exception constructing table config from ZNRecord for table: {}", tableNameWithType, e); return; } - String rawTableName = TableNameBuilder.extractRawTableName(tableConfig.getTableName()); + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + String schemaPath = ZKMetadataProvider.constructPropertyStorePathForSchema(rawTableName); + boolean schemaExists = propertyStore.exists(schemaPath, AccessOption.PERSISTENT); String existSchemaName = tableConfig.getValidationConfig().getSchemaName(); if (existSchemaName == null || existSchemaName.equals(rawTableName)) { // Although the table config is valid, we still need to ensure the schema exists - if (_helixResourceManager.getSchema(rawTableName) == null) { - LOGGER.warn("Failed to find schema for table: {}", rawTableName); + if (!schemaExists) { + LOGGER.warn("Failed to find schema for table: {}", tableNameWithType); tableWithoutSchemaCount.getAndIncrement(); return; } // Table config is already in good status return; } - if (_helixResourceManager.getSchema(rawTableName) != null) { + misconfiguredTableCount.getAndIncrement(); + if (schemaExists) { // If a schema named `rawTableName` already exists, then likely this is a misconfiguration. // Reset schema name in table config to null to let the table point to the existing schema. LOGGER.warn("Schema: {} already exists, fix the schema name in table config from {} to null", rawTableName, existSchemaName); - misConfiguredTableCount.getAndIncrement(); } else { // Copy the schema current table referring to to `rawTableName` if it does not exist Schema schema = _helixResourceManager.getSchema(existSchemaName); if (schema == null) { - LOGGER.warn("Failed to find schema for schema name: {}, tale name: {}", existSchemaName, table); - misConfiguredTableCount.getAndIncrement(); + LOGGER.warn("Failed to find schema: {} for table: {}", existSchemaName, tableNameWithType); tableWithoutSchemaCount.getAndIncrement(); return; } schema.setSchemaName(rawTableName); - try { - _helixResourceManager.addSchema(schema, false, false); + if (propertyStore.create(schemaPath, SchemaUtils.toZNRecord(schema), AccessOption.PERSISTENT)) { LOGGER.info("Copied schema: {} to {}", existSchemaName, rawTableName); - } catch (Exception e) { - LOGGER.error("Failed to copy schema: {} to {}", existSchemaName, rawTableName, e); + } else { + LOGGER.warn("Failed to copy schema: {} to {}", existSchemaName, rawTableName); failedToCopySchemaCount.getAndIncrement(); return; } @@ -618,28 +636,32 @@ public void fixSchemaNameInTableConfig() { // Update table config to remove schema name tableConfig.getValidationConfig().setSchemaName(null); try { - _helixResourceManager.updateTableConfig(tableConfig); - LOGGER.info("Removed schema name from table config for table: {}", table); + tableConfigZNRecord = TableConfigUtils.toZNRecord(tableConfig); + } catch (Exception e) { + LOGGER.error("Caught exception constructing ZNRecord from table config for table: {}", tableNameWithType, e); + return; + } + if (propertyStore.set(tableConfigPath, tableConfigZNRecord, tableConfigStat.getVersion(), + AccessOption.PERSISTENT)) { + LOGGER.info("Removed schema name from table config for table: {}", tableNameWithType); fixedSchemaTableCount.getAndIncrement(); - } catch (IOException e) { + } else { + LOGGER.warn("Failed to update table config for table: {}", tableNameWithType); failedToUpdateTableConfigCount.getAndIncrement(); - LOGGER.error("Failed to remove schema name from table config for: {}", tableConfig.getTableName(), e); } }); LOGGER.info( - "Found {} tables are misconfigured, {} tables without schema. " - + "Successfully fixed schema for {} tables, " - + "failed to fix {} tables due to copy schema failure, " - + "failed to fix {} tables due to update table config failure.", - misConfiguredTableCount.get(), tableWithoutSchemaCount.get(), - fixedSchemaTableCount.get(), failedToCopySchemaCount.get(), failedToUpdateTableConfigCount.get()); + "Found {} tables misconfigured, {} tables without schema. Successfully fixed schema for {} tables, failed to " + + "fix {} tables due to copy schema failure, failed to fix {} tables due to update table config failure.", + misconfiguredTableCount.get(), tableWithoutSchemaCount.get(), fixedSchemaTableCount.get(), + failedToCopySchemaCount.get(), failedToUpdateTableConfigCount.get()); _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT, - misConfiguredTableCount.get()); + misconfiguredTableCount.get()); _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT, tableWithoutSchemaCount.get()); _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FIXED_SCHEMA_TABLE_COUNT, fixedSchemaTableCount.get()); - _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FAILED_TO_COPY_SCHEMA, failedToCopySchemaCount.get()); - _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG, + _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT, failedToCopySchemaCount.get()); + _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT, failedToUpdateTableConfigCount.get()); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/SchemaCleanupTaskStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/SchemaCleanupTaskStatelessTest.java index 4d58f4a53c7..8d4e2077f0f 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/SchemaCleanupTaskStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/SchemaCleanupTaskStatelessTest.java @@ -124,14 +124,14 @@ public void testSchemaCleanupTask() assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT), 3); - assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 3); assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT), 0); assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.FAILED_TO_COPY_SCHEMA), 0); + ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 3); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT), 0); assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG), 0); + ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT), 0); // 5. Clean up for (String table : getHelixResourceManager().getAllOfflineTables()) { @@ -196,15 +196,15 @@ public void testSchemaCleanupTaskNormalCase() assertNull(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName()); assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT), 0); - assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 3); + ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT), 3); assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT), 0); assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.FAILED_TO_COPY_SCHEMA), 0); + ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 3); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT), 0); assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG), 0); + ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT), 0); // 7. Clean up for (String table : getHelixResourceManager().getAllOfflineTables()) { @@ -253,14 +253,14 @@ public void testMissingSchema() assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT), 0); - assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 0); assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT), 3); assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.FAILED_TO_COPY_SCHEMA), 0); + ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 0); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT), 0); assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG), 0); + ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT), 0); // 7. Clean up for (String table : getHelixResourceManager().getAllOfflineTables()) { From 313aa28db819cd9074fc302c6f15ca0052af6a06 Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" Date: Tue, 10 Oct 2023 12:16:29 -0700 Subject: [PATCH 3/3] Fix linter --- .../org/apache/pinot/controller/BaseControllerStarter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index bb34f45618d..3b2565f0208 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -660,7 +660,8 @@ public void fixSchemaNameInTableConfig() { misconfiguredTableCount.get()); _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT, tableWithoutSchemaCount.get()); _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FIXED_SCHEMA_TABLE_COUNT, fixedSchemaTableCount.get()); - _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT, failedToCopySchemaCount.get()); + _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT, + failedToCopySchemaCount.get()); _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT, failedToUpdateTableConfigCount.get()); }