diff --git a/pinot-broker/pom.xml b/pinot-broker/pom.xml
index 881ef091fc8..085ef1e90f3 100644
--- a/pinot-broker/pom.xml
+++ b/pinot-broker/pom.xml
@@ -38,6 +38,14 @@
org.apache.pinotpinot-query-runtime
+
+ org.apache.pinot
+ pinot-timeseries-spi
+
+
+ org.apache.pinot
+ pinot-timeseries-planner
+
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
index fd3f58fbf25..bb9e36f204b 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
@@ -61,6 +61,7 @@
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.utils.request.RequestUtils;
@@ -69,6 +70,7 @@
import org.apache.pinot.core.auth.ManualAuthorization;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
+import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.RequestScope;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
@@ -236,6 +238,48 @@ public void processSqlWithMultiStageQueryEnginePost(String query, @Suspended Asy
}
}
+ @GET
+ @ManagedAsync
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("timeseries/api/v1/query_range")
+ @ApiOperation(value = "Prometheus Compatible API for Pinot's Time Series Engine")
+ @ManualAuthorization
+ public void processTimeSeriesQueryEngine(@Suspended AsyncResponse asyncResponse,
+ @QueryParam("language") String language,
+ @Context org.glassfish.grizzly.http.server.Request requestCtx,
+ @Context HttpHeaders httpHeaders) {
+ try {
+ try (RequestScope requestContext = Tracing.getTracer().createRequestScope()) {
+ String queryString = requestCtx.getQueryString();
+ PinotBrokerTimeSeriesResponse response = executeTimeSeriesQuery(language, queryString, requestContext);
+ if (response.getErrorType() != null && !response.getErrorType().isEmpty()) {
+ asyncResponse.resume(Response.serverError().entity(response).build());
+ return;
+ }
+ asyncResponse.resume(response);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while processing POST request", e);
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L);
+ asyncResponse.resume(Response.serverError().entity(
+ new PinotBrokerTimeSeriesResponse("error", null, e.getClass().getSimpleName(), e.getMessage()))
+ .build());
+ }
+ }
+
+ @GET
+ @ManagedAsync
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("timeseries/api/v1/query")
+ @ApiOperation(value = "Prometheus Compatible API for Instant Queries")
+ @ManualAuthorization
+ public void processTimeSeriesInstantQuery(@Suspended AsyncResponse asyncResponse,
+ @Context org.glassfish.grizzly.http.server.Request requestCtx,
+ @Context HttpHeaders httpHeaders) {
+ // TODO: Not implemented yet.
+ asyncResponse.resume(Response.ok().entity("{}").build());
+ }
+
@DELETE
@Path("query/{queryId}")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY)
@@ -342,6 +386,11 @@ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterI
}
}
+ private PinotBrokerTimeSeriesResponse executeTimeSeriesQuery(String language, String queryString,
+ RequestContext requestContext) {
+ return _requestHandler.handleTimeSeriesRequest(language, queryString, requestContext);
+ }
+
private static HttpRequesterIdentity makeHttpIdentity(org.glassfish.grizzly.http.server.Request context) {
Multimap headers = ArrayListMultimap.create();
context.getHeaderNames().forEach(key -> context.getHeaders(key).forEach(value -> headers.put(key, value)));
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 47007e315b6..41ab3b02804 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -48,6 +48,7 @@
import org.apache.pinot.broker.requesthandler.GrpcBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.MultiStageBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler;
+import org.apache.pinot.broker.requesthandler.TimeSeriesRequestHandler;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.config.NettyConfig;
@@ -71,6 +72,8 @@
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
import org.apache.pinot.core.util.ListenerConfigUtil;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory;
@@ -86,6 +89,7 @@
import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
+import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -326,16 +330,25 @@ public void start()
_queryQuotaManager, tableCache, nettyDefaults, tlsDefaults, _serverRoutingStatsManager);
}
MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null;
+ QueryDispatcher queryDispatcher = null;
if (_brokerConf.getProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) {
// multi-stage request handler uses both Netty and GRPC ports.
// worker requires both the "Netty port" for protocol transport; and "GRPC port" for mailbox transport.
// TODO: decouple protocol and engine selection.
+ queryDispatcher = createQueryDispatcher(_brokerConf);
multiStageBrokerRequestHandler =
new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
_queryQuotaManager, tableCache);
}
+ TimeSeriesRequestHandler timeSeriesRequestHandler = null;
+ if (StringUtils.isNotBlank(_brokerConf.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey()))) {
+ Preconditions.checkNotNull(queryDispatcher, "Multistage Engine should be enabled to use time-series engine");
+ timeSeriesRequestHandler = new TimeSeriesRequestHandler(_brokerConf, brokerId, _routingManager,
+ _accessControlFactory, _queryQuotaManager, tableCache, queryDispatcher);
+ }
_brokerRequestHandler =
- new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, multiStageBrokerRequestHandler);
+ new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, multiStageBrokerRequestHandler,
+ timeSeriesRequestHandler);
_brokerRequestHandler.start();
// Enable/disable thread CPU time measurement through instance config.
@@ -435,6 +448,13 @@ public void start()
protected void registerExtraComponents(BrokerAdminApiApplication brokerAdminApplication) {
}
+ private QueryDispatcher createQueryDispatcher(PinotConfiguration brokerConf) {
+ String hostname = _brokerConf.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
+ int port = Integer.parseInt(_brokerConf.getProperty(
+ CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
+ return new QueryDispatcher(new MailboxService(hostname, port, _brokerConf));
+ }
+
private void updateInstanceConfigAndBrokerResourceIfNeeded() {
InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(_participantHelixManager, _instanceId);
boolean updated = HelixHelper.updateHostnamePort(instanceConfig, _hostname, _port);
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
index 27c45817f40..277f5f96df6 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
@@ -29,6 +29,7 @@
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.RequestScope;
import org.apache.pinot.spi.trace.Tracing;
@@ -59,6 +60,14 @@ default BrokerResponse handleRequest(String sql)
}
}
+ /**
+ * Run a query and use the time-series engine.
+ */
+ default PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String rawQueryParamString,
+ RequestContext requestContext) {
+ throw new UnsupportedOperationException("Handler does not support Time Series requests");
+ }
+
Map getRunningQueries();
/**
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
index b3e48f25aeb..e3a814365a9 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
@@ -27,6 +27,7 @@
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
@@ -44,11 +45,14 @@
public class BrokerRequestHandlerDelegate implements BrokerRequestHandler {
private final BaseSingleStageBrokerRequestHandler _singleStageBrokerRequestHandler;
private final MultiStageBrokerRequestHandler _multiStageBrokerRequestHandler;
+ private final TimeSeriesRequestHandler _timeSeriesRequestHandler;
public BrokerRequestHandlerDelegate(BaseSingleStageBrokerRequestHandler singleStageBrokerRequestHandler,
- @Nullable MultiStageBrokerRequestHandler multiStageBrokerRequestHandler) {
+ @Nullable MultiStageBrokerRequestHandler multiStageBrokerRequestHandler,
+ @Nullable TimeSeriesRequestHandler timeSeriesRequestHandler) {
_singleStageBrokerRequestHandler = singleStageBrokerRequestHandler;
_multiStageBrokerRequestHandler = multiStageBrokerRequestHandler;
+ _timeSeriesRequestHandler = timeSeriesRequestHandler;
}
@Override
@@ -57,6 +61,9 @@ public void start() {
if (_multiStageBrokerRequestHandler != null) {
_multiStageBrokerRequestHandler.start();
}
+ if (_timeSeriesRequestHandler != null) {
+ _timeSeriesRequestHandler.start();
+ }
}
@Override
@@ -65,6 +72,9 @@ public void shutDown() {
if (_multiStageBrokerRequestHandler != null) {
_multiStageBrokerRequestHandler.shutDown();
}
+ if (_timeSeriesRequestHandler != null) {
+ _timeSeriesRequestHandler.shutDown();
+ }
}
@Override
@@ -103,6 +113,15 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
}
}
+ @Override
+ public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String rawQueryParamString,
+ RequestContext requestContext) {
+ if (_timeSeriesRequestHandler != null) {
+ return _timeSeriesRequestHandler.handleTimeSeriesRequest(lang, rawQueryParamString, requestContext);
+ }
+ return new PinotBrokerTimeSeriesResponse("error", null, "error", "Time series query engine not enabled.");
+ }
+
@Override
public Map getRunningQueries() {
// TODO: add support for multiStaged engine: track running queries for multiStaged engine and combine its
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
new file mode 100644
index 00000000000..fa6b67be11e
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.requesthandler;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Preconditions;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import javax.annotation.Nullable;
+import javax.ws.rs.core.HttpHeaders;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hc.client5.http.io.HttpClientConnectionManager;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.utils.URLEncodedUtils;
+import org.apache.pinot.broker.api.AccessControl;
+import org.apache.pinot.broker.api.RequesterIdentity;
+import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
+import org.apache.pinot.broker.routing.BrokerRoutingManager;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
+import org.apache.pinot.common.utils.HumanReadableDuration;
+import org.apache.pinot.query.service.dispatch.QueryDispatcher;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.apache.pinot.tsdb.planner.TimeSeriesQueryEnvironment;
+import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan;
+import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
+import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TimeSeriesRequestHandler extends BaseBrokerRequestHandler {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TimeSeriesRequestHandler.class);
+ private static final long DEFAULT_STEP_SECONDS = 60L;
+ private final TimeSeriesQueryEnvironment _queryEnvironment;
+ private final QueryDispatcher _queryDispatcher;
+
+ public TimeSeriesRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
+ AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
+ QueryDispatcher queryDispatcher) {
+ super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache);
+ _queryEnvironment = new TimeSeriesQueryEnvironment(config, routingManager, tableCache);
+ _queryEnvironment.init(config);
+ _queryDispatcher = queryDispatcher;
+ }
+
+ @Override
+ protected BrokerResponse handleRequest(long requestId, String query, @Nullable SqlNodeAndOptions sqlNodeAndOptions,
+ JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext,
+ @Nullable HttpHeaders httpHeaders, AccessControl accessControl)
+ throws Exception {
+ throw new IllegalArgumentException("Not supported yet");
+ }
+
+ @Override
+ public void start() {
+ LOGGER.info("Starting time-series request handler");
+ }
+
+ @Override
+ public void shutDown() {
+ LOGGER.info("Shutting down time-series request handler");
+ }
+
+ @Override
+ public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String rawQueryParamString,
+ RequestContext requestContext) {
+ RangeTimeSeriesRequest timeSeriesRequest = null;
+ try {
+ timeSeriesRequest = buildRangeTimeSeriesRequest(lang, rawQueryParamString);
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ TimeSeriesLogicalPlanResult logicalPlanResult = _queryEnvironment.buildLogicalPlan(timeSeriesRequest);
+ TimeSeriesDispatchablePlan dispatchablePlan = _queryEnvironment.buildPhysicalPlan(timeSeriesRequest, requestContext,
+ logicalPlanResult);
+ return _queryDispatcher.submitAndGet(requestContext, dispatchablePlan, timeSeriesRequest.getTimeout().toMillis(),
+ new HashMap<>());
+ }
+
+ @Override
+ public Map getRunningQueries() {
+ // TODO: Implement this.
+ return Map.of();
+ }
+
+ @Override
+ public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr,
+ Map serverResponses)
+ throws Exception {
+ // TODO: Implement this.
+ return false;
+ }
+
+ private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String language, String queryParamString)
+ throws URISyntaxException {
+ List pairs = URLEncodedUtils.parse(
+ new URI("http://localhost?" + queryParamString), "UTF-8");
+ String query = null;
+ Long startTs = null;
+ Long endTs = null;
+ String step = null;
+ String timeoutStr = null;
+ for (NameValuePair nameValuePair : pairs) {
+ switch (nameValuePair.getName()) {
+ case "query":
+ query = nameValuePair.getValue();
+ break;
+ case "start":
+ startTs = Long.parseLong(nameValuePair.getValue());
+ break;
+ case "end":
+ endTs = Long.parseLong(nameValuePair.getValue());
+ break;
+ case "step":
+ step = nameValuePair.getValue();
+ break;
+ case "timeout":
+ timeoutStr = nameValuePair.getValue();
+ break;
+ default:
+ /* Okay to ignore unknown parameters since the language implementor may be using them. */
+ break;
+ }
+ }
+ Preconditions.checkNotNull(query, "Query cannot be null");
+ Preconditions.checkNotNull(startTs, "Start time cannot be null");
+ Preconditions.checkNotNull(endTs, "End time cannot be null");
+ Duration timeout = Duration.ofMillis(_brokerTimeoutMs);
+ if (StringUtils.isNotBlank(timeoutStr)) {
+ timeout = HumanReadableDuration.from(timeoutStr);
+ }
+ // TODO: Pass full raw query param string to the request
+ return new RangeTimeSeriesRequest(language, query, startTs, endTs, getStepSeconds(step), timeout, queryParamString);
+ }
+
+ public static Long getStepSeconds(@Nullable String step) {
+ if (step == null) {
+ return DEFAULT_STEP_SECONDS;
+ }
+ try {
+ return Long.parseLong(step);
+ } catch (NumberFormatException ignored) {
+ }
+ return HumanReadableDuration.from(step).getSeconds();
+ }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
new file mode 100644
index 00000000000..8d455e09bd3
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+
+
+/**
+ * POJO returned by the Pinot broker in a time-series query response. Format is defined
+ * in the prometheus docs.
+ * TODO: We will evolve this until Pinot 1.3. At present we are mimicking Prometheus HTTP API.
+ */
+@InterfaceStability.Evolving
+public class PinotBrokerTimeSeriesResponse {
+ public static final String SUCCESS_STATUS = "success";
+ public static final String ERROR_STATUS = "error";
+ public static final String METRIC_NAME_KEY = "__name__";
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private String _status;
+ private Data _data;
+ private String _errorType;
+ private String _error;
+
+ static {
+ OBJECT_MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+ }
+
+ @JsonCreator
+ public PinotBrokerTimeSeriesResponse(@JsonProperty("status") String status, @JsonProperty("data") Data data,
+ @JsonProperty("errorType") String errorType, @JsonProperty("error") String error) {
+ _status = status;
+ _data = data;
+ _errorType = errorType;
+ _error = error;
+ }
+
+ public String getStatus() {
+ return _status;
+ }
+
+ public Data getData() {
+ return _data;
+ }
+
+ public String getErrorType() {
+ return _errorType;
+ }
+
+ public String getError() {
+ return _error;
+ }
+
+ public String serialize()
+ throws JsonProcessingException {
+ return OBJECT_MAPPER.writeValueAsString(this);
+ }
+
+ public static PinotBrokerTimeSeriesResponse newSuccessResponse(Data data) {
+ return new PinotBrokerTimeSeriesResponse(SUCCESS_STATUS, data, null, null);
+ }
+
+ public static PinotBrokerTimeSeriesResponse newErrorResponse(String errorType, String errorMessage) {
+ return new PinotBrokerTimeSeriesResponse(ERROR_STATUS, Data.EMPTY, errorType, errorMessage);
+ }
+
+ public static PinotBrokerTimeSeriesResponse fromTimeSeriesBlock(TimeSeriesBlock seriesBlock) {
+ if (seriesBlock.getTimeBuckets() == null) {
+ throw new UnsupportedOperationException("Non-bucketed series block not supported yet");
+ }
+ return convertBucketedSeriesBlock(seriesBlock);
+ }
+
+ private static PinotBrokerTimeSeriesResponse convertBucketedSeriesBlock(TimeSeriesBlock seriesBlock) {
+ Long[] timeValues = Objects.requireNonNull(seriesBlock.getTimeBuckets()).getTimeBuckets();
+ List result = new ArrayList<>();
+ for (var listOfTimeSeries : seriesBlock.getSeriesMap().values()) {
+ Preconditions.checkState(!listOfTimeSeries.isEmpty(), "Received empty time-series");
+ TimeSeries anySeries = listOfTimeSeries.get(0);
+ // TODO: Ideally we should allow "aliasing" a series, and hence we should store something like a series-name.
+ // Though in most contexts that would serve the same purpose as an ID.
+ Map metricMap = new HashMap<>();
+ metricMap.put(METRIC_NAME_KEY, anySeries.getTagsSerialized());
+ metricMap.putAll(anySeries.getTagKeyValuesAsMap());
+ for (TimeSeries timeSeries : listOfTimeSeries) {
+ Object[][] values = new Object[timeValues.length][];
+ for (int i = 0; i < timeValues.length; i++) {
+ Object nullableValue = timeSeries.getValues()[i];
+ values[i] = new Object[]{timeValues[i], nullableValue == null ? null : nullableValue.toString()};
+ }
+ result.add(new PinotBrokerTimeSeriesResponse.Value(metricMap, values));
+ }
+ }
+ PinotBrokerTimeSeriesResponse.Data data = PinotBrokerTimeSeriesResponse.Data.newMatrix(result);
+ return PinotBrokerTimeSeriesResponse.newSuccessResponse(data);
+ }
+
+ public static class Data {
+ public static final Data EMPTY = new Data("", new ArrayList<>());
+ private final String _resultType;
+ private final List _result;
+
+ @JsonCreator
+ public Data(@JsonProperty("resultType") String resultType, @JsonProperty("result") List result) {
+ _resultType = resultType;
+ _result = result;
+ }
+
+ public String getResultType() {
+ return _resultType;
+ }
+
+ public List getResult() {
+ return _result;
+ }
+
+ public static Data newMatrix(List result) {
+ return new Data("matrix", result);
+ }
+ }
+
+ public static class Value {
+ private final Map _metric;
+ private final Object[][] _values;
+
+ @JsonCreator
+ public Value(@JsonProperty("metric") Map metric, @JsonProperty("values") Object[][] values) {
+ _metric = metric;
+ _values = values;
+ }
+
+ public Map getMetric() {
+ return _metric;
+ }
+
+ public Object[][] getValues() {
+ return _values;
+ }
+ }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/HumanReadableDuration.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/HumanReadableDuration.java
new file mode 100644
index 00000000000..a224676b392
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/HumanReadableDuration.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Preconditions;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+public class HumanReadableDuration {
+ private static final Pattern DURATION_PATTERN = Pattern.compile("(\\d+)\\s*(\\S+)");
+ private static final Map SUFFIXES = createSuffixes();
+
+ private HumanReadableDuration() {
+ }
+
+ private static Map createSuffixes() {
+ Map suffixes = new HashMap<>();
+ suffixes.put("ns", TimeUnit.NANOSECONDS);
+ suffixes.put("nanosecond", TimeUnit.NANOSECONDS);
+ suffixes.put("nanoseconds", TimeUnit.NANOSECONDS);
+ suffixes.put("us", TimeUnit.MICROSECONDS);
+ suffixes.put("microsecond", TimeUnit.MICROSECONDS);
+ suffixes.put("microseconds", TimeUnit.MICROSECONDS);
+ suffixes.put("ms", TimeUnit.MILLISECONDS);
+ suffixes.put("millisecond", TimeUnit.MILLISECONDS);
+ suffixes.put("milliseconds", TimeUnit.MILLISECONDS);
+ suffixes.put("s", TimeUnit.SECONDS);
+ suffixes.put("second", TimeUnit.SECONDS);
+ suffixes.put("seconds", TimeUnit.SECONDS);
+ suffixes.put("m", TimeUnit.MINUTES);
+ suffixes.put("minute", TimeUnit.MINUTES);
+ suffixes.put("minutes", TimeUnit.MINUTES);
+ suffixes.put("h", TimeUnit.HOURS);
+ suffixes.put("hour", TimeUnit.HOURS);
+ suffixes.put("hours", TimeUnit.HOURS);
+ suffixes.put("d", TimeUnit.DAYS);
+ suffixes.put("day", TimeUnit.DAYS);
+ suffixes.put("days", TimeUnit.DAYS);
+ return suffixes;
+ }
+
+ public static Duration from(String durationStr) {
+ final Matcher matcher = DURATION_PATTERN.matcher(durationStr);
+ Preconditions.checkArgument(matcher.matches(), "Invalid duration string: %s", durationStr);
+
+ final long count = Long.parseLong(matcher.group(1));
+ final TimeUnit unit = SUFFIXES.get(matcher.group(2));
+ if (unit == null) {
+ throw new IllegalArgumentException(String.format("Unknown time-unit: %s", matcher.group(2)));
+ }
+ return Duration.of(count, unit.toChronoUnit());
+ }
+}
diff --git a/pinot-common/src/main/proto/worker.proto b/pinot-common/src/main/proto/worker.proto
index a643c53760f..8fe3fadab55 100644
--- a/pinot-common/src/main/proto/worker.proto
+++ b/pinot-common/src/main/proto/worker.proto
@@ -25,6 +25,8 @@ service PinotQueryWorker {
// Dispatch a QueryRequest to a PinotQueryWorker
rpc Submit(QueryRequest) returns (QueryResponse);
+ rpc SubmitTimeSeries(TimeSeriesQueryRequest) returns (TimeSeriesResponse);
+
rpc Cancel(CancelRequest) returns (CancelResponse);
}
@@ -49,6 +51,16 @@ message QueryResponse {
bytes payload = 2;
}
+message TimeSeriesQueryRequest {
+ bytes dispatchPlan = 1;
+ map metadata = 2;
+}
+
+message TimeSeriesResponse {
+ bytes payload = 1;
+ map metadata = 2;
+}
+
message StagePlan {
bytes rootNode = 1; // Serialized StageNode
StageMetadata stageMetadata = 2;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index 6db1d6099a1..33d83843f62 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -171,7 +171,7 @@ public void setUp()
// Setup time series builder factory
TimeSeriesBuilderFactoryProvider.registerSeriesBuilderFactory(TIME_SERIES_ENGINE_NAME,
- SimpleTimeSeriesBuilderFactory.INSTANCE);
+ new SimpleTimeSeriesBuilderFactory());
}
@Test
diff --git a/pinot-distribution/pinot-assembly.xml b/pinot-distribution/pinot-assembly.xml
index 0573e11e383..654417f3b3b 100644
--- a/pinot-distribution/pinot-assembly.xml
+++ b/pinot-distribution/pinot-assembly.xml
@@ -227,6 +227,16 @@
+
+
+
+
+ plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/pinot-timeseries-m3ql-${project.version}-shaded.jar
+
+
+
diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/pom.xml b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/pom.xml
new file mode 100644
index 00000000000..b853e9f3a8d
--- /dev/null
+++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/pom.xml
@@ -0,0 +1,72 @@
+
+
+
+ 4.0.0
+
+ org.apache.pinot
+ pinot-timeseries-lang
+ 1.3.0-SNAPSHOT
+
+
+ pinot-timeseries-m3ql
+
+
+ ${basedir}/../../..
+ 11
+ 11
+ UTF-8
+
+
+
+
+ org.apache.pinot
+ pinot-timeseries-spi
+ provided
+
+
+ com.google.guava
+ guava
+
+
+ org.testng
+ testng
+ test
+
+
+
+
+
+ build-shaded-jar
+
+
+ skipShade
+ !true
+
+
+
+ package
+
+
+
+
diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/Aggregations.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/Aggregations.java
new file mode 100644
index 00000000000..392a83d843c
--- /dev/null
+++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/Aggregations.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.m3ql;
+
+public class Aggregations {
+}
diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/Constants.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/Constants.java
new file mode 100644
index 00000000000..3878a2d5dd4
--- /dev/null
+++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/Constants.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.m3ql;
+
+import java.time.Duration;
+
+
+public class Constants {
+ public static final String LANGUAGE = "m3ql";
+ public static final Duration DEFAULT_RESOLUTION = Duration.ofMinutes(1);
+
+ private Constants() {
+ }
+}
diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
new file mode 100644
index 00000000000..f355e464ebc
--- /dev/null
+++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.m3ql;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.pinot.tsdb.m3ql.parser.Tokenizer;
+import org.apache.pinot.tsdb.m3ql.plan.KeepLastValuePlanNode;
+import org.apache.pinot.tsdb.m3ql.plan.TransformNullPlanNode;
+import org.apache.pinot.tsdb.m3ql.time.TimeBucketComputer;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult;
+import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanner;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+
+
+public class M3TimeSeriesPlanner implements TimeSeriesLogicalPlanner {
+ @Override
+ public void init(Map config) {
+ }
+
+ @Override
+ public TimeSeriesLogicalPlanResult plan(RangeTimeSeriesRequest request) {
+ if (!request.getLanguage().equals(Constants.LANGUAGE)) {
+ throw new IllegalArgumentException(String.format("Invalid engine id: %s. Expected: %s", request.getLanguage(),
+ Constants.LANGUAGE));
+ }
+ // Step-1: Parse and create a logical plan tree.
+ BaseTimeSeriesPlanNode planNode = planQuery(request);
+ // Step-2: Compute the time-buckets.
+ TimeBuckets timeBuckets = TimeBucketComputer.compute(planNode, request);
+ return new TimeSeriesLogicalPlanResult(planNode, timeBuckets);
+ }
+
+ public BaseTimeSeriesPlanNode planQuery(RangeTimeSeriesRequest request) {
+ PlanIdGenerator planIdGenerator = new PlanIdGenerator();
+ Tokenizer tokenizer = new Tokenizer(request.getQuery());
+ List> commands = tokenizer.tokenize();
+ Preconditions.checkState(commands.size() > 1, "At least two commands required. "
+ + "Query should start with a fetch followed by an aggregation.");
+ BaseTimeSeriesPlanNode lastNode = null;
+ AggInfo aggInfo = null;
+ List groupByColumns = new ArrayList<>();
+ BaseTimeSeriesPlanNode rootNode = null;
+ for (int commandId = commands.size() - 1; commandId >= 0; commandId--) {
+ String command = commands.get(commandId).get(0);
+ Preconditions.checkState((command.equals("fetch") && commandId == 0)
+ || (!command.equals("fetch") && commandId > 0),
+ "fetch should be the first command");
+ List children = new ArrayList<>();
+ BaseTimeSeriesPlanNode currentNode = null;
+ switch (command) {
+ case "fetch":
+ List tokens = commands.get(commandId).subList(1, commands.get(commandId).size());
+ currentNode = handleFetchNode(planIdGenerator.generateId(), tokens, children, aggInfo, groupByColumns);
+ break;
+ case "sum":
+ case "min":
+ case "max":
+ Preconditions.checkState(commandId == 1,
+ "Aggregation should be the second command (fetch should be first)");
+ Preconditions.checkState(aggInfo == null, "Aggregation already set. Only single agg allowed.");
+ aggInfo = new AggInfo(command.toUpperCase(Locale.ENGLISH));
+ if (commands.get(commandId).size() > 1) {
+ String[] cols = commands.get(commandId).get(1).split(",");
+ groupByColumns = Stream.of(cols).map(String::trim).collect(Collectors.toList());
+ }
+ break;
+ case "keepLastValue":
+ currentNode = new KeepLastValuePlanNode(planIdGenerator.generateId(), children);
+ break;
+ case "transformNull":
+ Double defaultValue = TransformNullPlanNode.DEFAULT_VALUE;
+ if (commands.get(commandId).size() > 1) {
+ defaultValue = Double.parseDouble(commands.get(commandId).get(1));
+ }
+ currentNode = new TransformNullPlanNode(planIdGenerator.generateId(), defaultValue, children);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown function: " + command);
+ }
+ if (currentNode != null) {
+ if (rootNode == null) {
+ rootNode = currentNode;
+ }
+ if (lastNode != null) {
+ lastNode.addChildNode(currentNode);
+ }
+ lastNode = currentNode;
+ }
+ }
+ return rootNode;
+ }
+
+ public BaseTimeSeriesPlanNode handleFetchNode(String planId, List tokens,
+ List children, AggInfo aggInfo, List groupByColumns) {
+ Preconditions.checkState(tokens.size() % 2 == 0, "Mismatched args");
+ String tableName = null;
+ String timeColumn = null;
+ TimeUnit timeUnit = null;
+ String filter = "";
+ String valueExpr = null;
+ for (int idx = 0; idx < tokens.size(); idx += 2) {
+ String key = tokens.get(idx);
+ String value = tokens.get(idx + 1);
+ switch (key) {
+ case "table":
+ tableName = value.replaceAll("\"", "");
+ break;
+ case "ts_column":
+ timeColumn = value.replaceAll("\"", "");
+ break;
+ case "ts_unit":
+ timeUnit = TimeUnit.valueOf(value.replaceAll("\"", "").toUpperCase(Locale.ENGLISH));
+ break;
+ case "filter":
+ filter = value.replaceAll("\"", "");
+ break;
+ case "value":
+ valueExpr = value.replaceAll("\"", "");
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown key: " + key);
+ }
+ }
+ Preconditions.checkNotNull(tableName, "Table name not set. Set via table=");
+ Preconditions.checkNotNull(timeColumn, "Time column not set. Set via time_col=");
+ Preconditions.checkNotNull(timeUnit, "Time unit not set. Set via time_unit=");
+ Preconditions.checkNotNull(valueExpr, "Value expression not set. Set via value=");
+ return new ScanFilterAndProjectPlanNode(planId, children, tableName, timeColumn, timeUnit, 0L, filter, valueExpr,
+ aggInfo, groupByColumns);
+ }
+}
diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/PlanIdGenerator.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/PlanIdGenerator.java
new file mode 100644
index 00000000000..1a719081deb
--- /dev/null
+++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/PlanIdGenerator.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.m3ql;
+
+public class PlanIdGenerator {
+ private Integer _id = 0;
+
+ public PlanIdGenerator() {
+ }
+
+ public String generateId() {
+ return "plan_" + (_id++);
+ }
+}
diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/KeepLastValueOperator.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/KeepLastValueOperator.java
new file mode 100644
index 00000000000..0330dff13b1
--- /dev/null
+++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/KeepLastValueOperator.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.m3ql.operator;
+
+import java.util.List;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+
+
+public class KeepLastValueOperator extends BaseTimeSeriesOperator {
+ public KeepLastValueOperator(List childOperators) {
+ super(childOperators);
+ }
+
+ @Override
+ public TimeSeriesBlock getNextBlock() {
+ TimeSeriesBlock seriesBlock = _childOperators.get(0).nextBlock();
+ seriesBlock.getSeriesMap().values().parallelStream().forEach(unionOfSeries -> {
+ for (TimeSeries series : unionOfSeries) {
+ Double[] values = series.getValues();
+ Double lastValue = null;
+ for (int index = 0; index < values.length; index++) {
+ if (values[index] != null) {
+ lastValue = values[index];
+ } else {
+ values[index] = lastValue;
+ }
+ }
+ }
+ });
+ return seriesBlock;
+ }
+
+ @Override
+ public String getExplainName() {
+ return "KEEP_LAST_VALUE";
+ }
+}
diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/TransformNullOperator.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/TransformNullOperator.java
new file mode 100644
index 00000000000..ca971c932cb
--- /dev/null
+++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/TransformNullOperator.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.m3ql.operator;
+
+import java.util.List;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+
+
+public class TransformNullOperator extends BaseTimeSeriesOperator {
+ private final Double _defaultValue;
+
+ public TransformNullOperator(Double defaultValue, List childOperators) {
+ super(childOperators);
+ _defaultValue = defaultValue;
+ }
+
+ @Override
+ public TimeSeriesBlock getNextBlock() {
+ TimeSeriesBlock seriesBlock = _childOperators.get(0).nextBlock();
+ seriesBlock.getSeriesMap().values().parallelStream().forEach(unionOfSeries -> {
+ for (TimeSeries series : unionOfSeries) {
+ Double[] values = series.getValues();
+ for (int index = 0; index < values.length; index++) {
+ values[index] = values[index] == null ? _defaultValue : values[index];
+ }
+ }
+ });
+ return seriesBlock;
+ }
+
+ @Override
+ public String getExplainName() {
+ return "TRANSFORM_NULL";
+ }
+}
diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/parser/Tokenizer.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/parser/Tokenizer.java
new file mode 100644
index 00000000000..cf38b501b00
--- /dev/null
+++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/parser/Tokenizer.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.m3ql.parser;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+* TODO: Dummy implementation. Will be switched out with a proper implementation soon.
+*/
+public class Tokenizer {
+ private final String _query;
+
+ public Tokenizer(String query) {
+ _query = query;
+ }
+
+ public List> tokenize() {
+ String[] pipelines = _query.split("\\|");
+ List> result = new ArrayList<>();
+ for (String pipeline : pipelines) {
+ String command = pipeline.trim().substring(0, pipeline.indexOf("{"));
+ if (command.equals("fetch")) {
+ result.add(consumeFetch(pipeline.trim()));
+ } else {
+ result.add(consumeGeneric(pipeline.trim()));
+ }
+ }
+ return result;
+ }
+
+ private List consumeFetch(String pipeline) {
+ pipeline = pipeline.trim();
+ String command = pipeline.substring(0, 5);
+ Preconditions.checkState(command.equals("fetch"), "Invalid command: %s", command);
+ pipeline = pipeline.substring(5).trim();
+ int start = pipeline.indexOf("{");
+ int end = pipeline.indexOf("}");
+ String args = pipeline.substring(start + 1, end);
+ List result = new ArrayList<>();
+ result.add("fetch");
+ int indexOfEquals = args.indexOf("=");
+ while (indexOfEquals != -1) {
+ args = args.strip();
+ int equalIndex = args.indexOf("=");
+ int indexOfQuotes = args.indexOf("\"");
+ int lastQuote = indexOfQuotes + 1 + args.substring(indexOfQuotes + 1).indexOf("\"");
+ String key = args.substring(0, equalIndex);
+ String value = args.substring(indexOfQuotes + 1, lastQuote);
+ args = args.substring(lastQuote + 1);
+ result.add(key);
+ result.add(value);
+ if (args.strip().startsWith(",")) {
+ args = args.strip().substring(1);
+ }
+ indexOfEquals = args.indexOf("=");
+ }
+ return result;
+ }
+
+ private List consumeGeneric(String pipeline) {
+ List result = new ArrayList<>();
+ int indexOfOpenBracket = pipeline.indexOf("{");
+ int indexOfClosedBracket = pipeline.indexOf("}");
+ result.add(pipeline.substring(0, indexOfOpenBracket));
+ result.add(pipeline.substring(indexOfOpenBracket + 1, indexOfClosedBracket));
+ return result;
+ }
+}
diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/plan/KeepLastValuePlanNode.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/plan/KeepLastValuePlanNode.java
new file mode 100644
index 00000000000..26359269cd0
--- /dev/null
+++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/plan/KeepLastValuePlanNode.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.m3ql.plan;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+import org.apache.pinot.tsdb.m3ql.operator.KeepLastValueOperator;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+
+
+public class KeepLastValuePlanNode extends BaseTimeSeriesPlanNode {
+ @JsonCreator
+ public KeepLastValuePlanNode(@JsonProperty("id") String id,
+ @JsonProperty("children") List children) {
+ super(id, children);
+ }
+
+ @Override
+ public String getKlass() {
+ return KeepLastValuePlanNode.class.getName();
+ }
+
+ @Override
+ public String getExplainName() {
+ return "KEEP_LAST_VALUE";
+ }
+
+ @Override
+ public BaseTimeSeriesOperator run() {
+ BaseTimeSeriesOperator childOperator = _children.get(0).run();
+ return new KeepLastValueOperator(List.of(childOperator));
+ }
+}
diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/plan/TransformNullPlanNode.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/plan/TransformNullPlanNode.java
new file mode 100644
index 00000000000..c98e4f421e6
--- /dev/null
+++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/plan/TransformNullPlanNode.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.m3ql.plan;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.pinot.tsdb.m3ql.operator.TransformNullOperator;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+
+
+public class TransformNullPlanNode extends BaseTimeSeriesPlanNode {
+ public static final Double DEFAULT_VALUE = 0.0;
+ private final Double _defaultValue;
+
+ @JsonCreator
+ public TransformNullPlanNode(@JsonProperty("id") String id, @JsonProperty("defaultValue") Double defaultValue,
+ @JsonProperty("children") List children) {
+ super(id, children);
+ _defaultValue = defaultValue;
+ }
+
+ public Double getDefaultValue() {
+ return _defaultValue;
+ }
+
+ @Override
+ public String getKlass() {
+ return TransformNullPlanNode.class.getName();
+ }
+
+ @Override
+ public String getExplainName() {
+ return "TRANSFORM_NULL";
+ }
+
+ @Override
+ public BaseTimeSeriesOperator run() {
+ Preconditions.checkState(_children.size() == 1,
+ "TransformNullPlanNode should have only 1 child, got: %s", _children.size());
+ BaseTimeSeriesOperator childOperator = _children.get(0).run();
+ return new TransformNullOperator(_defaultValue, ImmutableList.of(childOperator));
+ }
+}
diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/QueryTimeBoundaryConstraints.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/QueryTimeBoundaryConstraints.java
new file mode 100644
index 00000000000..cfb5a8d8df7
--- /dev/null
+++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/QueryTimeBoundaryConstraints.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.m3ql.time;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+public class QueryTimeBoundaryConstraints {
+ private final Set _divisors = new HashSet<>();
+ private long _leftExtensionSeconds = 0;
+ private long _rightExtensionSeconds = 0;
+ private boolean _leftAligned = false;
+
+ public Set getDivisors() {
+ return _divisors;
+ }
+
+ public long getLeftExtensionSeconds() {
+ return _leftExtensionSeconds;
+ }
+
+ public void setLeftExtensionSeconds(long leftExtensionSeconds) {
+ _leftExtensionSeconds = leftExtensionSeconds;
+ }
+
+ public long getRightExtensionSeconds() {
+ return _rightExtensionSeconds;
+ }
+
+ public void setRightExtensionSeconds(long rightExtensionSeconds) {
+ _rightExtensionSeconds = rightExtensionSeconds;
+ }
+
+ public boolean isLeftAligned() {
+ return _leftAligned;
+ }
+
+ public void setLeftAligned(boolean leftAligned) {
+ _leftAligned = leftAligned;
+ }
+
+ public static QueryTimeBoundaryConstraints merge(QueryTimeBoundaryConstraints left,
+ QueryTimeBoundaryConstraints right) {
+ QueryTimeBoundaryConstraints merged = new QueryTimeBoundaryConstraints();
+ merged._divisors.addAll(left._divisors);
+ merged._divisors.addAll(right._divisors);
+ merged._leftExtensionSeconds = Math.max(left._leftExtensionSeconds, right._leftExtensionSeconds);
+ merged._rightExtensionSeconds = Math.max(left._rightExtensionSeconds, right._rightExtensionSeconds);
+ if (left._leftAligned != right._leftAligned) {
+ throw new IllegalArgumentException(String.format("Cannot merge constraints with different alignments. "
+ + "Alignment from plan node on the left and right are %s and %s respectively",
+ left._leftAligned, right._leftAligned));
+ }
+ merged._leftAligned = left._leftAligned;
+ return merged;
+ }
+}
diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/TimeBucketComputer.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/TimeBucketComputer.java
new file mode 100644
index 00000000000..d79d5c6a8c3
--- /dev/null
+++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/TimeBucketComputer.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.m3ql.time;
+
+import java.time.Duration;
+import java.util.Collection;
+import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+
+
+public class TimeBucketComputer {
+ private TimeBucketComputer() {
+ }
+
+ public static TimeBuckets compute(BaseTimeSeriesPlanNode planNode, RangeTimeSeriesRequest request) {
+ QueryTimeBoundaryConstraints constraints = process(planNode, request);
+ long newStartTime = request.getStartSeconds() - constraints.getLeftExtensionSeconds();
+ long newEndTime = request.getEndSeconds() + constraints.getRightExtensionSeconds();
+ long lcmOfDivisors = lcm(constraints.getDivisors());
+ long roundStartTimeDelta = (lcmOfDivisors - (newEndTime - newStartTime) % lcmOfDivisors) % lcmOfDivisors;
+ newStartTime -= roundStartTimeDelta;
+ int numItems = (int) ((newEndTime - newStartTime) / request.getStepSeconds());
+ return TimeBuckets.ofSeconds(newStartTime, Duration.ofSeconds(request.getStepSeconds()), numItems);
+ }
+
+ public static QueryTimeBoundaryConstraints process(BaseTimeSeriesPlanNode planNode, RangeTimeSeriesRequest request) {
+ if (planNode instanceof ScanFilterAndProjectPlanNode) {
+ QueryTimeBoundaryConstraints constraints = new QueryTimeBoundaryConstraints();
+ constraints.getDivisors().add(request.getStepSeconds());
+ return constraints;
+ }
+ QueryTimeBoundaryConstraints constraints = new QueryTimeBoundaryConstraints();
+ for (BaseTimeSeriesPlanNode childNode : planNode.getChildren()) {
+ QueryTimeBoundaryConstraints childConstraints = process(childNode, request);
+ constraints = QueryTimeBoundaryConstraints.merge(constraints, childConstraints);
+ }
+ return constraints;
+ }
+
+ public static long lcm(Collection values) {
+ long result = 1;
+ for (long value : values) {
+ result = lcm(result, value);
+ }
+ return result;
+ }
+
+ public static long lcm(long a, long b) {
+ return a * b / gcd(a, b);
+ }
+
+ public static long gcd(long a, long b) {
+ if (a < b) {
+ return gcd(b, a);
+ }
+ if (b == 0) {
+ return a;
+ }
+ return gcd(b, a % b);
+ }
+}
diff --git a/pinot-plugins/pinot-timeseries-lang/pom.xml b/pinot-plugins/pinot-timeseries-lang/pom.xml
new file mode 100644
index 00000000000..98dc39789f8
--- /dev/null
+++ b/pinot-plugins/pinot-timeseries-lang/pom.xml
@@ -0,0 +1,45 @@
+
+
+
+ 4.0.0
+
+ org.apache.pinot
+ pinot-plugins
+ 1.3.0-SNAPSHOT
+
+
+ pinot-timeseries-lang
+ pom
+ Pluggable Pinot Time Series Languages
+
+
+ ${basedir}/../..
+ pinot-ts-languages
+
+
+
+ pinot-timeseries-m3ql
+
+
+
\ No newline at end of file
diff --git a/pinot-plugins/pom.xml b/pinot-plugins/pom.xml
index 62b6848299c..fa873b34a69 100644
--- a/pinot-plugins/pom.xml
+++ b/pinot-plugins/pom.xml
@@ -25,7 +25,6 @@
pinotorg.apache.pinot1.3.0-SNAPSHOT
- ..pinot-pluginspom
@@ -48,6 +47,7 @@
pinot-segment-uploaderpinot-environmentassembly-descriptor
+ pinot-timeseries-lang
diff --git a/pinot-query-runtime/pom.xml b/pinot-query-runtime/pom.xml
index 303244b0b0c..14c2f0e085c 100644
--- a/pinot-query-runtime/pom.xml
+++ b/pinot-query-runtime/pom.xml
@@ -40,6 +40,14 @@
org.apache.pinotpinot-query-planner
+
+ org.apache.pinot
+ pinot-timeseries-spi
+
+
+ org.apache.pinot
+ pinot-timeseries-planner
+ org.apache.pinot
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 817062e9508..49a45d1c2b8 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -19,15 +19,25 @@
package org.apache.pinot.query.runtime;
import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.datatable.StatMap;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.query.executor.QueryExecutor;
@@ -49,12 +59,23 @@
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils;
+import org.apache.pinot.query.runtime.timeseries.PhysicalTimeSeriesPlanVisitor;
+import org.apache.pinot.query.runtime.timeseries.TimeSeriesExecutionContext;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.executor.ExecutorServiceUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode;
+import org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerRequestMetadataKeys;
+import org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerResponseMetadataKeys;
+import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.serde.TimeSeriesPlanSerde;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactoryProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -128,6 +149,10 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataMana
} catch (Exception e) {
throw new RuntimeException(e);
}
+ if (StringUtils.isNotBlank(config.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey()))) {
+ PhysicalTimeSeriesPlanVisitor.INSTANCE.init(_leafQueryExecutor, _executorService, serverMetrics);
+ TimeSeriesBuilderFactoryProvider.init(config);
+ }
LOGGER.info("Initialized QueryRunner with hostname: {}, port: {}", hostname, port);
}
@@ -210,6 +235,69 @@ public void processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, Map
_opChainScheduler.register(opChain);
}
+ /**
+ * Receives a serialized plan sent by the broker, and runs it to completion, blocking the thread until the execution
+ * is complete.
+ * TODO: This design is at odds with MSE because MSE runs even the leaf stage via OpChainSchedulerService.
+ * However, both OpChain scheduler and this method use the same ExecutorService.
+ */
+ public void processTimeSeriesQuery(String serializedPlan, Map metadata,
+ StreamObserver responseObserver) {
+ // Define a common way to handle errors.
+ final Consumer handleErrors = (t) -> {
+ Map errorMetadata = new HashMap<>();
+ errorMetadata.put(WorkerResponseMetadataKeys.ERROR_TYPE, t.getClass().getSimpleName());
+ errorMetadata.put(WorkerResponseMetadataKeys.ERROR_MESSAGE, t.getMessage());
+ responseObserver.onNext(Worker.TimeSeriesResponse.newBuilder().putAllMetadata(errorMetadata).build());
+ responseObserver.onCompleted();
+ };
+ try {
+ // Deserialize plan, and compile to create a tree of operators.
+ BaseTimeSeriesPlanNode rootNode = TimeSeriesPlanSerde.deserialize(serializedPlan);
+ TimeSeriesExecutionContext context = new TimeSeriesExecutionContext(
+ metadata.get(WorkerRequestMetadataKeys.LANGUAGE), extractTimeBuckets(metadata),
+ extractPlanToSegmentMap(metadata));
+ BaseTimeSeriesOperator operator = PhysicalTimeSeriesPlanVisitor.INSTANCE.compile(rootNode, context);
+ // Run the operator using the same executor service as OpChainSchedulerService
+ _executorService.submit(() -> {
+ try {
+ TimeSeriesBlock seriesBlock = operator.nextBlock();
+ Worker.TimeSeriesResponse response = Worker.TimeSeriesResponse.newBuilder()
+ .setPayload(ByteString.copyFrom(
+ PinotBrokerTimeSeriesResponse.fromTimeSeriesBlock(seriesBlock).serialize(),
+ StandardCharsets.UTF_8))
+ .build();
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
+ } catch (Throwable t) {
+ handleErrors.accept(t);
+ }
+ });
+ } catch (Throwable t) {
+ handleErrors.accept(t);
+ }
+ }
+
+ public TimeBuckets extractTimeBuckets(Map metadataMap) {
+ long startTimeSeconds = Long.parseLong(metadataMap.get(WorkerRequestMetadataKeys.START_TIME_SECONDS));
+ long windowSeconds = Long.parseLong(metadataMap.get(WorkerRequestMetadataKeys.WINDOW_SECONDS));
+ int numElements = Integer.parseInt(metadataMap.get(WorkerRequestMetadataKeys.NUM_ELEMENTS));
+ return TimeBuckets.ofSeconds(startTimeSeconds, Duration.ofSeconds(windowSeconds), numElements);
+ }
+
+ public Map> extractPlanToSegmentMap(Map metadataMap) {
+ Map> result = new HashMap<>();
+ for (var entry : metadataMap.entrySet()) {
+ if (WorkerRequestMetadataKeys.isKeySegmentList(entry.getKey())) {
+ String planId = WorkerRequestMetadataKeys.decodeSegmentListKey(entry.getKey());
+ String[] segments = entry.getValue().split(",");
+ result.put(planId,
+ Stream.of(segments).map(String::strip).collect(Collectors.toList()));
+ }
+ }
+ return result;
+ }
+
private Map consolidateMetadata(Map customProperties,
Map requestMetadata) {
Map opChainMetadata = new HashMap<>();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
new file mode 100644
index 00000000000..74da11c5b87
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.timeseries;
+
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.collections.MapUtils;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
+import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+
+
+public class LeafTimeSeriesOperator extends BaseTimeSeriesOperator {
+ private final ServerQueryRequest _request;
+ private final QueryExecutor _queryExecutor;
+ private final ExecutorService _executorService;
+
+ public LeafTimeSeriesOperator(ServerQueryRequest serverQueryRequest, QueryExecutor queryExecutor,
+ ExecutorService executorService) {
+ super(Collections.emptyList());
+ _request = serverQueryRequest;
+ _queryExecutor = queryExecutor;
+ _executorService = executorService;
+ }
+
+ @Override
+ public TimeSeriesBlock getNextBlock() {
+ Preconditions.checkNotNull(_queryExecutor, "Leaf time series operator has not been initialized");
+ InstanceResponseBlock instanceResponseBlock = _queryExecutor.execute(_request, _executorService);
+ assert instanceResponseBlock.getResultsBlock() instanceof TimeSeriesResultsBlock;
+ if (MapUtils.isNotEmpty(instanceResponseBlock.getExceptions())) {
+ // TODO: Return error in the TimeSeriesBlock instead?
+ String oneException = instanceResponseBlock.getExceptions().values().iterator().next();
+ throw new RuntimeException(oneException);
+ }
+ return ((TimeSeriesResultsBlock) instanceResponseBlock.getResultsBlock()).getTimeSeriesBlock();
+ }
+
+ @Override
+ public String getExplainName() {
+ return "TIME_SERIES_LEAF_STAGE_OPERATOR";
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java
new file mode 100644
index 00000000000..258e41c51eb
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.timeseries;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.common.request.context.TimeSeriesContext;
+import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+
+
+public class PhysicalTimeSeriesPlanVisitor {
+ public static final PhysicalTimeSeriesPlanVisitor INSTANCE = new PhysicalTimeSeriesPlanVisitor();
+
+ private QueryExecutor _queryExecutor;
+ private ExecutorService _executorService;
+ private ServerMetrics _serverMetrics;
+
+ private PhysicalTimeSeriesPlanVisitor() {
+ }
+
+ public void init(QueryExecutor queryExecutor, ExecutorService executorService, ServerMetrics serverMetrics) {
+ _queryExecutor = queryExecutor;
+ _executorService = executorService;
+ _serverMetrics = serverMetrics;
+ }
+
+ public BaseTimeSeriesOperator compile(BaseTimeSeriesPlanNode rootNode, TimeSeriesExecutionContext context) {
+ // Step-1: Replace scan filter project with our physical plan node with Pinot Core and Runtime context
+ initLeafPlanNode(rootNode, context);
+ // Step-2: Trigger recursive operator generation
+ return rootNode.run();
+ }
+
+ public void initLeafPlanNode(BaseTimeSeriesPlanNode planNode, TimeSeriesExecutionContext context) {
+ for (int index = 0; index < planNode.getChildren().size(); index++) {
+ BaseTimeSeriesPlanNode childNode = planNode.getChildren().get(index);
+ if (childNode instanceof ScanFilterAndProjectPlanNode) {
+ ScanFilterAndProjectPlanNode sfpNode = (ScanFilterAndProjectPlanNode) childNode;
+ List segments = context.getPlanIdToSegmentsMap().get(sfpNode.getId());
+ ServerQueryRequest serverQueryRequest = compileLeafServerQueryRequest(sfpNode, segments, context);
+ TimeSeriesPhysicalTableScan physicalTableScan = new TimeSeriesPhysicalTableScan(childNode.getId(),
+ serverQueryRequest, _queryExecutor, _executorService);
+ planNode.getChildren().set(index, physicalTableScan);
+ } else {
+ initLeafPlanNode(childNode, context);
+ }
+ }
+ }
+
+ public ServerQueryRequest compileLeafServerQueryRequest(ScanFilterAndProjectPlanNode sfpNode, List segments,
+ TimeSeriesExecutionContext context) {
+ return new ServerQueryRequest(compileQueryContext(sfpNode, context),
+ segments, /* TODO: Pass metadata from request */ Collections.emptyMap(), _serverMetrics);
+ }
+
+ public QueryContext compileQueryContext(ScanFilterAndProjectPlanNode sfpNode, TimeSeriesExecutionContext context) {
+ FilterContext filterContext =
+ RequestContextUtils.getFilter(CalciteSqlParser.compileToExpression(
+ sfpNode.getEffectiveFilter(context.getInitialTimeBuckets())));
+ List groupByExpressions = sfpNode.getGroupByColumns().stream()
+ .map(ExpressionContext::forIdentifier).collect(Collectors.toList());
+ ExpressionContext valueExpression = RequestContextUtils.getExpression(sfpNode.getValueExpression());
+ TimeSeriesContext timeSeriesContext = new TimeSeriesContext(context.getLanguage(),
+ sfpNode.getTimeColumn(),
+ sfpNode.getTimeUnit(), context.getInitialTimeBuckets(), sfpNode.getOffset(),
+ valueExpression,
+ sfpNode.getAggInfo());
+ return new QueryContext.Builder()
+ .setTableName(sfpNode.getTableName())
+ .setFilter(filterContext)
+ .setGroupByExpressions(groupByExpressions)
+ .setSelectExpressions(Collections.emptyList())
+ .setQueryOptions(Collections.emptyMap())
+ .setAliasList(Collections.emptyList())
+ .setTimeSeriesContext(timeSeriesContext)
+ .setLimit(Integer.MAX_VALUE)
+ .build();
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExecutionContext.java
new file mode 100644
index 00000000000..4d093af3edf
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExecutionContext.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.timeseries;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+
+
+public class TimeSeriesExecutionContext {
+ private final String _language;
+ private final TimeBuckets _initialTimeBuckets;
+ private final Map> _planIdToSegmentsMap;
+
+ public TimeSeriesExecutionContext(String language, TimeBuckets initialTimeBuckets,
+ Map> planIdToSegmentsMap) {
+ _language = language;
+ _initialTimeBuckets = initialTimeBuckets;
+ _planIdToSegmentsMap = planIdToSegmentsMap;
+ }
+
+ public String getLanguage() {
+ return _language;
+ }
+
+ public TimeBuckets getInitialTimeBuckets() {
+ return _initialTimeBuckets;
+ }
+
+ public Map> getPlanIdToSegmentsMap() {
+ return _planIdToSegmentsMap;
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesPhysicalTableScan.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesPhysicalTableScan.java
new file mode 100644
index 00000000000..272e5564983
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesPhysicalTableScan.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.timeseries;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+
+
+public class TimeSeriesPhysicalTableScan extends BaseTimeSeriesPlanNode {
+ private final ServerQueryRequest _request;
+ private final QueryExecutor _queryExecutor;
+ private final ExecutorService _executorService;
+
+ public TimeSeriesPhysicalTableScan(
+ String id,
+ ServerQueryRequest serverQueryRequest,
+ QueryExecutor queryExecutor,
+ ExecutorService executorService) {
+ super(id, Collections.emptyList());
+ _request = serverQueryRequest;
+ _queryExecutor = queryExecutor;
+ _executorService = executorService;
+ }
+
+ public ServerQueryRequest getServerQueryRequest() {
+ return _request;
+ }
+
+ public QueryExecutor getQueryExecutor() {
+ return _queryExecutor;
+ }
+
+ public ExecutorService getExecutorService() {
+ return _executorService;
+ }
+
+ public String getKlass() {
+ return TimeSeriesPhysicalTableScan.class.getName();
+ }
+
+ @Override
+ public String getExplainName() {
+ return "PHYSICAL_TABLE_SCAN";
+ }
+
+ @Override
+ public BaseTimeSeriesOperator run() {
+ return new LeafTimeSeriesOperator(_request, _queryExecutor, _executorService);
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index fc8874911ec..e619087ae0d 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -18,10 +18,12 @@
*/
package org.apache.pinot.query.service.dispatch;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import io.grpc.Deadline;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -36,9 +38,11 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
import org.apache.calcite.runtime.PairList;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
@@ -60,10 +64,16 @@
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.service.dispatch.timeseries.AsyncQueryTimeSeriesDispatchResponse;
+import org.apache.pinot.query.service.dispatch.timeseries.TimeSeriesDispatchClient;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerRequestMetadataKeys;
+import org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerResponseMetadataKeys;
+import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan;
+import org.apache.pinot.tsdb.planner.physical.TimeSeriesQueryServerInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,10 +84,12 @@
public class QueryDispatcher {
private static final Logger LOGGER = LoggerFactory.getLogger(QueryDispatcher.class);
private static final String PINOT_BROKER_QUERY_DISPATCHER_FORMAT = "multistage-query-dispatch-%d";
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final MailboxService _mailboxService;
private final ExecutorService _executorService;
private final Map _dispatchClientMap = new ConcurrentHashMap<>();
+ private final Map _timeSeriesDispatchClientMap = new ConcurrentHashMap<>();
public QueryDispatcher(MailboxService mailboxService) {
_mailboxService = mailboxService;
@@ -103,6 +115,41 @@ public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan d
}
}
+ public PinotBrokerTimeSeriesResponse submitAndGet(RequestContext context, TimeSeriesDispatchablePlan plan,
+ long timeoutMs, Map queryOptions) {
+ long requestId = context.getRequestId();
+ BlockingQueue receiver = new ArrayBlockingQueue<>(10);
+ try {
+ submit(requestId, plan, timeoutMs, queryOptions, receiver::offer);
+ AsyncQueryTimeSeriesDispatchResponse received = receiver.poll(timeoutMs, TimeUnit.MILLISECONDS);
+ if (received == null) {
+ return PinotBrokerTimeSeriesResponse.newErrorResponse(
+ "TimeoutException", "Timed out waiting for response");
+ }
+ if (received.getThrowable() != null) {
+ Throwable t = received.getThrowable();
+ return PinotBrokerTimeSeriesResponse.newErrorResponse(t.getClass().getSimpleName(), t.getMessage());
+ }
+ if (received.getQueryResponse() == null) {
+ return PinotBrokerTimeSeriesResponse.newErrorResponse("NullResponse", "Received null response from server");
+ }
+ if (received.getQueryResponse().containsMetadata(
+ WorkerResponseMetadataKeys.ERROR_MESSAGE)) {
+ return PinotBrokerTimeSeriesResponse.newErrorResponse(
+ received.getQueryResponse().getMetadataOrDefault(
+ WorkerResponseMetadataKeys.ERROR_TYPE, "unknown error-type"),
+ received.getQueryResponse().getMetadataOrDefault(
+ WorkerResponseMetadataKeys.ERROR_MESSAGE, "unknown error"));
+ }
+ Worker.TimeSeriesResponse timeSeriesResponse = received.getQueryResponse();
+ Preconditions.checkNotNull(timeSeriesResponse, "time series response is null");
+ return OBJECT_MAPPER.readValue(
+ timeSeriesResponse.getPayload().toStringUtf8(), PinotBrokerTimeSeriesResponse.class);
+ } catch (Throwable t) {
+ return PinotBrokerTimeSeriesResponse.newErrorResponse(t.getClass().getSimpleName(), t.getMessage());
+ }
+ }
+
@VisibleForTesting
void submit(long requestId, DispatchableSubPlan dispatchableSubPlan, long timeoutMs, Map queryOptions)
throws Exception {
@@ -211,6 +258,37 @@ void submit(long requestId, DispatchableSubPlan dispatchableSubPlan, long timeou
}
}
+ void submit(long requestId, TimeSeriesDispatchablePlan plan, long timeoutMs, Map queryOptions,
+ Consumer receiver)
+ throws Exception {
+ Deadline deadline = Deadline.after(timeoutMs, TimeUnit.MILLISECONDS);
+ String serializedPlan = plan.getSerializedPlan();
+ Worker.TimeSeriesQueryRequest request = Worker.TimeSeriesQueryRequest.newBuilder()
+ .setDispatchPlan(ByteString.copyFrom(serializedPlan, StandardCharsets.UTF_8))
+ .putAllMetadata(initializeTimeSeriesMetadataMap(plan))
+ .putMetadata(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, Long.toString(requestId))
+ .build();
+ getOrCreateTimeSeriesDispatchClient(plan.getQueryServerInstance()).submit(request,
+ new QueryServerInstance(plan.getQueryServerInstance().getHostname(),
+ plan.getQueryServerInstance().getQueryServicePort(), plan.getQueryServerInstance().getQueryMailboxPort()),
+ deadline, receiver::accept);
+ };
+
+ Map initializeTimeSeriesMetadataMap(TimeSeriesDispatchablePlan dispatchablePlan) {
+ Map result = new HashMap<>();
+ result.put(WorkerRequestMetadataKeys.LANGUAGE, dispatchablePlan.getLanguage());
+ result.put(WorkerRequestMetadataKeys.START_TIME_SECONDS,
+ Long.toString(dispatchablePlan.getTimeBuckets().getStartTime()));
+ result.put(WorkerRequestMetadataKeys.WINDOW_SECONDS,
+ Long.toString(dispatchablePlan.getTimeBuckets().getBucketSize().getSeconds()));
+ result.put(WorkerRequestMetadataKeys.NUM_ELEMENTS,
+ Long.toString(dispatchablePlan.getTimeBuckets().getTimeBuckets().length));
+ for (Map.Entry> entry : dispatchablePlan.getPlanIdToSegments().entrySet()) {
+ result.put(WorkerRequestMetadataKeys.encodeSegmentListKey(entry.getKey()), String.join(",", entry.getValue()));
+ }
+ return result;
+ }
+
private static class StageInfo {
final ByteString _rootNode;
final ByteString _customProperty;
@@ -245,6 +323,14 @@ private DispatchClient getOrCreateDispatchClient(QueryServerInstance queryServer
return _dispatchClientMap.computeIfAbsent(key, k -> new DispatchClient(hostname, port));
}
+ private TimeSeriesDispatchClient getOrCreateTimeSeriesDispatchClient(
+ TimeSeriesQueryServerInstance queryServerInstance) {
+ String hostname = queryServerInstance.getHostname();
+ int port = queryServerInstance.getQueryServicePort();
+ String key = String.format("%s_%d", hostname, port);
+ return _timeSeriesDispatchClientMap.computeIfAbsent(key, k -> new TimeSeriesDispatchClient(hostname, port));
+ }
+
@VisibleForTesting
public static QueryResult runReducer(long requestId, DispatchableSubPlan dispatchableSubPlan, long timeoutMs,
Map queryOptions, MailboxService mailboxService) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/AsyncQueryTimeSeriesDispatchResponse.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/AsyncQueryTimeSeriesDispatchResponse.java
new file mode 100644
index 00000000000..b00919d3bf3
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/AsyncQueryTimeSeriesDispatchResponse.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.service.dispatch.timeseries;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.query.routing.QueryServerInstance;
+
+
+/**
+ * Response of the broker dispatch to the server.
+ * TODO: This shouldn't exist and we should re-use AsyncQueryDispatchResponse. TBD as part of multi-stage
+ * engine integration.
+ */
+public class AsyncQueryTimeSeriesDispatchResponse {
+ private final QueryServerInstance _serverInstance;
+ private final Worker.TimeSeriesResponse _queryResponse;
+ private final Throwable _throwable;
+
+ public AsyncQueryTimeSeriesDispatchResponse(QueryServerInstance serverInstance,
+ @Nullable Worker.TimeSeriesResponse queryResponse,
+ @Nullable Throwable throwable) {
+ _serverInstance = serverInstance;
+ _queryResponse = queryResponse;
+ _throwable = throwable;
+ }
+
+ public QueryServerInstance getServerInstance() {
+ return _serverInstance;
+ }
+
+ @Nullable
+ public Worker.TimeSeriesResponse getQueryResponse() {
+ return _queryResponse;
+ }
+
+ @Nullable
+ public Throwable getThrowable() {
+ return _throwable;
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchClient.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchClient.java
new file mode 100644
index 00000000000..a68e636b96a
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchClient.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.service.dispatch.timeseries;
+
+import io.grpc.Deadline;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import java.util.function.Consumer;
+import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
+import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.query.routing.QueryServerInstance;
+
+
+/**
+ * Dispatch client used to dispatch a runnable plan to the server.
+ * TODO: This shouldn't exist and we should re-use DispatchClient. TBD as part of multi-stage
+ * engine integration.
+ */
+public class TimeSeriesDispatchClient {
+ private final ManagedChannel _channel;
+ private final PinotQueryWorkerGrpc.PinotQueryWorkerStub _dispatchStub;
+
+ public TimeSeriesDispatchClient(String host, int port) {
+ _channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
+ _dispatchStub = PinotQueryWorkerGrpc.newStub(_channel);
+ }
+
+ public ManagedChannel getChannel() {
+ return _channel;
+ }
+
+ public void submit(Worker.TimeSeriesQueryRequest request, QueryServerInstance virtualServer, Deadline deadline,
+ Consumer callback) {
+ _dispatchStub.withDeadline(deadline).submitTimeSeries(
+ request, new TimeSeriesDispatchObserver(virtualServer, callback));
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java
new file mode 100644
index 00000000000..ccfe0e122cb
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.service.dispatch.timeseries;
+
+import io.grpc.stub.StreamObserver;
+import java.util.function.Consumer;
+import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.query.routing.QueryServerInstance;
+
+
+/**
+ * Response observer for a time-series query request.
+ * TODO: This shouldn't exist and we should re-use DispatchObserver. TBD as part of multi-stage
+ * engine integration.
+ */
+public class TimeSeriesDispatchObserver implements StreamObserver {
+ private final QueryServerInstance _serverInstance;
+ private final Consumer _callback;
+
+ private Worker.TimeSeriesResponse _timeSeriesResponse;
+
+ public TimeSeriesDispatchObserver(QueryServerInstance serverInstance,
+ Consumer callback) {
+ _serverInstance = serverInstance;
+ _callback = callback;
+ }
+
+ @Override
+ public void onNext(Worker.TimeSeriesResponse timeSeriesResponse) {
+ _timeSeriesResponse = timeSeriesResponse;
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ _callback.accept(
+ new AsyncQueryTimeSeriesDispatchResponse(
+ _serverInstance,
+ Worker.TimeSeriesResponse.getDefaultInstance(),
+ throwable));
+ }
+
+ @Override
+ public void onCompleted() {
+ _callback.accept(
+ new AsyncQueryTimeSeriesDispatchResponse(
+ _serverInstance,
+ _timeSeriesResponse,
+ null));
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index 65e4ca7df07..3a28454db5b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.service.server;
+import com.google.protobuf.ByteString;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
@@ -192,6 +193,13 @@ public void submit(Worker.QueryRequest request, StreamObserver responseObserver) {
+ ByteString bytes = request.getDispatchPlan();
+ _queryRunner.processTimeSeriesQuery(bytes.toStringUtf8(), request.getMetadataMap(), responseObserver);
+ }
+
@Override
public void cancel(Worker.CancelRequest request, StreamObserver responseObserver) {
try {
diff --git a/pinot-timeseries/pinot-timeseries-planner/pom.xml b/pinot-timeseries/pinot-timeseries-planner/pom.xml
new file mode 100644
index 00000000000..134fbc66741
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-planner/pom.xml
@@ -0,0 +1,57 @@
+
+
+
+ 4.0.0
+
+ org.apache.pinot
+ pinot-timeseries
+ 1.3.0-SNAPSHOT
+
+
+ pinot-timeseries-planner
+
+
+ ${basedir}/../..
+ 11
+ 11
+ UTF-8
+
+
+
+
+ org.apache.pinot
+ pinot-core
+
+
+ org.apache.pinot
+ pinot-timeseries-spi
+
+
+ org.testng
+ testng
+ test
+
+
+
+
\ No newline at end of file
diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanConstants.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanConstants.java
new file mode 100644
index 00000000000..f764ea720ef
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanConstants.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.planner;
+
+public class TimeSeriesPlanConstants {
+ private TimeSeriesPlanConstants() {
+ }
+
+ public static class WorkerRequestMetadataKeys {
+ private static final String SEGMENT_MAP_ENTRY_PREFIX = "$segmentMapEntry#";
+
+ private WorkerRequestMetadataKeys() {
+ }
+
+ public static final String LANGUAGE = "language";
+ public static final String START_TIME_SECONDS = "startTimeSeconds";
+ public static final String WINDOW_SECONDS = "windowSeconds";
+ public static final String NUM_ELEMENTS = "numElements";
+
+ public static boolean isKeySegmentList(String key) {
+ return key.startsWith(SEGMENT_MAP_ENTRY_PREFIX);
+ }
+
+ public static String encodeSegmentListKey(String planId) {
+ return SEGMENT_MAP_ENTRY_PREFIX + planId;
+ }
+
+ /**
+ * Returns the plan-id corresponding to the encoded key.
+ */
+ public static String decodeSegmentListKey(String key) {
+ return key.substring(SEGMENT_MAP_ENTRY_PREFIX.length());
+ }
+ }
+
+ public static class WorkerResponseMetadataKeys {
+ public static final String ERROR_TYPE = "errorType";
+ public static final String ERROR_MESSAGE = "error";
+ }
+}
diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java
new file mode 100644
index 00000000000..dada38f6f32
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.planner;
+
+import com.google.common.base.Preconditions;
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.DataSource;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.request.QuerySource;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.tsdb.planner.physical.TableScanVisitor;
+import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan;
+import org.apache.pinot.tsdb.planner.physical.TimeSeriesQueryServerInstance;
+import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration;
+import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
+import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult;
+import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanner;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+import org.apache.pinot.tsdb.spi.plan.serde.TimeSeriesPlanSerde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TimeSeriesQueryEnvironment {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TimeSeriesQueryEnvironment.class);
+ private final RoutingManager _routingManager;
+ private final TableCache _tableCache;
+ private final Map _plannerMap = new HashMap<>();
+
+ public TimeSeriesQueryEnvironment(PinotConfiguration config, RoutingManager routingManager, TableCache tableCache) {
+ _routingManager = routingManager;
+ _tableCache = tableCache;
+ }
+
+ public void init(PinotConfiguration config) {
+ String[] languages = config.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey(), "")
+ .split(",");
+ LOGGER.info("Found {} configured time series languages. List: {}", languages.length, languages);
+ for (String language : languages) {
+ String configPrefix = PinotTimeSeriesConfiguration.getLogicalPlannerConfigKey(language);
+ String klassName =
+ config.getProperty(PinotTimeSeriesConfiguration.getLogicalPlannerConfigKey(language));
+ Preconditions.checkNotNull(klassName, "Logical planner class not found for language: " + language);
+ // Create the planner with empty constructor
+ try {
+ Class> klass = TimeSeriesQueryEnvironment.class.getClassLoader().loadClass(klassName);
+ Constructor> constructor = klass.getConstructor();
+ TimeSeriesLogicalPlanner planner = (TimeSeriesLogicalPlanner) constructor.newInstance();
+ planner.init(config.subset(configPrefix).toMap());
+ _plannerMap.put(language, planner);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to instantiate logical planner for language: " + language, e);
+ }
+ }
+ TableScanVisitor.INSTANCE.init(_routingManager);
+ }
+
+ public TimeSeriesLogicalPlanResult buildLogicalPlan(RangeTimeSeriesRequest request) {
+ Preconditions.checkState(_plannerMap.containsKey(request.getLanguage()),
+ "No logical planner found for engine: %s. Available: %s", request.getLanguage(),
+ _plannerMap.keySet());
+ return _plannerMap.get(request.getLanguage()).plan(request);
+ }
+
+ public TimeSeriesDispatchablePlan buildPhysicalPlan(RangeTimeSeriesRequest timeSeriesRequest,
+ RequestContext requestContext, TimeSeriesLogicalPlanResult logicalPlan) {
+ // Step-1: Find tables in the query.
+ final Set tableNames = new HashSet<>();
+ findTableNames(logicalPlan.getPlanNode(), tableNames::add);
+ Preconditions.checkState(tableNames.size() == 1,
+ "Expected exactly one table name in the logical plan, got: %s",
+ tableNames);
+ String tableName = tableNames.iterator().next();
+ // Step-2: Compute routing table assuming all segments are selected. This is to perform the check to reject tables
+ // that span across multiple servers.
+ RoutingTable routingTable = _routingManager.getRoutingTable(compileBrokerRequest(tableName),
+ requestContext.getRequestId());
+ Preconditions.checkState(routingTable != null,
+ "Failed to get routing table for table: %s", tableName);
+ Preconditions.checkState(routingTable.getServerInstanceToSegmentsMap().size() == 1,
+ "Only support routing to a single server. Computed: %s",
+ routingTable.getServerInstanceToSegmentsMap().size());
+ var entry = routingTable.getServerInstanceToSegmentsMap().entrySet().iterator().next();
+ ServerInstance serverInstance = entry.getKey();
+ // Step-3: Assign segments to the leaf plan nodes.
+ TableScanVisitor.Context scanVisitorContext = TableScanVisitor.createContext(requestContext.getRequestId());
+ TableScanVisitor.INSTANCE.assignSegmentsToPlan(logicalPlan.getPlanNode(), logicalPlan.getTimeBuckets(),
+ scanVisitorContext);
+ return new TimeSeriesDispatchablePlan(timeSeriesRequest.getLanguage(),
+ new TimeSeriesQueryServerInstance(serverInstance),
+ TimeSeriesPlanSerde.serialize(logicalPlan.getPlanNode()), logicalPlan.getTimeBuckets(),
+ scanVisitorContext.getPlanIdToSegmentMap());
+ }
+
+ public static void findTableNames(BaseTimeSeriesPlanNode planNode, Consumer tableNameConsumer) {
+ if (planNode instanceof ScanFilterAndProjectPlanNode) {
+ ScanFilterAndProjectPlanNode scanNode = (ScanFilterAndProjectPlanNode) planNode;
+ tableNameConsumer.accept(scanNode.getTableName());
+ return;
+ }
+ for (BaseTimeSeriesPlanNode childNode : planNode.getChildren()) {
+ findTableNames(childNode, tableNameConsumer);
+ }
+ }
+
+ private BrokerRequest compileBrokerRequest(String tableName) {
+ DataSource dataSource = new DataSource();
+ dataSource.setTableName(tableName);
+ PinotQuery pinotQuery = new PinotQuery();
+ pinotQuery.setDataSource(dataSource);
+ QuerySource querySource = new QuerySource();
+ querySource.setTableName(tableName);
+ BrokerRequest dummyRequest = new BrokerRequest();
+ dummyRequest.setPinotQuery(pinotQuery);
+ dummyRequest.setQuerySource(querySource);
+ return dummyRequest;
+ }
+}
diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java
new file mode 100644
index 00000000000..58eccd2de0b
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.planner.physical;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.DataSource;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.request.QuerySource;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+
+
+public class TableScanVisitor {
+ public static final TableScanVisitor INSTANCE = new TableScanVisitor();
+ private RoutingManager _routingManager;
+
+ private TableScanVisitor() {
+ }
+
+ public void init(RoutingManager routingManager) {
+ _routingManager = routingManager;
+ }
+
+ public void assignSegmentsToPlan(BaseTimeSeriesPlanNode planNode, TimeBuckets timeBuckets, Context context) {
+ if (planNode instanceof ScanFilterAndProjectPlanNode) {
+ ScanFilterAndProjectPlanNode sfpNode = (ScanFilterAndProjectPlanNode) planNode;
+ Expression filterExpression = CalciteSqlParser.compileToExpression(sfpNode.getEffectiveFilter(timeBuckets));
+ RoutingTable routingTable = _routingManager.getRoutingTable(
+ compileBrokerRequest(sfpNode.getTableName(), filterExpression),
+ context._requestId);
+ Preconditions.checkNotNull(routingTable, "Failed to get routing table for table: " + sfpNode.getTableName());
+ Preconditions.checkState(routingTable.getServerInstanceToSegmentsMap().size() == 1,
+ "Only support routing to a single server. Computed: %s",
+ routingTable.getServerInstanceToSegmentsMap().size());
+ var entry = routingTable.getServerInstanceToSegmentsMap().entrySet().iterator().next();
+ List segments = entry.getValue().getLeft();
+ context.getPlanIdToSegmentMap().put(sfpNode.getId(), segments);
+ }
+ for (BaseTimeSeriesPlanNode childNode : planNode.getChildren()) {
+ assignSegmentsToPlan(childNode, timeBuckets, context);
+ }
+ }
+
+ public static Context createContext(Long requestId) {
+ return new Context(requestId);
+ }
+
+ public static class Context {
+ private final Map> _planIdToSegmentMap = new HashMap<>();
+ private final Long _requestId;
+
+ public Context(Long requestId) {
+ _requestId = requestId;
+ }
+
+ public Map> getPlanIdToSegmentMap() {
+ return _planIdToSegmentMap;
+ }
+ }
+
+ private BrokerRequest compileBrokerRequest(String tableName, Expression filterExpression) {
+ DataSource dataSource = new DataSource();
+ dataSource.setTableName(tableName);
+ PinotQuery pinotQuery = new PinotQuery();
+ pinotQuery.setDataSource(dataSource);
+ pinotQuery.setFilterExpression(filterExpression);
+ QuerySource querySource = new QuerySource();
+ querySource.setTableName(tableName);
+ BrokerRequest dummyRequest = new BrokerRequest();
+ dummyRequest.setPinotQuery(pinotQuery);
+ dummyRequest.setQuerySource(querySource);
+ return dummyRequest;
+ }
+}
diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TimeSeriesDispatchablePlan.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TimeSeriesDispatchablePlan.java
new file mode 100644
index 00000000000..6c64a396d82
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TimeSeriesDispatchablePlan.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.planner.physical;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+
+
+public class TimeSeriesDispatchablePlan {
+ private final TimeSeriesQueryServerInstance _queryServerInstance;
+ private final String _language;
+ private final String _serializedPlan;
+ private final TimeBuckets _timeBuckets;
+ private final Map> _planIdToSegments;
+
+ public TimeSeriesDispatchablePlan(String language, TimeSeriesQueryServerInstance queryServerInstance,
+ String serializedPlan, TimeBuckets timeBuckets, Map> planIdToSegments) {
+ _language = language;
+ _queryServerInstance = queryServerInstance;
+ _serializedPlan = serializedPlan;
+ _timeBuckets = timeBuckets;
+ _planIdToSegments = planIdToSegments;
+ }
+
+ public String getLanguage() {
+ return _language;
+ }
+
+ public TimeSeriesQueryServerInstance getQueryServerInstance() {
+ return _queryServerInstance;
+ }
+
+ public String getSerializedPlan() {
+ return _serializedPlan;
+ }
+
+ public TimeBuckets getTimeBuckets() {
+ return _timeBuckets;
+ }
+
+ public Map> getPlanIdToSegments() {
+ return _planIdToSegments;
+ }
+}
diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TimeSeriesQueryServerInstance.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TimeSeriesQueryServerInstance.java
new file mode 100644
index 00000000000..cf1768a7444
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TimeSeriesQueryServerInstance.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.planner.physical;
+
+import org.apache.pinot.core.transport.ServerInstance;
+
+
+public class TimeSeriesQueryServerInstance {
+ private final String _hostname;
+ private final int _queryServicePort;
+ private final int _queryMailboxPort;
+
+ public TimeSeriesQueryServerInstance(ServerInstance serverInstance) {
+ this(serverInstance.getHostname(), serverInstance.getQueryServicePort(), serverInstance.getQueryMailboxPort());
+ }
+
+ public TimeSeriesQueryServerInstance(String hostname, int queryServicePort, int queryMailboxPort) {
+ _hostname = hostname;
+ _queryServicePort = queryServicePort;
+ _queryMailboxPort = queryMailboxPort;
+ }
+
+ public String getHostname() {
+ return _hostname;
+ }
+
+ public int getQueryServicePort() {
+ return _queryServicePort;
+ }
+
+ public int getQueryMailboxPort() {
+ return _queryMailboxPort;
+ }
+}
diff --git a/pinot-timeseries/pinot-timeseries-spi/pom.xml b/pinot-timeseries/pinot-timeseries-spi/pom.xml
index c21e9971d50..1683928749d 100644
--- a/pinot-timeseries/pinot-timeseries-spi/pom.xml
+++ b/pinot-timeseries/pinot-timeseries-spi/pom.xml
@@ -25,9 +25,8 @@
4.0.0org.apache.pinot
- pinot
+ pinot-timeseries1.3.0-SNAPSHOT
- ../../pom.xmlpinot-timeseries-spi
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
index 2c4fc045a4a..ecbc3b3f6bb 100644
--- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
@@ -49,7 +49,7 @@
*/
public class RangeTimeSeriesRequest {
/** Engine allows a Pinot cluster to support multiple Time-Series Query Languages. */
- private final String _engine;
+ private final String _language;
/** Query is the raw query sent by the caller. */
private final String _query;
/** Start time of the time-window being queried. */
@@ -63,22 +63,25 @@ public class RangeTimeSeriesRequest {
private final long _stepSeconds;
/** E2E timeout for the query. */
private final Duration _timeout;
+ /** Full query string to allow language implementations to pass custom parameters. */
+ private final String _fullQueryString;
- public RangeTimeSeriesRequest(String engine, String query, long startSeconds, long endSeconds, long stepSeconds,
- Duration timeout) {
+ public RangeTimeSeriesRequest(String language, String query, long startSeconds, long endSeconds, long stepSeconds,
+ Duration timeout, String fullQueryString) {
Preconditions.checkState(endSeconds >= startSeconds, "Invalid range. startSeconds "
+ "should be greater than or equal to endSeconds. Found startSeconds=%s and endSeconds=%s",
startSeconds, endSeconds);
- _engine = engine;
+ _language = language;
_query = query;
_startSeconds = startSeconds;
_endSeconds = endSeconds;
_stepSeconds = stepSeconds;
_timeout = timeout;
+ _fullQueryString = fullQueryString;
}
- public String getEngine() {
- return _engine;
+ public String getLanguage() {
+ return _language;
}
public String getQuery() {
@@ -100,4 +103,8 @@ public long getStepSeconds() {
public Duration getTimeout() {
return _timeout;
}
+
+ public String getFullQueryString() {
+ return _fullQueryString;
+ }
}
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/SimpleTimeSeriesBuilderFactory.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/SimpleTimeSeriesBuilderFactory.java
index 2cece60cde3..9ed40954e5f 100644
--- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/SimpleTimeSeriesBuilderFactory.java
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/SimpleTimeSeriesBuilderFactory.java
@@ -28,9 +28,7 @@
public class SimpleTimeSeriesBuilderFactory extends TimeSeriesBuilderFactory {
- public static final SimpleTimeSeriesBuilderFactory INSTANCE = new SimpleTimeSeriesBuilderFactory();
-
- private SimpleTimeSeriesBuilderFactory() {
+ public SimpleTimeSeriesBuilderFactory() {
super();
}
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
index fe7fd5be422..e1e89d624d3 100644
--- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
@@ -20,6 +20,7 @@
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.pinot.tsdb.spi.TimeBuckets;
@@ -33,11 +34,12 @@ public class TimeSeriesBlock {
private final TimeBuckets _timeBuckets;
private final Map> _seriesMap;
- public TimeSeriesBlock(TimeBuckets timeBuckets, Map> seriesMap) {
+ public TimeSeriesBlock(@Nullable TimeBuckets timeBuckets, Map> seriesMap) {
_timeBuckets = timeBuckets;
_seriesMap = seriesMap;
}
+ @Nullable
public TimeBuckets getTimeBuckets() {
return _timeBuckets;
}
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactoryProvider.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactoryProvider.java
index 942bffdf18a..e82d3bdd444 100644
--- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactoryProvider.java
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactoryProvider.java
@@ -42,7 +42,8 @@ public static void init(PinotConfiguration pinotConfiguration) {
String seriesBuilderClass = pinotConfiguration
.getProperty(PinotTimeSeriesConfiguration.getSeriesBuilderFactoryConfigKey(language));
try {
- Object untypedSeriesBuilderFactory = Class.forName(seriesBuilderClass).getConstructor().newInstance();
+ Class> klass = TimeSeriesBuilderFactoryProvider.class.getClassLoader().loadClass(seriesBuilderClass);
+ Object untypedSeriesBuilderFactory = klass.getConstructor().newInstance();
if (!(untypedSeriesBuilderFactory instanceof TimeSeriesBuilderFactory)) {
throw new RuntimeException("Series builder factory class " + seriesBuilderClass
+ " does not implement SeriesBuilderFactory");
diff --git a/pinot-timeseries/pom.xml b/pinot-timeseries/pom.xml
index f49d0cb922f..47452054c8e 100644
--- a/pinot-timeseries/pom.xml
+++ b/pinot-timeseries/pom.xml
@@ -27,7 +27,6 @@
org.apache.pinotpinot1.3.0-SNAPSHOT
- ..pom
@@ -39,6 +38,7 @@
pinot-timeseries-spi
+ pinot-timeseries-planner
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/TimeSeriesEngineQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/TimeSeriesEngineQuickStart.java
new file mode 100644
index 00000000000..0b00e2dad62
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/TimeSeriesEngineQuickStart.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.tools.admin.command.QuickstartRunner;
+import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration;
+import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory;
+
+
+public class TimeSeriesEngineQuickStart extends Quickstart {
+ private static final String[] TIME_SERIES_TABLE_DIRECTORIES = new String[]{
+ "examples/batch/airlineStats",
+ "examples/batch/baseballStats",
+ "examples/batch/billing",
+ "examples/batch/dimBaseballTeams",
+ "examples/batch/githubEvents",
+ "examples/batch/githubComplexTypeEvents",
+ "examples/batch/ssb/customer",
+ "examples/batch/ssb/dates",
+ "examples/batch/ssb/lineorder",
+ "examples/batch/ssb/part",
+ "examples/batch/ssb/supplier",
+ "examples/batch/starbucksStores",
+ "examples/batch/fineFoodReviews",
+ };
+
+ @Override
+ public String[] getDefaultBatchTableDirectories() {
+ return TIME_SERIES_TABLE_DIRECTORIES;
+ }
+
+ @Override
+ public List types() {
+ return Collections.singletonList("TIME_SERIES");
+ }
+
+ @Override
+ protected Map getConfigOverrides() {
+ Map configs = new HashMap<>();
+ configs.put(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey(), "m3ql");
+ configs.put(PinotTimeSeriesConfiguration.getLogicalPlannerConfigKey("m3ql"),
+ "org.apache.pinot.tsdb.m3ql.M3TimeSeriesPlanner");
+ configs.put(PinotTimeSeriesConfiguration.getSeriesBuilderFactoryConfigKey("m3ql"),
+ SimpleTimeSeriesBuilderFactory.class.getName());
+ return configs;
+ }
+
+ @Override
+ public void execute()
+ throws Exception {
+ File quickstartTmpDir =
+ _setCustomDataDir ? _dataDir : new File(_dataDir, String.valueOf(System.currentTimeMillis()));
+ File quickstartRunnerDir = new File(quickstartTmpDir, "quickstart");
+ Preconditions.checkState(quickstartRunnerDir.mkdirs());
+ List quickstartTableRequests = bootstrapStreamTableDirectories(quickstartTmpDir);
+ final QuickstartRunner runner =
+ new QuickstartRunner(quickstartTableRequests, 1, 1, 1, 1, quickstartRunnerDir, getConfigOverrides());
+
+ startKafka();
+ startAllDataStreams(_kafkaStarter, quickstartTmpDir);
+
+ printStatus(Color.CYAN, "***** Starting Zookeeper, controller, broker, server and minion *****");
+ runner.startAll();
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ printStatus(Color.GREEN, "***** Shutting down realtime quick start *****");
+ runner.stop();
+ FileUtils.deleteDirectory(quickstartTmpDir);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }));
+
+ printStatus(Color.CYAN, "***** Bootstrap all tables *****");
+ runner.bootstrapTable();
+
+ printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to get populated *****");
+ Thread.sleep(5000);
+
+ printStatus(Color.YELLOW, "***** Realtime quickstart setup complete *****");
+ runSampleQueries(runner);
+
+ printStatus(Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console");
+ }
+}
diff --git a/pom.xml b/pom.xml
index af640c1670d..5aabff1f8e6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -494,6 +494,11 @@
pinot-timeseries-spi${project.version}
+
+ org.apache.pinot
+ pinot-timeseries-planner
+ ${project.version}
+