diff --git a/datafusion-optd-cli/tests/cli_integration.rs b/datafusion-optd-cli/tests/cli_integration.rs index 27145df7..cd2a5fbd 100644 --- a/datafusion-optd-cli/tests/cli_integration.rs +++ b/datafusion-optd-cli/tests/cli_integration.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::process::Command; +use std::process::{Command, Stdio}; use assert_cmd::prelude::CommandCargoExt; @@ -55,11 +55,8 @@ fn init() { fn cli_test_tpch() { let mut cmd = Command::cargo_bin("datafusion-optd-cli").unwrap(); cmd.current_dir(".."); // all paths in `test.sql` assume we're in the base dir of the repo - cmd.args([ - "--enable-df-logical", - "--file", - "datafusion-optd-cli/tpch-sf0_01/test.sql", - ]); + cmd.args(["--file", "datafusion-optd-cli/tpch-sf0_01/test.sql"]); + cmd.stdout(Stdio::piped()).stderr(Stdio::piped()); let status = cmd.status().unwrap(); assert!( status.success(), diff --git a/optd-core/src/cascades/optimizer.rs b/optd-core/src/cascades/optimizer.rs index 7d2abfdc..ff0b8e8a 100644 --- a/optd-core/src/cascades/optimizer.rs +++ b/optd-core/src/cascades/optimizer.rs @@ -297,7 +297,7 @@ impl> CascadesOptimizer { trace!(event = "fire_optimize_tasks", root_group_id = %group_id); let mut task = TaskContext::new(self); // 32MB stack for the optimization process, TODO: reduce memory footprint - stacker::maybe_grow(32 * 1024 * 1024, 32 * 1024 * 1024, || { + stacker::grow(32 * 1024 * 1024, || { let fut: Pin>> = Box::pin(task.fire_optimize(group_id)); fut.block_on(); }); diff --git a/optd-core/src/cascades/tasks2.rs b/optd-core/src/cascades/tasks2.rs index 5644ab7f..a1961804 100644 --- a/optd-core/src/cascades/tasks2.rs +++ b/optd-core/src/cascades/tasks2.rs @@ -160,10 +160,14 @@ impl<'a, T: NodeType, M: Memo> TaskContext<'a, T, M> { continue; } // Skip transformation rules when budget is used - if self.optimizer.ctx.logical_budget_used && !rule.is_impl_rule() { + if (self.optimizer.ctx.logical_budget_used || self.optimizer.ctx.all_budget_used) + && !rule.is_impl_rule() + { continue; } - if self.optimizer.ctx.all_budget_used { + if self.optimizer.ctx.all_budget_used + && self.optimizer.get_group_winner(group_id).has_full_winner() + { break; } if top_matches(rule.matcher(), expr.typ.clone()) { @@ -246,7 +250,8 @@ impl<'a, T: NodeType, M: Memo> TaskContext<'a, T, M> { let rule = self.optimizer.rules()[rule_id].clone(); let binding_exprs = match_and_pick_expr(rule.matcher(), expr_id, self.optimizer); - if binding_exprs.len() >= 100 { + const BINDING_EXPR_WARNING_THRESHOLD: usize = 200; + if binding_exprs.len() >= BINDING_EXPR_WARNING_THRESHOLD { tracing::warn!( event = "rule_application", task = "apply_rule", @@ -302,12 +307,16 @@ impl<'a, T: NodeType, M: Memo> TaskContext<'a, T, M> { } } - if self.optimizer.ctx.all_budget_used { - break; - } - if self.optimizer.ctx.logical_budget_used && !rule.is_impl_rule() { + if (self.optimizer.ctx.logical_budget_used || self.optimizer.ctx.all_budget_used) + && !rule.is_impl_rule() + { continue; } + if self.optimizer.ctx.all_budget_used + && self.optimizer.get_group_winner(group_id).has_full_winner() + { + break; + } trace!(event = "before_apply_rule", task = "apply_rule", input_binding=%binding); let applied = rule.apply(self.optimizer, binding); @@ -540,7 +549,7 @@ impl<'a, T: NodeType, M: Memo> TaskContext<'a, T, M> { fn on_task_start(&self) { if (self.optimizer.ctx.all_budget_used || self.optimizer.ctx.logical_budget_used) - && self.steps % 100 == 0 + && self.steps % 100000 == 0 { println!("out of budget, dumping info"); println!("step={}", self.steps);