Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pluggable Time Series Engine in Pinot #35

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ yarn-error.log*
quickstart*

#build symlink directory
build*
build
build/*

#helm related files
kubernetes/helm/**/charts/
Expand Down
5 changes: 5 additions & 0 deletions pinot-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,10 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-timeseries-planner</artifactId>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.PrometheusResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.utils.request.RequestUtils;
Expand All @@ -69,6 +70,7 @@
import org.apache.pinot.core.auth.ManualAuthorization;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.RequestScope;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
Expand Down Expand Up @@ -236,6 +238,46 @@ public void processSqlWithMultiStageQueryEnginePost(String query, @Suspended Asy
}
}

@POST
@ManagedAsync
@Produces(MediaType.APPLICATION_JSON)
@Path("timeseries/api/v1/query_range")
@ApiOperation(value = "Prometheus Compatible API for Pinot's Time Series Engine")
@ManualAuthorization
public void processTimeSeriesQueryEnginePost(String request, @Suspended AsyncResponse asyncResponse,
    @QueryParam("engine") String engine,
    @Context org.glassfish.grizzly.http.server.Request requestCtx,
    @Context HttpHeaders httpHeaders) {
  // Runs the time-series query inside a tracing scope; the scope is closed automatically.
  // NOTE(review): "request" is the raw POST body — presumably the Prometheus query_range
  // parameters; confirm against the request handler's parsing.
  try (RequestScope scope = Tracing.getTracer().createRequestScope()) {
    PrometheusResponse result = executeTimeSeriesQuery(engine, request, scope);
    // A non-empty error type marks a handler-reported failure; surface it as HTTP 500 with
    // the handler's own error serialization.
    boolean isError = result.getErrorType() != null && !result.getErrorType().isEmpty();
    if (isError) {
      asyncResponse.resume(Response.serverError().entity(result.serializeWhenError()).build());
    } else {
      asyncResponse.resume(result);
    }
  } catch (Exception e) {
    // Unexpected failure: meter it and reply with a Prometheus-style error payload.
    LOGGER.error("Caught exception while processing POST request", e);
    _brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L);
    asyncResponse.resume(Response.serverError().entity(
        new PrometheusResponse("error", null, e.getClass().getSimpleName(), e.getMessage()))
        .build());
  }
}

@POST
@ManagedAsync
@Produces(MediaType.APPLICATION_JSON)
@Path("timeseries/api/v1/query")
@ApiOperation(value = "Prometheus Compatible API for Instant Queries")
@ManualAuthorization
public void processTimeSeriesInstantQueryPost(String request, @Suspended AsyncResponse asyncResponse,
    @Context org.glassfish.grizzly.http.server.Request requestCtx,
    @Context HttpHeaders httpHeaders) {
  // Instant queries are not implemented yet: unconditionally answer 200 with an empty JSON
  // object, ignoring the request body.
  Response emptyBody = Response.ok().entity("{}").build();
  asyncResponse.resume(emptyBody);
}

@DELETE
@Path("query/{queryId}")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY)
Expand Down Expand Up @@ -342,6 +384,10 @@ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterI
}
}

/**
 * Forwards a time-series query to the broker request handler.
 *
 * @param engine name of the time-series engine selected via the "engine" query parameter
 * @param queryString raw query payload received from the REST endpoint
 * @param requestContext tracing/request context for this invocation
 * @return the handler's Prometheus-style response (the delegate returns an error response
 *         when the time-series engine is not enabled)
 */
private PrometheusResponse executeTimeSeriesQuery(String engine, String queryString, RequestContext requestContext) {
  return _requestHandler.handleTimeSeriesRequest(engine, queryString, requestContext);
}

