Skip to content

Commit

Permalink
Throw exception when schema name doesn't match table name during tabl…
Browse files Browse the repository at this point in the history
…e creation (#11591)

* Throw exception when schema doesn't matche table name

* Move the validation logic to TableConfigUtils
  • Loading branch information
xiangfu0 authored Oct 4, 2023
1 parent 3329d83 commit fb656c5
Show file tree
Hide file tree
Showing 49 changed files with 311 additions and 249 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ public void testResourceAndTagAssignment()
}

// Add a new table with same broker tenant
addDummySchema(newRawTableName);
newTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(newRawTableName)
.setServerTenant(TagNameUtils.DEFAULT_TENANT_NAME).build();
_helixResourceManager.addTable(newTableConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs, Notific
Map<String, String> configs = _helixAdmin.getConfig(helixConfigScope,
Arrays.asList(Helix.ENABLE_CASE_INSENSITIVE_KEY));
boolean caseInsensitive = Boolean.parseBoolean(configs.getOrDefault(Helix.ENABLE_CASE_INSENSITIVE_KEY,
Boolean.toString(Helix.DEFAULT_ENABLE_CASE_INSENSITIVE)));
Boolean.toString(Helix.DEFAULT_ENABLE_CASE_INSENSITIVE)));
_tableCache = new TableCache(_propertyStore, caseInsensitive);
}

Expand Down Expand Up @@ -1556,6 +1556,10 @@ public void addTable(TableConfig tableConfig)
PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
_enableBatchMessageMode);
TableType tableType = tableConfig.getTableType();
// Ensure that table is not created if schema is not present
if (ZKMetadataProvider.getSchema(_propertyStore, TableNameBuilder.extractRawTableName(tableNameWithType)) == null) {
throw new InvalidTableConfigException("No schema defined for table: " + tableNameWithType);
}
if (tableType == TableType.OFFLINE) {
try {
// Add table config
Expand All @@ -1571,18 +1575,6 @@ public void addTable(TableConfig tableConfig)
}
} else {
Preconditions.checkState(tableType == TableType.REALTIME, "Invalid table type: %s", tableType);

// Ensure that realtime table is not created if schema is not present
Schema schema =
ZKMetadataProvider.getSchema(_propertyStore, TableNameBuilder.extractRawTableName(tableNameWithType));
if (schema == null) {
// Fall back to getting schema-name from table config if schema-name != table-name
String schemaName = tableConfig.getValidationConfig().getSchemaName();
if (schemaName == null || ZKMetadataProvider.getSchema(_propertyStore, schemaName) == null) {
throw new InvalidTableConfigException("No schema defined for realtime table: " + tableNameWithType);
}
}

try {
// Add table config
ZKMetadataProvider.setTableConfig(_propertyStore, tableNameWithType, TableConfigUtils.toZNRecord(tableConfig));
Expand Down Expand Up @@ -2082,8 +2074,8 @@ public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadat
tasks.put(jobId, jobMetadata);
if (tasks.size() > CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) {
tasks = tasks.entrySet().stream().sorted((v1, v2) -> Long.compare(
Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)),
Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS))))
Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)),
Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS))))
.collect(Collectors.toList()).subList(0, CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK)
.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ public void testGetBrokers()
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(),
10);

// Create schema
addDummySchema(TABLE_NAME_1);
addDummySchema(TABLE_NAME_2);

