
Commit 2c7ec67

nameexhaustion committed Sep 17, 2024 (1 parent: f631502)

Showing 4 changed files with 42 additions and 33 deletions.
crates/polars-io/src/parquet/read/mod.rs (1 addition, 1 deletion)

@@ -36,7 +36,7 @@ use polars_error::{ErrString, PolarsError};
 #[cfg(feature = "cloud")]
 pub use reader::ParquetAsyncReader;
 pub use reader::{BatchedParquetReader, ParquetReader};
-pub use utils::materialize_empty_df;
+pub use utils::{ensure_schema_has_projected_fields, materialize_empty_df};
 
 pub mod _internal {
     pub use super::mmap::to_deserializer;
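This re-export makes the new helper importable from polars-io (it is pulled in below via polars_io::prelude), so the streaming parquet source can share it instead of keeping its own copy of the check.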
crates/polars-io/src/parquet/read/utils.rs (35 additions, 1 deletion)

@@ -1,6 +1,8 @@
 use std::borrow::Cow;
 
-use polars_core::prelude::{ArrowSchema, DataFrame, Series, IDX_DTYPE};
+use polars_core::prelude::{ArrowSchema, DataFrame, DataType, PlHashMap, Series, IDX_DTYPE};
+use polars_error::{polars_bail, PolarsResult};
+use polars_utils::pl_str::PlSmallStr;
 
 use crate::hive::materialize_hive_partitions;
 use crate::utils::apply_projection;
@@ -28,3 +30,35 @@ pub fn materialize_empty_df(
 
     df
 }
+
+/// Ensures that a parquet file has all the necessary columns for a projection with the correct
+/// dtype. There are no ordering requirements and extra columns are permitted.
+pub fn ensure_schema_has_projected_fields(
+    schema: &ArrowSchema,
+    projected_arrow_schema: &ArrowSchema,
+) -> PolarsResult<()> {
+    // Note: We convert to Polars-native dtypes for timezone normalization.
+    let mut schema = schema
+        .iter_values()
+        .map(|x| {
+            let dtype = DataType::from_arrow(&x.dtype, true);
+            (x.name.clone(), dtype)
+        })
+        .collect::<PlHashMap<PlSmallStr, DataType>>();
+
+    for field in projected_arrow_schema.iter_values() {
+        let Some(dtype) = schema.remove(&field.name) else {
+            polars_bail!(SchemaMismatch: "did not find column: {}", field.name)
+        };
+
+        let expected_dtype = DataType::from_arrow(&field.dtype, true);
+
+        if dtype != expected_dtype {
+            polars_bail!(SchemaMismatch: "data type mismatch for column {}: found: {}, expected: {}",
+                &field.name, dtype, expected_dtype
+            )
+        }
+    }
+
+    Ok(())
+}
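To make the semantics of the new helper concrete, here is a minimal, self-contained sketch of the same check using only std types. DType, Schema, and ensure_has_projected_fields below are toy stand-ins, not polars APIs; the real function additionally converts Arrow dtypes to Polars-native ones (per its comment, for timezone normalization).

use std::collections::HashMap;

// Toy stand-ins for ArrowSchema / DataType (not polars APIs).
#[derive(Clone, Debug, PartialEq)]
enum DType {
    Int64,
    String,
}

// (name, dtype) pairs in file order.
type Schema = Vec<(String, DType)>;

// Mirrors the new helper: every projected column must exist in the file
// schema with a matching dtype; order is free and extra columns are fine.
fn ensure_has_projected_fields(schema: &Schema, projected: &Schema) -> Result<(), String> {
    let mut by_name: HashMap<&str, &DType> =
        schema.iter().map(|(n, d)| (n.as_str(), d)).collect();

    for (name, expected) in projected {
        let Some(found) = by_name.remove(name.as_str()) else {
            return Err(format!("did not find column: {name}"));
        };
        if found != expected {
            return Err(format!(
                "data type mismatch for column {name}: found: {found:?}, expected: {expected:?}"
            ));
        }
    }
    Ok(())
}

fn main() {
    let file = vec![
        ("a".to_string(), DType::Int64),
        ("b".to_string(), DType::String), // extra column in the file: permitted
    ];

    // Projecting a subset passes, regardless of column order.
    assert!(ensure_has_projected_fields(&file, &vec![("a".to_string(), DType::Int64)]).is_ok());

    // A dtype mismatch or a missing column fails.
    assert!(ensure_has_projected_fields(&file, &vec![("a".to_string(), DType::String)]).is_err());
    assert!(ensure_has_projected_fields(&file, &vec![("c".to_string(), DType::Int64)]).is_err());
}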
(file path not captured: 1 addition, 1 deletion)

@@ -128,8 +128,8 @@ impl ParquetSourceNode {
         };
 
         ensure_metadata_has_projected_fields(
-            projected_arrow_schema.as_ref(),
             &metadata,
+            projected_arrow_schema.as_ref(),
         )?;
 
         PolarsResult::Ok((path_index, byte_source, metadata))
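The change at this call site is an argument swap to match the reordered signature of ensure_metadata_has_projected_fields below (now metadata first, projected schema second), which in turn mirrors the (schema, projected_arrow_schema) order of the new shared helper.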
crates/polars-stream/src/nodes/parquet_source/metadata_utils.rs (5 additions, 30 deletions)

@@ -1,9 +1,8 @@
-use polars_core::prelude::{ArrowSchema, DataType, PlHashMap};
-use polars_error::{polars_bail, PolarsResult};
-use polars_io::prelude::FileMetadata;
+use polars_core::prelude::ArrowSchema;
+use polars_error::PolarsResult;
+use polars_io::prelude::{ensure_schema_has_projected_fields, FileMetadata};
 use polars_io::utils::byte_source::{ByteSource, DynByteSource};
 use polars_utils::mmap::MemSlice;
-use polars_utils::pl_str::PlSmallStr;
 
 /// Read the metadata bytes of a parquet file, does not decode the bytes. If during metadata fetch
 /// the bytes of the entire file are loaded, it is returned in the second return value.
@@ -124,33 +123,9 @@ pub(super) async fn read_parquet_metadata_bytes(
 /// Ensures that a parquet file has all the necessary columns for a projection with the correct
 /// dtype. There are no ordering requirements and extra columns are permitted.
 pub(super) fn ensure_metadata_has_projected_fields(
-    projected_fields: &ArrowSchema,
     metadata: &FileMetadata,
+    projected_arrow_schema: &ArrowSchema,
 ) -> PolarsResult<()> {
     let schema = polars_parquet::arrow::read::infer_schema(metadata)?;
-
-    // Note: We convert to Polars-native dtypes for timezone normalization.
-    let mut schema = schema
-        .into_iter_values()
-        .map(|x| {
-            let dtype = DataType::from_arrow(&x.dtype, true);
-            (x.name, dtype)
-        })
-        .collect::<PlHashMap<PlSmallStr, DataType>>();
-
-    for field in projected_fields.iter_values() {
-        let Some(dtype) = schema.remove(&field.name) else {
-            polars_bail!(SchemaMismatch: "did not find column: {}", field.name)
-        };
-
-        let expected_dtype = DataType::from_arrow(&field.dtype, true);
-
-        if dtype != expected_dtype {
-            polars_bail!(SchemaMismatch: "data type mismatch for column {}: found: {}, expected: {}",
-                &field.name, dtype, expected_dtype
-            )
-        }
-    }
-
-    Ok(())
+    ensure_schema_has_projected_fields(&schema, projected_arrow_schema)
 }
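With this, the streaming node's duplicated implementation is reduced to a thin wrapper: it infers the Arrow schema from the parquet file metadata and delegates to the shared ensure_schema_has_projected_fields in polars-io, so both readers report schema mismatches through a single implementation.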
