Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Part-3: Time Series Engine E2E Quickstart #36

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pinot-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-query-runtime</artifactId>
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-timeseries-spi</artifactId>
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-timeseries-planner</artifactId>
</dependency>

<!-- Test -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.utils.request.RequestUtils;
Expand All @@ -69,6 +70,7 @@
import org.apache.pinot.core.auth.ManualAuthorization;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.RequestScope;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
Expand Down Expand Up @@ -236,6 +238,48 @@ public void processSqlWithMultiStageQueryEnginePost(String query, @Suspended Asy
}
}

@GET
@ManagedAsync
@Produces(MediaType.APPLICATION_JSON)
@Path("timeseries/api/v1/query_range")
@ApiOperation(value = "Prometheus Compatible API for Pinot's Time Series Engine")
@ManualAuthorization
public void processTimeSeriesQueryEngine(@Suspended AsyncResponse asyncResponse,
@QueryParam("language") String language,
@Context org.glassfish.grizzly.http.server.Request requestCtx,
@Context HttpHeaders httpHeaders) {
try {
try (RequestScope requestContext = Tracing.getTracer().createRequestScope()) {
String queryString = requestCtx.getQueryString();
PinotBrokerTimeSeriesResponse response = executeTimeSeriesQuery(language, queryString, requestContext);
if (response.getErrorType() != null && !response.getErrorType().isEmpty()) {
asyncResponse.resume(Response.serverError().entity(response).build());
return;
}
asyncResponse.resume(response);
}
} catch (Exception e) {
LOGGER.error("Caught exception while processing POST request", e);
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L);
asyncResponse.resume(Response.serverError().entity(
new PinotBrokerTimeSeriesResponse("error", null, e.getClass().getSimpleName(), e.getMessage()))
.build());
}
}

@GET
@ManagedAsync
@Produces(MediaType.APPLICATION_JSON)
@Path("timeseries/api/v1/query")
@ApiOperation(value = "Prometheus Compatible API for Instant Queries")
@ManualAuthorization
public void processTimeSeriesInstantQuery(@Suspended AsyncResponse asyncResponse,
@Context org.glassfish.grizzly.http.server.Request requestCtx,
@Context HttpHeaders httpHeaders) {
// TODO: Not implemented yet.
asyncResponse.resume(Response.ok().entity("{}").build());
}

@DELETE
@Path("query/{queryId}")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY)
Expand Down Expand Up @@ -342,6 +386,11 @@ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterI
}
}

private PinotBrokerTimeSeriesResponse executeTimeSeriesQuery(String language, String queryString,
RequestContext requestContext) {
return _requestHandler.handleTimeSeriesRequest(language, queryString, requestContext);
}

private static HttpRequesterIdentity makeHttpIdentity(org.glassfish.grizzly.http.server.Request context) {
Multimap<String, String> headers = ArrayListMultimap.create();
context.getHeaderNames().forEach(key -> context.getHeaders(key).forEach(value -> headers.put(key, value)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pinot.broker.requesthandler.GrpcBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.MultiStageBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.TimeSeriesRequestHandler;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.config.NettyConfig;
Expand All @@ -71,6 +72,8 @@
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory;
Expand All @@ -86,6 +89,7 @@
import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -326,16 +330,25 @@ public void start()
_queryQuotaManager, tableCache, nettyDefaults, tlsDefaults, _serverRoutingStatsManager);
}
MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null;
QueryDispatcher queryDispatcher = null;
if (_brokerConf.getProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) {
// multi-stage request handler uses both Netty and GRPC ports.
// worker requires both the "Netty port" for protocol transport; and "GRPC port" for mailbox transport.
// TODO: decouple protocol and engine selection.
queryDispatcher = createQueryDispatcher(_brokerConf);
multiStageBrokerRequestHandler =
new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
_queryQuotaManager, tableCache);
}
TimeSeriesRequestHandler timeSeriesRequestHandler = null;
if (StringUtils.isNotBlank(_brokerConf.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey()))) {
Preconditions.checkNotNull(queryDispatcher, "Multistage Engine should be enabled to use time-series engine");
timeSeriesRequestHandler = new TimeSeriesRequestHandler(_brokerConf, brokerId, _routingManager,
_accessControlFactory, _queryQuotaManager, tableCache, queryDispatcher);
}
_brokerRequestHandler =
new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, multiStageBrokerRequestHandler);
new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, multiStageBrokerRequestHandler,
timeSeriesRequestHandler);
_brokerRequestHandler.start();

