Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optionally coerce names of maps and lists to match Parquet specification #6828

Merged
merged 5 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 104 additions & 6 deletions parquet/src/arrow/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,11 @@ pub fn decimal_length_from_precision(precision: u8) -> usize {

/// Convert an arrow field to a parquet `Type`
fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
const PARQUET_LIST_ELEMENT_NAME: &str = "element";
const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
const PARQUET_KEY_FIELD_NAME: &str = "key";
const PARQUET_VALUE_FIELD_NAME: &str = "value";

let name = field.name().as_str();
let repetition = if field.is_nullable() {
Repetition::OPTIONAL
Expand Down Expand Up @@ -527,10 +532,18 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
.with_id(id)
.build(),
DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
// Ensure proper naming per the Parquet specification
let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
} else {
Arc::new(arrow_to_parquet_type(f, coerce_types)?)
};

Type::group_type_builder(name)
.with_fields(vec![Arc::new(
Type::group_type_builder("list")
.with_fields(vec![Arc::new(arrow_to_parquet_type(f, coerce_types)?)])
.with_fields(vec![field_ref])
.with_repetition(Repetition::REPEATED)
.build()?,
)])
Expand Down Expand Up @@ -559,13 +572,29 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
}
DataType::Map(field, _) => {
if let DataType::Struct(struct_fields) = field.data_type() {
// If coercing then set inner struct name to "key_value"
let map_struct_name = if coerce_types {
PARQUET_MAP_STRUCT_NAME
} else {
field.name()
};

// If coercing then ensure struct fields are named "key" and "value"
let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
if coerce_types && fld.name() != name {
let f = fld.as_ref().clone().with_name(name);
Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
} else {
Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
}
};
let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;

Type::group_type_builder(name)
.with_fields(vec![Arc::new(
Type::group_type_builder(field.name())
.with_fields(vec![
Arc::new(arrow_to_parquet_type(&struct_fields[0], coerce_types)?),
Arc::new(arrow_to_parquet_type(&struct_fields[1], coerce_types)?),
])
Type::group_type_builder(map_struct_name)
.with_fields(vec![key_field, val_field])
.with_repetition(Repetition::REPEATED)
.build()?,
)])
Expand Down Expand Up @@ -1420,6 +1449,75 @@ mod tests {
assert_eq!(arrow_fields, converted_arrow_fields);
}

#[test]
fn test_coerced_map_list() {
// Create Arrow schema with non-Parquet naming
let arrow_fields = vec![
Field::new_list(
"my_list",
Field::new("item", DataType::Boolean, true),
false,
),
Field::new_map(
"my_map",
"entries",
Field::new("keys", DataType::Utf8, false),
Field::new("values", DataType::Int32, true),
false,
true,
),
];
let arrow_schema = Schema::new(arrow_fields);

// Create Parquet schema with coerced names
let message_type = "
message parquet_schema {
REQUIRED GROUP my_list (LIST) {
REPEATED GROUP list {
OPTIONAL BOOLEAN element;
}
}
OPTIONAL GROUP my_map (MAP) {
REPEATED GROUP key_value {
REQUIRED BINARY key (STRING);
OPTIONAL INT32 value;
}
}
}
";
let parquet_group_type = parse_message_type(message_type).unwrap();
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, true).unwrap();
assert_eq!(
parquet_schema.columns().len(),
converted_arrow_schema.columns().len()
);

// Create Parquet schema without coerced names
let message_type = "
message parquet_schema {
REQUIRED GROUP my_list (LIST) {
REPEATED GROUP list {
OPTIONAL BOOLEAN item;
}
}
OPTIONAL GROUP my_map (MAP) {
REPEATED GROUP entries {
REQUIRED BINARY keys (STRING);
OPTIONAL INT32 values;
}
}
}
";
let parquet_group_type = parse_message_type(message_type).unwrap();
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, false).unwrap();
assert_eq!(
parquet_schema.columns().len(),
converted_arrow_schema.columns().len()
);
}

#[test]
fn test_field_to_column_desc() {
let message_type = "
Expand Down
7 changes: 7 additions & 0 deletions parquet/src/bin/parquet-rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ struct Args {
/// Sets writer version.
#[clap(long)]
writer_version: Option<WriterVersionArgs>,

/// Sets whether to coerce Arrow types to match Parquet specification
#[clap(long)]
coerce_types: Option<bool>,
}

fn main() {
Expand Down Expand Up @@ -262,6 +266,9 @@ fn main() {
if let Some(value) = args.writer_version {
writer_properties_builder = writer_properties_builder.set_writer_version(value.into());
}
if let Some(value) = args.coerce_types {
writer_properties_builder = writer_properties_builder.set_coerce_types(value);
}
let writer_properties = writer_properties_builder.build();
let mut parquet_writer = ArrowWriter::try_new(
File::create(&args.output).expect("Unable to open output file"),
Expand Down
18 changes: 13 additions & 5 deletions parquet/src/file/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,15 +287,22 @@ impl WriterProperties {
self.statistics_truncate_length
}

/// Returns `coerce_types` boolean
/// Returns whether `coerce_types` is enabled (defaults to `false`).
///
/// Some Arrow types do not have a corresponding Parquet logical type.
/// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`.
/// Writers have the option to coerce these into native Parquet types. Type
/// coercion allows for meaningful representations that do not require
/// downstream readers to consider the embedded Arrow schema. However, type
/// Also, for [`List`] and [`Map`] types, Parquet expects certain schema elements
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should most of this verbiage move to set_coerce_types on the builder?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or set_coerce_types could link to here

Copy link
Contributor Author

@etseidl etseidl Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or set_coerce_types could link to here

I already made that change. My concern is that most of the deep documentation is currently on the setters rather than the getters...this one is an outlier.

Edit: I went ahead and moved the bulk of the documentation to the builder. I think more eyes will find it there.

/// to have specific names to be considered fully compliant.
/// Writers have the option to coerce these types and names to match those required
/// by the Parquet specification.
/// This type coercion allows for meaningful representations that do not require
/// downstream readers to consider the embedded Arrow schema, and can allow for greater
/// compatibility with other Parquet implementations. However, type
/// coercion also prevents the data from being losslessly round-tripped. This method
/// returns `true` if type coercion enabled.
///
/// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
/// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
pub fn coerce_types(&self) -> bool {
self.coerce_types
}
Expand Down Expand Up @@ -789,7 +796,8 @@ impl WriterPropertiesBuilder {
}

/// Sets flag to enable/disable type coercion.
/// Takes precedence over globally defined settings.
/// Takes precedence over globally defined settings. See [`WriterProperties::coerce_types`]
/// for more information.
pub fn set_coerce_types(mut self, coerce_types: bool) -> Self {
self.coerce_types = coerce_types;
self
Expand Down
Loading