colocated-without-hints: Adds the ability to enable/disable this optimization at startup and query time
gortiz committed Sep 9, 2024
1 parent 7877411 commit 400675a
Showing 6 changed files with 128 additions and 39 deletions.
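
In practice the optimization is now controlled in two places: a broker startup property that sets the default, and a query option that can turn it on for a single query. The sketch below is not part of the commit; it only strings together the two constants added here (the table and column names are invented, and the SET prefix is assumed to be the usual way query options are attached to a Pinot query).

import org.apache.pinot.spi.utils.CommonConstants;

public class ImplicitColocateToggleExample {
  public static void main(String[] args) {
    // Cluster-wide default, set in the broker configuration at startup:
    //   pinot.broker.multistage.implicit.colocate=true
    System.out.println(CommonConstants.Broker.IMPLICIT_COLOCATE_JOIN + "=true");

    // Per-query override through the new query option key ("implicitColocateJoin"):
    String sql = "SET " + CommonConstants.Broker.Request.QueryOptionKey.IMPLICIT_COLOCATE_JOIN + " = true; "
        + "SELECT a.col1, b.col2 FROM tableA a JOIN tableB b ON a.memberId = b.memberId";
    System.out.println(sql);
  }
}
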
@@ -136,7 +136,14 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
Long timeoutMsFromQueryOption = QueryOptionsUtils.getTimeoutMs(queryOptions);
queryTimeoutMs = timeoutMsFromQueryOption != null ? timeoutMsFromQueryOption : _brokerTimeoutMs;
database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions, httpHeaders);
QueryEnvironment queryEnvironment = new QueryEnvironment(database, _tableCache, _workerManager);
boolean useImplicitColocatedByDefault = _config.getProperty(CommonConstants.Broker.IMPLICIT_COLOCATE_JOIN,
CommonConstants.Broker.DEFAULT_IMPLICIT_COLOCATE_JOIN);
QueryEnvironment queryEnvironment = new QueryEnvironment(QueryEnvironment.configBuilder()
.database(database)
.tableCache(_tableCache)
.workerManager(_workerManager)
.useImplicitColocatedByDefault(useImplicitColocatedByDefault)
.build());
switch (sqlNodeAndOptions.getSqlNode().getKind()) {
case EXPLAIN:
queryPlanResult = queryEnvironment.explainQuery(query, sqlNodeAndOptions, requestId);

@@ -261,6 +261,13 @@ public interface PartitionTableFinder {
*/
@Nullable
TablePartitionInfo getTablePartitionInfo(String tableNameWithType);

/**
* A partition table finder that always returns null, meaning that the table partition info is not found.
*/
static PartitionTableFinder disabled() {
return (table) -> null;
}
}

/**
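
The new disabled() factory lets callers hand the hint rule a no-op finder instead of a nullable WorkerManager. The fragment below is not part of the commit (imports omitted, helper name pickFinder invented); it restates how QueryEnvironment chooses between the two finders wired up in this change.

// Use the worker manager's partition metadata when the optimization is on and a worker manager
// exists; otherwise fall back to the finder that reports no partition info for any table.
static PinotImplicitTableHintRule.PartitionTableFinder pickFinder(
    @Nullable WorkerManager workerManager, boolean useImplicitColocated) {
  if (useImplicitColocated && workerManager != null) {
    return workerManager::getTablePartitionInfo;
  }
  return PinotImplicitTableHintRule.PartitionTableFinder.disabled();
}
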
@@ -21,6 +21,7 @@
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import javax.annotation.Nullable;
@@ -69,9 +70,11 @@
import org.apache.pinot.query.routing.WorkerManager;
import org.apache.pinot.query.type.TypeFactory;
import org.apache.pinot.query.validate.BytesCastVisitor;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.apache.pinot.sql.parsers.parser.SqlPhysicalExplain;
import org.immutables.value.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -80,7 +83,14 @@
* The {@code QueryEnvironment} contains the main entrypoint for query planning.
*
 * <p>It provides the higher-level entry interface to convert a SQL string into a {@link DispatchableSubPlan}.
 * It is also used to run some static analysis on the query, such as determining whether it can be compiled or
 * getting the tables involved in the query.
*/

// TODO: We should consider splitting this class in two: one used for parsing and one used for executing queries.
// This would allow us to remove the worker manager from the parsing environment and therefore guarantee that a
// worker manager is present when executing queries.
@Value.Enclosing
public class QueryEnvironment {
private static final Logger LOGGER = LoggerFactory.getLogger(QueryEnvironment.class);
private static final CalciteConnectionConfig CONNECTION_CONFIG;
@@ -95,27 +105,58 @@ public class QueryEnvironment {
private final FrameworkConfig _config;
private final CalciteCatalogReader _catalogReader;
private final HepProgram _optProgram;
private final HepProgram _traitProgram;

// Pinot extensions
private final TableCache _tableCache;
@Nullable
private final WorkerManager _workerManager;
private final Config _envConfig;

public QueryEnvironment(String database, TableCache tableCache, @Nullable WorkerManager workerManager) {
PinotCatalog catalog = new PinotCatalog(database, tableCache);
public QueryEnvironment(Config config) {
_envConfig = config;
String database = config.getDatabase();
PinotCatalog catalog = new PinotCatalog(database, config.getTableCache());
CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false, database, catalog);
_config = Frameworks.newConfigBuilder().traitDefs().operatorTable(PinotOperatorTable.instance())
.defaultSchema(rootSchema.plus()).sqlToRelConverterConfig(PinotRuleUtils.PINOT_SQL_TO_REL_CONFIG).build();
_catalogReader = new CalciteCatalogReader(rootSchema, List.of(database), _typeFactory, CONNECTION_CONFIG);
_optProgram = getOptProgram();
_traitProgram = getTraitProgram(workerManager);
_tableCache = tableCache;
_workerManager = workerManager;
}

private PlannerContext getPlannerContext() {
return new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram, _traitProgram);
public QueryEnvironment(String database, TableCache tableCache, @Nullable WorkerManager workerManager) {
this(configBuilder()
.database(database)
.tableCache(tableCache)
.workerManager(workerManager)
.build());
}

/**
* Returns a planner context that can be used to parse, explain, or execute a query.
*/
private PlannerContext getPlannerContext(SqlNodeAndOptions sqlNodeAndOptions) {
boolean useImplicitColocated;
PinotImplicitTableHintRule.PartitionTableFinder ptf;
String useImplicitColocatedOptionValue = sqlNodeAndOptions.getOptions()
.get(CommonConstants.Broker.Request.QueryOptionKey.IMPLICIT_COLOCATE_JOIN);
if (Boolean.parseBoolean(useImplicitColocatedOptionValue)) {
useImplicitColocated = true;
Objects.requireNonNull(_envConfig.getWorkerManager(), "WorkerManager is required for implicit colocated join");
ptf = _envConfig.getWorkerManager()::getTablePartitionInfo;
} else {
useImplicitColocated = _envConfig.useImplicitColocatedByDefault();
WorkerManager workerManager = _envConfig.getWorkerManager();
if (useImplicitColocated && workerManager != null) {
ptf = workerManager::getTablePartitionInfo;
} else {
ptf = PinotImplicitTableHintRule.PartitionTableFinder.disabled();
}
}
HepProgram traitProgram = getTraitProgram(ptf, useImplicitColocated);
return new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram, traitProgram);
}

/**
* Returns the planner context that should be used only for parsing queries.
*/
private PlannerContext getParsingPlannerContext() {
HepProgram traitProgram = getTraitProgram(PinotImplicitTableHintRule.PartitionTableFinder.disabled(), false);
return new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram, traitProgram);
}

/**
@@ -131,7 +172,7 @@ private PlannerContext getPlannerContext() {
* @return QueryPlannerResult containing the dispatchable query plan and the relRoot.
*/
public QueryPlannerResult planQuery(String sqlQuery, SqlNodeAndOptions sqlNodeAndOptions, long requestId) {
try (PlannerContext plannerContext = getPlannerContext()) {
try (PlannerContext plannerContext = getPlannerContext(sqlNodeAndOptions)) {
plannerContext.setOptions(sqlNodeAndOptions.getOptions());
RelRoot relRoot = compileQuery(sqlNodeAndOptions.getSqlNode(), plannerContext);
// TODO: current code only assumes one SubPlan per query, but we should support multiple SubPlans per query.
@@ -146,6 +187,11 @@ public QueryPlannerResult planQuery(String sqlQuery, SqlNodeAndOptions sqlNodeAn
}
}

@VisibleForTesting
public DispatchableSubPlan planQuery(String sqlQuery) {
return planQuery(sqlQuery, CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery), 0).getQueryPlan();
}

/**
* Explain a SQL query.
*
@@ -159,7 +205,7 @@ public QueryPlannerResult planQuery(String sqlQuery, SqlNodeAndOptions sqlNodeAn
* @return QueryPlannerResult containing the explained query plan and the relRoot.
*/
public QueryPlannerResult explainQuery(String sqlQuery, SqlNodeAndOptions sqlNodeAndOptions, long requestId) {
try (PlannerContext plannerContext = getPlannerContext()) {
try (PlannerContext plannerContext = getPlannerContext(sqlNodeAndOptions)) {
SqlExplain explain = (SqlExplain) sqlNodeAndOptions.getSqlNode();
plannerContext.setOptions(sqlNodeAndOptions.getOptions());
RelRoot relRoot = compileQuery(explain.getExplicandum(), plannerContext);
@@ -181,18 +227,13 @@ public QueryPlannerResult explainQuery(String sqlQuery, SqlNodeAndOptions sqlNod
}
}

@VisibleForTesting
public DispatchableSubPlan planQuery(String sqlQuery) {
return planQuery(sqlQuery, CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery), 0).getQueryPlan();
}

@VisibleForTesting
public String explainQuery(String sqlQuery, long requestId) {
return explainQuery(sqlQuery, CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery), requestId).getExplainPlan();
}

public List<String> getTableNamesForQuery(String sqlQuery) {
try (PlannerContext plannerContext = getPlannerContext()) {
try (PlannerContext plannerContext = getParsingPlannerContext()) {
SqlNode sqlNode = CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery).getSqlNode();
if (sqlNode.getKind().equals(SqlKind.EXPLAIN)) {
sqlNode = ((SqlExplain) sqlNode).getExplicandum();
@@ -209,7 +250,7 @@ public List<String> getTableNamesForQuery(String sqlQuery) {
* Returns whether the query can be successfully compiled in this query environment
*/
public boolean canCompileQuery(String query) {
try (PlannerContext plannerContext = getPlannerContext()) {
try (PlannerContext plannerContext = getParsingPlannerContext()) {
SqlNode sqlNode = CalciteSqlParser.compileToSqlNodeAndOptions(query).getSqlNode();
if (sqlNode.getKind().equals(SqlKind.EXPLAIN)) {
sqlNode = ((SqlExplain) sqlNode).getExplicandum();
@@ -318,7 +359,7 @@ private RelNode optimize(RelRoot relRoot, PlannerContext plannerContext) {
private DispatchableSubPlan toDispatchableSubPlan(RelRoot relRoot, PlannerContext plannerContext, long requestId) {
SubPlan plan = PinotLogicalQueryPlanner.makePlan(relRoot);
PinotDispatchPlanner pinotDispatchPlanner =
new PinotDispatchPlanner(plannerContext, _workerManager, requestId, _tableCache);
new PinotDispatchPlanner(plannerContext, _envConfig.getWorkerManager(), requestId, _envConfig.getTableCache());
return pinotDispatchPlanner.createDispatchableSubPlan(plan);
}

@@ -350,7 +391,8 @@ private static HepProgram getOptProgram() {
return hepProgramBuilder.build();
}

private static HepProgram getTraitProgram(@Nullable WorkerManager workerManager) {
private static HepProgram getTraitProgram(
PinotImplicitTableHintRule.PartitionTableFinder tablePartitionTableFinder, boolean useImplicitColocated) {
HepProgramBuilder hepProgramBuilder = new HepProgramBuilder();

// Set the match order as BOTTOM_UP.
@@ -363,12 +405,40 @@ private static HepProgram getTraitProgram(@Nullable WorkerManager workerManager)
}

// apply RelDistribution trait to all nodes
if (workerManager != null) {
hepProgramBuilder.addRuleInstance(
PinotImplicitTableHintRule.withPartitionTableFinder(workerManager::getTablePartitionInfo));
if (useImplicitColocated) {
hepProgramBuilder.addRuleInstance(PinotImplicitTableHintRule.withPartitionTableFinder(tablePartitionTableFinder));
}
hepProgramBuilder.addRuleInstance(PinotRelDistributionTraitRule.INSTANCE);

return hepProgramBuilder.build();
}

public static ImmutableQueryEnvironment.Config.Builder configBuilder() {
return ImmutableQueryEnvironment.Config.builder();
}

@Value.Immutable
public interface Config {
String getDatabase();

TableCache getTableCache();

/**
* Whether to use implicit colocated join by default.
*
* This is the broker-level default and is expected to be obtained from the Pinot configuration. It can always be
* overridden at query level with the query option
* {@link CommonConstants.Broker.Request.QueryOptionKey#IMPLICIT_COLOCATE_JOIN}.
*/
boolean useImplicitColocatedByDefault();

/**
* Returns the worker manager.
*
* This is used whenever a query needs to be executed, but it can be null when the QueryEnvironment is only used
* for static analysis of the query, such as parsing it or getting the tables it involves.
*/
@Nullable
WorkerManager getWorkerManager();
}
}
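
One detail of getPlannerContext above worth spelling out: the query option can only turn the optimization on; when it is absent or false, the broker-level default decides, and without a worker manager the hint rule ends up with the disabled finder and adds no hint. The fragment below is not part of the commit (method name effectiveImplicitColocate invented, imports omitted); it restates that resolution for a single query.

// A true query option forces the optimization on (QueryEnvironment then requires a non-null
// WorkerManager via Objects.requireNonNull); otherwise the broker default applies, and it only
// takes effect when a WorkerManager is available to supply partition metadata.
static boolean effectiveImplicitColocate(
    @Nullable String implicitColocateJoinOption, boolean brokerDefault, boolean workerManagerPresent) {
  if (Boolean.parseBoolean(implicitColocateJoinOption)) {  // parseBoolean(null) is false
    return true;
  }
  return brokerDefault && workerManagerPresent;
}
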

@@ -426,11 +426,9 @@ private PartitionTableInfo getPartitionTableInfo(String tableName, String partit
return getRealtimePartitionTableInfo(realtimeTableName, partitionKey, numPartitions, partitionFunction);
}
TablePartitionInfo.PartitionInfo[] offlinePartitionInfoMap =
getTablePartitionInfo(offlineTableName, partitionKey, numPartitions,
partitionFunction).getPartitionInfoMap();
calculateTablePartitionInfoMap(offlineTableName, partitionKey, numPartitions, partitionFunction);
TablePartitionInfo.PartitionInfo[] realtimePartitionInfoMap =
getTablePartitionInfo(realtimeTableName, partitionKey, numPartitions,
partitionFunction).getPartitionInfoMap();
calculateTablePartitionInfoMap(realtimeTableName, partitionKey, numPartitions, partitionFunction);
PartitionInfo[] partitionInfoMap = new PartitionInfo[numPartitions];
for (int i = 0; i < numPartitions; i++) {
TablePartitionInfo.PartitionInfo offlinePartitionInfo = offlinePartitionInfoMap[i];
@@ -473,8 +471,12 @@ private PartitionTableInfo getPartitionTableInfo(String tableName, String partit
}
}

private TablePartitionInfo getTablePartitionInfo(String tableNameWithType, String partitionKey, int numPartitions,
String partitionFunction) {
/**
* Returns the partition info map for the given table name with type, checking that the partition key, number of
* partitions, and partition function match the table partition info known to the routing manager.
*/
private TablePartitionInfo.PartitionInfo[] calculateTablePartitionInfoMap(
String tableNameWithType, String partitionKey, int numPartitions, String partitionFunction) {
TablePartitionInfo tablePartitionInfo = _routingManager.getTablePartitionInfo(tableNameWithType);
Preconditions.checkState(tablePartitionInfo != null, "Failed to find table partition info for table: %s",
tableNameWithType);
Expand All @@ -490,13 +492,13 @@ private TablePartitionInfo getTablePartitionInfo(String tableNameWithType, Strin
Preconditions.checkState(tablePartitionInfo.getSegmentsWithInvalidPartition().isEmpty(),
"Find %s segments with invalid partition for table: %s",
tablePartitionInfo.getSegmentsWithInvalidPartition().size(), tableNameWithType);
return tablePartitionInfo;
return tablePartitionInfo.getPartitionInfoMap();
}

private PartitionTableInfo getOfflinePartitionTableInfo(String offlineTableName, String partitionKey,
int numPartitions, String partitionFunction) {
TablePartitionInfo.PartitionInfo[] tablePartitionInfoArr =
getTablePartitionInfo(offlineTableName, partitionKey, numPartitions, partitionFunction).getPartitionInfoMap();
calculateTablePartitionInfoMap(offlineTableName, partitionKey, numPartitions, partitionFunction);
PartitionInfo[] partitionInfoMap = new PartitionInfo[numPartitions];
for (int i = 0; i < numPartitions; i++) {
TablePartitionInfo.PartitionInfo partitionInfo = tablePartitionInfoArr[i];
@@ -511,7 +513,7 @@ private PartitionTableInfo getOfflinePartitionTableInfo(String offlineTableName,
private PartitionTableInfo getRealtimePartitionTableInfo(String realtimeTableName, String partitionKey,
int numPartitions, String partitionFunction) {
TablePartitionInfo.PartitionInfo[] tablePartitionInfoArr =
getTablePartitionInfo(realtimeTableName, partitionKey, numPartitions, partitionFunction).getPartitionInfoMap();
calculateTablePartitionInfoMap(realtimeTableName, partitionKey, numPartitions, partitionFunction);
PartitionInfo[] partitionInfoMap = new PartitionInfo[numPartitions];
for (int i = 0; i < numPartitions; i++) {
TablePartitionInfo.PartitionInfo partitionInfo = tablePartitionInfoArr[i];

@@ -111,8 +111,8 @@ protected QueryEnvironment.QueryPlannerResult planQuery(String sql) {
protected QueryDispatcher.QueryResult queryRunner(String sql, boolean trace) {
long requestId = REQUEST_ID_GEN.getAndIncrement();
SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(sql);
QueryEnvironment.QueryPlannerResult queryPlannerResult =
_queryEnvironment.planQuery(sql, sqlNodeAndOptions, requestId);
QueryEnvironment.QueryPlannerResult queryPlannerResult = _queryEnvironment.planQuery(sql, sqlNodeAndOptions,
requestId);
DispatchableSubPlan dispatchableSubPlan = queryPlannerResult.getQueryPlan();
Map<String, String> requestMetadataMap = new HashMap<>();
requestMetadataMap.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, String.valueOf(requestId));

@@ -339,6 +339,8 @@ public static class Broker {
public static final String CONFIG_OF_ENABLE_PARTITION_METADATA_MANAGER =
"pinot.broker.enable.partition.metadata.manager";
public static final boolean DEFAULT_ENABLE_PARTITION_METADATA_MANAGER = false;
public static final String IMPLICIT_COLOCATE_JOIN = "pinot.broker.multistage.implicit.colocate";
public static final boolean DEFAULT_IMPLICIT_COLOCATE_JOIN = false;

public static final String CONFIG_OF_USE_FIXED_REPLICA = "pinot.broker.use.fixed.replica";
public static final boolean DEFAULT_USE_FIXED_REPLICA = false;
@@ -386,6 +388,7 @@ public static class QueryOptionKey {
public static final String USE_FIXED_REPLICA = "useFixedReplica";
public static final String EXPLAIN_PLAN_VERBOSE = "explainPlanVerbose";
public static final String USE_MULTISTAGE_ENGINE = "useMultistageEngine";
public static final String IMPLICIT_COLOCATE_JOIN = "implicitColocateJoin";
public static final String ENABLE_NULL_HANDLING = "enableNullHandling";

// Can be applied to aggregation and group-by queries to ask servers to directly return final results instead of
