Skip to content

Commit

Permalink
refactor: in-place extraction of hot-tier manifest files
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh committed Dec 8, 2024
1 parent bcc8669 commit 0b2e19e
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 19 deletions.
33 changes: 19 additions & 14 deletions src/hottier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,31 +463,36 @@ impl HotTierManager {
Ok(hot_tier_manifest)
}

// NOTE(review): this listing looks like a rendered diff without +/- markers, so two versions of
// some lines appear back to back (both doc comments, both `manifest_files` parameters, both
// return types, and both ways of computing the remainder) — confirm against the repository
// before treating it as compilable code.
///get the list of files from all the manifests present in hot tier directory for the stream
/// Returns the list of manifest files present in hot tier directory for the stream
///
/// Starts from the stream's hot tier parquet files and keeps only those whose `file_path`
/// matches an entry in `manifest_files`; the kept files are sorted by `file_path` in
/// descending order.
///
/// In the `&mut Vec<File>` form, the matched entries are also removed from `manifest_files`
/// in place, and the entries left behind are sorted by `file_path` in descending order.
///
/// # Errors
///
/// Propagates the error from `get_hot_tier_parquet_files` via `?`.
pub async fn get_hot_tier_manifest_files(
&self,
stream: &str,
// NOTE(review): by-value parameter with a tuple return — presumably the pre-change signature.
manifest_files: Vec<File>,
) -> Result<(Vec<File>, Vec<File>), HotTierError> {
// NOTE(review): mutable-borrow parameter with a single-vector return — presumably the
// post-change signature; the remainder is handed back through `manifest_files` itself.
manifest_files: &mut Vec<File>,
) -> Result<Vec<File>, HotTierError> {
// Fetch the list of hot tier parquet files for the given stream.
let mut hot_tier_files = self.get_hot_tier_parquet_files(stream).await?;

// Retain only the files in `hot_tier_files` that also exist in `manifest_files`.
hot_tier_files.retain(|file| {
manifest_files
.iter()
.any(|manifest_file| manifest_file.file_path.eq(&file.file_path))
});

// Sort `hot_tier_files` in descending order by file path.
hot_tier_files.sort_unstable_by(|a, b| b.file_path.cmp(&a.file_path));

// NOTE(review): variant that consumes `manifest_files` and collects the entries not found in
// `hot_tier_files` into a separate `remaining_files` vector, returned alongside
// `hot_tier_files` — presumably the removed side of the diff.
let mut remaining_files: Vec<File> = manifest_files
.into_iter()
.filter(|manifest_file| {
hot_tier_files
.iter()
.all(|file| !file.file_path.eq(&manifest_file.file_path))
})
.collect();
remaining_files.sort_unstable_by(|a, b| b.file_path.cmp(&a.file_path));

Ok((hot_tier_files, remaining_files))
// Update `manifest_files` to exclude files that are present in the filtered `hot_tier_files`.
manifest_files.retain(|manifest_file| {
hot_tier_files
.iter()
.all(|file| !file.file_path.eq(&manifest_file.file_path))
});

// Sort `manifest_files` in descending order by file path.
manifest_files.sort_unstable_by(|a, b| b.file_path.cmp(&a.file_path));

Ok(hot_tier_files)
}

/// Get the list of parquet files from the hot tier directory for the stream
Expand Down
7 changes: 2 additions & 5 deletions src/query/stream_schema_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,10 @@ impl StandardTableProvider {
state: &dyn Session,
time_partition: Option<String>,
) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
let (hot_tier_files, remainder) = hot_tier_manager
.get_hot_tier_manifest_files(&self.stream, manifest_files.clone())
let hot_tier_files = hot_tier_manager
.get_hot_tier_manifest_files(&self.stream, manifest_files)
.await
.map_err(|err| DataFusionError::External(Box::new(err)))?;
// Assign remaining entries back to manifest list
// This is to be used for remote query
*manifest_files = remainder;

let hot_tier_files = hot_tier_files
.into_iter()
Expand Down

0 comments on commit 0b2e19e

Please sign in to comment.