Skip to content

Commit

Permalink
feat: show the number of processed items in import/export
Browse files Browse the repository at this point in the history
  • Loading branch information
StoneDot authored and kzym-w committed Aug 10, 2023
1 parent 9e21655 commit bd470cf
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 4 deletions.
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pest = "2.6.0"
pest_derive = "2.6.0"
bytes = "1.4.0"
itertools = "0.10.5"
console = "0.15.7"

[dev-dependencies]
assert_cmd = "2.0.2" # contains helpers make executing the main binary on integration tests easier.
Expand Down
128 changes: 127 additions & 1 deletion src/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
* limitations under the License.
*/

use console::Term;
use std::collections::VecDeque;
use std::time::Instant;
use std::{
collections::HashMap,
fs,
Expand All @@ -38,6 +41,77 @@ struct SuggestedAttribute {
type_str: String,
}

#[derive(Clone, Debug, Hash, PartialOrd, PartialEq)]
struct ProgressState {
processed_items: usize,
recent_processed_items: VecDeque<(Instant, usize)>,
max_recordable_observations: usize,
}

impl ProgressState {
fn new(max_recordable_observations: usize) -> ProgressState {
ProgressState {
processed_items: 0,
recent_processed_items: VecDeque::with_capacity(max_recordable_observations),
max_recordable_observations,
}
}

fn add_observation(&mut self, processed_items: usize) {
self.add_observation_with_time(processed_items, Instant::now())
}

fn add_observation_with_time(&mut self, processed_items: usize, at: Instant) {
self.processed_items += processed_items;

if self.recent_processed_items.len() == self.max_recordable_observations {
self.recent_processed_items.pop_back();
}
self.recent_processed_items
.push_front((at, processed_items));
}

fn processed_items(&self) -> usize {
self.processed_items
}

fn recent_average_processed_items_per_second(&self) -> f64 {
self.recent_average_processed_items_per_second_with_time(Instant::now())
}

fn recent_average_processed_items_per_second_with_time(&self, at: Instant) -> f64 {
let mut sum = 0.0;
for v in &self.recent_processed_items {
sum += v.1 as f64
}
if let Some((oldest_time, _)) = self.recent_processed_items.back() {
if at == *oldest_time {
f64::NAN
} else {
sum / at.duration_since(*oldest_time).as_secs_f64()
}
} else {
0.0
}
}

fn show(&self) {
let items = self.processed_items();
let items_per_sec = self.recent_average_processed_items_per_second();
let mut term = Term::stdout();
term.clear_line().expect("Failed to clear line");
write!(
term,
"{} items processed ({:.2} items/sec)",
items, items_per_sec
)
.expect("Failed to update message");
term.flush().expect("Failed to flush");
}
}

const MAX_NUMBER_OF_OBSERVES: usize = 10;

/* =================================================
Public functions
================================================= */
Expand Down Expand Up @@ -111,6 +185,7 @@ pub async fn export(
tmp_output_file.set_len(0)?;

let mut last_evaluated_key: Option<HashMap<String, rusoto_dynamodb::AttributeValue>> = None;
let mut progress_status = ProgressState::new(MAX_NUMBER_OF_OBSERVES);
loop {
// Invoke Scan API here. At the 1st iteration exclusive_start_key would be "None" as defined above, outside of the loop.
// On 2nd iteration and later, passing last_evaluated_key from the previous loop as an exclusive_start_key.
Expand All @@ -128,6 +203,8 @@ pub async fn export(
let items = scan_output
.items
.expect("Scan result items should be 'Some' even if no item returned.");

progress_status.add_observation(items.len());
match format_str {
None | Some("json") => {
let s = serde_json::to_string_pretty(&data::convert_to_json_vec(&items))?;
Expand Down Expand Up @@ -156,6 +233,7 @@ pub async fn export(
}
Some(o) => panic!("Invalid output format is given: {}", o),
}
progress_status.show();

// update last_evaluated_key for the next iteration.
// If there's no more item in the table, last_evaluated_key would be "None" and it means it's ok to break the loop.
Expand Down Expand Up @@ -241,18 +319,23 @@ pub async fn import(
let headers: Vec<&str> = lines[0].split(',').collect::<Vec<&str>>();
let mut matrix: Vec<Vec<&str>> = vec![];
// Iterate over lines (from index = 1, as index = 0 is the header line)
let mut progress_status = ProgressState::new(MAX_NUMBER_OF_OBSERVES);
for (i, line) in lines.iter().enumerate().skip(1) {
let cells: Vec<&str> = line.split(',').collect::<Vec<&str>>();
debug!("splitted line => {:?}", cells);
matrix.push(cells);
if i % 25 == 0 {
write_csv_matrix(&cx, matrix.clone(), &headers, enable_set_inference).await?;
progress_status.add_observation(25);
progress_status.show();
matrix.clear();
}
}
debug!("rest of matrix => {:?}", matrix);
if !matrix.is_empty() {
write_csv_matrix(&cx, matrix.clone(), &headers, enable_set_inference).await?;
progress_status.add_observation(matrix.len());
progress_status.show();
}
}
Some(o) => panic!("Invalid input format is given: {}", o),
Expand Down Expand Up @@ -441,9 +524,14 @@ async fn write_array_of_jsons_with_chunked_25(
array_of_json_obj: Vec<JsonValue>,
enable_set_inference: bool,
) -> Result<(), batch::DyneinBatchError> {
let mut progress_status = ProgressState::new(MAX_NUMBER_OF_OBSERVES);
for chunk /* Vec<JsonValue> */ in array_of_json_obj.chunks(25) { // As BatchWriteItem request can have up to 25 items.
let request_items: HashMap<String, Vec<WriteRequest>> = batch::convert_jsonvals_to_request_items(&cx, chunk.to_vec(), enable_set_inference).await?;
let items = chunk.to_vec();
let count = items.len();
let request_items: HashMap<String, Vec<WriteRequest>> = batch::convert_jsonvals_to_request_items(&cx, items, enable_set_inference).await?;
batch::batch_write_untill_processed(cx.clone(), request_items).await?;
progress_status.add_observation(count);
progress_status.show();
}
Ok(())
}
Expand All @@ -467,3 +555,41 @@ async fn write_csv_matrix(
batch::batch_write_untill_processed(cx.clone(), request_items).await?;
Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
use std::ops::Add;
use std::time::Duration;

#[test]
fn test_progress_status() {
let mut progress = ProgressState::new(2);

let first_observation = Instant::now();
progress.add_observation_with_time(10, first_observation);
assert_eq!(progress.processed_items(), 10);
assert_eq!(progress.recent_processed_items.len(), 1);
assert!(progress
.recent_average_processed_items_per_second_with_time(first_observation)
.is_nan());

let second_observation = first_observation.add(Duration::from_millis(500));
progress.add_observation_with_time(10, second_observation);
assert_eq!(progress.processed_items(), 20);
assert_eq!(progress.recent_processed_items.len(), 2);
assert_eq!(
progress.recent_average_processed_items_per_second_with_time(second_observation),
(10.0 + 10.0) / 0.5
);

let third_observation = first_observation.add(Duration::from_millis(1000));
progress.add_observation_with_time(12, third_observation);
assert_eq!(progress.processed_items(), 32);
assert_eq!(progress.recent_processed_items.len(), 2);
assert_eq!(
progress.recent_average_processed_items_per_second_with_time(third_observation),
(10.0 + 12.0) / 0.5
);
}
}

0 comments on commit bd470cf

Please sign in to comment.