Fix the SQL group-by for empty data table (#5151)
When all segments are pruned on the server side, build a well-formed empty data table for SQL group-by queries.
Enable the SQL query tests for the offline integration test.
Jackie-Jiang authored Mar 14, 2020
1 parent d15a91a commit d989427
Showing 9 changed files with 288 additions and 170 deletions.
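To make the fix concrete before the diff: the sketch below is not part of the commit; the query, table name, and result column name are hypothetical, and COUNT's intermediate result type is assumed to be LONG. It shows the shape of the empty data table a server can now return for a SQL-format group-by query when every segment is pruned.

import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datatable.DataTableBuilder;

public class EmptySqlGroupByExample {
  public static void main(String[] args)
      throws Exception {
    // Hypothetical query: SELECT country, COUNT(*) FROM myTable GROUP BY country
    // In SQL group-by mode the empty table has one column per group-by
    // expression (STRING by default) followed by one column per aggregation,
    // typed with the aggregation's intermediate result type (assumed LONG for COUNT).
    DataSchema dataSchema = new DataSchema(
        new String[]{"country", "count(*)"},
        new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.LONG});

    // No rows are added; the schema alone lets the broker reduce this
    // response together with non-empty responses from other servers.
    DataTable emptyDataTable = new DataTableBuilder(dataSchema).build();
    System.out.println(emptyDataTable.getDataSchema());
  }
}

Before this change, the server built the PQL-style empty table regardless of the query format, so a SQL group-by over fully-pruned segments produced a data table whose schema did not match the SQL-format results from other servers.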
DataTableBuilder.java
@@ -22,20 +22,11 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
- import java.util.Arrays;
import java.util.HashMap;
- import java.util.List;
import java.util.Map;
- import javax.annotation.Nonnull;
- import org.apache.pinot.common.request.AggregationInfo;
- import org.apache.pinot.common.request.BrokerRequest;
- import org.apache.pinot.common.request.Selection;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.ObjectSerDeUtils;
- import org.apache.pinot.core.query.aggregation.AggregationFunctionContext;
- import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
- import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;


/**
@@ -97,7 +88,7 @@ public class DataTableBuilder {
private int _numRows;
private ByteBuffer _currentRowDataByteBuffer;

- public DataTableBuilder(@Nonnull DataSchema dataSchema) {
+ public DataTableBuilder(DataSchema dataSchema) {
_dataSchema = dataSchema;
_columnOffsets = new int[dataSchema.size()];
_rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema, _columnOffsets);
@@ -152,7 +143,7 @@ public void setColumn(int colId, double value) {
_currentRowDataByteBuffer.putDouble(value);
}

- public void setColumn(int colId, @Nonnull String value) {
+ public void setColumn(int colId, String value) {
String columnName = _dataSchema.getColumnName(colId);
Map<String, Integer> dictionary = _dictionaryMap.get(columnName);
if (dictionary == null) {
@@ -171,7 +162,7 @@ public void setColumn(int colId, @Nonnull String value) {
_currentRowDataByteBuffer.putInt(dictId);
}

- public void setColumn(int colId, @Nonnull Object value)
+ public void setColumn(int colId, Object value)
throws IOException {
_currentRowDataByteBuffer.position(_columnOffsets[colId]);
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
@@ -182,7 +173,7 @@ public void setColumn(int colId, @Nonnull Object value)
_variableSizeDataByteArrayOutputStream.write(bytes);
}

- public void setColumn(int colId, @Nonnull byte[] values) {
+ public void setColumn(int colId, byte[] values) {
_currentRowDataByteBuffer.position(_columnOffsets[colId]);
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
_currentRowDataByteBuffer.putInt(values.length);
@@ -191,7 +182,7 @@ public void setColumn(int colId, @Nonnull byte[] values) {
}
}

- public void setColumn(int colId, @Nonnull char[] values)
+ public void setColumn(int colId, char[] values)
throws IOException {
_currentRowDataByteBuffer.position(_columnOffsets[colId]);
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
@@ -201,7 +192,7 @@ public void setColumn(int colId, @Nonnull char[] values)
}
}

- public void setColumn(int colId, @Nonnull short[] values)
+ public void setColumn(int colId, short[] values)
throws IOException {
_currentRowDataByteBuffer.position(_columnOffsets[colId]);
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
@@ -211,7 +202,7 @@ public void setColumn(int colId, @Nonnull short[] values)
}
}

- public void setColumn(int colId, @Nonnull int[] values)
+ public void setColumn(int colId, int[] values)
throws IOException {
_currentRowDataByteBuffer.position(_columnOffsets[colId]);
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
@@ -221,7 +212,7 @@ public void setColumn(int colId, @Nonnull int[] values)
}
}

- public void setColumn(int colId, @Nonnull long[] values)
+ public void setColumn(int colId, long[] values)
throws IOException {
_currentRowDataByteBuffer.position(_columnOffsets[colId]);
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
@@ -231,7 +222,7 @@ public void setColumn(int colId, @Nonnull long[] values)
}
}

- public void setColumn(int colId, @Nonnull float[] values)
+ public void setColumn(int colId, float[] values)
throws IOException {
_currentRowDataByteBuffer.position(_columnOffsets[colId]);
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
@@ -241,7 +232,7 @@ public void setColumn(int colId, @Nonnull float[] values)
}
}

- public void setColumn(int colId, @Nonnull double[] values)
+ public void setColumn(int colId, double[] values)
throws IOException {
_currentRowDataByteBuffer.position(_columnOffsets[colId]);
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
@@ -251,7 +242,7 @@ public void setColumn(int colId, @Nonnull double[] values)
}
}

- public void setColumn(int colId, @Nonnull String[] values)
+ public void setColumn(int colId, String[] values)
throws IOException {
_currentRowDataByteBuffer.position(_columnOffsets[colId]);
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
@@ -285,86 +276,4 @@ public DataTable build() {
return new DataTableImplV2(_numRows, _dataSchema, _reverseDictionaryMap,
_fixedSizeDataByteArrayOutputStream.toByteArray(), _variableSizeDataByteArrayOutputStream.toByteArray());
}

- /**
- * Build an empty data table based on the broker request.
- */
- public static DataTable buildEmptyDataTable(BrokerRequest brokerRequest)
- throws IOException {
- // Selection query.
- if (brokerRequest.isSetSelections()) {
- Selection selection = brokerRequest.getSelections();
- List<String> selectionColumns = selection.getSelectionColumns();
- int numSelectionColumns = selectionColumns.size();
- DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numSelectionColumns];
- // Use STRING column data type as default for selection query.
- Arrays.fill(columnDataTypes, DataSchema.ColumnDataType.STRING);
- DataSchema dataSchema =
- new DataSchema(selectionColumns.toArray(new String[numSelectionColumns]), columnDataTypes);
- return new DataTableBuilder(dataSchema).build();
- }
-
- // Aggregation query.
- List<AggregationInfo> aggregationsInfo = brokerRequest.getAggregationsInfo();
- int numAggregations = aggregationsInfo.size();
- AggregationFunctionContext[] aggregationFunctionContexts = new AggregationFunctionContext[numAggregations];
- for (int i = 0; i < numAggregations; i++) {
- aggregationFunctionContexts[i] =
- AggregationFunctionUtils.getAggregationFunctionContext(aggregationsInfo.get(i), brokerRequest);
- }
- if (brokerRequest.isSetGroupBy()) {
- // Aggregation group-by query.
-
- String[] columnNames = new String[]{"functionName", "GroupByResultMap"};
- DataSchema.ColumnDataType[] columnDataTypes =
- new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.OBJECT};
-
- // Build the data table.
- DataTableBuilder dataTableBuilder = new DataTableBuilder(new DataSchema(columnNames, columnDataTypes));
- for (int i = 0; i < numAggregations; i++) {
- dataTableBuilder.startRow();
- dataTableBuilder.setColumn(0, aggregationFunctionContexts[i].getAggregationColumnName());
- dataTableBuilder.setColumn(1, new HashMap<String, Object>());
- dataTableBuilder.finishRow();
- }
- return dataTableBuilder.build();
- } else {
- // Aggregation only query.
-
- String[] aggregationColumnNames = new String[numAggregations];
- DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numAggregations];
- Object[] aggregationResults = new Object[numAggregations];
- for (int i = 0; i < numAggregations; i++) {
- AggregationFunctionContext aggregationFunctionContext = aggregationFunctionContexts[i];
- aggregationColumnNames[i] = aggregationFunctionContext.getAggregationColumnName();
- AggregationFunction aggregationFunction = aggregationFunctionContext.getAggregationFunction();
- columnDataTypes[i] = aggregationFunction.getIntermediateResultColumnType();
- aggregationResults[i] =
- aggregationFunction.extractAggregationResult(aggregationFunction.createAggregationResultHolder());
- }
-
- // Build the data table.
- DataTableBuilder dataTableBuilder = new DataTableBuilder(new DataSchema(aggregationColumnNames, columnDataTypes));
- dataTableBuilder.startRow();
- for (int i = 0; i < numAggregations; i++) {
- switch (columnDataTypes[i]) {
- case LONG:
- dataTableBuilder.setColumn(i, ((Number) aggregationResults[i]).longValue());
- break;
- case DOUBLE:
- dataTableBuilder.setColumn(i, ((Double) aggregationResults[i]).doubleValue());
- break;
- case OBJECT:
- dataTableBuilder.setColumn(i, aggregationResults[i]);
- break;
- default:
- throw new UnsupportedOperationException(
- "Unsupported aggregation column data type: " + columnDataTypes[i] + " for column: "
- + aggregationColumnNames[i]);
- }
- }
- dataTableBuilder.finishRow();
- return dataTableBuilder.build();
- }
- }
}
DataTableUtils.java
@@ -18,8 +18,19 @@
*/
package org.apache.pinot.core.common.datatable;

- import javax.annotation.Nonnull;
+ import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.HashMap;
+ import java.util.List;
+ import org.apache.pinot.common.request.AggregationInfo;
+ import org.apache.pinot.common.request.BrokerRequest;
+ import org.apache.pinot.common.request.Selection;
import org.apache.pinot.common.utils.DataSchema;
+ import org.apache.pinot.common.utils.DataTable;
+ import org.apache.pinot.core.query.aggregation.AggregationFunctionContext;
+ import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+ import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
+ import org.apache.pinot.core.util.QueryOptions;


/**
@@ -37,7 +48,7 @@ private DataTableUtils() {
* @param columnOffsets array of column offsets.
* @return row size in bytes.
*/
- public static int computeColumnOffsets(@Nonnull DataSchema dataSchema, @Nonnull int[] columnOffsets) {
+ public static int computeColumnOffsets(DataSchema dataSchema, int[] columnOffsets) {
int numColumns = columnOffsets.length;
assert numColumns == dataSchema.size();

@@ -71,4 +82,112 @@ public static int computeColumnOffsets(@Nonnull DataSchema dataSchema, @Nonnull

return rowSizeInBytes;
}

+ /**
+ * Builds an empty data table based on the broker request.
+ */
+ public static DataTable buildEmptyDataTable(BrokerRequest brokerRequest)
+ throws IOException {
+ // Selection query.
+ if (brokerRequest.isSetSelections()) {
+ Selection selection = brokerRequest.getSelections();
+ List<String> selectionColumns = selection.getSelectionColumns();
+ int numSelectionColumns = selectionColumns.size();
+ DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numSelectionColumns];
+ // Use STRING column data type as default for selection query.
+ Arrays.fill(columnDataTypes, DataSchema.ColumnDataType.STRING);
+ DataSchema dataSchema =
+ new DataSchema(selectionColumns.toArray(new String[numSelectionColumns]), columnDataTypes);
+ return new DataTableBuilder(dataSchema).build();
+ }
+
+ // Aggregation query.
+ List<AggregationInfo> aggregationsInfo = brokerRequest.getAggregationsInfo();
+ int numAggregations = aggregationsInfo.size();
+ AggregationFunctionContext[] aggregationFunctionContexts = new AggregationFunctionContext[numAggregations];
+ for (int i = 0; i < numAggregations; i++) {
+ aggregationFunctionContexts[i] =
+ AggregationFunctionUtils.getAggregationFunctionContext(aggregationsInfo.get(i), brokerRequest);
+ }
+ if (brokerRequest.isSetGroupBy()) {
+ // Aggregation group-by query.
+
+ if (new QueryOptions(brokerRequest.getQueryOptions()).isGroupByModeSQL()) {
+ // SQL format
+
+ List<String> expressions = brokerRequest.getGroupBy().getExpressions();
+ int numColumns = expressions.size() + numAggregations;
+ String[] columnNames = new String[numColumns];
+ DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numColumns];
+ int index = 0;
+ for (String expression : expressions) {
+ columnNames[index] = expression;
+ // Use STRING column data type as default for group-by expressions
+ columnDataTypes[index] = DataSchema.ColumnDataType.STRING;
+ index++;
+ }
+ for (int i = 0; i < numAggregations; i++) {
+ AggregationFunctionContext aggregationFunctionContext = aggregationFunctionContexts[i];
+ columnNames[index] = aggregationFunctionContext.getResultColumnName();
+ AggregationFunction aggregationFunction = aggregationFunctionContext.getAggregationFunction();
+ columnDataTypes[index] = aggregationFunction.getIntermediateResultColumnType();
+ index++;
+ }
+ return new DataTableBuilder(new DataSchema(columnNames, columnDataTypes)).build();
+ } else {
+ // PQL format
+
+ String[] columnNames = new String[]{"functionName", "GroupByResultMap"};
+ DataSchema.ColumnDataType[] columnDataTypes =
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.OBJECT};
+
+ // Build the data table.
+ DataTableBuilder dataTableBuilder = new DataTableBuilder(new DataSchema(columnNames, columnDataTypes));
+ for (int i = 0; i < numAggregations; i++) {
+ dataTableBuilder.startRow();
+ dataTableBuilder.setColumn(0, aggregationFunctionContexts[i].getAggregationColumnName());
+ dataTableBuilder.setColumn(1, new HashMap<String, Object>());
+ dataTableBuilder.finishRow();
+ }
+ return dataTableBuilder.build();
+ }
+ } else {
+ // Aggregation only query.
+
+ String[] aggregationColumnNames = new String[numAggregations];
+ DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numAggregations];
+ Object[] aggregationResults = new Object[numAggregations];
+ for (int i = 0; i < numAggregations; i++) {
+ AggregationFunctionContext aggregationFunctionContext = aggregationFunctionContexts[i];
+ aggregationColumnNames[i] = aggregationFunctionContext.getAggregationColumnName();
+ AggregationFunction aggregationFunction = aggregationFunctionContext.getAggregationFunction();
+ columnDataTypes[i] = aggregationFunction.getIntermediateResultColumnType();
+ aggregationResults[i] =
+ aggregationFunction.extractAggregationResult(aggregationFunction.createAggregationResultHolder());
+ }
+
+ // Build the data table.
+ DataTableBuilder dataTableBuilder = new DataTableBuilder(new DataSchema(aggregationColumnNames, columnDataTypes));
+ dataTableBuilder.startRow();
+ for (int i = 0; i < numAggregations; i++) {
+ switch (columnDataTypes[i]) {
+ case LONG:
+ dataTableBuilder.setColumn(i, ((Number) aggregationResults[i]).longValue());
+ break;
+ case DOUBLE:
+ dataTableBuilder.setColumn(i, ((Double) aggregationResults[i]).doubleValue());
+ break;
+ case OBJECT:
+ dataTableBuilder.setColumn(i, aggregationResults[i]);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported aggregation column data type: " + columnDataTypes[i] + " for column: "
+ + aggregationColumnNames[i]);
+ }
+ }
+ dataTableBuilder.finishRow();
+ return dataTableBuilder.build();
+ }
+ }
}
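A hedged usage sketch of the relocated helper, not from the commit: it hand-builds a Thrift BrokerRequest for a hypothetical COUNT group-by query. The setter names and the groupByMode=sql query-option key are assumptions about the Thrift-generated request API of this Pinot version; in production the request comes from the query compiler rather than manual construction.

import java.util.Collections;
import org.apache.pinot.common.request.AggregationInfo;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.GroupBy;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datatable.DataTableUtils;

public class BuildEmptyDataTableExample {
  public static void main(String[] args)
      throws Exception {
    // Hand-built request for the hypothetical query:
    //   SELECT COUNT(*) FROM myTable GROUP BY country
    BrokerRequest brokerRequest = new BrokerRequest();

    AggregationInfo countAggregation = new AggregationInfo();
    countAggregation.setAggregationType("count");
    countAggregation.setAggregationParams(Collections.singletonMap("column", "*"));
    brokerRequest.setAggregationsInfo(Collections.singletonList(countAggregation));

    GroupBy groupBy = new GroupBy();
    groupBy.setExpressions(Collections.singletonList("country"));
    brokerRequest.setGroupBy(groupBy);

    // With groupByMode=sql the new SQL-format branch above is taken
    // (schema: country:STRING plus the aggregation's intermediate type);
    // without it, the PQL-format branch returns the
    // functionName/GroupByResultMap table.
    brokerRequest.setQueryOptions(Collections.singletonMap("groupByMode", "sql"));

    DataTable emptyDataTable = DataTableUtils.buildEmptyDataTable(brokerRequest);
    System.out.println(emptyDataTable.getDataSchema());
  }
}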
ServerQueryExecutorV1Impl.java
@@ -35,8 +35,8 @@
import org.apache.pinot.common.segment.SegmentMetadata;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.DataTable;
- import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.common.datatable.DataTableImplV2;
+ import org.apache.pinot.core.common.datatable.DataTableUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.SegmentDataManager;
import org.apache.pinot.core.data.manager.TableDataManager;
@@ -188,7 +188,7 @@ public DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService e
int numSegmentsMatchedAfterPruning = segmentDataManagers.size();
LOGGER.debug("Matched {} segments after pruning", numSegmentsMatchedAfterPruning);
if (numSegmentsMatchedAfterPruning == 0) {
- dataTable = DataTableBuilder.buildEmptyDataTable(brokerRequest);
+ dataTable = DataTableUtils.buildEmptyDataTable(brokerRequest);
Map<String, String> metadata = dataTable.getMetadata();
metadata.put(DataTable.TOTAL_DOCS_METADATA_KEY, String.valueOf(numTotalDocs));
metadata.put(DataTable.NUM_DOCS_SCANNED_METADATA_KEY, "0");
BrokerReduceService.java
@@ -181,6 +181,12 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest,
}
}

+ // NOTE: When there is no cached data schema, that means all servers encountered exception. In such case, return the
+ // response with metadata only.
+ if (cachedDataSchema == null) {
+ return brokerResponseNative;
+ }

DataTableReducer dataTableReducer = ResultReducerFactory.getResultReducer(brokerRequest);
dataTableReducer
.reduceAndSetResults(tableName, cachedDataSchema, dataTableMap, brokerResponseNative, brokerMetrics);