Skip to content

Commit

Permalink
working quickstart
Browse files Browse the repository at this point in the history
  • Loading branch information
ankitsultana committed Sep 21, 2024
1 parent 8cb8228 commit d7d3ccc
Show file tree
Hide file tree
Showing 10 changed files with 56 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -238,19 +238,20 @@ public void processSqlWithMultiStageQueryEnginePost(String query, @Suspended Asy
}
}

@POST
@GET
@ManagedAsync
@Produces(MediaType.APPLICATION_JSON)
@Path("timeseries/api/v1/query_range")
@ApiOperation(value = "Prometheus Compatible API for Pinot's Time Series Engine")
@ManualAuthorization
public void processTimeSeriesQueryEnginePost(String request, @Suspended AsyncResponse asyncResponse,
@QueryParam("engine") String engine,
public void processTimeSeriesQueryEnginePost(@Suspended AsyncResponse asyncResponse,
@QueryParam("language") String language,
@Context org.glassfish.grizzly.http.server.Request requestCtx,
@Context HttpHeaders httpHeaders) {
try {
try (RequestScope requestContext = Tracing.getTracer().createRequestScope()) {
PinotBrokerTimeSeriesResponse response = executeTimeSeriesQuery(engine, request, requestContext);
String queryString = requestCtx.getQueryString();
PinotBrokerTimeSeriesResponse response = executeTimeSeriesQuery(language, queryString, requestContext);
if (response.getErrorType() != null && !response.getErrorType().isEmpty()) {
asyncResponse.resume(Response.serverError().entity(response).build());
return;
Expand All @@ -266,7 +267,7 @@ public void processTimeSeriesQueryEnginePost(String request, @Suspended AsyncRes
}
}

@POST
@GET
@ManagedAsync
@Produces(MediaType.APPLICATION_JSON)
@Path("timeseries/api/v1/query")
Expand Down Expand Up @@ -385,9 +386,9 @@ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterI
}
}

private PinotBrokerTimeSeriesResponse executeTimeSeriesQuery(String engine, String queryString,
private PinotBrokerTimeSeriesResponse executeTimeSeriesQuery(String language, String queryString,
RequestContext requestContext) {
return _requestHandler.handleTimeSeriesRequest(engine, queryString, requestContext);
return _requestHandler.handleTimeSeriesRequest(language, queryString, requestContext);
}

private static HttpRequesterIdentity makeHttpIdentity(org.glassfish.grizzly.http.server.Request context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String
TimeSeriesLogicalPlanResult logicalPlanResult = _queryEnvironment.buildLogicalPlan(timeSeriesRequest);
TimeSeriesDispatchablePlan dispatchablePlan = _queryEnvironment.buildPhysicalPlan(timeSeriesRequest, requestContext,
logicalPlanResult);
return _queryDispatcher.submitAndGet(requestContext, dispatchablePlan, 15_000L, new HashMap<>());
return _queryDispatcher.submitAndGet(requestContext, dispatchablePlan, timeSeriesRequest.getTimeout().toMillis(),
new HashMap<>());
}

@Override
Expand All @@ -116,10 +117,10 @@ public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpC
return false;
}

private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String engine, String rawQueryParamString)
private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String language, String queryParamString)
throws URISyntaxException {
List<NameValuePair> pairs = URLEncodedUtils.parse(
new URI("http://localhost?" + rawQueryParamString), "UTF-8");
new URI("http://localhost?" + queryParamString), "UTF-8");
String query = null;
Long startTs = null;
Long endTs = null;
Expand All @@ -143,7 +144,8 @@ private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String engine, String
timeoutStr = nameValuePair.getValue();
break;
default:
throw new IllegalArgumentException("Unknown query parameter: " + nameValuePair.getName());
/* Okay to ignore unknown parameters since the language implementor may be using them. */
break;
}
}
Preconditions.checkNotNull(query, "Query cannot be null");
Expand All @@ -154,7 +156,7 @@ private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String engine, String
timeout = HumanReadableDuration.from(timeoutStr);
}
// TODO: Pass full raw query param string to the request
return new RangeTimeSeriesRequest(engine, query, startTs, endTs, getStepSeconds(step), timeout);
return new RangeTimeSeriesRequest(language, query, startTs, endTs, getStepSeconds(step), timeout, queryParamString);
}

public static Long getStepSeconds(@Nullable String step) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void setUp()

// Setup time series builder factory
TimeSeriesBuilderFactoryProvider.registerSeriesBuilderFactory(TIME_SERIES_ENGINE_NAME,
SimpleTimeSeriesBuilderFactory.INSTANCE);
new SimpleTimeSeriesBuilderFactory());
}

