Skip to content

Commit

Permalink
Add new broker query point for querying multi-stage engine (apache#11341
Browse files Browse the repository at this point in the history
)
  • Loading branch information
xiangfu0 authored Aug 14, 2023
1 parent 916ac18 commit 8318650
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiKeyAuthDefinition;
Expand Down Expand Up @@ -169,6 +170,70 @@ public void processSqlQueryPost(String query, @Suspended AsyncResponse asyncResp
}
}

@GET
@ManagedAsync
@Produces(MediaType.APPLICATION_JSON)
@Path("query")
@ApiOperation(value = "Querying pinot using MultiStage Query Engine")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Query response"),
@ApiResponse(code = 500, message = "Internal Server Error")
})
@ManualAuthorization
public void processSqlWithMultiStageQueryEngineGet(
@ApiParam(value = "Query", required = true) @QueryParam("sql") String query,
@Suspended AsyncResponse asyncResponse, @Context org.glassfish.grizzly.http.server.Request requestContext,
@Context HttpHeaders httpHeaders) {
try {
ObjectNode requestJson = JsonUtils.newObjectNode();
requestJson.put(Request.SQL, query);
BrokerResponse brokerResponse =
executeSqlQuery(requestJson, makeHttpIdentity(requestContext), true, httpHeaders, true);
asyncResponse.resume(brokerResponse.toJsonString());
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
LOGGER.error("Caught exception while processing GET request", e);
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_GET_EXCEPTIONS, 1L);
asyncResponse.resume(new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR));
}
}

@POST
@ManagedAsync
@Produces(MediaType.APPLICATION_JSON)
@Path("query")
@ApiOperation(value = "Querying pinot using MultiStage Query Engine")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Query response"),
@ApiResponse(code = 500, message = "Internal Server Error")
})
@ManualAuthorization
public void processSqlWithMultiStageQueryEnginePost(String query, @Suspended AsyncResponse asyncResponse,
@Context org.glassfish.grizzly.http.server.Request requestContext,
@Context HttpHeaders httpHeaders) {
try {
JsonNode requestJson = JsonUtils.stringToJsonNode(query);
if (!requestJson.has(Request.SQL)) {
throw new IllegalStateException("Payload is missing the query string field 'sql'");
}
BrokerResponse brokerResponse =
executeSqlQuery((ObjectNode) requestJson, makeHttpIdentity(requestContext), false, httpHeaders, true);
asyncResponse.resume(brokerResponse.toJsonString());
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
LOGGER.error("Caught exception while processing POST request", e);
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L);
asyncResponse.resume(
new WebApplicationException(e,
Response
.status(Response.Status.INTERNAL_SERVER_ERROR)
.entity(e.getMessage())
.build()));
}
}

@DELETE
@Path("query/{queryId}")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY)
Expand All @@ -185,7 +250,7 @@ public String cancelQuery(
@ApiParam(value = "Timeout for servers to respond the cancel request") @QueryParam("timeoutMs")
@DefaultValue("3000") int timeoutMs,
@ApiParam(value = "Return server responses for troubleshooting") @QueryParam("verbose") @DefaultValue("false")
boolean verbose) {
boolean verbose) {
try {
Map<String, Integer> serverResponses = verbose ? new HashMap<>() : null;
if (_requestHandler.cancelQuery(queryId, timeoutMs, _executor, _httpConnMgr, serverResponses)) {
Expand Down Expand Up @@ -226,12 +291,21 @@ public Map<Long, String> getRunningQueries() {
private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterIdentity httpRequesterIdentity,
boolean onlyDql, HttpHeaders httpHeaders)
throws Exception {
return executeSqlQuery(sqlRequestJson, httpRequesterIdentity, onlyDql, httpHeaders, false);
}

private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterIdentity httpRequesterIdentity,
boolean onlyDql, HttpHeaders httpHeaders, boolean forceUseMultiStage)
throws Exception {
SqlNodeAndOptions sqlNodeAndOptions;
try {
sqlNodeAndOptions = RequestUtils.parseQuery(sqlRequestJson.get(Request.SQL).asText(), sqlRequestJson);
} catch (Exception e) {
return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e));
}
if (forceUseMultiStage) {
sqlNodeAndOptions.setExtraOptions(ImmutableMap.of(Request.QueryOptionKey.USE_MULTISTAGE_ENGINE, "true"));
}
PinotSqlType sqlType = sqlNodeAndOptions.getSqlType();
if (onlyDql && sqlType != PinotSqlType.DQL) {
return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,7 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
}

boolean traceEnabled = Boolean.parseBoolean(
request.has(CommonConstants.Broker.Request.TRACE) ? request.get(CommonConstants.Broker.Request.TRACE).asText()
: "false");
sqlNodeAndOptions.getOptions().getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));

