fix: convert run_pipeline_no_finalize from recursive to iterative in order to avoid stack overflows #11898

Closed
wants to merge 5 commits into from
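For context, a minimal sketch of the general pattern this PR applies (illustrative only, not the polars code): recursive descent is replaced by an explicit stack of frames kept on the heap, so deeply nested pipelines can no longer overflow the call stack.

```rust
// Illustrative only: the recursion-to-iteration pattern in miniature.
struct Node {
    value: u64,
    children: Vec<Node>,
}

// Recursive version: one call-stack frame per level of nesting,
// so a deeply nested tree can overflow the thread's stack.
fn sum_recursive(node: &Node) -> u64 {
    node.value + node.children.iter().map(sum_recursive).sum::<u64>()
}

// Iterative version: the "frames" live in a Vec on the heap instead,
// analogous to the `StackFrame` vector introduced in this PR.
fn sum_iterative(root: &Node) -> u64 {
    let mut stack = vec![root];
    let mut total = 0;
    while let Some(node) = stack.pop() {
        total += node.value;
        stack.extend(node.children.iter());
    }
    total
}
```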
292 changes: 208 additions & 84 deletions crates/polars-pipe/src/pipeline/dispatcher.rs
@@ -1,6 +1,7 @@
use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt::{Debug, Formatter};
use std::ops::{Deref, DerefMut};
use std::rc::Rc;
use std::sync::{Arc, Mutex};

@@ -330,89 +331,6 @@ impl PipeLine {
self.sources.push(src);
}

fn run_pipeline_no_finalize(
&mut self,
ec: &PExecutionContext,
pipeline_q: Rc<RefCell<VecDeque<PipeLine>>>,
) -> PolarsResult<(u32, Box<dyn Sink>)> {
let mut out = None;
let mut operator_start = 0;
let last_i = self.sinks.len() - 1;

// for unions we typically first want to push all pipelines
// into the union sink before we call `finalize`
// however if the sink is finished early, (for instance a `head`)
// we don't want to run the rest of the pipelines and we finalize early
let mut sink_finished = false;

for (i, (operator_end, shared_count, mut sink)) in
std::mem::take(&mut self.sinks).into_iter().enumerate()
{
for src in &mut std::mem::take(&mut self.sources) {
let mut next_batches = src.get_batches(ec)?;

while let SourceResult::GotMoreData(chunks) = next_batches {
let (sink_result, next_batches2) = self.par_process_chunks(
chunks,
&mut sink,
ec,
operator_start,
operator_end,
src,
)?;
next_batches = next_batches2;

if let Some(SinkResult::Finished) = sink_result {
sink_finished = true;
break;
Review comment (Author): I found this a bit strange: sink_finished is defined outside the "main" for loop, but here we only break the inner while let loop. Is that intentional?

Review comment (Member): We should break the outer loop indeed.

}
}
}

// The sinks have taken all chunks thread locally, now we reduce them into a single
// result sink.
let mut reduced_sink = POOL
.install(|| {
sink.into_par_iter().reduce_with(|mut a, mut b| {
a.combine(&mut *b);
a
})
})
.unwrap();
operator_start = operator_end;

let mut shared_sink_count = {
let mut shared_sink_count = shared_count.borrow_mut();
*shared_sink_count -= 1;
*shared_sink_count
};

while shared_sink_count > 0 && !sink_finished {
let mut pipeline = pipeline_q.borrow_mut().pop_front().unwrap();
let (count, mut sink) =
pipeline.run_pipeline_no_finalize(ec, pipeline_q.clone())?;
reduced_sink.combine(sink.as_mut());
shared_sink_count = count;
}

if i != last_i {
let sink_result = reduced_sink.finalize(ec)?;
match sink_result {
// turn this sink into a new source
FinalizedSink::Finished(df) => self.set_df_as_sources(df),
FinalizedSink::Source(src) => self.set_sources(src),
// should not happen
FinalizedSink::Operator(_) => {
unreachable!()
},
}
} else {
out = Some((shared_sink_count, reduced_sink))
}
}
Ok(out.unwrap())
}
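As noted in the review exchange above, breaking only the inner while let loop still lets the outer for loop move on to the next source after the sink reports it is finished. A minimal sketch of the labeled-break construct the new code below uses (break 'outer), with hypothetical names, not the polars code:

```rust
// Illustrative only: a labeled break exits the outer loop directly
// instead of only the innermost one.
fn first_match(sources: &[Vec<i32>], target: i32) -> Option<i32> {
    let mut found = None;
    'outer: for src in sources {
        let mut items = src.iter();
        while let Some(&v) = items.next() {
            if v == target {
                found = Some(v);
                // Without the label, only the `while let` would stop and the
                // `for` would continue with the remaining sources.
                break 'outer;
            }
        }
    }
    found
}
```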

/// Run a single pipeline branch.
/// This pulls data from the sources and pushes it into the operators which run on a different
/// thread and finalize in a sink.
@@ -424,7 +342,7 @@ impl PipeLine {
pipeline_q: Rc<RefCell<VecDeque<PipeLine>>>,
) -> PolarsResult<Option<FinalizedSink>> {
let (sink_shared_count, mut reduced_sink) =
self.run_pipeline_no_finalize(ec, pipeline_q)?;
run_pipeline_no_finalize_iterative(self, ec, pipeline_q)?;
assert_eq!(sink_shared_count, 0);
Ok(reduced_sink.finalize(ec).ok())
}
@@ -514,3 +432,209 @@ fn consume_source(src: &mut dyn Source, context: &PExecutionContext) -> PolarsRe

unsafe impl Send for PipeLine {}
unsafe impl Sync for PipeLine {}

// The algorithm is written in an iterative fashion to avoid stack overflows.
fn run_pipeline_no_finalize_iterative(
current_pipeline: &mut PipeLine,
ec: &PExecutionContext,
pipeline_q: Rc<RefCell<VecDeque<PipeLine>>>,
) -> PolarsResult<(u32, Box<dyn Sink>)> {
struct StackFrame<'a> {
parent_index: Option<usize>,
pipeline: OwnedOrMut<'a, PipeLine>,
operator_start: usize,
sink_finished: bool,
#[allow(clippy::type_complexity)]
remaining: std::vec::IntoIter<(usize, Rc<RefCell<u32>>, Vec<Box<dyn Sink>>)>,
// Sink-local data
shared_sink_count: u32,
reduced_sink: Box<dyn Sink>,
}

impl<'a> StackFrame<'a> {
// We run the entire non-recursive initialization step.
fn new(
mut pipeline: OwnedOrMut<'a, PipeLine>,
ec: &PExecutionContext,
parent_index: Option<usize>,
) -> PolarsResult<Self> {
let operator_start = 0;
// for unions we typically first want to push all pipelines
// into the union sink before we call `finalize`
// however if the sink is finished early, (for instance a `head`)
// we don't want to run the rest of the pipelines and we finalize early
let mut sink_finished = false;
let mut sinks = std::mem::take(&mut pipeline.sinks).into_iter();

let (operator_end, shared_count, sink) = sinks.next().unwrap();
let (shared_sink_count, reduced_sink) = Self::iteration_init(
pipeline.deref_mut(),
ec,
sink,
shared_count,
operator_start,
operator_end,
&mut sink_finished,
)?;

Ok(StackFrame {
operator_start: operator_end,
sink_finished,
remaining: sinks,
pipeline,
parent_index,
                // Produced by the non-recursive init above
shared_sink_count,
reduced_sink,
})
}

        /// Try to pull another (operator_end, shared_count, sink) triplet from remaining,
/// finalizing the current cycle in the process.
///
/// If there are no more triplets, return `Ok(None)`.
/// If there was at least one more triplet, return `Ok(Some(()))`.
fn advance(&mut self, ec: &PExecutionContext) -> PolarsResult<Option<()>> {
let Some((operator_end, shared_count, sink)) = self.remaining.next() else {
return Ok(None);
};
let sink_result = self.reduced_sink.finalize(ec)?;
match sink_result {
                // turn this sink into a new source
FinalizedSink::Finished(df) => self.pipeline.set_df_as_sources(df),
FinalizedSink::Source(src) => self.pipeline.set_sources(src),
// should not happen
FinalizedSink::Operator(_) => {
unreachable!()
},
}
let (shared_count, reduced_sink) = Self::iteration_init(
self.pipeline.deref_mut(),
ec,
sink,
shared_count,
self.operator_start,
operator_end,
&mut self.sink_finished,
)?;
self.operator_start = operator_end;
self.shared_sink_count = shared_count;
self.reduced_sink = reduced_sink;
Ok(Some(()))
}

        /// Perform the non-recursive initialization for the current (operator_end, shared_count, sink) triplet.
fn iteration_init(
pipeline: &mut PipeLine,
ec: &PExecutionContext,
mut sink: Vec<Box<dyn Sink>>,
shared_count: Rc<RefCell<u32>>,
operator_start: usize,
operator_end: usize,
sink_finished: &mut bool,
) -> PolarsResult<(u32, Box<dyn Sink>)> {
'outer: for src in &mut std::mem::take(&mut pipeline.sources) {
let mut next_batches = src.get_batches(ec)?;

while let SourceResult::GotMoreData(chunks) = next_batches {
let (sink_result, next_batches2) = pipeline.par_process_chunks(
chunks,
&mut sink,
ec,
operator_start,
operator_end,
src,
)?;
next_batches = next_batches2;

if let Some(SinkResult::Finished) = sink_result {
*sink_finished = true;
break 'outer;
}
}
}

// The sinks have taken all chunks thread locally, now we reduce them into a single
// result sink.
let reduced_sink = POOL
.install(|| {
sink.into_par_iter().reduce_with(|mut a, mut b| {
a.combine(&mut *b);
a
})
})
.unwrap();

let shared_sink_count = {
let mut shared_sink_count = shared_count.borrow_mut();
*shared_sink_count -= 1;
*shared_sink_count
};
Ok((shared_sink_count, reduced_sink))
}
}

let mut stack = Vec::new();
stack.push(StackFrame::new(
OwnedOrMut::Mut(current_pipeline),
ec,
None,
)?);

'outer: while let Some(mut current_frame) = stack.pop() {
if current_frame.shared_sink_count > 0 && !current_frame.sink_finished {
let pipeline = pipeline_q.borrow_mut().pop_front().unwrap();
let parent_index = {
stack.push(current_frame);
stack.len() - 1
};
let next_frame = StackFrame::new(OwnedOrMut::Owned(pipeline), ec, Some(parent_index))?;
stack.push(next_frame);
continue 'outer;
}

if current_frame.advance(ec)?.is_some() {
// We re-enqueue for execution since we have more work to do.
stack.push(current_frame);
} else if let Some(parent_index) = current_frame.parent_index {
let parent_frame = &mut stack[parent_index];
parent_frame.shared_sink_count = current_frame.shared_sink_count;
parent_frame
.reduced_sink
.combine(current_frame.reduced_sink.as_mut());
} else {
return Ok((current_frame.shared_sink_count, current_frame.reduced_sink));
}
}
unreachable!()
}

// A type that can be either owned or a mutable reference.
enum OwnedOrMut<'a, T> {
Owned(T),
Mut(&'a mut T),
}

impl<'a, T> Deref for OwnedOrMut<'a, T> {
type Target = T;

fn deref(&self) -> &Self::Target {
match self {
OwnedOrMut::Owned(t) => t,
OwnedOrMut::Mut(t) => t,
}
}
}

impl<'a, T> DerefMut for OwnedOrMut<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
match self {
OwnedOrMut::Owned(t) => t,
OwnedOrMut::Mut(t) => t,
}
}
}
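A short usage note on OwnedOrMut (a hypothetical standalone example, not taken from the PR): the root pipeline is only borrowed by the caller, while pipelines popped from pipeline_q are owned, and Deref/DerefMut let both cases be driven through the same StackFrame code without requiring Clone, which a Cow-based approach would.

```rust
// Illustrative only: both variants deref to the same target type.
fn bump(mut n: OwnedOrMut<'_, u32>) -> u32 {
    *n += 1; // DerefMut works for Owned and Mut alike
    *n
}

fn demo() {
    let mut root = 41;
    assert_eq!(bump(OwnedOrMut::Mut(&mut root)), 42); // borrowed, like the root pipeline
    assert_eq!(bump(OwnedOrMut::Owned(10)), 11); // owned, like a queued pipeline
}
```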