From be3c0aadec6c71ed3e53b1a4eaec668d5429ef5e Mon Sep 17 00:00:00 2001 From: zhangli20 Date: Wed, 23 Oct 2024 20:24:45 +0800 Subject: [PATCH 1/3] skip reading unused columns --- src/stripe.rs | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/src/stripe.rs b/src/stripe.rs index d43e9cee..c73ed8e8 100644 --- a/src/stripe.rs +++ b/src/stripe.rs @@ -16,6 +16,7 @@ // under the License. use std::{collections::HashMap, io::Read, sync::Arc}; +use std::collections::HashSet; use bytes::Bytes; use prost::Message; @@ -139,23 +140,23 @@ impl Stripe { .context(IoSnafu)?; let footer = Arc::new(deserialize_stripe_footer(footer, compression)?); - let columns = projected_data_type + let columns: Vec<Column> = projected_data_type .children() .iter() .map(|col| Column::new(col.name(), col.data_type(), &footer)) .collect(); + let column_ids: HashSet<u32> = columns.iter().map(|c| c.column_id()).collect(); let mut stream_map = HashMap::new(); let mut stream_offset = info.offset(); for stream in &footer.streams { let length = stream.length(); let column_id = stream.column(); - let kind = stream.kind(); - let data = Column::read_stream(reader, stream_offset, length)?; - - // TODO(weny): filter out unused streams. 
- stream_map.insert((column_id, kind), data); - + if column_ids.contains(&column_id) { + let kind = stream.kind(); + let data = Column::read_stream(reader, stream_offset, length)?; + stream_map.insert((column_id, kind), data); + } stream_offset += length; } @@ -192,22 +193,23 @@ impl Stripe { .context(IoSnafu)?; let footer = Arc::new(deserialize_stripe_footer(footer, compression)?); - let columns = projected_data_type + let columns: Vec<Column> = projected_data_type .children() .iter() .map(|col| Column::new(col.name(), col.data_type(), &footer)) .collect(); + let column_ids: HashSet<u32> = columns.iter().map(|c| c.column_id()).collect(); let mut stream_map = HashMap::new(); let mut stream_offset = info.offset(); for stream in &footer.streams { let length = stream.length(); let column_id = stream.column(); - let kind = stream.kind(); - let data = Column::read_stream_async(reader, stream_offset, length).await?; - - // TODO(weny): filter out unused streams. - stream_map.insert((column_id, kind), data); + if column_ids.contains(&column_id) { + let kind = stream.kind(); + let data = Column::read_stream_async(reader, stream_offset, length).await?; + stream_map.insert((column_id, kind), data); + } stream_offset += length; } From 15ecd4db604fc41dbd834f5344728627ee464255 Mon Sep 17 00:00:00 2001 From: zhangli20 Date: Thu, 24 Oct 2024 10:48:16 +0800 Subject: [PATCH 2/3] fix lint --- src/stripe.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stripe.rs b/src/stripe.rs index c73ed8e8..9d3d4f32 100644 --- a/src/stripe.rs +++ b/src/stripe.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. 
-use std::{collections::HashMap, io::Read, sync::Arc}; use std::collections::HashSet; +use std::{collections::HashMap, io::Read, sync::Arc}; use bytes::Bytes; use prost::Message; From a858f77847b15837acafa0ddb460c085a12b35e1 Mon Sep 17 00:00:00 2001 From: zhangli20 Date: Thu, 24 Oct 2024 15:59:20 +0800 Subject: [PATCH 3/3] supports complex data types --- src/stripe.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/stripe.rs b/src/stripe.rs index 9d3d4f32..3f043610 100644 --- a/src/stripe.rs +++ b/src/stripe.rs @@ -145,7 +145,7 @@ impl Stripe { .iter() .map(|col| Column::new(col.name(), col.data_type(), &footer)) .collect(); - let column_ids: HashSet<u32> = columns.iter().map(|c| c.column_id()).collect(); + let column_ids = collect_required_column_ids(&columns); let mut stream_map = HashMap::new(); let mut stream_offset = info.offset(); @@ -198,7 +198,7 @@ impl Stripe { .iter() .map(|col| Column::new(col.name(), col.data_type(), &footer)) .collect(); - let column_ids: HashSet<u32> = columns.iter().map(|c| c.column_id()).collect(); + let column_ids = collect_required_column_ids(&columns); let mut stream_map = HashMap::new(); let mut stream_offset = info.offset(); @@ -284,3 +284,12 @@ fn deserialize_stripe_footer( .context(error::IoSnafu)?; StripeFooter::decode(buffer.as_slice()).context(error::DecodeProtoSnafu) } + +fn collect_required_column_ids(columns: &[Column]) -> HashSet<u32> { + let mut set = HashSet::new(); + for column in columns { + set.insert(column.column_id()); + set.extend(collect_required_column_ids(&column.children())); + } + set +}