From 8fd0c5bd4323e323f0007970d7209fc31bbbe514 Mon Sep 17 00:00:00 2001 From: Gijs Burghoorn Date: Wed, 10 Jul 2024 14:58:26 +0200 Subject: [PATCH] perf: Batch nested embed parquet decoding (#17549) --- .../arrow/read/deserialize/binary/utils.rs | 3 +- .../src/arrow/read/deserialize/mod.rs | 6 +- .../src/arrow/read/deserialize/nested.rs | 10 +- .../arrow/read/deserialize/nested_utils.rs | 246 +++++++++--------- .../src/arrow/read/deserialize/struct_.rs | 2 +- 5 files changed, 137 insertions(+), 130 deletions(-) diff --git a/crates/polars-parquet/src/arrow/read/deserialize/binary/utils.rs b/crates/polars-parquet/src/arrow/read/deserialize/binary/utils.rs index 962203273b24..f96d036691d3 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/binary/utils.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/binary/utils.rs @@ -108,7 +108,8 @@ impl<'a> Iterator for BinaryIter<'a> { #[inline] fn next(&mut self) -> Option { - if self.values.is_empty() || self.num_values == 0 { + if self.num_values == 0 { + assert!(self.values.is_empty()); return None; } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/mod.rs b/crates/polars-parquet/src/arrow/read/deserialize/mod.rs index aa3f87a313b0..5a35efd35d7d 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/mod.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/mod.rs @@ -46,7 +46,7 @@ pub fn create_list( nested: &mut NestedState, values: Box, ) -> Box { - let (mut offsets, validity) = nested.nested.pop().unwrap().take(); + let (mut offsets, validity) = nested.pop().unwrap(); match data_type.to_logical_type() { ArrowDataType::List(_) => { offsets.push(values.len() as i64); @@ -89,7 +89,7 @@ pub fn create_map( nested: &mut NestedState, values: Box, ) -> Box { - let (mut offsets, validity) = nested.nested.pop().unwrap().take(); + let (mut offsets, validity) = nested.pop().unwrap(); match data_type.to_logical_type() { ArrowDataType::Map(_, _) => { offsets.push(values.len() as i64); @@ -147,7 +147,7 @@ where chunk_size, num_rows, )? - .map(|x| Ok((NestedState::new(vec![]), x?))), + .map(|x| Ok((NestedState::default(), x?))), )); } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/nested.rs b/crates/polars-parquet/src/arrow/read/deserialize/nested.rs index 3180c4594285..66ae9acd9366 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/nested.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/nested.rs @@ -13,7 +13,7 @@ where { Box::new(iter.map(|x| { x.map(|(mut nested, array)| { - let _ = nested.nested.pop().unwrap(); // the primitive + let _ = nested.pop().unwrap(); // the primitive (nested, array) }) })) @@ -28,7 +28,7 @@ where { Box::new(iter.map(|x| { x.map(|(mut nested, array)| { - let _ = nested.nested.pop().unwrap(); // the primitive + let _ = nested.pop().unwrap(); // the primitive (nested, Box::new(array) as _) }) })) @@ -322,7 +322,7 @@ where validity, )?); - let _ = nested.nested.pop().unwrap(); // the primitive + let _ = nested.pop().unwrap(); // the primitive Ok((nested, array)) }); @@ -380,7 +380,7 @@ where validity, )?); - let _ = nested.nested.pop().unwrap(); // the primitive + let _ = nested.pop().unwrap(); // the primitive Ok((nested, array)) }); @@ -411,7 +411,7 @@ where validity, )?); - let _ = nested.nested.pop().unwrap(); // the primitive + let _ = nested.pop().unwrap(); // the primitive Ok((nested, array)) }); diff --git a/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs b/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs index cfb5817e3e0b..649fa93f371d 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs @@ -15,169 +15,171 @@ use crate::parquet::read::levels::get_bit_width; use crate::read::deserialize::utils::BatchedCollector; #[derive(Debug)] -pub enum Nested { - Primitive(NestedPrimitive), - List(NestedList), - FixedSizeList(NestedFixedSizeList), - Struct(NestedStruct), -} - -#[derive(Debug)] -pub struct NestedPrimitive { - is_nullable: bool, - length: usize, -} - -#[derive(Debug)] -pub struct NestedList { - validity: Option, - offsets: Vec, -} - -#[derive(Debug)] -pub struct NestedFixedSizeList { +pub struct Nested { validity: Option, length: usize, - width: usize, + content: NestedContent, + + // We batch the collection of valids and invalids to amortize the costs. This only really works + // when valids and invalids are grouped or there is a disbalance in the amount of valids vs. + // invalids. This, however, is a very common situation. + num_valids: usize, + num_invalids: usize, } #[derive(Debug)] -pub struct NestedStruct { - validity: Option, - length: usize, +pub enum NestedContent { + Primitive, + List { offsets: Vec }, + FixedSizeList { width: usize }, + Struct, } impl Nested { fn primitive(is_nullable: bool) -> Self { - Self::Primitive(NestedPrimitive { - is_nullable, + // @NOTE: We allocate with `0` capacity here since we will not be pushing to this bitmap. + // This is because primitive does not keep track of the validity here. It keeps track in + // the decoder. We do still want to put something so that we can check for nullability by + // looking at the option. + let validity = is_nullable.then(|| MutableBitmap::with_capacity(0)); + + Self { + validity, length: 0, - }) + content: NestedContent::Primitive, + + num_valids: 0, + num_invalids: 0, + } } fn list_with_capacity(is_nullable: bool, capacity: usize) -> Self { let offsets = Vec::with_capacity(capacity); let validity = is_nullable.then(|| MutableBitmap::with_capacity(capacity)); - Self::List(NestedList { offsets, validity }) + Self { + validity, + length: 0, + content: NestedContent::List { offsets }, + + num_valids: 0, + num_invalids: 0, + } } fn fixedlist_with_capacity(is_nullable: bool, width: usize, capacity: usize) -> Self { let validity = is_nullable.then(|| MutableBitmap::with_capacity(capacity)); - Self::FixedSizeList(NestedFixedSizeList { - length: 0, - width, + Self { validity, - }) + length: 0, + content: NestedContent::FixedSizeList { width }, + + num_valids: 0, + num_invalids: 0, + } } fn struct_with_capacity(is_nullable: bool, capacity: usize) -> Self { let validity = is_nullable.then(|| MutableBitmap::with_capacity(capacity)); - Self::Struct(NestedStruct { - length: 0, + Self { validity, - }) + length: 0, + content: NestedContent::Struct, + + num_valids: 0, + num_invalids: 0, + } } - pub fn take(self) -> (Vec, Option) { - match self { - Nested::Primitive(_) => (Vec::new(), None), - Nested::List(n) => (n.offsets, n.validity), - Nested::FixedSizeList(n) => (Vec::new(), n.validity), - Nested::Struct(n) => (Vec::new(), n.validity), + fn take(mut self) -> (Vec, Option) { + if !matches!(self.content, NestedContent::Primitive) { + if let Some(validity) = self.validity.as_mut() { + validity.extend_constant(self.num_valids, true); + validity.extend_constant(self.num_invalids, false); + } + } + + self.num_valids = 0; + self.num_invalids = 0; + + match self.content { + NestedContent::Primitive => { + debug_assert!(self.validity.map_or(true, |validity| validity.is_empty())); + (Vec::new(), None) + }, + NestedContent::List { offsets } => (offsets, self.validity), + NestedContent::FixedSizeList { .. } => (Vec::new(), self.validity), + NestedContent::Struct => (Vec::new(), self.validity), } } fn is_nullable(&self) -> bool { - match self { - Nested::Primitive(n) => n.is_nullable, - Nested::List(n) => n.validity.is_some(), - Nested::FixedSizeList(n) => n.validity.is_some(), - Nested::Struct(n) => n.validity.is_some(), - } + self.validity.is_some() } fn is_repeated(&self) -> bool { - match self { - Nested::Primitive(_) => false, - Nested::List(_) => true, - Nested::FixedSizeList(_) => true, - Nested::Struct(_) => false, + match self.content { + NestedContent::Primitive => false, + NestedContent::List { .. } => true, + NestedContent::FixedSizeList { .. } => true, + NestedContent::Struct => false, } } fn is_required(&self) -> bool { - match self { - Nested::Primitive(_) => false, - Nested::List(_) => false, - Nested::FixedSizeList(_) => false, - Nested::Struct(_) => true, + match self.content { + NestedContent::Primitive => false, + NestedContent::List { .. } => false, + NestedContent::FixedSizeList { .. } => false, + NestedContent::Struct => true, } } - fn push_default(&mut self, length: i64) { - match self { - Nested::Primitive(n) => n.length += 1, - Nested::List(n) => { - if let Some(validity) = n.validity.as_mut() { - validity.push(false); - } - n.offsets.push(length); - }, - Nested::FixedSizeList(n) => { - if let Some(validity) = n.validity.as_mut() { - validity.push(false); - } - n.length += 1; - }, - Nested::Struct(n) => { - if let Some(validity) = n.validity.as_mut() { - validity.push(false); - } - n.length += 1; - }, + /// number of rows + fn len(&self) -> usize { + self.length + } + + fn invalid_num_values(&self) -> usize { + match &self.content { + NestedContent::Primitive => 0, + NestedContent::List { .. } => 0, + NestedContent::FixedSizeList { width } => *width, + NestedContent::Struct => 1, } } fn push(&mut self, value: i64, is_valid: bool) { - match self { - Nested::Primitive(n) => n.length += 1, - Nested::List(n) => { - if let Some(validity) = n.validity.as_mut() { - validity.push(is_valid); - } - n.offsets.push(value); - }, - Nested::FixedSizeList(n) => { - if let Some(validity) = n.validity.as_mut() { - validity.push(is_valid); - } - n.length += 1; - }, - Nested::Struct(n) => { - if let Some(validity) = n.validity.as_mut() { - validity.push(is_valid); - } - n.length += 1; - }, + let is_primitive = matches!(self.content, NestedContent::Primitive); + + if is_valid && self.num_invalids != 0 { + debug_assert!(!is_primitive); + + let validity = self.validity.as_mut().unwrap(); + validity.extend_constant(self.num_valids, true); + validity.extend_constant(self.num_invalids, false); + + self.num_valids = 0; + self.num_invalids = 0; } - } - /// number of rows - fn len(&self) -> usize { - match self { - Nested::Primitive(n) => n.length, - Nested::List(n) => n.offsets.len(), - Nested::FixedSizeList(n) => n.length, - Nested::Struct(n) => n.length, + self.num_valids += usize::from(!is_primitive & is_valid); + self.num_invalids += usize::from(!is_primitive & !is_valid); + + self.length += 1; + if let NestedContent::List { offsets } = &mut self.content { + offsets.push(value); } } - fn invalid_num_values(&self) -> usize { - match self { - Nested::Primitive(_) => 0, - Nested::List(_) => 0, - Nested::FixedSizeList(n) => n.width, - Nested::Struct(_) => 1, + fn push_default(&mut self, length: i64) { + debug_assert!(self.validity.is_some()); + + let is_primitive = matches!(self.content, NestedContent::Primitive); + self.num_invalids += usize::from(!is_primitive); + + self.length += 1; + if let NestedContent::List { offsets } = &mut self.content { + offsets.push(length); } } } @@ -300,18 +302,22 @@ impl<'a> NestedPage<'a> { } /// The state of nested data types. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct NestedState { /// The nesteds composing `NestedState`. - pub nested: Vec, + nested: Vec, } impl NestedState { /// Creates a new [`NestedState`]. - pub fn new(nested: Vec) -> Self { + fn new(nested: Vec) -> Self { Self { nested } } + pub fn pop(&mut self) -> Option<(Vec, Option)> { + Some(self.nested.pop()?.take()) + } + /// The number of rows in this state pub fn len(&self) -> usize { // outermost is the number of rows @@ -347,8 +353,8 @@ pub(super) fn extend<'a, D: NestedDecoder<'a>>( let chunk_size = chunk_size.unwrap_or(usize::MAX); let mut first_item_is_fully_read = false; // Amortize the allocations. - let mut cum_sum = vec![]; - let mut cum_rep = vec![]; + let mut def_levels = vec![]; + let mut rep_levels = vec![]; loop { if let Some((mut nested, mut decoded)) = items.pop_back() { @@ -368,8 +374,8 @@ pub(super) fn extend<'a, D: NestedDecoder<'a>>( &mut batched_collector, &mut nested.nested, additional, - &mut cum_sum, - &mut cum_rep, + &mut def_levels, + &mut rep_levels, )?; batched_collector.finalize()?; diff --git a/crates/polars-parquet/src/arrow/read/deserialize/struct_.rs b/crates/polars-parquet/src/arrow/read/deserialize/struct_.rs index 27de40d7d67e..b43edf85c675 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/struct_.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/struct_.rs @@ -45,7 +45,7 @@ impl<'a> Iterator for StructIterator<'a> { } } let mut nested = nested.pop().unwrap(); - let (_, validity) = nested.nested.pop().unwrap().take(); + let (_, validity) = nested.pop().unwrap(); Some(Ok(( nested,