Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optionally coerce names of maps and lists to match Parquet specification #6828

Merged
merged 5 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,7 @@ mod tests {
use arrow::datatypes::ToByteSlice;
use arrow::datatypes::{DataType, Schema};
use arrow::error::Result as ArrowResult;
use arrow::util::data_gen::create_random_array;
use arrow::util::pretty::pretty_format_batches;
use arrow::{array::*, buffer::Buffer};
use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer};
Expand Down Expand Up @@ -2491,6 +2492,56 @@ mod tests {
one_column_roundtrip(values, false);
}

#[test]
fn list_and_map_coerced_names() {
    // Build a list and a map whose inner field names deviate from the
    // Parquet specification ("item"/"entries"/"keys"/"values" instead of
    // the spec-mandated "element"/"key_value"/"key"/"value").
    let list_field =
        Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
    let map_field = Field::new_map(
        "my_map",
        "entries",
        Field::new("keys", DataType::Int32, false),
        Field::new("values", DataType::Int32, true),
        false,
        true,
    );

    // Populate both columns with random, null-free data.
    let columns = vec![
        create_random_array(&list_field, 100, 0.0, 0.0).unwrap(),
        create_random_array(&map_field, 100, 0.0, 0.0).unwrap(),
    ];
    let schema = Arc::new(Schema::new(vec![list_field, map_field]));

    // Write the batch to Parquet with type coercion enabled so the inner
    // names are rewritten to match the spec.
    let props = Some(WriterProperties::builder().set_coerce_types(true).build());
    let file = tempfile::tempfile().unwrap();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), props).unwrap();

    let batch = RecordBatch::try_new(schema, columns).unwrap();
    writer.write(&batch).unwrap();
    let file_metadata = writer.close().unwrap();

    // Flat schema elements: index 3 is the list item; 5..=7 are the map's
    // repeated struct and its key/value children.
    for (idx, expected) in [(3, "element"), (5, "key_value"), (6, "key"), (7, "value")] {
        assert_eq!(file_metadata.schema[idx].name, expected);
    }

    // Re-open the file and confirm the same names via the parsed schema tree.
    let reader = SerializedFileReader::new(file).unwrap();
    let fields = reader.metadata().file_metadata().schema().get_fields();
    let list_inner = &fields[0].get_fields()[0];
    assert_eq!(list_inner.get_fields()[0].name(), "element");
    let map_entries = &fields[1].get_fields()[0];
    assert_eq!(map_entries.name(), "key_value");
    assert_eq!(map_entries.get_fields()[0].name(), "key");
    assert_eq!(map_entries.get_fields()[1].name(), "value");
}

