Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow reading Parquet maps that lack a values field #6730

Merged
merged 4 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4065,4 +4065,43 @@ mod tests {
}
}
}

#[test]
fn test_map_no_value() {
// File schema:
// message schema {
// required group my_map (MAP) {
// repeated group key_value {
// required int32 key;
// optional int32 value;
// }
// }
// required group my_map_no_v (MAP) {
// repeated group key_value {
// required int32 key;
// }
// }
// required group my_list (LIST) {
// repeated group list {
// required int32 element;
// }
// }
// }
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/map_no_value.parquet");
let file = File::open(path).unwrap();

let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
.unwrap()
.build()
.unwrap();
let out = reader.next().unwrap().unwrap();
assert_eq!(out.num_rows(), 3);
assert_eq!(out.num_columns(), 3);
// my_map_no_v and my_list columns should now be equivalent
let c0 = out.column(1).as_list::<i32>();
let c1 = out.column(2).as_list::<i32>();
assert_eq!(c0.len(), c1.len());
c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
}
}
7 changes: 6 additions & 1 deletion parquet/src/arrow/schema/complex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,13 @@ impl Visitor {
return Err(arrow_err!("Child of map field must be repeated"));
}

// According to the specification the values are optional (#1642).
// In this case, return the keys as a list.
if map_key_value.get_fields().len() == 1 {
return self.visit_list(map_type, context);
}

if map_key_value.get_fields().len() != 2 {
// According to the specification the values are optional (#1642)
return Err(arrow_err!(
"Child of map field must have two children, found {}",
map_key_value.get_fields().len()
Expand Down
91 changes: 67 additions & 24 deletions parquet/src/record/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,15 @@ impl TreeBuilder {
Repetition::REPEATED,
"Invalid map type: {field:?}"
);
assert_eq!(
key_value_type.get_fields().len(),
2,
"Invalid map type: {field:?}"
);
// Parquet spec allows no value. In that case treat as a list. #1642
if key_value_type.get_fields().len() != 1 {
// If not a list, then there can only be 2 fields in the struct
assert_eq!(
key_value_type.get_fields().len(),
2,
"Invalid map type: {field:?}"
);
}

path.push(String::from(key_value_type.name()));

Expand All @@ -239,25 +243,35 @@ impl TreeBuilder {
row_group_reader,
)?;

let value_type = &key_value_type.get_fields()[1];
let value_reader = self.reader_tree(
value_type.clone(),
path,
curr_def_level + 1,
curr_rep_level + 1,
paths,
row_group_reader,
)?;
if key_value_type.get_fields().len() == 1 {
path.pop();
Reader::RepeatedReader(
field,
curr_def_level,
curr_rep_level,
Box::new(key_reader),
)
} else {
let value_type = &key_value_type.get_fields()[1];
let value_reader = self.reader_tree(
value_type.clone(),
path,
curr_def_level + 1,
curr_rep_level + 1,
paths,
row_group_reader,
)?;

path.pop();
path.pop();

Reader::KeyValueReader(
field,
curr_def_level,
curr_rep_level,
Box::new(key_reader),
Box::new(value_reader),
)
Reader::KeyValueReader(
field,
curr_def_level,
curr_rep_level,
Box::new(key_reader),
Box::new(value_reader),
)
}
}
// A repeated field that is neither contained by a `LIST`- or
// `MAP`-annotated group nor annotated by `LIST` or `MAP`
Expand Down Expand Up @@ -1459,8 +1473,7 @@ mod tests {
}

#[test]
#[should_panic(expected = "Invalid map type")]
fn test_file_reader_rows_invalid_map_type() {
fn test_file_reader_rows_nested_map_type() {
let schema = "
message spark_schema {
OPTIONAL group a (MAP) {
Expand Down Expand Up @@ -1823,6 +1836,36 @@ mod tests {
assert_eq!(rows, expected_rows);
}

#[test]
fn test_map_no_value() {
// File schema:
// message schema {
// required group my_map (MAP) {
// repeated group key_value {
// required int32 key;
// optional int32 value;
// }
// }
// required group my_map_no_v (MAP) {
// repeated group key_value {
// required int32 key;
// }
// }
// required group my_list (LIST) {
// repeated group list {
// required int32 element;
// }
// }
// }
let rows = test_file_reader_rows("map_no_value.parquet", None).unwrap();

// the my_map_no_v and my_list columns should be equivalent lists by this point
for row in rows {
let cols = row.into_columns();
assert_eq!(cols[1].1, cols[2].1);
}
}

fn test_file_reader_rows(file_name: &str, schema: Option<Type>) -> Result<Vec<Row>> {
let file = get_test_file(file_name);
let file_reader: Box<dyn FileReader> = Box::new(SerializedFileReader::new(file)?);
Expand Down
Loading