From a4b33e6d7242ebd42595340cf6437b96e3b7997c Mon Sep 17 00:00:00 2001
From: ritchie
Date: Sat, 28 Oct 2023 15:05:40 +0200
Subject: [PATCH] use vecdeque

---
 .../src/executors/sources/parquet.rs | 70 +++++++++----------
 1 file changed, 33 insertions(+), 37 deletions(-)

diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs
index e11fdc25f0fe..812e24fb9063 100644
--- a/crates/polars-pipe/src/executors/sources/parquet.rs
+++ b/crates/polars-pipe/src/executors/sources/parquet.rs
@@ -1,3 +1,4 @@
+use std::collections::VecDeque;
 use std::ops::{Deref, Range};
 use std::path::PathBuf;
 use std::sync::Arc;
@@ -19,7 +20,7 @@ use crate::operators::{DataChunk, PExecutionContext, Source, SourceResult};
 use crate::pipeline::determine_chunk_size;
 
 pub struct ParquetSource {
-    batched_reader: Option<BatchedParquetReader>,
+    batched_readers: VecDeque<BatchedParquetReader>,
     n_threads: usize,
     processed_paths: usize,
     chunk_index: IdxSize,
@@ -35,9 +36,12 @@ pub struct ParquetSource {
 }
 
 impl ParquetSource {
-    fn init_reader(&mut self) -> PolarsResult<()> {
-        self.batched_reader = None;
+    fn init_next_reader(&mut self) -> PolarsResult<()> {
         self.metadata = None;
+        self.init_reader()
+    }
+
+    fn init_reader(&mut self) -> PolarsResult<()> {
         let Some(index) = self.iter.next() else {
             return Ok(());
         };
@@ -118,7 +122,7 @@ impl ParquetSource {
         if self.processed_paths >= 1 {
             polars_ensure!(batched_reader.schema().as_ref() == self.file_info.reader_schema.as_ref().unwrap().as_ref(), ComputeError: "schema of all files in a single scan_parquet must be equal");
         }
-        self.batched_reader = Some(batched_reader);
+        self.batched_readers.push_back(batched_reader);
         self.processed_paths += 1;
         Ok(())
     }
@@ -138,7 +142,7 @@ impl ParquetSource {
         let iter = 0..paths.len();
 
         let mut source = ParquetSource {
-            batched_reader: None,
+            batched_readers: VecDeque::new(),
             n_threads,
             chunk_index: 0,
             processed_paths: 0,
@@ -151,49 +155,42 @@ impl ParquetSource {
             file_info,
             verbose,
         };
-        source.init_reader()?;
+        // Already start downloading when we deal with cloud urls.
+        if !source.paths.first().unwrap().is_file() {
+            source.init_reader()?;
+        }
         Ok(source)
     }
 }
 
 impl Source for ParquetSource {
     fn get_batches(&mut self, _context: &PExecutionContext) -> PolarsResult<SourceResult> {
-        if self.batched_reader.is_none() {
-            // If there was no new reader, we depleted all of them and are finished.
-            return Ok(SourceResult::Finished);
+        // We already start downloading the next file, we can only do that if we don't have a limit.
+        // In the case of a limit we first must update the row count with the batch results.
+        if self.batched_readers.len() < 3 && self.file_options.n_rows.is_none()
+            || self.batched_readers.is_empty()
+        {
+            self.init_next_reader()?
         }
 
-        let reader = self.batched_reader.as_mut().unwrap();
-
-        // We branch, because if we know the reader finishes after this batch we already want to start downloading the new batches.
-        // We can only do that if we don't have to update the row_count/limit with the result of this iteration.
-        let (reset, limit_reached, batches) =
-            if reader.finishes_this_batch(self.n_threads) && self.file_options.n_rows.is_none() {
-                // Take the reader, and immediately start the new download, before we await this one.
-                let mut reader = self.batched_reader.take().unwrap();
-                // Already ensure the new downloads are started
-                self.init_reader()?;
-                let batches = get_runtime().block_on(reader.next_batches(self.n_threads))?;
-                (false, reader.limit_reached(), batches)
-            } else {
-                let batches = get_runtime().block_on(reader.next_batches(self.n_threads))?;
-
-                (reader.is_finished(), reader.limit_reached(), batches)
-            };
+        let Some(mut reader) = self.batched_readers.pop_front() else {
+            // If there was no new reader, we depleted all of them and are finished.
+            return Ok(SourceResult::Finished);
+        };
+        let batches = get_runtime().block_on(reader.next_batches(self.n_threads))?;
 
         Ok(match batches {
             None => {
-                if limit_reached {
+                if reader.limit_reached() {
                     return Ok(SourceResult::Finished);
                 }
-                if reset {
-                    // Set the new the reader.
-                    self.init_reader()?;
-                }
+
+                // reset the reader
+                self.init_next_reader()?;
                 return self.get_batches(_context);
             },
             Some(batches) => {
-                let source_result = SourceResult::GotMoreData(
+                let result = SourceResult::GotMoreData(
                     batches
                         .into_iter()
                         .map(|data| {
@@ -208,12 +205,11 @@ impl Source for ParquetSource {
                         })
                         .collect(),
                 );
+                // We are not yet done with this reader.
+                // Ensure it is used in next iteration.
+                self.batched_readers.push_front(reader);
 
-                // Already start downloading the new files before we push the data into the engine.
-                if reset {
-                    self.init_reader()?
-                }
-                source_result
+                result
             },
         })
     }
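
For orientation, here is a minimal standalone sketch of the queueing discipline this patch introduces: up to three batched readers are kept in a VecDeque so the next files can already start downloading, each call pops the front reader, a reader that still produces batches is pushed back to the front, and an exhausted reader triggers initialisation of the next one. This is not polars code; the Reader and Source types and the batch counts below are invented for illustration only.

use std::collections::VecDeque;

// Stand-in for a batched file reader: yields a fixed number of batches, then None.
struct Reader {
    file: usize,
    batches_left: usize,
}

impl Reader {
    fn next_batch(&mut self) -> Option<String> {
        if self.batches_left == 0 {
            return None;
        }
        self.batches_left -= 1;
        Some(format!("batch from file {}", self.file))
    }
}

struct Source {
    readers: VecDeque<Reader>,
    next_file: usize,
    n_files: usize,
}

impl Source {
    // Analogue of init_next_reader: queue a reader for the next file, if one is left.
    fn init_next_reader(&mut self) {
        if self.next_file < self.n_files {
            self.readers.push_back(Reader { file: self.next_file, batches_left: 2 });
            self.next_file += 1;
        }
    }

    // Analogue of get_batches: keep up to 3 readers queued (the real code also skips
    // this prefetch when a row limit is set), pop the front reader, and push it back
    // to the front while it still has batches.
    fn get_batch(&mut self) -> Option<String> {
        if self.readers.len() < 3 {
            self.init_next_reader();
        }
        let mut reader = self.readers.pop_front()?;
        match reader.next_batch() {
            None => {
                // This reader is depleted; move on to the next file.
                self.init_next_reader();
                self.get_batch()
            },
            Some(batch) => {
                // Not done with this reader yet; reuse it on the next call.
                self.readers.push_front(reader);
                Some(batch)
            },
        }
    }
}

fn main() {
    let mut source = Source { readers: VecDeque::new(), next_file: 0, n_files: 3 };
    while let Some(batch) = source.get_batch() {
        println!("{batch}");
    }
}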