Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ask server to directly return final result for queries hitting single server #11938

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import org.apache.pinot.spi.utils.TimestampIndexUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.FilterKind;
Expand Down Expand Up @@ -684,22 +685,40 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
return new BrokerResponseNative(exceptions);
}

// Set the maximum serialized response size per server
// Set the maximum serialized response size per server, and ask server to directly return final response when only
// one server is queried
int numServers = 0;
if (offlineRoutingTable != null) {
numServers += offlineRoutingTable.size();
}
if (realtimeRoutingTable != null) {
numServers += realtimeRoutingTable.size();
}

if (offlineBrokerRequest != null) {
setMaxServerResponseSizeBytes(numServers, offlineBrokerRequest.getPinotQuery().getQueryOptions(),
offlineTableConfig);
Map<String, String> queryOptions = offlineBrokerRequest.getPinotQuery().getQueryOptions();
setMaxServerResponseSizeBytes(numServers, queryOptions, offlineTableConfig);
// Set the query option to directly return final result for single server query unless it is explicitly disabled
if (numServers == 1) {
// Set the same flag in the original server request to be used in the reduce phase for hybrid table
if (queryOptions.putIfAbsent(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true") == null
&& offlineBrokerRequest != serverBrokerRequest) {
serverBrokerRequest.getPinotQuery().getQueryOptions()
.put(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true");
}
}
}
if (realtimeBrokerRequest != null) {
setMaxServerResponseSizeBytes(numServers, realtimeBrokerRequest.getPinotQuery().getQueryOptions(),
realtimeTableConfig);
Map<String, String> queryOptions = realtimeBrokerRequest.getPinotQuery().getQueryOptions();
setMaxServerResponseSizeBytes(numServers, queryOptions, realtimeTableConfig);
// Set the query option to directly return final result for single server query unless it is explicitly disabled
if (numServers == 1) {
// Set the same flag in the original server request to be used in the reduce phase for hybrid table
if (queryOptions.putIfAbsent(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true") == null
&& realtimeBrokerRequest != serverBrokerRequest) {
serverBrokerRequest.getPinotQuery().getQueryOptions()
.put(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true");
}
}
}

// Execute the query
Expand Down Expand Up @@ -1672,72 +1691,62 @@ private long setQueryTimeout(String tableNameWithType, Map<String, String> query
timeSpentMs, queryTimeoutMs, tableNameWithType);
throw new TimeoutException(errorMessage);
}
queryOptions.put(Broker.Request.QueryOptionKey.TIMEOUT_MS, Long.toString(remainingTimeMs));
queryOptions.put(QueryOptionKey.TIMEOUT_MS, Long.toString(remainingTimeMs));
return remainingTimeMs;
}

/**
* Sets a query option indicating the maximum response size that can be sent from a server to the broker. This size
* is measured for the serialized response.
*
* The overriding order of priority is:
* 1. QueryOption -> maxServerResponseSizeBytes
* 2. QueryOption -> maxQueryResponseSizeBytes
* 3. TableConfig -> maxServerResponseSizeBytes
* 4. TableConfig -> maxQueryResponseSizeBytes
* 5. BrokerConfig -> maxServerResponseSizeBytes
* 6. BrokerConfig -> maxQueryResponseSizeBytes
*/
private void setMaxServerResponseSizeBytes(int numServers, Map<String, String> queryOptions,
TableConfig tableConfig) {
if (numServers == 0) {
return;
}

// The overriding order of priority is:
// 1. QueryOption -> maxServerResponseSizeBytes
// 2. QueryOption -> maxQueryResponseSizeBytes
// 3. TableConfig -> maxServerResponseSizeBytes
// 4. TableConfig -> maxQueryResponseSizeBytes
// 5. BrokerConfig -> maxServerResponseSizeBytes
// 6. BrokerConfig -> maxQueryResponseSizeBytes

@Nullable TableConfig tableConfig) {
// QueryOption
if (QueryOptionsUtils.getMaxServerResponseSizeBytes(queryOptions) != null) {
return;
}
Long maxQueryResponseSizeQOption = QueryOptionsUtils.getMaxQueryResponseSizeBytes(queryOptions);
if (maxQueryResponseSizeQOption != null) {
Long maxServerResponseSize = maxQueryResponseSizeQOption / numServers;
queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
Long.toString(maxServerResponseSize));
Long maxQueryResponseSizeQueryOption = QueryOptionsUtils.getMaxQueryResponseSizeBytes(queryOptions);
if (maxQueryResponseSizeQueryOption != null) {
queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
Long.toString(maxQueryResponseSizeQueryOption / numServers));
return;
}

// TableConfig
Preconditions.checkState(tableConfig != null);
QueryConfig queryConfig = tableConfig.getQueryConfig();
if (queryConfig != null && queryConfig.getMaxServerResponseSizeBytes() != null) {
Long maxServerResponseSize = queryConfig.getMaxServerResponseSizeBytes();
queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
Long.toString(maxServerResponseSize));
return;
}
if (queryConfig != null && queryConfig.getMaxQueryResponseSizeBytes() != null) {
Long maxQueryResponseSize = queryConfig.getMaxQueryResponseSizeBytes();
Long maxServerResponseSize = maxQueryResponseSize / numServers;
queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
Long.toString(maxServerResponseSize));
return;
if (tableConfig != null && tableConfig.getQueryConfig() != null) {
QueryConfig queryConfig = tableConfig.getQueryConfig();
if (queryConfig.getMaxServerResponseSizeBytes() != null) {
queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
Long.toString(queryConfig.getMaxServerResponseSizeBytes()));
return;
}
if (queryConfig.getMaxQueryResponseSizeBytes() != null) {
queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
Long.toString(queryConfig.getMaxQueryResponseSizeBytes() / numServers));
return;
}
}

