Skip to content

Commit

Permalink
Egalpin/skip indexes minor changes (#12514)
Browse files Browse the repository at this point in the history
  • Loading branch information
egalpin authored Mar 1, 2024
1 parent 1bfc6ac commit f51c34f
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
Expand Down Expand Up @@ -150,31 +151,31 @@ public static boolean isSkipScanFilterReorder(Map<String, String> queryOptions)
}

@Nullable
public static Map<String, Set<FieldConfig.IndexType>> getIndexSkipConfig(Map<String, String> queryOptions) {
// Example config: indexSkipConfig='col1=inverted,range&col2=inverted'
String indexSkipConfigStr = queryOptions.get(QueryOptionKey.INDEX_SKIP_CONFIG);
if (indexSkipConfigStr == null) {
public static Map<String, Set<FieldConfig.IndexType>> getSkipIndexes(Map<String, String> queryOptions) {
// Example config: skipIndexes='col1=inverted,range&col2=inverted'
String skipIndexesStr = queryOptions.get(QueryOptionKey.SKIP_INDEXES);
if (skipIndexesStr == null) {
return null;
}

String[] perColumnIndexSkip = indexSkipConfigStr.split("&");
Map<String, Set<FieldConfig.IndexType>> indexSkipConfig = new HashMap<>();
String[] perColumnIndexSkip = StringUtils.split(skipIndexesStr, '&');
Map<String, Set<FieldConfig.IndexType>> skipIndexes = new HashMap<>();
for (String columnConf : perColumnIndexSkip) {
String[] conf = columnConf.split("=");
String[] conf = StringUtils.split(columnConf, '=');
if (conf.length != 2) {
throw new RuntimeException("Invalid format for " + QueryOptionKey.INDEX_SKIP_CONFIG
+ ". Example of valid format: SET indexSkipConfig='col1=inverted,range&col2=inverted'");
throw new RuntimeException("Invalid format for " + QueryOptionKey.SKIP_INDEXES
+ ". Example of valid format: SET skipIndexes='col1=inverted,range&col2=inverted'");
}
String columnName = conf[0];
String[] indexTypes = conf[1].split(",");
String[] indexTypes = StringUtils.split(conf[1], ',');

for (String indexType : indexTypes) {
indexSkipConfig.computeIfAbsent(columnName, k -> new HashSet<>())
skipIndexes.computeIfAbsent(columnName, k -> new HashSet<>())
.add(FieldConfig.IndexType.valueOf(indexType.toUpperCase()));
}
}

return indexSkipConfig;
return skipIndexes;
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Set;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.sql.parsers.parser.ParseException;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand All @@ -48,23 +47,22 @@ public void shouldConvertCaseInsensitiveMapToUseCorrectValues() {
}

@Test
public void testIndexSkipConfigParsing()
throws ParseException {
String indexSkipConfigStr = "col1=inverted,range&col2=sorted";
public void testSkipIndexesParsing() {
String skipIndexesStr = "col1=inverted,range&col2=sorted";
Map<String, String> queryOptions =
Map.of(CommonConstants.Broker.Request.QueryOptionKey.INDEX_SKIP_CONFIG, indexSkipConfigStr);
Map<String, Set<FieldConfig.IndexType>> indexSkipConfig = QueryOptionsUtils.getIndexSkipConfig(queryOptions);
Assert.assertEquals(indexSkipConfig.get("col1"),
Map.of(CommonConstants.Broker.Request.QueryOptionKey.SKIP_INDEXES, skipIndexesStr);
Map<String, Set<FieldConfig.IndexType>> skipIndexes = QueryOptionsUtils.getSkipIndexes(queryOptions);
Assert.assertEquals(skipIndexes.get("col1"),
Set.of(FieldConfig.IndexType.RANGE, FieldConfig.IndexType.INVERTED));
Assert.assertEquals(indexSkipConfig.get("col2"),
Assert.assertEquals(skipIndexes.get("col2"),
Set.of(FieldConfig.IndexType.SORTED));
}

@Test(expectedExceptions = RuntimeException.class)
public void testIndexSkipConfigParsingInvalid() {
String indexSkipConfigStr = "col1=inverted,range&col2";
public void testSkipIndexesParsingInvalid() {
String skipIndexesStr = "col1=inverted,range&col2";
Map<String, String> queryOptions =
Map.of(CommonConstants.Broker.Request.QueryOptionKey.INDEX_SKIP_CONFIG, indexSkipConfigStr);
QueryOptionsUtils.getIndexSkipConfig(queryOptions);
Map.of(CommonConstants.Broker.Request.QueryOptionKey.SKIP_INDEXES, skipIndexesStr);
QueryOptionsUtils.getSkipIndexes(queryOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ private void applyQueryOptions(QueryContext queryContext) {
// Set skipScanFilterReorder
queryContext.setSkipScanFilterReorder(QueryOptionsUtils.isSkipScanFilterReorder(queryOptions));

queryContext.setIndexSkipConfig(QueryOptionsUtils.getIndexSkipConfig(queryOptions));
queryContext.setSkipIndexes(QueryOptionsUtils.getSkipIndexes(queryOptions));

// Set maxExecutionThreads
int maxExecutionThreads;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public class QueryContext {
// Whether server returns the final result
private boolean _serverReturnFinalResult;
// Collection of index types to skip per column
private Map<String, Set<FieldConfig.IndexType>> _indexSkipConfig;
private Map<String, Set<FieldConfig.IndexType>> _skipIndexes;

private QueryContext(@Nullable String tableName, @Nullable QueryContext subquery,
List<ExpressionContext> selectExpressions, boolean distinct, List<String> aliasList,
Expand Down Expand Up @@ -432,15 +432,15 @@ public String toString() {
+ ", _expressionOverrideHints=" + _expressionOverrideHints + ", _explain=" + _explain + '}';
}

public void setIndexSkipConfig(Map<String, Set<FieldConfig.IndexType>> indexSkipConfig) {
_indexSkipConfig = indexSkipConfig;
public void setSkipIndexes(Map<String, Set<FieldConfig.IndexType>> skipIndexes) {
_skipIndexes = skipIndexes;
}

public boolean isIndexUseAllowed(String columnName, FieldConfig.IndexType indexType) {
if (_indexSkipConfig == null) {
if (_skipIndexes == null) {
return true;
}
return !_indexSkipConfig.getOrDefault(columnName, Collections.EMPTY_SET).contains(indexType);
return !_skipIndexes.getOrDefault(columnName, Collections.EMPTY_SET).contains(indexType);
}

public boolean isIndexUseAllowed(DataSource dataSource, FieldConfig.IndexType indexType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
import static org.apache.pinot.common.function.scalar.StringFunctions.*;
import static org.apache.pinot.controller.helix.core.PinotHelixResourceManager.EXTERNAL_VIEW_CHECK_INTERVAL_MS;
import static org.apache.pinot.controller.helix.core.PinotHelixResourceManager.EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS;
import static org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey.INDEX_SKIP_CONFIG;
import static org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey.SKIP_INDEXES;
import static org.testng.Assert.*;


Expand Down Expand Up @@ -3287,12 +3287,12 @@ public void testBooleanAggregation()
testQuery("SELECT BOOL_OR(CAST(Diverted AS BOOLEAN)) FROM mytable");
}

private String buildIndexSkipConfig(String columnsAndIndexes) {
return "SET " + INDEX_SKIP_CONFIG + "='" + columnsAndIndexes + "'; ";
private String buildSkipIndexesOption(String columnsAndIndexes) {
return "SET " + SKIP_INDEXES + "='" + columnsAndIndexes + "'; ";
}

@Test(dataProvider = "useBothQueryEngines")
public void testIndexSkipConfig(boolean useMultiStageQueryEngine)
public void testSkipIndexes(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
long numTotalDocs = getCountStarResult();
Expand All @@ -3306,32 +3306,32 @@ public void testIndexSkipConfig(boolean useMultiStageQueryEngine)
assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L);

// disallow use of range index on DivActualElapsedTime, inverted should be unaffected
String indexSkipConf = buildIndexSkipConfig("DivActualElapsedTime=range");
String skipIndexes = buildSkipIndexesOption("DivActualElapsedTime=range");
assertEquals(postQuery(
indexSkipConf + TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L);
skipIndexes + TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L);
assertEquals(postQuery(
indexSkipConf + TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs);
skipIndexes + TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs);

// disallow use of inverted index on DivActualElapsedTime, range should be unaffected
indexSkipConf = buildIndexSkipConfig("DivActualElapsedTime=inverted");
skipIndexes = buildSkipIndexesOption("DivActualElapsedTime=inverted");
// Confirm that inverted index is not used
assertFalse(postQuery(indexSkipConf + " EXPLAIN PLAN FOR " + TEST_UPDATED_INVERTED_INDEX_QUERY).toString()
assertFalse(postQuery(skipIndexes + " EXPLAIN PLAN FOR " + TEST_UPDATED_INVERTED_INDEX_QUERY).toString()
.contains("FILTER_INVERTED_INDEX"));

// EQ predicate type allows for using range index if one exists, even if inverted index is skipped. That is why
// we still see no docs scanned even though we skip the inverted index. This is a good test to show that using
// the indexSkipConfig can allow fine-grained experimentation of index usage at query time.
// the skipIndexes can allow fine-grained experimentation of index usage at query time.
assertEquals(postQuery(
indexSkipConf + TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L);
skipIndexes + TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L);
assertEquals(postQuery(
indexSkipConf + TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L);
skipIndexes + TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L);

// disallow use of both range and inverted indexes on DivActualElapsedTime, neither should be used at query time
indexSkipConf = buildIndexSkipConfig("DivActualElapsedTime=inverted,range");
skipIndexes = buildSkipIndexesOption("DivActualElapsedTime=inverted,range");
assertEquals(postQuery(
indexSkipConf + TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs);
skipIndexes + TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs);
assertEquals(postQuery(
indexSkipConf + TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs);
skipIndexes + TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs);

// Update table config to remove the new indexes, and check if the new indexes are removed
TableConfig tableConfig = getOfflineTableConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ public static class QueryOptionKey {
public static final String SERVER_RETURN_FINAL_RESULT = "serverReturnFinalResult";
// Reorder scan based predicates based on cardinality and number of selected values
public static final String AND_SCAN_REORDERING = "AndScanReordering";
public static final String INDEX_SKIP_CONFIG = "indexSkipConfig";
public static final String SKIP_INDEXES = "skipIndexes";

public static final String ORDER_BY_ALGORITHM = "orderByAlgorithm";

Expand Down

0 comments on commit f51c34f

Please sign in to comment.