feat: allow configurable duration for data push to S3 (#626)
Create the parquet file by grouping all arrow files (in staging) for the duration
provided in the env variable P_STORAGE_UPLOAD_INTERVAL. Also check that the arrow
files vector is not empty; if it isn't, sort the arrow files and create the key for
the parquet file from the last file in the sorted vector.

Fixes #616
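
To make the new grouping behavior concrete, here is a minimal, standalone Rust sketch (not Parseable's actual code): the staged arrow files are sorted, a single parquet key is derived from the last (most recent) file, and every file is pushed under that key. The arrow_path_to_parquet helper below is a simplified stand-in that only swaps the file extension, and the file names in main are purely illustrative.

// Standalone sketch of the grouping described above; helper and file names are illustrative.
use std::collections::HashMap;
use std::path::PathBuf;

// Simplified stand-in for Parseable's key derivation: just swap the extension.
fn arrow_path_to_parquet(path: &PathBuf) -> PathBuf {
    path.with_extension("parquet")
}

fn group_arrow_files(mut arrow_files: Vec<PathBuf>) -> HashMap<PathBuf, Vec<PathBuf>> {
    let mut grouped: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
    // Only group when there is something to group.
    if !arrow_files.is_empty() {
        // Sort so the last entry is the most recent file; its name becomes the parquet key.
        arrow_files.sort();
        let key = arrow_path_to_parquet(arrow_files.last().unwrap());
        for arrow_file in arrow_files {
            grouped.entry(key.clone()).or_default().push(arrow_file);
        }
    }
    grouped
}

fn main() {
    let files = vec![
        PathBuf::from("stream.minute=01.arrows"),
        PathBuf::from("stream.minute=02.arrows"),
    ];
    // Both staged arrow files end up under a single parquet key derived from the last file.
    println!("{:#?}", group_arrow_files(files));
}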
nikhilsinhaparseable authored Jan 29, 2024
1 parent 5ca9ecb commit 7b1e9dd
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions server/src/storage/staging.rs
@@ -124,6 +124,7 @@ impl StorageDir
         // hashmap <time, vec[paths]> but exclude where hot filename matches
         let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
         let mut arrow_files = self.arrow_files();
+
         arrow_files.retain(|path| {
             !path
                 .file_name()
@@ -132,12 +133,17 @@ impl StorageDir
                 .unwrap()
                 .ends_with(&hot_filename)
         });
-        for arrow_file_path in arrow_files {
-            let key = Self::arrow_path_to_parquet(&arrow_file_path);
-            grouped_arrow_file
-                .entry(key)
-                .or_default()
-                .push(arrow_file_path);
+
+        //check if arrow files is not empty, fetch the parquet file path from last file from sorted arrow file list
+        if !(arrow_files.is_empty()) {
+            arrow_files.sort();
+            let key = Self::arrow_path_to_parquet(arrow_files.last().unwrap());
+            for arrow_file_path in arrow_files {
+                grouped_arrow_file
+                    .entry(key.clone())
+                    .or_default()
+                    .push(arrow_file_path);
+            }
         }
 
         grouped_arrow_file
@@ -201,7 +207,6 @@ pub fn convert_disk_files_to_parquet(
         let record_reader = MergedReverseRecordReader::try_new(&files).unwrap();
 
         let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
-
         let props = parquet_writer_props().build();
         let merged_schema = record_reader.merged_schema();
         schemas.push(merged_schema.clone());

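The commit message refers to the P_STORAGE_UPLOAD_INTERVAL environment variable. The sketch below is a minimal, standalone illustration of reading such an interval, assuming the value is a number of seconds with an assumed fallback of 60; Parseable's actual option parsing and default are not part of this diff and may differ.

// Hedged sketch: reading an upload interval from the environment.
// Assumes the value is in seconds; the fallback of 60 is an assumption, not Parseable's documented default.
use std::env;
use std::time::Duration;

fn storage_upload_interval() -> Duration {
    let secs = env::var("P_STORAGE_UPLOAD_INTERVAL")
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .unwrap_or(60); // fall back when unset or unparsable (assumed default)
    Duration::from_secs(secs)
}

fn main() {
    println!("upload interval: {:?}", storage_upload_interval());
}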