diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 570748a74c9..eed9ae56d4d 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -97,6 +97,7 @@ import org.apache.pinot.spi.utils.BytesUtils; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Broker; +import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey; import org.apache.pinot.spi.utils.TimestampIndexUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.sql.FilterKind; @@ -684,7 +685,8 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S return new BrokerResponseNative(exceptions); } - // Set the maximum serialized response size per server + // Set the maximum serialized response size per server, and ask server to directly return final response when only + // one server is queried int numServers = 0; if (offlineRoutingTable != null) { numServers += offlineRoutingTable.size(); @@ -692,14 +694,31 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S if (realtimeRoutingTable != null) { numServers += realtimeRoutingTable.size(); } - if (offlineBrokerRequest != null) { - setMaxServerResponseSizeBytes(numServers, offlineBrokerRequest.getPinotQuery().getQueryOptions(), - offlineTableConfig); + Map queryOptions = offlineBrokerRequest.getPinotQuery().getQueryOptions(); + setMaxServerResponseSizeBytes(numServers, queryOptions, offlineTableConfig); + // Set the query option to directly return final result for single server query unless it is explicitly disabled + if (numServers == 1) { + // Set the same flag in the original server request 
to be used in the reduce phase for hybrid table + if (queryOptions.putIfAbsent(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true") == null + && offlineBrokerRequest != serverBrokerRequest) { + serverBrokerRequest.getPinotQuery().getQueryOptions() + .put(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true"); + } + } } if (realtimeBrokerRequest != null) { - setMaxServerResponseSizeBytes(numServers, realtimeBrokerRequest.getPinotQuery().getQueryOptions(), - realtimeTableConfig); + Map queryOptions = realtimeBrokerRequest.getPinotQuery().getQueryOptions(); + setMaxServerResponseSizeBytes(numServers, queryOptions, realtimeTableConfig); + // Set the query option to directly return final result for single server query unless it is explicitly disabled + if (numServers == 1) { + // Set the same flag in the original server request to be used in the reduce phase for hybrid table + if (queryOptions.putIfAbsent(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true") == null + && realtimeBrokerRequest != serverBrokerRequest) { + serverBrokerRequest.getPinotQuery().getQueryOptions() + .put(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true"); + } + } } // Execute the query @@ -1672,72 +1691,62 @@ private long setQueryTimeout(String tableNameWithType, Map query timeSpentMs, queryTimeoutMs, tableNameWithType); throw new TimeoutException(errorMessage); } - queryOptions.put(Broker.Request.QueryOptionKey.TIMEOUT_MS, Long.toString(remainingTimeMs)); + queryOptions.put(QueryOptionKey.TIMEOUT_MS, Long.toString(remainingTimeMs)); return remainingTimeMs; } /** * Sets a query option indicating the maximum response size that can be sent from a server to the broker. This size * is measured for the serialized response. + * + * The overriding order of priority is: + * 1. QueryOption -> maxServerResponseSizeBytes + * 2. QueryOption -> maxQueryResponseSizeBytes + * 3. TableConfig -> maxServerResponseSizeBytes + * 4. TableConfig -> maxQueryResponseSizeBytes + * 5. 
BrokerConfig -> maxServerResponseSizeBytes + * 6. BrokerConfig -> maxQueryResponseSizeBytes */ private void setMaxServerResponseSizeBytes(int numServers, Map queryOptions, - TableConfig tableConfig) { - if (numServers == 0) { - return; - } - - // The overriding order of priority is: - // 1. QueryOption -> maxServerResponseSizeBytes - // 2. QueryOption -> maxQueryResponseSizeBytes - // 3. TableConfig -> maxServerResponseSizeBytes - // 4. TableConfig -> maxQueryResponseSizeBytes - // 5. BrokerConfig -> maxServerResponseSizeBytes - // 6. BrokerConfig -> maxServerResponseSizeBytes - + @Nullable TableConfig tableConfig) { // QueryOption if (QueryOptionsUtils.getMaxServerResponseSizeBytes(queryOptions) != null) { return; } - Long maxQueryResponseSizeQOption = QueryOptionsUtils.getMaxQueryResponseSizeBytes(queryOptions); - if (maxQueryResponseSizeQOption != null) { - Long maxServerResponseSize = maxQueryResponseSizeQOption / numServers; - queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES, - Long.toString(maxServerResponseSize)); + Long maxQueryResponseSizeQueryOption = QueryOptionsUtils.getMaxQueryResponseSizeBytes(queryOptions); + if (maxQueryResponseSizeQueryOption != null) { + queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES, + Long.toString(maxQueryResponseSizeQueryOption / numServers)); return; } // TableConfig - Preconditions.checkState(tableConfig != null); - QueryConfig queryConfig = tableConfig.getQueryConfig(); - if (queryConfig != null && queryConfig.getMaxServerResponseSizeBytes() != null) { - Long maxServerResponseSize = queryConfig.getMaxServerResponseSizeBytes(); - queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES, - Long.toString(maxServerResponseSize)); - return; - } - if (queryConfig != null && queryConfig.getMaxQueryResponseSizeBytes() != null) { - Long maxQueryResponseSize = queryConfig.getMaxQueryResponseSizeBytes(); - Long maxServerResponseSize = maxQueryResponseSize / numServers;
- queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES, - Long.toString(maxServerResponseSize)); - return; + if (tableConfig != null && tableConfig.getQueryConfig() != null) { + QueryConfig queryConfig = tableConfig.getQueryConfig(); + if (queryConfig.getMaxServerResponseSizeBytes() != null) { + queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES, + Long.toString(queryConfig.getMaxServerResponseSizeBytes())); + return; + } + if (queryConfig.getMaxQueryResponseSizeBytes() != null) { + queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES, + Long.toString(queryConfig.getMaxQueryResponseSizeBytes() / numServers)); + return; + } } // BrokerConfig - Long maxServerResponseSizeCfg = _config.getProperty(Broker.CONFIG_OF_MAX_SERVER_RESPONSE_SIZE_BYTES, - Broker.DEFAULT_MAX_SERVER_RESPONSE_SIZE_BYTES); - Long maxQueryResponseSizeCfg = _config.getProperty(Broker.CONFIG_OF_MAX_QUERY_RESPONSE_SIZE_BYTES, - Broker.DEFAULT_MAX_QUERY_RESPONSE_SIZE_BYTES); - - if (maxServerResponseSizeCfg > 0) { - queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES, - Long.toString(maxServerResponseSizeCfg)); + Long maxServerResponseSizeBrokerConfig = + _config.getProperty(Broker.CONFIG_OF_MAX_SERVER_RESPONSE_SIZE_BYTES, Long.class); + if (maxServerResponseSizeBrokerConfig != null) { + queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES, Long.toString(maxServerResponseSizeBrokerConfig)); return; } - if (maxQueryResponseSizeCfg > 0) { - Long maxServerResponseSize = maxQueryResponseSizeCfg / numServers; - queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES, - Long.toString(maxServerResponseSize)); + Long maxQueryResponseSizeBrokerConfig = + _config.getProperty(Broker.CONFIG_OF_MAX_QUERY_RESPONSE_SIZE_BYTES, Long.class); + if (maxQueryResponseSizeBrokerConfig != null) { + queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES, + Long.toString(maxQueryResponseSizeBrokerConfig / 
numServers)); } } @@ -1769,7 +1778,7 @@ static void validateRequest(PinotQuery pinotQuery, int queryResponseLimit) { numReplicaGroupsToQuery); } } catch (NumberFormatException e) { - String numReplicaGroupsToQuery = queryOptions.get(Broker.Request.QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY); + String numReplicaGroupsToQuery = queryOptions.get(QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY); throw new IllegalStateException( String.format("numReplicaGroups must be a positive number, got: %s", numReplicaGroupsToQuery)); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java index cf13255f8b2..8c3e025af30 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java @@ -207,7 +207,7 @@ private void setFinalResult(DataTableBuilder dataTableBuilder, ColumnDataType[] dataTableBuilder.setColumn(index, ((DoubleArrayList) result).elements()); break; case STRING_ARRAY: - dataTableBuilder.setColumn(index, ((ObjectArrayList) result).toArray(new String[0])); + dataTableBuilder.setColumn(index, ((ObjectArrayList) result).elements()); break; default: throw new IllegalStateException("Illegal column data type in final result: " + columnDataType); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java index 8302a7db28e..c469363be98 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java @@ -19,6 +19,10 @@ package org.apache.pinot.core.operator.blocks.results; import 
it.unimi.dsi.fastutil.doubles.DoubleArrayList; +import it.unimi.dsi.fastutil.floats.FloatArrayList; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; import java.io.IOException; import java.math.BigDecimal; import java.util.ArrayList; @@ -240,13 +244,25 @@ private void setDataTableColumn(ColumnDataType storedColumnDataType, DataTableBu dataTableBuilder.setColumn(columnIndex, (ByteArray) value); break; case INT_ARRAY: - dataTableBuilder.setColumn(columnIndex, (int[]) value); + if (value instanceof IntArrayList) { + dataTableBuilder.setColumn(columnIndex, ((IntArrayList) value).elements()); + } else { + dataTableBuilder.setColumn(columnIndex, (int[]) value); + } break; case LONG_ARRAY: - dataTableBuilder.setColumn(columnIndex, (long[]) value); + if (value instanceof LongArrayList) { + dataTableBuilder.setColumn(columnIndex, ((LongArrayList) value).elements()); + } else { + dataTableBuilder.setColumn(columnIndex, (long[]) value); + } break; case FLOAT_ARRAY: - dataTableBuilder.setColumn(columnIndex, (float[]) value); + if (value instanceof FloatArrayList) { + dataTableBuilder.setColumn(columnIndex, ((FloatArrayList) value).elements()); + } else { + dataTableBuilder.setColumn(columnIndex, (float[]) value); + } break; case DOUBLE_ARRAY: if (value instanceof DoubleArrayList) { @@ -256,7 +272,12 @@ private void setDataTableColumn(ColumnDataType storedColumnDataType, DataTableBu } break; case STRING_ARRAY: - dataTableBuilder.setColumn(columnIndex, (String[]) value); + if (value instanceof ObjectArrayList) { + //noinspection unchecked + dataTableBuilder.setColumn(columnIndex, ((ObjectArrayList) value).elements()); + } else { + dataTableBuilder.setColumn(columnIndex, (String[]) value); + } break; case OBJECT: dataTableBuilder.setColumn(columnIndex, value); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java index 7de88cb8c78..7a077ccbd57 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java @@ -178,8 +178,34 @@ public static Object getConvertedFinalResult(DataTable dataTable, ColumnDataType return dataTable.getString(rowId, colId); case BYTES: return dataTable.getBytes(rowId, colId).getBytes(); + case INT_ARRAY: + return dataTable.getIntArray(rowId, colId); + case LONG_ARRAY: + return dataTable.getLongArray(rowId, colId); + case FLOAT_ARRAY: + return dataTable.getFloatArray(rowId, colId); case DOUBLE_ARRAY: return dataTable.getDoubleArray(rowId, colId); + case BOOLEAN_ARRAY: { + int[] intValues = dataTable.getIntArray(rowId, colId); + int numValues = intValues.length; + boolean[] booleanValues = new boolean[numValues]; + for (int i = 0; i < numValues; i++) { + booleanValues[i] = intValues[i] == 1; + } + return booleanValues; + } + case TIMESTAMP_ARRAY: { + long[] longValues = dataTable.getLongArray(rowId, colId); + int numValues = longValues.length; + Timestamp[] timestampValues = new Timestamp[numValues]; + for (int i = 0; i < numValues; i++) { + timestampValues[i] = new Timestamp(longValues[i]); + } + return timestampValues; + } + case STRING_ARRAY: + return dataTable.getStringArray(rowId, colId); default: throw new IllegalStateException("Illegal column data type in final result: " + columnDataType); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggStringFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggStringFunction.java index 5f5b98f7c89..e906015a86e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggStringFunction.java 
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggStringFunction.java @@ -20,6 +20,7 @@ import it.unimi.dsi.fastutil.objects.AbstractObjectCollection; import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import it.unimi.dsi.fastutil.objects.ObjectIterators; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.core.common.BlockValSet; import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; @@ -93,6 +94,9 @@ public I merge(I intermediateResult1, I intermediateResult2) { @Override public ObjectArrayList extractFinalResult(I stringArrayList) { - return new ObjectArrayList<>(stringArrayList); + // NOTE: Wrap a String[] to work around the bug of ObjectArrayList constructor creating Object[] internally. + String[] stringArray = new String[stringArrayList.size()]; + ObjectIterators.unwrap(stringArrayList.iterator(), stringArray); + return ObjectArrayList.wrap(stringArray); } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index e20ad0c9fde..09b7411d1cf 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -314,9 +314,6 @@ private void registerCallbackHandlers() { @Override protected void testQuery(String pinotQuery, String h2Query) throws Exception { - if (getNumServers() == 1) { - pinotQuery = "SET serverReturnFinalResult = true;" + pinotQuery; - } super.testQuery(pinotQuery, h2Query); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index fffa3ba17b5..568b1e15057 100644 --- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -324,11 +324,9 @@ public static class Broker { // Broker config indicating the maximum serialized response size across all servers for a query. This value is // equally divided across all servers processing the query. public static final String CONFIG_OF_MAX_QUERY_RESPONSE_SIZE_BYTES = "pinot.broker.max.query.response.size.bytes"; - public static final long DEFAULT_MAX_QUERY_RESPONSE_SIZE_BYTES = Long.MAX_VALUE; // Broker config indicating the maximum length of the serialized response per server for a query. public static final String CONFIG_OF_MAX_SERVER_RESPONSE_SIZE_BYTES = "pinot.broker.max.server.response.size.bytes"; - public static final long DEFAULT_MAX_SERVER_RESPONSE_SIZE_BYTES = Long.MAX_VALUE; public static class Request {