feat!: Add --enable-set-inference flag to import
BREAKING CHANGE: the import command no longer infers set types without `--enable-set-inference`
StoneDot authored and ryota-sakamoto committed Aug 4, 2023
1 parent 1e90351 commit 67b134e
Showing 7 changed files with 152 additions and 29 deletions.
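For readers skimming the diff: the behavior being gated is the set-type inference inside `dispatch_jsonvalue_to_attrval` (see src/data.rs below). Without `--enable-set-inference`, homogeneous JSON arrays are now imported as DynamoDB lists (`L`) instead of string or number sets (`SS`/`NS`). The snippet below is a minimal standalone sketch of that decision rule written against `serde_json` only; the helper name `infer_array_type` and its string return values are illustrative, not part of dynein's API.

```rust
// Standalone illustration of the gated inference rule (hypothetical helper,
// not dynein's own types). Requires only the serde_json crate.
use serde_json::Value;

/// Returns the DynamoDB type the import path would pick for a JSON array.
fn infer_array_type(values: &[Value], enable_set_inference: bool) -> &'static str {
    if enable_set_inference && values.iter().all(|v| v.is_string()) {
        "SS" // string set (previous behavior, now opt-in)
    } else if enable_set_inference && values.iter().all(|v| v.is_number()) {
        "NS" // number set (previous behavior, now opt-in)
    } else {
        "L" // plain list (new default)
    }
}

fn main() {
    let phones: Value = serde_json::from_str(r#"["+44 1234567", "+44 2345678"]"#).unwrap();
    let arr = phones.as_array().unwrap();
    assert_eq!(infer_array_type(arr, true), "SS");  // with --enable-set-inference
    assert_eq!(infer_array_type(arr, false), "L");  // default after this commit
}
```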
6 changes: 4 additions & 2 deletions src/batch.rs
@@ -255,6 +255,7 @@ pub async fn batch_write_item(
pub async fn convert_jsonvals_to_request_items(
cx: &app::Context,
items_jsonval: Vec<JsonValue>,
enable_set_inference: bool,
) -> Result<HashMap<String, Vec<WriteRequest>>, DyneinBatchError> {
let mut results = HashMap::<String, Vec<WriteRequest>>::new();
let mut write_requests = Vec::<WriteRequest>::new();
@@ -275,7 +276,7 @@ pub async fn convert_jsonvals_to_request_items(
{
item.insert(
attr_name.to_string(),
data::dispatch_jsonvalue_to_attrval(body),
data::dispatch_jsonvalue_to_attrval(body, enable_set_inference),
);
}

@@ -301,6 +302,7 @@ pub async fn csv_matrix_to_request_items(
cx: &app::Context,
matrix: &[Vec<&str>],
headers: &[&str],
enable_set_inference: bool,
) -> Result<HashMap<String, Vec<WriteRequest>>, DyneinBatchError> {
let total_elements_in_matrix: usize = matrix
.iter()
@@ -333,7 +335,7 @@ pub async fn csv_matrix_to_request_items(
);
item.insert(
headers[i].to_string(),
data::dispatch_jsonvalue_to_attrval(&jsonval),
data::dispatch_jsonvalue_to_attrval(&jsonval, enable_set_inference),
);
}

7 changes: 6 additions & 1 deletion src/bootstrap.rs
@@ -208,7 +208,12 @@ e.g. https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GettingSta
.expect("each item should be a valid JSON object.");
let item_attrval: HashMap<String, AttributeValue> = item_json
.iter()
.map(|(k, v)| (String::from(k), data::dispatch_jsonvalue_to_attrval(v)))
.map(|(k, v)| {
(
String::from(k),
data::dispatch_jsonvalue_to_attrval(v, true),
)
})
.collect();
write_requests.push(WriteRequest {
put_request: Some(PutRequest { item: item_attrval }),
4 changes: 4 additions & 0 deletions src/cmd.rs
@@ -347,6 +347,10 @@ pub enum Sub {
/// csv = comma-separated values with header. Header columns are considered to be DynamoDB attributes.
#[structopt(short, long, possible_values = &["csv", "json", "jsonl", "json-compact"])]
format: Option<String>,

/// Enable type inference for set types. This option is provided for backward compatibility.
#[structopt(long)]
enable_set_inference: bool,
},

/// Take backup of a DynamoDB table using on-demand backup
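As context for the flag definition above: with structopt, a bare `bool` field annotated with `#[structopt(long)]` becomes an off-by-default switch named after the kebab-cased field, which is what makes `--enable-set-inference` opt-in. A minimal standalone sketch follows; the `Opt` struct and the argument vector are hypothetical, not dynein's real `Sub` enum.

```rust
// Minimal sketch of the structopt flag pattern used above; the struct and
// argument values are hypothetical, not dynein's actual CLI definition.
use structopt::StructOpt;

#[derive(StructOpt, Debug)]
struct Opt {
    /// Enable type inference for set types.
    #[structopt(long)]
    enable_set_inference: bool,
}

fn main() {
    // Flag present -> true; flag absent -> false (the new default).
    let with_flag = Opt::from_iter(vec!["import", "--enable-set-inference"]);
    assert!(with_flag.enable_set_inference);

    let without_flag = Opt::from_iter(vec!["import"]);
    assert!(!without_flag.enable_set_inference);
}
```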
122 changes: 111 additions & 11 deletions src/data.rs
@@ -609,7 +609,7 @@ fn convert_jsonval_to_attrvals_in_hashmap_val(
let mut result = HashMap::<String, AttributeValue>::new();
for (k, v) in hashmap {
debug!("working on key '{}', and value '{:?}'", k, v);
result.insert(k, dispatch_jsonvalue_to_attrval(&v));
result.insert(k, dispatch_jsonvalue_to_attrval(&v, true));
}
result
}
@@ -674,15 +674,15 @@ fn build_attrval_set(ktype: &str, kval: &[JsonValue]) -> AttributeValue {

/// for "L" DynamoDB Attributes
/// used only for 'simplified JSON' format. Not compatible with DynamoDB JSON.
fn build_attrval_list(vec: &[JsonValue]) -> AttributeValue {
fn build_attrval_list(vec: &[JsonValue], enable_set_inference: bool) -> AttributeValue {
let mut attrval: AttributeValue = AttributeValue {
..Default::default()
};

let mut inside_attrvals = Vec::<AttributeValue>::new();
for v in vec {
debug!("this is an element of vec: {:?}", v);
inside_attrvals.push(dispatch_jsonvalue_to_attrval(v));
inside_attrvals.push(dispatch_jsonvalue_to_attrval(v, enable_set_inference));
}
attrval.l = Some(inside_attrvals);

@@ -691,23 +691,29 @@ fn build_attrval_list(vec: &[JsonValue]) -> AttributeValue {

/// for "M" DynamoDB Attributes
/// used only for 'simplified JSON' format. Not compatible with DynamoDB JSON.
fn build_attrval_map(json_map: &serde_json::Map<std::string::String, JsonValue>) -> AttributeValue {
fn build_attrval_map(
json_map: &serde_json::Map<std::string::String, JsonValue>,
enable_set_inference: bool,
) -> AttributeValue {
let mut result = AttributeValue {
..Default::default()
};

let mut mapval = HashMap::<String, AttributeValue>::new();
for (k, v) in json_map {
debug!("working on key '{}', and value '{:?}'", k, v);
mapval.insert(k.to_string(), dispatch_jsonvalue_to_attrval(v));
mapval.insert(
k.to_string(),
dispatch_jsonvalue_to_attrval(v, enable_set_inference),
);
}
result.m = Some(mapval);

result
}

/// Convert from serde_json::Value (standard JSON values) into DynamoDB style AttributeValue
pub fn dispatch_jsonvalue_to_attrval(jv: &JsonValue) -> AttributeValue {
pub fn dispatch_jsonvalue_to_attrval(jv: &JsonValue, enable_set_inference: bool) -> AttributeValue {
match jv {
// scalar types
JsonValue::String(val) => AttributeValue {
@@ -728,23 +734,23 @@ pub fn dispatch_jsonvalue_to_attrval(jv: &JsonValue) -> AttributeValue {
},

// document types. they can be recursive.
JsonValue::Object(obj) => build_attrval_map(obj),
JsonValue::Object(obj) => build_attrval_map(obj, enable_set_inference),
JsonValue::Array(vec) => {
if vec.iter().all(|v| v.is_string()) {
if enable_set_inference && vec.iter().all(|v| v.is_string()) {
debug!(
"All elements in this attribute are String - treat it as 'SS': {:?}",
vec
);
build_attrval_set(&String::from("SS"), vec)
} else if vec.iter().all(|v| v.is_number()) {
} else if enable_set_inference && vec.iter().all(|v| v.is_number()) {
debug!(
"All elements in this attribute are Number - treat it as 'NS': {:?}",
vec
);
build_attrval_set(&String::from("NS"), vec)
} else {
debug!("Elements are not uniform - treat it as 'L': {:?}", vec);
build_attrval_list(vec)
build_attrval_list(vec, enable_set_inference)
}
}
}
@@ -1349,7 +1355,8 @@ fn generate_scan_expressions(

#[cfg(test)]
mod tests {
use super::{generate_update_expressions, AttributeValue, UpdateActionType};
use super::*;
use serde_json::Value;
use std::collections::HashMap;

#[test]
@@ -1798,4 +1805,97 @@ mod tests {
);
assert_eq!(actual.vals, None);
}

#[test]
fn test_dispatch_jsonvalue_to_attrval() {
let string_list = r#"
[
"+44 1234567",
"+44 2345678"
]"#;
let string_list: Value = serde_json::from_str(string_list).unwrap();
let actual = dispatch_jsonvalue_to_attrval(&string_list, false);
assert_eq!(
actual,
AttributeValue {
l: Some(vec!(
AttributeValue {
s: Some("+44 1234567".to_owned()),
..Default::default()
},
AttributeValue {
s: Some("+44 2345678".to_owned()),
..Default::default()
}
)),
..Default::default()
}
);
let actual = dispatch_jsonvalue_to_attrval(&string_list, true);
assert_eq!(
actual,
AttributeValue {
ss: Some(vec!("+44 1234567".to_owned(), "+44 2345678".to_owned())),
..Default::default()
}
);

let number_list = r#"
[
12345,
67890
]"#;
let number_list: Value = serde_json::from_str(number_list).unwrap();
let actual = dispatch_jsonvalue_to_attrval(&number_list, false);
assert_eq!(
actual,
AttributeValue {
l: Some(vec!(
AttributeValue {
n: Some("12345".to_owned()),
..Default::default()
},
AttributeValue {
n: Some("67890".to_owned()),
..Default::default()
}
)),
..Default::default()
}
);
let actual = dispatch_jsonvalue_to_attrval(&number_list, true);
assert_eq!(
actual,
AttributeValue {
ns: Some(vec!["12345".to_owned(), "67890".to_owned()]),
..Default::default()
}
);

let mix_list = r#"
[
"text",
1234
]"#;
let mix_list: Value = serde_json::from_str(mix_list).unwrap();
for flag in [true, false] {
let actual = dispatch_jsonvalue_to_attrval(&mix_list, flag);
assert_eq!(
actual,
AttributeValue {
l: Some(vec!(
AttributeValue {
s: Some("text".to_owned()),
..Default::default()
},
AttributeValue {
n: Some("1234".to_owned()),
..Default::default()
}
)),
..Default::default()
}
);
}
}
}
8 changes: 5 additions & 3 deletions src/main.rs
@@ -210,9 +210,11 @@ async fn dispatch(context: &mut app::Context, subcommand: cmd::Sub) -> Result<()
output_file,
format,
} => transfer::export(context.clone(), attributes, keys_only, output_file, format).await?,
cmd::Sub::Import { input_file, format } => {
transfer::import(context.clone(), input_file, format).await?
}
cmd::Sub::Import {
input_file,
format,
enable_set_inference,
} => transfer::import(context.clone(), input_file, format, enable_set_inference).await?,
cmd::Sub::Backup { list, all_tables } => {
if list {
control::list_backups(context.clone(), all_tables).await?
17 changes: 11 additions & 6 deletions src/transfer.rs
@@ -194,6 +194,7 @@ pub async fn import(
cx: app::Context,
input_file: String,
format: Option<String>,
enable_set_inference: bool,
) -> Result<(), batch::DyneinBatchError> {
let format_str: Option<&str> = format.as_deref();

@@ -216,7 +217,8 @@
match format_str {
None | Some("json") | Some("json-compact") => {
let array_of_json_obj: Vec<JsonValue> = serde_json::from_str(&input_string)?;
write_array_of_jsons_with_chunked_25(cx, array_of_json_obj).await?;
write_array_of_jsons_with_chunked_25(cx, array_of_json_obj, enable_set_inference)
.await?;
}
Some("jsonl") => {
// JSON Lines can be deserialized with into_iter() as below.
@@ -225,7 +227,8 @@
// list_of_jsons contains deserialize results. Filter them and get only valid items.
let array_of_valid_json_obj: Vec<JsonValue> =
array_of_json_obj.filter_map(Result::ok).collect();
write_array_of_jsons_with_chunked_25(cx, array_of_valid_json_obj).await?;
write_array_of_jsons_with_chunked_25(cx, array_of_valid_json_obj, enable_set_inference)
.await?;
}
Some("csv") => {
let lines: Vec<&str> = input_string
@@ -243,13 +246,13 @@
debug!("splitted line => {:?}", cells);
matrix.push(cells);
if i % 25 == 0 {
write_csv_matrix(&cx, matrix.clone(), &headers).await?;
write_csv_matrix(&cx, matrix.clone(), &headers, enable_set_inference).await?;
matrix.clear();
}
}
debug!("rest of matrix => {:?}", matrix);
if !matrix.is_empty() {
write_csv_matrix(&cx, matrix.clone(), &headers).await?;
write_csv_matrix(&cx, matrix.clone(), &headers, enable_set_inference).await?;
}
}
Some(o) => panic!("Invalid input format is given: {}", o),
@@ -436,9 +439,10 @@ fn build_csv_header(
async fn write_array_of_jsons_with_chunked_25(
cx: app::Context,
array_of_json_obj: Vec<JsonValue>,
enable_set_inference: bool,
) -> Result<(), batch::DyneinBatchError> {
for chunk /* Vec<JsonValue> */ in array_of_json_obj.chunks(25) { // As BatchWriteItem request can have up to 25 items.
let request_items: HashMap<String, Vec<WriteRequest>> = batch::convert_jsonvals_to_request_items(&cx, chunk.to_vec()).await?;
let request_items: HashMap<String, Vec<WriteRequest>> = batch::convert_jsonvals_to_request_items(&cx, chunk.to_vec(), enable_set_inference).await?;
batch::batch_write_untill_processed(cx.clone(), request_items).await?;
}
Ok(())
@@ -456,9 +460,10 @@ async fn write_csv_matrix(
cx: &app::Context,
matrix: Vec<Vec<&str>>,
headers: &[&str],
enable_set_inference: bool,
) -> Result<(), batch::DyneinBatchError> {
let request_items: HashMap<String, Vec<WriteRequest>> =
batch::csv_matrix_to_request_items(cx, &matrix, headers).await?;
batch::csv_matrix_to_request_items(cx, &matrix, headers, enable_set_inference).await?;
batch::batch_write_untill_processed(cx.clone(), request_items).await?;
Ok(())
}
17 changes: 11 additions & 6 deletions tests/cmd/import.md
@@ -11,13 +11,16 @@ dy admin update table your_table --mode ondemand).
would be primary key(s).
USAGE:
dy import [OPTIONS] --input-file <input-file>
dy import [FLAGS] [OPTIONS] --input-file <input-file>
FLAGS:
-h, --help
--enable-set-inference
Enable type inference for set types. This option is provided for backward compatibility
-h, --help
Prints help information
-V, --version
-V, --version
Prints version information
@@ -53,11 +56,13 @@ dy admin update table your_table --mode ondemand).
would be primary key(s).
USAGE:
dy import [OPTIONS] --input-file <input-file>
dy import [FLAGS] [OPTIONS] --input-file <input-file>
FLAGS:
-h, --help Prints help information
-V, --version Prints version information
--enable-set-inference Enable type inference for set types. This option is provided for backward
compatibility
-h, --help Prints help information
-V, --version Prints version information
OPTIONS:
-f, --format <format> Data format for import items.