ResultTable queryResults;
Map<Integer, ExecutionStatsAggregator> stageIdStatsMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.JdkSslContext;
import io.netty.handler.ssl.SslContext;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
Expand Down Expand Up @@ -61,22 +60,25 @@ public class JsonAsyncHttpPinotClientTransport implements PinotClientTransport<C
private final int _brokerReadTimeout;
private final AsyncHttpClient _httpClient;
private final String _extraOptionStr;
private final boolean _useMultiStageEngine;

public JsonAsyncHttpPinotClientTransport() {
_brokerReadTimeout = 60000;
_headers = new HashMap<>();
_scheme = CommonConstants.HTTP_PROTOCOL;
_extraOptionStr = DEFAULT_EXTRA_QUERY_OPTION_STRING;
_httpClient = Dsl.asyncHttpClient(Dsl.config().setRequestTimeout(_brokerReadTimeout));
_useMultiStageEngine = false;
}

public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String scheme, String extraOptionString,
@Nullable SSLContext sslContext, ConnectionTimeouts connectionTimeouts, TlsProtocols tlsProtocols,
@Nullable String appId) {
boolean useMultiStageEngine, @Nullable SSLContext sslContext, ConnectionTimeouts connectionTimeouts,
TlsProtocols tlsProtocols, @Nullable String appId) {
_brokerReadTimeout = connectionTimeouts.getReadTimeoutMs();
_headers = headers;
_scheme = scheme;
_extraOptionStr = StringUtils.isEmpty(extraOptionString) ? DEFAULT_EXTRA_QUERY_OPTION_STRING : extraOptionString;
_useMultiStageEngine = useMultiStageEngine;

Builder builder = Dsl.config();
if (sslContext != null) {
Expand All @@ -92,28 +94,6 @@ public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String sch
_httpClient = Dsl.asyncHttpClient(builder.build());
}

public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String scheme, String extraOptionStr,
@Nullable SslContext sslContext, ConnectionTimeouts connectionTimeouts, TlsProtocols tlsProtocols,
@Nullable String appId) {
_brokerReadTimeout = connectionTimeouts.getReadTimeoutMs();
_headers = headers;
_scheme = scheme;
_extraOptionStr = StringUtils.isEmpty(extraOptionStr) ? DEFAULT_EXTRA_QUERY_OPTION_STRING : extraOptionStr;

Builder builder = Dsl.config();
if (sslContext != null) {
builder.setSslContext(sslContext);
}

builder.setRequestTimeout(_brokerReadTimeout)
.setReadTimeout(connectionTimeouts.getReadTimeoutMs())
.setConnectTimeout(connectionTimeouts.getConnectTimeoutMs())
.setHandshakeTimeout(connectionTimeouts.getHandshakeTimeoutMs())
.setUserAgent(ConnectionUtils.getUserAgentVersionFromClassPath("ua", appId))
.setEnabledProtocols(tlsProtocols.getEnabledProtocols().toArray(new String[0]));
_httpClient = Dsl.asyncHttpClient(builder.build());
}

