Skip to content

Commit

Permalink
refactor(sqlplannertest): further split test cases, support specify r…
Browse files Browse the repository at this point in the history
…ules enabled (#190)

* Split TPC-H test cases
* Add the test case that causes cycles

Signed-off-by: Alex Chi <iskyzh@gmail.com>
  • Loading branch information
skyzh authored Oct 26, 2024
1 parent 9633315 commit d73c26c
Show file tree
Hide file tree
Showing 22 changed files with 3,583 additions and 2,889 deletions.
6 changes: 3 additions & 3 deletions datafusion-optd-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ struct Args {
maxrows: MaxRows,

#[clap(long, help = "Turn on datafusion logical optimizer before optd")]
enable_logical: bool,
enable_df_logical: bool,

#[clap(long, help = "Turn on adaptive optimization")]
enable_adaptive: bool,
Expand All @@ -164,7 +164,7 @@ pub async fn main() -> Result<()> {

let mut session_config = SessionConfig::from_env()?.with_information_schema(true);

if !args.enable_logical {
if !args.enable_df_logical {
session_config.options_mut().optimizer.max_passes = 0;
}

Expand Down Expand Up @@ -198,7 +198,7 @@ pub async fn main() -> Result<()> {
let mut ctx = {
let mut state =
SessionState::new_with_config_rt(session_config.clone(), Arc::new(runtime_env));
if !args.enable_logical {
if !args.enable_df_logical {
// clean up optimizer rules so that we can plug in our own optimizer
state = state.with_optimizer_rules(vec![]);
state = state.with_physical_optimizer_rules(vec![]);
Expand Down
2 changes: 1 addition & 1 deletion datafusion-optd-cli/tests/cli_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ fn cli_test_tpch() {
let mut cmd = Command::cargo_bin("datafusion-optd-cli").unwrap();
cmd.current_dir(".."); // all paths in `test.sql` assume we're in the base dir of the repo
cmd.args([
"--enable-logical",
"--enable-df-logical",
"--file",
"datafusion-optd-cli/tpch-sf0_01/test.sql",
]);
Expand Down
4 changes: 2 additions & 2 deletions datafusion-optd-cli/tpch-sf0_01/simple_manual_test.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- This is just used if you want to run really simple manual tests on the CLI. Feel free to delete the whole thing and write your own manual tests
-- Command: `cargo run --bin datafusion-optd-cli -- --enable-logical -f datafusion-optd-cli/tpch-sf0_01/simple_manual_test.sql`
-- Command: `cargo run --bin datafusion-optd-cli -- --enable-df-logical -f datafusion-optd-cli/tpch-sf0_01/simple_manual_test.sql`
CREATE TABLE NATION (
N_NATIONKEY INT NOT NULL,
N_NAME CHAR(25) NOT NULL,
Expand All @@ -10,4 +10,4 @@ CREATE TABLE NATION (
CREATE EXTERNAL TABLE nation_tbl STORED AS CSV DELIMITER '|' LOCATION 'datafusion-optd-cli/tpch-sf0_01/nation.tbl';
insert into nation select column_1, column_2, column_3, column_4 from nation_tbl;

SELECT * FROM nation where nation.n_nationkey = 1 OR nation.n_nationkey = 2 OR nation.n_nationkey = 5;
SELECT * FROM nation where nation.n_nationkey = 1 OR nation.n_nationkey = 2 OR nation.n_nationkey = 5;
13 changes: 11 additions & 2 deletions optd-core/src/cascades/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct OptimizerContext {

#[derive(Default, Clone, Debug)]
pub struct OptimizerProperties {
pub partial_explore_temporarily_disabled: bool,
/// If the number of rules applied exceeds this number, we stop applying logical rules.
pub partial_explore_iter: Option<usize>,
/// Plan space can be expanded by this number of times before we stop applying logical rules.
Expand Down Expand Up @@ -86,6 +87,14 @@ impl<T: RelNodeTyp> CascadesOptimizer<T> {
Self::new_with_prop(rules, cost, property_builders, Default::default())
}

pub fn disable_explore_limit(&mut self) {
self.prop.partial_explore_temporarily_disabled = true;
}

pub fn enable_explore_limit(&mut self) {
self.prop.partial_explore_temporarily_disabled = false;
}

pub fn new_with_prop(
rules: Vec<Arc<RuleWrapper<T, Self>>>,
cost: Box<dyn CostModel<T>>,
Expand Down Expand Up @@ -113,7 +122,7 @@ impl<T: RelNodeTyp> CascadesOptimizer<T> {
self.cost.clone()
}

pub(super) fn rules(&self) -> Arc<[Arc<RuleWrapper<T, Self>>]> {
pub fn rules(&self) -> Arc<[Arc<RuleWrapper<T, Self>>]> {
self.rules.clone()
}

Expand Down Expand Up @@ -229,7 +238,7 @@ impl<T: RelNodeTyp> CascadesOptimizer<T> {
let new_tasks = task.execute(self)?;
self.tasks.extend(new_tasks);
iter += 1;
if !self.ctx.budget_used {
if !self.ctx.budget_used && !self.prop.partial_explore_temporarily_disabled {
let plan_space = self.memo.compute_plan_space();
if let Some(partial_explore_space) = self.prop.partial_explore_space {
if plan_space - plan_space_begin > partial_explore_space {
Expand Down
1 change: 1 addition & 0 deletions optd-datafusion-repr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ impl DatafusionOptimizer {
Box::new(ColumnRefPropertyBuilder::new(catalog.clone())),
],
OptimizerProperties {
partial_explore_temporarily_disabled: false,
partial_explore_iter: Some(1 << 20),
partial_explore_space: Some(1 << 10),
},
Expand Down
1 change: 1 addition & 0 deletions optd-sqlplannertest/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ The `explain` and `execute` task will be run with datafusion's logical optimizer
| -------------- | --------------------------------------- |
| use_df_logical | Enable Datafusion's logical optimizer |
| verbose | Display estimated cost in physical plan |
| logical_rules | Only enable these logical rules |

Currently we have the following options for the explain task:

Expand Down
124 changes: 99 additions & 25 deletions optd-sqlplannertest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,44 +13,49 @@ use optd_datafusion_bridge::{DatafusionCatalog, OptdQueryPlanner};
use optd_datafusion_repr::cost::BaseTableStats;
use optd_datafusion_repr::DatafusionOptimizer;
use regex::Regex;
use std::collections::HashSet;
use std::sync::Arc;

#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;

use anyhow::{Context, Result};
use anyhow::{bail, Result};
use async_trait::async_trait;

#[derive(Default)]
pub struct DatafusionDBMS {
ctx: SessionContext,
/// Context enabling datafusion's logical optimizer.
use_df_logical_ctx: SessionContext,
/// Shared optd optimizer (for tweaking config)
optd_optimizer: Option<Arc<OptdQueryPlanner>>,
}

impl DatafusionDBMS {
pub async fn new() -> Result<Self> {
let ctx = DatafusionDBMS::new_session_ctx(false, None).await?;
let use_df_logical_ctx =
let (ctx, optd_optimizer) = DatafusionDBMS::new_session_ctx(false, None).await?;
let (use_df_logical_ctx, _) =
DatafusionDBMS::new_session_ctx(true, Some(ctx.state().catalog_list().clone())).await?;
Ok(Self {
ctx,
use_df_logical_ctx,
optd_optimizer: Some(optd_optimizer),
})
}

/// Creates a new session context. If the `use_df_logical` flag is set, datafusion's logical optimizer will be used.
async fn new_session_ctx(
use_df_logical: bool,
catalog: Option<Arc<dyn CatalogList>>,
) -> Result<SessionContext> {
) -> Result<(SessionContext, Arc<OptdQueryPlanner>)> {
let mut session_config = SessionConfig::from_env()?.with_information_schema(true);
if !use_df_logical {
session_config.options_mut().optimizer.max_passes = 0;
}

let rn_config = RuntimeConfig::new();
let runtime_env = RuntimeEnv::new(rn_config.clone())?;
let optd_optimizer;

let ctx = {
let mut state = if let Some(catalog) = catalog {
Expand All @@ -73,20 +78,63 @@ impl DatafusionDBMS {
}
state = state.with_physical_optimizer_rules(vec![]);
// use optd-bridge query planner
state = state.with_query_planner(Arc::new(OptdQueryPlanner::new(optimizer)));
optd_optimizer = Arc::new(OptdQueryPlanner::new(optimizer));
state = state.with_query_planner(optd_optimizer.clone());
SessionContext::new_with_state(state)
};
ctx.refresh_catalogs().await?;
Ok(ctx)
Ok((ctx, optd_optimizer))
}

pub async fn execute(&self, sql: &str, use_df_logical: bool) -> Result<Vec<Vec<String>>> {
pub(crate) async fn execute(&self, sql: &str, flags: &TestFlags) -> Result<Vec<Vec<String>>> {
{
let mut guard = self
.optd_optimizer
.as_ref()
.unwrap()
.optimizer
.lock()
.unwrap();
let optimizer = guard.as_mut().unwrap().optd_optimizer_mut();
if flags.disable_explore_limit {
optimizer.disable_explore_limit();
} else {
optimizer.enable_explore_limit();
}
let rules = optimizer.rules();
if flags.enable_logical_rules.is_empty() {
for r in 0..rules.len() {
optimizer.enable_rule(r);
}
} else {
for (rule_id, rule) in rules.as_ref().iter().enumerate() {
if rule.rule.is_impl_rule() {
optimizer.enable_rule(rule_id);
} else {
optimizer.disable_rule(rule_id);
}
}
let mut rules_to_enable = flags
.enable_logical_rules
.iter()
.map(|x| x.as_str())
.collect::<HashSet<_>>();
for (rule_id, rule) in rules.as_ref().iter().enumerate() {
if rules_to_enable.remove(rule.rule.name()) {
optimizer.enable_rule(rule_id);
}
}
if !rules_to_enable.is_empty() {
bail!("Unknown logical rule: {:?}", rules_to_enable);
}
}
}
let sql = unescape_input(sql)?;
let dialect = Box::new(GenericDialect);
let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
let mut result = Vec::new();
for statement in statements {
let df = if use_df_logical {
let df = if flags.enable_df_logical {
let plan = self
.use_df_logical_ctx
.state()
Expand All @@ -95,6 +143,7 @@ impl DatafusionDBMS {
self.use_df_logical_ctx.execute_logical_plan(plan).await?
} else {
let plan = self.ctx.state().statement_to_plan(statement).await?;

self.ctx.execute_logical_plan(plan).await?
};

Expand Down Expand Up @@ -123,10 +172,12 @@ impl DatafusionDBMS {
}

/// Executes the `execute` task.
async fn task_execute(&mut self, r: &mut String, sql: &str, flags: &[String]) -> Result<()> {
async fn task_execute(&mut self, r: &mut String, sql: &str, flags: &TestFlags) -> Result<()> {
use std::fmt::Write;
let use_df_logical = flags.contains(&"use_df_logical".to_string());
let result = self.execute(sql, use_df_logical).await?;
if flags.verbose {
bail!("Verbose flag is not supported for execute task");
}
let result = self.execute(sql, flags).await?;
writeln!(r, "{}", result.into_iter().map(|x| x.join(" ")).join("\n"))?;
writeln!(r)?;
Ok(())
Expand All @@ -138,19 +189,18 @@ impl DatafusionDBMS {
r: &mut String,
sql: &str,
task: &str,
flags: &[String],
flags: &TestFlags,
) -> Result<()> {
use std::fmt::Write;

let use_df_logical = flags.contains(&"use_df_logical".to_string());
let verbose = flags.contains(&"verbose".to_string());
let verbose = flags.verbose;
let explain_sql = if verbose {
format!("explain verbose {}", &sql)
} else {
format!("explain {}", &sql)
};
let result = self.execute(&explain_sql, use_df_logical).await?;
let subtask_start_pos = task.find(':').unwrap() + 1;
let result = self.execute(&explain_sql, flags).await?;
let subtask_start_pos = task.rfind(':').unwrap() + 1;
for subtask in task[subtask_start_pos..].split(',') {
let subtask = subtask.trim();
if subtask == "logical_datafusion" {
Expand All @@ -163,7 +213,7 @@ impl DatafusionDBMS {
.map(|x| &x[1])
.unwrap()
)?;
} else if subtask == "logical_optd_heuristic" {
} else if subtask == "logical_optd_heuristic" || subtask == "optimized_logical_optd" {
writeln!(
r,
"{}",
Expand Down Expand Up @@ -225,6 +275,8 @@ impl DatafusionDBMS {
.map(|x| &x[1])
.unwrap()
)?;
} else {
bail!("Unknown subtask: {}", subtask);
}
}

Expand All @@ -235,10 +287,8 @@ impl DatafusionDBMS {
#[async_trait]
impl sqlplannertest::PlannerTestRunner for DatafusionDBMS {
async fn run(&mut self, test_case: &sqlplannertest::ParsedTestCase) -> Result<String> {
for before in &test_case.before_sql {
self.execute(before, true)
.await
.context("before execution error")?;
if !test_case.before_sql.is_empty() {
panic!("before is not supported in optd-sqlplannertest, always specify the task type to run");
}

let mut result = String::new();
Expand All @@ -259,18 +309,42 @@ lazy_static! {
static ref FLAGS_REGEX: Regex = Regex::new(r"\[(.*)\]").unwrap();
}

#[derive(Default, Debug)]
struct TestFlags {
verbose: bool,
enable_df_logical: bool,
enable_logical_rules: Vec<String>,
disable_explore_limit: bool,
}

/// Extract the flags from a task. The flags are specified in square brackets.
/// For example, the flags for the task `explain[use_df_logical, verbose]` are `["use_df_logical", "verbose"]`.
fn extract_flags(task: &str) -> Result<Vec<String>> {
fn extract_flags(task: &str) -> Result<TestFlags> {
if let Some(captures) = FLAGS_REGEX.captures(task) {
Ok(captures
let flags = captures
.get(1)
.unwrap()
.as_str()
.split(',')
.map(|x| x.trim().to_string())
.collect())
.collect_vec();
let mut options = TestFlags::default();
for flag in flags {
if flag == "verbose" {
options.verbose = true;
} else if flag == "use_df_logical" {
options.enable_df_logical = true;
} else if flag.starts_with("logical_rules") {
options.enable_logical_rules =
flag.split('+').skip(1).map(|x| x.to_string()).collect();
} else if flag == "disable_explore_limit" {
options.disable_explore_limit = true;
} else {
bail!("Unknown flag: {}", flag);
}
}
Ok(options)
} else {
Ok(vec![])
Ok(TestFlags::default())
}
}
35 changes: 35 additions & 0 deletions optd-sqlplannertest/tests/pushdowns/fliter_transpose.planner.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
-- (no id or description)
create table t1(t1v1 int, t1v2 int);
create table t2(t2v1 int, t2v3 int);
insert into t1 values (0, 0), (1, 1), (2, 2);
insert into t2 values (0, 200), (1, 201), (2, 202);

/*
3
3
*/

-- Test whether we can transpose filter and projection
SELECT t1.t1v1, t1.t1v2, t2.t2v3
FROM t1, t2
WHERE t1.t1v1 = t2.t2v1;

/*
LogicalProjection { exprs: [ #0, #1, #3 ] }
└── LogicalFilter
├── cond:Eq
│ ├── #0
│ └── #2
└── LogicalJoin { join_type: Cross, cond: true }
├── LogicalScan { table: t1 }
└── LogicalScan { table: t2 }
PhysicalProjection { exprs: [ #0, #1, #3 ] }
└── PhysicalFilter
├── cond:Eq
│ ├── #0
│ └── #2
└── PhysicalNestedLoopJoin { join_type: Cross, cond: true }
├── PhysicalScan { table: t1 }
└── PhysicalScan { table: t2 }
*/

Loading

0 comments on commit d73c26c

Please sign in to comment.