Skip to content

Commit

Permalink
Improved read of metadata (#143)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored May 26, 2022
1 parent e2b7533 commit 792a419
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 103 deletions.
118 changes: 30 additions & 88 deletions src/read/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ use std::{
use parquet_format_async_temp::thrift::protocol::TCompactInputProtocol;
use parquet_format_async_temp::FileMetaData as TFileMetaData;

use super::super::{metadata::FileMetaData, DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE, PARQUET_MAGIC};
use super::super::{
metadata::FileMetaData, DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE, HEADER_SIZE, PARQUET_MAGIC,
};

use crate::error::{Error, Result};
use crate::HEADER_SIZE;

pub(super) fn metadata_len(buffer: &[u8], len: usize) -> i32 {
i32::from_le_bytes(buffer[len - 8..len - 4].try_into().unwrap())
Expand Down Expand Up @@ -51,109 +52,50 @@ pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
// read and cache up to DEFAULT_FOOTER_READ_SIZE bytes from the end and process the footer
let default_end_len = min(DEFAULT_FOOTER_READ_SIZE, file_size) as usize;
reader.seek(SeekFrom::End(-(default_end_len as i64)))?;
let mut default_len_end_buf = vec![0; default_end_len];
reader.read_exact(&mut default_len_end_buf)?;
let mut buffer = vec![0; default_end_len];
reader.read_exact(&mut buffer)?;

// check this is indeed a parquet file
if default_len_end_buf[default_end_len - 4..] != PARQUET_MAGIC {
if buffer[default_end_len - 4..] != PARQUET_MAGIC {
return Err(general_err!("Invalid Parquet file. Corrupt footer"));
}

let metadata_len = metadata_len(&default_len_end_buf, default_end_len);
let metadata = metadata_len(&buffer, default_end_len);

if metadata_len < 0 {
return Err(general_err!(
let metadata_len: u64 = metadata.try_into().map_err(|_| {
general_err!(
"Invalid Parquet file. Metadata length is less than zero ({})",
metadata_len
));
}
let footer_metadata_len = FOOTER_SIZE + metadata_len as u64;
metadata
)
})?;

let metadata = if footer_metadata_len > file_size {
let footer_len = FOOTER_SIZE + metadata_len;
if footer_len > file_size {
return Err(general_err!(
"Invalid Parquet file. Metadata start is less than zero ({})",
file_size as i64 - footer_metadata_len as i64
file_size as i64 - footer_len as i64
));
} else if footer_metadata_len < DEFAULT_FOOTER_READ_SIZE {
}

let reader = if (footer_len as usize) < buffer.len() {
// the whole metadata is in the bytes we already read
// build up the reader covering the entire metadata
let mut reader = Cursor::new(default_len_end_buf);
reader.seek(SeekFrom::End(-(footer_metadata_len as i64)))?;
let mut reader = Cursor::new(buffer);
reader.seek(SeekFrom::End(-(footer_len as i64)))?;

let mut prot = TCompactInputProtocol::new(reader);
TFileMetaData::read_from_in_protocol(&mut prot)
reader
} else {
// the end of file read by default is not long enough, read again including all metadata.
reader.seek(SeekFrom::End(-(footer_metadata_len as i64)))?;
reader.seek(SeekFrom::End(-(footer_len as i64)))?;
let mut buffer = vec![0; footer_len as usize];
reader.read_exact(&mut buffer)?;

let mut prot = TCompactInputProtocol::new(reader);
TFileMetaData::read_from_in_protocol(&mut prot)
}
.map_err(|e| Error::General(format!("Could not parse metadata: {}", e)))?;
Cursor::new(buffer)
};

FileMetaData::try_from_thrift(metadata)
}
let mut prot = TCompactInputProtocol::new(reader);
let metadata = TFileMetaData::read_from_in_protocol(&mut prot)
.map_err(|e| Error::General(format!("Could not parse metadata: {}", e)))?;

#[cfg(test)]
mod tests {
use std::fs::File;

use super::*;

use crate::schema::{types::PhysicalType, Repetition};
use crate::tests::get_path;

#[test]
fn test_basics() {
let mut testdata = get_path();
testdata.push("alltypes_plain.parquet");
let mut file = File::open(testdata).unwrap();

let metadata = read_metadata(&mut file).unwrap();

let columns = metadata.schema_descr.columns();

/*
from pyarrow:
required group field_id=0 schema {
optional int32 field_id=1 id;
optional boolean field_id=2 bool_col;
optional int32 field_id=3 tinyint_col;
optional int32 field_id=4 smallint_col;
optional int32 field_id=5 int_col;
optional int64 field_id=6 bigint_col;
optional float field_id=7 float_col;
optional double field_id=8 double_col;
optional binary field_id=9 date_string_col;
optional binary field_id=10 string_col;
optional int96 field_id=11 timestamp_col;
}
*/
let expected = vec![
PhysicalType::Int32,
PhysicalType::Boolean,
PhysicalType::Int32,
PhysicalType::Int32,
PhysicalType::Int32,
PhysicalType::Int64,
PhysicalType::Float,
PhysicalType::Double,
PhysicalType::ByteArray,
PhysicalType::ByteArray,
PhysicalType::Int96,
];

let result = columns
.iter()
.map(|column| {
assert_eq!(
column.descriptor.primitive_type.field_info.repetition,
Repetition::Optional
);
column.descriptor.primitive_type.physical_type
})
.collect::<Vec<_>>();

assert_eq!(expected, result);
}
FileMetaData::try_from_thrift(metadata)
}
30 changes: 15 additions & 15 deletions src/read/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,41 +52,41 @@ pub async fn read_metadata<R: AsyncRead + AsyncSeek + Send + std::marker::Unpin>
));
}

