Skip to content

Commit

Permalink
Squashed Commit for Pluggable Time Series Engine
Browse files Browse the repository at this point in the history
  • Loading branch information
ankitsultana committed Aug 23, 2024
1 parent 32ed101 commit 1d41597
Show file tree
Hide file tree
Showing 100 changed files with 5,379 additions and 12 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ yarn-error.log*
quickstart*

#build symlink directory
build*
build
build/*

#helm related files
kubernetes/helm/**/charts/
Expand Down
5 changes: 5 additions & 0 deletions pinot-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,10 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-timeseries-planner</artifactId>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.PrometheusResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.utils.request.RequestUtils;
Expand All @@ -69,6 +70,7 @@
import org.apache.pinot.core.auth.ManualAuthorization;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.RequestScope;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
Expand Down Expand Up @@ -236,6 +238,46 @@ public void processSqlWithMultiStageQueryEnginePost(String query, @Suspended Asy
}
}

@POST
@ManagedAsync
@Produces(MediaType.APPLICATION_JSON)
@Path("timeseries/api/v1/query_range")
@ApiOperation(value = "Prometheus Compatible API for Pinot's Time Series Engine")
@ManualAuthorization
public void processTimeSeriesQueryEnginePost(String request, @Suspended AsyncResponse asyncResponse,
@QueryParam("engine") String engine,
@Context org.glassfish.grizzly.http.server.Request requestCtx,
@Context HttpHeaders httpHeaders) {
try {
try (RequestScope requestContext = Tracing.getTracer().createRequestScope()) {
PrometheusResponse response = executeTimeSeriesQuery(engine, request, requestContext);
if (response.getErrorType() != null && !response.getErrorType().isEmpty()) {
asyncResponse.resume(Response.serverError().entity(response.serializeWhenError()).build());
return;
}
asyncResponse.resume(response);
}
} catch (Exception e) {
LOGGER.error("Caught exception while processing POST request", e);
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L);
asyncResponse.resume(Response.serverError().entity(
new PrometheusResponse("error", null, e.getClass().getSimpleName(), e.getMessage()))
.build());
}
}

@POST
@ManagedAsync
@Produces(MediaType.APPLICATION_JSON)
@Path("timeseries/api/v1/query")
@ApiOperation(value = "Prometheus Compatible API for Instant Queries")
@ManualAuthorization
public void processTimeSeriesInstantQueryPost(String request, @Suspended AsyncResponse asyncResponse,
@Context org.glassfish.grizzly.http.server.Request requestCtx,
@Context HttpHeaders httpHeaders) {
asyncResponse.resume(Response.ok().entity("{}").build());
}

@DELETE
@Path("query/{queryId}")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY)
Expand Down Expand Up @@ -342,6 +384,10 @@ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterI
}
}

private PrometheusResponse executeTimeSeriesQuery(String engine, String queryString, RequestContext requestContext) {
return _requestHandler.handleTimeSeriesRequest(engine, queryString, requestContext);
}

private static HttpRequesterIdentity makeHttpIdentity(org.glassfish.grizzly.http.server.Request context) {
Multimap<String, String> headers = ArrayListMultimap.create();
context.getHeaderNames().forEach(key -> context.getHeaders(key).forEach(value -> headers.put(key, value)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pinot.broker.requesthandler.GrpcBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.MultiStageBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.TimeSeriesRequestHandler;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.config.NettyConfig;
Expand All @@ -71,6 +72,8 @@
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory;
Expand Down Expand Up @@ -130,6 +133,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
protected HelixManager _participantHelixManager;
// Handles the server routing stats.
protected ServerRoutingStatsManager _serverRoutingStatsManager;
private QueryDispatcher _queryDispatcher = null;

@Override
public void init(PinotConfiguration brokerConf)
Expand Down Expand Up @@ -327,16 +331,26 @@ public void start()
queryQuotaManager, tableCache, nettyDefaults, tlsDefaults, _serverRoutingStatsManager);
}
MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null;
MailboxService mailboxService = null;
if (_brokerConf.getProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) {
// multi-stage request handler uses both Netty and GRPC ports.
// worker requires both the "Netty port" for protocol transport; and "GRPC port" for mailbox transport.
// TODO: decouple protocol and engine selection.
QueryDispatcher queryDispatcher = initializeQueryDispatcher(_brokerConf);
multiStageBrokerRequestHandler =
new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache);
queryQuotaManager, tableCache, queryDispatcher);
}
TimeSeriesRequestHandler timeSeriesRequestHandler = null;
if (_brokerConf.getProperty(Helix.CONFIG_OF_TIME_SERIES_ENGINE_ENABLED, Helix.DEFAULT_TIME_SERIES_ENGINE_ENABLED)) {
QueryDispatcher queryDispatcher = initializeQueryDispatcher(_brokerConf);
timeSeriesRequestHandler =
new TimeSeriesRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory, queryQuotaManager,
tableCache, queryDispatcher);
}
_brokerRequestHandler =
new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, multiStageBrokerRequestHandler);
new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, multiStageBrokerRequestHandler,
timeSeriesRequestHandler);
_brokerRequestHandler.start();