private static HttpRequesterIdentity makeHttpIdentity(org.glassfish.grizzly.http.server.Request context) {
Multimap<String, String> headers = ArrayListMultimap.create();
context.getHeaderNames().forEach(key -> context.getHeaders(key).forEach(value -> headers.put(key, value)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pinot.broker.requesthandler.GrpcBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.MultiStageBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.TimeSeriesRequestHandler;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.config.NettyConfig;
Expand All @@ -71,6 +72,8 @@
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory;
Expand Down Expand Up @@ -130,6 +133,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
protected HelixManager _participantHelixManager;
// Handles the server routing stats.
protected ServerRoutingStatsManager _serverRoutingStatsManager;
private QueryDispatcher _queryDispatcher = null;

@Override
public void init(PinotConfiguration brokerConf)
Expand Down Expand Up @@ -327,16 +331,26 @@ public void start()
queryQuotaManager, tableCache, nettyDefaults, tlsDefaults, _serverRoutingStatsManager);
}
MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null;
MailboxService mailboxService = null;
if (_brokerConf.getProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) {
// multi-stage request handler uses both Netty and GRPC ports.
// worker requires both the "Netty port" for protocol transport; and "GRPC port" for mailbox transport.
// TODO: decouple protocol and engine selection.
QueryDispatcher queryDispatcher = initializeQueryDispatcher(_brokerConf);
multiStageBrokerRequestHandler =
new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache);
queryQuotaManager, tableCache, queryDispatcher);
}
TimeSeriesRequestHandler timeSeriesRequestHandler = null;
if (_brokerConf.getProperty(Helix.CONFIG_OF_TIME_SERIES_ENGINE_ENABLED, Helix.DEFAULT_TIME_SERIES_ENGINE_ENABLED)) {
QueryDispatcher queryDispatcher = initializeQueryDispatcher(_brokerConf);
timeSeriesRequestHandler =
new TimeSeriesRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory, queryQuotaManager,
tableCache, queryDispatcher);
}
_brokerRequestHandler =
new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, multiStageBrokerRequestHandler);
new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, multiStageBrokerRequestHandler,
timeSeriesRequestHandler);
_brokerRequestHandler.start();

// Enable/disable thread CPU time measurement through instance config.
Expand Down Expand Up @@ -436,6 +450,16 @@ public void start()
protected void registerExtraComponents(BrokerAdminApiApplication brokerAdminApplication) {
}

/**
 * Lazily creates and caches the {@link QueryDispatcher} shared by the multi-stage and
 * time-series request handlers.
 *
 * @param config broker configuration supplying the query-runner hostname and port
 * @return the cached dispatcher, created on first call
 * @throws NumberFormatException if the configured query-runner port is missing or not numeric
 */
private QueryDispatcher initializeQueryDispatcher(PinotConfiguration config) {
  if (_queryDispatcher != null) {
    return _queryDispatcher;
  }
  String hostname = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
  int port = Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
  MailboxService mailboxService = new MailboxService(hostname, port, config);
  // Bug fix: the original returned a new dispatcher without assigning _queryDispatcher, so the
  // null-check above never hit and a second caller (e.g. when both the multi-stage and
  // time-series engines are enabled) would build a second MailboxService on the same
  // hostname/port. Cache it so both handlers share one dispatcher.
  _queryDispatcher = new QueryDispatcher(mailboxService);
  return _queryDispatcher;
}