// Adding table
_helixResourceManager
.addTable(new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_1).setNumReplicas(1).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public void setUp()
throws Exception {
TEST_INSTANCE.setupSharedStateAndValidate();

// Create schema
TEST_INSTANCE.addDummySchema(TABLE_NAME);
// Adding table
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setSegmentAssignmentStrategy("RandomAssignmentStrategy").setNumReplicas(2).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ public void instanceRetagHappyPathTest()
public void instanceRetagServerDeficiencyTest()
throws Exception {
String tableName = "testTable";
DEFAULT_INSTANCE.addDummySchema(tableName);
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName)
.setNumReplicas(2).build();
// create table with replication as 2 so that DefaultTenant has a minimum server requirement as 2.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public void testListSegmentLineage()
throws Exception {
// Adding table
String rawTableName = "lineageTestTable";
TEST_INSTANCE.addDummySchema(rawTableName);
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).setNumReplicas(1).build();
Expand Down Expand Up @@ -116,6 +117,7 @@ public void testSegmentCrcApi()
throws Exception {
// Adding table
String rawTableName = "crcTestTable";
TEST_INSTANCE.addDummySchema(rawTableName);
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).setNumReplicas(1).build();
Expand Down Expand Up @@ -171,6 +173,7 @@ public void testDeleteSegmentsWithTimeWindow()
throws Exception {
// Adding table and segment
String rawTableName = "deleteWithTimeWindowTestTable";
TEST_INSTANCE.addDummySchema(rawTableName);
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).setNumReplicas(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,16 @@ public class PinotTableRestletResourceTest extends ControllerTest {
public void setUp()
throws Exception {
DEFAULT_INSTANCE.setupSharedStateAndValidate();

_createTableUrl = DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableCreate();
_offlineBuilder.setTableName(OFFLINE_TABLE_NAME).setTimeColumnName("timeColumn").setTimeType("DAYS")
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("5");

// add schema for realtime table
DEFAULT_INSTANCE.addDummySchema(REALTIME_TABLE_NAME);
DEFAULT_INSTANCE.addDummySchema(OFFLINE_TABLE_NAME);
StreamConfig streamConfig = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs();
_realtimeBuilder.setTableName(REALTIME_TABLE_NAME).setTimeColumnName("timeColumn").setTimeType("DAYS")
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("5").setSchemaName(REALTIME_TABLE_NAME)
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("5")
.setStreamConfigs(streamConfig.getStreamConfigsMap());
}

Expand Down Expand Up @@ -105,7 +105,18 @@ public void testCreateTable()
assertTrue(e.getMessage().contains("Got error status code: 400"));
}

// Creating an OFFLINE table without a valid schema should fail
offlineTableConfig = _offlineBuilder.setTableName("no_schema").build();
try {
sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString());
fail("Creation of a OFFLINE table without a valid schema does not fail");
} catch (IOException e) {
// Expected 400 Bad Request
assertTrue(e.getMessage().contains("Got error status code: 400"));
}

// Create an OFFLINE table with a valid name which should succeed
DEFAULT_INSTANCE.addDummySchema("valid_table_name");
offlineTableConfig = _offlineBuilder.setTableName("valid_table_name").build();
String offlineTableConfigString = offlineTableConfig.toJsonString();
sendPostRequest(_createTableUrl, offlineTableConfigString);
Expand Down Expand Up @@ -143,7 +154,7 @@ public void testCreateTable()
}

// Creating a REALTIME table without a valid schema should fail
realtimeTableConfig = _realtimeBuilder.setTableName("noSchema").setSchemaName("noSchema").build();
realtimeTableConfig = _realtimeBuilder.setTableName("noSchema").build();
try {
sendPostRequest(_createTableUrl, realtimeTableConfig.toJsonString());
fail("Creation of a REALTIME table without a valid schema does not fail");
Expand All @@ -152,10 +163,6 @@ public void testCreateTable()
assertTrue(e.getMessage().contains("Got error status code: 400"));
}

// Creating a REALTIME table with a different schema name in the config should succeed (backwards compatibility)
realtimeTableConfig = _realtimeBuilder.setSchemaName(REALTIME_TABLE_NAME).build();
sendPostRequest(_createTableUrl, realtimeTableConfig.toJsonString());

// Create a REALTIME table with the invalid time column name should fail
realtimeTableConfig =
_realtimeBuilder.setTableName(REALTIME_TABLE_NAME).setTimeColumnName("invalidTimeColumn").build();
Expand All @@ -174,8 +181,10 @@ public void testCreateTable()
}

