Skip to content

Commit

Permalink
Adds per-column, query-time index skip option (#12414)
Browse files Browse the repository at this point in the history
* Adds per-column, query-time index skip option

* Adds tests

* Throw ParseException if indexSkipConfig parsing fails

* Throw RuntimeException instead of ParseException

* Empty commit to re-run tests
  • Loading branch information
egalpin authored Feb 27, 2024
1 parent d9ac0ef commit 5dc807b
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode;
Expand All @@ -34,6 +37,7 @@
* Utils to parse query options.
*/
public class QueryOptionsUtils {

// Private constructor: this class is not meant to be instantiated from outside.
private QueryOptionsUtils() {
}

Expand Down Expand Up @@ -145,6 +149,34 @@ public static boolean isSkipScanFilterReorder(Map<String, String> queryOptions)
return "false".equalsIgnoreCase(queryOptions.get(QueryOptionKey.USE_SCAN_REORDER_OPTIMIZATION));
}

/**
 * Parses the per-column index skip configuration from the query options.
 *
 * <p>Expected format: {@code col1=inverted,range&col2=inverted}. Column entries are separated by {@code &}, the
 * column name is separated from its index types by {@code =}, and index types are separated by {@code ,}. Index type
 * names are matched case-insensitively against {@link FieldConfig.IndexType}; whitespace surrounding column names
 * and index type names is ignored.
 *
 * @param queryOptions query options to read {@link QueryOptionKey#INDEX_SKIP_CONFIG} from
 * @return map from column name to the set of index types to skip, or {@code null} if the option is not set
 * @throws RuntimeException if an entry is not of the form {@code column=indexTypes}, or (as an
 *         {@link IllegalArgumentException}) if an index type name is not a {@link FieldConfig.IndexType}
 */
@Nullable
public static Map<String, Set<FieldConfig.IndexType>> getIndexSkipConfig(Map<String, String> queryOptions) {
  // Example config: indexSkipConfig='col1=inverted,range&col2=inverted'
  String indexSkipConfigStr = queryOptions.get(QueryOptionKey.INDEX_SKIP_CONFIG);
  if (indexSkipConfigStr == null) {
    return null;
  }

  Map<String, Set<FieldConfig.IndexType>> indexSkipConfig = new HashMap<>();
  for (String columnConf : indexSkipConfigStr.split("&")) {
    String[] conf = columnConf.split("=");
    if (conf.length != 2) {
      throw new RuntimeException("Invalid format for " + QueryOptionKey.INDEX_SKIP_CONFIG
          + ". Example of valid format: SET indexSkipConfig='col1=inverted,range&col2=inverted'");
    }
    String columnName = conf[0].trim();

    for (String indexType : conf[1].split(",")) {
      String indexTypeName = indexType.trim();
      FieldConfig.IndexType parsedIndexType;
      try {
        // Locale.ROOT keeps the upper-casing independent of the JVM default locale (e.g. Turkish dotted 'I'),
        // which would otherwise turn "inverted" into a name that is not an enum constant.
        parsedIndexType = FieldConfig.IndexType.valueOf(indexTypeName.toUpperCase(Locale.ROOT));
      } catch (IllegalArgumentException e) {
        // Same exception type as the bare valueOf() failure, but with the offending column and option named.
        throw new IllegalArgumentException("Invalid index type '" + indexTypeName + "' for column '" + columnName
            + "' in " + QueryOptionKey.INDEX_SKIP_CONFIG, e);
      }
      indexSkipConfig.computeIfAbsent(columnName, k -> new HashSet<>()).add(parsedIndexType);
    }
  }

  return indexSkipConfig;
}

@Nullable
public static Boolean isUseFixedReplica(Map<String, String> queryOptions) {
String useFixedReplica = queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.USE_FIXED_REPLICA);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.sql.parsers.parser.ParseException;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand All @@ -43,4 +46,25 @@ public void shouldConvertCaseInsensitiveMapToUseCorrectValues() {
Assert.assertEquals(resolved.get(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING), "true");
Assert.assertEquals(resolved.get(CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE), "false");
}

/**
 * Verifies that a well-formed index skip config with two columns is parsed into the expected per-column sets of
 * index types.
 */
@Test
public void testIndexSkipConfigParsing()
    throws ParseException {
  // col1 skips two index types, col2 skips a single one
  Map<String, String> options = Map.of(CommonConstants.Broker.Request.QueryOptionKey.INDEX_SKIP_CONFIG,
      "col1=inverted,range&col2=sorted");
  Map<String, Set<FieldConfig.IndexType>> parsed = QueryOptionsUtils.getIndexSkipConfig(options);

  Set<FieldConfig.IndexType> expectedForCol1 = Set.of(FieldConfig.IndexType.RANGE, FieldConfig.IndexType.INVERTED);
  Set<FieldConfig.IndexType> expectedForCol2 = Set.of(FieldConfig.IndexType.SORTED);
  Assert.assertEquals(parsed.get("col1"), expectedForCol1);
  Assert.assertEquals(parsed.get("col2"), expectedForCol2);
}

/**
 * Verifies that a column entry without the {@code =indexTypes} part ("col2") makes parsing fail with a
 * {@link RuntimeException}.
 */
@Test(expectedExceptions = RuntimeException.class)
public void testIndexSkipConfigParsingInvalid() {
  Map<String, String> options = Map.of(CommonConstants.Broker.Request.QueryOptionKey.INDEX_SKIP_CONFIG,
      "col1=inverted,range&col2");
  QueryOptionsUtils.getIndexSkipConfig(options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;


Expand Down Expand Up @@ -95,29 +96,36 @@ public BaseFilterOperator getLeafFilterOperator(QueryContext queryContext, Predi
// operator is used only if the column is sorted and has dictionary.
Predicate.Type predicateType = predicateEvaluator.getPredicateType();
if (predicateType == Predicate.Type.RANGE) {
if (dataSource.getDataSourceMetadata().isSorted() && dataSource.getDictionary() != null) {
if (dataSource.getDataSourceMetadata().isSorted() && dataSource.getDictionary() != null
&& queryContext.isIndexUseAllowed(dataSource, FieldConfig.IndexType.SORTED)) {
return new SortedIndexBasedFilterOperator(queryContext, predicateEvaluator, dataSource, numDocs);
}
if (RangeIndexBasedFilterOperator.canEvaluate(predicateEvaluator, dataSource)) {
if (RangeIndexBasedFilterOperator.canEvaluate(predicateEvaluator, dataSource)
&& queryContext.isIndexUseAllowed(dataSource, FieldConfig.IndexType.RANGE)) {
return new RangeIndexBasedFilterOperator(queryContext, predicateEvaluator, dataSource, numDocs);
}
return new ScanBasedFilterOperator(queryContext, predicateEvaluator, dataSource, numDocs);
} else if (predicateType == Predicate.Type.REGEXP_LIKE) {
if (dataSource.getFSTIndex() != null && dataSource.getDataSourceMetadata().isSorted()) {
if (dataSource.getFSTIndex() != null && dataSource.getDataSourceMetadata().isSorted()
&& queryContext.isIndexUseAllowed(dataSource, FieldConfig.IndexType.SORTED)) {
return new SortedIndexBasedFilterOperator(queryContext, predicateEvaluator, dataSource, numDocs);
}
if (dataSource.getFSTIndex() != null && dataSource.getInvertedIndex() != null) {
if (dataSource.getFSTIndex() != null && dataSource.getInvertedIndex() != null
&& queryContext.isIndexUseAllowed(dataSource, FieldConfig.IndexType.INVERTED)) {
return new InvertedIndexFilterOperator(queryContext, predicateEvaluator, dataSource, numDocs);
}
return new ScanBasedFilterOperator(queryContext, predicateEvaluator, dataSource, numDocs);
} else {
if (dataSource.getDataSourceMetadata().isSorted() && dataSource.getDictionary() != null) {
if (dataSource.getDataSourceMetadata().isSorted() && dataSource.getDictionary() != null
&& queryContext.isIndexUseAllowed(dataSource, FieldConfig.IndexType.SORTED)) {
return new SortedIndexBasedFilterOperator(queryContext, predicateEvaluator, dataSource, numDocs);
}
if (dataSource.getInvertedIndex() != null) {
if (dataSource.getInvertedIndex() != null
&& queryContext.isIndexUseAllowed(dataSource, FieldConfig.IndexType.INVERTED)) {
return new InvertedIndexFilterOperator(queryContext, predicateEvaluator, dataSource, numDocs);
}
if (RangeIndexBasedFilterOperator.canEvaluate(predicateEvaluator, dataSource)) {
if (RangeIndexBasedFilterOperator.canEvaluate(predicateEvaluator, dataSource)
&& queryContext.isIndexUseAllowed(dataSource, FieldConfig.IndexType.RANGE)) {
return new RangeIndexBasedFilterOperator(queryContext, predicateEvaluator, dataSource, numDocs);
}
return new ScanBasedFilterOperator(queryContext, predicateEvaluator, dataSource, numDocs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
import org.apache.pinot.segment.spi.index.reader.VectorIndexReader;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.roaringbitmap.buffer.MutableRoaringBitmap;

Expand Down Expand Up @@ -152,7 +153,8 @@ private boolean canApplyH3IndexForDistanceCheck(Predicate predicate, FunctionCon
findLiteral = true;
}
}
return columnName != null && _indexSegment.getDataSource(columnName).getH3Index() != null && findLiteral;
return columnName != null && _indexSegment.getDataSource(columnName).getH3Index() != null && findLiteral
&& _queryContext.isIndexUseAllowed(columnName, FieldConfig.IndexType.H3);
}

/**
Expand Down Expand Up @@ -182,14 +184,16 @@ private boolean canApplyH3IndexForInclusionCheck(Predicate predicate, FunctionCo
if (arguments.get(0).getType() == ExpressionContext.Type.IDENTIFIER
&& arguments.get(1).getType() == ExpressionContext.Type.LITERAL) {
String columnName = arguments.get(0).getIdentifier();
return _indexSegment.getDataSource(columnName).getH3Index() != null;
return _indexSegment.getDataSource(columnName).getH3Index() != null
&& _queryContext.isIndexUseAllowed(columnName, FieldConfig.IndexType.H3);
}
return false;
} else {
if (arguments.get(1).getType() == ExpressionContext.Type.IDENTIFIER
&& arguments.get(0).getType() == ExpressionContext.Type.LITERAL) {
String columnName = arguments.get(1).getIdentifier();
return _indexSegment.getDataSource(columnName).getH3Index() != null;
return _indexSegment.getDataSource(columnName).getH3Index() != null
&& _queryContext.isIndexUseAllowed(columnName, FieldConfig.IndexType.H3);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ private void applyQueryOptions(QueryContext queryContext) {
// Set skipScanFilterReorder
queryContext.setSkipScanFilterReorder(QueryOptionsUtils.isSkipScanFilterReorder(queryOptions));

queryContext.setIndexSkipConfig(QueryOptionsUtils.getIndexSkipConfig(queryOptions));

// Set maxExecutionThreads
int maxExecutionThreads;
Integer maxExecutionThreadsFromQuery = QueryOptionsUtils.getMaxExecutionThreads(queryOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
import org.apache.pinot.core.util.MemoizedClassAssociation;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.config.table.FieldConfig;


/**
Expand Down Expand Up @@ -123,6 +125,8 @@ public class QueryContext {
private boolean _nullHandlingEnabled;
// Whether server returns the final result
private boolean _serverReturnFinalResult;
// Collection of index types to skip per column
private Map<String, Set<FieldConfig.IndexType>> _indexSkipConfig;

private QueryContext(@Nullable String tableName, @Nullable QueryContext subquery,
List<ExpressionContext> selectExpressions, boolean distinct, List<String> aliasList,
Expand Down Expand Up @@ -428,6 +432,21 @@ public String toString() {
+ ", _expressionOverrideHints=" + _expressionOverrideHints + ", _explain=" + _explain + '}';
}

/**
 * Sets the per-column collection of index types to skip for this query.
 *
 * @param indexSkipConfig map from column name to the index types that must not be used for that column; when it is
 *                        {@code null}, {@link #isIndexUseAllowed(String, FieldConfig.IndexType)} allows every index
 */
public void setIndexSkipConfig(Map<String, Set<FieldConfig.IndexType>> indexSkipConfig) {
_indexSkipConfig = indexSkipConfig;
}

/**
 * Returns whether the given index type may be used for the given column, according to the index skip config.
 *
 * @param columnName name of the column to check
 * @param indexType index type to check
 * @return {@code true} if no index skip config is set, or if the index type is not listed as skipped for the
 *         column; {@code false} otherwise
 */
public boolean isIndexUseAllowed(String columnName, FieldConfig.IndexType indexType) {
  if (_indexSkipConfig == null) {
    return true;
  }
  // Typed lookup instead of the raw Collections.EMPTY_SET default, which forced an unchecked conversion.
  Set<FieldConfig.IndexType> skippedIndexTypes = _indexSkipConfig.get(columnName);
  return skippedIndexTypes == null || !skippedIndexTypes.contains(indexType);
}

/**
 * Returns whether the given index type may be used for the column backing the given data source. Resolves the
 * column name from the data source and delegates to {@link #isIndexUseAllowed(String, FieldConfig.IndexType)}.
 */
public boolean isIndexUseAllowed(DataSource dataSource, FieldConfig.IndexType indexType) {
  String columnName = dataSource.getColumnName();
  return isIndexUseAllowed(columnName, indexType);
}

public static class Builder {
private String _tableName;
private QueryContext _subquery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import static org.apache.pinot.common.function.scalar.StringFunctions.*;
import static org.apache.pinot.controller.helix.core.PinotHelixResourceManager.EXTERNAL_VIEW_CHECK_INTERVAL_MS;
import static org.apache.pinot.controller.helix.core.PinotHelixResourceManager.EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS;
import static org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey.INDEX_SKIP_CONFIG;
import static org.testng.Assert.*;


Expand Down Expand Up @@ -598,7 +599,7 @@ public void testInvertedIndexTriggering()
assertEquals(getTableSize(getTableName()), DISK_SIZE_IN_BYTES);
}

private void addInvertedIndex()
private void addInvertedIndex(boolean shouldReload)
throws Exception {
// Update table config to add inverted index on DivActualElapsedTime column, and
// reload the table to get config change into effect and add the inverted index.
Expand All @@ -610,8 +611,36 @@ private void addInvertedIndex()
// After all segments are reloaded, the inverted index is added on DivActualElapsedTime.
// It's expected to have numEntriesScannedInFilter equal to 0, i.e. no docs is scanned
// at filtering stage when inverted index can answer the predicate directly.
reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, false, getCountStarResult());
assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L);
if (shouldReload) {
reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, false, getCountStarResult());
assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L);
}
}
/**
 * Convenience overload of {@link #addInvertedIndex(boolean)} that passes {@code true} for {@code shouldReload}.
 */
private void addInvertedIndex()
throws Exception {
addInvertedIndex(true);
}

/**
 * Updates the offline table config to use {@code UPDATED_RANGE_INDEX_COLUMNS} as range index columns (adding the
 * range index on DivActualElapsedTime) and, when requested, reloads all segments and verifies that the range index
 * is used.
 *
 * @param shouldReload whether to reload all segments and assert on the result after updating the table config
 */
private void addRangeIndex(boolean shouldReload)
    throws Exception {
  TableConfig tableConfig = getOfflineTableConfig();
  tableConfig.getIndexingConfig().setRangeIndexColumns(UPDATED_RANGE_INDEX_COLUMNS);
  updateTableConfig(tableConfig);

  if (!shouldReload) {
    // Caller will trigger the reload itself, e.g. after applying further config changes.
    return;
  }

  // It takes a while to reload multiple segments, thus we retry the query for some time.
  // After all segments are reloaded, the range index is added on DivActualElapsedTime.
  // It's expected to have numEntriesScannedInFilter equal to 0, i.e. no docs is scanned
  // at filtering stage when range index can answer the predicate directly.
  reloadAllSegments(TEST_UPDATED_RANGE_INDEX_QUERY, false, getCountStarResult());
  assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L);
}

/**
 * Convenience overload of {@link #addRangeIndex(boolean)} that passes {@code true} for {@code shouldReload}.
 */
private void addRangeIndex() throws Exception {
addRangeIndex(true);
}

@Test(dataProvider = "useBothQueryEngines")
Expand Down Expand Up @@ -1277,14 +1306,10 @@ public void testRangeIndexTriggering(boolean useMultiStageQueryEngine)
assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs);

// Update table config and trigger reload
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.getIndexingConfig().setRangeIndexColumns(UPDATED_RANGE_INDEX_COLUMNS);
updateTableConfig(tableConfig);
reloadAllSegments(TEST_UPDATED_RANGE_INDEX_QUERY, false, numTotalDocs);
assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L);
addRangeIndex();

// Update table config to remove the new range index, and check if the new range index is removed
tableConfig = getOfflineTableConfig();
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.getIndexingConfig().setRangeIndexColumns(getRangeIndexColumns());
updateTableConfig(tableConfig);
reloadAllSegments(TEST_UPDATED_RANGE_INDEX_QUERY, true, numTotalDocs);
Expand Down Expand Up @@ -3261,4 +3286,58 @@ public void testBooleanAggregation()
testQuery("SELECT BOOL_AND(CAST(Cancelled AS BOOLEAN)) FROM mytable");
testQuery("SELECT BOOL_OR(CAST(Diverted AS BOOLEAN)) FROM mytable");
}

/**
 * Builds the {@code SET indexSkipConfig='...'; } statement prefix to prepend to a query.
 *
 * @param columnsAndIndexes value of the option, e.g. {@code DivActualElapsedTime=inverted,range}
 * @return the SET statement, terminated by {@code "; "}
 */
private String buildIndexSkipConfig(String columnsAndIndexes) {
  return String.format("SET %s='%s'; ", INDEX_SKIP_CONFIG, columnsAndIndexes);
}

/**
 * Verifies that the indexSkipConfig query option disables range and/or inverted index usage on
 * DivActualElapsedTime at query time, using numEntriesScannedInFilter (and the explain plan) as the signal.
 */
@Test(dataProvider = "useBothQueryEngines")
public void testIndexSkipConfig(boolean useMultiStageQueryEngine)
    throws Exception {
  final String scannedInFilterKey = "numEntriesScannedInFilter";
  final String column = "DivActualElapsedTime";

  setUseMultiStageQueryEngine(useMultiStageQueryEngine);
  long numTotalDocs = getCountStarResult();
  assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get(scannedInFilterKey).asLong(), numTotalDocs);

  // Add both the range and the inverted index. The range index is added without reloading; the single reload
  // triggered while adding the inverted index picks up both config changes.
  addRangeIndex(false);
  addInvertedIndex();

  // Sanity check: the inverted index is operational
  assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get(scannedInFilterKey).asLong(), 0L);

  // Case 1: skip only the range index; the inverted index should be unaffected
  String skipRange = buildIndexSkipConfig(column + "=range");
  String invertedQuerySkippingRange = skipRange + TEST_UPDATED_INVERTED_INDEX_QUERY;
  String rangeQuerySkippingRange = skipRange + TEST_UPDATED_RANGE_INDEX_QUERY;
  assertEquals(postQuery(invertedQuerySkippingRange).get(scannedInFilterKey).asLong(), 0L);
  assertEquals(postQuery(rangeQuerySkippingRange).get(scannedInFilterKey).asLong(), numTotalDocs);

  // Case 2: skip only the inverted index; the range index should be unaffected
  String skipInverted = buildIndexSkipConfig(column + "=inverted");
  // The explain plan must not mention the inverted index filter
  String explainInvertedQuery = skipInverted + " EXPLAIN PLAN FOR " + TEST_UPDATED_INVERTED_INDEX_QUERY;
  assertFalse(postQuery(explainInvertedQuery).toString().contains("FILTER_INVERTED_INDEX"));

  // EQ predicate type allows for using range index if one exists, even if inverted index is skipped. That is why
  // we still see no docs scanned even though we skip the inverted index. This is a good test to show that using
  // the indexSkipConfig can allow fine-grained experimentation of index usage at query time.
  String invertedQuerySkippingInverted = skipInverted + TEST_UPDATED_INVERTED_INDEX_QUERY;
  String rangeQuerySkippingInverted = skipInverted + TEST_UPDATED_RANGE_INDEX_QUERY;
  assertEquals(postQuery(invertedQuerySkippingInverted).get(scannedInFilterKey).asLong(), 0L);
  assertEquals(postQuery(rangeQuerySkippingInverted).get(scannedInFilterKey).asLong(), 0L);

  // Case 3: skip both indexes; neither should be used, so every doc is scanned in the filter
  String skipBoth = buildIndexSkipConfig(column + "=inverted,range");
  String invertedQuerySkippingBoth = skipBoth + TEST_UPDATED_INVERTED_INDEX_QUERY;
  String rangeQuerySkippingBoth = skipBoth + TEST_UPDATED_RANGE_INDEX_QUERY;
  assertEquals(postQuery(invertedQuerySkippingBoth).get(scannedInFilterKey).asLong(), numTotalDocs);
  assertEquals(postQuery(rangeQuerySkippingBoth).get(scannedInFilterKey).asLong(), numTotalDocs);

  // Restore the original index config and wait until the added indexes are removed
  TableConfig tableConfig = getOfflineTableConfig();
  tableConfig.getIndexingConfig().setRangeIndexColumns(getRangeIndexColumns());
  tableConfig.getIndexingConfig().setInvertedIndexColumns(getInvertedIndexColumns());
  updateTableConfig(tableConfig);
  reloadAllSegments(TEST_UPDATED_RANGE_INDEX_QUERY, true, numTotalDocs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ public interface DataSource {
*/
ForwardIndexReader<?> getForwardIndex();

/**
 * Returns the name of the column to which this data source pertains.
 *
 * <p>The default implementation reads the name from the field spec of {@link #getDataSourceMetadata()}.
 */
default String getColumnName() {
return getDataSourceMetadata().getFieldSpec().getName();
}

/**
* Returns the dictionary for the column if it is dictionary-encoded, or {@code null} if not.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ public static class QueryOptionKey {
public static final String SERVER_RETURN_FINAL_RESULT = "serverReturnFinalResult";
// Reorder scan based predicates based on cardinality and number of selected values
public static final String AND_SCAN_REORDERING = "AndScanReordering";
public static final String INDEX_SKIP_CONFIG = "indexSkipConfig";

public static final String ORDER_BY_ALGORITHM = "orderByAlgorithm";

Expand Down

0 comments on commit 5dc807b

Please sign in to comment.