Skip to content

Commit

Permalink
Make various CI improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
XiangpengHao committed Jan 10, 2025
1 parent 547fb46 commit 05c8c8f
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 43 deletions.
76 changes: 38 additions & 38 deletions parquet/src/arrow/async_reader/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,42 @@ impl<R: ChunkReader> PageReader for CachedPageReader<R> {
}
}

// Test-only helpers for constructing placeholder pages.
#[cfg(test)]
impl Page {
    /// Builds a minimal page of the requested `page_type` whose payload is
    /// `size` zero bytes and whose `num_values` equals `size`.
    ///
    /// Only the three concrete page kinds are supported; any other
    /// `PageType` indicates a programming error in the calling test.
    fn dummy_page(page_type: PageType, size: usize) -> Self {
        use crate::basic::Encoding;
        // Every variant shares the same value count derived from the size.
        let num_values = size as u32;
        match page_type {
            PageType::DICTIONARY_PAGE => Page::DictionaryPage {
                buf: vec![0u8; size].into(),
                num_values,
                encoding: Encoding::PLAIN,
                is_sorted: false,
            },
            PageType::DATA_PAGE => Page::DataPage {
                buf: vec![0u8; size].into(),
                num_values,
                encoding: Encoding::PLAIN,
                def_level_encoding: Encoding::PLAIN,
                rep_level_encoding: Encoding::PLAIN,
                statistics: None,
            },
            PageType::DATA_PAGE_V2 => Page::DataPageV2 {
                buf: vec![0u8; size].into(),
                num_values,
                encoding: Encoding::PLAIN,
                num_nulls: 0,
                num_rows: 0,
                def_levels_byte_len: 0,
                rep_levels_byte_len: 0,
                is_compressed: false,
                statistics: None,
            },
            _ => unreachable!(),
        }
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -434,7 +470,7 @@ mod tests {
// Check remaining queue
assert_eq!(queue.len(), 1);
assert_eq!(queue[0].row_count, 7);
assert_eq!(queue[0].skip, false);
assert!(!queue[0].skip);
}

#[test]
Expand All @@ -451,7 +487,7 @@ mod tests {

// Check remaining queue - should have 5 rows from split and original 10
assert_eq!(queue.len(), 1);
assert_eq!(queue[0].skip, false);
assert!(!queue[0].skip);
assert_eq!(queue[0].row_count, 5);
}

Expand Down Expand Up @@ -536,39 +572,3 @@ mod tests {
assert!(cache.get().get_page(2, 1000).is_none());
}
}

// Helper implementation for testing
#[cfg(test)]
impl Page {
    /// Test-only constructor: builds a page of the given `page_type` backed
    /// by a buffer of `size` zero bytes, with `num_values` set to `size`.
    ///
    /// Supports `DATA_PAGE`, `DICTIONARY_PAGE`, and `DATA_PAGE_V2` only;
    /// any other `PageType` panics via `unreachable!`.
    fn dummy_page(page_type: PageType, size: usize) -> Self {
        use crate::basic::Encoding;
        match page_type {
            // V1 data page: PLAIN encoding for values and both level streams,
            // no statistics attached.
            PageType::DATA_PAGE => Page::DataPage {
                buf: vec![0; size].into(),
                // NOTE(review): `size as u32` silently truncates above
                // u32::MAX — presumably acceptable for test fixtures.
                num_values: size as u32,
                encoding: Encoding::PLAIN,
                def_level_encoding: Encoding::PLAIN,
                rep_level_encoding: Encoding::PLAIN,
                statistics: None,
            },
            // Dictionary page: PLAIN-encoded, explicitly marked unsorted.
            PageType::DICTIONARY_PAGE => Page::DictionaryPage {
                buf: vec![0; size].into(),
                num_values: size as u32,
                encoding: Encoding::PLAIN,
                is_sorted: false,
            },
            // V2 data page: zero-length level sections, uncompressed,
            // no nulls/rows recorded and no statistics.
            PageType::DATA_PAGE_V2 => Page::DataPageV2 {
                buf: vec![0; size].into(),
                num_values: size as u32,
                encoding: Encoding::PLAIN,
                def_levels_byte_len: 0,
                rep_levels_byte_len: 0,
                is_compressed: false,
                statistics: None,
                num_nulls: 0,
                num_rows: 0,
            },
            // Remaining PageType values (e.g. INDEX_PAGE) are never built
            // by these tests.
            _ => unreachable!(),
        }
    }
}
6 changes: 3 additions & 3 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
type ReadResult<T> = Result<(ReaderFactory<T>, Option<FilteredParquetRecordBatchReader>)>;

/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
/// [`ParquetRecordBatchReader`]
/// [`FilteredParquetRecordBatchReader`]
struct ReaderFactory<T> {
metadata: Arc<ParquetMetaData>,

Expand Down Expand Up @@ -517,7 +517,7 @@ where
for predicate in filter.predicates.iter_mut() {
let p_projection = predicate.projection();
if let Some(ref mut p) = predicate_projection {
p.union(&p_projection);
p.union(p_projection);
} else {
predicate_projection = Some(p_projection.clone());
}
Expand Down Expand Up @@ -849,7 +849,7 @@ impl<'a> InMemoryRowGroup<'a> {
}
}
}
impl<'a> InMemoryRowGroup<'a> {
impl InMemoryRowGroup<'_> {
/// Fetches the necessary column data into memory
async fn fetch<T: AsyncFileReader + Send>(
&mut self,
Expand Down
5 changes: 3 additions & 2 deletions parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,6 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {

#[cfg(test)]
mod tests {
use std::collections::HashSet;

use bytes::Buf;

Expand Down Expand Up @@ -1165,6 +1164,7 @@ mod tests {
assert_eq!(page_count, 2);
}

#[cfg(feature = "async")]
fn get_serialized_page_reader<R: ChunkReader>(
file_reader: &SerializedFileReader<R>,
row_group: usize,
Expand Down Expand Up @@ -1201,12 +1201,13 @@ mod tests {
)
}

#[cfg(feature = "async")]
#[test]
fn test_peek_next_page_offset_matches_actual() -> Result<()> {
let test_file = get_test_file("alltypes_plain.parquet");
let reader = SerializedFileReader::new(test_file)?;

let mut offset_set = HashSet::new();
let mut offset_set = std::collections::HashSet::new();
let num_row_groups = reader.metadata.num_row_groups();
for row_group in 0..num_row_groups {
let num_columns = reader.metadata.row_group(row_group).num_columns();
Expand Down

0 comments on commit 05c8c8f

Please sign in to comment.