diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index edd3bae35e7..438836a4cfc 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -188,6 +188,7 @@ public void testResourceAndTagAssignment()
     }
 
     // Add a new table with same broker tenant
+    addDummySchema(newRawTableName);
     newTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(newRawTableName)
         .setServerTenant(TagNameUtils.DEFAULT_TENANT_NAME).build();
     _helixResourceManager.addTable(newTableConfig);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index fe54d3cb7bc..7ecd25aab1f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -316,7 +316,7 @@ public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs, Notific
     Map<String, String> configs =
         _helixAdmin.getConfig(helixConfigScope, Arrays.asList(Helix.ENABLE_CASE_INSENSITIVE_KEY));
     boolean caseInsensitive = Boolean.parseBoolean(configs.getOrDefault(Helix.ENABLE_CASE_INSENSITIVE_KEY,
-        Boolean.toString(Helix.DEFAULT_ENABLE_CASE_INSENSITIVE)));
+        Boolean.toString(Helix.DEFAULT_ENABLE_CASE_INSENSITIVE)));
     _tableCache = new TableCache(_propertyStore, caseInsensitive);
   }
 
@@ -1556,6 +1556,10 @@ public void addTable(TableConfig tableConfig)
         PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
             _enableBatchMessageMode);
     TableType tableType = tableConfig.getTableType();
+    // Ensure that table is not created if schema is not present
+    if (ZKMetadataProvider.getSchema(_propertyStore, TableNameBuilder.extractRawTableName(tableNameWithType)) == null) {
+      throw new InvalidTableConfigException("No schema defined for table: " + tableNameWithType);
+    }
     if (tableType == TableType.OFFLINE) {
       try {
         // Add table config
@@ -1571,18 +1575,6 @@ public void addTable(TableConfig tableConfig)
       }
     } else {
       Preconditions.checkState(tableType == TableType.REALTIME, "Invalid table type: %s", tableType);
-
-      // Ensure that realtime table is not created if schema is not present
-      Schema schema =
-          ZKMetadataProvider.getSchema(_propertyStore, TableNameBuilder.extractRawTableName(tableNameWithType));
-      if (schema == null) {
-        // Fall back to getting schema-name from table config if schema-name != table-name
-        String schemaName = tableConfig.getValidationConfig().getSchemaName();
-        if (schemaName == null || ZKMetadataProvider.getSchema(_propertyStore, schemaName) == null) {
-          throw new InvalidTableConfigException("No schema defined for realtime table: " + tableNameWithType);
-        }
-      }
-
       try {
         // Add table config
         ZKMetadataProvider.setTableConfig(_propertyStore, tableNameWithType, TableConfigUtils.toZNRecord(tableConfig));
@@ -2082,8 +2074,8 @@ public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadat
     tasks.put(jobId, jobMetadata);
     if (tasks.size() > CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) {
       tasks = tasks.entrySet().stream().sorted((v1, v2) -> Long.compare(
-          Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)),
-          Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS))))
+          Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)),
+          Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS))))
           .collect(Collectors.toList()).subList(0, CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK)
           .stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
     }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotBrokerRestletResourceStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotBrokerRestletResourceStatelessTest.java
index 5c0242c5c55..4d2b1b6a064 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotBrokerRestletResourceStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotBrokerRestletResourceStatelessTest.java
@@ -140,6 +140,10 @@ public void testGetBrokers()
     Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(),
         10);
 
+    // Create schema
+    addDummySchema(TABLE_NAME_1);
+    addDummySchema(TABLE_NAME_2);
+
     // Adding table
     _helixResourceManager
         .addTable(new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_1).setNumReplicas(1).build());
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotFileUploadTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotFileUploadTest.java
index e499edf634e..51330ce48af 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotFileUploadTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotFileUploadTest.java
@@ -46,6 +46,8 @@ public void setUp()
       throws Exception {
     TEST_INSTANCE.setupSharedStateAndValidate();
 
+    // Create schema
+    TEST_INSTANCE.addDummySchema(TABLE_NAME);
     // Adding table
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
         .setSegmentAssignmentStrategy("RandomAssignmentStrategy").setNumReplicas(2).build();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
index 3bf0e2c70dd..93a142f221d 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
@@ -230,6 +230,7 @@ public void instanceRetagHappyPathTest()
   public void instanceRetagServerDeficiencyTest()
       throws Exception {
     String tableName = "testTable";
+    DEFAULT_INSTANCE.addDummySchema(tableName);
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName)
         .setNumReplicas(2).build();
     // create table with replication as 2 so that DefaultTenant has a minimum server requirement as 2.
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
index c8ba1dbd73a..eae4892009c 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
@@ -59,6 +59,7 @@ public void testListSegmentLineage()
       throws Exception {
     // Adding table
     String rawTableName = "lineageTestTable";
+    TEST_INSTANCE.addDummySchema(rawTableName);
     String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).setNumReplicas(1).build();
@@ -116,6 +117,7 @@ public void testSegmentCrcApi()
       throws Exception {
     // Adding table
     String rawTableName = "crcTestTable";
+    TEST_INSTANCE.addDummySchema(rawTableName);
     String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).setNumReplicas(1).build();
@@ -171,6 +173,7 @@ public void testDeleteSegmentsWithTimeWindow()
       throws Exception {
     // Adding table and segment
     String rawTableName = "deleteWithTimeWindowTestTable";
+    TEST_INSTANCE.addDummySchema(rawTableName);
     String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).setNumReplicas(1)
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
index 7203da38191..f1766835fd6 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
@@ -65,16 +65,16 @@ public class PinotTableRestletResourceTest extends ControllerTest {
   public void setUp()
       throws Exception {
     DEFAULT_INSTANCE.setupSharedStateAndValidate();
-
     _createTableUrl = DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableCreate();
     _offlineBuilder.setTableName(OFFLINE_TABLE_NAME).setTimeColumnName("timeColumn").setTimeType("DAYS")
         .setRetentionTimeUnit("DAYS").setRetentionTimeValue("5");
 
     // add schema for realtime table
     DEFAULT_INSTANCE.addDummySchema(REALTIME_TABLE_NAME);
+    DEFAULT_INSTANCE.addDummySchema(OFFLINE_TABLE_NAME);
     StreamConfig streamConfig = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs();
     _realtimeBuilder.setTableName(REALTIME_TABLE_NAME).setTimeColumnName("timeColumn").setTimeType("DAYS")
-        .setRetentionTimeUnit("DAYS").setRetentionTimeValue("5").setSchemaName(REALTIME_TABLE_NAME)
+        .setRetentionTimeUnit("DAYS").setRetentionTimeValue("5")
         .setStreamConfigs(streamConfig.getStreamConfigsMap());
   }
 
@@ -105,7 +105,18 @@ public void testCreateTable()
       assertTrue(e.getMessage().contains("Got error status code: 400"));
     }
 
+    // Creating an OFFLINE table without a valid schema should fail
+    offlineTableConfig = _offlineBuilder.setTableName("no_schema").build();
+    try {
+      sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString());
+      fail("Creation of an OFFLINE table without a valid schema does not fail");
+    } catch (IOException e) {
+      // Expected 400 Bad Request
+      assertTrue(e.getMessage().contains("Got error status code: 400"));
+    }
+
     // Create an OFFLINE table with a valid name which should succeed
+    DEFAULT_INSTANCE.addDummySchema("valid_table_name");
     offlineTableConfig = _offlineBuilder.setTableName("valid_table_name").build();
     String offlineTableConfigString = offlineTableConfig.toJsonString();
     sendPostRequest(_createTableUrl, offlineTableConfigString);
@@ -143,7 +154,7 @@ public void testCreateTable()
     }
 
     // Creating a REALTIME table without a valid schema should fail
-    realtimeTableConfig = _realtimeBuilder.setTableName("noSchema").setSchemaName("noSchema").build();
+    realtimeTableConfig = _realtimeBuilder.setTableName("noSchema").build();
     try {
       sendPostRequest(_createTableUrl, realtimeTableConfig.toJsonString());
       fail("Creation of a REALTIME table without a valid schema does not fail");
@@ -152,10 +163,6 @@ public void testCreateTable()
       assertTrue(e.getMessage().contains("Got error status code: 400"));
     }
 
-    // Creating a REALTIME table with a different schema name in the config should succeed (backwards compatibility)
-    realtimeTableConfig = _realtimeBuilder.setSchemaName(REALTIME_TABLE_NAME).build();
-    sendPostRequest(_createTableUrl, realtimeTableConfig.toJsonString());
-
     // Create a REALTIME table with the invalid time column name should fail
     realtimeTableConfig =
         _realtimeBuilder.setTableName(REALTIME_TABLE_NAME).setTimeColumnName("invalidTimeColumn").build();
@@ -174,8 +181,10 @@ public void testCreateTable()
   }
 
   @Test
-  public void testTableCronSchedule() {
+  public void testTableCronSchedule()
+      throws IOException {
     String rawTableName = "test_table_cron_schedule";
+    DEFAULT_INSTANCE.addDummySchema(rawTableName);
     // Failed to create a table
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).setTaskConfig(
         new TableTaskConfig(ImmutableMap.of(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
@@ -224,6 +233,7 @@ public void testTableMinReplication()
 
   private void testTableMinReplicationInternal(String tableName, int tableReplication)
       throws Exception {
+    DEFAULT_INSTANCE.addDummySchema(tableName);
     String tableJSONConfigString =
         _offlineBuilder.setTableName(tableName).setNumReplicas(tableReplication).build().toJsonString();
     sendPostRequest(_createTableUrl, tableJSONConfigString);
@@ -232,7 +242,6 @@ private void testTableMinReplicationInternal(String tableName, int tableReplicat
     assertEquals(tableConfig.getReplication(), Math.max(tableReplication, DEFAULT_MIN_NUM_REPLICAS));
 
-    DEFAULT_INSTANCE.addDummySchema(tableName);
     tableJSONConfigString =
         _realtimeBuilder.setTableName(tableName).setNumReplicas(tableReplication).build().toJsonString();
     sendPostRequest(_createTableUrl, tableJSONConfigString);
@@ -299,6 +308,7 @@ private TableConfig getTableConfig(String tableName, String tableType)
   public void testUpdateTableConfig()
       throws Exception {
     String tableName = "updateTC";
+    DEFAULT_INSTANCE.addDummySchema(tableName);
     String tableConfigString = _offlineBuilder.setTableName(tableName).setNumReplicas(2).build().toJsonString();
     sendPostRequest(_createTableUrl, tableConfigString);
     // table creation should succeed
@@ -365,12 +375,13 @@ public void testListTables()
 
     // post 2 offline, 1 realtime
     String rawTableName1 = "pqr";
+    DEFAULT_INSTANCE.addDummySchema(rawTableName1);
     TableConfig offlineTableConfig1 = _offlineBuilder.setTableName(rawTableName1).build();
     sendPostRequest(_createTableUrl, offlineTableConfig1.toJsonString());
-    DEFAULT_INSTANCE.addDummySchema(rawTableName1);
     TableConfig realtimeTableConfig1 = _realtimeBuilder.setTableName(rawTableName1).setNumReplicas(2).build();
     sendPostRequest(_createTableUrl, realtimeTableConfig1.toJsonString());
     String rawTableName2 = "abc";
+    DEFAULT_INSTANCE.addDummySchema(rawTableName2);
     TableConfig offlineTableConfig2 = _offlineBuilder.setTableName(rawTableName2).build();
     sendPostRequest(_createTableUrl, offlineTableConfig2.toJsonString());
@@ -479,6 +490,7 @@ public void rebalanceTableWithoutSegments()
   public void testDeleteTable()
       throws IOException {
     // Case 1: Create a REALTIME table and delete it directly w/o using query param.
+    DEFAULT_INSTANCE.addDummySchema("table0");
     TableConfig realtimeTableConfig = _realtimeBuilder.setTableName("table0").build();
     String creationResponse = sendPostRequest(_createTableUrl, realtimeTableConfig.toJsonString());
     assertEquals(creationResponse,
@@ -501,6 +513,7 @@ public void testDeleteTable()
     assertEquals(deleteResponse, "{\"status\":\"Tables: [table0_OFFLINE] deleted\"}");
 
     // Case 3: Create REALTIME and OFFLINE tables and delete both of them.
+    DEFAULT_INSTANCE.addDummySchema("table1");
     TableConfig rtConfig1 = _realtimeBuilder.setTableName("table1").build();
     creationResponse = sendPostRequest(_createTableUrl, rtConfig1.toJsonString());
     assertEquals(creationResponse,
@@ -516,6 +529,7 @@ public void testDeleteTable()
     assertEquals(deleteResponse, "{\"status\":\"Tables: [table1_OFFLINE, table1_REALTIME] deleted\"}");
 
     // Case 4: Create REALTIME and OFFLINE tables and delete the realtime/offline table using query params.
+    DEFAULT_INSTANCE.addDummySchema("table2");
     TableConfig rtConfig2 = _realtimeBuilder.setTableName("table2").build();
     creationResponse = sendPostRequest(_createTableUrl, rtConfig2.toJsonString());
     assertEquals(creationResponse,
@@ -553,6 +567,7 @@ public void testDeleteTable()
     }
 
     // Case 6: Create REALTIME and OFFLINE tables and delete the realtime/offline table using query params and suffixes.
+ DEFAULT_INSTANCE.addDummySchema("table3"); TableConfig rtConfig3 = _realtimeBuilder.setTableName("table3").build(); creationResponse = sendPostRequest(_createTableUrl, rtConfig3.toJsonString()); assertEquals(creationResponse, @@ -577,13 +592,15 @@ public void testCheckTableState() throws IOException { // Create a valid REALTIME table - TableConfig realtimeTableConfig = _realtimeBuilder.setTableName("testTable").build(); + String tableName = "testTable"; + DEFAULT_INSTANCE.addDummySchema(tableName); + TableConfig realtimeTableConfig = _realtimeBuilder.setTableName(tableName).build(); String creationResponse = sendPostRequest(_createTableUrl, realtimeTableConfig.toJsonString()); assertEquals(creationResponse, "{\"unrecognizedProperties\":{},\"status\":\"Table testTable_REALTIME successfully added\"}"); // Create a valid OFFLINE table - TableConfig offlineTableConfig = _offlineBuilder.setTableName("testTable").build(); + TableConfig offlineTableConfig = _offlineBuilder.setTableName(tableName).build(); creationResponse = sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString()); assertEquals(creationResponse, "{\"unrecognizedProperties\":{},\"status\":\"Table testTable_OFFLINE successfully added\"}"); @@ -708,6 +725,7 @@ public void testUnrecognizedProperties() // Create an OFFLINE table with a valid name but with unrecognizedProperties which should succeed // Should have unrecognizedProperties set correctly String tableName = "valid_table_name_extra_props"; + DEFAULT_INSTANCE.addDummySchema(tableName); TableConfig offlineTableConfig = _realtimeBuilder.setTableName("valid_table_name_extra_props").build(); JsonNode jsonNode = JsonUtils.objectToJsonNode(offlineTableConfig); ((ObjectNode) jsonNode).put("illegalKey1", 1); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java index 289f070a14b..28a0454d41b 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java @@ -46,7 +46,6 @@ public class PinotTenantRestletResourceTest extends ControllerTest { private static final String RAW_TABLE_NAME = "toggleTable"; private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME); - @BeforeClass public void setUp() throws Exception { @@ -56,91 +55,98 @@ public void setUp() @Test public void testTableListForTenant() throws Exception { - // Check that no tables on tenant works - String listTablesUrl = - TEST_INSTANCE.getControllerRequestURLBuilder().forTablesFromTenant(TagNameUtils.DEFAULT_TENANT_NAME); - JsonNode tableList = JsonUtils.stringToJsonNode(ControllerTest.sendGetRequest(listTablesUrl)); - assertEquals(tableList.get("tables").size(), 0); - - // Add some non default tag broker instances, UNTAGGED_BROKER_INSTANCE - String brokerTag2 = "brokerTag2"; - TEST_INSTANCE.addFakeBrokerInstanceToAutoJoinHelixCluster("broker_999", false); - TEST_INSTANCE.addFakeBrokerInstanceToAutoJoinHelixCluster("broker_1000", false); - TEST_INSTANCE.updateBrokerTenant("brokerTag2", 2); - - // Add a table - ControllerTest.sendPostRequest(TEST_INSTANCE.getControllerRequestURLBuilder().forTableCreate(), - new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build().toJsonString()); - - // Add a second table with a different broker tag - String 
table2 = "restletTable2_OFFLINE"; - ControllerTest.sendPostRequest(TEST_INSTANCE.getControllerRequestURLBuilder().forTableCreate(), - new TableConfigBuilder(TableType.OFFLINE).setTableName(table2).setBrokerTenant( - brokerTag2) - .build().toJsonString()); - - // There should be 2 tables on the tenant when querying default Tenant for servers w/o specifying ?type=server - tableList = JsonUtils.stringToJsonNode(ControllerTest.sendGetRequest(listTablesUrl)); - JsonNode tables = tableList.get("tables"); - assertEquals(tables.size(), 2); - - // There should be 2 tables even when specifying ?type=server as that is the default - listTablesUrl = - TEST_INSTANCE.getControllerRequestURLBuilder().forTablesFromTenant(TagNameUtils.DEFAULT_TENANT_NAME, - "server"); - tableList = JsonUtils.stringToJsonNode(ControllerTest.sendGetRequest(listTablesUrl)); - tables = tableList.get("tables"); - - // Check to make sure that test tables exists. - boolean found1 = false; - boolean found2 = false; - for (int i = 0; i < tables.size(); i++) { - found1 = found1 || tables.get(i).asText().equals(TABLE_NAME); - found2 = found2 || tables.get(i).asText().equals(table2); + try { + // Check that no tables on tenant works + String listTablesUrl = + TEST_INSTANCE.getControllerRequestURLBuilder().forTablesFromTenant(TagNameUtils.DEFAULT_TENANT_NAME); + JsonNode tableList = JsonUtils.stringToJsonNode(ControllerTest.sendGetRequest(listTablesUrl)); + assertEquals(tableList.get("tables").size(), 0); + + // Add some non default tag broker instances, UNTAGGED_BROKER_INSTANCE + String brokerTag2 = "brokerTag2"; + TEST_INSTANCE.addFakeBrokerInstanceToAutoJoinHelixCluster("broker_999", false); + TEST_INSTANCE.addFakeBrokerInstanceToAutoJoinHelixCluster("broker_1000", false); + TEST_INSTANCE.updateBrokerTenant("brokerTag2", 2); + + // Add a table + ControllerTest.sendPostRequest(TEST_INSTANCE.getControllerRequestURLBuilder().forSchemaCreate(), + createDummySchema(TableNameBuilder.extractRawTableName(TABLE_NAME)).toPrettyJsonString()); + + ControllerTest.sendPostRequest(TEST_INSTANCE.getControllerRequestURLBuilder().forTableCreate(), + new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build().toJsonString()); + + // Add a second table with a different broker tag + String table2 = "restletTable2_OFFLINE"; + ControllerTest.sendPostRequest(TEST_INSTANCE.getControllerRequestURLBuilder().forSchemaCreate(), + createDummySchema(TableNameBuilder.extractRawTableName(table2)).toPrettyJsonString()); + ControllerTest.sendPostRequest(TEST_INSTANCE.getControllerRequestURLBuilder().forTableCreate(), + new TableConfigBuilder(TableType.OFFLINE).setTableName(table2).setBrokerTenant( + brokerTag2) + .build().toJsonString()); + + // There should be 2 tables on the tenant when querying default Tenant for servers w/o specifying ?type=server + tableList = JsonUtils.stringToJsonNode(ControllerTest.sendGetRequest(listTablesUrl)); + JsonNode tables = tableList.get("tables"); + assertEquals(tables.size(), 2); + + // There should be 2 tables even when specifying ?type=server as that is the default + listTablesUrl = + TEST_INSTANCE.getControllerRequestURLBuilder().forTablesFromTenant(TagNameUtils.DEFAULT_TENANT_NAME, + "server"); + tableList = JsonUtils.stringToJsonNode(ControllerTest.sendGetRequest(listTablesUrl)); + tables = tableList.get("tables"); + + // Check to make sure that test tables exists. 
+      boolean found1 = false;
+      boolean found2 = false;
+      for (int i = 0; i < tables.size(); i++) {
+        found1 = found1 || tables.get(i).asText().equals(TABLE_NAME);
+        found2 = found2 || tables.get(i).asText().equals(table2);
+      }
+
+      // There should be only 1 table when specifying ?type=broker for the default tenant
+      listTablesUrl =
+          TEST_INSTANCE.getControllerRequestURLBuilder().forTablesFromTenant(TagNameUtils.DEFAULT_TENANT_NAME,
+              "broker");
+      tableList = JsonUtils.stringToJsonNode(ControllerTest.sendGetRequest(listTablesUrl));
+      tables = tableList.get("tables");
+      assertEquals(tables.size(), 1);
+
+      String defaultTenantTable = tables.get(0).asText();
+
+      // There should be only 1 table when specifying ?type=broker for the broker_untagged tenant
+      listTablesUrl =
+          TEST_INSTANCE.getControllerRequestURLBuilder().forTablesFromTenant(brokerTag2,
+              "broker");
+      tableList = JsonUtils.stringToJsonNode(ControllerTest.sendGetRequest(listTablesUrl));
+      tables = tableList.get("tables");
+      assertEquals(tables.size(), 1);
+
+      // This second table should not be the same as the one from the default tenant
+      assertTrue(!defaultTenantTable.equals(tables.get(0).asText()));
+
+      // reset the ZK node to simulate corruption
+      ZkHelixPropertyStore<ZNRecord> propertyStore = TEST_INSTANCE.getPropertyStore();
+      String zkPath = "/CONFIGS/TABLE/" + TABLE_NAME;
+      ZNRecord znRecord = propertyStore.get(zkPath, null, 0);
+      propertyStore.set(zkPath, new ZNRecord(znRecord.getId()), 1);
+
+      // corrupt the other one also
+      zkPath = "/CONFIGS/TABLE/" + table2;
+      znRecord = propertyStore.get(zkPath, null, 0);
+      propertyStore.set(zkPath, new ZNRecord(znRecord.getId()), 1);
+
+      // Now there should be no tables
+      tableList = JsonUtils.stringToJsonNode(ControllerTest.sendGetRequest(listTablesUrl));
+      tables = tableList.get("tables");
+      assertEquals(tables.size(), 0);
+    } finally {
+      // remove the additional, non-default table and broker instances
+      TEST_INSTANCE.dropOfflineTable("restletTable2_OFFLINE");
+      TEST_INSTANCE.stopAndDropFakeInstance("broker_999");
+      TEST_INSTANCE.stopAndDropFakeInstance("broker_1000");
+      TEST_INSTANCE.deleteBrokerTenant("brokerTag2");
    }
-
-    // There should be only 1 table when specifying ?type=broker for the default tenant
-    listTablesUrl =
-        TEST_INSTANCE.getControllerRequestURLBuilder().forTablesFromTenant(TagNameUtils.DEFAULT_TENANT_NAME,
-            "broker");
-    tableList = JsonUtils.stringToJsonNode(ControllerTest.sendGetRequest(listTablesUrl));
-    tables = tableList.get("tables");
-    assertEquals(tables.size(), 1);
-
-    String defaultTenantTable = tables.get(0).asText();
-
-    // There should be only 1 table when specifying ?type=broker for the broker_untagged tenant
-    listTablesUrl =
-        TEST_INSTANCE.getControllerRequestURLBuilder().forTablesFromTenant(brokerTag2,
-            "broker");
-    tableList = JsonUtils.stringToJsonNode(ControllerTest.sendGetRequest(listTablesUrl));
-    tables = tableList.get("tables");
-    assertEquals(tables.size(), 1);
-
-    // This second table should not be the same as the one from the default tenant
-    assertTrue(!defaultTenantTable.equals(tables.get(0).asText()));
-
-    // reset the ZK node to simulate corruption
-    ZkHelixPropertyStore<ZNRecord> propertyStore = TEST_INSTANCE.getPropertyStore();
-    String zkPath = "/CONFIGS/TABLE/" + TABLE_NAME;
-    ZNRecord znRecord = propertyStore.get(zkPath, null, 0);
-    propertyStore.set(zkPath, new ZNRecord(znRecord.getId()), 1);
-
-    // corrupt the other one also
-    zkPath = "/CONFIGS/TABLE/" + table2;
-    znRecord = propertyStore.get(zkPath, null, 0);
-    propertyStore.set(zkPath, new ZNRecord(znRecord.getId()), 1);
-
-    // Now there should be no tables
-    tableList = JsonUtils.stringToJsonNode(ControllerTest.sendGetRequest(listTablesUrl));
-    tables = tableList.get("tables");
-    assertEquals(tables.size(), 0);
-
-    // remove the additional, non-default table and broker instances
-    TEST_INSTANCE.dropOfflineTable(table2);
-    TEST_INSTANCE.stopAndDropFakeInstance("broker_999");
-    TEST_INSTANCE.stopAndDropFakeInstance("broker_1000");
-    TEST_INSTANCE.deleteBrokerTenant(brokerTag2);
   }
 
   @Test
@@ -155,8 +161,10 @@ public void testListInstance()
   @Test
   public void testToggleTenantState()
-      throws Exception {
+      throws Exception {
     // Create an offline table
+    ControllerTest.sendPostRequest(TEST_INSTANCE.getControllerRequestURLBuilder().forSchemaCreate(),
+        createDummySchema(TableNameBuilder.extractRawTableName(RAW_TABLE_NAME)).toPrettyJsonString());
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(DEFAULT_MIN_NUM_REPLICAS)
             .build();
@@ -221,7 +229,8 @@ public void testToggleTenantState()
   }
 
   @AfterClass
-  public void tearDown() {
+  public void tearDown()
+      throws IOException {
     TEST_INSTANCE.cleanup();
   }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableConfigsRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableConfigsRestletResourceTest.java
index 25b97606fe3..18ae1b14a54 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableConfigsRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableConfigsRestletResourceTest.java
@@ -600,14 +600,10 @@ public void testUnrecognizedProperties()
   @Test
   public void testGetConfigCompatibility()
       throws IOException {
-    // Should not fail if schema name does not match raw table name in the case they are created separately
-    String schemaName = "schema1";
-    Schema schema = createDummySchema(schemaName);
-    sendPostRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forSchemaCreate(), schema.toPrettyJsonString());
     String tableName = "table1";
+    DEFAULT_INSTANCE.addDummySchema(tableName);
     TableConfig offlineTableConfig = createOfflineTableConfig(tableName);
     SegmentsValidationAndRetentionConfig validationConfig = new SegmentsValidationAndRetentionConfig();
-    validationConfig.setSchemaName(schemaName);
     validationConfig.setReplication("1");
     offlineTableConfig.setValidationConfig(validationConfig);
     sendPostRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableCreate(),
@@ -617,11 +613,11 @@ public void testGetConfigCompatibility()
     TableConfigs tableConfigsResponse = JsonUtils.stringToObject(response, TableConfigs.class);
     Assert.assertEquals(tableConfigsResponse.getTableName(), tableName);
     Assert.assertEquals(tableConfigsResponse.getOffline().getTableName(), offlineTableConfig.getTableName());
-    Assert.assertEquals(tableConfigsResponse.getSchema().getSchemaName(), schema.getSchemaName());
+    Assert.assertEquals(tableConfigsResponse.getSchema().getSchemaName(), tableName);
 
     // Delete
     sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName));
-    sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forSchemaDelete(schemaName));
+    sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forSchemaDelete(tableName));
   }
 
   @AfterClass
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
index 279821537a1..178fb5720b5 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
@@ -53,6 +53,7 @@ public void setUp()
     DEFAULT_INSTANCE.setupSharedStateAndValidate();
 
     // Create the offline table and add one segment
+    DEFAULT_INSTANCE.addDummySchema(OFFLINE_TABLE_NAME);
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).setNumReplicas(2).build();
     assertEquals(DEFAULT_INSTANCE.getHelixManager().getInstanceType(), InstanceType.CONTROLLER);
@@ -62,12 +63,12 @@ public void setUp()
         SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, OFFLINE_SEGMENT_NAME), "downloadUrl");
 
     // Create the hybrid table
+    DEFAULT_INSTANCE.addDummySchema(HYBRID_TABLE_NAME);
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(HYBRID_TABLE_NAME)
         .setNumReplicas(DEFAULT_MIN_NUM_REPLICAS).build();
     DEFAULT_INSTANCE.getHelixResourceManager().addTable(tableConfig);
 
     // add schema for realtime table
-    DEFAULT_INSTANCE.addDummySchema(HYBRID_TABLE_NAME);
     StreamConfig streamConfig = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(4);
     tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(HYBRID_TABLE_NAME)
         .setNumReplicas(DEFAULT_MIN_NUM_REPLICAS).setStreamConfigs(streamConfig.getStreamConfigsMap()).build();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java
index 6fe3e565761..d74a6fd0a3e 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java
@@ -50,6 +50,10 @@ public void setUp()
   @Test
   public void testInstanceToggle()
       throws Exception {
+    // Create schema
+    sendPostRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forSchemaCreate(),
+        createDummySchema(RAW_TABLE_NAME).toPrettyJsonString());
+
     // Create an offline table
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(DEFAULT_MIN_NUM_REPLICAS)
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTest.java
index 1fd7b061409..0e6fea5abbd 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTest.java
@@ -26,7 +26,7 @@ import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.testng.annotations.AfterTest;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -45,6 +45,9 @@ public void setUp()
   @Test
   public void testOfflineTableLifeCycle()
       throws IOException {
+    // Create schema
+    sendPostRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forSchemaCreate(),
+        createDummySchema(TABLE_NAME).toPrettyJsonString());
     // Create offline table creation request
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNumReplicas(DEFAULT_MIN_NUM_REPLICAS)
@@ -83,7 +86,7 @@ public void testOfflineTableLifeCycle()
         TagNameUtils.getRealtimeTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME)).size(), DEFAULT_NUM_SERVER_INSTANCES);
   }
 
-  @AfterTest
+  @AfterClass
   public void tearDown() {
     DEFAULT_INSTANCE.cleanup();
   }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 43b7e4224ef..33932ee38b1 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -88,6 +88,7 @@ import static org.apache.pinot.spi.utils.CommonConstants.Server.DEFAULT_ADMIN_API_PORT;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -792,9 +793,10 @@ public static String sendGetRequestRaw(String urlString)
   }
 
   public static String sendPostRequest(String urlString)
-      throws IOException {
+      throws IOException {
     return sendPostRequest(urlString, null);
   }
+
   public static String sendPostRequest(String urlString, String payload)
       throws IOException {
     return sendPostRequest(urlString, payload, Collections.emptyMap());
@@ -1030,7 +1032,9 @@ public void setupSharedStateAndValidate()
         DEFAULT_NUM_SERVER_INSTANCES);
 
     // No pre-existing tables
-    assertEquals(getHelixResourceManager().getAllTables().size(), 0);
+    assertTrue(CollectionUtils.isEmpty(getHelixResourceManager().getAllTables()));
+    // No pre-existing schemas
+    assertTrue(CollectionUtils.isEmpty(getHelixResourceManager().getSchemaNames()));
   }
 
   /**
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
index 6f4c87de04c..1ed304740f5 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
@@ -61,6 +61,9 @@ public void setUp()
     _testInstance.setupSharedStateAndValidate();
     _resourceManager = _testInstance.getHelixResourceManager();
 
+    // Create schema
+    _testInstance.addDummySchema(RAW_TABLE_NAME);
+
     // Adding an offline table
     TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
     _resourceManager.addTable(offlineTableConfig);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
index fff0ac890eb..56fe2464cfb 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
@@ -43,7 +43,6 @@ public class TableCacheTest {
   private static final ControllerTest TEST_INSTANCE = ControllerTest.getInstance();
 
-  private static final String SCHEMA_NAME = "cacheTestSchema";
   private static final String RAW_TABLE_NAME = "cacheTestTable";
   private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
   private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
@@ -62,8 +61,6 @@ public void testTableCache(boolean isCaseInsensitive)
       throws Exception {
     TableCache tableCache = new TableCache(TEST_INSTANCE.getPropertyStore(), isCaseInsensitive);
 
-    assertNull(tableCache.getSchema(SCHEMA_NAME));
-    assertNull(tableCache.getColumnNameMap(SCHEMA_NAME));
     assertNull(tableCache.getSchema(RAW_TABLE_NAME));
     assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
     assertNull(tableCache.getTableConfig(OFFLINE_TABLE_NAME));
@@ -71,15 +68,15 @@ public void testTableCache(boolean isCaseInsensitive)
 
     // Add a schema
     Schema schema =
-        new Schema.SchemaBuilder().setSchemaName(SCHEMA_NAME).addSingleValueDimension("testColumn", DataType.INT)
+        new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).addSingleValueDimension("testColumn", DataType.INT)
             .build();
     TEST_INSTANCE.getHelixResourceManager().addSchema(schema, false, false);
     // Wait for at most 10 seconds for the callback to add the schema to the cache
-    TestUtils.waitForCondition(aVoid -> tableCache.getSchema(SCHEMA_NAME) != null, 10_000L,
+    TestUtils.waitForCondition(aVoid -> tableCache.getSchema(RAW_TABLE_NAME) != null, 10_000L,
         "Failed to add the schema to the cache");
     // Schema can be accessed by the schema name, but not by the table name because table config is not added yet
     Schema expectedSchema =
-        new Schema.SchemaBuilder().setSchemaName(SCHEMA_NAME).addSingleValueDimension("testColumn", DataType.INT)
+        new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).addSingleValueDimension("testColumn", DataType.INT)
             .addSingleValueDimension(BuiltInVirtualColumn.DOCID, DataType.INT)
             .addSingleValueDimension(BuiltInVirtualColumn.HOSTNAME, DataType.STRING)
             .addSingleValueDimension(BuiltInVirtualColumn.SEGMENTNAME, DataType.STRING).build();
@@ -88,16 +85,15 @@ public void testTableCache(boolean isCaseInsensitive)
     expectedColumnMap.put(isCaseInsensitive ? "$docid" : "$docId", "$docId");
     expectedColumnMap.put(isCaseInsensitive ? "$hostname" : "$hostName", "$hostName");
     expectedColumnMap.put(isCaseInsensitive ? "$segmentname" : "$segmentName", "$segmentName");
-    assertEquals(tableCache.getSchema(SCHEMA_NAME), expectedSchema);
-    assertEquals(tableCache.getColumnNameMap(SCHEMA_NAME), expectedColumnMap);
-    assertNull(tableCache.getSchema(RAW_TABLE_NAME));
-    assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
+    assertEquals(tableCache.getSchema(RAW_TABLE_NAME), expectedSchema);
+    assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME), expectedColumnMap);
     // Case-insensitive table name are handled based on the table config instead of the schema
     assertNull(tableCache.getActualTableName(RAW_TABLE_NAME));
 
     // Add a table config
     TableConfig tableConfig =
-        new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setSchemaName(SCHEMA_NAME).build();
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+    TEST_INSTANCE.waitForEVToDisappear(tableConfig.getTableName());
     TEST_INSTANCE.getHelixResourceManager().addTable(tableConfig);
     // Wait for at most 10 seconds for the callback to add the table config to the cache
     TestUtils.waitForCondition(
@@ -108,8 +104,6 @@ public void testTableCache(boolean isCaseInsensitive)
     // It should only add OFFLINE and normal table.
     assertNull(tableCache.getActualTableName(REALTIME_TABLE_NAME));
     // Schema can be accessed by both the schema name and the raw table name
-    assertEquals(tableCache.getSchema(SCHEMA_NAME), expectedSchema);
-    assertEquals(tableCache.getColumnNameMap(SCHEMA_NAME), expectedColumnMap);
     assertEquals(tableCache.getSchema(RAW_TABLE_NAME), expectedSchema);
     assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME), expectedColumnMap);
@@ -138,18 +132,14 @@ public void testTableCache(boolean isCaseInsensitive)
     expectedSchema.addField(new DimensionFieldSpec("newColumn", DataType.LONG, true));
     expectedColumnMap.put(isCaseInsensitive ? "newcolumn" : "newColumn", "newColumn");
     TestUtils.waitForCondition(aVoid -> {
-      assertNotNull(tableCache.getSchema(SCHEMA_NAME));
+      assertNotNull(tableCache.getSchema(RAW_TABLE_NAME));
       assertEquals(schemaChangeListener._schemaList.size(), 1);
       return schemaChangeListener._schemaList.get(0).equals(expectedSchema);
     }, 10_000L, "Failed to update the schema in the cache");
     // Schema can be accessed by both the schema name and the raw table name
-    assertEquals(tableCache.getSchema(SCHEMA_NAME), expectedSchema);
-    assertEquals(tableCache.getColumnNameMap(SCHEMA_NAME), expectedColumnMap);
     assertEquals(tableCache.getSchema(RAW_TABLE_NAME), expectedSchema);
     assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME), expectedColumnMap);
 
-    // Update the table config and drop the schema name
-    tableConfig.getValidationConfig().setSchemaName(null);
     TEST_INSTANCE.getHelixResourceManager().updateTableConfig(tableConfig);
     // Wait for at most 10 seconds for the callback to update the table config in the cache
     // NOTE:
@@ -165,8 +155,8 @@ public void testTableCache(boolean isCaseInsensitive)
     // After dropping the schema name from the table config, schema can only be accessed by the schema name, but not by
     // the table name
     assertEquals(tableCache.getTableConfig(OFFLINE_TABLE_NAME), tableConfig);
-    assertNull(tableCache.getSchema(RAW_TABLE_NAME));
-    assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
+    assertNotNull(tableCache.getSchema(RAW_TABLE_NAME));
+    assertNotNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
     if (isCaseInsensitive) {
       assertEquals(tableCache.getActualTableName(MANGLED_RAW_TABLE_NAME), RAW_TABLE_NAME);
       assertEquals(tableCache.getActualTableName(MANGLED_OFFLINE_TABLE_NAME), OFFLINE_TABLE_NAME);
@@ -177,8 +167,8 @@ public void testTableCache(boolean isCaseInsensitive)
       assertEquals(tableCache.getActualTableName(OFFLINE_TABLE_NAME), OFFLINE_TABLE_NAME);
     }
     assertNull(tableCache.getActualTableName(REALTIME_TABLE_NAME));
-    assertEquals(tableCache.getSchema(SCHEMA_NAME), expectedSchema);
-    assertEquals(tableCache.getColumnNameMap(SCHEMA_NAME), expectedColumnMap);
+    assertEquals(tableCache.getSchema(RAW_TABLE_NAME), expectedSchema);
+    assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME), expectedColumnMap);
 
     // Wait for external view to appear before deleting the table to prevent external view being created after the
     // waitForEVToDisappear() call
@@ -194,21 +184,19 @@ public void testTableCache(boolean isCaseInsensitive)
         "Failed to remove the table config from the cache");
     assertNull(tableCache.getTableConfig(OFFLINE_TABLE_NAME));
     assertNull(tableCache.getActualTableName(RAW_TABLE_NAME));
-    assertEquals(tableCache.getSchema(SCHEMA_NAME), expectedSchema);
-    assertEquals(tableCache.getColumnNameMap(SCHEMA_NAME), expectedColumnMap);
-    assertNull(tableCache.getSchema(RAW_TABLE_NAME));
-    assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
+    assertEquals(tableCache.getSchema(RAW_TABLE_NAME), expectedSchema);
+    assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME), expectedColumnMap);
 
     // Remove the schema
-    TEST_INSTANCE.getHelixResourceManager().deleteSchema(SCHEMA_NAME);
+    TEST_INSTANCE.getHelixResourceManager().deleteSchema(RAW_TABLE_NAME);
     // Wait for at most 10 seconds for the callback to remove the schema from the cache
     // NOTE:
     // - Verify if the callback is fully done by checking the schema change lister because it is the last step of the
     //   callback handling
     TestUtils.waitForCondition(aVoid -> schemaChangeListener._schemaList.isEmpty(), 10_000L,
         "Failed to remove the schema from the cache");
-    assertNull(tableCache.getSchema(SCHEMA_NAME));
-    assertNull(tableCache.getColumnNameMap(SCHEMA_NAME));
+    assertNull(tableCache.getSchema(RAW_TABLE_NAME));
+    assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
     assertNull(tableCache.getSchema(RAW_TABLE_NAME));
     assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
     assertEquals(schemaChangeListener._schemaList.size(), 0);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerAssignmentTest.java
index 2c3b92ebc58..139f4eaba5e 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerAssignmentTest.java
@@ -77,6 +77,8 @@ public void setUp()
     resetBrokerTags();
     resetServerTags();
+
+    addDummySchema(RAW_TABLE_NAME);
   }
 
   private void untagBrokers() {
@@ -129,6 +131,7 @@ public void testAssignTargetTier()
     TierConfig tierConfig = new TierConfig("tier1", TierFactory.FIXED_SEGMENT_SELECTOR_TYPE, null,
         Collections.singletonList("testSegment"), TierFactory.PINOT_SERVER_STORAGE_TYPE, coldOfflineServerTag, null,
         null);
+    addDummySchema(RAW_TABLE_NAME);
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME)
             .setTierConfigList(Collections.singletonList(tierConfig)).setServerTenant(SERVER_TENANT_NAME).build();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index 818a42e5346..77d2a8a8cc2 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -114,6 +114,7 @@ public void setUp()
     resetBrokerTags();
     resetServerTags();
+    addDummySchema(RAW_TABLE_NAME);
   }
 
   private void untagBrokers() {
@@ -239,6 +240,7 @@ public void testUpdateBrokerResource()
     assertEquals(untaggedBrokers.size(), 1);
 
     // Add a table
+    addDummySchema(RAW_TABLE_NAME);
     TableConfig offlineTableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME)
             .setServerTenant(SERVER_TENANT_NAME).build();
@@ -297,6 +299,7 @@ private void checkBrokerResource(List<String> expectedBrokers) {
   public void testRebuildBrokerResourceFromHelixTags()
       throws Exception {
     // Create the table
+    addDummySchema(RAW_TABLE_NAME);
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME)
             .setServerTenant(SERVER_TENANT_NAME).build();
@@ -761,6 +764,7 @@ public void testUpdateTargetTier()
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME)
             .setTierConfigList(Collections.singletonList(tierConfig)).setServerTenant(SERVER_TENANT_NAME).build();
     waitForEVToDisappear(tableConfig.getTableName());
+    addDummySchema(RAW_TABLE_NAME);
     _helixResourceManager.addTable(tableConfig);
 
     String segmentName = "testSegment";
@@ -796,6 +800,7 @@ public void testUpdateTargetTier()
   @Test
   public void testSegmentReplacementWithCustomToSegments()
       throws Exception {
     // Create the table
+    addDummySchema(RAW_TABLE_NAME);
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME)
             .setServerTenant(SERVER_TENANT_NAME).build();
@@ -830,6 +835,7 @@ public void testSegmentReplacementWithCustomToSegments() throws Exception {
   public void testSegmentReplacementRegular()
       throws Exception {
     // Create the table
+    addDummySchema(RAW_TABLE_NAME);
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME)
             .setServerTenant(SERVER_TENANT_NAME).build();
@@ -1101,6 +1107,7 @@ public void testSegmentReplacementRegular()
   public void testSegmentReplacementForRefresh()
       throws Exception {
     // Create the table
+    addDummySchema(RAW_TABLE_NAME);
     IngestionConfig ingestionConfig = new IngestionConfig();
     ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null, "REFRESH", "DAILY"));
     TableConfig tableConfig =
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 8d6da2681a8..6bf4710578d 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -104,6 +104,7 @@ public void testRebalance()
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
 
     // Create the table
+    addDummySchema(RAW_TABLE_NAME);
     _helixResourceManager.addTable(tableConfig);
 
     // Add the segments
@@ -332,6 +333,7 @@ public void testRebalanceWithTiers()
         new TableConfigBuilder(TableType.OFFLINE).setTableName(TIERED_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
             .setServerTenant(NO_TIER_NAME).build();
     // Create the table
+    addDummySchema(TIERED_TABLE_NAME);
     _helixResourceManager.addTable(tableConfig);
 
     // Add the segments
@@ -423,6 +425,7 @@ public void testRebalanceWithTiersAndInstanceAssignments()
         new TableConfigBuilder(TableType.OFFLINE).setTableName(TIERED_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
             .setServerTenant("replicaAssignment" + NO_TIER_NAME).build();
     // Create the table
+    addDummySchema(TIERED_TABLE_NAME);
     _helixResourceManager.addTable(tableConfig);
 
     // Add the segments
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
index a4237fee3fc..6bb7fbebb13 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
@@ -79,6 +79,10 @@ public void testRebalance()
     // tag all servers and brokers to test tenant
     addTenantTagToInstances(TENANT_NAME);
 
+    // create 2 schemas
+    addDummySchema(RAW_TABLE_NAME_A);
+    addDummySchema(RAW_TABLE_NAME_B);
+
     // create 2 tables, one on each of test tenant and default tenant
     createTableWithSegments(RAW_TABLE_NAME_A, DEFAULT_TENANT_NAME);
     createTableWithSegments(RAW_TABLE_NAME_B, TENANT_NAME);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
index be1de7f92d5..5915dab1871 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
@@ -36,6 +36,7 @@
 import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -66,6 +67,10 @@ public void setUp()
     _retentionManager = new RetentionManager(_resourceManager, mock(LeadControllerManager.class), controllerConf,
         mock(ControllerMetrics.class));
 
+    // Create a schema
+    TEST_INSTANCE.addDummySchema(TableNameBuilder.extractRawTableName(OFFLINE_TABLE_NAME));
+    TEST_INSTANCE.addDummySchema(TableNameBuilder.extractRawTableName(REFRESH_OFFLINE_TABLE_NAME));
+
     // Update table config
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).setNumReplicas(1).build();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerStatelessTest.java
index 20da2137572..f66230c6d12 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerStatelessTest.java
@@ -51,7 +51,9 @@ public void setUp()
     startController();
     addFakeBrokerInstancesToAutoJoinHelixCluster(2, true);
     addFakeServerInstancesToAutoJoinHelixCluster(2, true);
-
+    // Create a schema
+    addDummySchema(TEST_TABLE_NAME);
+    // Create a table
     _offlineTableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setNumReplicas(2).build();
     _helixResourceManager.addTable(_offlineTableConfig);
@@ -72,6 +74,7 @@ public void testRebuildBrokerResourceWhenBrokerAdded()
     _helixResourceManager.rebuildBrokerResourceFromHelixTags(partitionName);
 
     // Add another table that needs to be rebuilt
+    addDummySchema(TEST_TABLE_TWO);
     TableConfig offlineTableConfigTwo = new TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_TWO).build();
     _helixResourceManager.addTable(offlineTableConfigTwo);
     String partitionNameTwo = offlineTableConfigTwo.getTableName();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
index 4d576bb8cf0..7e2f22f1fda 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
@@ -28,6 +28,8 @@
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
@@ -58,7 +60,14 @@ public class ValidationManagerTest {
   public void setUp()
       throws Exception {
     TEST_INSTANCE.setupSharedStateAndValidate();
-
+    // Create a schema
+    Schema schema = new Schema.SchemaBuilder()
+        .setSchemaName(TEST_TABLE_NAME)
+        .addSingleValueDimension("dim1", FieldSpec.DataType.STRING)
+        .addMetric("metric1", FieldSpec.DataType.INT)
+        .build();
+    TEST_INSTANCE.getHelixResourceManager().addSchema(schema, true, true);
+    // Create a table
     TableConfig offlineTableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setNumReplicas(2).build();
     TEST_INSTANCE.getHelixResourceManager().addTable(offlineTableConfig);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
index 3381d9c3838..6816d015582 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
@@ -112,7 +112,7 @@ private Schema getSchema() {
 
   private TableConfig getTableConfig(boolean disablePreload) {
     DimensionTableConfig dimensionTableConfig = new DimensionTableConfig(disablePreload);
-    return new TableConfigBuilder(TableType.OFFLINE).setTableName("dimBaseballTeams").setSchemaName("dimBaseballTeams")
+    return new TableConfigBuilder(TableType.OFFLINE).setTableName("dimBaseballTeams")
         .setDimensionTableConfig(dimensionTableConfig).build();
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
index a2838d543f1..211d2d08ad1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
@@ -255,7 +255,7 @@ private static TableDataManagerConfig createTableDataManagerConfig() {
   private static TableConfig setupTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore)
       throws Exception {
     TableConfig tableConfig =
-        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setSchemaName(TABLE_NAME).build();
+        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).build();
     ZNRecord tableConfigZNRecord = TableConfigUtils.toZNRecord(tableConfig);
     when(propertyStore.get(ZKMetadataProvider.constructPropertyStorePathForResourceConfig(TABLE_NAME_WITH_TYPE), null,
         AccessOption.PERSISTENT)).thenReturn(tableConfigZNRecord);
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 41ece0290b9..3157174e047 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -116,10 +116,6 @@ protected String getTableName() {
     return DEFAULT_TABLE_NAME;
   }
 
-  protected String getSchemaName() {
-    return DEFAULT_SCHEMA_NAME;
-  }
-
   protected String getSchemaFileName() {
     return DEFAULT_SCHEMA_FILE_NAME;
   }
@@ -270,14 +266,9 @@ protected Schema createSchema()
     InputStream inputStream =
         BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(getSchemaFileName());
     Assert.assertNotNull(inputStream);
-    return Schema.fromInputStream(inputStream);
-  }
-
-  /**
-   * Returns the schema in the cluster.
-   */
-  protected Schema getSchema() {
-    return getSchema(getSchemaName());
+    Schema schema = Schema.fromInputStream(inputStream);
+    schema.setSchemaName(getTableName());
+    return schema;
   }
 
   protected Schema createSchema(File schemaFile)
@@ -298,7 +289,7 @@ protected TableConfig createTableConfig(File tableConfigFile)
    * Creates a new OFFLINE table config.
    */
   protected TableConfig createOfflineTableConfig() {
-    return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).setSchemaName(getSchemaName())
+    return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName())
         .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
         .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
         .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
@@ -352,7 +343,7 @@ protected Map<String, String> getStreamConfigMap() {
    */
   protected TableConfig createRealtimeTableConfig(File sampleAvroFile) {
     AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile;
-    return new TableConfigBuilder(TableType.REALTIME).setTableName(getTableName()).setSchemaName(getSchemaName())
+    return new TableConfigBuilder(TableType.REALTIME).setTableName(getTableName())
         .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
         .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
         .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
@@ -374,7 +365,7 @@ protected TableConfig createUpsertTableConfig(File sampleAvroFile, String primar
     UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
     upsertConfig.setDeleteRecordColumn(deleteColumn);
 
-    return new TableConfigBuilder(TableType.REALTIME).setTableName(getTableName()).setSchemaName(getSchemaName())
+    return new TableConfigBuilder(TableType.REALTIME).setTableName(getTableName())
         .setTimeColumnName(getTimeColumnName()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
         .setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig())
         .setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
@@ -406,12 +397,9 @@ protected Map<String, String> getCSVDecoderProperties(@Nullable String delimiter
   /**
   * Creates a new Upsert enabled table config.
*/ - protected TableConfig createCSVUpsertTableConfig(String tableName, @Nullable String schemaName, - @Nullable String kafkaTopicName, int numPartitions, Map streamDecoderProperties, - UpsertConfig upsertConfig, String primaryKeyColumn) { - if (schemaName == null) { - schemaName = getSchemaName(); - } + protected TableConfig createCSVUpsertTableConfig(String tableName, @Nullable String kafkaTopicName, + int numPartitions, Map streamDecoderProperties, UpsertConfig upsertConfig, + String primaryKeyColumn) { Map columnPartitionConfigMap = new HashMap<>(); columnPartitionConfigMap.put(primaryKeyColumn, new ColumnPartitionConfig("Murmur", numPartitions)); @@ -428,7 +416,7 @@ protected TableConfig createCSVUpsertTableConfig(String tableName, @Nullable Str kafkaTopicName); streamConfigsMap.putAll(streamDecoderProperties); - return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setSchemaName(schemaName) + return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName) .setTimeColumnName(getTimeColumnName()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()) .setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()) .setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()) @@ -448,7 +436,7 @@ protected TableConfig createDedupTableConfig(File sampleAvroFile, String primary Map columnPartitionConfigMap = new HashMap<>(); columnPartitionConfigMap.put(primaryKeyColumn, new ColumnPartitionConfig("Murmur", numPartitions)); - return new TableConfigBuilder(TableType.REALTIME).setTableName(getTableName()).setSchemaName(getSchemaName()) + return new TableConfigBuilder(TableType.REALTIME).setTableName(getTableName()) .setTimeColumnName(getTimeColumnName()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()) .setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()) .setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java index 7ad116a8c73..b7ba0103ab3 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java @@ -58,7 +58,7 @@ public void setUp() // Create and upload the schema and table config with reduced number of columns and aggregate metrics on Schema schema = - new Schema.SchemaBuilder().setSchemaName(getSchemaName()).addSingleValueDimension("Carrier", DataType.STRING) + new Schema.SchemaBuilder().setSchemaName(getTableName()).addSingleValueDimension("Carrier", DataType.STRING) .addSingleValueDimension("Origin", DataType.STRING).addMetric("AirTime", DataType.LONG) .addMetric("ArrDelay", DataType.DOUBLE) .addDateTime("DaysSinceEpoch", DataType.INT, "1:DAYS:EPOCH", "1:DAYS").build(); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java index f757ac54dea..bcda7532ed6 100644 --- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java @@ -673,12 +673,12 @@ public boolean isReloadJobCompleted(String reloadJobId) /** * TODO: Support removing new added columns for MutableSegment and remove the new added columns before running the - * next test. Use this to replace {@link OfflineClusterIntegrationTest#testDefaultColumns()}. + * next test. Use this to replace {@link OfflineClusterIntegrationTest#testDefaultColumns(boolean)}. */ public void testReload(boolean includeOfflineTable) throws Exception { String rawTableName = getTableName(); - Schema schema = getSchema(); + Schema schema = getSchema(getTableName()); String selectStarQuery = "SELECT * FROM " + rawTableName; JsonNode queryResponse = postQuery(selectStarQuery); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java index 967ef11bb57..39e4855f635 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java @@ -183,14 +183,21 @@ public void testSegmentStatusChecker() String disabledTable = "disabledTable"; String tableWithOfflineSegment = "tableWithOfflineSegment"; + Schema schema = createSchema(); _currentTable = emptyTable; + schema.setSchemaName(_currentTable); + addSchema(schema); addTableConfig(createOfflineTableConfig()); _currentTable = disabledTable; + schema.setSchemaName(_currentTable); + addSchema(schema); addTableConfig(createOfflineTableConfig()); _helixAdmin.enableResource(getHelixClusterName(), TableNameBuilder.OFFLINE.tableNameWithType(disabledTable), false); _currentTable = tableWithOfflineSegment; + schema.setSchemaName(_currentTable); + addSchema(schema); addTableConfig(createOfflineTableConfig()); uploadSegments(_currentTable, _tarDir); // Turn one replica of a segment OFFLINE @@ -247,6 +254,9 @@ public void testSegmentStatusChecker() dropOfflineTable(emptyTable); dropOfflineTable(disabledTable); dropOfflineTable(tableWithOfflineSegment); + deleteSchema(emptyTable); + deleteSchema(disabledTable); + deleteSchema(tableWithOfflineSegment); } private boolean checkSegmentStatusCheckerMetrics(ControllerMetrics controllerMetrics, String tableNameWithType, diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DedupIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DedupIntegrationTest.java index 3cb6003356c..0a56a49357a 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DedupIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DedupIntegrationTest.java @@ -86,11 +86,6 @@ protected String getSchemaFileName() { return "dedupIngestionTestSchema.schema"; } - @Override - protected String getSchemaName() { - return "dedupSchema"; - } - @Override protected String getAvroTarFileName() { return "dedupIngestionTestData.tar.gz"; diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java index 2d5a8109c27..7eb32ccb0a6 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java @@ -113,6 +113,14 @@ public void setUp() // Create and upload the schema and table config Schema schema = createSchema(); addSchema(schema); + schema.setSchemaName(SINGLE_LEVEL_CONCAT_TEST_TABLE); + addSchema(schema); + schema.setSchemaName(SINGLE_LEVEL_ROLLUP_TEST_TABLE); + addSchema(schema); + schema.setSchemaName(MULTI_LEVEL_CONCAT_TEST_TABLE); + addSchema(schema); + schema.setSchemaName(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE); + addSchema(schema); TableConfig singleLevelConcatTableConfig = createOfflineTableConfig(SINGLE_LEVEL_CONCAT_TEST_TABLE, getSingleLevelConcatTaskConfig()); TableConfig singleLevelRollupTableConfig = @@ -149,6 +157,8 @@ public void setUp() addTableConfig(tableConfig); _kafkaStarters.get(0) .createTopic(PROCESS_ALL_MODE_KAFKA_TOPIC, KafkaStarterUtils.getTopicCreationProps(getNumKafkaPartitions())); + schema.setSchemaName(MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE); + addSchema(schema); TableConfig singleLevelConcatProcessAllRealtimeTableConfig = createRealtimeTableConfigWithProcessAllMode(avroFiles.get(0), MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE, PROCESS_ALL_MODE_KAFKA_TOPIC); @@ -206,7 +216,7 @@ private TableConfig createOfflineTableConfig(String tableName, TableTaskConfig t private TableConfig createOfflineTableConfig(String tableName, TableTaskConfig taskConfig, @Nullable SegmentPartitionConfig partitionConfig) { - return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setSchemaName(getSchemaName()) + return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName) .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn()) .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns()) .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns()) @@ -236,7 +246,7 @@ protected TableConfig createRealtimeTableConfigWithProcessAllMode(File sampleAvr tableTaskConfigs.put("ActualElapsedTime.aggregationType", "min"); tableTaskConfigs.put("WeatherDelay.aggregationType", "sum"); tableTaskConfigs.put("mode", "processAll"); - return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setSchemaName(getSchemaName()) + return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName) .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn()) .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns()) .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns()) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index dcec37ceefb..35678bbea8c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -441,8 +441,11 @@ public void testUploadSameSegments() @Test public void 
testUploadSegmentRefreshOnly() throws Exception { + Schema schema = createSchema(); + schema.setSchemaName(SEGMENT_UPLOAD_TEST_TABLE); + addSchema(schema); TableConfig segmentUploadTestTableConfig = - new TableConfigBuilder(TableType.OFFLINE).setTableName(SEGMENT_UPLOAD_TEST_TABLE).setSchemaName(getSchemaName()) + new TableConfigBuilder(TableType.OFFLINE).setTableName(SEGMENT_UPLOAD_TEST_TABLE) .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn()) .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns()) .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns()) @@ -1432,7 +1435,7 @@ private void reloadWithMissingColumns() updateTableConfig(tableConfig); // Need to first delete then add the schema because removing columns is backward-incompatible change - deleteSchema(getSchemaName()); + deleteSchema(getTableName()); Schema schema = createSchema(); schema.removeField("AirlineID"); schema.removeField("ArrTime"); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java index 4170e526bcc..969f46ab7c2 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java @@ -198,7 +198,7 @@ protected String getTimeColumnName() { } protected TableConfig createOfflineTableConfig() { - return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).setSchemaName(getSchemaName()) + return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()) .setTimeColumnName(getTimeColumnName()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()) .setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()) .setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java index 7c762fa4a48..2e866805eaa 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java @@ -202,7 +202,7 @@ protected String getTimeColumnName() { } protected TableConfig createOfflineTableConfig() { - return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).setSchemaName(getSchemaName()) + return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()) .setTimeColumnName(getTimeColumnName()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()) .setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()) .setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java index 35d8ea3b5ee..f88450b7dee 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java @@ -205,7 +205,7 @@ protected String getTimeColumnName() { } protected TableConfig createOfflineTableConfig() { - return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).setSchemaName(getSchemaName()) + return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()) .setTimeColumnName(getTimeColumnName()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()) .setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()) .setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java index f2f66027667..f5709b44b3c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java @@ -174,7 +174,7 @@ public Boolean apply(@Nullable Void aVoid) { } public TableConfig createKinesisTableConfig() { - return new TableConfigBuilder(TableType.REALTIME).setTableName(getTableName()).setSchemaName(getTableName()) + return new TableConfigBuilder(TableType.REALTIME).setTableName(getTableName()) .setTimeColumnName("DaysSinceEpoch").setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()) .setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()) .setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java index ddfac35562f..043c654ef77 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java @@ -137,6 +137,8 @@ public void setUp() taskConfigsWithMetadata.put( BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.METADATA.toString()); String tableWithMetadataPush = "myTable2"; + schema.setSchemaName(tableWithMetadataPush); + addSchema(schema); TableConfig realtimeMetadataTableConfig = createRealtimeTableConfig(avroFiles.get(0), tableWithMetadataPush, new TableTaskConfig(Collections.singletonMap( MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigsWithMetadata))); @@ -191,7 +193,7 @@ public void setUp() private TableConfig createOfflineTableConfig(String tableName, @Nullable TableTaskConfig taskConfig, @Nullable SegmentPartitionConfig partitionConfig) { - return new 
-    return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setSchemaName(getSchemaName())
+    return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName)
         .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
         .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
         .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
@@ -203,7 +205,7 @@ private TableConfig createOfflineTableConfig(String tableName, @Nullable TableTa
   protected TableConfig createRealtimeTableConfig(File sampleAvroFile, String tableName, TableTaskConfig taskConfig) {
     AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile;
-    return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setSchemaName(getSchemaName())
+    return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName)
         .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
         .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
         .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
index 561cd67bc69..b7e2ac077af 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
@@ -83,7 +83,7 @@ public void setUp()
     _avroFiles = unpackAvroData(_tempDir);
 
     // Create and upload the schema and table config with reduced number of columns and partition config
-    Schema schema = new Schema.SchemaBuilder().setSchemaName(getSchemaName())
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(getTableName())
         .addSingleValueDimension(PARTITION_COLUMN, DataType.STRING)
         .addDateTime("DaysSinceEpoch", DataType.INT, "1:DAYS:EPOCH", "1:DAYS").build();
     addSchema(schema);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 766fdaabf38..86b4231776c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -90,6 +90,9 @@ public void setUp()
         .forCluster(helixResourceManager.getHelixClusterName()).build(), properties);
 
     // Add 3 offline tables, where 2 of them have TestTask enabled
+    addDummySchema(TABLE_NAME_1);
+    addDummySchema(TABLE_NAME_2);
+    addDummySchema(TABLE_NAME_3);
     TableTaskConfig taskConfig = new TableTaskConfig(Collections.singletonMap(TASK_TYPE, Collections.emptyMap()));
     addTableConfig(
         new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_1).setTaskConfig(taskConfig).build());
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertCompactionMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertCompactionMinionClusterIntegrationTest.java
index de645ca7495..4e9038ba5d8 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertCompactionMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertCompactionMinionClusterIntegrationTest.java
@@ -61,11 +61,6 @@ protected String getSchemaFileName() {
     return "upsert_upload_segment_test.schema";
   }
 
-  @Override
-  protected String getSchemaName() {
-    return "upsertSchema";
-  }
-
   @Override
   protected String getAvroTarFileName() {
     return "upsert_compaction_test.tar.gz";
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
index a9aac6e37cd..e140020a39c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
@@ -82,7 +82,7 @@ public void setUp()
     addSchema(schema);
 
     Map<String, String> csvDecoderProperties = getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER);
-    TableConfig tableConfig = createCSVUpsertTableConfig(getTableName(), getSchemaName(), getKafkaTopic(),
+    TableConfig tableConfig = createCSVUpsertTableConfig(getTableName(), getKafkaTopic(),
         getNumKafkaPartitions(), csvDecoderProperties, null, PRIMARY_KEY_COL);
     addTableConfig(tableConfig);
@@ -112,11 +112,6 @@ protected String getSchemaFileName() {
     return "upsert_table_test.schema";
   }
 
-  @Override
-  protected String getSchemaName() {
-    return "playerScores";
-  }
-
   @Nullable
   @Override
   protected String getTimeColumnName() {
@@ -146,6 +141,7 @@ private Schema createSchema(String schemaFileName)
     Assert.assertNotNull(inputStream);
     return Schema.fromInputStream(inputStream);
   }
+
   private long queryCountStarWithoutUpsert(String tableName) {
     return getPinotConnection().execute("SELECT COUNT(*) FROM " + tableName + " OPTION(skipUpsert=true)")
         .getResultSet(0).getLong(0);
@@ -182,7 +178,10 @@ protected void testDeleteWithFullUpsert(String kafkaTopicName, String tableName,
     // SETUP
     // Create table with delete Record column
     Map<String, String> csvDecoderProperties = getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER);
-    TableConfig tableConfig = createCSVUpsertTableConfig(tableName, getSchemaName(), kafkaTopicName,
+    Schema upsertSchema = createSchema();
+    upsertSchema.setSchemaName(tableName);
+    addSchema(upsertSchema);
+    TableConfig tableConfig = createCSVUpsertTableConfig(tableName, kafkaTopicName,
         getNumKafkaPartitions(), csvDecoderProperties, upsertConfig, PRIMARY_KEY_COL);
     addTableConfig(tableConfig);
@@ -245,7 +244,6 @@ protected void testDeleteWithFullUpsert(String kafkaTopicName, String tableName,
       }
     }, 100L, 600_000L, "Failed to load all upsert records for testDeleteWithFullUpsert");
-
     // Validate: pk is queryable and all columns are overwritten with new value
     rs = getPinotConnection()
         .execute("SELECT playerId, name, game FROM " + tableName + " WHERE playerId = 100").getResultSet(0);
@@ -277,7 +275,6 @@ public void testDeleteWithPartialUpsert()
   protected void testDeleteWithPartialUpsert(String kafkaTopicName, String tableName, UpsertConfig upsertConfig)
       throws Exception {
-    final String partialUpsertSchemaName = "playerScoresPartialUpsert";
     final String inputDataTarFile = "gameScores_partial_upsert_csv.tar.gz";
 
     Map<String, UpsertConfig.Strategy> partialUpsertStrategies = new HashMap<>();
@@ -288,7 +285,10 @@ protected void testDeleteWithPartialUpsert(String kafkaTopicName, String tableNa
     // Create table with delete Record column
     Map<String, String> csvDecoderProperties = getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER);
-    TableConfig tableConfig = createCSVUpsertTableConfig(tableName, partialUpsertSchemaName, kafkaTopicName,
+    Schema partialUpsertSchema = createSchema(PARTIAL_UPSERT_TABLE_SCHEMA);
+    partialUpsertSchema.setSchemaName(tableName);
+    addSchema(partialUpsertSchema);
+    TableConfig tableConfig = createCSVUpsertTableConfig(tableName, kafkaTopicName,
         getNumKafkaPartitions(), csvDecoderProperties, upsertConfig, PRIMARY_KEY_COL);
     addTableConfig(tableConfig);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
index 1a83754b965..b88f5fbdab8 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
@@ -140,11 +140,6 @@ protected String getSchemaFileName() {
     return "upsert_upload_segment_test.schema";
   }
 
-  @Override
-  protected String getSchemaName() {
-    return "upsertSchema";
-  }
-
   @Override
   protected String getAvroTarFileName() {
     return "upsert_upload_segment_test.tar.gz";
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
index 28edcb23b3f..2e9f472d985 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
@@ -120,11 +120,6 @@ protected String getSchemaFileName() {
     return "upsert_upload_segment_test.schema";
   }
 
-  @Override
-  protected String getSchemaName() {
-    return "upsertSchema";
-  }
-
   @Override
   protected String getAvroTarFileName() {
     return "upsert_upload_segment_test.tar.gz";
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/pinot-plugins.tar.gz b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/pinot-plugins.tar.gz
new file mode 100644
index 00000000000..ca43659467d
Binary files /dev/null and b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/pinot-plugins.tar.gz differ
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/test/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunnerTest.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/test/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunnerTest.java
index ba3d77bf6af..1c659b8ef63 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/test/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunnerTest.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/test/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunnerTest.java
@@ -66,8 +66,8 @@ public void testSegmentGeneration() throws Exception {
     FileUtils.touch(new File(outputDir, otherFilename));
 
     // Set up schema file.
-    final String schemaName = "mySchema";
-    File schemaFile = new File(testDir, "schema");
+    final String schemaName = "myTable";
+    File schemaFile = new File(testDir, "myTable.schema");
     Schema schema = new SchemaBuilder()
         .setSchemaName(schemaName)
         .addSingleValueDimension("col1", DataType.STRING)
@@ -76,10 +76,9 @@ public void testSegmentGeneration() throws Exception {
     FileUtils.write(schemaFile, schema.toPrettyJsonString(), StandardCharsets.UTF_8);
 
     // Set up table config file.
-    File tableConfigFile = new File(testDir, "tableConfig");
+    File tableConfigFile = new File(testDir, "myTable.table");
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
         .setTableName("myTable")
-        .setSchemaName(schemaName)
         .setNumReplicas(1)
         .build();
     FileUtils.write(tableConfigFile, tableConfig.toJsonString(), StandardCharsets.UTF_8);
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunnerTest.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunnerTest.java
index 4556f9fc651..e7a1cab97be 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunnerTest.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunnerTest.java
@@ -75,8 +75,8 @@ public void testSegmentGeneration() throws Exception {
     FileUtils.touch(new File(outputDir, existingFilename));
 
     // Set up schema file.
-    final String schemaName = "mySchema";
-    File schemaFile = new File(testDir, "schema");
+    final String schemaName = "myTable";
+    File schemaFile = new File(testDir, "myTable.schema");
     Schema schema = new SchemaBuilder()
         .setSchemaName(schemaName)
         .addSingleValueDimension("col1", DataType.STRING)
@@ -85,10 +85,9 @@ public void testSegmentGeneration() throws Exception {
     FileUtils.write(schemaFile, schema.toPrettyJsonString(), StandardCharsets.UTF_8);
 
     // Set up table config file.
-    File tableConfigFile = new File(testDir, "tableConfig");
+    File tableConfigFile = new File(testDir, "myTable.table");
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
         .setTableName("myTable")
-        .setSchemaName(schemaName)
         .setNumReplicas(1)
         .build();
     FileUtils.write(tableConfigFile, tableConfig.toJsonString(), StandardCharsets.UTF_8);
@@ -184,7 +183,6 @@ public void testInputFilesWithSameNameInDifferentDirectories()
     File tableConfigFile = new File(testDir, "tableConfig");
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
         .setTableName("myTable")
-        .setSchemaName(schemaName)
         .setNumReplicas(1)
         .build();
     FileUtils.write(tableConfigFile, tableConfig.toJsonString(), StandardCharsets.UTF_8);
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunnerTest.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunnerTest.java
index 52ea6d2d9d0..85a00f69595 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunnerTest.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunnerTest.java
@@ -75,8 +75,8 @@ public void testSegmentGeneration() throws Exception {
     FileUtils.touch(new File(outputDir, existingFilename));
 
     // Set up schema file.
-    final String schemaName = "mySchema";
-    File schemaFile = new File(testDir, "schema");
+    final String schemaName = "myTable";
+    File schemaFile = new File(testDir, "myTable.schema");
     Schema schema = new SchemaBuilder()
         .setSchemaName(schemaName)
         .addSingleValueDimension("col1", DataType.STRING)
@@ -85,10 +85,9 @@ public void testSegmentGeneration() throws Exception {
     FileUtils.write(schemaFile, schema.toPrettyJsonString(), StandardCharsets.UTF_8);
 
     // Set up table config file.
-    File tableConfigFile = new File(testDir, "tableConfig");
+    File tableConfigFile = new File(testDir, "myTable.table");
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
         .setTableName("myTable")
-        .setSchemaName(schemaName)
         .setNumReplicas(1)
         .build();
     FileUtils.write(tableConfigFile, tableConfig.toJsonString(), StandardCharsets.UTF_8);
@@ -184,7 +183,6 @@ public void testInputFilesWithSameNameInDifferentDirectories()
     File tableConfigFile = new File(testDir, "tableConfig");
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
         .setTableName("myTable")
-        .setSchemaName(schemaName)
         .setNumReplicas(1)
         .build();
     FileUtils.write(tableConfigFile, tableConfig.toJsonString(), StandardCharsets.UTF_8);
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java
index bd2f8c8aa46..4eea09fe8b5 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java
@@ -251,7 +251,6 @@ private File makeTableConfigFile(File testDir, String schemaName) throws IOExcep
     File tableConfigFile = new File(testDir, "tableConfig");
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
         .setTableName("myTable")
-        .setSchemaName(schemaName)
         .setNumReplicas(1)
         .build();
     FileUtils.write(tableConfigFile, tableConfig.toJsonString(), StandardCharsets.UTF_8);
@@ -263,7 +262,7 @@ private File makeTableConfigFileWithConsistentPush(File testDir, String schemaNa
     IngestionConfig ingestionConfig = new IngestionConfig();
     ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null, "REFRESH", "DAILY", true));
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
-        .setTableName("myTable").setSchemaName(schemaName)
+        .setTableName("myTable")
         .setNumReplicas(1)
         .setIngestionConfig(ingestionConfig)
         .build();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 413a2d7eecb..890db75b062 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -82,6 +82,7 @@
 import org.apache.pinot.spi.utils.DataSizeUtils;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.quartz.CronScheduleBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -143,6 +144,7 @@ public static void validate(TableConfig tableConfig, @Nullable Schema schema, @N
     // skip all validation if skip type ALL is selected.
     if (!skipTypes.contains(ValidationType.ALL)) {
+      validateTableSchemaConfig(tableConfig);
       validateValidationConfig(tableConfig, schema);
 
       StreamConfig streamConfig = null;
@@ -202,6 +204,20 @@ public static void validateTableName(TableConfig tableConfig, boolean allowTable
     }
   }
 
+  /**
+   * Validates the schema name set in the table config with the following rule:
+   * - Schema name should either be null or match the raw table name
+   */
+  private static void validateTableSchemaConfig(TableConfig tableConfig) {
+    // The schema name, when explicitly set in the validation config, must match the raw table name
+    String rawTableName = TableNameBuilder.extractRawTableName(tableConfig.getTableName());
+    String schemaName = tableConfig.getValidationConfig().getSchemaName();
+    if (schemaName != null && !schemaName.equals(rawTableName)) {
+      throw new IllegalStateException(
+          "Schema name: " + schemaName + " does not match table name: " + rawTableName);
+    }
+  }
+
   /**
    * Validates retention config. Checks for following things:
    * - Valid segmentPushType
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index 9c23b571ea5..64b8014b818 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -64,7 +64,6 @@ public class TableConfigBuilder {
   private boolean _isDimTable;
 
   // Segments config related
-  private String _schemaName;
   private String _numReplicas = DEFAULT_NUM_REPLICAS;
   private String _timeColumnName;
   private String _timeType;
@@ -147,11 +146,6 @@ public TableConfigBuilder setLLC(boolean isLLC) {
     return this;
   }
 
-  public TableConfigBuilder setSchemaName(String schemaName) {
-    _schemaName = schemaName;
-    return this;
-  }
-
   public TableConfigBuilder setNumReplicas(int numReplicas) {
     Preconditions.checkArgument(numReplicas > 0);
     _numReplicas = String.valueOf(numReplicas);
@@ -434,7 +428,6 @@ public TableConfig build() {
     validationConfig.setSegmentAssignmentStrategy(_segmentAssignmentStrategy);
     validationConfig.setReplicaGroupStrategyConfig(_replicaGroupStrategyConfig);
     validationConfig.setCompletionConfig(_completionConfig);
-    validationConfig.setSchemaName(_schemaName);
     validationConfig.setReplication(_numReplicas);
     validationConfig.setPeerSegmentDownloadScheme(_peerSegmentDownloadScheme);
     validationConfig.setCrypterClassName(_crypterClassName);
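Net effect of the last two files in this patch: TableConfigBuilder no longer carries a schema name at all, and TableConfigUtils.validate() rejects any table config whose explicit schemaName differs from the raw table name. In practice a schema named exactly after the table must be registered before the table is added, which is why every test setup above now calls addDummySchema(...) or addSchema(...) first. A minimal caller-side sketch of the new contract (table and column names are illustrative, and resourceManager stands in for a PinotHelixResourceManager instance; neither is part of the patch):

    // Register a schema whose name equals the raw table name first.
    Schema schema = new Schema.SchemaBuilder()
        .setSchemaName("myTable")
        .addSingleValueDimension("dim1", FieldSpec.DataType.STRING)
        .build();
    resourceManager.addSchema(schema, true, true);

    // Only then add the table. Note there is no setSchemaName(...) call anymore:
    // the schema is resolved purely by table name, and adding a table without a
    // matching schema is now rejected for OFFLINE tables as well as REALTIME ones.
    TableConfig tableConfig =
        new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setNumReplicas(1).build();
    resourceManager.addTable(tableConfig);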