use recently added map_no_value.parquet from parquet-testing
etseidl committed Nov 22, 2024
1 parent 4f828f0 commit b23ec72
Showing 3 changed files with 52 additions and 119 deletions.
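Both tests now locate their input via `arrow::util::test_util::parquet_test_data()` rather than serializing a file into an in-memory buffer. A minimal sketch of that lookup, assuming the usual arrow-rs convention that the `PARQUET_TEST_DATA` environment variable points at the `data/` directory of a parquet-testing checkout (the local helper below is a hypothetical stand-in, not the crate's implementation):

```rust
// Sketch: resolving the shared test-data directory, assuming the
// PARQUET_TEST_DATA convention used by the arrow-rs test suite.
use std::{env, path::PathBuf};

// Hypothetical stand-in for arrow::util::test_util::parquet_test_data()
fn parquet_test_data() -> PathBuf {
    PathBuf::from(
        env::var("PARQUET_TEST_DATA")
            .expect("PARQUET_TEST_DATA should point at parquet-testing/data"),
    )
}

fn main() {
    let path = parquet_test_data().join("map_no_value.parquet");
    println!("fixture: {}", path.display());
}
```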
83 changes: 27 additions & 56 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -4068,68 +4068,39 @@ mod tests {
 
     #[test]
     fn test_map_no_value() {
-        let schema = "
-        message spark_schema {
-          REQUIRED group my_map (MAP) {
-            REPEATED group key_value {
-              REQUIRED INT32 key;
-            }
-          }
-          REQUIRED group my_list (LIST) {
-            REPEATED group list {
-              REQUIRED INT32 element;
-            }
-          }
-        }
-        ";
-        let schema = Arc::new(parse_message_type(schema).unwrap());
-
-        // Write Parquet file to buffer
-        let mut buffer: Vec<u8> = Vec::new();
-        let mut file_writer =
-            SerializedFileWriter::new(&mut buffer, schema, Default::default()).unwrap();
-        let mut row_group_writer = file_writer.next_row_group().unwrap();
-
-        // Write column my_map.key_value.key
-        let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-        column_writer
-            .typed::<Int32Type>()
-            .write_batch(
-                &[1, 2, 3, 4, 5, 6, 7, 8, 9],
-                Some(&[1, 1, 1, 1, 1, 1, 1, 1, 1]),
-                Some(&[0, 1, 1, 0, 1, 1, 0, 1, 1]),
-            )
-            .unwrap();
-        column_writer.close().unwrap();
-
-        // Write column my_list.list.element
-        let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-        column_writer
-            .typed::<Int32Type>()
-            .write_batch(
-                &[1, 2, 3, 4, 5, 6, 7, 8, 9],
-                Some(&[1, 1, 1, 1, 1, 1, 1, 1, 1]),
-                Some(&[0, 1, 1, 0, 1, 1, 0, 1, 1]),
-            )
-            .unwrap();
-        column_writer.close().unwrap();
-
-        // Finalize Parquet file
-        row_group_writer.close().unwrap();
-        file_writer.close().unwrap();
-        assert_eq!(&buffer[0..4], b"PAR1");
+        // File schema:
+        // message schema {
+        //     required group my_map (MAP) {
+        //         repeated group key_value {
+        //             required int32 key;
+        //             optional int32 value;
+        //         }
+        //     }
+        //     required group my_map_no_v (MAP) {
+        //         repeated group key_value {
+        //             required int32 key;
+        //         }
+        //     }
+        //     required group my_list (LIST) {
+        //         repeated group list {
+        //             required int32 element;
+        //         }
+        //     }
+        // }
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/map_no_value.parquet");
+        let file = File::open(path).unwrap();
 
-        // Read Parquet file from buffer
-        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
+        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
             .unwrap()
             .build()
             .unwrap();
         let out = reader.next().unwrap().unwrap();
         assert_eq!(out.num_rows(), 3);
-        assert_eq!(out.num_columns(), 2);
-        // map and list columns should now be equivalent
-        let c0 = out.column(0).as_list::<i32>();
-        let c1 = out.column(1).as_list::<i32>();
+        assert_eq!(out.num_columns(), 3);
+        // my_map_no_v and my_list columns should now be equivalent
+        let c0 = out.column(1).as_list::<i32>();
+        let c1 = out.column(2).as_list::<i32>();
         assert_eq!(c0.len(), c1.len());
         c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
     }
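The deleted writer-side setup encoded nine leaf values with repetition levels `[0, 1, 1, 0, 1, 1, 0, 1, 1]` and all definition levels at 1; a repetition level of 0 opens a new row, so the stream decodes to the three rows `[[1, 2, 3], [4, 5, 6], [7, 8, 9]]` that `assert_eq!(out.num_rows(), 3)` still checks against the new fixture. A self-contained sketch of that decoding rule (the helper name is illustrative, not a parquet API):

```rust
// Sketch: how repetition levels split a flat leaf-value stream into rows.
// A rep level of 0 starts a new row; a higher level continues the current one.
fn rows_from_rep_levels(values: &[i32], rep_levels: &[i16]) -> Vec<Vec<i32>> {
    let mut rows: Vec<Vec<i32>> = Vec::new();
    for (&value, &rep) in values.iter().zip(rep_levels) {
        if rep == 0 {
            rows.push(Vec::new()); // rep level 0 opens a new row
        }
        rows.last_mut().unwrap().push(value);
    }
    rows
}

fn main() {
    let rows = rows_from_rep_levels(
        &[1, 2, 3, 4, 5, 6, 7, 8, 9],
        &[0, 1, 1, 0, 1, 1, 0, 1, 1],
    );
    assert_eq!(rows, vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
}
```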
86 changes: 24 additions & 62 deletions parquet/src/record/reader.rs
@@ -827,7 +827,7 @@ impl Iterator for ReaderIter {
 mod tests {
     use super::*;
 
-    use crate::data_type::{Int32Type, Int64Type};
+    use crate::data_type::Int64Type;
     use crate::file::reader::SerializedFileReader;
     use crate::file::writer::SerializedFileWriter;
     use crate::record::api::RowAccessor;
@@ -1838,69 +1838,31 @@ mod tests {
 
     #[test]
     fn test_map_no_value() {
-        let schema = "
-        message spark_schema {
-          REQUIRED group my_map (MAP) {
-            REPEATED group key_value {
-              REQUIRED INT32 key;
-            }
-          }
-          REQUIRED group my_list (LIST) {
-            REPEATED group list {
-              REQUIRED INT32 element;
-            }
-          }
-        }
-        ";
-        let schema = Arc::new(parse_message_type(schema).unwrap());
-
-        // Write Parquet file to buffer
-        let mut buffer: Vec<u8> = Vec::new();
-        let mut file_writer =
-            SerializedFileWriter::new(&mut buffer, schema, Default::default()).unwrap();
-        let mut row_group_writer = file_writer.next_row_group().unwrap();
-
-        // Write column my_map.key_value.key
-        let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-        column_writer
-            .typed::<Int32Type>()
-            .write_batch(
-                &[1, 2, 3, 4, 5, 6, 7, 8, 9],
-                Some(&[1, 1, 1, 1, 1, 1, 1, 1, 1]),
-                Some(&[0, 1, 1, 0, 1, 1, 0, 1, 1]),
-            )
-            .unwrap();
-        column_writer.close().unwrap();
-
-        // Write column my_list.list.element
-        let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-        column_writer
-            .typed::<Int32Type>()
-            .write_batch(
-                &[1, 2, 3, 4, 5, 6, 7, 8, 9],
-                Some(&[1, 1, 1, 1, 1, 1, 1, 1, 1]),
-                Some(&[0, 1, 1, 0, 1, 1, 0, 1, 1]),
-            )
-            .unwrap();
-        column_writer.close().unwrap();
-
-        // Finalize Parquet file
-        row_group_writer.close().unwrap();
-        file_writer.close().unwrap();
-        assert_eq!(&buffer[0..4], b"PAR1");
-
-        // Read Parquet file from buffer
-        let file_reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
-        let rows: Vec<_> = file_reader
-            .get_row_iter(None)
-            .unwrap()
-            .map(|row| row.unwrap())
-            .collect();
-
-        // the two columns should be equivalent lists by this point
+        // File schema:
+        // message schema {
+        //     required group my_map (MAP) {
+        //         repeated group key_value {
+        //             required int32 key;
+        //             optional int32 value;
+        //         }
+        //     }
+        //     required group my_map_no_v (MAP) {
+        //         repeated group key_value {
+        //             required int32 key;
+        //         }
+        //     }
+        //     required group my_list (LIST) {
+        //         repeated group list {
+        //             required int32 element;
+        //         }
+        //     }
+        // }
+        let rows = test_file_reader_rows("map_no_value.parquet", None).unwrap();
+
+        // the my_map_no_v and my_list columns should be equivalent lists by this point
         for row in rows {
             let cols = row.into_columns();
-            assert_eq!(cols[0].1, cols[1].1);
+            assert_eq!(cols[1].1, cols[2].1);
         }
     }
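`test_file_reader_rows` is a helper local to this test module, so its body does not appear in the hunk. Assuming it simply opens the named file from the parquet-testing data directory and collects every row, an equivalent standalone read with the public API would look roughly like this (column order follows the schema comment above: my_map, my_map_no_v, my_list):

```rust
// Sketch: a standalone equivalent of the row-level check, assuming
// PARQUET_TEST_DATA points at parquet-testing/data.
use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let dir = std::env::var("PARQUET_TEST_DATA")?;
    let file = File::open(format!("{dir}/map_no_value.parquet"))?;
    let reader = SerializedFileReader::new(file)?;
    for row in reader.get_row_iter(None)? {
        // into_columns yields (column name, Field) pairs in schema order
        let cols = row?.into_columns();
        // my_map_no_v (index 1) and my_list (index 2) should decode identically
        assert_eq!(cols[1].1, cols[2].1);
    }
    Ok(())
}
```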

