diff --git a/velox/exec/fuzzer/JoinFuzzer.cpp b/velox/exec/fuzzer/JoinFuzzer.cpp index 5218a9ff4db1..fc4e76ee57cd 100644 --- a/velox/exec/fuzzer/JoinFuzzer.cpp +++ b/velox/exec/fuzzer/JoinFuzzer.cpp @@ -142,7 +142,8 @@ class JoinFuzzer { const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput, - const std::vector& outputColumns); + const std::vector& outputColumns, + const std::string filter); JoinFuzzer::PlanWithSplits makeMergeJoinPlan( core::JoinType joinType, @@ -150,7 +151,8 @@ class JoinFuzzer { const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput, - const std::vector& outputColumns); + const std::vector& outputColumns, + const std::string filter); // Returns a PlanWithSplits for NestedLoopJoin with inputs from Values nodes. // If withFilter is true, uses the equality filter between probeKeys and @@ -162,7 +164,7 @@ class JoinFuzzer { const std::vector& probeInput, const std::vector& buildInput, const std::vector& outputColumns, - bool withFilter = true); + const std::string filter); // Makes the default query plan with table scan as inputs for both probe and // build sides. @@ -175,7 +177,8 @@ class JoinFuzzer { const std::vector& buildKeys, const std::vector& probeSplits, const std::vector& buildSplits, - const std::vector& outputColumns); + const std::vector& outputColumns, + const std::string filter); JoinFuzzer::PlanWithSplits makeMergeJoinPlanWithTableScan( core::JoinType joinType, @@ -185,7 +188,8 @@ class JoinFuzzer { const std::vector& buildKeys, const std::vector& probeSplits, const std::vector& buildSplits, - const std::vector& outputColumns); + const std::vector& outputColumns, + const std::string filter); // Returns a PlanWithSplits for NestedLoopJoin with inputs from TableScan // nodes. If withFilter is true, uses the equiality filter between probeKeys @@ -199,13 +203,14 @@ class JoinFuzzer { const std::vector& probeSplits, const std::vector& buildSplits, const std::vector& outputColumns, - bool withFilter = true); + const std::string filter); void makeAlternativePlans( const core::PlanNodePtr& plan, const std::vector& probeInput, const std::vector& buildInput, - std::vector& plans); + std::vector& plans, + const std::string filter); // Makes the query plan from 'planWithTableScan' with grouped execution mode. // Correspondingly, it replaces the table scan input splits with grouped ones. @@ -249,7 +254,8 @@ class JoinFuzzer { const std::vector& probeInput, const std::vector& buildInput, const std::vector& outputColumns, - std::vector& altPlans); + std::vector& altPlans, + const std::string filter = ""); // Splits the input into groups by partitioning on the join keys. std::vector> splitInputByGroup( @@ -688,7 +694,8 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlan( const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput, - const std::vector& outputColumns) { + const std::vector& outputColumns, + const std::string filter) { auto planNodeIdGenerator = std::make_shared(); auto plan = PlanBuilder(planNodeIdGenerator) @@ -697,7 +704,7 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlan( probeKeys, buildKeys, PlanBuilder(planNodeIdGenerator).values(buildInput).planNode(), - /*filter=*/"", + filter, outputColumns, joinType, nullAware) @@ -714,7 +721,8 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlanWithTableScan( const std::vector& buildKeys, const std::vector& probeSplits, const std::vector& buildSplits, - const std::vector& outputColumns) { + const std::vector& outputColumns, + const std::string filter) { auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId probeScanId; core::PlanNodeId buildScanId; @@ -728,7 +736,7 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlanWithTableScan( .tableScan(buildType) .capturePlanNodeId(buildScanId) .planNode(), - /*filter=*/"", + filter, outputColumns, joinType, nullAware) @@ -819,7 +827,8 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlan( const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput, - const std::vector& outputColumns) { + const std::vector& outputColumns, + const std::string filter) { auto planNodeIdGenerator = std::make_shared(); return JoinFuzzer::PlanWithSplits{PlanBuilder(planNodeIdGenerator) .values(probeInput) @@ -831,7 +840,7 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlan( .values(buildInput) .orderBy(buildKeys, false) .planNode(), - /*filter=*/"", + filter, outputColumns, joinType) .planNode()}; @@ -844,10 +853,8 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlan( const std::vector& probeInput, const std::vector& buildInput, const std::vector& outputColumns, - bool withFilter) { + const std::string filter) { auto planNodeIdGenerator = std::make_shared(); - const std::string filter = - withFilter ? makeJoinFilter(probeKeys, buildKeys) : ""; return JoinFuzzer::PlanWithSplits{ PlanBuilder(planNodeIdGenerator) .values(probeInput) @@ -863,7 +870,8 @@ void JoinFuzzer::makeAlternativePlans( const core::PlanNodePtr& plan, const std::vector& probeInput, const std::vector& buildInput, - std::vector& plans) { + std::vector& plans, + const std::string filter) { auto joinNode = std::dynamic_pointer_cast(plan); VELOX_CHECK_NOT_NULL(joinNode); @@ -888,7 +896,7 @@ void JoinFuzzer::makeAlternativePlans( .localPartitionRoundRobin( makeSources(buildInput, planNodeIdGenerator)) .planNode(), - /*filter=*/"", + filter, outputColumns, joinType, joinNode->isNullAware()) @@ -897,7 +905,13 @@ void JoinFuzzer::makeAlternativePlans( // Use OrderBy + MergeJoin if (core::MergeJoinNode::isSupported(joinNode->joinType())) { auto planWithSplits = makeMergeJoinPlan( - joinType, probeKeys, buildKeys, probeInput, buildInput, outputColumns); + joinType, + probeKeys, + buildKeys, + probeInput, + buildInput, + outputColumns, + filter); plans.push_back(planWithSplits); addFlippedJoinPlan(planWithSplits.plan, plans); @@ -906,7 +920,13 @@ void JoinFuzzer::makeAlternativePlans( // Use NestedLoopJoin. if (core::NestedLoopJoinNode::isSupported(joinNode->joinType())) { auto planWithSplits = makeNestedLoopJoinPlan( - joinType, probeKeys, buildKeys, probeInput, buildInput, outputColumns); + joinType, + probeKeys, + buildKeys, + probeInput, + buildInput, + outputColumns, + filter); plans.push_back(planWithSplits); addFlippedJoinPlan(planWithSplits.plan, plans); @@ -957,7 +977,7 @@ RowVectorPtr JoinFuzzer::testCrossProduct( probeInput, buildInput, outputColumns, - /*withFilter*/ false); + /*filter=*/""); const auto expected = execute(plan, /*injectSpill=*/false); // If OOM injection is not enabled verify the results against Reference query @@ -992,7 +1012,7 @@ RowVectorPtr JoinFuzzer::testCrossProduct( probeScanSplits, buildScanSplits, outputColumns, - /*withFilter*/ false)); + /*filter=*/"")); } addFlippedJoinPlan(plan.plan, altPlans); @@ -1014,10 +1034,30 @@ void JoinFuzzer::verify(core::JoinType joinType) { const auto numKeys = nullAware ? 1 : randInt(1, 5); // Pick number and types of join keys. - const std::vector keyTypes = generateJoinKeyTypes(numKeys); + std::vector keyTypes = generateJoinKeyTypes(numKeys); + std::string semiFilter; + // Add boolean/integer semi-filter 10% of the time. + if (vectorFuzzer_.coinToss(0.1)) { + if (vectorFuzzer_.coinToss(0.5)) { + keyTypes.push_back(BOOLEAN()); + semiFilter = vectorFuzzer_.coinToss(0.5) + ? fmt::format("t{} = true", keyTypes.size() - 1) + : fmt::format("u{} = true", keyTypes.size() - 1); + } else { + keyTypes.push_back(INTEGER()); + semiFilter = vectorFuzzer_.coinToss(0.5) + ? fmt::format("t{} % {} = 0", keyTypes.size() - 1, randInt(1, 9)) + : fmt::format("u{} % {} = 0", keyTypes.size() - 1, randInt(1, 9)); + } + } std::vector probeKeys = makeNames("t", keyTypes.size()); std::vector buildKeys = makeNames("u", keyTypes.size()); + const std::string filter = semiFilter.empty() + ? makeJoinFilter(probeKeys, buildKeys) + : fmt::format( + "{} AND {}", makeJoinFilter(probeKeys, buildKeys), semiFilter); + auto probeInput = generateProbeInput(probeKeys, keyTypes); auto buildInput = generateBuildInput(probeInput, probeKeys, buildKeys); @@ -1094,7 +1134,8 @@ void JoinFuzzer::verify(core::JoinType joinType) { buildKeys, probeInput, buildInput, - outputColumns); + outputColumns, + filter); const auto expected = execute(defaultPlan, /*injectSpill=*/false); @@ -1123,11 +1164,13 @@ void JoinFuzzer::verify(core::JoinType joinType) { buildKeys, flatProbeInput, flatBuildInput, - outputColumns)); + outputColumns, + filter)); - makeAlternativePlans(defaultPlan.plan, probeInput, buildInput, altPlans); makeAlternativePlans( - defaultPlan.plan, flatProbeInput, flatBuildInput, altPlans); + defaultPlan.plan, probeInput, buildInput, altPlans, filter); + makeAlternativePlans( + defaultPlan.plan, flatProbeInput, flatBuildInput, altPlans, filter); addPlansWithTableScan( tableScanDir->getPath(), @@ -1138,7 +1181,8 @@ void JoinFuzzer::verify(core::JoinType joinType) { flatProbeInput, flatBuildInput, outputColumns, - altPlans); + altPlans, + filter); for (auto i = 0; i < altPlans.size(); ++i) { LOG(INFO) << "Testing plan #" << i; @@ -1190,7 +1234,8 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlanWithTableScan( const std::vector& buildKeys, const std::vector& probeSplits, const std::vector& buildSplits, - const std::vector& outputColumns) { + const std::vector& outputColumns, + const std::string filter) { auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId probeScanId; core::PlanNodeId buildScanId; @@ -1208,7 +1253,7 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlanWithTableScan( .capturePlanNodeId(buildScanId) .orderBy(buildKeys, false) .planNode(), - /*filter=*/"", + filter, outputColumns, joinType) .planNode(), @@ -1226,13 +1271,11 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlanWithTableScan( const std::vector& probeSplits, const std::vector& buildSplits, const std::vector& outputColumns, - bool withFilter) { + const std::string filter) { auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId probeScanId; core::PlanNodeId buildScanId; - const std::string filter = - withFilter ? makeJoinFilter(probeKeys, buildKeys) : ""; return JoinFuzzer::PlanWithSplits{ PlanBuilder(planNodeIdGenerator) .tableScan(probeType) @@ -1260,7 +1303,8 @@ void JoinFuzzer::addPlansWithTableScan( const std::vector& probeInput, const std::vector& buildInput, const std::vector& outputColumns, - std::vector& altPlans) { + std::vector& altPlans, + const std::string filter) { VELOX_CHECK(!tableDir.empty()); if (!isTableScanSupported(probeInput[0]->type()) || @@ -1286,7 +1330,8 @@ void JoinFuzzer::addPlansWithTableScan( buildKeys, probeScanSplits, buildScanSplits, - outputColumns); + outputColumns, + filter); plansWithTableScan.push_back(defaultPlan); auto joinNode = @@ -1336,7 +1381,8 @@ void JoinFuzzer::addPlansWithTableScan( buildKeys, probeScanSplits, buildScanSplits, - outputColumns); + outputColumns, + filter); altPlans.push_back(planWithSplits); addFlippedJoinPlan( @@ -1358,7 +1404,8 @@ void JoinFuzzer::addPlansWithTableScan( buildKeys, probeScanSplits, buildScanSplits, - outputColumns); + outputColumns, + filter); altPlans.push_back(planWithSplits); addFlippedJoinPlan(