// Enable/disable thread CPU time measurement through instance config.
Expand Down Expand Up @@ -435,6 +448,13 @@ public void start()
protected void registerExtraComponents(BrokerAdminApiApplication brokerAdminApplication) {
}

private QueryDispatcher createQueryDispatcher(PinotConfiguration brokerConf) {
String hostname = _brokerConf.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
int port = Integer.parseInt(_brokerConf.getProperty(
CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
return new QueryDispatcher(new MailboxService(hostname, port, _brokerConf));
}

private void updateInstanceConfigAndBrokerResourceIfNeeded() {
InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(_participantHelixManager, _instanceId);
boolean updated = HelixHelper.updateHostnamePort(instanceConfig, _hostname, _port);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.RequestScope;
import org.apache.pinot.spi.trace.Tracing;
Expand Down Expand Up @@ -59,6 +60,14 @@ default BrokerResponse handleRequest(String sql)
}
}

/**
* Run a query and use the time-series engine.
*/
default PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String rawQueryParamString,
RequestContext requestContext) {
throw new UnsupportedOperationException("Handler does not support Time Series requests");
}

Map<Long, String> getRunningQueries();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
Expand All @@ -44,11 +45,14 @@
public class BrokerRequestHandlerDelegate implements BrokerRequestHandler {
private final BaseSingleStageBrokerRequestHandler _singleStageBrokerRequestHandler;
private final MultiStageBrokerRequestHandler _multiStageBrokerRequestHandler;
private final TimeSeriesRequestHandler _timeSeriesRequestHandler;

public BrokerRequestHandlerDelegate(BaseSingleStageBrokerRequestHandler singleStageBrokerRequestHandler,
@Nullable MultiStageBrokerRequestHandler multiStageBrokerRequestHandler) {
@Nullable MultiStageBrokerRequestHandler multiStageBrokerRequestHandler,
@Nullable TimeSeriesRequestHandler timeSeriesRequestHandler) {
_singleStageBrokerRequestHandler = singleStageBrokerRequestHandler;
_multiStageBrokerRequestHandler = multiStageBrokerRequestHandler;
_timeSeriesRequestHandler = timeSeriesRequestHandler;
}

@Override
Expand All @@ -57,6 +61,9 @@ public void start() {
if (_multiStageBrokerRequestHandler != null) {
_multiStageBrokerRequestHandler.start();
}
if (_timeSeriesRequestHandler != null) {
_timeSeriesRequestHandler.start();
}
}

@Override
Expand All @@ -65,6 +72,9 @@ public void shutDown() {
if (_multiStageBrokerRequestHandler != null) {
_multiStageBrokerRequestHandler.shutDown();
}
if (_timeSeriesRequestHandler != null) {
_timeSeriesRequestHandler.shutDown();
}
}

@Override
Expand Down Expand Up @@ -103,6 +113,15 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
}
}

@Override
public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String rawQueryParamString,
RequestContext requestContext) {
if (_timeSeriesRequestHandler != null) {
return _timeSeriesRequestHandler.handleTimeSeriesRequest(lang, rawQueryParamString, requestContext);
}
return new PinotBrokerTimeSeriesResponse("error", null, "error", "Time series query engine not enabled.");
}

@Override
public Map<Long, String> getRunningQueries() {
// TODO: add support for multiStaged engine: track running queries for multiStaged engine and combine its
Expand Down
Loading