Skip to content

Commit

Permalink
Zyp Treatments: Add normalize_complex_lists option
Browse files Browse the repository at this point in the history
Because CrateDB can not store lists of varying objects, try to normalize
them, currently biased towards strings and floats.
  • Loading branch information
amotl committed Sep 7, 2024
1 parent d40cd82 commit 54bdb4d
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 1 deletion.
65 changes: 64 additions & 1 deletion src/zyp/model/treatment.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import builtins
import typing as t

from attr import Factory
Expand All @@ -13,6 +14,7 @@ class Treatment(Dumpable):
convert_list: t.List[str] = Factory(list)
convert_string: t.List[str] = Factory(list)
convert_dict: t.List[t.Dict[str, str]] = Factory(list)
normalize_complex_lists: bool = False
prune_invalid_date: t.List[str] = Factory(list)

def apply(self, data: DictOrList) -> DictOrList:
Expand All @@ -28,7 +30,7 @@ def apply_record(self, data: Record) -> Record:
local_ignores = []
if self.ignore_complex_lists:
for k, v in data.items():
if isinstance(v, list) and v and isinstance(v[0], dict):
if self.is_list_of_dicts(v):

Check warning on line 33 in src/zyp/model/treatment.py

View check run for this annotation

Codecov / codecov/patch

src/zyp/model/treatment.py#L30-L33

Added lines #L30 - L33 were not covered by tests
# Skip ignoring special-encoded items.
if v[0] and list(v[0].keys())[0].startswith("$"):
continue
Expand All @@ -39,6 +41,12 @@ def apply_record(self, data: Record) -> Record:
if ignore_name in data:
del data[ignore_name]

Check warning on line 42 in src/zyp/model/treatment.py

View check run for this annotation

Codecov / codecov/patch

src/zyp/model/treatment.py#L40-L42

Added lines #L40 - L42 were not covered by tests

# Apply sanitation for lists of objects.
if self.normalize_complex_lists:
for _, v in data.items():
if self.is_list_of_dicts(v):
ListOfVaryingObjectsNormalizer(v).apply()

Check warning on line 48 in src/zyp/model/treatment.py

View check run for this annotation

Codecov / codecov/patch

src/zyp/model/treatment.py#L45-L48

Added lines #L45 - L48 were not covered by tests

# Converge certain items to `list` even when defined differently.
for to_list_name in self.convert_list:
if to_list_name in data and not isinstance(data[to_list_name], list):
Expand Down Expand Up @@ -66,3 +74,58 @@ def apply_record(self, data: Record) -> Record:
del data[key]

Check warning on line 74 in src/zyp/model/treatment.py

View check run for this annotation

Codecov / codecov/patch

src/zyp/model/treatment.py#L68-L74

Added lines #L68 - L74 were not covered by tests

return data

Check warning on line 76 in src/zyp/model/treatment.py

View check run for this annotation

Codecov / codecov/patch

src/zyp/model/treatment.py#L76

Added line #L76 was not covered by tests

@staticmethod
def is_list_of_dicts(v: t.Any) -> bool:
return isinstance(v, list) and bool(v) and isinstance(v[0], dict)

Check warning on line 80 in src/zyp/model/treatment.py

View check run for this annotation

Codecov / codecov/patch

src/zyp/model/treatment.py#L80

Added line #L80 was not covered by tests


@define
class SanitizerRule:
"""
Manage details of a sanitizer rule.
"""

name: str
converter: t.Callable


@define
class ListOfVaryingObjectsNormalizer:
"""
CrateDB can not store lists of varying objects, so try to normalize them.
"""

data: Collection

def apply(self):
self.apply_rules(self.get_rules(self.type_stats()))

Check warning on line 102 in src/zyp/model/treatment.py

View check run for this annotation

Codecov / codecov/patch

src/zyp/model/treatment.py#L102

Added line #L102 was not covered by tests

def apply_rules(self, rules: t.List[SanitizerRule]) -> None:
for item in self.data:
for rule in rules:
name = rule.name
if name in item:
item[name] = rule.converter(item[name])

Check warning on line 109 in src/zyp/model/treatment.py

View check run for this annotation

Codecov / codecov/patch

src/zyp/model/treatment.py#L105-L109

Added lines #L105 - L109 were not covered by tests

