Skip to content

Commit

Permalink
fix(core): do not apply logical rule if iter budget is exhausted (#265)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi Z <iskyzh@gmail.com>
  • Loading branch information
skyzh committed Dec 18, 2024
1 parent cac5d3f commit bf25604
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 15 deletions.
9 changes: 3 additions & 6 deletions datafusion-optd-cli/tests/cli_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::process::Command;
use std::process::{Command, Stdio};

use assert_cmd::prelude::CommandCargoExt;

Expand Down Expand Up @@ -55,11 +55,8 @@ fn init() {
fn cli_test_tpch() {
let mut cmd = Command::cargo_bin("datafusion-optd-cli").unwrap();
cmd.current_dir(".."); // all paths in `test.sql` assume we're in the base dir of the repo
cmd.args([
"--enable-df-logical",
"--file",
"datafusion-optd-cli/tpch-sf0_01/test.sql",
]);
cmd.args(["--file", "datafusion-optd-cli/tpch-sf0_01/test.sql"]);
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
let status = cmd.status().unwrap();
assert!(
status.success(),
Expand Down
2 changes: 1 addition & 1 deletion optd-core/src/cascades/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ impl<T: NodeType, M: Memo<T>> CascadesOptimizer<T, M> {
trace!(event = "fire_optimize_tasks", root_group_id = %group_id);
let mut task = TaskContext::new(self);
// 32MB stack for the optimization process, TODO: reduce memory footprint
stacker::maybe_grow(32 * 1024 * 1024, 32 * 1024 * 1024, || {
stacker::grow(32 * 1024 * 1024, || {
let fut: Pin<Box<dyn Future<Output = ()>>> = Box::pin(task.fire_optimize(group_id));
fut.block_on();
});
Expand Down
25 changes: 17 additions & 8 deletions optd-core/src/cascades/tasks2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,14 @@ impl<'a, T: NodeType, M: Memo<T>> TaskContext<'a, T, M> {
continue;
}
// Skip transformation rules when budget is used
if self.optimizer.ctx.logical_budget_used && !rule.is_impl_rule() {
if (self.optimizer.ctx.logical_budget_used || self.optimizer.ctx.all_budget_used)
&& !rule.is_impl_rule()
{
continue;
}
if self.optimizer.ctx.all_budget_used {
if self.optimizer.ctx.all_budget_used
&& self.optimizer.get_group_winner(group_id).has_full_winner()
{
break;
}
if top_matches(rule.matcher(), expr.typ.clone()) {
Expand Down Expand Up @@ -246,7 +250,8 @@ impl<'a, T: NodeType, M: Memo<T>> TaskContext<'a, T, M> {
let rule = self.optimizer.rules()[rule_id].clone();

let binding_exprs = match_and_pick_expr(rule.matcher(), expr_id, self.optimizer);
if binding_exprs.len() >= 100 {
const BINDING_EXPR_WARNING_THRESHOLD: usize = 200;
if binding_exprs.len() >= BINDING_EXPR_WARNING_THRESHOLD {
tracing::warn!(
event = "rule_application",
task = "apply_rule",
Expand Down Expand Up @@ -302,12 +307,16 @@ impl<'a, T: NodeType, M: Memo<T>> TaskContext<'a, T, M> {
}
}

if self.optimizer.ctx.all_budget_used {
break;
}
if self.optimizer.ctx.logical_budget_used && !rule.is_impl_rule() {
if (self.optimizer.ctx.logical_budget_used || self.optimizer.ctx.all_budget_used)
&& !rule.is_impl_rule()
{
continue;
}
if self.optimizer.ctx.all_budget_used
&& self.optimizer.get_group_winner(group_id).has_full_winner()
{
break;
}

trace!(event = "before_apply_rule", task = "apply_rule", input_binding=%binding);
let applied = rule.apply(self.optimizer, binding);
Expand Down Expand Up @@ -540,7 +549,7 @@ impl<'a, T: NodeType, M: Memo<T>> TaskContext<'a, T, M> {

fn on_task_start(&self) {
if (self.optimizer.ctx.all_budget_used || self.optimizer.ctx.logical_budget_used)
&& self.steps % 100 == 0
&& self.steps % 100000 == 0
{
println!("out of budget, dumping info");
println!("step={}", self.steps);
Expand Down

0 comments on commit bf25604

Please sign in to comment.