Skip to content

Commit

Permalink
Generate single JSON with all types of plans in the server (#1606)
Browse files Browse the repository at this point in the history
* Moved plan simplification to the server

* Small fix in plan generation

* Minor fix in scripting

* Don't print OLAP plans yet

* Fixed unused arg problem

* fixed a typo

* Simplifying a copy of the plan now

* Fixed minor bugs

* fixed a bug in ut_common

* Cannonized some tests, changed plan comparison
  • Loading branch information
pavelvelikhov authored Feb 7, 2024
1 parent b71a6fd commit 903def9
Show file tree
Hide file tree
Showing 14 changed files with 1,142 additions and 1,116 deletions.
222 changes: 220 additions & 2 deletions ydb/core/kqp/opt/kqp_query_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include <ydb/library/yql/utils/plan/plan_utils.h>
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>

#include <ydb/public/lib/ydb_cli/common/format.h>

#include <library/cpp/json/writer/json.h>
#include <library/cpp/json/json_reader.h>
#include <library/cpp/protobuf/json/proto2json.h>
Expand Down Expand Up @@ -1811,6 +1813,217 @@ void SetNonZero(NJson::TJsonValue& node, const TStringBuf& name, T value) {
}
}

void BuildPlanIndex(NJson::TJsonValue& plan, THashMap<int, NJson::TJsonValue>& planIndex, THashMap<TString, NJson::TJsonValue>& precomputes) {
if (plan.GetMapSafe().contains("PlanNodeId")){
auto id = plan.GetMapSafe().at("PlanNodeId").GetIntegerSafe();
planIndex[id] = plan;
}

if (plan.GetMapSafe().contains("Subplan Name")) {
const auto& precomputeName = plan.GetMapSafe().at("Subplan Name").GetStringSafe();

auto pos = precomputeName.find("precompute");
if (pos != TString::npos) {
precomputes[precomputeName.substr(pos)] = plan;
}
}

if (plan.GetMapSafe().contains("Plans")) {
for (auto p : plan.GetMapSafe().at("Plans").GetArraySafe()) {
BuildPlanIndex(p, planIndex, precomputes);
}
}
}

TVector<NJson::TJsonValue> RemoveRedundantNodes(NJson::TJsonValue& plan, const THashSet<TString>& redundantNodes) {
auto& planMap = plan.GetMapSafe();

TVector<NJson::TJsonValue> children;
if (planMap.contains("Plans") && planMap.at("Plans").IsArray()) {
for (auto& child : planMap.at("Plans").GetArraySafe()) {
auto newChildren = RemoveRedundantNodes(child, redundantNodes);
children.insert(children.end(), newChildren.begin(), newChildren.end());
}
}

planMap.erase("Plans");
if (!children.empty()) {
auto& plans = planMap["Plans"];
for (auto& child : children) {
plans.AppendValue(child);
}
}

const auto typeName = planMap.at("Node Type").GetStringSafe();
if (redundantNodes.contains(typeName) || typeName.find("Precompute") != TString::npos) {
return children;
}

return {plan};
}

NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,
int operatorIndex,
const THashMap<int, NJson::TJsonValue>& planIndex,
const THashMap<TString, NJson::TJsonValue>& precomputes,
int& nodeCounter) {

int currentNodeId = nodeCounter++;

NJson::TJsonValue result;
result["PlanNodeId"] = currentNodeId;
if (plan.GetMapSafe().contains("PlanNodeType")) {
result["PlanNodeType"] = plan.GetMapSafe().at("PlanNodeType").GetStringSafe();
}

if (plan.GetMapSafe().contains("Stats")) {
result["Stats"] = plan.GetMapSafe().at("Stats");
}

if (!plan.GetMapSafe().contains("Operators")) {
NJson::TJsonValue planInputs;

result["Node Type"] = plan.GetMapSafe().at("Node Type").GetStringSafe();

if (!plan.GetMapSafe().contains("Plans")) {
return result;
}

if (plan.GetMapSafe().at("Node Type") == "TableLookup") {
NJson::TJsonValue newOps;
NJson::TJsonValue op;

op["Name"] = "TableLookup";
op["Columns"] = plan.GetMapSafe().at("Columns");
op["LookupKeyColumns"] = plan.GetMapSafe().at("LookupKeyColumns");
op["Table"] = plan.GetMapSafe().at("Table");

newOps.AppendValue(op);

result["Operators"] = newOps;
return result;
}

for (auto p : plan.GetMapSafe().at("Plans").GetArraySafe()) {
if (p.GetMapSafe().at("Node Type").GetStringSafe().find("Precompute") == TString::npos) {
planInputs.AppendValue(ReconstructQueryPlanRec(p, 0, planIndex, precomputes, nodeCounter));
}
}
result["Plans"] = planInputs;
return result;
}

if (plan.GetMapSafe().contains("CTE Name") && plan.GetMapSafe().at("Node Type") == "ConstantExpr") {
auto precompute = plan.GetMapSafe().at("CTE Name").GetStringSafe();
if (!precomputes.contains(precompute)) {
result["Node Type"] = "ConstantExpr";
return result;
}
return ReconstructQueryPlanRec(precomputes.at(precompute), 0, planIndex, precomputes, nodeCounter);
}

auto ops = plan.GetMapSafe().at("Operators").GetArraySafe();
auto op = ops[operatorIndex];

TVector<NJson::TJsonValue> planInputs;

auto opName = op.GetMapSafe().at("Name").GetStringSafe();

for (auto opInput : op.GetMapSafe().at("Inputs").GetArraySafe()) {
if (opInput.GetMapSafe().contains("ExternalPlanNodeId")) {
auto inputPlanKey = opInput.GetMapSafe().at("ExternalPlanNodeId").GetIntegerSafe();
auto inputPlan = planIndex.at(inputPlanKey);
planInputs.push_back( ReconstructQueryPlanRec(inputPlan, 0, planIndex, precomputes, nodeCounter));
} else if (opInput.GetMapSafe().contains("InternalOperatorId")) {
auto inputPlanId = opInput.GetMapSafe().at("InternalOperatorId").GetIntegerSafe();
planInputs.push_back( ReconstructQueryPlanRec(plan, inputPlanId, planIndex, precomputes, nodeCounter));
}
// temp hack
if (opName == "Filter") {
break;
}
}

if (op.GetMapSafe().contains("Inputs")) {
op.GetMapSafe().erase("Inputs");
}

if (op.GetMapSafe().contains("Input") || op.GetMapSafe().contains("ToFlow")) {
TString maybePrecompute = "";
if (op.GetMapSafe().contains("Input")) {
maybePrecompute = op.GetMapSafe().at("Input").GetStringSafe();
} else if (op.GetMapSafe().contains("ToFlow")) {
maybePrecompute = op.GetMapSafe().at("ToFlow").GetStringSafe();
}

if (precomputes.contains(maybePrecompute)) {
planInputs.push_back(ReconstructQueryPlanRec(precomputes.at(maybePrecompute), 0, planIndex, precomputes, nodeCounter));
}
}

result["Node Type"] = opName;
NJson::TJsonValue newOps;
newOps.AppendValue(op);
result["Operators"] = newOps;

if (planInputs.size()){
NJson::TJsonValue plans;
for( auto i : planInputs) {
plans.AppendValue(i);
}
result["Plans"] = plans;
}

return result;
}

