diff --git a/src/zyp/model/treatment.py b/src/zyp/model/treatment.py index f7392c0..7f99fee 100644 --- a/src/zyp/model/treatment.py +++ b/src/zyp/model/treatment.py @@ -1,3 +1,4 @@ +import builtins import typing as t from attr import Factory @@ -13,6 +14,7 @@ class Treatment(Dumpable): convert_list: t.List[str] = Factory(list) convert_string: t.List[str] = Factory(list) convert_dict: t.List[t.Dict[str, str]] = Factory(list) + normalize_complex_lists: bool = False prune_invalid_date: t.List[str] = Factory(list) def apply(self, data: DictOrList) -> DictOrList: @@ -28,7 +30,7 @@ def apply_record(self, data: Record) -> Record: local_ignores = [] if self.ignore_complex_lists: for k, v in data.items(): - if isinstance(v, list) and v and isinstance(v[0], dict): + if self.is_list_of_dicts(v): # Skip ignoring special-encoded items. if v[0] and list(v[0].keys())[0].startswith("$"): continue @@ -39,6 +41,12 @@ def apply_record(self, data: Record) -> Record: if ignore_name in data: del data[ignore_name] + # Apply sanitation for lists of objects. + if self.normalize_complex_lists: + for _, v in data.items(): + if self.is_list_of_dicts(v): + ListOfVaryingObjectsNormalizer(v).apply() + # Converge certain items to `list` even when defined differently. for to_list_name in self.convert_list: if to_list_name in data and not isinstance(data[to_list_name], list): @@ -66,3 +74,58 @@ def apply_record(self, data: Record) -> Record: del data[key] return data + + @staticmethod + def is_list_of_dicts(v: t.Any) -> bool: + return isinstance(v, list) and bool(v) and isinstance(v[0], dict) + + +@define +class SanitizerRule: + """ + Manage details of a sanitizer rule. + """ + + name: str + converter: t.Callable + + +@define +class ListOfVaryingObjectsNormalizer: + """ + CrateDB can not store lists of varying objects, so try to normalize them. + """ + + data: Collection + + def apply(self): + self.apply_rules(self.get_rules(self.type_stats())) + + def apply_rules(self, rules: t.List[SanitizerRule]) -> None: + for item in self.data: + for rule in rules: + name = rule.name + if name in item: + item[name] = rule.converter(item[name]) + + def get_rules(self, statistics) -> t.List[SanitizerRule]: + rules = [] + for name, types in statistics.items(): + if len(types) > 1: + rules.append(SanitizerRule(name=name, converter=self.get_best_converter(types))) + return rules + + def type_stats(self) -> t.Dict[str, t.List[str]]: + types: t.Dict[str, t.List[str]] = {} + for item in self.data: + for key, value in item.items(): + types.setdefault(key, []).append(type(value).__name__) + return types + + @staticmethod + def get_best_converter(types: t.List[str]) -> t.Callable: + if "str" in types: + return builtins.str + if "float" in types and "int" in types and "str" not in types: + return builtins.float + return lambda x: x diff --git a/tests/transform/conftest.py b/tests/transform/conftest.py index 5b023ec..9a9f3b1 100644 --- a/tests/transform/conftest.py +++ b/tests/transform/conftest.py @@ -2,6 +2,7 @@ RESET_TABLES = [ "from.dynamodb", + "from.generic", ] diff --git a/tests/transform/test_zyp_generic.py b/tests/transform/test_zyp_generic.py new file mode 100644 index 0000000..34f4e19 --- /dev/null +++ b/tests/transform/test_zyp_generic.py @@ -0,0 +1,41 @@ +import pytest + +from commons_codec.model import SQLOperation +from zyp.model.treatment import Treatment + +RECORD_IN = { + "_list_float_int": [{"abc": 42.42}, {"abc": 42}], + "_list_float_none": [{"id": 1, "abc": 42.42}, {"id": 2, "abc": None}], + "_list_int_str": [{"abc": 123}, {"abc": "123"}], +} + +RECORD_OUT = { + "_list_float_int": [{"abc": 42.42}, {"abc": 42.0}], + "_list_float_none": [{"id": 1, "abc": 42.42}, {"id": 2}], + "_list_int_str": [{"abc": "123"}, {"abc": "123"}], +} + + +@pytest.mark.integration +def test_normalize_list_of_objects(caplog, cratedb): + """ + Verify writing record to CrateDB, with transformations. + """ + + # Define CrateDB SQL DDL and DML operations (SQL+parameters). + operation_ddl = SQLOperation('CREATE TABLE "from".generic (data OBJECT(DYNAMIC))', None) + operation_dml = SQLOperation('INSERT INTO "from".generic (data) VALUES (:data)', {"data": RECORD_IN}) + + # Apply treatment to parameters. + parameters = operation_dml.parameters + Treatment(normalize_complex_lists=True).apply(parameters) + + # Insert into CrateDB. + cratedb.database.run_sql(operation_ddl.statement) + cratedb.database.run_sql(operation_dml.statement, parameters) + + # Verify data in target database. + assert cratedb.database.refresh_table("from.generic") is True + + results = cratedb.database.run_sql('SELECT * FROM "from".generic;', records=True) # noqa: S608 + assert results[0]["data"] == RECORD_OUT diff --git a/tests/zyp/test_treatment.py b/tests/zyp/test_treatment.py index fad5ad7..d69c582 100644 --- a/tests/zyp/test_treatment.py +++ b/tests/zyp/test_treatment.py @@ -70,6 +70,29 @@ def test_treatment_ignore_fields(): assert transformation.apply([{"data": [{"abc": 123}]}]) == [{"data": [{}]}] +def test_treatment_normalize_complex_lists_success(): + """ + Verify normalizing lists of objects works. + """ + transformation = Treatment(normalize_complex_lists=True) + assert transformation.apply([{"data": [{"abc": 123.42}, {"abc": 123}]}]) == [ + {"data": [{"abc": 123.42}, {"abc": 123.0}]} + ] + assert transformation.apply([{"data": [{"abc": 123}, {"abc": "123"}]}]) == [ + {"data": [{"abc": "123"}, {"abc": "123"}]} + ] + + +def test_treatment_normalize_complex_lists_passthrough(): + """ + When no normalization rule can be applied, return input 1:1. + """ + transformation = Treatment(normalize_complex_lists=True) + assert transformation.apply([{"data": [{"abc": 123.42}, {"abc": None}]}]) == [ + {"data": [{"abc": 123.42}, {"abc": None}]} + ] + + def test_treatment_convert_string(): """ Verify treating nested data to convert values into strings works.