Part-1: Pinot Timeseries Engine SPI #13885
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #13885 +/- ##
============================================
- Coverage 61.75% 57.89% -3.86%
- Complexity 207 219 +12
============================================
Files 2436 2612 +176
Lines 133233 143175 +9942
Branches 20636 21982 +1346
============================================
+ Hits 82274 82894 +620
- Misses 44911 53802 +8891
- Partials 6048 6479 +431
pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java
 */
package org.apache.pinot.tsdb.spi;

public class PinotTimeSeriesConfigs {
nit: why not use the full name PinotTimeSeriesConfiguration, following the PinotConfiguration convention?
...ies/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/PinotTimeSeriesConfigs.java
...ies/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
...t-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/Series.java
...eseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/SeriesBlock.java
 */
public class SeriesBlock {
  private final TimeBuckets _timeBuckets;
  private final Map<Long, List<Series>> _seriesMap;
Based on the comment, the key of the map is series ID (which is supposed to be a string type). Why use Long here?
Efficiency. String comparisons and copies are costly (interning might help but there's a lot of nuance to it).
When a series is built for the first time, we will convert the string id of the series to a Long hash and use that from there on. You can refer to this operator in the full working code: https://github.com/ankitsultana/pinot/pull/35/files#diff-8e88071ce9fc459e5fa8f4ade8f6ab9598d4edf7909ad054fd537804348944a6
Also, note that we will likely optimize this pretty soon and this representation may change significantly.
^ Adding one more point: I think the API is not that clean right now. I had left a TODO in Series#hash about it.
I need more code to be merged to clean this up, since I want to look at the call-sites to figure out a proper design for this.
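To illustrate the idea in this thread (hash the string series ID once, then key everything by the resulting long), here is a minimal sketch. It is not the code from the PR: `SeriesIdHasher`, `SeriesMapSketch`, and the choice of 64-bit FNV-1a are all assumptions made for illustration, and the map stores plain strings instead of `Series` objects.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the Pinot SPI.
final class SeriesIdHasher {
  private static final long FNV_OFFSET_BASIS = 0xcbf29ce484222325L;
  private static final long FNV_PRIME = 0x100000001b3L;

  private SeriesIdHasher() {
  }

  /** 64-bit FNV-1a hash over the UTF-8 bytes of the series id (an assumed hash choice). */
  static long hash(String seriesId) {
    long h = FNV_OFFSET_BASIS;
    for (byte b : seriesId.getBytes(StandardCharsets.UTF_8)) {
      h ^= (b & 0xff);
      h *= FNV_PRIME;
    }
    return h;
  }
}

// Mirrors the Map<Long, List<...>> shape of SeriesBlock#_seriesMap, with strings as stand-ins.
final class SeriesMapSketch {
  private final Map<Long, List<String>> _seriesByHash = new HashMap<>();

  /** The string id is hashed once here; later lookups only compare longs. */
  void add(String seriesId) {
    _seriesByHash.computeIfAbsent(SeriesIdHasher.hash(seriesId), k -> new ArrayList<>()).add(seriesId);
  }

  int distinctKeys() {
    return _seriesByHash.size();
  }
}
```

Any 64-bit hash can collide, so code that relies on this representation would still need a policy for two distinct IDs mapping to the same key.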
...timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MaxSeriesBuilder.java
...series-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/SummingSeriesBuilder.java
It seems you are adding the ser/de layer in this PR. I don't think it belongs in the SPI.
pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java
This is temporary and I can add a TODO (or create an issue). Right now I am relying on Jackson for the serde and we unfortunately have some serde-related code in the plan nodes (JsonCreator, JsonProperty, etc.). Keeping the SPI here for now allows users to test their plans easily. I am thinking of building a better approach once the baseline implementation is ready.
...not-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/PinotTimeSeriesConfiguration.java
Co-authored-by: Xiaotian (Jackie) Jiang <17555551+Jackie-Jiang@users.noreply.github.com>
  /** Query is the raw query sent by the caller. */
  private final String _query;
  /** Start time of the time-window being queried. */
  private final long _startSeconds;
Should these time series requests have millisecond granularity? What if the step resolution is less than a second?
The granularity here controls the minimum granularity of the response and the query execution. I don't think we will ever support granularity of less than a second. I had kept this compliant with Prometheus and other time-series systems (our M3 system at Uber also uses seconds for specifying range).
  private final String _aggFunction;

  @JsonCreator
  public AggInfo(@JsonProperty("aggFunction") String aggFunction) {
From the test I found that aggFunction is not nullable. Can we enforce that here? Otherwise the exception is thrown deep in the stack.
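One way to enforce this at construction time is sketched below. `AggInfoSketch` is a hypothetical stand-in for `AggInfo`; the Jackson annotations from the PR are left out so the sketch compiles without extra dependencies, and the error message is an assumption.

```java
import java.util.Objects;

// Illustrative stand-in for AggInfo; not the class from the PR.
final class AggInfoSketch {
  private final String _aggFunction;

  AggInfoSketch(String aggFunction) {
    // Fail fast here so a missing aggFunction surfaces at construction, not deep in the stack.
    _aggFunction = Objects.requireNonNull(aggFunction, "aggFunction must not be null");
  }

  String getAggFunction() {
    return _aggFunction;
  }
}
```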
  /** E2E timeout for the query. */
  private final Duration _timeout;

  public RangeTimeSeriesRequest(String engine, String query, long startSeconds, long endSeconds, long stepSeconds,
Should we add input validation like endtime >= starttime?
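A minimal sketch of what such validation could look like follows. `RangeRequestSketch` is hypothetical and only carries the three time fields; the `stepSeconds > 0` check is an extra assumption that the reviewer did not ask for.

```java
// Hypothetical, trimmed-down stand-in for RangeTimeSeriesRequest.
final class RangeRequestSketch {
  private final long _startSeconds;
  private final long _endSeconds;
  private final long _stepSeconds;

  RangeRequestSketch(long startSeconds, long endSeconds, long stepSeconds) {
    if (endSeconds < startSeconds) {
      throw new IllegalArgumentException(
          "endSeconds (" + endSeconds + ") must be >= startSeconds (" + startSeconds + ")");
    }
    // Assumption: a non-positive step is never meaningful for a range query.
    if (stepSeconds <= 0) {
      throw new IllegalArgumentException("stepSeconds must be positive, got " + stepSeconds);
    }
    _startSeconds = startSeconds;
    _endSeconds = endSeconds;
    _stepSeconds = stepSeconds;
  }

  long getStartSeconds() {
    return _startSeconds;
  }

  long getEndSeconds() {
    return _endSeconds;
  }

  long getStepSeconds() {
    return _stepSeconds;
  }
}
```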
/**
 * This would typically be the leaf node of a plan-tree generated by a time-series engine's logical planner. At runtime,
 * this gets compiled to a Combine Operator.
@ankitsultana PromQL has function operators like rate, increase, delta, etc. Can they be plugged into the leaf plan node as a function? I think we will need a different function operator. Thoughts?
Rate and other compound functions can be implemented by using "partial aggregates". I was planning to add an example after the baseline implementation is done. After this and the next PR, we will only support simple aggregates where the intermediate result has the same type as the final result. In a following PR I will add support for functions like rate, percentile, etc. Tracker: #13957
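The "partial aggregates" idea mentioned above can be sketched with average, which is the smallest case where the intermediate state (a sum and a count) differs in type from the final result. This is not the planned Pinot implementation: `AvgPartial` and its method names are hypothetical, and rate or percentile would need different intermediate state.

```java
// Hypothetical partial aggregate; names are illustrative, not Pinot API.
final class AvgPartial {
  private double _sum;
  private long _count;

  /** Fold one raw value into the intermediate state. */
  void add(double value) {
    _sum += value;
    _count++;
  }

  /** Merge intermediate state that was computed elsewhere, e.g. on another segment or server. */
  void merge(AvgPartial other) {
    _sum += other._sum;
    _count += other._count;
  }

  /** Reduce the (sum, count) intermediate state to the final value; NaN when nothing was added. */
  double finish() {
    return _count == 0 ? Double.NaN : _sum / _count;
  }
}
```

Merging `(sum, count)` pairs gives the same answer regardless of how the input was split, which is what averaging already-averaged values would not.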
    _tableName = tableName;
    _timeColumn = timeColumn;
    _timeUnit = timeUnit;
    // TODO: This is broken technically. Adjust offset to meet TimeUnit resolution. For now use 0 offset.
What does the comment mean? Is it aligning the offsets to time resolution boundaries?
The issue is that timeUnit here represents time-unit of the stored time column. But the offset is always in seconds right now. We need to change the offset to match the time-unit. I am also tracking this here: #13957
  public String getEffectiveFilter(TimeBuckets timeBuckets) {
    String filter = _filterExpression == null ? "" : _filterExpression;
    // TODO: This is wrong. offset should be converted to seconds before arithmetic. For now use 0 offset.
Is the comment valid?
Yeah same as the above comment. I am tracking this issue here: #13957
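The unit mismatch described in these two threads (offset in seconds, time column in an arbitrary `TimeUnit`) could be handled roughly as sketched below: do the arithmetic in seconds, then convert the bounds to the column's unit. `TimeFilterSketch`, the sign of the offset, and the exclusive-start/inclusive-end bounds are all assumptions; the actual fix tracked in #13957 may look different.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not the getEffectiveFilter implementation from the PR.
final class TimeFilterSketch {
  private TimeFilterSketch() {
  }

  static String timeFilter(String timeColumn, TimeUnit columnUnit, long startSeconds, long endSeconds,
      long offsetSeconds) {
    // All arithmetic happens in seconds first; only the final bounds are converted.
    long start = columnUnit.convert(startSeconds - offsetSeconds, TimeUnit.SECONDS);
    long end = columnUnit.convert(endSeconds - offsetSeconds, TimeUnit.SECONDS);
    return timeColumn + " > " + start + " AND " + timeColumn + " <= " + end;
  }
}
```

For example, `TimeFilterSketch.timeFilter("ts", TimeUnit.MILLISECONDS, 100, 200, 10)` yields `ts > 90000 AND ts <= 190000`.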
 * Each time-series operator would typically call either of {@link #addValue} or {@link #addValueAtIndex}. When
 * the operator is done, it will call {@link #build()} to allow the builder to compute the final {@link TimeSeries}.
 */
public abstract class BaseTimeSeriesBuilder {
Do you see any issue if function operators like rate or increase in PromQL are passed as aggFunc? The name aggFunc seems a little confusing for rate. What are your thoughts?
Yeah we need to fix it. Will be picked up after E2E basic implementation.
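For readers unfamiliar with the builder contract in the Javadoc above (values are added per time bucket, then `build()` produces the final series), here is a rough sketch of a min builder. The method signatures are assumptions, since the real ones are not shown in this excerpt, and `build()` returns a plain `Double[]` instead of a `TimeSeries`.

```java
// Hypothetical sketch; signatures are assumed and differ from the SPI classes in the PR.
final class MinBuilderSketch {
  private final Double[] _values;

  MinBuilderSketch(int numBuckets) {
    // Buckets start as null, meaning "no value seen yet".
    _values = new Double[numBuckets];
  }

  /** Keep the smallest value seen so far for the given time bucket. */
  void addValueAtIndex(int bucketIndex, double value) {
    Double current = _values[bucketIndex];
    _values[bucketIndex] = current == null ? value : Math.min(current, value);
  }

  /** Return a copy of the per-bucket minimums; buckets that never received a value stay null. */
  Double[] build() {
    return _values.clone();
  }
}
```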
 * <b>Context:</b>We provide some ready to use implementations for some of the most common use-cases in the SPI. This
 * reduces redundancy and also serves as a reference implementation for language developers.
 */
public class MinTimeSeriesBuilder extends BaseTimeSeriesBuilder {
Do you think we should add a series builder in the SPI which returns the raw data? The use case would be instant vectors or range vectors in PromQL.
Yup good point. Will be picking up instant vector after the E2E implementation. We will also need to add a new combine operator for it.
...s/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanner.java
...imeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/ScanFilterAndProjectPlanNode.java
/**
 * We have implemented a custom serialization/deserialization mechanism for time series plans. This allows users to
 * use Jackson to annotate their plan nodes as shown in {@link ScanFilterAndProjectPlanNode}, which is used for
 * plan serde for broker/server communication.
 * TODO: There are limitations to this and we will change this soon. Issues:
 * 1. Pinot TS SPI is compiled in Pinot distribution and Jackson deps get shaded usually.
 * 2. The plugins have to shade the dependency in the exact same way, which is obviously error-prone and not ideal.
 */
@InterfaceStability.Evolving
public class TimeSeriesPlanSerde {
This is the third time we have defined how to serialize and deserialize plans, and we still have issues in the multi-stage query engine (i.e., I had to write a lot of code to be able to implement physical explain in the multi-stage query engine, see #13733).
I think we should honestly think about moving to something more general that we can always use, like Substrait.
...imeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/ScanFilterAndProjectPlanNode.java
public class TimeSeries {
  private final String _id;
  private final Long[] _timeValues;
  private final TimeBuckets _timeBuckets;
  private final Double[] _values;
  private final List<String> _tagNames;
  private final Object[] _tagValues;
I guess this is fine for now, but IIUC we are going to have tons of these objects at runtime, so thinking about memory layout is pretty important. We should plan to create different TimeSeries implementations for different data types, each with a specific memory layout. My main concern is that we are using boxed arrays, which are close to twice as expensive in terms of memory as primitive arrays. Substituting _values with a double[] and a BitSet that marks the nulls should be considerably cheaper in terms of memory and faster in terms of calculation. Same with _timeValues.
Notice that I'm assuming the number of values should be on the order of thousands at most, so BitSet should be better than RoaringBitmap.
As the design doc states, optimizations are for Phase-3, and we are aiming for simplicity and completeness for now. But yes, agreed on all points.
The design doc and the corresponding issue were raised ~2 weeks ago: #13760
You can find the complete working reference implementation here. It also has instructions on how to run it locally. The PR has slight improvements and code cleanup over the full implementation: ankitsultana#35
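The memory-layout suggestion from the review of `TimeSeries` (a primitive `double[]` plus a `BitSet` marking nulls, in place of `Double[]`) could look roughly like the sketch below. `PrimitiveSeriesValues` and its methods are hypothetical and are not part of the PR, which defers such optimizations to a later phase.

```java
import java.util.BitSet;

// Hypothetical container illustrating the suggested layout; not part of the PR.
final class PrimitiveSeriesValues {
  private final double[] _values;
  private final BitSet _nulls;

  PrimitiveSeriesValues(int numBuckets) {
    _values = new double[numBuckets];
    _nulls = new BitSet(numBuckets);
    // Every bucket starts out null until a value is set.
    _nulls.set(0, numBuckets);
  }

  void set(int index, double value) {
    _values[index] = value;
    _nulls.clear(index);
  }

  boolean isNull(int index) {
    return _nulls.get(index);
  }

  /** Sum of all non-null buckets, iterating only over the clear (non-null) bits. */
  double sum() {
    double total = 0;
    for (int i = _nulls.nextClearBit(0); i < _values.length; i = _nulls.nextClearBit(i + 1)) {
      total += _values[i];
    }
    return total;
  }
}
```

The values stay in one contiguous primitive array, and nullness costs one bit per bucket instead of one object reference per bucket.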