NJson::TJsonValue SimplifyQueryPlan(NJson::TJsonValue& plan) {
static const THashSet<TString> redundantNodes = {
"UnionAll",
"Broadcast",
"Map",
"HashShuffle",
"Merge",
"Collect",
"Stage",
"Iterator",
"PartitionByKey",
"ToFlow"
};

THashMap<int, NJson::TJsonValue> planIndex;
THashMap<TString, NJson::TJsonValue> precomputes;


BuildPlanIndex(plan, planIndex, precomputes);

int nodeCounter = 0;
plan = ReconstructQueryPlanRec(plan, 0, planIndex, precomputes, nodeCounter);
RemoveRedundantNodes(plan, redundantNodes);
return plan;
}

TString AddSimplifiedPlan(const TString& planText, bool analyzeMode) {
Y_UNUSED(analyzeMode);
NJson::TJsonValue planJson;
NJson::ReadJsonTree(planText, &planJson, true);
if (!planJson.GetMapSafe().contains("Plan")){
return planText;
}

NJson::TJsonValue planCopy;
NJson::ReadJsonTree(planText, &planCopy, true);

planJson["SimplifiedPlan"] = SimplifyQueryPlan(planCopy.GetMapSafe().at("Plan"));

// Don't print the OLAP plan yet, there are some non UTF-8 symbols there that need to be fixed
//TTempBufOutput stringStream;
//NYdb::NConsoleClient::TQueryPlanPrinter printer(NYdb::NConsoleClient::EOutputFormat::PrettyTable, analyzeMode, stringStream);
//printer.Print(planJson.GetStringRobust());
//planJson["OLAPText"] = stringStream.Data();
return planJson.GetStringRobust();
}

TString SerializeTxPlans(const TVector<const TString>& txPlans, const TString commonPlanInfo = "") {
NJsonWriter::TBuf writer;
writer.SetIndentSpaces(2);
Expand Down Expand Up @@ -1862,7 +2075,8 @@ TString SerializeTxPlans(const TVector<const TString>& txPlans, const TString co
writer.EndObject();
writer.EndObject();

return writer.Str();
auto resultPlan = writer.Str();
return AddSimplifiedPlan(resultPlan, false);
}

} // namespace
Expand Down Expand Up @@ -2250,7 +2464,8 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD

NJsonWriter::TBuf txWriter;
txWriter.WriteJsonValue(&root, true);
return txWriter.Str();
auto resultPlan = txWriter.Str();
return AddSimplifiedPlan(resultPlan, true);
}

TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats) {
Expand Down Expand Up @@ -2294,6 +2509,9 @@ TString SerializeScriptPlan(const TVector<const TString>& queryPlans) {
if (auto dqPlan = planMap.FindPtr("Plan")) {
writer.WriteKey("Plan");
writer.WriteJsonValue(dqPlan);
writer.WriteKey("SimplifiedPlan");
auto simplifiedPlan = SimplifyQueryPlan(*dqPlan);
writer.WriteJsonValue(&simplifiedPlan);
}
writer.EndObject();
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/common/kqp_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1170,7 +1170,7 @@ std::vector<NJson::TJsonValue> FindPlanNodes(const NJson::TJsonValue& plan, cons

std::vector<NJson::TJsonValue> FindPlanStages(const NJson::TJsonValue& plan) {
std::vector<NJson::TJsonValue> stages;
FindPlanStagesImpl(plan, stages);
FindPlanStagesImpl(plan.GetMapSafe().at("Plan"), stages);
return stages;
}

Expand Down
Loading

0 comments on commit 903def9

Please sign in to comment.