Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: show the number of processed items in import/export #141

Merged
merged 1 commit into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pest = "2.6.0"
pest_derive = "2.6.0"
bytes = "1.4.0"
itertools = "0.10.5"
console = "0.15.7"

[dev-dependencies]
assert_cmd = "2.0.2" # contains helpers that make executing the main binary in integration tests easier.
Expand Down
128 changes: 127 additions & 1 deletion src/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
* limitations under the License.
*/

use console::Term;
use std::collections::VecDeque;
use std::time::Instant;
use std::{
collections::HashMap,
fs,
Expand All @@ -38,6 +41,77 @@ struct SuggestedAttribute {
type_str: String,
}

/// Running tally of processed items together with a bounded window of the
/// most recent observations, used to report a recent items-per-second rate.
#[derive(Clone, Debug, Hash, PartialOrd, PartialEq)]
struct ProgressState {
/// Total number of items reported across all observations so far.
processed_items: usize,
/// Recent observations as `(time of observation, items in that observation)`.
/// New entries are pushed to the front, so the back holds the oldest one.
recent_processed_items: VecDeque<(Instant, usize)>,
/// Upper bound on how many entries `recent_processed_items` is meant to hold.
max_recordable_observations: usize,
}

impl ProgressState {
    /// Creates an empty state whose sliding window keeps at most
    /// `max_recordable_observations` recent observations.
    ///
    /// A window size of `0` is allowed: the total is still counted, but no
    /// observation is retained and the reported rate stays at `0.0`.
    fn new(max_recordable_observations: usize) -> ProgressState {
        ProgressState {
            processed_items: 0,
            recent_processed_items: VecDeque::with_capacity(max_recordable_observations),
            max_recordable_observations,
        }
    }

    /// Records that `processed_items` more items were processed, timestamped
    /// with the current instant.
    fn add_observation(&mut self, processed_items: usize) {
        self.add_observation_with_time(processed_items, Instant::now())
    }

    /// Records that `processed_items` more items were processed at instant `at`.
    ///
    /// The running total is always updated. The observation is pushed to the
    /// front of the window, and the oldest entries (at the back) are dropped so
    /// the window never exceeds `max_recordable_observations` entries.
    fn add_observation_with_time(&mut self, processed_items: usize, at: Instant) {
        self.processed_items += processed_items;

        // With a zero-sized window there is nowhere to store the sample; an
        // equality check against the length would never evict and the deque
        // would grow without bound.
        if self.max_recordable_observations == 0 {
            return;
        }

        // `truncate` keeps the front (newest) entries, making room for exactly
        // one new sample while enforcing the bound even if it was exceeded.
        self.recent_processed_items
            .truncate(self.max_recordable_observations - 1);
        self.recent_processed_items
            .push_front((at, processed_items));
    }

    /// Returns the total number of items recorded so far.
    fn processed_items(&self) -> usize {
        self.processed_items
    }

    /// Returns the recent throughput in items per second, measured up to now.
    ///
    /// See [`Self::recent_average_processed_items_per_second_with_time`].
    fn recent_average_processed_items_per_second(&self) -> f64 {
        self.recent_average_processed_items_per_second_with_time(Instant::now())
    }

    /// Returns the recent throughput in items per second, measured up to `at`.
    ///
    /// The rate is the sum of the item counts of every observation in the
    /// window divided by the time elapsed between the oldest retained
    /// observation and `at`.
    ///
    /// * Returns `0.0` when no observation is retained.
    /// * Returns `NaN` when no time has elapsed since the oldest retained
    ///   observation (`at` equal to, or earlier than, its timestamp), because
    ///   a rate cannot be derived from a zero-length interval.
    fn recent_average_processed_items_per_second_with_time(&self, at: Instant) -> f64 {
        let oldest_time = match self.recent_processed_items.back() {
            Some((oldest_time, _)) => *oldest_time,
            None => return 0.0,
        };

        // Saturates to zero instead of panicking or underflowing when `at`
        // precedes the oldest sample.
        let elapsed_secs = at.saturating_duration_since(oldest_time).as_secs_f64();
        if elapsed_secs > 0.0 {
            let recent_items: f64 = self
                .recent_processed_items
                .iter()
                .map(|&(_, count)| count as f64)
                .sum();
            recent_items / elapsed_secs
        } else {
            f64::NAN
        }
    }

    /// Rewrites the current stdout terminal line with the total number of
    /// processed items and the recent items-per-second rate.
    ///
    /// # Panics
    ///
    /// Panics if clearing the line, writing the message, or flushing the
    /// terminal fails.
    fn show(&self) {
        let items = self.processed_items();
        let items_per_sec = self.recent_average_processed_items_per_second();
        let mut term = Term::stdout();
        term.clear_line().expect("Failed to clear line");
        write!(
            term,
            "{} items processed ({:.2} items/sec)",
            items, items_per_sec
        )
        .expect("Failed to update message");
        term.flush().expect("Failed to flush");
    }
}

/// Window size passed to `ProgressState::new`: how many recent observations
/// are kept for the items-per-second estimate.
const MAX_NUMBER_OF_OBSERVES: usize = 10;

/* =================================================
Public functions
================================================= */
Expand Down Expand Up @@ -111,6 +185,7 @@ pub async fn export(
tmp_output_file.set_len(0)?;

let mut last_evaluated_key: Option<HashMap<String, rusoto_dynamodb::AttributeValue>> = None;
let mut progress_status = ProgressState::new(MAX_NUMBER_OF_OBSERVES);
loop {
// Invoke Scan API here. At the 1st iteration exclusive_start_key would be "None" as defined above, outside of the loop.
// On 2nd iteration and later, passing last_evaluated_key from the previous loop as an exclusive_start_key.
Expand All @@ -128,6 +203,8 @@ pub async fn export(
let items = scan_output
.items
.expect("Scan result items should be 'Some' even if no item returned.");

progress_status.add_observation(items.len());
match format_str {
None | Some("json") => {
let s = serde_json::to_string_pretty(&data::convert_to_json_vec(&items))?;
Expand Down Expand Up @@ -156,6 +233,7 @@ pub async fn export(
}
Some(o) => panic!("Invalid output format is given: {}", o),
}
progress_status.show();

// update last_evaluated_key for the next iteration.
// If there's no more item in the table, last_evaluated_key would be "None" and it means it's ok to break the loop.
Expand Down Expand Up @@ -241,18 +319,23 @@ pub async fn import(
let headers: Vec<&str> = lines[0].split(',').collect::<Vec<&str>>();
let mut matrix: Vec<Vec<&str>> = vec![];
// Iterate over lines (from index = 1, as index = 0 is the header line)
let mut progress_status = ProgressState::new(MAX_NUMBER_OF_OBSERVES);
for (i, line) in lines.iter().enumerate().skip(1) {
let cells: Vec<&str> = line.split(',').collect::<Vec<&str>>();
debug!("splitted line => {:?}", cells);
matrix.push(cells);
if i % 25 == 0 {
write_csv_matrix(&cx, matrix.clone(), &headers, enable_set_inference).await?;
progress_status.add_observation(25);
progress_status.show();
matrix.clear();
}
}
debug!("rest of matrix => {:?}", matrix);
if !matrix.is_empty() {
write_csv_matrix(&cx, matrix.clone(), &headers, enable_set_inference).await?;
progress_status.add_observation(matrix.len());
progress_status.show();
}
}
Some(o) => panic!("Invalid input format is given: {}", o),
Expand Down Expand Up @@ -441,9 +524,14 @@ async fn write_array_of_jsons_with_chunked_25(
array_of_json_obj: Vec<JsonValue>,
enable_set_inference: bool,
) -> Result<(), batch::DyneinBatchError> {
let mut progress_status = ProgressState::new(MAX_NUMBER_OF_OBSERVES);
for chunk /* Vec<JsonValue> */ in array_of_json_obj.chunks(25) { // As BatchWriteItem request can have up to 25 items.
let request_items: HashMap<String, Vec<WriteRequest>> = batch::convert_jsonvals_to_request_items(&cx, chunk.to_vec(), enable_set_inference).await?;
let items = chunk.to_vec();
let count = items.len();
let request_items: HashMap<String, Vec<WriteRequest>> = batch::convert_jsonvals_to_request_items(&cx, items, enable_set_inference).await?;
batch::batch_write_untill_processed(cx.clone(), request_items).await?;
progress_status.add_observation(count);
progress_status.show();
}
Ok(())
}
Expand All @@ -467,3 +555,41 @@ async fn write_csv_matrix(
batch::batch_write_untill_processed(cx.clone(), request_items).await?;
Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Walks a two-slot `ProgressState` through three observations spaced
    /// 500 ms apart and checks the running total, the window length and the
    /// reported rate after each one.
    #[test]
    fn test_progress_status() {
        // Fixed timeline so the expected rates are exact.
        let t0 = Instant::now();
        let t500 = t0 + Duration::from_millis(500);
        let t1000 = t0 + Duration::from_millis(1000);

        let mut state = ProgressState::new(2);

        // 1st sample: no time has elapsed since the oldest entry, so the rate is NaN.
        state.add_observation_with_time(10, t0);
        assert_eq!(state.processed_items(), 10);
        assert_eq!(state.recent_processed_items.len(), 1);
        let rate_at_t0 = state.recent_average_processed_items_per_second_with_time(t0);
        assert!(rate_at_t0.is_nan());

        // 2nd sample: both entries fit in the window, measured over 0.5 s.
        state.add_observation_with_time(10, t500);
        assert_eq!(state.processed_items(), 20);
        assert_eq!(state.recent_processed_items.len(), 2);
        let rate_at_t500 = state.recent_average_processed_items_per_second_with_time(t500);
        assert_eq!(rate_at_t500, (10.0 + 10.0) / 0.5);

        // 3rd sample: the first entry is evicted, so the window now starts at t500.
        state.add_observation_with_time(12, t1000);
        assert_eq!(state.processed_items(), 32);
        assert_eq!(state.recent_processed_items.len(), 2);
        let rate_at_t1000 = state.recent_average_processed_items_per_second_with_time(t1000);
        assert_eq!(rate_at_t1000, (10.0 + 12.0) / 0.5);
    }
}
Loading