private void updateInstanceConfigAndBrokerResourceIfNeeded() {
InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(_participantHelixManager, _instanceId);
boolean updated = HelixHelper.updateHostnamePort(instanceConfig, _hostname, _port);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.PrometheusResponse;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.spi.auth.AuthorizationResult;
import org.apache.pinot.spi.env.PinotConfiguration;
Expand Down Expand Up @@ -144,6 +145,11 @@ protected abstract BrokerResponse handleRequest(long requestId, String query,
RequestContext requestContext, @Nullable HttpHeaders httpHeaders, AccessControl accessControl)
throws Exception;

/**
 * Handles a Prometheus-style time-series query.
 *
 * <p>Base implementation: always rejects the request. Handlers that support the time-series
 * engine (e.g. the time-series request handler) override this method.
 *
 * @param engine name of the time-series engine to use
 * @param rawQueryParamString raw query payload/parameters — presumably the unparsed
 *     query_range parameters; confirm against the overriding handler
 * @param requestContext tracing/request context for this invocation
 * @throws UnsupportedOperationException always, in this base implementation
 */
public PrometheusResponse handleTimeSeriesRequest(String engine, String rawQueryParamString,
    RequestContext requestContext) {
  throw new UnsupportedOperationException("Handler does not support Time Series requests");
}

protected static void augmentStatistics(RequestContext statistics, BrokerResponse response) {
statistics.setNumRowsResultSet(response.getNumRowsResultSet());
// TODO: Add partial result flag to RequestContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.PrometheusResponse;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.RequestScope;
import org.apache.pinot.spi.trace.Tracing;
Expand All @@ -48,6 +49,8 @@ BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOptions sqlNo
@Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, @Nullable HttpHeaders httpHeaders)
throws Exception;

/**
 * Handles a Prometheus-compatible time-series query and returns its response.
 * Implementations that do not support the time-series engine may throw or return an
 * error-typed response — see the concrete handlers.
 */
PrometheusResponse handleTimeSeriesRequest(String engine, String rawQueryParamString, RequestContext requestContext);

@VisibleForTesting
default BrokerResponse handleRequest(String sql)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.PrometheusResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
Expand All @@ -44,11 +45,14 @@
public class BrokerRequestHandlerDelegate implements BrokerRequestHandler {
private final BaseSingleStageBrokerRequestHandler _singleStageBrokerRequestHandler;
private final MultiStageBrokerRequestHandler _multiStageBrokerRequestHandler;
private final TimeSeriesRequestHandler _timeSeriesRequestHandler;

/**
 * Creates the delegate that fans requests out to the engine-specific handlers.
 *
 * @param singleStageBrokerRequestHandler handler for single-stage (v1) queries; required
 * @param multiStageBrokerRequestHandler handler for multi-stage (v2) queries, or null when the
 *     multi-stage engine is disabled
 * @param timeSeriesRequestHandler handler for time-series queries, or null when the
 *     time-series engine is disabled (time-series requests then get an error response)
 */
public BrokerRequestHandlerDelegate(BaseSingleStageBrokerRequestHandler singleStageBrokerRequestHandler,
    @Nullable MultiStageBrokerRequestHandler multiStageBrokerRequestHandler,
    @Nullable TimeSeriesRequestHandler timeSeriesRequestHandler) {
  _singleStageBrokerRequestHandler = singleStageBrokerRequestHandler;
  _multiStageBrokerRequestHandler = multiStageBrokerRequestHandler;
  _timeSeriesRequestHandler = timeSeriesRequestHandler;
}

@Override
Expand Down Expand Up @@ -103,6 +107,16 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
}
}

@Override
public PrometheusResponse handleTimeSeriesRequest(String engine, String rawQueryParamString,
    RequestContext requestContext) {
  // Route to the time-series handler when one was wired in at startup; otherwise answer with
  // a Prometheus-format error payload instead of throwing.
  TimeSeriesRequestHandler handler = _timeSeriesRequestHandler;
  if (handler != null) {
    return handler.handleTimeSeriesRequest(engine, rawQueryParamString, requestContext);
  }
  return new PrometheusResponse("error", null,
      "NOT_ENABLED", "Time series engine is not enabled");
}

@Override
public Map<Long, String> getRunningQueries() {
// TODO: add support for multiStaged engine: track running queries for multiStaged engine and combine its
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.catalog.PinotCatalog;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.planner.plannode.PlanNode;
Expand Down Expand Up @@ -85,12 +84,13 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
private final PinotCatalog _catalog;

public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache) {
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
QueryDispatcher queryDispatcher) {
super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache);
String hostname = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
int port = Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
_workerManager = new WorkerManager(hostname, port, _routingManager);
_queryDispatcher = new QueryDispatcher(new MailboxService(hostname, port, config));
_queryDispatcher = queryDispatcher;
_catalog = new PinotCatalog(tableCache);
LOGGER.info("Initialized MultiStageBrokerRequestHandler on host: {}, port: {} with broker id: {}, timeout: {}ms, "
+ "query log max length: {}, query log max rate: {}", hostname, port, _brokerId, _brokerTimeoutMs,
Expand Down
Loading