@Test
Expand Down
4 changes: 2 additions & 2 deletions pinot-distribution/pinot-assembly.xml
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,10 @@
<!-- Start Include Pinot Time Series Plugins -->
<file>
<source>
${pinot.root}/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/target/pinot-timeseries-m3ql-${project.version}.jar
${pinot.root}/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/target/pinot-timeseries-m3ql-${project.version}-shaded.jar
</source>
<destName>
plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/pinot-timeseries-m3ql-${project.version}.jar
plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/pinot-timeseries-m3ql-${project.version}-shaded.jar
</destName>
</file>
<!-- End Include Pinot Time Series Plugins -->
Expand Down
15 changes: 15 additions & 0 deletions pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,19 @@
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
<profile>
<id>build-shaded-jar</id>
<activation>
<property>
<name>skipShade</name>
<value>!true</value>
</property>
</activation>
<properties>
<shade.phase.prop>package</shade.phase.prop>
</properties>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.datatable.StatMap;
import org.apache.pinot.common.metrics.ServerMetrics;
Expand Down Expand Up @@ -68,11 +69,13 @@
import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode;
import org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerRequestMetadataKeys;
import org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerResponseMetadataKeys;
import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration;
import org.apache.pinot.tsdb.spi.TimeBuckets;
import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
import org.apache.pinot.tsdb.spi.plan.serde.TimeSeriesPlanSerde;
import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactoryProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -146,6 +149,10 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataMana
} catch (Exception e) {
throw new RuntimeException(e);
}
if (StringUtils.isNotBlank(config.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey()))) {
PhysicalTimeSeriesPlanVisitor.INSTANCE.init(_leafQueryExecutor, _executorService, serverMetrics);
TimeSeriesBuilderFactoryProvider.init(config);
}

LOGGER.info("Initialized QueryRunner with hostname: {}, port: {}", hostname, port);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ public class RangeTimeSeriesRequest {
private final long _stepSeconds;
/** E2E timeout for the query. */
private final Duration _timeout;
/** Full query string to allow language implementations to pass custom parameters. */
private final String _fullQueryString;

public RangeTimeSeriesRequest(String language, String query, long startSeconds, long endSeconds, long stepSeconds,
Duration timeout) {
Duration timeout, String fullQueryString) {
Preconditions.checkState(endSeconds >= startSeconds, "Invalid range. startSeconds "
+ "should be greater than or equal to endSeconds. Found startSeconds=%s and endSeconds=%s",
startSeconds, endSeconds);
Expand All @@ -75,6 +77,7 @@ public RangeTimeSeriesRequest(String language, String query, long startSeconds,
_endSeconds = endSeconds;
_stepSeconds = stepSeconds;
_timeout = timeout;
_fullQueryString = fullQueryString;
}

public String getLanguage() {
Expand All @@ -100,4 +103,8 @@ public long getStepSeconds() {
public Duration getTimeout() {
return _timeout;
}

public String getFullQueryString() {
return _fullQueryString;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@


public class SimpleTimeSeriesBuilderFactory extends TimeSeriesBuilderFactory {
public static final SimpleTimeSeriesBuilderFactory INSTANCE = new SimpleTimeSeriesBuilderFactory();

private SimpleTimeSeriesBuilderFactory() {
public SimpleTimeSeriesBuilderFactory() {
super();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public static void init(PinotConfiguration pinotConfiguration) {
String seriesBuilderClass = pinotConfiguration
.getProperty(PinotTimeSeriesConfiguration.getSeriesBuilderFactoryConfigKey(language));
try {
Object untypedSeriesBuilderFactory = Class.forName(seriesBuilderClass).getConstructor().newInstance();
Class<?> klass = TimeSeriesBuilderFactoryProvider.class.getClassLoader().loadClass(seriesBuilderClass);
Object untypedSeriesBuilderFactory = klass.getConstructor().newInstance();
if (!(untypedSeriesBuilderFactory instanceof TimeSeriesBuilderFactory)) {
throw new RuntimeException("Series builder factory class " + seriesBuilderClass
+ " does not implement SeriesBuilderFactory");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ public List<String> types() {
@Override
protected Map<String, Object> getConfigOverrides() {
Map<String, Object> configs = new HashMap<>();
configs.put(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey(), "spql");
configs.put(PinotTimeSeriesConfiguration.getLogicalPlannerConfigKey("spql"),
"org.apache.pinot.tsdb.planner.logical.TimeSeriesLogicalPlanner");
configs.put(PinotTimeSeriesConfiguration.getSeriesBuilderFactoryConfigKey("spql"),
SimpleTimeSeriesBuilderFactory.class);
configs.put(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey(), "m3ql");
configs.put(PinotTimeSeriesConfiguration.getLogicalPlannerConfigKey("m3ql"),
"org.apache.pinot.tsdb.m3ql.M3TimeSeriesPlanner");
configs.put(PinotTimeSeriesConfiguration.getSeriesBuilderFactoryConfigKey("m3ql"),
SimpleTimeSeriesBuilderFactory.class.getName());
return configs;
}

Expand Down

0 comments on commit d7d3ccc

Please sign in to comment.