Skip to content

Commit

Permalink
Optionally coerce names of maps and lists to match Parquet specificat…
Browse files Browse the repository at this point in the history
…ion (#6828)

* optionally coerce names of maps and lists to match Parquet spec

* less verbose

* add ArrowWriter round trip test

* move documentation to builder

* use create_random_array for map and list arrays
  • Loading branch information
etseidl authored Dec 5, 2024
1 parent 93ce75c commit 30c14ab
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 17 deletions.
51 changes: 51 additions & 0 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,7 @@ mod tests {
use arrow::datatypes::ToByteSlice;
use arrow::datatypes::{DataType, Schema};
use arrow::error::Result as ArrowResult;
use arrow::util::data_gen::create_random_array;
use arrow::util::pretty::pretty_format_batches;
use arrow::{array::*, buffer::Buffer};
use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer};
Expand Down Expand Up @@ -2491,6 +2492,56 @@ mod tests {
one_column_roundtrip(values, false);
}

#[test]
fn list_and_map_coerced_names() {
// Create map and list with non-Parquet naming
let list_field =
Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
let map_field = Field::new_map(
"my_map",
"entries",
Field::new("keys", DataType::Int32, false),
Field::new("values", DataType::Int32, true),
false,
true,
);

let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();

let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));

// Write data to Parquet but coerce names to match spec
let props = Some(WriterProperties::builder().set_coerce_types(true).build());
let file = tempfile::tempfile().unwrap();
let mut writer =
ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();

let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
writer.write(&batch).unwrap();
let file_metadata = writer.close().unwrap();

// Coerced name of "item" should be "element"
assert_eq!(file_metadata.schema[3].name, "element");
// Coerced name of "entries" should be "key_value"
assert_eq!(file_metadata.schema[5].name, "key_value");
// Coerced name of "keys" should be "key"
assert_eq!(file_metadata.schema[6].name, "key");
// Coerced name of "values" should be "value"
assert_eq!(file_metadata.schema[7].name, "value");

// Double check schema after reading from the file
let reader = SerializedFileReader::new(file).unwrap();
let file_schema = reader.metadata().file_metadata().schema();
let fields = file_schema.get_fields();
let list_field = &fields[0].get_fields()[0];
assert_eq!(list_field.get_fields()[0].name(), "element");
let map_field = &fields[1].get_fields()[0];
assert_eq!(map_field.name(), "key_value");
assert_eq!(map_field.get_fields()[0].name(), "key");
assert_eq!(map_field.get_fields()[1].name(), "value");
}

#[test]
fn fallback_flush_data_page() {
//tests if the Fallback::flush_data_page clears all buffers correctly
Expand Down
110 changes: 104 additions & 6 deletions parquet/src/arrow/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,11 @@ pub fn decimal_length_from_precision(precision: u8) -> usize {

/// Convert an arrow field to a parquet `Type`
fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
const PARQUET_LIST_ELEMENT_NAME: &str = "element";
const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
const PARQUET_KEY_FIELD_NAME: &str = "key";
const PARQUET_VALUE_FIELD_NAME: &str = "value";

let name = field.name().as_str();
let repetition = if field.is_nullable() {
Repetition::OPTIONAL
Expand Down Expand Up @@ -527,10 +532,18 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
.with_id(id)
.build(),
DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
// Ensure proper naming per the Parquet specification
let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
} else {
Arc::new(arrow_to_parquet_type(f, coerce_types)?)
};

Type::group_type_builder(name)
.with_fields(vec![Arc::new(
Type::group_type_builder("list")
.with_fields(vec![Arc::new(arrow_to_parquet_type(f, coerce_types)?)])
.with_fields(vec![field_ref])
.with_repetition(Repetition::REPEATED)
.build()?,
)])
Expand Down Expand Up @@ -559,13 +572,29 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
}
DataType::Map(field, _) => {
if let DataType::Struct(struct_fields) = field.data_type() {
// If coercing then set inner struct name to "key_value"
let map_struct_name = if coerce_types {
PARQUET_MAP_STRUCT_NAME
} else {
field.name()
};

// If coercing then ensure struct fields are named "key" and "value"
let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
if coerce_types && fld.name() != name {
let f = fld.as_ref().clone().with_name(name);
Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
} else {
Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
}
};
let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;

Type::group_type_builder(name)
.with_fields(vec![Arc::new(
Type::group_type_builder(field.name())
.with_fields(vec![
Arc::new(arrow_to_parquet_type(&struct_fields[0], coerce_types)?),
Arc::new(arrow_to_parquet_type(&struct_fields[1], coerce_types)?),
])
Type::group_type_builder(map_struct_name)
.with_fields(vec![key_field, val_field])
.with_repetition(Repetition::REPEATED)
.build()?,
)])
Expand Down Expand Up @@ -1420,6 +1449,75 @@ mod tests {
assert_eq!(arrow_fields, converted_arrow_fields);
}

#[test]
fn test_coerced_map_list() {
// Create Arrow schema with non-Parquet naming
let arrow_fields = vec![
Field::new_list(
"my_list",
Field::new("item", DataType::Boolean, true),
false,
),
Field::new_map(
"my_map",
"entries",
Field::new("keys", DataType::Utf8, false),
Field::new("values", DataType::Int32, true),
false,
true,
),
];
let arrow_schema = Schema::new(arrow_fields);

// Create Parquet schema with coerced names
let message_type = "
message parquet_schema {
REQUIRED GROUP my_list (LIST) {
REPEATED GROUP list {
OPTIONAL BOOLEAN element;
}
}
OPTIONAL GROUP my_map (MAP) {
REPEATED GROUP key_value {
REQUIRED BINARY key (STRING);
OPTIONAL INT32 value;
}
}
}
";
let parquet_group_type = parse_message_type(message_type).unwrap();
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, true).unwrap();
assert_eq!(
parquet_schema.columns().len(),
converted_arrow_schema.columns().len()
);

// Create Parquet schema without coerced names
let message_type = "
message parquet_schema {
REQUIRED GROUP my_list (LIST) {
REPEATED GROUP list {
OPTIONAL BOOLEAN item;
}
}
OPTIONAL GROUP my_map (MAP) {
REPEATED GROUP entries {
REQUIRED BINARY keys (STRING);
OPTIONAL INT32 values;
}
}
}
";
let parquet_group_type = parse_message_type(message_type).unwrap();
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, false).unwrap();
assert_eq!(
parquet_schema.columns().len(),
converted_arrow_schema.columns().len()
);
}

