From 2e18e66c4bb7113a5813e562a223f3f9e48ae040 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Fri, 8 Nov 2024 14:44:15 -0800 Subject: [PATCH 1/3] allow value-less maps --- parquet/src/arrow/arrow_reader/mod.rs | 68 +++++++++++++ parquet/src/arrow/schema/complex.rs | 7 +- parquet/src/record/reader.rs | 132 +++++++++++++++++++++----- 3 files changed, 181 insertions(+), 26 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index d3709c03e99a..014125d06eda 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -4065,4 +4065,72 @@ mod tests { } } } + + #[test] + fn test_map_no_value() { + let schema = " + message spark_schema { + REQUIRED group my_map (MAP) { + REPEATED group key_value { + REQUIRED INT32 key; + } + } + REQUIRED group my_list (LIST) { + REPEATED group list { + REQUIRED INT32 element; + } + } + } + "; + let schema = Arc::new(parse_message_type(schema).unwrap()); + + // Write Parquet file to buffer + let mut buffer: Vec = Vec::new(); + let mut file_writer = + SerializedFileWriter::new(&mut buffer, schema, Default::default()).unwrap(); + let mut row_group_writer = file_writer.next_row_group().unwrap(); + + // Write column my_map.key_value.key + let mut column_writer = row_group_writer.next_column().unwrap().unwrap(); + column_writer + .typed::() + .write_batch( + &[1, 2, 3, 4, 5, 6, 7, 8, 9], + Some(&[1, 1, 1, 1, 1, 1, 1, 1, 1]), + Some(&[0, 1, 1, 0, 1, 1, 0, 1, 1]), + ) + .unwrap(); + column_writer.close().unwrap(); + + // Write column my_list.list.element + let mut column_writer = row_group_writer.next_column().unwrap().unwrap(); + column_writer + .typed::() + .write_batch( + &[1, 2, 3, 4, 5, 6, 7, 8, 9], + Some(&[1, 1, 1, 1, 1, 1, 1, 1, 1]), + Some(&[0, 1, 1, 0, 1, 1, 0, 1, 1]), + ) + .unwrap(); + column_writer.close().unwrap(); + + // Finalize Parquet file + row_group_writer.close().unwrap(); + file_writer.close().unwrap(); + assert_eq!(&buffer[0..4], b"PAR1"); + + // Read Parquet file from buffer + let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)) + .unwrap() + .build() + .unwrap(); + let out = reader.next().unwrap().unwrap(); + assert_eq!(out.num_rows(), 3); + assert_eq!(out.num_columns(), 2); + // map and list columns should now be equivalent + let c0 = out.column(0).as_list::(); + let c1 = out.column(1).as_list::(); + assert_eq!(c0.len(), c1.len()); + c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r)); + } } diff --git a/parquet/src/arrow/schema/complex.rs b/parquet/src/arrow/schema/complex.rs index e487feabb848..dbc81d1ee195 100644 --- a/parquet/src/arrow/schema/complex.rs +++ b/parquet/src/arrow/schema/complex.rs @@ -271,8 +271,13 @@ impl Visitor { return Err(arrow_err!("Child of map field must be repeated")); } + // According to the specification the values are optional (#1642). + // In this case, return the keys as a list. + if map_key_value.get_fields().len() == 1 { + return self.visit_list(map_type, context); + } + if map_key_value.get_fields().len() != 2 { - // According to the specification the values are optional (#1642) return Err(arrow_err!( "Child of map field must have two children, found {}", map_key_value.get_fields().len() diff --git a/parquet/src/record/reader.rs b/parquet/src/record/reader.rs index fd6ca7cdd57a..ba9a396d3998 100644 --- a/parquet/src/record/reader.rs +++ b/parquet/src/record/reader.rs @@ -217,11 +217,15 @@ impl TreeBuilder { Repetition::REPEATED, "Invalid map type: {field:?}" ); - assert_eq!( - key_value_type.get_fields().len(), - 2, - "Invalid map type: {field:?}" - ); + // Parquet spec allows no value. In that case treat as a list. #1642 + if key_value_type.get_fields().len() != 1 { + // If not a list, then there can only be 2 fields in the struct + assert_eq!( + key_value_type.get_fields().len(), + 2, + "Invalid map type: {field:?}" + ); + } path.push(String::from(key_value_type.name())); @@ -239,25 +243,35 @@ impl TreeBuilder { row_group_reader, )?; - let value_type = &key_value_type.get_fields()[1]; - let value_reader = self.reader_tree( - value_type.clone(), - path, - curr_def_level + 1, - curr_rep_level + 1, - paths, - row_group_reader, - )?; + if key_value_type.get_fields().len() == 1 { + path.pop(); + Reader::RepeatedReader( + field, + curr_def_level, + curr_rep_level, + Box::new(key_reader), + ) + } else { + let value_type = &key_value_type.get_fields()[1]; + let value_reader = self.reader_tree( + value_type.clone(), + path, + curr_def_level + 1, + curr_rep_level + 1, + paths, + row_group_reader, + )?; - path.pop(); + path.pop(); - Reader::KeyValueReader( - field, - curr_def_level, - curr_rep_level, - Box::new(key_reader), - Box::new(value_reader), - ) + Reader::KeyValueReader( + field, + curr_def_level, + curr_rep_level, + Box::new(key_reader), + Box::new(value_reader), + ) + } } // A repeated field that is neither contained by a `LIST`- or // `MAP`-annotated group nor annotated by `LIST` or `MAP` @@ -813,7 +827,7 @@ impl Iterator for ReaderIter { mod tests { use super::*; - use crate::data_type::Int64Type; + use crate::data_type::{Int32Type, Int64Type}; use crate::file::reader::SerializedFileReader; use crate::file::writer::SerializedFileWriter; use crate::record::api::RowAccessor; @@ -1459,8 +1473,7 @@ mod tests { } #[test] - #[should_panic(expected = "Invalid map type")] - fn test_file_reader_rows_invalid_map_type() { + fn test_file_reader_rows_nested_map_type() { let schema = " message spark_schema { OPTIONAL group a (MAP) { @@ -1823,6 +1836,75 @@ mod tests { assert_eq!(rows, expected_rows); } + #[test] + fn test_map_no_value() { + let schema = " + message spark_schema { + REQUIRED group my_map (MAP) { + REPEATED group key_value { + REQUIRED INT32 key; + } + } + REQUIRED group my_list (LIST) { + REPEATED group list { + REQUIRED INT32 element; + } + } + } + "; + let schema = Arc::new(parse_message_type(schema).unwrap()); + + // Write Parquet file to buffer + //let mut buffer = std::fs::File::create("/Users/seidl/map_no_value.pq").unwrap(); + let mut buffer: Vec = Vec::new(); + let mut file_writer = + SerializedFileWriter::new(&mut buffer, schema, Default::default()).unwrap(); + let mut row_group_writer = file_writer.next_row_group().unwrap(); + + // Write column my_map.key_value.key + let mut column_writer = row_group_writer.next_column().unwrap().unwrap(); + column_writer + .typed::() + .write_batch( + &[1, 2, 3, 4, 5, 6, 7, 8, 9], + Some(&[1, 1, 1, 1, 1, 1, 1, 1, 1]), + Some(&[0, 1, 1, 0, 1, 1, 0, 1, 1]), + ) + .unwrap(); + column_writer.close().unwrap(); + + // Write column my_list.list.element + let mut column_writer = row_group_writer.next_column().unwrap().unwrap(); + column_writer + .typed::() + .write_batch( + &[1, 2, 3, 4, 5, 6, 7, 8, 9], + Some(&[1, 1, 1, 1, 1, 1, 1, 1, 1]), + Some(&[0, 1, 1, 0, 1, 1, 0, 1, 1]), + ) + .unwrap(); + column_writer.close().unwrap(); + + // Finalize Parquet file + row_group_writer.close().unwrap(); + file_writer.close().unwrap(); + assert_eq!(&buffer[0..4], b"PAR1"); + + // Read Parquet file from buffer + let file_reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap(); + let rows: Vec<_> = file_reader + .get_row_iter(None) + .unwrap() + .map(|row| row.unwrap()) + .collect(); + + // the two columns should be equivalent lists by this point + for row in rows { + let cols = row.into_columns(); + assert_eq!(cols[0].1, cols[1].1); + } + } + fn test_file_reader_rows(file_name: &str, schema: Option) -> Result> { let file = get_test_file(file_name); let file_reader: Box = Box::new(SerializedFileReader::new(file)?); From 4f828f0b7ed618dc19382b940c7f9f91d3f494d4 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Thu, 21 Nov 2024 15:39:03 -0800 Subject: [PATCH 2/3] remove commented out code --- parquet/src/record/reader.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/parquet/src/record/reader.rs b/parquet/src/record/reader.rs index c104b0a6cc65..ac6cd63201f4 100644 --- a/parquet/src/record/reader.rs +++ b/parquet/src/record/reader.rs @@ -1855,7 +1855,6 @@ mod tests { let schema = Arc::new(parse_message_type(schema).unwrap()); // Write Parquet file to buffer - //let mut buffer = std::fs::File::create("/Users/seidl/map_no_value.pq").unwrap(); let mut buffer: Vec = Vec::new(); let mut file_writer = SerializedFileWriter::new(&mut buffer, schema, Default::default()).unwrap(); From b23ec72297de00eaa46f668d5055795593e36ad3 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Fri, 22 Nov 2024 14:13:01 -0800 Subject: [PATCH 3/3] use recently added map_no_value.parquet from parquet-testing --- parquet-testing | 2 +- parquet/src/arrow/arrow_reader/mod.rs | 83 +++++++++----------------- parquet/src/record/reader.rs | 86 ++++++++------------------- 3 files changed, 52 insertions(+), 119 deletions(-) diff --git a/parquet-testing b/parquet-testing index 550368ca77b9..4439a223a315 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit 550368ca77b97231efead39251a96bd6f8f08c6e +Subproject commit 4439a223a315cf874746d3b5da25e6a6b2a2b16e diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 014125d06eda..d686198c8d5a 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -4068,68 +4068,39 @@ mod tests { #[test] fn test_map_no_value() { - let schema = " - message spark_schema { - REQUIRED group my_map (MAP) { - REPEATED group key_value { - REQUIRED INT32 key; - } - } - REQUIRED group my_list (LIST) { - REPEATED group list { - REQUIRED INT32 element; - } - } - } - "; - let schema = Arc::new(parse_message_type(schema).unwrap()); - - // Write Parquet file to buffer - let mut buffer: Vec = Vec::new(); - let mut file_writer = - SerializedFileWriter::new(&mut buffer, schema, Default::default()).unwrap(); - let mut row_group_writer = file_writer.next_row_group().unwrap(); - - // Write column my_map.key_value.key - let mut column_writer = row_group_writer.next_column().unwrap().unwrap(); - column_writer - .typed::() - .write_batch( - &[1, 2, 3, 4, 5, 6, 7, 8, 9], - Some(&[1, 1, 1, 1, 1, 1, 1, 1, 1]), - Some(&[0, 1, 1, 0, 1, 1, 0, 1, 1]), - ) - .unwrap(); - column_writer.close().unwrap(); - - // Write column my_list.list.element - let mut column_writer = row_group_writer.next_column().unwrap().unwrap(); - column_writer - .typed::() - .write_batch( - &[1, 2, 3, 4, 5, 6, 7, 8, 9], - Some(&[1, 1, 1, 1, 1, 1, 1, 1, 1]), - Some(&[0, 1, 1, 0, 1, 1, 0, 1, 1]), - ) - .unwrap(); - column_writer.close().unwrap(); - - // Finalize Parquet file - row_group_writer.close().unwrap(); - file_writer.close().unwrap(); - assert_eq!(&buffer[0..4], b"PAR1"); + // File schema: + // message schema { + // required group my_map (MAP) { + // repeated group key_value { + // required int32 key; + // optional int32 value; + // } + // } + // required group my_map_no_v (MAP) { + // repeated group key_value { + // required int32 key; + // } + // } + // required group my_list (LIST) { + // repeated group list { + // required int32 element; + // } + // } + // } + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/map_no_value.parquet"); + let file = File::open(path).unwrap(); - // Read Parquet file from buffer - let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)) + let mut reader = ParquetRecordBatchReaderBuilder::try_new(file) .unwrap() .build() .unwrap(); let out = reader.next().unwrap().unwrap(); assert_eq!(out.num_rows(), 3); - assert_eq!(out.num_columns(), 2); - // map and list columns should now be equivalent - let c0 = out.column(0).as_list::(); - let c1 = out.column(1).as_list::(); + assert_eq!(out.num_columns(), 3); + // my_map_no_v and my_list columns should now be equivalent + let c0 = out.column(1).as_list::(); + let c1 = out.column(2).as_list::(); assert_eq!(c0.len(), c1.len()); c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r)); } diff --git a/parquet/src/record/reader.rs b/parquet/src/record/reader.rs index ac6cd63201f4..5e8dc9714875 100644 --- a/parquet/src/record/reader.rs +++ b/parquet/src/record/reader.rs @@ -827,7 +827,7 @@ impl Iterator for ReaderIter { mod tests { use super::*; - use crate::data_type::{Int32Type, Int64Type}; + use crate::data_type::Int64Type; use crate::file::reader::SerializedFileReader; use crate::file::writer::SerializedFileWriter; use crate::record::api::RowAccessor; @@ -1838,69 +1838,31 @@ mod tests { #[test] fn test_map_no_value() { - let schema = " - message spark_schema { - REQUIRED group my_map (MAP) { - REPEATED group key_value { - REQUIRED INT32 key; - } - } - REQUIRED group my_list (LIST) { - REPEATED group list { - REQUIRED INT32 element; - } - } - } - "; - let schema = Arc::new(parse_message_type(schema).unwrap()); - - // Write Parquet file to buffer - let mut buffer: Vec = Vec::new(); - let mut file_writer = - SerializedFileWriter::new(&mut buffer, schema, Default::default()).unwrap(); - let mut row_group_writer = file_writer.next_row_group().unwrap(); - - // Write column my_map.key_value.key - let mut column_writer = row_group_writer.next_column().unwrap().unwrap(); - column_writer - .typed::() - .write_batch( - &[1, 2, 3, 4, 5, 6, 7, 8, 9], - Some(&[1, 1, 1, 1, 1, 1, 1, 1, 1]), - Some(&[0, 1, 1, 0, 1, 1, 0, 1, 1]), - ) - .unwrap(); - column_writer.close().unwrap(); - - // Write column my_list.list.element - let mut column_writer = row_group_writer.next_column().unwrap().unwrap(); - column_writer - .typed::() - .write_batch( - &[1, 2, 3, 4, 5, 6, 7, 8, 9], - Some(&[1, 1, 1, 1, 1, 1, 1, 1, 1]), - Some(&[0, 1, 1, 0, 1, 1, 0, 1, 1]), - ) - .unwrap(); - column_writer.close().unwrap(); - - // Finalize Parquet file - row_group_writer.close().unwrap(); - file_writer.close().unwrap(); - assert_eq!(&buffer[0..4], b"PAR1"); - - // Read Parquet file from buffer - let file_reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap(); - let rows: Vec<_> = file_reader - .get_row_iter(None) - .unwrap() - .map(|row| row.unwrap()) - .collect(); - - // the two columns should be equivalent lists by this point + // File schema: + // message schema { + // required group my_map (MAP) { + // repeated group key_value { + // required int32 key; + // optional int32 value; + // } + // } + // required group my_map_no_v (MAP) { + // repeated group key_value { + // required int32 key; + // } + // } + // required group my_list (LIST) { + // repeated group list { + // required int32 element; + // } + // } + // } + let rows = test_file_reader_rows("map_no_value.parquet", None).unwrap(); + + // the my_map_no_v and my_list columns should be equivalent lists by this point for row in rows { let cols = row.into_columns(); - assert_eq!(cols[0].1, cols[1].1); + assert_eq!(cols[1].1, cols[2].1); } }