// Enable/disable thread CPU time measurement through instance config.
Expand Down Expand Up @@ -436,6 +450,16 @@ public void start()
protected void registerExtraComponents(BrokerAdminApiApplication brokerAdminApplication) {
}

private QueryDispatcher initializeQueryDispatcher(PinotConfiguration config) {
if (_queryDispatcher != null) {
return _queryDispatcher;
}
String hostname = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
int port = Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
MailboxService mailboxService = new MailboxService(hostname, port, config);
return new QueryDispatcher(mailboxService);
}

private void updateInstanceConfigAndBrokerResourceIfNeeded() {
InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(_participantHelixManager, _instanceId);
boolean updated = HelixHelper.updateHostnamePort(instanceConfig, _hostname, _port);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.PrometheusResponse;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.spi.auth.AuthorizationResult;
import org.apache.pinot.spi.env.PinotConfiguration;
Expand Down Expand Up @@ -144,6 +145,11 @@ protected abstract BrokerResponse handleRequest(long requestId, String query,
RequestContext requestContext, @Nullable HttpHeaders httpHeaders, AccessControl accessControl)
throws Exception;

public PrometheusResponse handleTimeSeriesRequest(String engine, String rawQueryParamString,
RequestContext requestContext) {
throw new UnsupportedOperationException("Handler does not support Time Series requests");
}

protected static void augmentStatistics(RequestContext statistics, BrokerResponse response) {
statistics.setNumRowsResultSet(response.getNumRowsResultSet());
// TODO: Add partial result flag to RequestContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.PrometheusResponse;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.RequestScope;
import org.apache.pinot.spi.trace.Tracing;
Expand All @@ -48,6 +49,8 @@ BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOptions sqlNo
@Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, @Nullable HttpHeaders httpHeaders)
throws Exception;

PrometheusResponse handleTimeSeriesRequest(String engine, String rawQueryParamString, RequestContext requestContext);

@VisibleForTesting
default BrokerResponse handleRequest(String sql)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.PrometheusResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
Expand All @@ -44,11 +45,14 @@
public class BrokerRequestHandlerDelegate implements BrokerRequestHandler {
private final BaseSingleStageBrokerRequestHandler _singleStageBrokerRequestHandler;
private final MultiStageBrokerRequestHandler _multiStageBrokerRequestHandler;
private final TimeSeriesRequestHandler _timeSeriesRequestHandler;

public BrokerRequestHandlerDelegate(BaseSingleStageBrokerRequestHandler singleStageBrokerRequestHandler,
@Nullable MultiStageBrokerRequestHandler multiStageBrokerRequestHandler) {
@Nullable MultiStageBrokerRequestHandler multiStageBrokerRequestHandler,
@Nullable TimeSeriesRequestHandler timeSeriesRequestHandler) {
_singleStageBrokerRequestHandler = singleStageBrokerRequestHandler;
_multiStageBrokerRequestHandler = multiStageBrokerRequestHandler;
_timeSeriesRequestHandler = timeSeriesRequestHandler;
}

@Override
Expand Down Expand Up @@ -103,6 +107,16 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
}
}

@Override
public PrometheusResponse handleTimeSeriesRequest(String engine, String rawQueryParamString,
RequestContext requestContext) {
if (_timeSeriesRequestHandler == null) {
return new PrometheusResponse("error", null,
"NOT_ENABLED", "Time series engine is not enabled");
}
return _timeSeriesRequestHandler.handleTimeSeriesRequest(engine, rawQueryParamString, requestContext);
}

@Override
public Map<Long, String> getRunningQueries() {
// TODO: add support for multiStaged engine: track running queries for multiStaged engine and combine its
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.catalog.PinotCatalog;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.planner.plannode.PlanNode;
Expand Down Expand Up @@ -85,12 +84,13 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
private final PinotCatalog _catalog;

public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache) {
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
QueryDispatcher queryDispatcher) {
super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache);
String hostname = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
int port = Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
_workerManager = new WorkerManager(hostname, port, _routingManager);
_queryDispatcher = new QueryDispatcher(new MailboxService(hostname, port, config));
_queryDispatcher = queryDispatcher;
_catalog = new PinotCatalog(tableCache);
LOGGER.info("Initialized MultiStageBrokerRequestHandler on host: {}, port: {} with broker id: {}, timeout: {}ms, "
+ "query log max length: {}, query log max rate: {}", hostname, port, _brokerId, _brokerTimeoutMs,
Expand Down
Loading

0 comments on commit 1d41597

Please sign in to comment.