diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 6ee908079..04d3bef9a 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -124,6 +124,7 @@ impl StorageDir { // hashmap but exclude where hot filename matches let mut grouped_arrow_file: HashMap> = HashMap::new(); let mut arrow_files = self.arrow_files(); + arrow_files.retain(|path| { !path .file_name() @@ -132,12 +133,17 @@ impl StorageDir { .unwrap() .ends_with(&hot_filename) }); - for arrow_file_path in arrow_files { - let key = Self::arrow_path_to_parquet(&arrow_file_path); - grouped_arrow_file - .entry(key) - .or_default() - .push(arrow_file_path); + + //check if arrow files is not empty, fetch the parquet file path from last file from sorted arrow file list + if !(arrow_files.is_empty()) { + arrow_files.sort(); + let key = Self::arrow_path_to_parquet(arrow_files.last().unwrap()); + for arrow_file_path in arrow_files { + grouped_arrow_file + .entry(key.clone()) + .or_default() + .push(arrow_file_path); + } } grouped_arrow_file @@ -201,7 +207,6 @@ pub fn convert_disk_files_to_parquet( let record_reader = MergedReverseRecordReader::try_new(&files).unwrap(); let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?; - let props = parquet_writer_props().build(); let merged_schema = record_reader.merged_schema(); schemas.push(merged_schema.clone());