From 01adab54ce925b662af9b486c95ce3909dcab180 Mon Sep 17 00:00:00 2001 From: Hiroaki Goto Date: Thu, 10 Aug 2023 16:57:39 +0900 Subject: [PATCH] feat: show the number of processed items in import/export --- Cargo.lock | 7 +-- Cargo.toml | 1 + src/transfer.rs | 128 +++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 132 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 64d37529..a25b65b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -305,15 +305,15 @@ checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" [[package]] name = "console" -version = "0.15.5" +version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3d79fbe8970a77e3e34151cc13d3b3e248aa0faaecb9f6091fa07ebefe5ad60" +checksum = "c926e00cc70edefdc64d3a5ff31cc65bb97a3460097762bd23afb4d8145fccf8" dependencies = [ "encode_unicode", "lazy_static", "libc", "unicode-width", - "windows-sys 0.42.0", + "windows-sys 0.45.0", ] [[package]] @@ -571,6 +571,7 @@ dependencies = [ "atty", "bytes", "chrono", + "console", "dialoguer", "dirs", "env_logger", diff --git a/Cargo.toml b/Cargo.toml index d3e40464..87bf5ffb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,7 @@ pest = "2.6.0" pest_derive = "2.6.0" bytes = "1.4.0" itertools = "0.10.5" +console = "0.15.7" [dev-dependencies] assert_cmd = "2.0.2" # contains helpers make executing the main binary on integration tests easier. diff --git a/src/transfer.rs b/src/transfer.rs index 23be31f3..89024471 100644 --- a/src/transfer.rs +++ b/src/transfer.rs @@ -14,6 +14,9 @@ * limitations under the License. */ +use console::Term; +use std::collections::VecDeque; +use std::time::Instant; use std::{ collections::HashMap, fs, @@ -38,6 +41,77 @@ struct SuggestedAttribute { type_str: String, } +#[derive(Clone, Debug, Hash, PartialOrd, PartialEq)] +struct ProgressState { + processed_items: usize, + recent_processed_items: VecDeque<(Instant, usize)>, + max_recordable_observations: usize, +} + +impl ProgressState { + fn new(max_recordable_observations: usize) -> ProgressState { + ProgressState { + processed_items: 0, + recent_processed_items: VecDeque::with_capacity(max_recordable_observations), + max_recordable_observations, + } + } + + fn add_observation(&mut self, processed_items: usize) { + self.add_observation_with_time(processed_items, Instant::now()) + } + + fn add_observation_with_time(&mut self, processed_items: usize, at: Instant) { + self.processed_items += processed_items; + + if self.recent_processed_items.len() == self.max_recordable_observations { + self.recent_processed_items.pop_back(); + } + self.recent_processed_items + .push_front((at, processed_items)); + } + + fn processed_items(&self) -> usize { + self.processed_items + } + + fn recent_average_processed_items_per_second(&self) -> f64 { + self.recent_average_processed_items_per_second_with_time(Instant::now()) + } + + fn recent_average_processed_items_per_second_with_time(&self, at: Instant) -> f64 { + let mut sum = 0.0; + for v in &self.recent_processed_items { + sum += v.1 as f64 + } + if let Some((oldest_time, _)) = self.recent_processed_items.back() { + if at == *oldest_time { + f64::NAN + } else { + sum / at.duration_since(*oldest_time).as_secs_f64() + } + } else { + 0.0 + } + } + + fn show(&self) { + let items = self.processed_items(); + let items_per_sec = self.recent_average_processed_items_per_second(); + let mut term = Term::stdout(); + term.clear_line().expect("Failed to clear line"); + write!( + term, + "{} items processed ({:.2} items/sec)", + items, items_per_sec + ) + .expect("Failed to update message"); + term.flush().expect("Failed to flush"); + } +} + +const MAX_NUMBER_OF_OBSERVES: usize = 10; + /* ================================================= Public functions ================================================= */ @@ -111,6 +185,7 @@ pub async fn export( tmp_output_file.set_len(0)?; let mut last_evaluated_key: Option> = None; + let mut progress_status = ProgressState::new(MAX_NUMBER_OF_OBSERVES); loop { // Invoke Scan API here. At the 1st iteration exclusive_start_key would be "None" as defined above, outside of the loop. // On 2nd iteration and later, passing last_evaluated_key from the previous loop as an exclusive_start_key. @@ -128,6 +203,8 @@ pub async fn export( let items = scan_output .items .expect("Scan result items should be 'Some' even if no item returned."); + + progress_status.add_observation(items.len()); match format_str { None | Some("json") => { let s = serde_json::to_string_pretty(&data::convert_to_json_vec(&items))?; @@ -156,6 +233,7 @@ pub async fn export( } Some(o) => panic!("Invalid output format is given: {}", o), } + progress_status.show(); // update last_evaluated_key for the next iteration. // If there's no more item in the table, last_evaluated_key would be "None" and it means it's ok to break the loop. @@ -241,18 +319,23 @@ pub async fn import( let headers: Vec<&str> = lines[0].split(',').collect::>(); let mut matrix: Vec> = vec![]; // Iterate over lines (from index = 1, as index = 0 is the header line) + let mut progress_status = ProgressState::new(MAX_NUMBER_OF_OBSERVES); for (i, line) in lines.iter().enumerate().skip(1) { let cells: Vec<&str> = line.split(',').collect::>(); debug!("splitted line => {:?}", cells); matrix.push(cells); if i % 25 == 0 { write_csv_matrix(&cx, matrix.clone(), &headers, enable_set_inference).await?; + progress_status.add_observation(25); + progress_status.show(); matrix.clear(); } } debug!("rest of matrix => {:?}", matrix); if !matrix.is_empty() { write_csv_matrix(&cx, matrix.clone(), &headers, enable_set_inference).await?; + progress_status.add_observation(matrix.len()); + progress_status.show(); } } Some(o) => panic!("Invalid input format is given: {}", o), @@ -441,9 +524,14 @@ async fn write_array_of_jsons_with_chunked_25( array_of_json_obj: Vec, enable_set_inference: bool, ) -> Result<(), batch::DyneinBatchError> { + let mut progress_status = ProgressState::new(MAX_NUMBER_OF_OBSERVES); for chunk /* Vec */ in array_of_json_obj.chunks(25) { // As BatchWriteItem request can have up to 25 items. - let request_items: HashMap> = batch::convert_jsonvals_to_request_items(&cx, chunk.to_vec(), enable_set_inference).await?; + let items = chunk.to_vec(); + let count = items.len(); + let request_items: HashMap> = batch::convert_jsonvals_to_request_items(&cx, items, enable_set_inference).await?; batch::batch_write_untill_processed(cx.clone(), request_items).await?; + progress_status.add_observation(count); + progress_status.show(); } Ok(()) } @@ -467,3 +555,41 @@ async fn write_csv_matrix( batch::batch_write_untill_processed(cx.clone(), request_items).await?; Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + use std::ops::Add; + use std::time::Duration; + + #[test] + fn test_progress_status() { + let mut progress = ProgressState::new(2); + + let first_observation = Instant::now(); + progress.add_observation_with_time(10, first_observation); + assert_eq!(progress.processed_items(), 10); + assert_eq!(progress.recent_processed_items.len(), 1); + assert!(progress + .recent_average_processed_items_per_second_with_time(first_observation) + .is_nan()); + + let second_observation = first_observation.add(Duration::from_millis(500)); + progress.add_observation_with_time(10, second_observation); + assert_eq!(progress.processed_items(), 20); + assert_eq!(progress.recent_processed_items.len(), 2); + assert_eq!( + progress.recent_average_processed_items_per_second_with_time(second_observation), + (10.0 + 10.0) / 0.5 + ); + + let third_observation = first_observation.add(Duration::from_millis(1000)); + progress.add_observation_with_time(12, third_observation); + assert_eq!(progress.processed_items(), 32); + assert_eq!(progress.recent_processed_items.len(), 2); + assert_eq!( + progress.recent_average_processed_items_per_second_with_time(third_observation), + (10.0 + 12.0) / 0.5 + ); + } +}