let metadata_len = metadata_len(&buffer, default_end_len);
let metadata = metadata_len(&buffer, default_end_len);

if metadata_len < 0 {
return Err(general_err!(
"Invalid file. Metadata length is less than zero ({})",
metadata_len
));
}
let footer_len = FOOTER_SIZE + metadata_len as u64;
let metadata_len: u64 = metadata.try_into().map_err(|_| {
general_err!(
"Invalid Parquet file. Metadata length is less than zero ({})",
metadata
)
})?;

let footer_len = FOOTER_SIZE + metadata_len;
if footer_len > file_size {
return Err(general_err!(
"Invalid Parquet file. Metadata start is less than zero ({})",
file_size as i64 - footer_len as i64
));
}

let metadata = if footer_len < DEFAULT_FOOTER_READ_SIZE {
let reader = if (footer_len as usize) < buffer.len() {
// the whole metadata is in the bytes we already read
// build up the reader covering the entire metadata
let mut reader = Cursor::new(buffer);
reader.seek(SeekFrom::End(-(footer_len as i64)))?;

let mut prot = TCompactInputProtocol::new(reader);
TFileMetaData::read_from_in_protocol(&mut prot)?
reader
} else {
// the end of file read by default is not long enough, read again including all metadata.
reader.seek(SeekFrom::End(-(footer_len as i64))).await?;
let mut buffer = vec![0; footer_len as usize];
reader.read_exact(&mut buffer).await?;

let reader = Cursor::new(buffer);

let mut prot = TCompactInputProtocol::new(reader);
TFileMetaData::read_from_in_protocol(&mut prot)?
Cursor::new(buffer)
};

let mut prot = TCompactInputProtocol::new(reader);
let metadata = TFileMetaData::read_from_in_protocol(&mut prot)?;

FileMetaData::try_from_thrift(metadata)
}
56 changes: 56 additions & 0 deletions tests/it/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use parquet2::read::{MutStreamingIterator, State};
use parquet2::schema::types::GroupConvertedType;
use parquet2::schema::types::ParquetType;
use parquet2::schema::types::PhysicalType;
use parquet2::schema::Repetition;
use parquet2::statistics::{BinaryStatistics, BooleanStatistics, PrimitiveStatistics, Statistics};
use parquet2::types::int96_to_i64_ns;
use parquet2::FallibleStreamingIterator;
Expand Down Expand Up @@ -517,3 +518,58 @@ fn pyarrow_v1_struct_required() -> Result<()> {
fn pyarrow_v2_struct_required() -> Result<()> {
test_pyarrow_integration("struct", "struct_required", 2, false, false, "")
}

#[test]
fn test_metadata() -> Result<()> {
let mut testdata = get_path();
testdata.push("alltypes_plain.parquet");
let mut file = File::open(testdata).unwrap();

let metadata = read_metadata(&mut file)?;

let columns = metadata.schema_descr.columns();

/*
from pyarrow:
required group field_id=0 schema {
optional int32 field_id=1 id;
optional boolean field_id=2 bool_col;
optional int32 field_id=3 tinyint_col;
optional int32 field_id=4 smallint_col;
optional int32 field_id=5 int_col;
optional int64 field_id=6 bigint_col;
optional float field_id=7 float_col;
optional double field_id=8 double_col;
optional binary field_id=9 date_string_col;
optional binary field_id=10 string_col;
optional int96 field_id=11 timestamp_col;
}
*/
let expected = vec![
PhysicalType::Int32,
PhysicalType::Boolean,
PhysicalType::Int32,
PhysicalType::Int32,
PhysicalType::Int32,
PhysicalType::Int64,
PhysicalType::Float,
PhysicalType::Double,
PhysicalType::ByteArray,
PhysicalType::ByteArray,
PhysicalType::Int96,
];

let result = columns
.iter()
.map(|column| {
assert_eq!(
column.descriptor.primitive_type.field_info.repetition,
Repetition::Optional
);
column.descriptor.primitive_type.physical_type
})
.collect::<Vec<_>>();

assert_eq!(expected, result);
Ok(())
}

0 comments on commit 792a419

Please sign in to comment.