From 7b1e9dd712faede24661a4e3b9cb90f7e5041b4d Mon Sep 17 00:00:00 2001
From: Nikhil Sinha <131262146+nikhilsinhacloudsurfex@users.noreply.github.com>
Date: Mon, 29 Jan 2024 12:01:02 +0530
Subject: [PATCH] feat: allow configurable duration for data push to S3 (#626)

Create the parquet file by grouping all arrow files (in staging) for the
duration provided in the env variable P_STORAGE_UPLOAD_INTERVAL.

Also check that the arrow files vector is not empty; if it is not, sort the
arrow files and create the parquet file key from the last file in the sorted
vector.

Fixes #616
---
 server/src/storage/staging.rs | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs
index 6ee908079..04d3bef9a 100644
--- a/server/src/storage/staging.rs
+++ b/server/src/storage/staging.rs
@@ -124,6 +124,7 @@ impl StorageDir {
         // hashmap but exclude where hot filename matches
         let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
         let mut arrow_files = self.arrow_files();
+
         arrow_files.retain(|path| {
             !path
                 .file_name()
                 .unwrap()
                 .to_str()
                 .unwrap()
                 .ends_with(&hot_filename)
         });
-        for arrow_file_path in arrow_files {
-            let key = Self::arrow_path_to_parquet(&arrow_file_path);
-            grouped_arrow_file
-                .entry(key)
-                .or_default()
-                .push(arrow_file_path);
+
+        //check if arrow files is not empty, fetch the parquet file path from last file from sorted arrow file list
+        if !(arrow_files.is_empty()) {
+            arrow_files.sort();
+            let key = Self::arrow_path_to_parquet(arrow_files.last().unwrap());
+            for arrow_file_path in arrow_files {
+                grouped_arrow_file
+                    .entry(key.clone())
+                    .or_default()
+                    .push(arrow_file_path);
+            }
         }
         grouped_arrow_file
@@ -201,7 +207,6 @@ pub fn convert_disk_files_to_parquet(
         let record_reader = MergedReverseRecordReader::try_new(&files).unwrap();

         let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
-        let props = parquet_writer_props().build();
         let merged_schema = record_reader.merged_schema();
         schemas.push(merged_schema.clone());
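
The core of the change is the grouping step: instead of mapping each staged arrow file to its own parquet key, all staged arrow files are sorted and grouped under a single key derived from the last (latest) file. Below is a minimal standalone sketch of that behaviour, not the actual Parseable code: `arrow_path_to_parquet` is simplified here to a plain extension swap (the real helper in staging.rs also rewrites parts of the file name), and the example file names are made up.

```rust
use std::collections::HashMap;
use std::path::PathBuf;

// Illustrative stand-in for StorageDir::arrow_path_to_parquet: just swaps the
// .arrows extension for .parquet.
fn arrow_path_to_parquet(path: &PathBuf) -> PathBuf {
    let mut parquet = path.clone();
    parquet.set_extension("parquet");
    parquet
}

// Group every staged arrow file under one parquet key taken from the last
// file in the sorted list, mirroring the grouping added by this patch.
fn group_arrow_files(mut arrow_files: Vec<PathBuf>) -> HashMap<PathBuf, Vec<PathBuf>> {
    let mut grouped: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
    if !arrow_files.is_empty() {
        arrow_files.sort();
        let key = arrow_path_to_parquet(arrow_files.last().unwrap());
        for file in arrow_files {
            grouped.entry(key.clone()).or_default().push(file);
        }
    }
    grouped
}

fn main() {
    // Hypothetical staged files covering one upload interval.
    let staged = vec![
        PathBuf::from("stream.date=2024-01-29.hour=11.minute=59.arrows"),
        PathBuf::from("stream.date=2024-01-29.hour=12.minute=00.arrows"),
    ];
    let grouped = group_arrow_files(staged);
    // Every arrow file ends up under the single key derived from the latest file.
    for (parquet, arrows) in &grouped {
        println!("{} <- {} arrow files", parquet.display(), arrows.len());
    }
}
```

Because all files collected during the P_STORAGE_UPLOAD_INTERVAL window share one key, the later conversion step merges them into a single parquet file per interval rather than one per arrow file.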