#[test]
fn test_field_to_column_desc() {
let message_type = "
Expand Down
7 changes: 7 additions & 0 deletions parquet/src/bin/parquet-rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ struct Args {
/// Sets writer version.
#[clap(long)]
writer_version: Option<WriterVersionArgs>,

/// Sets whether to coerce Arrow types to match Parquet specification
#[clap(long)]
coerce_types: Option<bool>,
}

fn main() {
Expand Down Expand Up @@ -262,6 +266,9 @@ fn main() {
if let Some(value) = args.writer_version {
writer_properties_builder = writer_properties_builder.set_writer_version(value.into());
}
if let Some(value) = args.coerce_types {
writer_properties_builder = writer_properties_builder.set_coerce_types(value);
}
let writer_properties = writer_properties_builder.build();
let mut parquet_writer = ArrowWriter::try_new(
File::create(&args.output).expect("Unable to open output file"),
Expand Down
28 changes: 17 additions & 11 deletions parquet/src/file/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,15 +287,7 @@ impl WriterProperties {
self.statistics_truncate_length
}

/// Returns `coerce_types` boolean
///
/// Some Arrow types do not have a corresponding Parquet logical type.
/// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`.
/// Writers have the option to coerce these into native Parquet types. Type
/// coercion allows for meaningful representations that do not require
/// downstream readers to consider the embedded Arrow schema. However, type
/// coercion also prevents the data from being losslessly round-tripped. This method
/// returns `true` if type coercion enabled.
/// Returns `true` if type coercion is enabled.
pub fn coerce_types(&self) -> bool {
self.coerce_types
}
Expand Down Expand Up @@ -788,8 +780,22 @@ impl WriterPropertiesBuilder {
self
}

/// Sets flag to enable/disable type coercion.
/// Takes precedence over globally defined settings.
/// Sets flag to control if type coercion is enabled (defaults to `false`).
///
/// # Notes
/// Some Arrow types do not have a corresponding Parquet logical type.
/// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`.
/// Also, for [`List`] and [`Map`] types, Parquet expects certain schema elements
/// to have specific names to be considered fully compliant.
/// Writers have the option to coerce these types and names to match those required
/// by the Parquet specification.
/// This type coercion allows for meaningful representations that do not require
/// downstream readers to consider the embedded Arrow schema, and can allow for greater
/// compatibility with other Parquet implementations. However, type
/// coercion also prevents the data from being losslessly round-tripped.
///
/// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
/// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
pub fn set_coerce_types(mut self, coerce_types: bool) -> Self {
self.coerce_types = coerce_types;
self
Expand Down

0 comments on commit 30c14ab

Please sign in to comment.