diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs
index 8dd55bb877..1079e577a8 100644
--- a/src/io/parquet/read/deserialize/mod.rs
+++ b/src/io/parquet/read/deserialize/mod.rs
@@ -214,3 +214,22 @@ where
             .map(|x| x.map(|x| x.1)),
     ))
 }
+
+/// Basically the same as `column_iter_to_arrays`, with the addition of the `init` parameter
+/// to read the inner columns of the nested type directly, instead of reading the entire nested type.
+pub fn nested_column_iter_to_arrays<'a, I: 'a>(
+    columns: Vec<I>,
+    types: Vec<&PrimitiveType>,
+    field: Field,
+    init: Vec<InitNested>,
+    chunk_size: Option<usize>,
+    num_rows: usize,
+) -> Result<ArrayIter<'a>>
+where
+    I: Pages,
+{
+    Ok(Box::new(
+        nested::columns_to_iter_recursive(columns, types, field, init, num_rows, chunk_size)?
+            .map(|x| x.map(|x| x.1)),
+    ))
+}
diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs
index baaffd6d44..ea2b2f46d4 100644
--- a/src/io/parquet/read/mod.rs
+++ b/src/io/parquet/read/mod.rs
@@ -37,7 +37,7 @@ use crate::{array::Array, error::Result};
 use crate::types::{i256, NativeType};
 pub use deserialize::{
     column_iter_to_arrays, create_list, create_map, get_page_iterator, init_nested, n_columns,
-    InitNested, NestedArrayIter, NestedState, StructIterator,
+    nested_column_iter_to_arrays, InitNested, NestedArrayIter, NestedState, StructIterator,
 };
 pub use file::{FileReader, RowGroupReader};
 pub use row_group::*;
diff --git a/tests/it/io/parquet/deserialize.rs b/tests/it/io/parquet/deserialize.rs
new file mode 100644
index 0000000000..3ea1c2846e
--- /dev/null
+++ b/tests/it/io/parquet/deserialize.rs
@@ -0,0 +1,85 @@
+use std::fs::File;
+
+use arrow2::{
+    array::StructArray,
+    datatypes::DataType,
+    error::Result,
+    io::parquet::read::{
+        infer_schema, n_columns, nested_column_iter_to_arrays, read_columns, read_metadata,
+        to_deserializer, BasicDecompressor, InitNested, PageReader,
+    },
+};
+
+#[test]
+fn test_deserialize_nested_column() -> Result<()> {
+    let path = "testing/parquet-testing/data/nested_structs.rust.parquet";
+    let mut reader = File::open(path).unwrap();
+
+    let metadata = read_metadata(&mut reader)?;
+    let schema = infer_schema(&metadata)?;
+
+    let num_rows = metadata.num_rows;
+    let row_group = metadata.row_groups[0].clone();
+
+    let field_columns = schema
+        .fields
+        .iter()
+        .map(|field| read_columns(&mut reader, row_group.columns(), &field.name))
+        .collect::<Result<Vec<_>>>()?;
+
+    let fields = schema.fields.clone();
+    for (mut columns, field) in field_columns.into_iter().zip(fields.iter()) {
+        if let DataType::Struct(inner_fields) = &field.data_type {
+            let mut array_iter =
+                to_deserializer(columns.clone(), field.clone(), num_rows, None, None)?;
+            let array = array_iter.next().transpose()?.unwrap();
+            let expected_array = array
+                .as_any()
+                .downcast_ref::<StructArray>()
+                .unwrap()
+                .clone();
+
+            // deserialize inner values of struct fields.
+            let init = vec![InitNested::Struct(field.is_nullable)];
+            let mut values = Vec::with_capacity(inner_fields.len());
+            for inner_field in inner_fields {
+                let n = n_columns(&inner_field.data_type);
+                let inner_columns: Vec<_> = columns.drain(0..n).collect();
+
+                let (nested_columns, types): (Vec<_>, Vec<_>) = inner_columns
+                    .into_iter()
+                    .map(|(column_meta, chunk)| {
+                        let len = chunk.len();
+                        let pages = PageReader::new(
+                            std::io::Cursor::new(chunk),
+                            column_meta,
+                            std::sync::Arc::new(|_, _| true),
+                            vec![],
+                            len * 2 + 1024,
+                        );
+                        (
+                            BasicDecompressor::new(pages, vec![]),
+                            &column_meta.descriptor().descriptor.primitive_type,
+                        )
+                    })
+                    .unzip();
+
+                let mut inner_array_iter = nested_column_iter_to_arrays(
+                    nested_columns,
+                    types,
+                    inner_field.clone(),
+                    init.clone(),
+                    None,
+                    num_rows,
+                )?;
+                let inner_array = inner_array_iter.next().transpose()?;
+                values.push(inner_array.unwrap());
+            }
+            let struct_array = StructArray::try_new(field.data_type.clone(), values, None)?;
+
+            assert_eq!(expected_array, struct_array);
+        }
+    }
+
+    Ok(())
+}
diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs
index 4803cc9c52..1b38c61c99 100644
--- a/tests/it/io/parquet/mod.rs
+++ b/tests/it/io/parquet/mod.rs
@@ -14,6 +14,7 @@ use arrow2::{
     types::{days_ms, NativeType},
 };
 
+mod deserialize;
 #[cfg(feature = "io_json_integration")]
 mod integration;
 mod read;