@Override
public BrokerResponse executeQuery(String brokerAddress, String query)
throws PinotClientException {
Expand All @@ -131,29 +111,29 @@ public CompletableFuture<BrokerResponse> executeQueryAsync(String brokerAddress,
json.put("sql", query);
json.put("queryOptions", _extraOptionStr);

String url = _scheme + "://" + brokerAddress + "/query/sql";
String url = String.format("%s://%s%s", _scheme, brokerAddress, _useMultiStageEngine ? "/query" : "/query/sql");
BoundRequestBuilder requestBuilder = _httpClient.preparePost(url);

if (_headers != null) {
_headers.forEach((k, v) -> requestBuilder.addHeader(k, v));
}
LOGGER.debug("Sending query {} to {}", query, url);
return requestBuilder.addHeader("Content-Type", "application/json; charset=utf-8").setBody(json.toString())
.execute().toCompletableFuture().thenApply(httpResponse -> {
LOGGER.debug("Completed query, HTTP status is {}", httpResponse.getStatusCode());
.execute().toCompletableFuture().thenApply(httpResponse -> {
LOGGER.debug("Completed query, HTTP status is {}", httpResponse.getStatusCode());

if (httpResponse.getStatusCode() != 200) {
throw new PinotClientException(
if (httpResponse.getStatusCode() != 200) {
throw new PinotClientException(
"Pinot returned HTTP status " + httpResponse.getStatusCode() + ", expected 200");
}

String responseBody = httpResponse.getResponseBody(StandardCharsets.UTF_8);
try {
return BrokerResponse.fromJson(OBJECT_READER.readTree(responseBody));
} catch (JsonProcessingException e) {
throw new CompletionException(e);
}
});
}

String responseBody = httpResponse.getResponseBody(StandardCharsets.UTF_8);
try {
return BrokerResponse.fromJson(OBJECT_READER.readTree(responseBody));
} catch (JsonProcessingException e) {
throw new CompletionException(e);
}
});
} catch (Exception e) {
throw new PinotClientException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@ public class JsonAsyncHttpPinotClientTransportFactory implements PinotClientTran
private int _handshakeTimeoutMs = Integer.parseInt(DEFAULT_BROKER_HANDSHAKE_TIMEOUT_MS);
private String _appId = null;
private String _extraOptionString;
private boolean _useMultiStageEngine;

@Override
public PinotClientTransport buildTransport() {
ConnectionTimeouts connectionTimeouts =
ConnectionTimeouts.create(_readTimeoutMs, _connectTimeoutMs, _handshakeTimeoutMs);
TlsProtocols tlsProtocols = TlsProtocols.defaultProtocols(_tlsV10Enabled);
return new JsonAsyncHttpPinotClientTransport(_headers, _scheme, _extraOptionString, _sslContext, connectionTimeouts,
tlsProtocols, _appId);
return new JsonAsyncHttpPinotClientTransport(_headers, _scheme, _extraOptionString, _useMultiStageEngine,
_sslContext, connectionTimeouts, tlsProtocols, _appId);
}

public Map<String, String> getHeaders() {
Expand Down Expand Up @@ -103,6 +104,7 @@ public JsonAsyncHttpPinotClientTransportFactory withConnectionProperties(Propert
System.getProperties().getProperty("broker.tlsV10Enabled", DEFAULT_BROKER_TLS_V10_ENABLED));

_extraOptionString = properties.getProperty("queryOptions", "");
_useMultiStageEngine = Boolean.parseBoolean(properties.getProperty("useMultiStageEngine", "false"));
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
protected List<StreamDataServerStartable> _kafkaStarters;

protected org.apache.pinot.client.Connection _pinotConnection;
protected org.apache.pinot.client.Connection _pinotConnectionV2;
protected Connection _h2Connection;
protected QueryGenerator _queryGenerator;

Expand Down Expand Up @@ -506,6 +507,14 @@ protected TableConfig getRealtimeTableConfig() {
* @return Pinot connection
*/
protected org.apache.pinot.client.Connection getPinotConnection() {
if (useMultiStageQueryEngine()) {
if (_pinotConnectionV2 == null) {
Properties properties = getPinotConnectionProperties();
properties.put("useMultiStageEngine", "true");
_pinotConnectionV2 = ConnectionFactory.fromZookeeper(properties, getZkUrl() + "/" + getHelixClusterName());
}
return _pinotConnectionV2;
}
if (_pinotConnection == null) {
_pinotConnection =
ConnectionFactory.fromZookeeper(getPinotConnectionProperties(), getZkUrl() + "/" + getHelixClusterName());
Expand Down Expand Up @@ -753,7 +762,7 @@ protected void testQuery(String query)
protected void testQuery(String pinotQuery, String h2Query)
throws Exception {
ClusterIntegrationTestUtils.testQuery(pinotQuery, getBrokerBaseApiUrl(), getPinotConnection(), h2Query,
getH2Connection(), null, getExtraQueryProperties());
getH2Connection(), null, getExtraQueryProperties(), useMultiStageQueryEngine());
}

/**
Expand All @@ -762,6 +771,6 @@ protected void testQuery(String pinotQuery, String h2Query)
protected void testQueryWithMatchingRowCount(String pinotQuery, String h2Query)
throws Exception {
ClusterIntegrationTestUtils.testQueryWithMatchingRowCount(pinotQuery, getBrokerBaseApiUrl(), getPinotConnection(),
h2Query, getH2Connection(), null, getExtraQueryProperties());
h2Query, getH2Connection(), null, getExtraQueryProperties(), useMultiStageQueryEngine());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ public static void pushCsvIntoKafka(List<String> csvRecords, String kafkaTopic,
}
}
CSVFormat csvFormat = CSVFormat.DEFAULT.withSkipHeaderRecord(true);
for (String recordCsv: csvRecords) {
for (String recordCsv : csvRecords) {
try (CSVParser parser = CSVParser.parse(recordCsv, csvFormat)) {
for (CSVRecord csv : parser) {
byte[] keyBytes = (partitionColumnIndex == null) ? Longs.toByteArray(counter++)
Expand Down Expand Up @@ -650,7 +650,7 @@ static void testQuery(String pinotQuery, String queryResourceUrl, org.apache.pin
static void testQuery(String pinotQuery, String queryResourceUrl, org.apache.pinot.client.Connection pinotConnection,
String h2Query, Connection h2Connection, @Nullable Map<String, String> headers)
throws Exception {
testQuery(pinotQuery, queryResourceUrl, pinotConnection, h2Query, h2Connection, headers, null);
testQuery(pinotQuery, queryResourceUrl, pinotConnection, h2Query, h2Connection, headers, null, false);
}

/**
Expand All @@ -659,33 +659,35 @@ static void testQuery(String pinotQuery, String queryResourceUrl, org.apache.pin
*/
static void testQueryWithMatchingRowCount(String pinotQuery, String queryResourceUrl,
org.apache.pinot.client.Connection pinotConnection, String h2Query, Connection h2Connection,
@Nullable Map<String, String> headers, @Nullable Map<String, String> extraJsonProperties)
@Nullable Map<String, String> headers, @Nullable Map<String, String> extraJsonProperties,
boolean useMultiStageQueryEngine)
throws Exception {
try {
testQueryInternal(pinotQuery, queryResourceUrl, pinotConnection, h2Query, h2Connection, headers,
extraJsonProperties, true, false);
extraJsonProperties, useMultiStageQueryEngine, true, false);
} catch (Exception e) {
failure(pinotQuery, h2Query, e);
}
}

static void testQuery(String pinotQuery, String queryResourceUrl, org.apache.pinot.client.Connection pinotConnection,
String h2Query, Connection h2Connection, @Nullable Map<String, String> headers,
@Nullable Map<String, String> extraJsonProperties) {
@Nullable Map<String, String> extraJsonProperties, boolean useMultiStageQueryEngine) {
try {
testQueryInternal(pinotQuery, queryResourceUrl, pinotConnection, h2Query, h2Connection, headers,
extraJsonProperties, false, false);
extraJsonProperties, useMultiStageQueryEngine, false, false);
} catch (Exception e) {
failure(pinotQuery, h2Query, e);
}
}

static void testQueryViaController(String pinotQuery, String queryResourceUrl,
org.apache.pinot.client.Connection pinotConnection, String h2Query, Connection h2Connection,
@Nullable Map<String, String> headers, @Nullable Map<String, String> extraJsonProperties) {
@Nullable Map<String, String> headers, @Nullable Map<String, String> extraJsonProperties,
boolean useMultiStageQueryEngine) {
try {
testQueryInternal(pinotQuery, queryResourceUrl, pinotConnection, h2Query, h2Connection, headers,
extraJsonProperties, false, true);
extraJsonProperties, useMultiStageQueryEngine, false, true);
} catch (Exception e) {
failure(pinotQuery, h2Query, e);
}
Expand All @@ -694,14 +696,16 @@ static void testQueryViaController(String pinotQuery, String queryResourceUrl,
private static void testQueryInternal(String pinotQuery, String queryResourceUrl,
org.apache.pinot.client.Connection pinotConnection, String h2Query, Connection h2Connection,
@Nullable Map<String, String> headers, @Nullable Map<String, String> extraJsonProperties,
boolean matchingRowCount, boolean viaController)
boolean useMultiStageQueryEngine, boolean matchingRowCount, boolean viaController)
throws Exception {
// broker response
JsonNode pinotResponse;
if (viaController) {
pinotResponse = ClusterTest.postQueryToController(pinotQuery, queryResourceUrl, headers, extraJsonProperties);
} else {
pinotResponse = ClusterTest.postQuery(pinotQuery, queryResourceUrl, headers, extraJsonProperties);
pinotResponse =
ClusterTest.postQuery(pinotQuery, getBrokerQueryApiUrl(queryResourceUrl, useMultiStageQueryEngine), headers,
extraJsonProperties);
}
if (!pinotResponse.get("exceptions").isEmpty()) {
throw new RuntimeException("Got Exceptions from Query Response: " + pinotResponse);
Expand Down Expand Up @@ -824,10 +828,15 @@ private static String getExplainPlan(String pinotQuery, String brokerUrl, @Nulla
@Nullable Map<String, String> extraJsonProperties)
throws Exception {
JsonNode explainPlanForResponse =
ClusterTest.postQuery("explain plan for " + pinotQuery, brokerUrl, headers, extraJsonProperties);
ClusterTest.postQuery("explain plan for " + pinotQuery, getBrokerQueryApiUrl(brokerUrl, false), headers,
extraJsonProperties);
return ExplainPlanUtils.formatExplainPlan(explainPlanForResponse);
}

public static String getBrokerQueryApiUrl(String brokerBaseApiUrl, boolean useMultiStageQueryEngine) {
return useMultiStageQueryEngine ? brokerBaseApiUrl + "/query" : brokerBaseApiUrl + "/query/sql";
}

private static int getH2ExpectedValues(Set<String> expectedValues, List<String> expectedOrderByValues,
ResultSet h2ResultSet, ResultSetMetaData h2MetaData, Collection<String> orderByColumns)
throws SQLException {
Expand Down Expand Up @@ -1021,6 +1030,7 @@ private static void failure(String pinotQuery, String h2Query, @Nullable Excepti
String failureMessage = "Caught exception while testing query!";
failure(pinotQuery, h2Query, failureMessage, e);
}

/**
* Helper method to report failures.
*
Expand Down
Loading

0 comments on commit 8318650

Please sign in to comment.