// BrokerConfig
Long maxServerResponseSizeCfg = _config.getProperty(Broker.CONFIG_OF_MAX_SERVER_RESPONSE_SIZE_BYTES,
Broker.DEFAULT_MAX_SERVER_RESPONSE_SIZE_BYTES);
Long maxQueryResponseSizeCfg = _config.getProperty(Broker.CONFIG_OF_MAX_QUERY_RESPONSE_SIZE_BYTES,
Broker.DEFAULT_MAX_QUERY_RESPONSE_SIZE_BYTES);

if (maxServerResponseSizeCfg > 0) {
queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
Long.toString(maxServerResponseSizeCfg));
Long maxServerResponseSizeBrokerConfig =
_config.getProperty(Broker.CONFIG_OF_MAX_SERVER_RESPONSE_SIZE_BYTES, Long.class);
if (maxServerResponseSizeBrokerConfig != null) {
queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES, Long.toString(maxServerResponseSizeBrokerConfig));
return;
}
if (maxQueryResponseSizeCfg > 0) {
Long maxServerResponseSize = maxQueryResponseSizeCfg / numServers;
queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
Long.toString(maxServerResponseSize));
Long maxQueryResponseSizeBrokerConfig =
_config.getProperty(Broker.CONFIG_OF_MAX_QUERY_RESPONSE_SIZE_BYTES, Long.class);
if (maxQueryResponseSizeBrokerConfig != null) {
queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
Long.toString(maxQueryResponseSizeBrokerConfig / numServers));
}
}

