From 32a3104627827cd578c0417c63c45f9bad2c04a8 Mon Sep 17 00:00:00 2001 From: baishen Date: Mon, 23 Oct 2023 13:12:05 +0800 Subject: [PATCH 1/5] feat: Add `nested_column_iter_to_arrays` to deserialize inner columns --- src/io/parquet/read/deserialize/mod.rs | 19 +++++++++++++++++++ src/io/parquet/read/mod.rs | 2 +- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs index 8dd55bb877..1079e577a8 100644 --- a/src/io/parquet/read/deserialize/mod.rs +++ b/src/io/parquet/read/deserialize/mod.rs @@ -214,3 +214,22 @@ where .map(|x| x.map(|x| x.1)), )) } + +/// Basically the same as `column_iter_to_arrays`, with the addition of the `init` parameter +/// to read the inner columns of the nested type directly, instead of reading the entire nested type. +pub fn nested_column_iter_to_arrays<'a, I: 'a>( + columns: Vec, + types: Vec<&PrimitiveType>, + field: Field, + init: Vec, + chunk_size: Option, + num_rows: usize, +) -> Result> +where + I: Pages, +{ + Ok(Box::new( + nested::columns_to_iter_recursive(columns, types, field, init, num_rows, chunk_size)? + .map(|x| x.map(|x| x.1)), + )) +} diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index baaffd6d44..ea2b2f46d4 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -37,7 +37,7 @@ use crate::{array::Array, error::Result}; use crate::types::{i256, NativeType}; pub use deserialize::{ column_iter_to_arrays, create_list, create_map, get_page_iterator, init_nested, n_columns, - InitNested, NestedArrayIter, NestedState, StructIterator, + nested_column_iter_to_arrays, InitNested, NestedArrayIter, NestedState, StructIterator, }; pub use file::{FileReader, RowGroupReader}; pub use row_group::*; From 7a37e19929124ba1b816a4f3944090084ce29f3f Mon Sep 17 00:00:00 2001 From: baishen Date: Tue, 24 Oct 2023 14:57:50 +0800 Subject: [PATCH 2/5] add tests --- tests/it/io/parquet/deserialize.rs | 86 ++++++++++++++++++++++++++++++ tests/it/io/parquet/mod.rs | 1 + 2 files changed, 87 insertions(+) create mode 100644 tests/it/io/parquet/deserialize.rs diff --git a/tests/it/io/parquet/deserialize.rs b/tests/it/io/parquet/deserialize.rs new file mode 100644 index 0000000000..06f2bd09fe --- /dev/null +++ b/tests/it/io/parquet/deserialize.rs @@ -0,0 +1,86 @@ +use std::fs::File; + +use arrow::{ + array::StructArray, + datatypes::DataType, + error::Error, + io::parquet::read::{ + infer_schema, n_columns, nested_column_iter_to_arrays, read_columns, read_metadata, + to_deserializer, BasicDecompressor, InitNested, PageReader, + }, +}; + +#[test] +fn test_deserialize_nested_column() -> Result<()> { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/nested_structs.rust.parquet"); + let mut reader = File::open(&path).unwrap(); + + let metadata = read::read_metadata(&mut reader)?; + let schema = read::infer_schema(&metadata)?; + + let num_rows = metadata.num_rows; + let row_group = metadata.row_groups[0].clone(); + + let field_columns = schema + .fields + .iter() + .map(|field| read_columns(&mut reader, row_group.columns(), &field.name)) + .collect::, Error>>()?; + + let fields = schema.fields.clone(); + for (mut columns, field) in field_columns.into_iter().zip(fields.iter()) { + if let DataType::Struct(inner_fields) = &field.data_type { + let mut array_iter = + to_deserializer(columns.clone(), field.clone(), num_rows, None, None)?; + let array = array_iter.next().transpose()?.unwrap(); + let expected_array = array + .as_any() + .downcast_ref::() + .unwrap() + .clone(); + + // deserialize inner values of struct fields. + let init = vec![InitNested::Struct(field.is_nullable)]; + let mut values = Vec::with_capacity(inner_fields.len()); + for inner_field in inner_fields { + let n = n_columns(&inner_field.data_type); + let inner_columns: Vec<_> = columns.drain(0..n).collect(); + + let (nestd_columns, types): (Vec<_>, Vec<_>) = inner_columns + .into_iter() + .map(|(column_meta, chunk)| { + let len = chunk.len(); + let pages = PageReader::new( + std::io::Cursor::new(chunk), + column_meta, + std::sync::Arc::new(|_, _| true), + vec![], + len * 2 + 1024, + ); + ( + BasicDecompressor::new(pages, vec![]), + &column_meta.descriptor().descriptor.primitive_type, + ) + }) + .unzip(); + + let mut inner_array_iter = nested_column_iter_to_arrays( + nestd_columns, + types, + inner_field.clone(), + init.clone(), + None, + num_rows, + )?; + let inner_array = inner_array_iter.next().transpose()?; + values.push(inner_array.unwrap()); + } + let struct_array = StructArray::try_new(field.data_type.clone(), values, None)?; + + assert_eq!(expected_array, struct_array); + } + } + + Ok(()) +} diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 4803cc9c52..1b38c61c99 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -14,6 +14,7 @@ use arrow2::{ types::{days_ms, NativeType}, }; +mod deserialize; #[cfg(feature = "io_json_integration")] mod integration; mod read; From a1482f1b88b01d0a5d0be8e486f2e3805ca5ff1b Mon Sep 17 00:00:00 2001 From: baishen Date: Tue, 24 Oct 2023 15:06:15 +0800 Subject: [PATCH 3/5] fix --- tests/it/io/parquet/deserialize.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/it/io/parquet/deserialize.rs b/tests/it/io/parquet/deserialize.rs index 06f2bd09fe..a2d08b66e4 100644 --- a/tests/it/io/parquet/deserialize.rs +++ b/tests/it/io/parquet/deserialize.rs @@ -1,23 +1,24 @@ use std::fs::File; -use arrow::{ +use arrow2::{ array::StructArray, datatypes::DataType, - error::Error, + error::{Error, Result}, io::parquet::read::{ infer_schema, n_columns, nested_column_iter_to_arrays, read_columns, read_metadata, to_deserializer, BasicDecompressor, InitNested, PageReader, }, + util::test_util::parquet_test_data, }; #[test] fn test_deserialize_nested_column() -> Result<()> { - let testdata = arrow::util::test_util::parquet_test_data(); + let testdata = parquet_test_data(); let path = format!("{testdata}/nested_structs.rust.parquet"); let mut reader = File::open(&path).unwrap(); - let metadata = read::read_metadata(&mut reader)?; - let schema = read::infer_schema(&metadata)?; + let metadata = read_metadata(&mut reader)?; + let schema = infer_schema(&metadata)?; let num_rows = metadata.num_rows; let row_group = metadata.row_groups[0].clone(); From 82fabfbf8b33667148e8c6efbc5fbb881b7acf85 Mon Sep 17 00:00:00 2001 From: baishen Date: Tue, 24 Oct 2023 15:20:51 +0800 Subject: [PATCH 4/5] fix --- tests/it/io/parquet/deserialize.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tests/it/io/parquet/deserialize.rs b/tests/it/io/parquet/deserialize.rs index a2d08b66e4..cbb47f7ef0 100644 --- a/tests/it/io/parquet/deserialize.rs +++ b/tests/it/io/parquet/deserialize.rs @@ -3,18 +3,16 @@ use std::fs::File; use arrow2::{ array::StructArray, datatypes::DataType, - error::{Error, Result}, + error::Result, io::parquet::read::{ infer_schema, n_columns, nested_column_iter_to_arrays, read_columns, read_metadata, to_deserializer, BasicDecompressor, InitNested, PageReader, }, - util::test_util::parquet_test_data, }; #[test] fn test_deserialize_nested_column() -> Result<()> { - let testdata = parquet_test_data(); - let path = format!("{testdata}/nested_structs.rust.parquet"); + let path = "testing/parquet-testing/data/nested_structs.rust.parquet"; let mut reader = File::open(&path).unwrap(); let metadata = read_metadata(&mut reader)?; @@ -27,7 +25,7 @@ fn test_deserialize_nested_column() -> Result<()> { .fields .iter() .map(|field| read_columns(&mut reader, row_group.columns(), &field.name)) - .collect::, Error>>()?; + .collect::>>()?; let fields = schema.fields.clone(); for (mut columns, field) in field_columns.into_iter().zip(fields.iter()) { From 79f230354007c63fac30a0f8d32c8293f5294944 Mon Sep 17 00:00:00 2001 From: baishen Date: Tue, 24 Oct 2023 15:24:18 +0800 Subject: [PATCH 5/5] fix --- tests/it/io/parquet/deserialize.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/it/io/parquet/deserialize.rs b/tests/it/io/parquet/deserialize.rs index cbb47f7ef0..3ea1c2846e 100644 --- a/tests/it/io/parquet/deserialize.rs +++ b/tests/it/io/parquet/deserialize.rs @@ -13,7 +13,7 @@ use arrow2::{ #[test] fn test_deserialize_nested_column() -> Result<()> { let path = "testing/parquet-testing/data/nested_structs.rust.parquet"; - let mut reader = File::open(&path).unwrap(); + let mut reader = File::open(path).unwrap(); let metadata = read_metadata(&mut reader)?; let schema = infer_schema(&metadata)?;