@Test
public void testTableCronSchedule() {
public void testTableCronSchedule()
throws IOException {
String rawTableName = "test_table_cron_schedule";
DEFAULT_INSTANCE.addDummySchema(rawTableName);
// Failed to create a table
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).setTaskConfig(
new TableTaskConfig(ImmutableMap.of(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
Expand Down Expand Up @@ -224,6 +233,7 @@ public void testTableMinReplication()

private void testTableMinReplicationInternal(String tableName, int tableReplication)
throws Exception {
DEFAULT_INSTANCE.addDummySchema(tableName);
String tableJSONConfigString =
_offlineBuilder.setTableName(tableName).setNumReplicas(tableReplication).build().toJsonString();
sendPostRequest(_createTableUrl, tableJSONConfigString);
Expand All @@ -232,7 +242,6 @@ private void testTableMinReplicationInternal(String tableName, int tableReplicat
assertEquals(tableConfig.getReplication(),
Math.max(tableReplication, DEFAULT_MIN_NUM_REPLICAS));

DEFAULT_INSTANCE.addDummySchema(tableName);
tableJSONConfigString =
_realtimeBuilder.setTableName(tableName).setNumReplicas(tableReplication).build().toJsonString();
sendPostRequest(_createTableUrl, tableJSONConfigString);
Expand Down Expand Up @@ -299,6 +308,7 @@ private TableConfig getTableConfig(String tableName, String tableType)
public void testUpdateTableConfig()
throws Exception {
String tableName = "updateTC";
DEFAULT_INSTANCE.addDummySchema(tableName);
String tableConfigString = _offlineBuilder.setTableName(tableName).setNumReplicas(2).build().toJsonString();
sendPostRequest(_createTableUrl, tableConfigString);
// table creation should succeed
Expand Down Expand Up @@ -365,12 +375,13 @@ public void testListTables()

// post 2 offline, 1 realtime
String rawTableName1 = "pqr";
DEFAULT_INSTANCE.addDummySchema(rawTableName1);
TableConfig offlineTableConfig1 = _offlineBuilder.setTableName(rawTableName1).build();
sendPostRequest(_createTableUrl, offlineTableConfig1.toJsonString());
DEFAULT_INSTANCE.addDummySchema(rawTableName1);
TableConfig realtimeTableConfig1 = _realtimeBuilder.setTableName(rawTableName1).setNumReplicas(2).build();
sendPostRequest(_createTableUrl, realtimeTableConfig1.toJsonString());
String rawTableName2 = "abc";
DEFAULT_INSTANCE.addDummySchema(rawTableName2);
TableConfig offlineTableConfig2 = _offlineBuilder.setTableName(rawTableName2).build();
sendPostRequest(_createTableUrl, offlineTableConfig2.toJsonString());

Expand Down Expand Up @@ -479,6 +490,7 @@ public void rebalanceTableWithoutSegments()
public void testDeleteTable()
throws IOException {
// Case 1: Create a REALTIME table and delete it directly w/o using query param.
DEFAULT_INSTANCE.addDummySchema("table0");
TableConfig realtimeTableConfig = _realtimeBuilder.setTableName("table0").build();
String creationResponse = sendPostRequest(_createTableUrl, realtimeTableConfig.toJsonString());
assertEquals(creationResponse,
Expand All @@ -501,6 +513,7 @@ public void testDeleteTable()
assertEquals(deleteResponse, "{\"status\":\"Tables: [table0_OFFLINE] deleted\"}");

// Case 3: Create REALTIME and OFFLINE tables and delete both of them.
DEFAULT_INSTANCE.addDummySchema("table1");
TableConfig rtConfig1 = _realtimeBuilder.setTableName("table1").build();
creationResponse = sendPostRequest(_createTableUrl, rtConfig1.toJsonString());
assertEquals(creationResponse,
Expand All @@ -516,6 +529,7 @@ public void testDeleteTable()
assertEquals(deleteResponse, "{\"status\":\"Tables: [table1_OFFLINE, table1_REALTIME] deleted\"}");

// Case 4: Create REALTIME and OFFLINE tables and delete the realtime/offline table using query params.
DEFAULT_INSTANCE.addDummySchema("table2");
TableConfig rtConfig2 = _realtimeBuilder.setTableName("table2").build();
creationResponse = sendPostRequest(_createTableUrl, rtConfig2.toJsonString());
assertEquals(creationResponse,
Expand Down Expand Up @@ -553,6 +567,7 @@ public void testDeleteTable()
}

// Case 6: Create REALTIME and OFFLINE tables and delete the realtime/offline table using query params and suffixes.
DEFAULT_INSTANCE.addDummySchema("table3");
TableConfig rtConfig3 = _realtimeBuilder.setTableName("table3").build();
creationResponse = sendPostRequest(_createTableUrl, rtConfig3.toJsonString());
assertEquals(creationResponse,
Expand All @@ -577,13 +592,15 @@ public void testCheckTableState()
throws IOException {

// Create a valid REALTIME table
TableConfig realtimeTableConfig = _realtimeBuilder.setTableName("testTable").build();
String tableName = "testTable";
DEFAULT_INSTANCE.addDummySchema(tableName);
TableConfig realtimeTableConfig = _realtimeBuilder.setTableName(tableName).build();
String creationResponse = sendPostRequest(_createTableUrl, realtimeTableConfig.toJsonString());
assertEquals(creationResponse,
"{\"unrecognizedProperties\":{},\"status\":\"Table testTable_REALTIME successfully added\"}");

// Create a valid OFFLINE table
TableConfig offlineTableConfig = _offlineBuilder.setTableName("testTable").build();
TableConfig offlineTableConfig = _offlineBuilder.setTableName(tableName).build();
creationResponse = sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString());
assertEquals(creationResponse,
"{\"unrecognizedProperties\":{},\"status\":\"Table testTable_OFFLINE successfully added\"}");
Expand Down Expand Up @@ -708,6 +725,7 @@ public void testUnrecognizedProperties()
// Create an OFFLINE table with a valid name but with unrecognizedProperties which should succeed
// Should have unrecognizedProperties set correctly
String tableName = "valid_table_name_extra_props";
DEFAULT_INSTANCE.addDummySchema(tableName);
TableConfig offlineTableConfig = _realtimeBuilder.setTableName("valid_table_name_extra_props").build();
JsonNode jsonNode = JsonUtils.objectToJsonNode(offlineTableConfig);
((ObjectNode) jsonNode).put("illegalKey1", 1);
Expand Down
Loading

0 comments on commit fb656c5

Please sign in to comment.