Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Part-1: Pinot Timeseries Engine SPI #13885

Merged
merged 13 commits into from
Sep 9, 2024
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ yarn-error.log*
quickstart*

#build symlink directory
build*
build
build/*

#helm related files
kubernetes/helm/**/charts/
Expand Down
49 changes: 49 additions & 0 deletions pinot-timeseries/pinot-timeseries-spi/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot</artifactId>
<version>1.3.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>pinot-timeseries-spi</artifactId>

<properties>
<pinot.root>${basedir}/../..</pinot.root>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-spi</artifactId>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
ankitsultana marked this conversation as resolved.
Show resolved Hide resolved
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.tsdb.spi;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;


/**
* AggInfo is used to represent the aggregation function. Aggregation functions are simply stored as a string,
* since time-series languages are allowed to implement their own aggregation functions.
* TODO: We will likely be adding more parameters to this. One candidate is partial/full aggregation information or
* aggregation result type to allow for intermediate result types.
*/
public class AggInfo {
private final String _aggFunction;

@JsonCreator
public AggInfo(@JsonProperty("aggFunction") String aggFunction) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the test I found that aggFunction is not nullable. Can we enforce it otherwise exception is thrown deep in the stack?

_aggFunction = aggFunction;
}

public String getAggFunction() {
return _aggFunction;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.tsdb.spi;

import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;


public class PinotTimeSeriesConfiguration {
private PinotTimeSeriesConfiguration() {
}

public static final String CONFIG_PREFIX = "pinot.timeseries";
private static final String ENABLE_LANGUAGES_SUFFIX = ".languages";
private static final String SERIES_BUILDER_FACTORY_SUFFIX = ".series.builder.factory";
private static final String LOGICAL_PLANNER_CLASS_SUFFIX = ".logical.planner.class";

/**
* Config key that controls which time-series languages are enabled in a given Pinot cluster.
*/
public static String getEnabledLanguagesConfigKey() {
return CONFIG_PREFIX + ENABLE_LANGUAGES_SUFFIX;
}

/**
* Returns the config key which determines the class name for the {@link TimeSeriesBuilderFactory} to be used for a
* given language. Each language can have its own {@link TimeSeriesBuilderFactory}, which allows each language to
* support custom time-series functions.
*/
public static String getSeriesBuilderFactoryConfigKey(String language) {
return CONFIG_PREFIX + "." + language + SERIES_BUILDER_FACTORY_SUFFIX;
}

/**
* Returns config key which determines the class name for the {@link TimeSeriesLogicalPlanner} to be used for a given
* language. Pinot broker will load this logical planner on start-up dynamically. This is called for each language
* configured via {@link #getEnabledLanguagesConfigKey()}.
*/
public static String getLogicalPlannerConfigKey(String language) {
return CONFIG_PREFIX + "." + language + LOGICAL_PLANNER_CLASS_SUFFIX;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.tsdb.spi;

import java.time.Duration;


/**
* A time-series request received by the Pinot Broker. This is passed to the {@link TimeSeriesLogicalPlanner} so
* each query language can parse and plan the query based on their spec.
* <br/>
* <br/>
* <b>Notes:</b>
* <ul>
* <li>[start, end] are both inclusive.</li>
* <li>
* The result can contain time values outside [start, end], though we generally recommend to keep your results
* within the requested range. This decision is left to the time-series query language implementations. In some
* cases, returning data outside the requested time-range can help (e.g. for debugging purposes when you are
* computing moving 1d sum but are only looking at data for the last 12 hours).
* </li>
* <li>stepSeconds is used to define the default resolution for the query</li>
* <li>
* Some query languages allow users to change the resolution via a function, and in those cases the returned
* time-series may have a resolution different than stepSeconds
* </li>
* <li>
* The query execution may scan and process data outside of the time-range [start, end]. The actual data scanned
* and processed is defined by the {@link TimeBuckets} used by the operator.
* </li>
* </ul>
*/
public class RangeTimeSeriesRequest {
/** Engine allows a Pinot cluster to support multiple Time-Series Query Languages. */
private final String _engine;
/** Query is the raw query sent by the caller. */
private final String _query;
/** Start time of the time-window being queried. */
private final long _startSeconds;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make these time series request have millisecond granularity? What is step resolution is less than a second?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The granularity here controls the minimum granularity of the response and the query execution. I don't think we will ever support granularity of less than a second. I had kept this compliant with Prometheus and other time-series systems (our M3 system at Uber also uses seconds for specifying range).

/** End time of the time-window being queried. */
private final long _endSeconds;
/**
* <b>Optional</b> field which the caller can use to suggest the default resolution for the query. Language
* implementations can choose to skip this suggestion and choose their own resolution based on their semantics.
*/
private final long _stepSeconds;
/** E2E timeout for the query. */
private final Duration _timeout;

public RangeTimeSeriesRequest(String engine, String query, long startSeconds, long endSeconds, long stepSeconds,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add input validation like endtime >= starttime?

Duration timeout) {
_engine = engine;
_query = query;
_startSeconds = startSeconds;
_endSeconds = endSeconds;
_stepSeconds = stepSeconds;
_timeout = timeout;
}

public String getEngine() {
return _engine;
}

public String getQuery() {
return _query;
}

public long getStartSeconds() {
return _startSeconds;
}

public long getEndSeconds() {
return _endSeconds;
}

public long getStepSeconds() {
return _stepSeconds;
}

public Duration getTimeout() {
return _timeout;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.tsdb.spi;

import java.time.Duration;
import java.util.Arrays;
import java.util.Objects;


/**
* Time buckets used for query execution. Each element (say x) in the {@link #getTimeBuckets()} array represents a
* time-range which is half open on the right side: [x, x + bucketSize.getSeconds()). Some query languages allow some
* operators to mutate the time-buckets on the fly, so it is not guaranteed that the time resolution and/or range
* will be the same across all operators. For instance, Uber's M3QL supports a "summarize 1h sum" operator which will
* change the bucket resolution to 1 hour for all subsequent operators.
*/
public class TimeBuckets {
private final Long[] _timeBuckets;
private final Duration _bucketSize;

private TimeBuckets(Long[] timeBuckets, Duration bucketSize) {
_timeBuckets = timeBuckets;
_bucketSize = bucketSize;
}

public Long[] getTimeBuckets() {
return _timeBuckets;
}

public Duration getBucketSize() {
return _bucketSize;
}

public long getStartTime() {
return _timeBuckets[0];
}

public long getEndTime() {
return _timeBuckets[_timeBuckets.length - 1];
}

public long getRangeSeconds() {
return _timeBuckets[_timeBuckets.length - 1] - _timeBuckets[0];
}

public int getNumBuckets() {
return _timeBuckets.length;
}

public int resolveIndex(long timeValue) {
if (_timeBuckets.length == 0) {
return -1;
}
if (timeValue < _timeBuckets[0]) {
return -1;
}
if (timeValue >= _timeBuckets[_timeBuckets.length - 1] + _bucketSize.getSeconds()) {
return -1;
}
return (int) ((timeValue - _timeBuckets[0]) / _bucketSize.getSeconds());
}

@Override
public boolean equals(Object o) {
if (!(o instanceof TimeBuckets)) {
return false;
}
TimeBuckets other = (TimeBuckets) o;
return this.getStartTime() == other.getStartTime() && this.getEndTime() == other.getEndTime()
&& this.getBucketSize().equals(other.getBucketSize());
}

@Override
public int hashCode() {
int result = Objects.hash(_bucketSize);
result = 31 * result + Arrays.hashCode(_timeBuckets);
return result;
}

public static TimeBuckets ofSeconds(long startTimeSeconds, Duration bucketSize, int numElements) {
long stepSize = bucketSize.getSeconds();
Long[] timeBuckets = new Long[numElements];
for (int i = 0; i < numElements; i++) {
timeBuckets[i] = startTimeSeconds + i * stepSize;
}
return new TimeBuckets(timeBuckets, bucketSize);
}
}
Loading
Loading