diff --git a/pinot-broker/pom.xml b/pinot-broker/pom.xml
index 881ef091fc8..085ef1e90f3 100644
--- a/pinot-broker/pom.xml
+++ b/pinot-broker/pom.xml
@@ -38,6 +38,14 @@
       <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-query-runtime</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-timeseries-spi</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-timeseries-planner</artifactId>
+    </dependency>
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
index fd3f58fbf25..bb9e36f204b 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
@@ -61,6 +61,7 @@
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.common.utils.request.RequestUtils;
@@ -69,6 +70,7 @@
 import org.apache.pinot.core.auth.ManualAuthorization;
 import org.apache.pinot.core.auth.TargetType;
 import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
+import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.trace.RequestScope;
 import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
@@ -236,6 +238,48 @@ public void processSqlWithMultiStageQueryEnginePost(String query, @Suspended Asy
     }
   }
 
+  @GET
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("timeseries/api/v1/query_range")
+  @ApiOperation(value = "Prometheus Compatible API for Pinot's Time Series Engine")
+  @ManualAuthorization
+  public void processTimeSeriesQueryEngine(@Suspended AsyncResponse asyncResponse,
+      @QueryParam("language") String language,
+      @Context org.glassfish.grizzly.http.server.Request requestCtx,
+      @Context HttpHeaders httpHeaders) {
+    try {
+      try (RequestScope requestContext = Tracing.getTracer().createRequestScope()) {
+        String queryString = requestCtx.getQueryString();
+        PinotBrokerTimeSeriesResponse response = executeTimeSeriesQuery(language, queryString, requestContext);
+        if (response.getErrorType() != null && !response.getErrorType().isEmpty()) {
+          asyncResponse.resume(Response.serverError().entity(response).build());
+          return;
+        }
+        asyncResponse.resume(response);
+      }
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while processing POST request", e);
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L);
+      asyncResponse.resume(Response.serverError().entity(
+          new PinotBrokerTimeSeriesResponse("error", null, e.getClass().getSimpleName(), e.getMessage()))
+          .build());
+    }
+  }
+
+  @GET
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("timeseries/api/v1/query")
+  @ApiOperation(value = "Prometheus Compatible API for Instant Queries")
+  @ManualAuthorization
+  public void processTimeSeriesInstantQuery(@Suspended AsyncResponse asyncResponse,
+      @Context org.glassfish.grizzly.http.server.Request requestCtx,
+      @Context HttpHeaders httpHeaders) {
+    // TODO: Not implemented yet.
+ asyncResponse.resume(Response.ok().entity("{}").build()); + } + @DELETE @Path("query/{queryId}") @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY) @@ -342,6 +386,11 @@ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterI } } + private PinotBrokerTimeSeriesResponse executeTimeSeriesQuery(String language, String queryString, + RequestContext requestContext) { + return _requestHandler.handleTimeSeriesRequest(language, queryString, requestContext); + } + private static HttpRequesterIdentity makeHttpIdentity(org.glassfish.grizzly.http.server.Request context) { Multimap headers = ArrayListMultimap.create(); context.getHeaderNames().forEach(key -> context.getHeaders(key).forEach(value -> headers.put(key, value))); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java index 47007e315b6..41ab3b02804 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java @@ -48,6 +48,7 @@ import org.apache.pinot.broker.requesthandler.GrpcBrokerRequestHandler; import org.apache.pinot.broker.requesthandler.MultiStageBrokerRequestHandler; import org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler; +import org.apache.pinot.broker.requesthandler.TimeSeriesRequestHandler; import org.apache.pinot.broker.routing.BrokerRoutingManager; import org.apache.pinot.common.Utils; import org.apache.pinot.common.config.NettyConfig; @@ -71,6 +72,8 @@ import org.apache.pinot.core.transport.ListenerConfig; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; import org.apache.pinot.core.util.ListenerConfigUtil; +import org.apache.pinot.query.mailbox.MailboxService; +import org.apache.pinot.query.service.dispatch.QueryDispatcher; import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory; @@ -86,6 +89,7 @@ import org.apache.pinot.spi.utils.InstanceTypeUtils; import org.apache.pinot.spi.utils.NetUtils; import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory; +import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -326,16 +330,25 @@ public void start() _queryQuotaManager, tableCache, nettyDefaults, tlsDefaults, _serverRoutingStatsManager); } MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null; + QueryDispatcher queryDispatcher = null; if (_brokerConf.getProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) { // multi-stage request handler uses both Netty and GRPC ports. // worker requires both the "Netty port" for protocol transport; and "GRPC port" for mailbox transport. // TODO: decouple protocol and engine selection. 
+ queryDispatcher = createQueryDispatcher(_brokerConf); multiStageBrokerRequestHandler = new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory, _queryQuotaManager, tableCache); } + TimeSeriesRequestHandler timeSeriesRequestHandler = null; + if (StringUtils.isNotBlank(_brokerConf.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey()))) { + Preconditions.checkNotNull(queryDispatcher, "Multistage Engine should be enabled to use time-series engine"); + timeSeriesRequestHandler = new TimeSeriesRequestHandler(_brokerConf, brokerId, _routingManager, + _accessControlFactory, _queryQuotaManager, tableCache, queryDispatcher); + } _brokerRequestHandler = - new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, multiStageBrokerRequestHandler); + new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, multiStageBrokerRequestHandler, + timeSeriesRequestHandler); _brokerRequestHandler.start(); // Enable/disable thread CPU time measurement through instance config. @@ -435,6 +448,13 @@ public void start() protected void registerExtraComponents(BrokerAdminApiApplication brokerAdminApplication) { } + private QueryDispatcher createQueryDispatcher(PinotConfiguration brokerConf) { + String hostname = _brokerConf.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME); + int port = Integer.parseInt(_brokerConf.getProperty( + CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT)); + return new QueryDispatcher(new MailboxService(hostname, port, _brokerConf)); + } + private void updateInstanceConfigAndBrokerResourceIfNeeded() { InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(_participantHelixManager, _instanceId); boolean updated = HelixHelper.updateHostnamePort(instanceConfig, _hostname, _port); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java index 27c45817f40..277f5f96df6 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java @@ -29,6 +29,7 @@ import org.apache.hc.client5.http.io.HttpClientConnectionManager; import org.apache.pinot.broker.api.RequesterIdentity; import org.apache.pinot.common.response.BrokerResponse; +import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse; import org.apache.pinot.spi.trace.RequestContext; import org.apache.pinot.spi.trace.RequestScope; import org.apache.pinot.spi.trace.Tracing; @@ -59,6 +60,14 @@ default BrokerResponse handleRequest(String sql) } } + /** + * Run a query and use the time-series engine. 
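+   * <p>
+   * Note: {@code rawQueryParamString} is expected to carry the raw HTTP query string of a Prometheus-style
+   * range query (for example the query, start, end, step and timeout parameters); parsing is left to the
+   * concrete handler (see TimeSeriesRequestHandler#buildRangeTimeSeriesRequest).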
+ */ + default PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String rawQueryParamString, + RequestContext requestContext) { + throw new UnsupportedOperationException("Handler does not support Time Series requests"); + } + Map getRunningQueries(); /** diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java index b3e48f25aeb..e3a814365a9 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java @@ -27,6 +27,7 @@ import org.apache.pinot.broker.api.RequesterIdentity; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.response.BrokerResponse; +import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse; import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.common.utils.request.RequestUtils; @@ -44,11 +45,14 @@ public class BrokerRequestHandlerDelegate implements BrokerRequestHandler { private final BaseSingleStageBrokerRequestHandler _singleStageBrokerRequestHandler; private final MultiStageBrokerRequestHandler _multiStageBrokerRequestHandler; + private final TimeSeriesRequestHandler _timeSeriesRequestHandler; public BrokerRequestHandlerDelegate(BaseSingleStageBrokerRequestHandler singleStageBrokerRequestHandler, - @Nullable MultiStageBrokerRequestHandler multiStageBrokerRequestHandler) { + @Nullable MultiStageBrokerRequestHandler multiStageBrokerRequestHandler, + @Nullable TimeSeriesRequestHandler timeSeriesRequestHandler) { _singleStageBrokerRequestHandler = singleStageBrokerRequestHandler; _multiStageBrokerRequestHandler = multiStageBrokerRequestHandler; + _timeSeriesRequestHandler = timeSeriesRequestHandler; } @Override @@ -57,6 +61,9 @@ public void start() { if (_multiStageBrokerRequestHandler != null) { _multiStageBrokerRequestHandler.start(); } + if (_timeSeriesRequestHandler != null) { + _timeSeriesRequestHandler.start(); + } } @Override @@ -65,6 +72,9 @@ public void shutDown() { if (_multiStageBrokerRequestHandler != null) { _multiStageBrokerRequestHandler.shutDown(); } + if (_timeSeriesRequestHandler != null) { + _timeSeriesRequestHandler.shutDown(); + } } @Override @@ -103,6 +113,15 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption } } + @Override + public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String rawQueryParamString, + RequestContext requestContext) { + if (_timeSeriesRequestHandler != null) { + return _timeSeriesRequestHandler.handleTimeSeriesRequest(lang, rawQueryParamString, requestContext); + } + return new PinotBrokerTimeSeriesResponse("error", null, "error", "Time series query engine not enabled."); + } + @Override public Map getRunningQueries() { // TODO: add support for multiStaged engine: track running queries for multiStaged engine and combine its diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java new file mode 100644 index 00000000000..fa6b67be11e --- /dev/null +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java @@ -0,0 
+1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.requesthandler; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Preconditions; +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; +import javax.annotation.Nullable; +import javax.ws.rs.core.HttpHeaders; +import org.apache.commons.lang3.StringUtils; +import org.apache.hc.client5.http.io.HttpClientConnectionManager; +import org.apache.http.NameValuePair; +import org.apache.http.client.utils.URLEncodedUtils; +import org.apache.pinot.broker.api.AccessControl; +import org.apache.pinot.broker.api.RequesterIdentity; +import org.apache.pinot.broker.broker.AccessControlFactory; +import org.apache.pinot.broker.queryquota.QueryQuotaManager; +import org.apache.pinot.broker.routing.BrokerRoutingManager; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.response.BrokerResponse; +import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse; +import org.apache.pinot.common.utils.HumanReadableDuration; +import org.apache.pinot.query.service.dispatch.QueryDispatcher; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.trace.RequestContext; +import org.apache.pinot.sql.parsers.SqlNodeAndOptions; +import org.apache.pinot.tsdb.planner.TimeSeriesQueryEnvironment; +import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan; +import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest; +import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class TimeSeriesRequestHandler extends BaseBrokerRequestHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(TimeSeriesRequestHandler.class); + private static final long DEFAULT_STEP_SECONDS = 60L; + private final TimeSeriesQueryEnvironment _queryEnvironment; + private final QueryDispatcher _queryDispatcher; + + public TimeSeriesRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager, + AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, + QueryDispatcher queryDispatcher) { + super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache); + _queryEnvironment = new TimeSeriesQueryEnvironment(config, routingManager, tableCache); + _queryEnvironment.init(config); + _queryDispatcher = queryDispatcher; + } + + @Override + protected BrokerResponse handleRequest(long requestId, String query, @Nullable SqlNodeAndOptions sqlNodeAndOptions, + JsonNode 
request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext,
+      @Nullable HttpHeaders httpHeaders, AccessControl accessControl)
+      throws Exception {
+    throw new IllegalArgumentException("Not supported yet");
+  }
+
+  @Override
+  public void start() {
+    LOGGER.info("Starting time-series request handler");
+  }
+
+  @Override
+  public void shutDown() {
+    LOGGER.info("Shutting down time-series request handler");
+  }
+
+  @Override
+  public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String rawQueryParamString,
+      RequestContext requestContext) {
+    RangeTimeSeriesRequest timeSeriesRequest = null;
+    try {
+      timeSeriesRequest = buildRangeTimeSeriesRequest(lang, rawQueryParamString);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+    TimeSeriesLogicalPlanResult logicalPlanResult = _queryEnvironment.buildLogicalPlan(timeSeriesRequest);
+    TimeSeriesDispatchablePlan dispatchablePlan = _queryEnvironment.buildPhysicalPlan(timeSeriesRequest, requestContext,
+        logicalPlanResult);
+    return _queryDispatcher.submitAndGet(requestContext, dispatchablePlan, timeSeriesRequest.getTimeout().toMillis(),
+        new HashMap<>());
+  }
+
+  @Override
+  public Map<Long, String> getRunningQueries() {
+    // TODO: Implement this.
+    return Map.of();
+  }
+
+  @Override
+  public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr,
+      Map<String, Integer> serverResponses)
+      throws Exception {
+    // TODO: Implement this.
+    return false;
+  }
+
+  private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String language, String queryParamString)
+      throws URISyntaxException {
+    List<NameValuePair> pairs = URLEncodedUtils.parse(
+        new URI("http://localhost?" + queryParamString), "UTF-8");
+    String query = null;
+    Long startTs = null;
+    Long endTs = null;
+    String step = null;
+    String timeoutStr = null;
+    for (NameValuePair nameValuePair : pairs) {
+      switch (nameValuePair.getName()) {
+        case "query":
+          query = nameValuePair.getValue();
+          break;
+        case "start":
+          startTs = Long.parseLong(nameValuePair.getValue());
+          break;
+        case "end":
+          endTs = Long.parseLong(nameValuePair.getValue());
+          break;
+        case "step":
+          step = nameValuePair.getValue();
+          break;
+        case "timeout":
+          timeoutStr = nameValuePair.getValue();
+          break;
+        default:
+          /* Okay to ignore unknown parameters since the language implementor may be using them.
*/ + break; + } + } + Preconditions.checkNotNull(query, "Query cannot be null"); + Preconditions.checkNotNull(startTs, "Start time cannot be null"); + Preconditions.checkNotNull(endTs, "End time cannot be null"); + Duration timeout = Duration.ofMillis(_brokerTimeoutMs); + if (StringUtils.isNotBlank(timeoutStr)) { + timeout = HumanReadableDuration.from(timeoutStr); + } + // TODO: Pass full raw query param string to the request + return new RangeTimeSeriesRequest(language, query, startTs, endTs, getStepSeconds(step), timeout, queryParamString); + } + + public static Long getStepSeconds(@Nullable String step) { + if (step == null) { + return DEFAULT_STEP_SECONDS; + } + try { + return Long.parseLong(step); + } catch (NumberFormatException ignored) { + } + return HumanReadableDuration.from(step).getSeconds(); + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java new file mode 100644 index 00000000000..8d455e09bd3 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.response; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.apache.pinot.spi.annotations.InterfaceStability; +import org.apache.pinot.tsdb.spi.series.TimeSeries; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; + + +/** + * POJO returned by the Pinot broker in a time-series query response. Format is defined + * in the prometheus docs. + * TODO: We will evolve this until Pinot 1.3. At present we are mimicking Prometheus HTTP API. 
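+ * <p>
+ * For illustration only (hypothetical values), a successful range-query response serialized from this POJO
+ * follows the Prometheus "matrix" shape:
+ * <pre>
+ * {
+ *   "status": "success",
+ *   "data": {
+ *     "resultType": "matrix",
+ *     "result": [
+ *       {
+ *         "metric": {"__name__": "fooTag=fooValue", "fooTag": "fooValue"},
+ *         "values": [[1726000000, "1.0"], [1726000060, "2.0"]]
+ *       }
+ *     ]
+ *   }
+ * }
+ * </pre>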
+ */
+@InterfaceStability.Evolving
+public class PinotBrokerTimeSeriesResponse {
+  public static final String SUCCESS_STATUS = "success";
+  public static final String ERROR_STATUS = "error";
+  public static final String METRIC_NAME_KEY = "__name__";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private String _status;
+  private Data _data;
+  private String _errorType;
+  private String _error;
+
+  static {
+    OBJECT_MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+  }
+
+  @JsonCreator
+  public PinotBrokerTimeSeriesResponse(@JsonProperty("status") String status, @JsonProperty("data") Data data,
+      @JsonProperty("errorType") String errorType, @JsonProperty("error") String error) {
+    _status = status;
+    _data = data;
+    _errorType = errorType;
+    _error = error;
+  }
+
+  public String getStatus() {
+    return _status;
+  }
+
+  public Data getData() {
+    return _data;
+  }
+
+  public String getErrorType() {
+    return _errorType;
+  }
+
+  public String getError() {
+    return _error;
+  }
+
+  public String serialize()
+      throws JsonProcessingException {
+    return OBJECT_MAPPER.writeValueAsString(this);
+  }
+
+  public static PinotBrokerTimeSeriesResponse newSuccessResponse(Data data) {
+    return new PinotBrokerTimeSeriesResponse(SUCCESS_STATUS, data, null, null);
+  }
+
+  public static PinotBrokerTimeSeriesResponse newErrorResponse(String errorType, String errorMessage) {
+    return new PinotBrokerTimeSeriesResponse(ERROR_STATUS, Data.EMPTY, errorType, errorMessage);
+  }
+
+  public static PinotBrokerTimeSeriesResponse fromTimeSeriesBlock(TimeSeriesBlock seriesBlock) {
+    if (seriesBlock.getTimeBuckets() == null) {
+      throw new UnsupportedOperationException("Non-bucketed series block not supported yet");
+    }
+    return convertBucketedSeriesBlock(seriesBlock);
+  }
+
+  private static PinotBrokerTimeSeriesResponse convertBucketedSeriesBlock(TimeSeriesBlock seriesBlock) {
+    Long[] timeValues = Objects.requireNonNull(seriesBlock.getTimeBuckets()).getTimeBuckets();
+    List<Value> result = new ArrayList<>();
+    for (var listOfTimeSeries : seriesBlock.getSeriesMap().values()) {
+      Preconditions.checkState(!listOfTimeSeries.isEmpty(), "Received empty time-series");
+      TimeSeries anySeries = listOfTimeSeries.get(0);
+      // TODO: Ideally we should allow "aliasing" a series, and hence we should store something like a series-name.
+      // Though in most contexts that would serve the same purpose as an ID.
+      Map<String, String> metricMap = new HashMap<>();
+      metricMap.put(METRIC_NAME_KEY, anySeries.getTagsSerialized());
+      metricMap.putAll(anySeries.getTagKeyValuesAsMap());
+      for (TimeSeries timeSeries : listOfTimeSeries) {
+        Object[][] values = new Object[timeValues.length][];
+        for (int i = 0; i < timeValues.length; i++) {
+          Object nullableValue = timeSeries.getValues()[i];
+          values[i] = new Object[]{timeValues[i], nullableValue == null ? null : nullableValue.toString()};
+        }
+        result.add(new PinotBrokerTimeSeriesResponse.Value(metricMap, values));
+      }
+    }
+    PinotBrokerTimeSeriesResponse.Data data = PinotBrokerTimeSeriesResponse.Data.newMatrix(result);
+    return PinotBrokerTimeSeriesResponse.newSuccessResponse(data);
+  }
+
+  public static class Data {
+    public static final Data EMPTY = new Data("", new ArrayList<>());
+    private final String _resultType;
+    private final List<Value> _result;
+
+    @JsonCreator
+    public Data(@JsonProperty("resultType") String resultType, @JsonProperty("result") List<Value> result) {
+      _resultType = resultType;
+      _result = result;
+    }
+
+    public String getResultType() {
+      return _resultType;
+    }
+
+    public List<Value> getResult() {
+      return _result;
+    }
+
+    public static Data newMatrix(List<Value> result) {
+      return new Data("matrix", result);
+    }
+  }
+
+  public static class Value {
+    private final Map<String, String> _metric;
+    private final Object[][] _values;
+
+    @JsonCreator
+    public Value(@JsonProperty("metric") Map<String, String> metric, @JsonProperty("values") Object[][] values) {
+      _metric = metric;
+      _values = values;
+    }
+
+    public Map<String, String> getMetric() {
+      return _metric;
+    }
+
+    public Object[][] getValues() {
+      return _values;
+    }
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/HumanReadableDuration.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/HumanReadableDuration.java
new file mode 100644
index 00000000000..a224676b392
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/HumanReadableDuration.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Preconditions;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+public class HumanReadableDuration {
+  private static final Pattern DURATION_PATTERN = Pattern.compile("(\\d+)\\s*(\\S+)");
+  private static final Map<String, TimeUnit> SUFFIXES = createSuffixes();
+
+  private HumanReadableDuration() {
+  }
+
+  private static Map<String, TimeUnit> createSuffixes() {
+    Map<String, TimeUnit> suffixes = new HashMap<>();
+    suffixes.put("ns", TimeUnit.NANOSECONDS);
+    suffixes.put("nanosecond", TimeUnit.NANOSECONDS);
+    suffixes.put("nanoseconds", TimeUnit.NANOSECONDS);
+    suffixes.put("us", TimeUnit.MICROSECONDS);
+    suffixes.put("microsecond", TimeUnit.MICROSECONDS);
+    suffixes.put("microseconds", TimeUnit.MICROSECONDS);
+    suffixes.put("ms", TimeUnit.MILLISECONDS);
+    suffixes.put("millisecond", TimeUnit.MILLISECONDS);
+    suffixes.put("milliseconds", TimeUnit.MILLISECONDS);
+    suffixes.put("s", TimeUnit.SECONDS);
+    suffixes.put("second", TimeUnit.SECONDS);
+    suffixes.put("seconds", TimeUnit.SECONDS);
+    suffixes.put("m", TimeUnit.MINUTES);
+    suffixes.put("minute", TimeUnit.MINUTES);
+    suffixes.put("minutes", TimeUnit.MINUTES);
+    suffixes.put("h", TimeUnit.HOURS);
+    suffixes.put("hour", TimeUnit.HOURS);
+    suffixes.put("hours", TimeUnit.HOURS);
+    suffixes.put("d", TimeUnit.DAYS);
+    suffixes.put("day", TimeUnit.DAYS);
+    suffixes.put("days", TimeUnit.DAYS);
+    return suffixes;
+  }
+
+  public static Duration from(String durationStr) {
+    final Matcher matcher = DURATION_PATTERN.matcher(durationStr);
+    Preconditions.checkArgument(matcher.matches(), "Invalid duration string: %s", durationStr);
+
+    final long count = Long.parseLong(matcher.group(1));
+    final TimeUnit unit = SUFFIXES.get(matcher.group(2));
+    if (unit == null) {
+      throw new IllegalArgumentException(String.format("Unknown time-unit: %s", matcher.group(2)));
+    }
+    return Duration.of(count, unit.toChronoUnit());
+  }
+}
diff --git a/pinot-common/src/main/proto/worker.proto b/pinot-common/src/main/proto/worker.proto
index a643c53760f..8fe3fadab55 100644
--- a/pinot-common/src/main/proto/worker.proto
+++ b/pinot-common/src/main/proto/worker.proto
@@ -25,6 +25,8 @@ service PinotQueryWorker {
   // Dispatch a QueryRequest to a PinotQueryWorker
   rpc Submit(QueryRequest) returns (QueryResponse);
+  rpc SubmitTimeSeries(TimeSeriesQueryRequest) returns (TimeSeriesResponse);
+
   rpc Cancel(CancelRequest) returns (CancelResponse);
 }
@@ -49,6 +51,16 @@ message QueryResponse {
   bytes payload = 2;
 }
+message TimeSeriesQueryRequest {
+  bytes dispatchPlan = 1;
+  map<string, string> metadata = 2;
+}
+
+message TimeSeriesResponse {
+  bytes payload = 1;
+  map<string, string> metadata = 2;
+}
+
 message StagePlan {
   bytes rootNode = 1; // Serialized StageNode
   StageMetadata stageMetadata = 2;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index 6db1d6099a1..33d83843f62 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -171,7 +171,7 @@ public void setUp()
     // Setup time series builder factory
     TimeSeriesBuilderFactoryProvider.registerSeriesBuilderFactory(TIME_SERIES_ENGINE_NAME,
-        SimpleTimeSeriesBuilderFactory.INSTANCE);
+        new SimpleTimeSeriesBuilderFactory());
   }
 
   @Test
diff --git
a/pinot-distribution/pinot-assembly.xml b/pinot-distribution/pinot-assembly.xml index 0573e11e383..654417f3b3b 100644 --- a/pinot-distribution/pinot-assembly.xml +++ b/pinot-distribution/pinot-assembly.xml @@ -227,6 +227,16 @@ + + + + ${pinot.root}/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/target/pinot-timeseries-m3ql-${project.version}-shaded.jar + + + plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/pinot-timeseries-m3ql-${project.version}-shaded.jar + + + diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/pom.xml b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/pom.xml new file mode 100644 index 00000000000..b853e9f3a8d --- /dev/null +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/pom.xml @@ -0,0 +1,72 @@ + + + + 4.0.0 + + org.apache.pinot + pinot-timeseries-lang + 1.3.0-SNAPSHOT + + + pinot-timeseries-m3ql + + + ${basedir}/../../.. + 11 + 11 + UTF-8 + + + + + org.apache.pinot + pinot-timeseries-spi + provided + + + com.google.guava + guava + + + org.testng + testng + test + + + + + + build-shaded-jar + + + skipShade + !true + + + + package + + + + diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/Aggregations.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/Aggregations.java new file mode 100644 index 00000000000..392a83d843c --- /dev/null +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/Aggregations.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tsdb.m3ql; + +public class Aggregations { +} diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/Constants.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/Constants.java new file mode 100644 index 00000000000..3878a2d5dd4 --- /dev/null +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/Constants.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tsdb.m3ql; + +import java.time.Duration; + + +public class Constants { + public static final String LANGUAGE = "m3ql"; + public static final Duration DEFAULT_RESOLUTION = Duration.ofMinutes(1); + + private Constants() { + } +} diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java new file mode 100644 index 00000000000..f355e464ebc --- /dev/null +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tsdb.m3ql; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.pinot.tsdb.m3ql.parser.Tokenizer; +import org.apache.pinot.tsdb.m3ql.plan.KeepLastValuePlanNode; +import org.apache.pinot.tsdb.m3ql.plan.TransformNullPlanNode; +import org.apache.pinot.tsdb.m3ql.time.TimeBucketComputer; +import org.apache.pinot.tsdb.spi.AggInfo; +import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest; +import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult; +import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanner; +import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; +import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode; + + +public class M3TimeSeriesPlanner implements TimeSeriesLogicalPlanner { + @Override + public void init(Map config) { + } + + @Override + public TimeSeriesLogicalPlanResult plan(RangeTimeSeriesRequest request) { + if (!request.getLanguage().equals(Constants.LANGUAGE)) { + throw new IllegalArgumentException(String.format("Invalid engine id: %s. Expected: %s", request.getLanguage(), + Constants.LANGUAGE)); + } + // Step-1: Parse and create a logical plan tree. + BaseTimeSeriesPlanNode planNode = planQuery(request); + // Step-2: Compute the time-buckets. 
+    TimeBuckets timeBuckets = TimeBucketComputer.compute(planNode, request);
+    return new TimeSeriesLogicalPlanResult(planNode, timeBuckets);
+  }
+
+  public BaseTimeSeriesPlanNode planQuery(RangeTimeSeriesRequest request) {
+    PlanIdGenerator planIdGenerator = new PlanIdGenerator();
+    Tokenizer tokenizer = new Tokenizer(request.getQuery());
+    List<List<String>> commands = tokenizer.tokenize();
+    Preconditions.checkState(commands.size() > 1, "At least two commands required. "
+        + "Query should start with a fetch followed by an aggregation.");
+    BaseTimeSeriesPlanNode lastNode = null;
+    AggInfo aggInfo = null;
+    List<String> groupByColumns = new ArrayList<>();
+    BaseTimeSeriesPlanNode rootNode = null;
+    for (int commandId = commands.size() - 1; commandId >= 0; commandId--) {
+      String command = commands.get(commandId).get(0);
+      Preconditions.checkState((command.equals("fetch") && commandId == 0)
+              || (!command.equals("fetch") && commandId > 0),
+          "fetch should be the first command");
+      List<BaseTimeSeriesPlanNode> children = new ArrayList<>();
+      BaseTimeSeriesPlanNode currentNode = null;
+      switch (command) {
+        case "fetch":
+          List<String> tokens = commands.get(commandId).subList(1, commands.get(commandId).size());
+          currentNode = handleFetchNode(planIdGenerator.generateId(), tokens, children, aggInfo, groupByColumns);
+          break;
+        case "sum":
+        case "min":
+        case "max":
+          Preconditions.checkState(commandId == 1,
+              "Aggregation should be the second command (fetch should be first)");
+          Preconditions.checkState(aggInfo == null, "Aggregation already set. Only single agg allowed.");
+          aggInfo = new AggInfo(command.toUpperCase(Locale.ENGLISH));
+          if (commands.get(commandId).size() > 1) {
+            String[] cols = commands.get(commandId).get(1).split(",");
+            groupByColumns = Stream.of(cols).map(String::trim).collect(Collectors.toList());
+          }
+          break;
+        case "keepLastValue":
+          currentNode = new KeepLastValuePlanNode(planIdGenerator.generateId(), children);
+          break;
+        case "transformNull":
+          Double defaultValue = TransformNullPlanNode.DEFAULT_VALUE;
+          if (commands.get(commandId).size() > 1) {
+            defaultValue = Double.parseDouble(commands.get(commandId).get(1));
+          }
+          currentNode = new TransformNullPlanNode(planIdGenerator.generateId(), defaultValue, children);
+          break;
+        default:
+          throw new IllegalArgumentException("Unknown function: " + command);
+      }
+      if (currentNode != null) {
+        if (rootNode == null) {
+          rootNode = currentNode;
+        }
+        if (lastNode != null) {
+          lastNode.addChildNode(currentNode);
+        }
+        lastNode = currentNode;
+      }
+    }
+    return rootNode;
+  }
+
+  public BaseTimeSeriesPlanNode handleFetchNode(String planId, List<String> tokens,
+      List<BaseTimeSeriesPlanNode> children, AggInfo aggInfo, List<String> groupByColumns) {
+    Preconditions.checkState(tokens.size() % 2 == 0, "Mismatched args");
+    String tableName = null;
+    String timeColumn = null;
+    TimeUnit timeUnit = null;
+    String filter = "";
+    String valueExpr = null;
+    for (int idx = 0; idx < tokens.size(); idx += 2) {
+      String key = tokens.get(idx);
+      String value = tokens.get(idx + 1);
+      switch (key) {
+        case "table":
+          tableName = value.replaceAll("\"", "");
+          break;
+        case "ts_column":
+          timeColumn = value.replaceAll("\"", "");
+          break;
+        case "ts_unit":
+          timeUnit = TimeUnit.valueOf(value.replaceAll("\"", "").toUpperCase(Locale.ENGLISH));
+          break;
+        case "filter":
+          filter = value.replaceAll("\"", "");
+          break;
+        case "value":
+          valueExpr = value.replaceAll("\"", "");
+          break;
+        default:
+          throw new IllegalArgumentException("Unknown key: " + key);
+      }
+    }
+    Preconditions.checkNotNull(tableName, "Table name not set. 
Set via table="); + Preconditions.checkNotNull(timeColumn, "Time column not set. Set via time_col="); + Preconditions.checkNotNull(timeUnit, "Time unit not set. Set via time_unit="); + Preconditions.checkNotNull(valueExpr, "Value expression not set. Set via value="); + return new ScanFilterAndProjectPlanNode(planId, children, tableName, timeColumn, timeUnit, 0L, filter, valueExpr, + aggInfo, groupByColumns); + } +} diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/PlanIdGenerator.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/PlanIdGenerator.java new file mode 100644 index 00000000000..1a719081deb --- /dev/null +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/PlanIdGenerator.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tsdb.m3ql; + +public class PlanIdGenerator { + private Integer _id = 0; + + public PlanIdGenerator() { + } + + public String generateId() { + return "plan_" + (_id++); + } +} diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/KeepLastValueOperator.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/KeepLastValueOperator.java new file mode 100644 index 00000000000..0330dff13b1 --- /dev/null +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/KeepLastValueOperator.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.tsdb.m3ql.operator; + +import java.util.List; +import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator; +import org.apache.pinot.tsdb.spi.series.TimeSeries; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; + + +public class KeepLastValueOperator extends BaseTimeSeriesOperator { + public KeepLastValueOperator(List childOperators) { + super(childOperators); + } + + @Override + public TimeSeriesBlock getNextBlock() { + TimeSeriesBlock seriesBlock = _childOperators.get(0).nextBlock(); + seriesBlock.getSeriesMap().values().parallelStream().forEach(unionOfSeries -> { + for (TimeSeries series : unionOfSeries) { + Double[] values = series.getValues(); + Double lastValue = null; + for (int index = 0; index < values.length; index++) { + if (values[index] != null) { + lastValue = values[index]; + } else { + values[index] = lastValue; + } + } + } + }); + return seriesBlock; + } + + @Override + public String getExplainName() { + return "KEEP_LAST_VALUE"; + } +} diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/TransformNullOperator.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/TransformNullOperator.java new file mode 100644 index 00000000000..ca971c932cb --- /dev/null +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/TransformNullOperator.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tsdb.m3ql.operator; + +import java.util.List; +import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator; +import org.apache.pinot.tsdb.spi.series.TimeSeries; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; + + +public class TransformNullOperator extends BaseTimeSeriesOperator { + private final Double _defaultValue; + + public TransformNullOperator(Double defaultValue, List childOperators) { + super(childOperators); + _defaultValue = defaultValue; + } + + @Override + public TimeSeriesBlock getNextBlock() { + TimeSeriesBlock seriesBlock = _childOperators.get(0).nextBlock(); + seriesBlock.getSeriesMap().values().parallelStream().forEach(unionOfSeries -> { + for (TimeSeries series : unionOfSeries) { + Double[] values = series.getValues(); + for (int index = 0; index < values.length; index++) { + values[index] = values[index] == null ? 
_defaultValue : values[index]; + } + } + }); + return seriesBlock; + } + + @Override + public String getExplainName() { + return "TRANSFORM_NULL"; + } +} diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/parser/Tokenizer.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/parser/Tokenizer.java new file mode 100644 index 00000000000..cf38b501b00 --- /dev/null +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/parser/Tokenizer.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tsdb.m3ql.parser; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; + + +/** +* TODO: Dummy implementation. Will be switched out with a proper implementation soon. +*/ +public class Tokenizer { + private final String _query; + + public Tokenizer(String query) { + _query = query; + } + + public List> tokenize() { + String[] pipelines = _query.split("\\|"); + List> result = new ArrayList<>(); + for (String pipeline : pipelines) { + String command = pipeline.trim().substring(0, pipeline.indexOf("{")); + if (command.equals("fetch")) { + result.add(consumeFetch(pipeline.trim())); + } else { + result.add(consumeGeneric(pipeline.trim())); + } + } + return result; + } + + private List consumeFetch(String pipeline) { + pipeline = pipeline.trim(); + String command = pipeline.substring(0, 5); + Preconditions.checkState(command.equals("fetch"), "Invalid command: %s", command); + pipeline = pipeline.substring(5).trim(); + int start = pipeline.indexOf("{"); + int end = pipeline.indexOf("}"); + String args = pipeline.substring(start + 1, end); + List result = new ArrayList<>(); + result.add("fetch"); + int indexOfEquals = args.indexOf("="); + while (indexOfEquals != -1) { + args = args.strip(); + int equalIndex = args.indexOf("="); + int indexOfQuotes = args.indexOf("\""); + int lastQuote = indexOfQuotes + 1 + args.substring(indexOfQuotes + 1).indexOf("\""); + String key = args.substring(0, equalIndex); + String value = args.substring(indexOfQuotes + 1, lastQuote); + args = args.substring(lastQuote + 1); + result.add(key); + result.add(value); + if (args.strip().startsWith(",")) { + args = args.strip().substring(1); + } + indexOfEquals = args.indexOf("="); + } + return result; + } + + private List consumeGeneric(String pipeline) { + List result = new ArrayList<>(); + int indexOfOpenBracket = pipeline.indexOf("{"); + int indexOfClosedBracket = pipeline.indexOf("}"); + result.add(pipeline.substring(0, indexOfOpenBracket)); + result.add(pipeline.substring(indexOfOpenBracket + 1, indexOfClosedBracket)); + return 
result; + } +} diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/plan/KeepLastValuePlanNode.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/plan/KeepLastValuePlanNode.java new file mode 100644 index 00000000000..26359269cd0 --- /dev/null +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/plan/KeepLastValuePlanNode.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tsdb.m3ql.plan; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import org.apache.pinot.tsdb.m3ql.operator.KeepLastValueOperator; +import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator; +import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; + + +public class KeepLastValuePlanNode extends BaseTimeSeriesPlanNode { + @JsonCreator + public KeepLastValuePlanNode(@JsonProperty("id") String id, + @JsonProperty("children") List children) { + super(id, children); + } + + @Override + public String getKlass() { + return KeepLastValuePlanNode.class.getName(); + } + + @Override + public String getExplainName() { + return "KEEP_LAST_VALUE"; + } + + @Override + public BaseTimeSeriesOperator run() { + BaseTimeSeriesOperator childOperator = _children.get(0).run(); + return new KeepLastValueOperator(List.of(childOperator)); + } +} diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/plan/TransformNullPlanNode.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/plan/TransformNullPlanNode.java new file mode 100644 index 00000000000..c98e4f421e6 --- /dev/null +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/plan/TransformNullPlanNode.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tsdb.m3ql.plan; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import java.util.List; +import org.apache.pinot.tsdb.m3ql.operator.TransformNullOperator; +import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator; +import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; + + +public class TransformNullPlanNode extends BaseTimeSeriesPlanNode { + public static final Double DEFAULT_VALUE = 0.0; + private final Double _defaultValue; + + @JsonCreator + public TransformNullPlanNode(@JsonProperty("id") String id, @JsonProperty("defaultValue") Double defaultValue, + @JsonProperty("children") List children) { + super(id, children); + _defaultValue = defaultValue; + } + + public Double getDefaultValue() { + return _defaultValue; + } + + @Override + public String getKlass() { + return TransformNullPlanNode.class.getName(); + } + + @Override + public String getExplainName() { + return "TRANSFORM_NULL"; + } + + @Override + public BaseTimeSeriesOperator run() { + Preconditions.checkState(_children.size() == 1, + "TransformNullPlanNode should have only 1 child, got: %s", _children.size()); + BaseTimeSeriesOperator childOperator = _children.get(0).run(); + return new TransformNullOperator(_defaultValue, ImmutableList.of(childOperator)); + } +} diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/QueryTimeBoundaryConstraints.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/QueryTimeBoundaryConstraints.java new file mode 100644 index 00000000000..cfb5a8d8df7 --- /dev/null +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/QueryTimeBoundaryConstraints.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.tsdb.m3ql.time; + +import java.util.HashSet; +import java.util.Set; + + +public class QueryTimeBoundaryConstraints { + private final Set _divisors = new HashSet<>(); + private long _leftExtensionSeconds = 0; + private long _rightExtensionSeconds = 0; + private boolean _leftAligned = false; + + public Set getDivisors() { + return _divisors; + } + + public long getLeftExtensionSeconds() { + return _leftExtensionSeconds; + } + + public void setLeftExtensionSeconds(long leftExtensionSeconds) { + _leftExtensionSeconds = leftExtensionSeconds; + } + + public long getRightExtensionSeconds() { + return _rightExtensionSeconds; + } + + public void setRightExtensionSeconds(long rightExtensionSeconds) { + _rightExtensionSeconds = rightExtensionSeconds; + } + + public boolean isLeftAligned() { + return _leftAligned; + } + + public void setLeftAligned(boolean leftAligned) { + _leftAligned = leftAligned; + } + + public static QueryTimeBoundaryConstraints merge(QueryTimeBoundaryConstraints left, + QueryTimeBoundaryConstraints right) { + QueryTimeBoundaryConstraints merged = new QueryTimeBoundaryConstraints(); + merged._divisors.addAll(left._divisors); + merged._divisors.addAll(right._divisors); + merged._leftExtensionSeconds = Math.max(left._leftExtensionSeconds, right._leftExtensionSeconds); + merged._rightExtensionSeconds = Math.max(left._rightExtensionSeconds, right._rightExtensionSeconds); + if (left._leftAligned != right._leftAligned) { + throw new IllegalArgumentException(String.format("Cannot merge constraints with different alignments. " + + "Alignment from plan node on the left and right are %s and %s respectively", + left._leftAligned, right._leftAligned)); + } + merged._leftAligned = left._leftAligned; + return merged; + } +} diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/TimeBucketComputer.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/TimeBucketComputer.java new file mode 100644 index 00000000000..d79d5c6a8c3 --- /dev/null +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/TimeBucketComputer.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.tsdb.m3ql.time; + +import java.time.Duration; +import java.util.Collection; +import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest; +import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; +import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode; + + +public class TimeBucketComputer { + private TimeBucketComputer() { + } + + public static TimeBuckets compute(BaseTimeSeriesPlanNode planNode, RangeTimeSeriesRequest request) { + QueryTimeBoundaryConstraints constraints = process(planNode, request); + long newStartTime = request.getStartSeconds() - constraints.getLeftExtensionSeconds(); + long newEndTime = request.getEndSeconds() + constraints.getRightExtensionSeconds(); + long lcmOfDivisors = lcm(constraints.getDivisors()); + long roundStartTimeDelta = (lcmOfDivisors - (newEndTime - newStartTime) % lcmOfDivisors) % lcmOfDivisors; + newStartTime -= roundStartTimeDelta; + int numItems = (int) ((newEndTime - newStartTime) / request.getStepSeconds()); + return TimeBuckets.ofSeconds(newStartTime, Duration.ofSeconds(request.getStepSeconds()), numItems); + } + + public static QueryTimeBoundaryConstraints process(BaseTimeSeriesPlanNode planNode, RangeTimeSeriesRequest request) { + if (planNode instanceof ScanFilterAndProjectPlanNode) { + QueryTimeBoundaryConstraints constraints = new QueryTimeBoundaryConstraints(); + constraints.getDivisors().add(request.getStepSeconds()); + return constraints; + } + QueryTimeBoundaryConstraints constraints = new QueryTimeBoundaryConstraints(); + for (BaseTimeSeriesPlanNode childNode : planNode.getChildren()) { + QueryTimeBoundaryConstraints childConstraints = process(childNode, request); + constraints = QueryTimeBoundaryConstraints.merge(constraints, childConstraints); + } + return constraints; + } + + public static long lcm(Collection values) { + long result = 1; + for (long value : values) { + result = lcm(result, value); + } + return result; + } + + public static long lcm(long a, long b) { + return a * b / gcd(a, b); + } + + public static long gcd(long a, long b) { + if (a < b) { + return gcd(b, a); + } + if (b == 0) { + return a; + } + return gcd(b, a % b); + } +} diff --git a/pinot-plugins/pinot-timeseries-lang/pom.xml b/pinot-plugins/pinot-timeseries-lang/pom.xml new file mode 100644 index 00000000000..98dc39789f8 --- /dev/null +++ b/pinot-plugins/pinot-timeseries-lang/pom.xml @@ -0,0 +1,45 @@ + + + + 4.0.0 + + org.apache.pinot + pinot-plugins + 1.3.0-SNAPSHOT + + + pinot-timeseries-lang + pom + Pluggable Pinot Time Series Languages + + + ${basedir}/../.. + pinot-ts-languages + + + + pinot-timeseries-m3ql + + + \ No newline at end of file diff --git a/pinot-plugins/pom.xml b/pinot-plugins/pom.xml index 62b6848299c..fa873b34a69 100644 --- a/pinot-plugins/pom.xml +++ b/pinot-plugins/pom.xml @@ -25,7 +25,6 @@ pinot org.apache.pinot 1.3.0-SNAPSHOT - .. 
pinot-plugins pom @@ -48,6 +47,7 @@ pinot-segment-uploader pinot-environment assembly-descriptor + pinot-timeseries-lang diff --git a/pinot-query-runtime/pom.xml b/pinot-query-runtime/pom.xml index 303244b0b0c..14c2f0e085c 100644 --- a/pinot-query-runtime/pom.xml +++ b/pinot-query-runtime/pom.xml @@ -40,6 +40,14 @@ org.apache.pinot pinot-query-planner + + org.apache.pinot + pinot-timeseries-spi + + + org.apache.pinot + pinot-timeseries-planner + org.apache.pinot diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index 817062e9508..49a45d1c2b8 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -19,15 +19,25 @@ package org.apache.pinot.query.runtime; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; +import io.grpc.stub.StreamObserver; +import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.annotation.Nullable; +import org.apache.commons.lang3.StringUtils; import org.apache.helix.HelixManager; import org.apache.pinot.common.datatable.StatMap; import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.proto.Worker; +import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse; import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.query.executor.QueryExecutor; @@ -49,12 +59,23 @@ import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor; import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult; import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils; +import org.apache.pinot.query.runtime.timeseries.PhysicalTimeSeriesPlanVisitor; +import org.apache.pinot.query.runtime.timeseries.TimeSeriesExecutionContext; import org.apache.pinot.spi.accounting.ThreadExecutionContext; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.executor.ExecutorServiceUtils; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey; import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode; +import org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerRequestMetadataKeys; +import org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerResponseMetadataKeys; +import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration; +import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator; +import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; +import org.apache.pinot.tsdb.spi.plan.serde.TimeSeriesPlanSerde; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactoryProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,6 +149,10 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataMana } catch (Exception e) { throw new RuntimeException(e); } 
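+ // Initialize the time-series plan visitor and series-builder factories only when at least one
+ // time-series query language is enabled via PinotTimeSeriesConfiguration.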
+ if (StringUtils.isNotBlank(config.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey()))) { + PhysicalTimeSeriesPlanVisitor.INSTANCE.init(_leafQueryExecutor, _executorService, serverMetrics); + TimeSeriesBuilderFactoryProvider.init(config); + } LOGGER.info("Initialized QueryRunner with hostname: {}, port: {}", hostname, port); } @@ -210,6 +235,69 @@ public void processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, Map _opChainScheduler.register(opChain); } + /** + * Receives a serialized plan sent by the broker, and runs it to completion, blocking the thread until the execution + * is complete. + * TODO: This design is at odds with MSE because MSE runs even the leaf stage via OpChainSchedulerService. + * However, both OpChain scheduler and this method use the same ExecutorService. + */ + public void processTimeSeriesQuery(String serializedPlan, Map metadata, + StreamObserver responseObserver) { + // Define a common way to handle errors. + final Consumer handleErrors = (t) -> { + Map errorMetadata = new HashMap<>(); + errorMetadata.put(WorkerResponseMetadataKeys.ERROR_TYPE, t.getClass().getSimpleName()); + errorMetadata.put(WorkerResponseMetadataKeys.ERROR_MESSAGE, t.getMessage()); + responseObserver.onNext(Worker.TimeSeriesResponse.newBuilder().putAllMetadata(errorMetadata).build()); + responseObserver.onCompleted(); + }; + try { + // Deserialize plan, and compile to create a tree of operators. + BaseTimeSeriesPlanNode rootNode = TimeSeriesPlanSerde.deserialize(serializedPlan); + TimeSeriesExecutionContext context = new TimeSeriesExecutionContext( + metadata.get(WorkerRequestMetadataKeys.LANGUAGE), extractTimeBuckets(metadata), + extractPlanToSegmentMap(metadata)); + BaseTimeSeriesOperator operator = PhysicalTimeSeriesPlanVisitor.INSTANCE.compile(rootNode, context); + // Run the operator using the same executor service as OpChainSchedulerService + _executorService.submit(() -> { + try { + TimeSeriesBlock seriesBlock = operator.nextBlock(); + Worker.TimeSeriesResponse response = Worker.TimeSeriesResponse.newBuilder() + .setPayload(ByteString.copyFrom( + PinotBrokerTimeSeriesResponse.fromTimeSeriesBlock(seriesBlock).serialize(), + StandardCharsets.UTF_8)) + .build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (Throwable t) { + handleErrors.accept(t); + } + }); + } catch (Throwable t) { + handleErrors.accept(t); + } + } + + public TimeBuckets extractTimeBuckets(Map metadataMap) { + long startTimeSeconds = Long.parseLong(metadataMap.get(WorkerRequestMetadataKeys.START_TIME_SECONDS)); + long windowSeconds = Long.parseLong(metadataMap.get(WorkerRequestMetadataKeys.WINDOW_SECONDS)); + int numElements = Integer.parseInt(metadataMap.get(WorkerRequestMetadataKeys.NUM_ELEMENTS)); + return TimeBuckets.ofSeconds(startTimeSeconds, Duration.ofSeconds(windowSeconds), numElements); + } + + public Map> extractPlanToSegmentMap(Map metadataMap) { + Map> result = new HashMap<>(); + for (var entry : metadataMap.entrySet()) { + if (WorkerRequestMetadataKeys.isKeySegmentList(entry.getKey())) { + String planId = WorkerRequestMetadataKeys.decodeSegmentListKey(entry.getKey()); + String[] segments = entry.getValue().split(","); + result.put(planId, + Stream.of(segments).map(String::strip).collect(Collectors.toList())); + } + } + return result; + } + private Map consolidateMetadata(Map customProperties, Map requestMetadata) { Map opChainMetadata = new HashMap<>(); diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java new file mode 100644 index 00000000000..74da11c5b87 --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.timeseries; + +import com.google.common.base.Preconditions; +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import org.apache.commons.collections.MapUtils; +import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; +import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; +import org.apache.pinot.core.query.executor.QueryExecutor; +import org.apache.pinot.core.query.request.ServerQueryRequest; +import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; + + +public class LeafTimeSeriesOperator extends BaseTimeSeriesOperator { + private final ServerQueryRequest _request; + private final QueryExecutor _queryExecutor; + private final ExecutorService _executorService; + + public LeafTimeSeriesOperator(ServerQueryRequest serverQueryRequest, QueryExecutor queryExecutor, + ExecutorService executorService) { + super(Collections.emptyList()); + _request = serverQueryRequest; + _queryExecutor = queryExecutor; + _executorService = executorService; + } + + @Override + public TimeSeriesBlock getNextBlock() { + Preconditions.checkNotNull(_queryExecutor, "Leaf time series operator has not been initialized"); + InstanceResponseBlock instanceResponseBlock = _queryExecutor.execute(_request, _executorService); + assert instanceResponseBlock.getResultsBlock() instanceof TimeSeriesResultsBlock; + if (MapUtils.isNotEmpty(instanceResponseBlock.getExceptions())) { + // TODO: Return error in the TimeSeriesBlock instead? 
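+ // Only the first exception message returned by the query executor is surfaced; any remaining
+ // exceptions in the response are dropped.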
+ String oneException = instanceResponseBlock.getExceptions().values().iterator().next(); + throw new RuntimeException(oneException); + } + return ((TimeSeriesResultsBlock) instanceResponseBlock.getResultsBlock()).getTimeSeriesBlock(); + } + + @Override + public String getExplainName() { + return "TIME_SERIES_LEAF_STAGE_OPERATOR"; + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java new file mode 100644 index 00000000000..258e41c51eb --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.timeseries; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.FilterContext; +import org.apache.pinot.common.request.context.RequestContextUtils; +import org.apache.pinot.common.request.context.TimeSeriesContext; +import org.apache.pinot.core.query.executor.QueryExecutor; +import org.apache.pinot.core.query.request.ServerQueryRequest; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.sql.parsers.CalciteSqlParser; +import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator; +import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; +import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode; + + +public class PhysicalTimeSeriesPlanVisitor { + public static final PhysicalTimeSeriesPlanVisitor INSTANCE = new PhysicalTimeSeriesPlanVisitor(); + + private QueryExecutor _queryExecutor; + private ExecutorService _executorService; + private ServerMetrics _serverMetrics; + + private PhysicalTimeSeriesPlanVisitor() { + } + + public void init(QueryExecutor queryExecutor, ExecutorService executorService, ServerMetrics serverMetrics) { + _queryExecutor = queryExecutor; + _executorService = executorService; + _serverMetrics = serverMetrics; + } + + public BaseTimeSeriesOperator compile(BaseTimeSeriesPlanNode rootNode, TimeSeriesExecutionContext context) { + // Step-1: Replace scan filter project with our physical plan node with Pinot Core and Runtime context + initLeafPlanNode(rootNode, context); + // Step-2: Trigger recursive operator generation + return rootNode.run(); + } + + public void initLeafPlanNode(BaseTimeSeriesPlanNode planNode, TimeSeriesExecutionContext context) { + for 
(int index = 0; index < planNode.getChildren().size(); index++) { + BaseTimeSeriesPlanNode childNode = planNode.getChildren().get(index); + if (childNode instanceof ScanFilterAndProjectPlanNode) { + ScanFilterAndProjectPlanNode sfpNode = (ScanFilterAndProjectPlanNode) childNode; + List segments = context.getPlanIdToSegmentsMap().get(sfpNode.getId()); + ServerQueryRequest serverQueryRequest = compileLeafServerQueryRequest(sfpNode, segments, context); + TimeSeriesPhysicalTableScan physicalTableScan = new TimeSeriesPhysicalTableScan(childNode.getId(), + serverQueryRequest, _queryExecutor, _executorService); + planNode.getChildren().set(index, physicalTableScan); + } else { + initLeafPlanNode(childNode, context); + } + } + } + + public ServerQueryRequest compileLeafServerQueryRequest(ScanFilterAndProjectPlanNode sfpNode, List segments, + TimeSeriesExecutionContext context) { + return new ServerQueryRequest(compileQueryContext(sfpNode, context), + segments, /* TODO: Pass metadata from request */ Collections.emptyMap(), _serverMetrics); + } + + public QueryContext compileQueryContext(ScanFilterAndProjectPlanNode sfpNode, TimeSeriesExecutionContext context) { + FilterContext filterContext = + RequestContextUtils.getFilter(CalciteSqlParser.compileToExpression( + sfpNode.getEffectiveFilter(context.getInitialTimeBuckets()))); + List groupByExpressions = sfpNode.getGroupByColumns().stream() + .map(ExpressionContext::forIdentifier).collect(Collectors.toList()); + ExpressionContext valueExpression = RequestContextUtils.getExpression(sfpNode.getValueExpression()); + TimeSeriesContext timeSeriesContext = new TimeSeriesContext(context.getLanguage(), + sfpNode.getTimeColumn(), + sfpNode.getTimeUnit(), context.getInitialTimeBuckets(), sfpNode.getOffset(), + valueExpression, + sfpNode.getAggInfo()); + return new QueryContext.Builder() + .setTableName(sfpNode.getTableName()) + .setFilter(filterContext) + .setGroupByExpressions(groupByExpressions) + .setSelectExpressions(Collections.emptyList()) + .setQueryOptions(Collections.emptyMap()) + .setAliasList(Collections.emptyList()) + .setTimeSeriesContext(timeSeriesContext) + .setLimit(Integer.MAX_VALUE) + .build(); + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExecutionContext.java new file mode 100644 index 00000000000..4d093af3edf --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExecutionContext.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.runtime.timeseries; + +import java.util.List; +import java.util.Map; +import org.apache.pinot.tsdb.spi.TimeBuckets; + + +public class TimeSeriesExecutionContext { + private final String _language; + private final TimeBuckets _initialTimeBuckets; + private final Map> _planIdToSegmentsMap; + + public TimeSeriesExecutionContext(String language, TimeBuckets initialTimeBuckets, + Map> planIdToSegmentsMap) { + _language = language; + _initialTimeBuckets = initialTimeBuckets; + _planIdToSegmentsMap = planIdToSegmentsMap; + } + + public String getLanguage() { + return _language; + } + + public TimeBuckets getInitialTimeBuckets() { + return _initialTimeBuckets; + } + + public Map> getPlanIdToSegmentsMap() { + return _planIdToSegmentsMap; + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesPhysicalTableScan.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesPhysicalTableScan.java new file mode 100644 index 00000000000..272e5564983 --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesPhysicalTableScan.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.runtime.timeseries; + +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import org.apache.pinot.core.query.executor.QueryExecutor; +import org.apache.pinot.core.query.request.ServerQueryRequest; +import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator; +import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; + + +public class TimeSeriesPhysicalTableScan extends BaseTimeSeriesPlanNode { + private final ServerQueryRequest _request; + private final QueryExecutor _queryExecutor; + private final ExecutorService _executorService; + + public TimeSeriesPhysicalTableScan( + String id, + ServerQueryRequest serverQueryRequest, + QueryExecutor queryExecutor, + ExecutorService executorService) { + super(id, Collections.emptyList()); + _request = serverQueryRequest; + _queryExecutor = queryExecutor; + _executorService = executorService; + } + + public ServerQueryRequest getServerQueryRequest() { + return _request; + } + + public QueryExecutor getQueryExecutor() { + return _queryExecutor; + } + + public ExecutorService getExecutorService() { + return _executorService; + } + + public String getKlass() { + return TimeSeriesPhysicalTableScan.class.getName(); + } + + @Override + public String getExplainName() { + return "PHYSICAL_TABLE_SCAN"; + } + + @Override + public BaseTimeSeriesOperator run() { + return new LeafTimeSeriesOperator(_request, _queryExecutor, _executorService); + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index fc8874911ec..e619087ae0d 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -18,10 +18,12 @@ */ package org.apache.pinot.query.service.dispatch; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import io.grpc.Deadline; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -36,9 +38,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import org.apache.calcite.runtime.PairList; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.proto.Worker; +import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse; import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; @@ -60,10 +64,16 @@ import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator; import org.apache.pinot.query.runtime.plan.MultiStageQueryStats; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; +import org.apache.pinot.query.service.dispatch.timeseries.AsyncQueryTimeSeriesDispatchResponse; +import org.apache.pinot.query.service.dispatch.timeseries.TimeSeriesDispatchClient; import org.apache.pinot.spi.accounting.ThreadExecutionContext; import org.apache.pinot.spi.trace.RequestContext; import org.apache.pinot.spi.trace.Tracing; import org.apache.pinot.spi.utils.CommonConstants; +import 
org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerRequestMetadataKeys; +import org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerResponseMetadataKeys; +import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan; +import org.apache.pinot.tsdb.planner.physical.TimeSeriesQueryServerInstance; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,10 +84,12 @@ public class QueryDispatcher { private static final Logger LOGGER = LoggerFactory.getLogger(QueryDispatcher.class); private static final String PINOT_BROKER_QUERY_DISPATCHER_FORMAT = "multistage-query-dispatch-%d"; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private final MailboxService _mailboxService; private final ExecutorService _executorService; private final Map _dispatchClientMap = new ConcurrentHashMap<>(); + private final Map _timeSeriesDispatchClientMap = new ConcurrentHashMap<>(); public QueryDispatcher(MailboxService mailboxService) { _mailboxService = mailboxService; @@ -103,6 +115,41 @@ public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan d } } + public PinotBrokerTimeSeriesResponse submitAndGet(RequestContext context, TimeSeriesDispatchablePlan plan, + long timeoutMs, Map queryOptions) { + long requestId = context.getRequestId(); + BlockingQueue receiver = new ArrayBlockingQueue<>(10); + try { + submit(requestId, plan, timeoutMs, queryOptions, receiver::offer); + AsyncQueryTimeSeriesDispatchResponse received = receiver.poll(timeoutMs, TimeUnit.MILLISECONDS); + if (received == null) { + return PinotBrokerTimeSeriesResponse.newErrorResponse( + "TimeoutException", "Timed out waiting for response"); + } + if (received.getThrowable() != null) { + Throwable t = received.getThrowable(); + return PinotBrokerTimeSeriesResponse.newErrorResponse(t.getClass().getSimpleName(), t.getMessage()); + } + if (received.getQueryResponse() == null) { + return PinotBrokerTimeSeriesResponse.newErrorResponse("NullResponse", "Received null response from server"); + } + if (received.getQueryResponse().containsMetadata( + WorkerResponseMetadataKeys.ERROR_MESSAGE)) { + return PinotBrokerTimeSeriesResponse.newErrorResponse( + received.getQueryResponse().getMetadataOrDefault( + WorkerResponseMetadataKeys.ERROR_TYPE, "unknown error-type"), + received.getQueryResponse().getMetadataOrDefault( + WorkerResponseMetadataKeys.ERROR_MESSAGE, "unknown error")); + } + Worker.TimeSeriesResponse timeSeriesResponse = received.getQueryResponse(); + Preconditions.checkNotNull(timeSeriesResponse, "time series response is null"); + return OBJECT_MAPPER.readValue( + timeSeriesResponse.getPayload().toStringUtf8(), PinotBrokerTimeSeriesResponse.class); + } catch (Throwable t) { + return PinotBrokerTimeSeriesResponse.newErrorResponse(t.getClass().getSimpleName(), t.getMessage()); + } + } + @VisibleForTesting void submit(long requestId, DispatchableSubPlan dispatchableSubPlan, long timeoutMs, Map queryOptions) throws Exception { @@ -211,6 +258,37 @@ void submit(long requestId, DispatchableSubPlan dispatchableSubPlan, long timeou } } + void submit(long requestId, TimeSeriesDispatchablePlan plan, long timeoutMs, Map queryOptions, + Consumer receiver) + throws Exception { + Deadline deadline = Deadline.after(timeoutMs, TimeUnit.MILLISECONDS); + String serializedPlan = plan.getSerializedPlan(); + Worker.TimeSeriesQueryRequest request = Worker.TimeSeriesQueryRequest.newBuilder() + .setDispatchPlan(ByteString.copyFrom(serializedPlan, StandardCharsets.UTF_8)) + 
.putAllMetadata(initializeTimeSeriesMetadataMap(plan)) + .putMetadata(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, Long.toString(requestId)) + .build(); + getOrCreateTimeSeriesDispatchClient(plan.getQueryServerInstance()).submit(request, + new QueryServerInstance(plan.getQueryServerInstance().getHostname(), + plan.getQueryServerInstance().getQueryServicePort(), plan.getQueryServerInstance().getQueryMailboxPort()), + deadline, receiver::accept); + }; + + Map initializeTimeSeriesMetadataMap(TimeSeriesDispatchablePlan dispatchablePlan) { + Map result = new HashMap<>(); + result.put(WorkerRequestMetadataKeys.LANGUAGE, dispatchablePlan.getLanguage()); + result.put(WorkerRequestMetadataKeys.START_TIME_SECONDS, + Long.toString(dispatchablePlan.getTimeBuckets().getStartTime())); + result.put(WorkerRequestMetadataKeys.WINDOW_SECONDS, + Long.toString(dispatchablePlan.getTimeBuckets().getBucketSize().getSeconds())); + result.put(WorkerRequestMetadataKeys.NUM_ELEMENTS, + Long.toString(dispatchablePlan.getTimeBuckets().getTimeBuckets().length)); + for (Map.Entry> entry : dispatchablePlan.getPlanIdToSegments().entrySet()) { + result.put(WorkerRequestMetadataKeys.encodeSegmentListKey(entry.getKey()), String.join(",", entry.getValue())); + } + return result; + } + private static class StageInfo { final ByteString _rootNode; final ByteString _customProperty; @@ -245,6 +323,14 @@ private DispatchClient getOrCreateDispatchClient(QueryServerInstance queryServer return _dispatchClientMap.computeIfAbsent(key, k -> new DispatchClient(hostname, port)); } + private TimeSeriesDispatchClient getOrCreateTimeSeriesDispatchClient( + TimeSeriesQueryServerInstance queryServerInstance) { + String hostname = queryServerInstance.getHostname(); + int port = queryServerInstance.getQueryServicePort(); + String key = String.format("%s_%d", hostname, port); + return _timeSeriesDispatchClientMap.computeIfAbsent(key, k -> new TimeSeriesDispatchClient(hostname, port)); + } + @VisibleForTesting public static QueryResult runReducer(long requestId, DispatchableSubPlan dispatchableSubPlan, long timeoutMs, Map queryOptions, MailboxService mailboxService) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/AsyncQueryTimeSeriesDispatchResponse.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/AsyncQueryTimeSeriesDispatchResponse.java new file mode 100644 index 00000000000..b00919d3bf3 --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/AsyncQueryTimeSeriesDispatchResponse.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.service.dispatch.timeseries; + +import javax.annotation.Nullable; +import org.apache.pinot.common.proto.Worker; +import org.apache.pinot.query.routing.QueryServerInstance; + + +/** + * Response of the broker dispatch to the server. + * TODO: This shouldn't exist and we should re-use AsyncQueryDispatchResponse. TBD as part of multi-stage + * engine integration. + */ +public class AsyncQueryTimeSeriesDispatchResponse { + private final QueryServerInstance _serverInstance; + private final Worker.TimeSeriesResponse _queryResponse; + private final Throwable _throwable; + + public AsyncQueryTimeSeriesDispatchResponse(QueryServerInstance serverInstance, + @Nullable Worker.TimeSeriesResponse queryResponse, + @Nullable Throwable throwable) { + _serverInstance = serverInstance; + _queryResponse = queryResponse; + _throwable = throwable; + } + + public QueryServerInstance getServerInstance() { + return _serverInstance; + } + + @Nullable + public Worker.TimeSeriesResponse getQueryResponse() { + return _queryResponse; + } + + @Nullable + public Throwable getThrowable() { + return _throwable; + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchClient.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchClient.java new file mode 100644 index 00000000000..a68e636b96a --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchClient.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.service.dispatch.timeseries; + +import io.grpc.Deadline; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import java.util.function.Consumer; +import org.apache.pinot.common.proto.PinotQueryWorkerGrpc; +import org.apache.pinot.common.proto.Worker; +import org.apache.pinot.query.routing.QueryServerInstance; + + +/** + * Dispatch client used to dispatch a runnable plan to the server. + * TODO: This shouldn't exist and we should re-use DispatchClient. TBD as part of multi-stage + * engine integration. 
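+ * Note: the underlying gRPC channel is built with plaintext transport, and QueryDispatcher caches one
+ * client per server hostname and query-service port.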
+ */ +public class TimeSeriesDispatchClient { + private final ManagedChannel _channel; + private final PinotQueryWorkerGrpc.PinotQueryWorkerStub _dispatchStub; + + public TimeSeriesDispatchClient(String host, int port) { + _channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build(); + _dispatchStub = PinotQueryWorkerGrpc.newStub(_channel); + } + + public ManagedChannel getChannel() { + return _channel; + } + + public void submit(Worker.TimeSeriesQueryRequest request, QueryServerInstance virtualServer, Deadline deadline, + Consumer callback) { + _dispatchStub.withDeadline(deadline).submitTimeSeries( + request, new TimeSeriesDispatchObserver(virtualServer, callback)); + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java new file mode 100644 index 00000000000..ccfe0e122cb --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.service.dispatch.timeseries; + +import io.grpc.stub.StreamObserver; +import java.util.function.Consumer; +import org.apache.pinot.common.proto.Worker; +import org.apache.pinot.query.routing.QueryServerInstance; + + +/** + * Response observer for a time-series query request. + * TODO: This shouldn't exist and we should re-use DispatchObserver. TBD as part of multi-stage + * engine integration. 
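+ * The observer expects a single response message: the last value received in onNext is handed to the
+ * callback once the stream completes, while errors are forwarded together with the throwable.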
+ */ +public class TimeSeriesDispatchObserver implements StreamObserver { + private final QueryServerInstance _serverInstance; + private final Consumer _callback; + + private Worker.TimeSeriesResponse _timeSeriesResponse; + + public TimeSeriesDispatchObserver(QueryServerInstance serverInstance, + Consumer callback) { + _serverInstance = serverInstance; + _callback = callback; + } + + @Override + public void onNext(Worker.TimeSeriesResponse timeSeriesResponse) { + _timeSeriesResponse = timeSeriesResponse; + } + + @Override + public void onError(Throwable throwable) { + _callback.accept( + new AsyncQueryTimeSeriesDispatchResponse( + _serverInstance, + Worker.TimeSeriesResponse.getDefaultInstance(), + throwable)); + } + + @Override + public void onCompleted() { + _callback.accept( + new AsyncQueryTimeSeriesDispatchResponse( + _serverInstance, + _timeSeriesResponse, + null)); + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java index 65e4ca7df07..3a28454db5b 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.query.service.server; +import com.google.protobuf.ByteString; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; @@ -192,6 +193,13 @@ public void submit(Worker.QueryRequest request, StreamObserver responseObserver) { + ByteString bytes = request.getDispatchPlan(); + _queryRunner.processTimeSeriesQuery(bytes.toStringUtf8(), request.getMetadataMap(), responseObserver); + } + @Override public void cancel(Worker.CancelRequest request, StreamObserver responseObserver) { try { diff --git a/pinot-timeseries/pinot-timeseries-planner/pom.xml b/pinot-timeseries/pinot-timeseries-planner/pom.xml new file mode 100644 index 00000000000..134fbc66741 --- /dev/null +++ b/pinot-timeseries/pinot-timeseries-planner/pom.xml @@ -0,0 +1,57 @@ + + + + 4.0.0 + + org.apache.pinot + pinot-timeseries + 1.3.0-SNAPSHOT + + + pinot-timeseries-planner + + + ${basedir}/../.. + 11 + 11 + UTF-8 + + + + + org.apache.pinot + pinot-core + + + org.apache.pinot + pinot-timeseries-spi + + + org.testng + testng + test + + + + \ No newline at end of file diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanConstants.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanConstants.java new file mode 100644 index 00000000000..f764ea720ef --- /dev/null +++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanConstants.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tsdb.planner; + +public class TimeSeriesPlanConstants { + private TimeSeriesPlanConstants() { + } + + public static class WorkerRequestMetadataKeys { + private static final String SEGMENT_MAP_ENTRY_PREFIX = "$segmentMapEntry#"; + + private WorkerRequestMetadataKeys() { + } + + public static final String LANGUAGE = "language"; + public static final String START_TIME_SECONDS = "startTimeSeconds"; + public static final String WINDOW_SECONDS = "windowSeconds"; + public static final String NUM_ELEMENTS = "numElements"; + + public static boolean isKeySegmentList(String key) { + return key.startsWith(SEGMENT_MAP_ENTRY_PREFIX); + } + + public static String encodeSegmentListKey(String planId) { + return SEGMENT_MAP_ENTRY_PREFIX + planId; + } + + /** + * Returns the plan-id corresponding to the encoded key. + */ + public static String decodeSegmentListKey(String key) { + return key.substring(SEGMENT_MAP_ENTRY_PREFIX.length()); + } + } + + public static class WorkerResponseMetadataKeys { + public static final String ERROR_TYPE = "errorType"; + public static final String ERROR_MESSAGE = "error"; + } +} diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java new file mode 100644 index 00000000000..dada38f6f32 --- /dev/null +++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.tsdb.planner; + +import com.google.common.base.Preconditions; +import java.lang.reflect.Constructor; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.request.DataSource; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.request.QuerySource; +import org.apache.pinot.core.routing.RoutingManager; +import org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.core.transport.ServerInstance; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.trace.RequestContext; +import org.apache.pinot.tsdb.planner.physical.TableScanVisitor; +import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan; +import org.apache.pinot.tsdb.planner.physical.TimeSeriesQueryServerInstance; +import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration; +import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest; +import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult; +import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanner; +import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; +import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode; +import org.apache.pinot.tsdb.spi.plan.serde.TimeSeriesPlanSerde; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class TimeSeriesQueryEnvironment { + private static final Logger LOGGER = LoggerFactory.getLogger(TimeSeriesQueryEnvironment.class); + private final RoutingManager _routingManager; + private final TableCache _tableCache; + private final Map _plannerMap = new HashMap<>(); + + public TimeSeriesQueryEnvironment(PinotConfiguration config, RoutingManager routingManager, TableCache tableCache) { + _routingManager = routingManager; + _tableCache = tableCache; + } + + public void init(PinotConfiguration config) { + String[] languages = config.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey(), "") + .split(","); + LOGGER.info("Found {} configured time series languages. List: {}", languages.length, languages); + for (String language : languages) { + String configPrefix = PinotTimeSeriesConfiguration.getLogicalPlannerConfigKey(language); + String klassName = + config.getProperty(PinotTimeSeriesConfiguration.getLogicalPlannerConfigKey(language)); + Preconditions.checkNotNull(klassName, "Logical planner class not found for language: " + language); + // Create the planner with empty constructor + try { + Class klass = TimeSeriesQueryEnvironment.class.getClassLoader().loadClass(klassName); + Constructor constructor = klass.getConstructor(); + TimeSeriesLogicalPlanner planner = (TimeSeriesLogicalPlanner) constructor.newInstance(); + planner.init(config.subset(configPrefix).toMap()); + _plannerMap.put(language, planner); + } catch (Exception e) { + throw new RuntimeException("Failed to instantiate logical planner for language: " + language, e); + } + } + TableScanVisitor.INSTANCE.init(_routingManager); + } + + public TimeSeriesLogicalPlanResult buildLogicalPlan(RangeTimeSeriesRequest request) { + Preconditions.checkState(_plannerMap.containsKey(request.getLanguage()), + "No logical planner found for engine: %s. 
Available: %s", request.getLanguage(), + _plannerMap.keySet()); + return _plannerMap.get(request.getLanguage()).plan(request); + } + + public TimeSeriesDispatchablePlan buildPhysicalPlan(RangeTimeSeriesRequest timeSeriesRequest, + RequestContext requestContext, TimeSeriesLogicalPlanResult logicalPlan) { + // Step-1: Find tables in the query. + final Set tableNames = new HashSet<>(); + findTableNames(logicalPlan.getPlanNode(), tableNames::add); + Preconditions.checkState(tableNames.size() == 1, + "Expected exactly one table name in the logical plan, got: %s", + tableNames); + String tableName = tableNames.iterator().next(); + // Step-2: Compute routing table assuming all segments are selected. This is to perform the check to reject tables + // that span across multiple servers. + RoutingTable routingTable = _routingManager.getRoutingTable(compileBrokerRequest(tableName), + requestContext.getRequestId()); + Preconditions.checkState(routingTable != null, + "Failed to get routing table for table: %s", tableName); + Preconditions.checkState(routingTable.getServerInstanceToSegmentsMap().size() == 1, + "Only support routing to a single server. Computed: %s", + routingTable.getServerInstanceToSegmentsMap().size()); + var entry = routingTable.getServerInstanceToSegmentsMap().entrySet().iterator().next(); + ServerInstance serverInstance = entry.getKey(); + // Step-3: Assign segments to the leaf plan nodes. + TableScanVisitor.Context scanVisitorContext = TableScanVisitor.createContext(requestContext.getRequestId()); + TableScanVisitor.INSTANCE.assignSegmentsToPlan(logicalPlan.getPlanNode(), logicalPlan.getTimeBuckets(), + scanVisitorContext); + return new TimeSeriesDispatchablePlan(timeSeriesRequest.getLanguage(), + new TimeSeriesQueryServerInstance(serverInstance), + TimeSeriesPlanSerde.serialize(logicalPlan.getPlanNode()), logicalPlan.getTimeBuckets(), + scanVisitorContext.getPlanIdToSegmentMap()); + } + + public static void findTableNames(BaseTimeSeriesPlanNode planNode, Consumer tableNameConsumer) { + if (planNode instanceof ScanFilterAndProjectPlanNode) { + ScanFilterAndProjectPlanNode scanNode = (ScanFilterAndProjectPlanNode) planNode; + tableNameConsumer.accept(scanNode.getTableName()); + return; + } + for (BaseTimeSeriesPlanNode childNode : planNode.getChildren()) { + findTableNames(childNode, tableNameConsumer); + } + } + + private BrokerRequest compileBrokerRequest(String tableName) { + DataSource dataSource = new DataSource(); + dataSource.setTableName(tableName); + PinotQuery pinotQuery = new PinotQuery(); + pinotQuery.setDataSource(dataSource); + QuerySource querySource = new QuerySource(); + querySource.setTableName(tableName); + BrokerRequest dummyRequest = new BrokerRequest(); + dummyRequest.setPinotQuery(pinotQuery); + dummyRequest.setQuerySource(querySource); + return dummyRequest; + } +} diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java new file mode 100644 index 00000000000..58eccd2de0b --- /dev/null +++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tsdb.planner.physical; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.request.DataSource; +import org.apache.pinot.common.request.Expression; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.request.QuerySource; +import org.apache.pinot.core.routing.RoutingManager; +import org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.sql.parsers.CalciteSqlParser; +import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; +import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode; + + +public class TableScanVisitor { + public static final TableScanVisitor INSTANCE = new TableScanVisitor(); + private RoutingManager _routingManager; + + private TableScanVisitor() { + } + + public void init(RoutingManager routingManager) { + _routingManager = routingManager; + } + + public void assignSegmentsToPlan(BaseTimeSeriesPlanNode planNode, TimeBuckets timeBuckets, Context context) { + if (planNode instanceof ScanFilterAndProjectPlanNode) { + ScanFilterAndProjectPlanNode sfpNode = (ScanFilterAndProjectPlanNode) planNode; + Expression filterExpression = CalciteSqlParser.compileToExpression(sfpNode.getEffectiveFilter(timeBuckets)); + RoutingTable routingTable = _routingManager.getRoutingTable( + compileBrokerRequest(sfpNode.getTableName(), filterExpression), + context._requestId); + Preconditions.checkNotNull(routingTable, "Failed to get routing table for table: " + sfpNode.getTableName()); + Preconditions.checkState(routingTable.getServerInstanceToSegmentsMap().size() == 1, + "Only support routing to a single server. 
Computed: %s", + routingTable.getServerInstanceToSegmentsMap().size()); + var entry = routingTable.getServerInstanceToSegmentsMap().entrySet().iterator().next(); + List segments = entry.getValue().getLeft(); + context.getPlanIdToSegmentMap().put(sfpNode.getId(), segments); + } + for (BaseTimeSeriesPlanNode childNode : planNode.getChildren()) { + assignSegmentsToPlan(childNode, timeBuckets, context); + } + } + + public static Context createContext(Long requestId) { + return new Context(requestId); + } + + public static class Context { + private final Map> _planIdToSegmentMap = new HashMap<>(); + private final Long _requestId; + + public Context(Long requestId) { + _requestId = requestId; + } + + public Map> getPlanIdToSegmentMap() { + return _planIdToSegmentMap; + } + } + + private BrokerRequest compileBrokerRequest(String tableName, Expression filterExpression) { + DataSource dataSource = new DataSource(); + dataSource.setTableName(tableName); + PinotQuery pinotQuery = new PinotQuery(); + pinotQuery.setDataSource(dataSource); + pinotQuery.setFilterExpression(filterExpression); + QuerySource querySource = new QuerySource(); + querySource.setTableName(tableName); + BrokerRequest dummyRequest = new BrokerRequest(); + dummyRequest.setPinotQuery(pinotQuery); + dummyRequest.setQuerySource(querySource); + return dummyRequest; + } +} diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TimeSeriesDispatchablePlan.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TimeSeriesDispatchablePlan.java new file mode 100644 index 00000000000..6c64a396d82 --- /dev/null +++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TimeSeriesDispatchablePlan.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.tsdb.planner.physical; + +import java.util.List; +import java.util.Map; +import org.apache.pinot.tsdb.spi.TimeBuckets; + + +public class TimeSeriesDispatchablePlan { + private final TimeSeriesQueryServerInstance _queryServerInstance; + private final String _language; + private final String _serializedPlan; + private final TimeBuckets _timeBuckets; + private final Map<String, List<String>> _planIdToSegments; + + public TimeSeriesDispatchablePlan(String language, TimeSeriesQueryServerInstance queryServerInstance, + String serializedPlan, TimeBuckets timeBuckets, Map<String, List<String>> planIdToSegments) { + _language = language; + _queryServerInstance = queryServerInstance; + _serializedPlan = serializedPlan; + _timeBuckets = timeBuckets; + _planIdToSegments = planIdToSegments; + } + + public String getLanguage() { + return _language; + } + + public TimeSeriesQueryServerInstance getQueryServerInstance() { + return _queryServerInstance; + } + + public String getSerializedPlan() { + return _serializedPlan; + } + + public TimeBuckets getTimeBuckets() { + return _timeBuckets; + } + + public Map<String, List<String>> getPlanIdToSegments() { + return _planIdToSegments; + } +} diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TimeSeriesQueryServerInstance.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TimeSeriesQueryServerInstance.java new file mode 100644 index 00000000000..cf1768a7444 --- /dev/null +++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TimeSeriesQueryServerInstance.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pinot.tsdb.planner.physical; + +import org.apache.pinot.core.transport.ServerInstance; + + +public class TimeSeriesQueryServerInstance { + private final String _hostname; + private final int _queryServicePort; + private final int _queryMailboxPort; + + public TimeSeriesQueryServerInstance(ServerInstance serverInstance) { + this(serverInstance.getHostname(), serverInstance.getQueryServicePort(), serverInstance.getQueryMailboxPort()); + } + + public TimeSeriesQueryServerInstance(String hostname, int queryServicePort, int queryMailboxPort) { + _hostname = hostname; + _queryServicePort = queryServicePort; + _queryMailboxPort = queryMailboxPort; + } + + public String getHostname() { + return _hostname; + } + + public int getQueryServicePort() { + return _queryServicePort; + } + + public int getQueryMailboxPort() { + return _queryMailboxPort; + } +} diff --git a/pinot-timeseries/pinot-timeseries-spi/pom.xml b/pinot-timeseries/pinot-timeseries-spi/pom.xml index c21e9971d50..1683928749d 100644 --- a/pinot-timeseries/pinot-timeseries-spi/pom.xml +++ b/pinot-timeseries/pinot-timeseries-spi/pom.xml @@ -25,9 +25,8 @@ 4.0.0 org.apache.pinot - pinot + pinot-timeseries 1.3.0-SNAPSHOT - ../../pom.xml pinot-timeseries-spi diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java index 2c4fc045a4a..ecbc3b3f6bb 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java @@ -49,7 +49,7 @@ */ public class RangeTimeSeriesRequest { /** Engine allows a Pinot cluster to support multiple Time-Series Query Languages. */ - private final String _engine; + private final String _language; /** Query is the raw query sent by the caller. */ private final String _query; /** Start time of the time-window being queried. */ @@ -63,22 +63,25 @@ public class RangeTimeSeriesRequest { private final long _stepSeconds; /** E2E timeout for the query. */ private final Duration _timeout; + /** Full query string to allow language implementations to pass custom parameters. */ + private final String _fullQueryString; - public RangeTimeSeriesRequest(String engine, String query, long startSeconds, long endSeconds, long stepSeconds, - Duration timeout) { + public RangeTimeSeriesRequest(String language, String query, long startSeconds, long endSeconds, long stepSeconds, + Duration timeout, String fullQueryString) { Preconditions.checkState(endSeconds >= startSeconds, "Invalid range. startSeconds " + "should be greater than or equal to endSeconds. 
Found startSeconds=%s and endSeconds=%s", startSeconds, endSeconds); - _engine = engine; + _language = language; _query = query; _startSeconds = startSeconds; _endSeconds = endSeconds; _stepSeconds = stepSeconds; _timeout = timeout; + _fullQueryString = fullQueryString; } - public String getEngine() { - return _engine; + public String getLanguage() { + return _language; } public String getQuery() { @@ -100,4 +103,8 @@ public long getStepSeconds() { public Duration getTimeout() { return _timeout; } + + public String getFullQueryString() { + return _fullQueryString; + } } diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/SimpleTimeSeriesBuilderFactory.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/SimpleTimeSeriesBuilderFactory.java index 2cece60cde3..9ed40954e5f 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/SimpleTimeSeriesBuilderFactory.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/SimpleTimeSeriesBuilderFactory.java @@ -28,9 +28,7 @@ public class SimpleTimeSeriesBuilderFactory extends TimeSeriesBuilderFactory { - public static final SimpleTimeSeriesBuilderFactory INSTANCE = new SimpleTimeSeriesBuilderFactory(); - - private SimpleTimeSeriesBuilderFactory() { + public SimpleTimeSeriesBuilderFactory() { super(); } diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java index fe7fd5be422..e1e89d624d3 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; +import javax.annotation.Nullable; import org.apache.pinot.tsdb.spi.TimeBuckets; @@ -33,11 +34,12 @@ public class TimeSeriesBlock { private final TimeBuckets _timeBuckets; private final Map> _seriesMap; - public TimeSeriesBlock(TimeBuckets timeBuckets, Map> seriesMap) { + public TimeSeriesBlock(@Nullable TimeBuckets timeBuckets, Map> seriesMap) { _timeBuckets = timeBuckets; _seriesMap = seriesMap; } + @Nullable public TimeBuckets getTimeBuckets() { return _timeBuckets; } diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactoryProvider.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactoryProvider.java index 942bffdf18a..e82d3bdd444 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactoryProvider.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactoryProvider.java @@ -42,7 +42,8 @@ public static void init(PinotConfiguration pinotConfiguration) { String seriesBuilderClass = pinotConfiguration .getProperty(PinotTimeSeriesConfiguration.getSeriesBuilderFactoryConfigKey(language)); try { - Object untypedSeriesBuilderFactory = Class.forName(seriesBuilderClass).getConstructor().newInstance(); + Class klass = TimeSeriesBuilderFactoryProvider.class.getClassLoader().loadClass(seriesBuilderClass); + Object untypedSeriesBuilderFactory = klass.getConstructor().newInstance(); if (!(untypedSeriesBuilderFactory 
instanceof TimeSeriesBuilderFactory)) { throw new RuntimeException("Series builder factory class " + seriesBuilderClass + " does not implement SeriesBuilderFactory"); diff --git a/pinot-timeseries/pom.xml b/pinot-timeseries/pom.xml index f49d0cb922f..47452054c8e 100644 --- a/pinot-timeseries/pom.xml +++ b/pinot-timeseries/pom.xml @@ -27,7 +27,6 @@ org.apache.pinot pinot 1.3.0-SNAPSHOT - .. pom @@ -39,6 +38,7 @@ pinot-timeseries-spi + pinot-timeseries-planner diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/TimeSeriesEngineQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/TimeSeriesEngineQuickStart.java new file mode 100644 index 00000000000..0b00e2dad62 --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/TimeSeriesEngineQuickStart.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tools; + +import com.google.common.base.Preconditions; +import java.io.File; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.tools.admin.command.QuickstartRunner; +import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration; +import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory; + + +public class TimeSeriesEngineQuickStart extends Quickstart { + private static final String[] TIME_SERIES_TABLE_DIRECTORIES = new String[]{ + "examples/batch/airlineStats", + "examples/batch/baseballStats", + "examples/batch/billing", + "examples/batch/dimBaseballTeams", + "examples/batch/githubEvents", + "examples/batch/githubComplexTypeEvents", + "examples/batch/ssb/customer", + "examples/batch/ssb/dates", + "examples/batch/ssb/lineorder", + "examples/batch/ssb/part", + "examples/batch/ssb/supplier", + "examples/batch/starbucksStores", + "examples/batch/fineFoodReviews", + }; + + @Override + public String[] getDefaultBatchTableDirectories() { + return TIME_SERIES_TABLE_DIRECTORIES; + } + + @Override + public List types() { + return Collections.singletonList("TIME_SERIES"); + } + + @Override + protected Map getConfigOverrides() { + Map configs = new HashMap<>(); + configs.put(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey(), "m3ql"); + configs.put(PinotTimeSeriesConfiguration.getLogicalPlannerConfigKey("m3ql"), + "org.apache.pinot.tsdb.m3ql.M3TimeSeriesPlanner"); + configs.put(PinotTimeSeriesConfiguration.getSeriesBuilderFactoryConfigKey("m3ql"), + SimpleTimeSeriesBuilderFactory.class.getName()); + return configs; + } + + @Override + public void execute() + throws Exception { + File quickstartTmpDir = + _setCustomDataDir ? 
_dataDir : new File(_dataDir, String.valueOf(System.currentTimeMillis())); + File quickstartRunnerDir = new File(quickstartTmpDir, "quickstart"); + Preconditions.checkState(quickstartRunnerDir.mkdirs()); + List quickstartTableRequests = bootstrapStreamTableDirectories(quickstartTmpDir); + final QuickstartRunner runner = + new QuickstartRunner(quickstartTableRequests, 1, 1, 1, 1, quickstartRunnerDir, getConfigOverrides()); + + startKafka(); + startAllDataStreams(_kafkaStarter, quickstartTmpDir); + + printStatus(Color.CYAN, "***** Starting Zookeeper, controller, broker, server and minion *****"); + runner.startAll(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + printStatus(Color.GREEN, "***** Shutting down realtime quick start *****"); + runner.stop(); + FileUtils.deleteDirectory(quickstartTmpDir); + } catch (Exception e) { + e.printStackTrace(); + } + })); + + printStatus(Color.CYAN, "***** Bootstrap all tables *****"); + runner.bootstrapTable(); + + printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to get populated *****"); + Thread.sleep(5000); + + printStatus(Color.YELLOW, "***** Realtime quickstart setup complete *****"); + runSampleQueries(runner); + + printStatus(Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console"); + } +} diff --git a/pom.xml b/pom.xml index af640c1670d..5aabff1f8e6 100644 --- a/pom.xml +++ b/pom.xml @@ -494,6 +494,11 @@ pinot-timeseries-spi ${project.version} + + org.apache.pinot + pinot-timeseries-planner + ${project.version} +
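The three planner classes in this patch are small and their intended composition is easy to miss in a flat diff. The sketch below is not part of the patch; it shows how a broker-side physical planner could use TableScanVisitor, TimeSeriesQueryServerInstance and TimeSeriesDispatchablePlan together, assuming the logical plan root, TimeBuckets, serialized plan and target ServerInstance are produced by planner and routing code outside this excerpt (the map's type parameters are inferred from how TableScanVisitor populates it).

import java.util.List;
import java.util.Map;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.tsdb.planner.physical.TableScanVisitor;
import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan;
import org.apache.pinot.tsdb.planner.physical.TimeSeriesQueryServerInstance;
import org.apache.pinot.tsdb.spi.TimeBuckets;
import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;

public class TimeSeriesPhysicalPlanSketch {
  public TimeSeriesDispatchablePlan toDispatchablePlan(String language, BaseTimeSeriesPlanNode logicalPlanRoot,
      TimeBuckets timeBuckets, String serializedPlan, RoutingManager routingManager, ServerInstance serverInstance,
      long requestId) {
    // Resolve the segments each ScanFilterAndProjectPlanNode has to read; the visitor
    // currently supports routing the whole query to a single server.
    TableScanVisitor.INSTANCE.init(routingManager);
    TableScanVisitor.Context context = TableScanVisitor.createContext(requestId);
    TableScanVisitor.INSTANCE.assignSegmentsToPlan(logicalPlanRoot, timeBuckets, context);
    Map<String, List<String>> planIdToSegments = context.getPlanIdToSegmentMap();
    // Package everything the server needs: language, target host and ports, the serialized plan,
    // the queried time buckets, and the per-scan-node segment lists.
    return new TimeSeriesDispatchablePlan(language, new TimeSeriesQueryServerInstance(serverInstance),
        serializedPlan, timeBuckets, planIdToSegments);
  }
}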
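The RangeTimeSeriesRequest change renames the engine field to language and adds the full query string so that language implementations can read custom parameters the broker does not itself understand. A minimal construction sketch with illustrative values only; in practice the broker derives these from the incoming HTTP query string.

import java.time.Duration;
import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;

public class RangeTimeSeriesRequestSketch {
  public static RangeTimeSeriesRequest example() {
    // Illustrative values; the raw query string is kept verbatim as fullQueryString.
    String fullQueryString = "language=m3ql&query=...&start=1726000000&end=1726003600&step=60";
    return new RangeTimeSeriesRequest(
        "m3ql",                  // language (previously named "engine")
        "...",                   // raw query in that language (placeholder)
        1726000000L,             // startSeconds
        1726003600L,             // endSeconds, must be >= startSeconds
        60L,                     // stepSeconds
        Duration.ofSeconds(10),  // end-to-end timeout
        fullQueryString);        // new argument: the complete query string
  }
}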
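TimeSeriesBuilderFactoryProvider now loads the configured factory class through the provider's own classloader and rejects classes that do not implement TimeSeriesBuilderFactory, and SimpleTimeSeriesBuilderFactory is publicly constructible so it can be instantiated reflectively. The sketch below shows how the factory could be wired from configuration, mirroring the quickstart overrides above; the PinotConfiguration constructor that accepts a properties map is an assumption of this sketch.

import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration;
import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory;
import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactoryProvider;

public class TimeSeriesFactoryWiringSketch {
  public static void main(String[] args) {
    Map<String, Object> props = new HashMap<>();
    // Enable the language and point it at a series builder factory; a real deployment would
    // also set the logical planner key, as the quickstart above does.
    props.put(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey(), "m3ql");
    props.put(PinotTimeSeriesConfiguration.getSeriesBuilderFactoryConfigKey("m3ql"),
        SimpleTimeSeriesBuilderFactory.class.getName());
    // The provider loads the class reflectively and fails fast if it is not a TimeSeriesBuilderFactory.
    TimeSeriesBuilderFactoryProvider.init(new PinotConfiguration(props));
  }
}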