def get_rules(self, statistics) -> t.List[SanitizerRule]:
rules = []
for name, types in statistics.items():
if len(types) > 1:
rules.append(SanitizerRule(name=name, converter=self.get_best_converter(types)))
return rules

Check warning on line 116 in src/zyp/model/treatment.py

View check run for this annotation

Codecov / codecov/patch

src/zyp/model/treatment.py#L112-L116

Added lines #L112 - L116 were not covered by tests

def type_stats(self) -> t.Dict[str, t.List[str]]:
types: t.Dict[str, t.List[str]] = {}
for item in self.data:
for key, value in item.items():
types.setdefault(key, []).append(type(value).__name__)
return types

Check warning on line 123 in src/zyp/model/treatment.py

View check run for this annotation

Codecov / codecov/patch

src/zyp/model/treatment.py#L119-L123

Added lines #L119 - L123 were not covered by tests

@staticmethod
def get_best_converter(types: t.List[str]) -> t.Callable:
if "str" in types:
return builtins.str
if "float" in types and "int" in types and "str" not in types:
return builtins.float
return lambda x: x

Check warning on line 131 in src/zyp/model/treatment.py

View check run for this annotation

Codecov / codecov/patch

src/zyp/model/treatment.py#L127-L131

Added lines #L127 - L131 were not covered by tests
1 change: 1 addition & 0 deletions tests/transform/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

RESET_TABLES = [
"from.dynamodb",
"from.generic",
]


Expand Down
41 changes: 41 additions & 0 deletions tests/transform/test_zyp_generic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import pytest

from commons_codec.model import SQLOperation
from zyp.model.treatment import Treatment

RECORD_IN = {
"_list_float_int": [{"abc": 42.42}, {"abc": 42}],
"_list_float_none": [{"id": 1, "abc": 42.42}, {"id": 2, "abc": None}],
"_list_int_str": [{"abc": 123}, {"abc": "123"}],
}

RECORD_OUT = {
"_list_float_int": [{"abc": 42.42}, {"abc": 42.0}],
"_list_float_none": [{"id": 1, "abc": 42.42}, {"id": 2}],
"_list_int_str": [{"abc": "123"}, {"abc": "123"}],
}


@pytest.mark.integration
def test_normalize_list_of_objects(caplog, cratedb):
"""
Verify writing record to CrateDB, with transformations.
"""

# Define CrateDB SQL DDL and DML operations (SQL+parameters).
operation_ddl = SQLOperation('CREATE TABLE "from".generic (data OBJECT(DYNAMIC))', None)
operation_dml = SQLOperation('INSERT INTO "from".generic (data) VALUES (:data)', {"data": RECORD_IN})

# Apply treatment to parameters.
parameters = operation_dml.parameters
Treatment(normalize_complex_lists=True).apply(parameters)

# Insert into CrateDB.
cratedb.database.run_sql(operation_ddl.statement)
cratedb.database.run_sql(operation_dml.statement, parameters)

# Verify data in target database.
assert cratedb.database.refresh_table("from.generic") is True

results = cratedb.database.run_sql('SELECT * FROM "from".generic;', records=True) # noqa: S608
assert results[0]["data"] == RECORD_OUT
23 changes: 23 additions & 0 deletions tests/zyp/test_treatment.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,29 @@ def test_treatment_ignore_fields():
assert transformation.apply([{"data": [{"abc": 123}]}]) == [{"data": [{}]}]


def test_treatment_normalize_complex_lists_success():
"""
Verify normalizing lists of objects works.
"""
transformation = Treatment(normalize_complex_lists=True)
assert transformation.apply([{"data": [{"abc": 123.42}, {"abc": 123}]}]) == [
{"data": [{"abc": 123.42}, {"abc": 123.0}]}
]
assert transformation.apply([{"data": [{"abc": 123}, {"abc": "123"}]}]) == [
{"data": [{"abc": "123"}, {"abc": "123"}]}
]


def test_treatment_normalize_complex_lists_passthrough():
"""
When no normalization rule can be applied, return input 1:1.
"""
transformation = Treatment(normalize_complex_lists=True)
assert transformation.apply([{"data": [{"abc": 123.42}, {"abc": None}]}]) == [
{"data": [{"abc": 123.42}, {"abc": None}]}
]


def test_treatment_convert_string():
"""
Verify treating nested data to convert values into strings works.
Expand Down

0 comments on commit 54bdb4d

Please sign in to comment.