#[test]
fn fallback_flush_data_page() {
//tests if the Fallback::flush_data_page clears all buffers correctly
Expand Down
110 changes: 104 additions & 6 deletions parquet/src/arrow/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,11 @@ pub fn decimal_length_from_precision(precision: u8) -> usize {

/// Convert an arrow field to a parquet `Type`
fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
const PARQUET_LIST_ELEMENT_NAME: &str = "element";
const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
const PARQUET_KEY_FIELD_NAME: &str = "key";
const PARQUET_VALUE_FIELD_NAME: &str = "value";

let name = field.name().as_str();
let repetition = if field.is_nullable() {
Repetition::OPTIONAL
Expand Down Expand Up @@ -527,10 +532,18 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
.with_id(id)
.build(),
DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
// Ensure proper naming per the Parquet specification
let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
} else {
Arc::new(arrow_to_parquet_type(f, coerce_types)?)
};

Type::group_type_builder(name)
.with_fields(vec![Arc::new(
Type::group_type_builder("list")
.with_fields(vec![Arc::new(arrow_to_parquet_type(f, coerce_types)?)])
.with_fields(vec![field_ref])
.with_repetition(Repetition::REPEATED)
.build()?,
)])
Expand Down Expand Up @@ -559,13 +572,29 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
}
DataType::Map(field, _) => {
if let DataType::Struct(struct_fields) = field.data_type() {
// If coercing then set inner struct name to "key_value"
let map_struct_name = if coerce_types {
PARQUET_MAP_STRUCT_NAME
} else {
field.name()
};

// If coercing then ensure struct fields are named "key" and "value"
let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
if coerce_types && fld.name() != name {
let f = fld.as_ref().clone().with_name(name);
Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
} else {
Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
}
};
let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;

Type::group_type_builder(name)
.with_fields(vec![Arc::new(
Type::group_type_builder(field.name())
.with_fields(vec![
Arc::new(arrow_to_parquet_type(&struct_fields[0], coerce_types)?),
Arc::new(arrow_to_parquet_type(&struct_fields[1], coerce_types)?),
])
Type::group_type_builder(map_struct_name)
.with_fields(vec![key_field, val_field])
.with_repetition(Repetition::REPEATED)
.build()?,
)])
Expand Down Expand Up @@ -1420,6 +1449,75 @@ mod tests {
assert_eq!(arrow_fields, converted_arrow_fields);
}

#[test]
fn test_coerced_map_list() {
    // Arrow schema whose list/map inner fields carry non-spec names.
    let arrow_schema = Schema::new(vec![
        Field::new_list(
            "my_list",
            Field::new("item", DataType::Boolean, true),
            false,
        ),
        Field::new_map(
            "my_map",
            "entries",
            Field::new("keys", DataType::Utf8, false),
            Field::new("values", DataType::Int32, true),
            false,
            true,
        ),
    ]);

    // Convert the Arrow schema with the given coercion setting and check it
    // produces the same number of leaf columns as the expected Parquet schema.
    let assert_column_counts_match = |message_type: &str, coerce: bool| {
        let parquet_group_type = parse_message_type(message_type).unwrap();
        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, coerce).unwrap();
        assert_eq!(
            parquet_schema.columns().len(),
            converted_arrow_schema.columns().len()
        );
    };

    // With coercion the inner names become element/key_value/key/value.
    let coerced_message_type = "
        message parquet_schema {
            REQUIRED GROUP my_list (LIST) {
                REPEATED GROUP list {
                    OPTIONAL BOOLEAN element;
                }
            }
            OPTIONAL GROUP my_map (MAP) {
                REPEATED GROUP key_value {
                    REQUIRED BINARY key (STRING);
                    OPTIONAL INT32 value;
                }
            }
        }
        ";
    assert_column_counts_match(coerced_message_type, true);

    // Without coercion the original Arrow names are preserved.
    let uncoerced_message_type = "
        message parquet_schema {
            REQUIRED GROUP my_list (LIST) {
                REPEATED GROUP list {
                    OPTIONAL BOOLEAN item;
                }
            }
            OPTIONAL GROUP my_map (MAP) {
                REPEATED GROUP entries {
                    REQUIRED BINARY keys (STRING);
                    OPTIONAL INT32 values;
                }
            }
        }
        ";
    assert_column_counts_match(uncoerced_message_type, false);
}

#[test]
fn test_field_to_column_desc() {
let message_type = "
Expand Down
7 changes: 7 additions & 0 deletions parquet/src/bin/parquet-rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ struct Args {
/// Sets writer version.
#[clap(long)]
writer_version: Option<WriterVersionArgs>,

/// Sets whether to coerce Arrow types to match Parquet specification
#[clap(long)]
coerce_types: Option<bool>,
}

fn main() {
Expand Down Expand Up @@ -262,6 +266,9 @@ fn main() {
if let Some(value) = args.writer_version {
writer_properties_builder = writer_properties_builder.set_writer_version(value.into());
}
if let Some(value) = args.coerce_types {
writer_properties_builder = writer_properties_builder.set_coerce_types(value);
}
let writer_properties = writer_properties_builder.build();
let mut parquet_writer = ArrowWriter::try_new(
File::create(&args.output).expect("Unable to open output file"),
Expand Down
28 changes: 17 additions & 11 deletions parquet/src/file/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,15 +287,7 @@ impl WriterProperties {
self.statistics_truncate_length
}

/// Returns `coerce_types` boolean
///
/// Some Arrow types do not have a corresponding Parquet logical type.
/// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`.
/// Writers have the option to coerce these into native Parquet types. Type
/// coercion allows for meaningful representations that do not require
/// downstream readers to consider the embedded Arrow schema. However, type
/// coercion also prevents the data from being losslessly round-tripped. This method
/// returns `true` if type coercion enabled.
/// Returns `true` if type coercion is enabled.
///
/// See [`WriterPropertiesBuilder::set_coerce_types`] for what coercion
/// entails and its trade-offs.
pub fn coerce_types(&self) -> bool {
    self.coerce_types
}
Expand Down Expand Up @@ -788,8 +780,22 @@ impl WriterPropertiesBuilder {
self
}

/// Sets flag to enable/disable type coercion.
/// Takes precedence over globally defined settings.
/// Sets flag to control if type coercion is enabled (defaults to `false`).
///
/// # Notes
/// Some Arrow types do not have a corresponding Parquet logical type.
/// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`.
/// Also, for [`List`] and [`Map`] types, Parquet expects certain schema elements
/// to have specific names to be considered fully compliant.
/// Writers have the option to coerce these types and names to match those required
/// by the Parquet specification.
/// This type coercion allows for meaningful representations that do not require
/// downstream readers to consider the embedded Arrow schema, and can allow for greater
/// compatibility with other Parquet implementations. However, type
/// coercion also prevents the data from being losslessly round-tripped.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this wording too

///
/// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
/// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
pub fn set_coerce_types(mut self, coerce_types: bool) -> Self {
self.coerce_types = coerce_types;
self
Expand Down
Loading