Expand Down Expand Up @@ -1769,7 +1778,7 @@ static void validateRequest(PinotQuery pinotQuery, int queryResponseLimit) {
numReplicaGroupsToQuery);
}
} catch (NumberFormatException e) {
String numReplicaGroupsToQuery = queryOptions.get(Broker.Request.QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY);
String numReplicaGroupsToQuery = queryOptions.get(QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY);
throw new IllegalStateException(
String.format("numReplicaGroups must be a positive number, got: %s", numReplicaGroupsToQuery));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ private void setFinalResult(DataTableBuilder dataTableBuilder, ColumnDataType[]
dataTableBuilder.setColumn(index, ((DoubleArrayList) result).elements());
break;
case STRING_ARRAY:
dataTableBuilder.setColumn(index, ((ObjectArrayList<String>) result).toArray(new String[0]));
dataTableBuilder.setColumn(index, ((ObjectArrayList<String>) result).elements());
break;
default:
throw new IllegalStateException("Illegal column data type in final result: " + columnDataType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
package org.apache.pinot.core.operator.blocks.results;

import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import it.unimi.dsi.fastutil.floats.FloatArrayList;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
Expand Down Expand Up @@ -240,13 +244,25 @@ private void setDataTableColumn(ColumnDataType storedColumnDataType, DataTableBu
dataTableBuilder.setColumn(columnIndex, (ByteArray) value);
break;
case INT_ARRAY:
dataTableBuilder.setColumn(columnIndex, (int[]) value);
if (value instanceof IntArrayList) {
dataTableBuilder.setColumn(columnIndex, ((IntArrayList) value).elements());
} else {
dataTableBuilder.setColumn(columnIndex, (int[]) value);
}
break;
case LONG_ARRAY:
dataTableBuilder.setColumn(columnIndex, (long[]) value);
if (value instanceof LongArrayList) {
dataTableBuilder.setColumn(columnIndex, ((LongArrayList) value).elements());
} else {
dataTableBuilder.setColumn(columnIndex, (long[]) value);
}
break;
case FLOAT_ARRAY:
dataTableBuilder.setColumn(columnIndex, (float[]) value);
if (value instanceof FloatArrayList) {
dataTableBuilder.setColumn(columnIndex, ((FloatArrayList) value).elements());
} else {
dataTableBuilder.setColumn(columnIndex, (float[]) value);
}
break;
case DOUBLE_ARRAY:
if (value instanceof DoubleArrayList) {
Expand All @@ -256,7 +272,12 @@ private void setDataTableColumn(ColumnDataType storedColumnDataType, DataTableBu
}
break;
case STRING_ARRAY:
dataTableBuilder.setColumn(columnIndex, (String[]) value);
if (value instanceof ObjectArrayList) {
//noinspection unchecked
dataTableBuilder.setColumn(columnIndex, ((ObjectArrayList<String>) value).elements());
} else {
dataTableBuilder.setColumn(columnIndex, (String[]) value);
}
break;
case OBJECT:
dataTableBuilder.setColumn(columnIndex, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,34 @@ public static Object getConvertedFinalResult(DataTable dataTable, ColumnDataType
return dataTable.getString(rowId, colId);
case BYTES:
return dataTable.getBytes(rowId, colId).getBytes();
case INT_ARRAY:
return dataTable.getIntArray(rowId, colId);
case LONG_ARRAY:
return dataTable.getLongArray(rowId, colId);
case FLOAT_ARRAY:
return dataTable.getFloatArray(rowId, colId);
case DOUBLE_ARRAY:
return dataTable.getDoubleArray(rowId, colId);
case BOOLEAN_ARRAY: {
int[] intValues = dataTable.getIntArray(rowId, colId);
int numValues = intValues.length;
boolean[] booleanValues = new boolean[numValues];
for (int i = 0; i < numValues; i++) {
booleanValues[i] = intValues[i] == 1;
}
return booleanValues;
}
case TIMESTAMP_ARRAY: {
long[] longValues = dataTable.getLongArray(rowId, colId);
int numValues = longValues.length;
Timestamp[] timestampValues = new Timestamp[numValues];
for (int i = 0; i < numValues; i++) {
timestampValues[i] = new Timestamp(longValues[i]);
}
return timestampValues;
}
case STRING_ARRAY:
return dataTable.getStringArray(rowId, colId);
default:
throw new IllegalStateException("Illegal column data type in final result: " + columnDataType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import it.unimi.dsi.fastutil.objects.AbstractObjectCollection;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import it.unimi.dsi.fastutil.objects.ObjectIterators;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
Expand Down Expand Up @@ -93,6 +94,9 @@ public I merge(I intermediateResult1, I intermediateResult2) {

@Override
public ObjectArrayList<String> extractFinalResult(I stringArrayList) {
return new ObjectArrayList<>(stringArrayList);
// NOTE: Wrap a String[] because the ObjectArrayList copy constructor allocates an Object[] backing array, which
// breaks callers that read elements() as a String[].
String[] stringArray = new String[stringArrayList.size()];
ObjectIterators.unwrap(stringArrayList.iterator(), stringArray);
return ObjectArrayList.wrap(stringArray);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,6 @@ private void registerCallbackHandlers() {
@Override
protected void testQuery(String pinotQuery, String h2Query)
throws Exception {
if (getNumServers() == 1) {
pinotQuery = "SET serverReturnFinalResult = true;" + pinotQuery;
}
super.testQuery(pinotQuery, h2Query);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,9 @@ public static class Broker {
// Broker config indicating the maximum serialized response size across all servers for a query. This value is
// equally divided across all servers processing the query.
public static final String CONFIG_OF_MAX_QUERY_RESPONSE_SIZE_BYTES = "pinot.broker.max.query.response.size.bytes";
public static final long DEFAULT_MAX_QUERY_RESPONSE_SIZE_BYTES = Long.MAX_VALUE;

// Broker config indicating the maximum length of the serialized response per server for a query.
public static final String CONFIG_OF_MAX_SERVER_RESPONSE_SIZE_BYTES = "pinot.broker.max.server.response.size.bytes";
public static final long DEFAULT_MAX_SERVER_RESPONSE_SIZE_BYTES = Long.MAX_VALUE;


public static class Request {
Expand Down
Loading