From efd7786797a8143d231dd61a7451f69e7f44c6e7 Mon Sep 17 00:00:00 2001 From: Johan Adami <4760722+jadami10@users.noreply.github.com> Date: Fri, 23 Feb 2024 16:55:54 -0500 Subject: [PATCH] require noDictionaryColumns with aggregationConfigs (#12464) --- .../segment/local/utils/TableConfigUtils.java | 13 ++++++ .../local/utils/TableConfigUtilsTest.java | 40 +++++++++++++++---- 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 45889095b22..83c890d5f90 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -48,8 +48,10 @@ import org.apache.pinot.segment.local.recordtransformer.SchemaConformingTransformer; import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator; import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.segment.spi.index.DictionaryIndexConfig; import org.apache.pinot.segment.spi.index.IndexService; import org.apache.pinot.segment.spi.index.IndexType; +import org.apache.pinot.segment.spi.index.StandardIndexes; import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair; import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec; @@ -490,6 +492,17 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc Preconditions.checkState(new HashSet<>(schema.getMetricNames()).equals(aggregationColumns), "all metric columns must be aggregated"); } + + // This is required by MutableSegmentImpl.enableMetricsAggregationIfPossible(). + // That code will disable ingestion aggregation if all metrics aren't noDictionaryColumns. + // But if you do that after the table is already created, all future aggregations will + // just be the default value. + Map configPerCol = StandardIndexes.dictionary().getConfig(tableConfig, schema); + aggregationColumns.forEach(column -> { + DictionaryIndexConfig dictConfig = configPerCol.get(column); + Preconditions.checkState(dictConfig != null && dictConfig.isDisabled(), + "Aggregated column: %s must be a no-dictionary column", column); + }); } // Transform configs diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index 7c18185cae2..bcf8aa622ea 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -33,6 +33,7 @@ import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec; import org.apache.pinot.spi.config.table.HashFunction; +import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig; import org.apache.pinot.spi.config.table.RoutingConfig; import org.apache.pinot.spi.config.table.SegmentPartitionConfig; @@ -293,6 +294,9 @@ public void validateIngestionConfig() { } // using a transformation column in an aggregation + IndexingConfig indexingConfig = new IndexingConfig(); + indexingConfig.setNoDictionaryColumns(List.of("twiceSum")); + tableConfig.setIndexingConfig(indexingConfig); schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("twiceSum", FieldSpec.DataType.DOUBLE).build(); ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("twice", "col * 2"))); @@ -303,6 +307,7 @@ public void validateIngestionConfig() { schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING) .build(); + indexingConfig.setNoDictionaryColumns(List.of("myCol")); ingestionConfig.setAggregationConfigs(null); ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("myCol", "reverse(anotherCol)"))); TableConfigUtils.validate(tableConfig, schema); @@ -508,6 +513,27 @@ public void ingestionAggregationConfigsTest() { // expected } + ingestionConfig.setAggregationConfigs(Collections.singletonList(new AggregationConfig("m1", "SUM(m1)"))); + try { + TableConfigUtils.validateIngestionConfig(tableConfig, schema); + Assert.fail("Should fail due to noDictionaryColumns being null"); + } catch (IllegalStateException e) { + // expected + } + + IndexingConfig indexingConfig = new IndexingConfig(); + indexingConfig.setNoDictionaryColumns(List.of()); + tableConfig.setIndexingConfig(indexingConfig); + + try { + TableConfigUtils.validateIngestionConfig(tableConfig, schema); + Assert.fail("Should fail due to noDictionaryColumns not containing m1"); + } catch (IllegalStateException e) { + // expected + } + + indexingConfig.setNoDictionaryColumns(List.of("m1")); + ingestionConfig.setAggregationConfigs(Collections.singletonList(new AggregationConfig("m1", "SUM(m1)"))); TableConfigUtils.validateIngestionConfig(tableConfig, schema); @@ -528,7 +554,7 @@ public void ingestionAggregationConfigsTest() { ingestionConfig.setAggregationConfigs(aggregationConfigs); tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn") - .setIngestionConfig(ingestionConfig).build(); + .setIngestionConfig(ingestionConfig).setNoDictionaryColumns(List.of("d1")).build(); try { TableConfigUtils.validateIngestionConfig(tableConfig, schema); @@ -549,7 +575,7 @@ public void ingestionAggregationConfigsTest() { ingestionConfig.setAggregationConfigs(aggregationConfigs); tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn") - .setIngestionConfig(ingestionConfig).build(); + .setIngestionConfig(ingestionConfig).setNoDictionaryColumns(List.of("d1", "d2", "d3", "d4", "d5")).build(); try { TableConfigUtils.validateIngestionConfig(tableConfig, schema); @@ -567,7 +593,7 @@ public void ingestionAggregationConfigsTest() { ingestionConfig.setAggregationConfigs(aggregationConfigs); tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn") - .setIngestionConfig(ingestionConfig).build(); + .setIngestionConfig(ingestionConfig).setNoDictionaryColumns(List.of("d1", "d2", "d3", "d4", "d5")).build(); try { TableConfigUtils.validateIngestionConfig(tableConfig, schema); @@ -582,7 +608,7 @@ public void ingestionAggregationConfigsTest() { ingestionConfig.setAggregationConfigs(aggregationConfigs); tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn") - .setIngestionConfig(ingestionConfig).build(); + .setIngestionConfig(ingestionConfig).setNoDictionaryColumns(List.of("d1")).build(); try { TableConfigUtils.validateIngestionConfig(tableConfig, schema); @@ -594,7 +620,7 @@ public void ingestionAggregationConfigsTest() { ingestionConfig.setAggregationConfigs(aggregationConfigs); tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn") - .setIngestionConfig(ingestionConfig).build(); + .setIngestionConfig(ingestionConfig).setNoDictionaryColumns(List.of("d1")).build(); try { TableConfigUtils.validateIngestionConfig(tableConfig, schema); @@ -616,7 +642,7 @@ public void ingestionAggregationConfigsTest() { ingestionConfig.setAggregationConfigs(aggregationConfigs); tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn") - .setIngestionConfig(ingestionConfig).build(); + .setIngestionConfig(ingestionConfig).setNoDictionaryColumns(List.of("d1", "d2", "d3", "d4", "d5")).build(); TableConfigUtils.validateIngestionConfig(tableConfig, schema); // with too many arguments should fail @@ -629,7 +655,7 @@ public void ingestionAggregationConfigsTest() { ingestionConfig.setAggregationConfigs(aggregationConfigs); tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn") - .setIngestionConfig(ingestionConfig).build(); + .setIngestionConfig(ingestionConfig).setNoDictionaryColumns(List.of("d1")).build(); try { TableConfigUtils.validateIngestionConfig(tableConfig, schema);