From d989427ec076d4553206e9fdd062be24de50477d Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" <17555551+Jackie-Jiang@users.noreply.github.com> Date: Sat, 14 Mar 2020 01:14:29 -0700 Subject: [PATCH] Fix the SQL group-by for empty data table (#5151) When all segments are pruned on the server side, support SQL group-by empty data table. Enable SQL query tests for offline test. --- .../common/datatable/DataTableBuilder.java | 113 ++-------------- .../core/common/datatable/DataTableUtils.java | 123 +++++++++++++++++- .../executor/ServerQueryExecutorV1Impl.java | 4 +- .../query/reduce/BrokerReduceService.java | 6 + .../query/reduce/GroupByDataTableReducer.java | 8 -- .../reduce/SelectionDataTableReducer.java | 29 ++--- .../common/datatable/DataTableUtilsTest.java | 86 ++++++++++++ ...rSegmentOrderBySingleValueQueriesTest.java | 68 +++++----- ...ltiNodesOfflineClusterIntegrationTest.java | 21 +++ 9 files changed, 288 insertions(+), 170 deletions(-) create mode 100644 pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableUtilsTest.java diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java index 77cdf5c1e94..4dc32cb55be 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java @@ -22,20 +22,11 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.HashMap; -import java.util.List; import java.util.Map; -import javax.annotation.Nonnull; -import org.apache.pinot.common.request.AggregationInfo; -import org.apache.pinot.common.request.BrokerRequest; -import org.apache.pinot.common.request.Selection; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataTable; import org.apache.pinot.core.common.ObjectSerDeUtils; -import org.apache.pinot.core.query.aggregation.AggregationFunctionContext; -import org.apache.pinot.core.query.aggregation.function.AggregationFunction; -import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; /** @@ -97,7 +88,7 @@ public class DataTableBuilder { private int _numRows; private ByteBuffer _currentRowDataByteBuffer; - public DataTableBuilder(@Nonnull DataSchema dataSchema) { + public DataTableBuilder(DataSchema dataSchema) { _dataSchema = dataSchema; _columnOffsets = new int[dataSchema.size()]; _rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema, _columnOffsets); @@ -152,7 +143,7 @@ public void setColumn(int colId, double value) { _currentRowDataByteBuffer.putDouble(value); } - public void setColumn(int colId, @Nonnull String value) { + public void setColumn(int colId, String value) { String columnName = _dataSchema.getColumnName(colId); Map dictionary = _dictionaryMap.get(columnName); if (dictionary == null) { @@ -171,7 +162,7 @@ public void setColumn(int colId, @Nonnull String value) { _currentRowDataByteBuffer.putInt(dictId); } - public void setColumn(int colId, @Nonnull Object value) + public void setColumn(int colId, Object value) throws IOException { _currentRowDataByteBuffer.position(_columnOffsets[colId]); _currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size()); @@ -182,7 +173,7 @@ public void setColumn(int colId, @Nonnull Object value) _variableSizeDataByteArrayOutputStream.write(bytes); } - public void setColumn(int colId, @Nonnull byte[] values) { + public void setColumn(int colId, byte[] values) { _currentRowDataByteBuffer.position(_columnOffsets[colId]); _currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size()); _currentRowDataByteBuffer.putInt(values.length); @@ -191,7 +182,7 @@ public void setColumn(int colId, @Nonnull byte[] values) { } } - public void setColumn(int colId, @Nonnull char[] values) + public void setColumn(int colId, char[] values) throws IOException { _currentRowDataByteBuffer.position(_columnOffsets[colId]); _currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size()); @@ -201,7 +192,7 @@ public void setColumn(int colId, @Nonnull char[] values) } } - public void setColumn(int colId, @Nonnull short[] values) + public void setColumn(int colId, short[] values) throws IOException { _currentRowDataByteBuffer.position(_columnOffsets[colId]); _currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size()); @@ -211,7 +202,7 @@ public void setColumn(int colId, @Nonnull short[] values) } } - public void setColumn(int colId, @Nonnull int[] values) + public void setColumn(int colId, int[] values) throws IOException { _currentRowDataByteBuffer.position(_columnOffsets[colId]); _currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size()); @@ -221,7 +212,7 @@ public void setColumn(int colId, @Nonnull int[] values) } } - public void setColumn(int colId, @Nonnull long[] values) + public void setColumn(int colId, long[] values) throws IOException { _currentRowDataByteBuffer.position(_columnOffsets[colId]); _currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size()); @@ -231,7 +222,7 @@ public void setColumn(int colId, @Nonnull long[] values) } } - public void setColumn(int colId, @Nonnull float[] values) + public void setColumn(int colId, float[] values) throws IOException { _currentRowDataByteBuffer.position(_columnOffsets[colId]); _currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size()); @@ -241,7 +232,7 @@ public void setColumn(int colId, @Nonnull float[] values) } } - public void setColumn(int colId, @Nonnull double[] values) + public void setColumn(int colId, double[] values) throws IOException { _currentRowDataByteBuffer.position(_columnOffsets[colId]); _currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size()); @@ -251,7 +242,7 @@ public void setColumn(int colId, @Nonnull double[] values) } } - public void setColumn(int colId, @Nonnull String[] values) + public void setColumn(int colId, String[] values) throws IOException { _currentRowDataByteBuffer.position(_columnOffsets[colId]); _currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size()); @@ -285,86 +276,4 @@ public DataTable build() { return new DataTableImplV2(_numRows, _dataSchema, _reverseDictionaryMap, _fixedSizeDataByteArrayOutputStream.toByteArray(), _variableSizeDataByteArrayOutputStream.toByteArray()); } - - /** - * Build an empty data table based on the broker request. - */ - public static DataTable buildEmptyDataTable(BrokerRequest brokerRequest) - throws IOException { - // Selection query. - if (brokerRequest.isSetSelections()) { - Selection selection = brokerRequest.getSelections(); - List selectionColumns = selection.getSelectionColumns(); - int numSelectionColumns = selectionColumns.size(); - DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numSelectionColumns]; - // Use STRING column data type as default for selection query. - Arrays.fill(columnDataTypes, DataSchema.ColumnDataType.STRING); - DataSchema dataSchema = - new DataSchema(selectionColumns.toArray(new String[numSelectionColumns]), columnDataTypes); - return new DataTableBuilder(dataSchema).build(); - } - - // Aggregation query. - List aggregationsInfo = brokerRequest.getAggregationsInfo(); - int numAggregations = aggregationsInfo.size(); - AggregationFunctionContext[] aggregationFunctionContexts = new AggregationFunctionContext[numAggregations]; - for (int i = 0; i < numAggregations; i++) { - aggregationFunctionContexts[i] = - AggregationFunctionUtils.getAggregationFunctionContext(aggregationsInfo.get(i), brokerRequest); - } - if (brokerRequest.isSetGroupBy()) { - // Aggregation group-by query. - - String[] columnNames = new String[]{"functionName", "GroupByResultMap"}; - DataSchema.ColumnDataType[] columnDataTypes = - new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.OBJECT}; - - // Build the data table. - DataTableBuilder dataTableBuilder = new DataTableBuilder(new DataSchema(columnNames, columnDataTypes)); - for (int i = 0; i < numAggregations; i++) { - dataTableBuilder.startRow(); - dataTableBuilder.setColumn(0, aggregationFunctionContexts[i].getAggregationColumnName()); - dataTableBuilder.setColumn(1, new HashMap()); - dataTableBuilder.finishRow(); - } - return dataTableBuilder.build(); - } else { - // Aggregation only query. - - String[] aggregationColumnNames = new String[numAggregations]; - DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numAggregations]; - Object[] aggregationResults = new Object[numAggregations]; - for (int i = 0; i < numAggregations; i++) { - AggregationFunctionContext aggregationFunctionContext = aggregationFunctionContexts[i]; - aggregationColumnNames[i] = aggregationFunctionContext.getAggregationColumnName(); - AggregationFunction aggregationFunction = aggregationFunctionContext.getAggregationFunction(); - columnDataTypes[i] = aggregationFunction.getIntermediateResultColumnType(); - aggregationResults[i] = - aggregationFunction.extractAggregationResult(aggregationFunction.createAggregationResultHolder()); - } - - // Build the data table. - DataTableBuilder dataTableBuilder = new DataTableBuilder(new DataSchema(aggregationColumnNames, columnDataTypes)); - dataTableBuilder.startRow(); - for (int i = 0; i < numAggregations; i++) { - switch (columnDataTypes[i]) { - case LONG: - dataTableBuilder.setColumn(i, ((Number) aggregationResults[i]).longValue()); - break; - case DOUBLE: - dataTableBuilder.setColumn(i, ((Double) aggregationResults[i]).doubleValue()); - break; - case OBJECT: - dataTableBuilder.setColumn(i, aggregationResults[i]); - break; - default: - throw new UnsupportedOperationException( - "Unsupported aggregation column data type: " + columnDataTypes[i] + " for column: " - + aggregationColumnNames[i]); - } - } - dataTableBuilder.finishRow(); - return dataTableBuilder.build(); - } - } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java index b0c9b150f29..497417b13c8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java @@ -18,8 +18,19 @@ */ package org.apache.pinot.core.common.datatable; -import javax.annotation.Nonnull; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import org.apache.pinot.common.request.AggregationInfo; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.request.Selection; import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataTable; +import org.apache.pinot.core.query.aggregation.AggregationFunctionContext; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; +import org.apache.pinot.core.util.QueryOptions; /** @@ -37,7 +48,7 @@ private DataTableUtils() { * @param columnOffsets array of column offsets. * @return row size in bytes. */ - public static int computeColumnOffsets(@Nonnull DataSchema dataSchema, @Nonnull int[] columnOffsets) { + public static int computeColumnOffsets(DataSchema dataSchema, int[] columnOffsets) { int numColumns = columnOffsets.length; assert numColumns == dataSchema.size(); @@ -71,4 +82,112 @@ public static int computeColumnOffsets(@Nonnull DataSchema dataSchema, @Nonnull return rowSizeInBytes; } + + /** + * Builds an empty data table based on the broker request. + */ + public static DataTable buildEmptyDataTable(BrokerRequest brokerRequest) + throws IOException { + // Selection query. + if (brokerRequest.isSetSelections()) { + Selection selection = brokerRequest.getSelections(); + List selectionColumns = selection.getSelectionColumns(); + int numSelectionColumns = selectionColumns.size(); + DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numSelectionColumns]; + // Use STRING column data type as default for selection query. + Arrays.fill(columnDataTypes, DataSchema.ColumnDataType.STRING); + DataSchema dataSchema = + new DataSchema(selectionColumns.toArray(new String[numSelectionColumns]), columnDataTypes); + return new DataTableBuilder(dataSchema).build(); + } + + // Aggregation query. + List aggregationsInfo = brokerRequest.getAggregationsInfo(); + int numAggregations = aggregationsInfo.size(); + AggregationFunctionContext[] aggregationFunctionContexts = new AggregationFunctionContext[numAggregations]; + for (int i = 0; i < numAggregations; i++) { + aggregationFunctionContexts[i] = + AggregationFunctionUtils.getAggregationFunctionContext(aggregationsInfo.get(i), brokerRequest); + } + if (brokerRequest.isSetGroupBy()) { + // Aggregation group-by query. + + if (new QueryOptions(brokerRequest.getQueryOptions()).isGroupByModeSQL()) { + // SQL format + + List expressions = brokerRequest.getGroupBy().getExpressions(); + int numColumns = expressions.size() + numAggregations; + String[] columnNames = new String[numColumns]; + DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numColumns]; + int index = 0; + for (String expression : expressions) { + columnNames[index] = expression; + // Use STRING column data type as default for group-by expressions + columnDataTypes[index] = DataSchema.ColumnDataType.STRING; + index++; + } + for (int i = 0; i < numAggregations; i++) { + AggregationFunctionContext aggregationFunctionContext = aggregationFunctionContexts[i]; + columnNames[index] = aggregationFunctionContext.getResultColumnName(); + AggregationFunction aggregationFunction = aggregationFunctionContext.getAggregationFunction(); + columnDataTypes[index] = aggregationFunction.getIntermediateResultColumnType(); + index++; + } + return new DataTableBuilder(new DataSchema(columnNames, columnDataTypes)).build(); + } else { + // PQL format + + String[] columnNames = new String[]{"functionName", "GroupByResultMap"}; + DataSchema.ColumnDataType[] columnDataTypes = + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.OBJECT}; + + // Build the data table. + DataTableBuilder dataTableBuilder = new DataTableBuilder(new DataSchema(columnNames, columnDataTypes)); + for (int i = 0; i < numAggregations; i++) { + dataTableBuilder.startRow(); + dataTableBuilder.setColumn(0, aggregationFunctionContexts[i].getAggregationColumnName()); + dataTableBuilder.setColumn(1, new HashMap()); + dataTableBuilder.finishRow(); + } + return dataTableBuilder.build(); + } + } else { + // Aggregation only query. + + String[] aggregationColumnNames = new String[numAggregations]; + DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numAggregations]; + Object[] aggregationResults = new Object[numAggregations]; + for (int i = 0; i < numAggregations; i++) { + AggregationFunctionContext aggregationFunctionContext = aggregationFunctionContexts[i]; + aggregationColumnNames[i] = aggregationFunctionContext.getAggregationColumnName(); + AggregationFunction aggregationFunction = aggregationFunctionContext.getAggregationFunction(); + columnDataTypes[i] = aggregationFunction.getIntermediateResultColumnType(); + aggregationResults[i] = + aggregationFunction.extractAggregationResult(aggregationFunction.createAggregationResultHolder()); + } + + // Build the data table. + DataTableBuilder dataTableBuilder = new DataTableBuilder(new DataSchema(aggregationColumnNames, columnDataTypes)); + dataTableBuilder.startRow(); + for (int i = 0; i < numAggregations; i++) { + switch (columnDataTypes[i]) { + case LONG: + dataTableBuilder.setColumn(i, ((Number) aggregationResults[i]).longValue()); + break; + case DOUBLE: + dataTableBuilder.setColumn(i, ((Double) aggregationResults[i]).doubleValue()); + break; + case OBJECT: + dataTableBuilder.setColumn(i, aggregationResults[i]); + break; + default: + throw new UnsupportedOperationException( + "Unsupported aggregation column data type: " + columnDataTypes[i] + " for column: " + + aggregationColumnNames[i]); + } + } + dataTableBuilder.finishRow(); + return dataTableBuilder.build(); + } + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java index 83bd6113b1d..741c119b2fe 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java @@ -35,8 +35,8 @@ import org.apache.pinot.common.segment.SegmentMetadata; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.DataTable; -import org.apache.pinot.core.common.datatable.DataTableBuilder; import org.apache.pinot.core.common.datatable.DataTableImplV2; +import org.apache.pinot.core.common.datatable.DataTableUtils; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.SegmentDataManager; import org.apache.pinot.core.data.manager.TableDataManager; @@ -188,7 +188,7 @@ public DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService e int numSegmentsMatchedAfterPruning = segmentDataManagers.size(); LOGGER.debug("Matched {} segments after pruning", numSegmentsMatchedAfterPruning); if (numSegmentsMatchedAfterPruning == 0) { - dataTable = DataTableBuilder.buildEmptyDataTable(brokerRequest); + dataTable = DataTableUtils.buildEmptyDataTable(brokerRequest); Map metadata = dataTable.getMetadata(); metadata.put(DataTable.TOTAL_DOCS_METADATA_KEY, String.valueOf(numTotalDocs)); metadata.put(DataTable.NUM_DOCS_SCANNED_METADATA_KEY, "0"); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java index c09de82bbd1..bcaf738d5dc 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java @@ -181,6 +181,12 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, } } + // NOTE: When there is no cached data schema, that means all servers encountered exception. In such case, return the + // response with metadata only. + if (cachedDataSchema == null) { + return brokerResponseNative; + } + DataTableReducer dataTableReducer = ResultReducerFactory.getResultReducer(brokerRequest); dataTableReducer .reduceAndSetResults(tableName, cachedDataSchema, dataTableMap, brokerResponseNative, brokerMetrics); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java index a9c71c58db2..e5db5ca2f5f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java @@ -56,16 +56,12 @@ import org.apache.pinot.core.util.GroupByUtils; import org.apache.pinot.core.util.QueryOptions; import org.apache.pinot.spi.utils.BytesUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Helper class to reduce data tables and set group by results into the BrokerResponseNative */ public class GroupByDataTableReducer implements DataTableReducer { - private static final Logger LOGGER = LoggerFactory.getLogger(GroupByDataTableReducer.class); - private final BrokerRequest _brokerRequest; private final AggregationFunction[] _aggregationFunctions; private final List _aggregationInfos; @@ -112,10 +108,6 @@ public class GroupByDataTableReducer implements DataTableReducer { public void reduceAndSetResults(String tableName, DataSchema dataSchema, Map dataTableMap, BrokerResponseNative brokerResponseNative, BrokerMetrics brokerMetrics) { - if (dataTableMap.isEmpty() && !_responseFormatSql) { - return; - } - assert dataSchema != null; int resultSize = 0; Collection dataTables = dataTableMap.values(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java index 60adacf22ec..1cea8253107 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java @@ -20,7 +20,6 @@ import java.io.Serializable; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -71,26 +70,17 @@ public class SelectionDataTableReducer implements DataTableReducer { public void reduceAndSetResults(String tableName, DataSchema dataSchema, Map dataTableMap, BrokerResponseNative brokerResponseNative, BrokerMetrics brokerMetrics) { - Collection dataTables = dataTableMap.values(); - if (dataTableMap.isEmpty()) { // For empty data table map, construct empty result using the cached data schema for selection query if exists - if (dataSchema != null) { - List selectionColumns = - SelectionOperatorUtils.getSelectionColumns(_selection.getSelectionColumns(), dataSchema); - if (_responseFormatSql) { - DataSchema selectionDataSchema = - SelectionOperatorUtils.getResultTableDataSchema(dataSchema, selectionColumns); - brokerResponseNative.setResultTable(new ResultTable(selectionDataSchema, Collections.emptyList())); - } else { - brokerResponseNative.setSelectionResults(new SelectionResults(selectionColumns, Collections.emptyList())); - } + List selectionColumns = + SelectionOperatorUtils.getSelectionColumns(_selection.getSelectionColumns(), dataSchema); + if (_responseFormatSql) { + DataSchema selectionDataSchema = SelectionOperatorUtils.getResultTableDataSchema(dataSchema, selectionColumns); + brokerResponseNative.setResultTable(new ResultTable(selectionDataSchema, Collections.emptyList())); + } else { + brokerResponseNative.setSelectionResults(new SelectionResults(selectionColumns, Collections.emptyList())); } - return; } else { - - assert dataSchema != null; - // For data table map with more than one data tables, remove conflicting data tables if (dataTableMap.size() > 1) { List droppedServers = removeConflictingResponses(dataSchema, dataTableMap); @@ -111,7 +101,7 @@ public void reduceAndSetResults(String tableName, DataSchema dataSchema, if (selectionSize > 0 && _selection.isSetSelectionSortSequence()) { // Selection order-by SelectionOperatorService selectionService = new SelectionOperatorService(_selection, dataSchema); - selectionService.reduceWithOrdering(dataTables); + selectionService.reduceWithOrdering(dataTableMap.values()); if (_responseFormatSql) { // TODO: Selection uses Serializable[] in all its operations // Converting that to Object[] end to end would be a big change, and will be done in future PRs @@ -123,7 +113,8 @@ public void reduceAndSetResults(String tableName, DataSchema dataSchema, // Selection only List selectionColumns = SelectionOperatorUtils.getSelectionColumns(_selection.getSelectionColumns(), dataSchema); - List reducedRows = SelectionOperatorUtils.reduceWithoutOrdering(dataTables, selectionSize); + List reducedRows = + SelectionOperatorUtils.reduceWithoutOrdering(dataTableMap.values(), selectionSize); if (_responseFormatSql) { // TODO: Selection uses Serializable[] in all its operations // Converting that to Object[] end to end would be a big change, and will be done in future PRs diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableUtilsTest.java new file mode 100644 index 00000000000..1fe908d41e9 --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableUtilsTest.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.common.datatable; + +import java.io.IOException; +import java.util.Collections; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.utils.CommonConstants.Broker.Request; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataTable; +import org.apache.pinot.pql.parsers.Pql2Compiler; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + + +public class DataTableUtilsTest { + private static final Pql2Compiler COMPILER = new Pql2Compiler(); + + @Test + public void testBuildEmptyDataTable() + throws IOException { + // Selection + BrokerRequest brokerRequest = COMPILER.compileToBrokerRequest("SELECT * FROM table WHERE foo = 'bar'"); + DataTable dataTable = DataTableUtils.buildEmptyDataTable(brokerRequest); + DataSchema dataSchema = dataTable.getDataSchema(); + assertEquals(dataSchema.getColumnNames(), new String[]{"*"}); + assertEquals(dataSchema.getColumnDataTypes(), new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING}); + assertEquals(dataTable.getNumberOfRows(), 0); + + // Aggregation + brokerRequest = COMPILER.compileToBrokerRequest("SELECT COUNT(*), SUM(a), MAX(b) FROM table WHERE foo = 'bar'"); + dataTable = DataTableUtils.buildEmptyDataTable(brokerRequest); + dataSchema = dataTable.getDataSchema(); + assertEquals(dataSchema.getColumnNames(), new String[]{"count_star", "sum_a", "max_b"}); + assertEquals(dataSchema.getColumnDataTypes(), + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE}); + assertEquals(dataTable.getNumberOfRows(), 1); + assertEquals(dataTable.getLong(0, 0), 0L); + assertEquals(dataTable.getDouble(0, 1), 0.0); + assertEquals(dataTable.getDouble(0, 2), Double.NEGATIVE_INFINITY); + + // PQL group-by + brokerRequest = + COMPILER.compileToBrokerRequest("SELECT COUNT(*), SUM(a), MAX(b) FROM table WHERE foo = 'bar' GROUP BY c, d"); + dataTable = DataTableUtils.buildEmptyDataTable(brokerRequest); + dataSchema = dataTable.getDataSchema(); + assertEquals(dataSchema.getColumnNames(), new String[]{"functionName", "GroupByResultMap"}); + assertEquals(dataSchema.getColumnDataTypes(), + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.OBJECT}); + assertEquals(dataTable.getNumberOfRows(), 3); + assertEquals(dataTable.getString(0, 0), "count_star"); + assertEquals(dataTable.getObject(0, 1), Collections.emptyMap()); + assertEquals(dataTable.getString(1, 0), "sum_a"); + assertEquals(dataTable.getObject(1, 1), Collections.emptyMap()); + assertEquals(dataTable.getString(2, 0), "max_b"); + assertEquals(dataTable.getObject(2, 1), Collections.emptyMap()); + + // SQL group-by + brokerRequest = COMPILER + .compileToBrokerRequest("SELECT c, d, COUNT(*), SUM(a), MAX(b) FROM table WHERE foo = 'bar' GROUP BY c, d"); + brokerRequest.setQueryOptions(Collections.singletonMap(Request.QueryOptionKey.GROUP_BY_MODE, Request.SQL)); + dataTable = DataTableUtils.buildEmptyDataTable(brokerRequest); + dataSchema = dataTable.getDataSchema(); + assertEquals(dataSchema.getColumnNames(), new String[]{"c", "d", "count(*)", "sum(a)", "max(b)"}); + assertEquals(dataSchema.getColumnDataTypes(), + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE}); + assertEquals(dataTable.getNumberOfRows(), 0); + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderBySingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderBySingleValueQueriesTest.java index 31d234a0757..a8cba7b5439 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderBySingleValueQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderBySingleValueQueriesTest.java @@ -218,10 +218,10 @@ public Object[][] orderBySQLResultTableProvider() { query = "SELECT SUM(column1) FROM testTable GROUP BY column11, column12 ORDER BY column11, column12 DESC"; results = Lists.newArrayList(new Object[]{"", "oZgnrlDEtjjVpUoFLol", 22680162504.0}, new Object[]{"", "dJWwFk", 55470665124.0000}, new Object[]{"", "MaztCmmxxgguBUxPti", 1333941430664.0}, - new Object[]{"", "KrNxpdycSiwoRohEiTIlLqDHnx", 733802350944.0}, - new Object[]{"", "HEuxNvH", 3789390396216.0}, new Object[]{"P", "oZgnrlDEtjjVpUoFLol", 8345501392852.0}, - new Object[]{"P", "gFuH", 860077643636.0}, new Object[]{"P", "fykKFqiw", 1574451324140.0}, - new Object[]{"P", "dJWwFk", 6224665921376.0}, new Object[]{"P", "XcBNHe", 120021767504.0}); + new Object[]{"", "KrNxpdycSiwoRohEiTIlLqDHnx", 733802350944.0}, new Object[]{"", "HEuxNvH", 3789390396216.0}, + new Object[]{"P", "oZgnrlDEtjjVpUoFLol", 8345501392852.0}, new Object[]{"P", "gFuH", 860077643636.0}, + new Object[]{"P", "fykKFqiw", 1574451324140.0}, new Object[]{"P", "dJWwFk", 6224665921376.0}, + new Object[]{"P", "XcBNHe", 120021767504.0}); dataSchema = new DataSchema(new String[]{"column11", "column12", "sum(column1)"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.DOUBLE}); numEntriesScannedPostFilter = 360000; @@ -231,8 +231,7 @@ public Object[][] orderBySQLResultTableProvider() { // order by group by column and an aggregation query = "SELECT SUM(column1) FROM testTable GROUP BY column11, column12 ORDER BY column11, sum(column1)"; results = Lists.newArrayList(new Object[]{"", "oZgnrlDEtjjVpUoFLol", 22680162504.0}, - new Object[]{"", "dJWwFk", 55470665124.0000}, - new Object[]{"", "KrNxpdycSiwoRohEiTIlLqDHnx", 733802350944.0}, + new Object[]{"", "dJWwFk", 55470665124.0000}, new Object[]{"", "KrNxpdycSiwoRohEiTIlLqDHnx", 733802350944.0}, new Object[]{"", "MaztCmmxxgguBUxPti", 1333941430664.0}, new Object[]{"", "HEuxNvH", 3789390396216.0}, new Object[]{"P", "XcBNHe", 120021767504.0}, new Object[]{"P", "gFuH", 860077643636.0}, new Object[]{"P", "fykKFqiw", 1574451324140.0}, new Object[]{"P", "TTltMtFiRqUjvOG", 4462670055540.0}, @@ -252,22 +251,19 @@ public Object[][] orderBySQLResultTableProvider() { new Object[]{"o", "MaztCmmxxgguBUxPti", 6905624581072.0}, new Object[]{"P", "dJWwFk", 6224665921376.0}, new Object[]{"o", "HEuxNvH", 5026384681784.0}, new Object[]{"t", "MaztCmmxxgguBUxPti", 4492405624940.0}, new Object[]{"P", "TTltMtFiRqUjvOG", 4462670055540.0}, new Object[]{"t", "HEuxNvH", 4424489490364.0}, - new Object[]{"o", "KrNxpdycSiwoRohEiTIlLqDHnx", 4051812250524.0}, - new Object[]{"", "HEuxNvH", 3789390396216.0}, + new Object[]{"o", "KrNxpdycSiwoRohEiTIlLqDHnx", 4051812250524.0}, new Object[]{"", "HEuxNvH", 3789390396216.0}, new Object[]{"t", "KrNxpdycSiwoRohEiTIlLqDHnx", 3529048341192.0}, new Object[]{"P", "fykKFqiw", 1574451324140.0}, new Object[]{"t", "dJWwFk", 1349058948804.0}, new Object[]{"", "MaztCmmxxgguBUxPti", 1333941430664.0}, new Object[]{"o", "dJWwFk", 1152689463360.0}, new Object[]{"t", "oZgnrlDEtjjVpUoFLol", 1039101333316.0}, new Object[]{"P", "gFuH", 860077643636.0}, new Object[]{"", "KrNxpdycSiwoRohEiTIlLqDHnx", 733802350944.0}, - new Object[]{"o", "oZgnrlDEtjjVpUoFLol", 699381633640.0}, - new Object[]{"t", "TTltMtFiRqUjvOG", 675238030848.0}, new Object[]{"t", "fykKFqiw", 480973878052.0}, - new Object[]{"t", "gFuH", 330331507792.0}, new Object[]{"o", "TTltMtFiRqUjvOG", 203835153352.0}, - new Object[]{"P", "XcBNHe", 120021767504.0}, new Object[]{"o", "fykKFqiw", 62975165296.0}, - new Object[]{"", "dJWwFk", 55470665124.0000}, new Object[]{"gFuH", "HEuxNvH", 29872400856.0}, - new Object[]{"gFuH", "MaztCmmxxgguBUxPti", 29170832184.0}, + new Object[]{"o", "oZgnrlDEtjjVpUoFLol", 699381633640.0}, new Object[]{"t", "TTltMtFiRqUjvOG", 675238030848.0}, + new Object[]{"t", "fykKFqiw", 480973878052.0}, new Object[]{"t", "gFuH", 330331507792.0}, + new Object[]{"o", "TTltMtFiRqUjvOG", 203835153352.0}, new Object[]{"P", "XcBNHe", 120021767504.0}, + new Object[]{"o", "fykKFqiw", 62975165296.0}, new Object[]{"", "dJWwFk", 55470665124.0000}, + new Object[]{"gFuH", "HEuxNvH", 29872400856.0}, new Object[]{"gFuH", "MaztCmmxxgguBUxPti", 29170832184.0}, new Object[]{"", "oZgnrlDEtjjVpUoFLol", 22680162504.0}, new Object[]{"t", "XcBNHe", 11276063956.0}, - new Object[]{"gFuH", "KrNxpdycSiwoRohEiTIlLqDHnx", 4159552848.0}, - new Object[]{"o", "gFuH", 2628604920.0}); + new Object[]{"gFuH", "KrNxpdycSiwoRohEiTIlLqDHnx", 4159552848.0}, new Object[]{"o", "gFuH", 2628604920.0}); dataSchema = new DataSchema(new String[]{"column11", "column12", "sum(column1)"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.DOUBLE}); numEntriesScannedPostFilter = 360000; @@ -387,7 +383,8 @@ public Object[][] orderBySQLResultTableProvider() { new Object[]{query, results, numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, numTotalDocs, dataSchema}); // percentile - query = "SELECT percentile90(column6) FROM testTable GROUP BY column11 ORDER BY PERCENTILE90(column6), column11 TOP 3"; + query = + "SELECT percentile90(column6) FROM testTable GROUP BY column11 ORDER BY PERCENTILE90(column6), column11 TOP 3"; results = Lists.newArrayList(new Object[]{"", 2.96467636E8}, new Object[]{"gFuH", 2.96467636E8}, new Object[]{"o", 2.96467636E8}); dataSchema = new DataSchema(new String[]{"column11", "percentile90(column6)"}, @@ -471,9 +468,8 @@ public Object[][] orderByPQLResultProvider() { new String[]{"", "MaztCmmxxgguBUxPti"}, new String[]{"", "dJWwFk"}, new String[]{"", "oZgnrlDEtjjVpUoFLol"}, new String[]{"P", "HEuxNvH"}, new String[]{"P", "KrNxpdycSiwoRohEiTIlLqDHnx"}, new String[]{"P", "MaztCmmxxgguBUxPti"}, new String[]{"P", "TTltMtFiRqUjvOG"}, new String[]{"P", "XcBNHe"}); - result1 = Lists - .newArrayList(3789390396216.0, 733802350944.0, 1333941430664.0, 55470665124.0000, 22680162504.0, - 21998672845052.0, 18069909216728.0, 27177029040008.0, 4462670055540.0, 120021767504.0); + result1 = Lists.newArrayList(3789390396216.0, 733802350944.0, 1333941430664.0, 55470665124.0000, 22680162504.0, + 21998672845052.0, 18069909216728.0, 27177029040008.0, 4462670055540.0, 120021767504.0); results = new ArrayList<>(); results.add(result1); numEntriesScannedPostFilter = 360000; @@ -506,9 +502,8 @@ public Object[][] orderByPQLResultProvider() { new String[]{"", "MaztCmmxxgguBUxPti"}, new String[]{"", "KrNxpdycSiwoRohEiTIlLqDHnx"}, new String[]{"", "HEuxNvH"}, new String[]{"P", "oZgnrlDEtjjVpUoFLol"}, new String[]{"P", "gFuH"}, new String[]{"P", "fykKFqiw"}, new String[]{"P", "dJWwFk"}, new String[]{"P", "XcBNHe"}); - result1 = Lists - .newArrayList(22680162504.0, 55470665124.0000, 1333941430664.0, 733802350944.0, 3789390396216.0, - 8345501392852.0, 860077643636.0, 1574451324140.0, 6224665921376.0, 120021767504.0); + result1 = Lists.newArrayList(22680162504.0, 55470665124.0000, 1333941430664.0, 733802350944.0, 3789390396216.0, + 8345501392852.0, 860077643636.0, 1574451324140.0, 6224665921376.0, 120021767504.0); results = new ArrayList<>(); results.add(result1); numEntriesScannedPostFilter = 360000; @@ -522,8 +517,8 @@ public Object[][] orderByPQLResultProvider() { new String[]{"", "HEuxNvH"}, new String[]{"P", "XcBNHe"}, new String[]{"P", "gFuH"}, new String[]{"P", "fykKFqiw"}, new String[]{"P", "TTltMtFiRqUjvOG"}, new String[]{"P", "dJWwFk"}); result1 = Lists - .newArrayList(22680162504.0, 55470665124.0000, 733802350944.0, 1333941430664.0, 3789390396216.0, - 120021767504.0, 860077643636.0, 1574451324140.0, 4462670055540.0, 6224665921376.0); + .newArrayList(22680162504.0, 55470665124.0000, 733802350944.0, 1333941430664.0, 3789390396216.0, 120021767504.0, + 860077643636.0, 1574451324140.0, 4462670055540.0, 6224665921376.0); results = new ArrayList<>(); results.add(result1); numEntriesScannedPostFilter = 360000; @@ -545,13 +540,12 @@ public Object[][] orderByPQLResultProvider() { new String[]{"o", "fykKFqiw"}, new String[]{"", "dJWwFk"}, new String[]{"gFuH", "HEuxNvH"}, new String[]{"gFuH", "MaztCmmxxgguBUxPti"}, new String[]{"", "oZgnrlDEtjjVpUoFLol"}, new String[]{"t", "XcBNHe"}, new String[]{"gFuH", "KrNxpdycSiwoRohEiTIlLqDHnx"}, new String[]{"o", "gFuH"}); - result1 = Lists.newArrayList(27177029040008.0, 21998672845052.0, 18069909216728.0, 8345501392852.0, - 6905624581072.0, 6224665921376.0, 5026384681784.0, 4492405624940.0, 4462670055540.0, - 4424489490364.0, 4051812250524.0, 3789390396216.0, 3529048341192.0, 1574451324140.0, - 1349058948804.0, 1333941430664.0, 1152689463360.0, 1039101333316.0, 860077643636.0, - 733802350944.0, 699381633640.0, 675238030848.0, 480973878052.0, 330331507792.0, - 203835153352.0, 120021767504.0, 62975165296.0, 55470665124.0000, 29872400856.0, - 29170832184.0, 22680162504.0, 11276063956.0, 4159552848.0, 2628604920.0); + result1 = Lists.newArrayList(27177029040008.0, 21998672845052.0, 18069909216728.0, 8345501392852.0, 6905624581072.0, + 6224665921376.0, 5026384681784.0, 4492405624940.0, 4462670055540.0, 4424489490364.0, 4051812250524.0, + 3789390396216.0, 3529048341192.0, 1574451324140.0, 1349058948804.0, 1333941430664.0, 1152689463360.0, + 1039101333316.0, 860077643636.0, 733802350944.0, 699381633640.0, 675238030848.0, 480973878052.0, 330331507792.0, + 203835153352.0, 120021767504.0, 62975165296.0, 55470665124.0000, 29872400856.0, 29170832184.0, 22680162504.0, + 11276063956.0, 4159552848.0, 2628604920.0); results = new ArrayList<>(); results.add(result1); numEntriesScannedPostFilter = 360000; @@ -589,8 +583,8 @@ public Object[][] orderByPQLResultProvider() { new String[]{"MaztCmmxxgguBUxPti"}, new String[]{"dJWwFk"}, new String[]{"KrNxpdycSiwoRohEiTIlLqDHnx"}, new String[]{"TTltMtFiRqUjvOG"}, new String[]{"oZgnrlDEtjjVpUoFLol"}); result1 = Lists - .newArrayList(329467557.0, 296467636.0, 296467636.0, 6043515.0, 6043515.0, 6043515.0, - 1980174.0, 1980174.0, 1689277.0); + .newArrayList(329467557.0, 296467636.0, 296467636.0, 6043515.0, 6043515.0, 6043515.0, 1980174.0, 1980174.0, + 1689277.0); results = new ArrayList<>(); results.add(result1); numEntriesScannedPostFilter = 240000; @@ -680,8 +674,8 @@ public Object[][] orderByPQLResultProvider() { // empty results query = "SELECT MIN(column6) FROM testTable where column12='non-existent-value' GROUP BY column11 order by column11"; - groups = new ArrayList<>(0); - results = new ArrayList<>(0); + groups = Collections.emptyList(); + results = Collections.singletonList(Collections.emptyList()); numDocsScanned = 0; numEntriesScannedPostFilter = 0; data.add( @@ -689,4 +683,4 @@ public Object[][] orderByPQLResultProvider() { return data.toArray(new Object[data.size()][]); } -} \ No newline at end of file +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java index d5605d68150..d36ca2f0f49 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java @@ -38,6 +38,20 @@ protected int getNumServers() { return NUM_SERVERS; } + @Test + @Override + public void testHardcodedQueries() + throws Exception { + super.testHardcodedQueries(); + } + + @Test + @Override + public void testHardcodedSqlQueries() + throws Exception { + super.testHardcodedSqlQueries(); + } + @Test @Override public void testQueriesFromQueryFile() @@ -45,6 +59,13 @@ public void testQueriesFromQueryFile() super.testQueriesFromQueryFile(); } + @Test + @Override + public void testSqlQueriesFromQueryFile() + throws Exception { + super.testSqlQueriesFromQueryFile(); + } + @Test @Override public void testGeneratedQueriesWithMultiValues()