Skip to content

Commit

Permalink
require noDictionaryColumns with aggregationConfigs (#12464)
Browse files Browse the repository at this point in the history
  • Loading branch information
jadami10 authored Feb 23, 2024
1 parent 7f09cc8 commit efd7786
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@
import org.apache.pinot.segment.local.recordtransformer.SchemaConformingTransformer;
import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
import org.apache.pinot.segment.spi.index.IndexService;
import org.apache.pinot.segment.spi.index.IndexType;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
Expand Down Expand Up @@ -490,6 +492,17 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
Preconditions.checkState(new HashSet<>(schema.getMetricNames()).equals(aggregationColumns),
"all metric columns must be aggregated");
}

// This is required by MutableSegmentImpl.enableMetricsAggregationIfPossible().
// That code disables ingestion aggregation unless every metric column is a no-dictionary column.
// If aggregation gets disabled that way after the table has already been created, all future
// aggregations will just be the default value.
Map<String, DictionaryIndexConfig> configPerCol = StandardIndexes.dictionary().getConfig(tableConfig, schema);
aggregationColumns.forEach(column -> {
DictionaryIndexConfig dictConfig = configPerCol.get(column);
Preconditions.checkState(dictConfig != null && dictConfig.isDisabled(),
"Aggregated column: %s must be a no-dictionary column", column);
});
}

// Transform configs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
Expand Down Expand Up @@ -293,6 +294,9 @@ public void validateIngestionConfig() {
}

// using a transformation column in an aggregation
IndexingConfig indexingConfig = new IndexingConfig();
indexingConfig.setNoDictionaryColumns(List.of("twiceSum"));
tableConfig.setIndexingConfig(indexingConfig);
schema =
new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("twiceSum", FieldSpec.DataType.DOUBLE).build();
ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("twice", "col * 2")));
Expand All @@ -303,6 +307,7 @@ public void validateIngestionConfig() {
schema =
new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
.build();
indexingConfig.setNoDictionaryColumns(List.of("myCol"));
ingestionConfig.setAggregationConfigs(null);
ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("myCol", "reverse(anotherCol)")));
TableConfigUtils.validate(tableConfig, schema);
Expand Down Expand Up @@ -508,6 +513,27 @@ public void ingestionAggregationConfigsTest() {
// expected
}

ingestionConfig.setAggregationConfigs(Collections.singletonList(new AggregationConfig("m1", "SUM(m1)")));
try {
TableConfigUtils.validateIngestionConfig(tableConfig, schema);
Assert.fail("Should fail due to noDictionaryColumns being null");
} catch (IllegalStateException e) {
// expected
}

IndexingConfig indexingConfig = new IndexingConfig();
indexingConfig.setNoDictionaryColumns(List.of());
tableConfig.setIndexingConfig(indexingConfig);

try {
TableConfigUtils.validateIngestionConfig(tableConfig, schema);
Assert.fail("Should fail due to noDictionaryColumns not containing m1");
} catch (IllegalStateException e) {
// expected
}

indexingConfig.setNoDictionaryColumns(List.of("m1"));

ingestionConfig.setAggregationConfigs(Collections.singletonList(new AggregationConfig("m1", "SUM(m1)")));
TableConfigUtils.validateIngestionConfig(tableConfig, schema);

Expand All @@ -528,7 +554,7 @@ public void ingestionAggregationConfigsTest() {
ingestionConfig.setAggregationConfigs(aggregationConfigs);
tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
.setIngestionConfig(ingestionConfig).build();
.setIngestionConfig(ingestionConfig).setNoDictionaryColumns(List.of("d1")).build();

try {
TableConfigUtils.validateIngestionConfig(tableConfig, schema);
Expand All @@ -549,7 +575,7 @@ public void ingestionAggregationConfigsTest() {
ingestionConfig.setAggregationConfigs(aggregationConfigs);
tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
.setIngestionConfig(ingestionConfig).build();
.setIngestionConfig(ingestionConfig).setNoDictionaryColumns(List.of("d1", "d2", "d3", "d4", "d5")).build();

try {
TableConfigUtils.validateIngestionConfig(tableConfig, schema);
Expand All @@ -567,7 +593,7 @@ public void ingestionAggregationConfigsTest() {
ingestionConfig.setAggregationConfigs(aggregationConfigs);
tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
.setIngestionConfig(ingestionConfig).build();
.setIngestionConfig(ingestionConfig).setNoDictionaryColumns(List.of("d1", "d2", "d3", "d4", "d5")).build();

try {
TableConfigUtils.validateIngestionConfig(tableConfig, schema);
Expand All @@ -582,7 +608,7 @@ public void ingestionAggregationConfigsTest() {
ingestionConfig.setAggregationConfigs(aggregationConfigs);
tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
.setIngestionConfig(ingestionConfig).build();
.setIngestionConfig(ingestionConfig).setNoDictionaryColumns(List.of("d1")).build();

try {
TableConfigUtils.validateIngestionConfig(tableConfig, schema);
Expand All @@ -594,7 +620,7 @@ public void ingestionAggregationConfigsTest() {
ingestionConfig.setAggregationConfigs(aggregationConfigs);
tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
.setIngestionConfig(ingestionConfig).build();
.setIngestionConfig(ingestionConfig).setNoDictionaryColumns(List.of("d1")).build();

try {
TableConfigUtils.validateIngestionConfig(tableConfig, schema);
Expand All @@ -616,7 +642,7 @@ public void ingestionAggregationConfigsTest() {
ingestionConfig.setAggregationConfigs(aggregationConfigs);
tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
.setIngestionConfig(ingestionConfig).build();
.setIngestionConfig(ingestionConfig).setNoDictionaryColumns(List.of("d1", "d2", "d3", "d4", "d5")).build();
TableConfigUtils.validateIngestionConfig(tableConfig, schema);

// with too many arguments should fail
Expand All @@ -629,7 +655,7 @@ public void ingestionAggregationConfigsTest() {
ingestionConfig.setAggregationConfigs(aggregationConfigs);
tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
.setIngestionConfig(ingestionConfig).build();
.setIngestionConfig(ingestionConfig).setNoDictionaryColumns(List.of("d1")).build();

try {
TableConfigUtils.validateIngestionConfig(tableConfig, schema);
Expand Down

0 comments on commit efd7786

Please sign in to comment.