Skip to content

Commit

Permalink
Zyp Treatments: Add normalize_complex_lists option
Browse files Browse the repository at this point in the history
Because CrateDB cannot store lists of varying objects, try to normalize
them. The normalization is currently biased towards strings and floats.
  • Loading branch information
amotl committed Sep 10, 2024
1 parent f9d5926 commit 62a3bbb
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 1 deletion.
65 changes: 64 additions & 1 deletion src/zyp/model/treatment.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import builtins
import typing as t

from attr import Factory
Expand All @@ -13,6 +14,7 @@ class Treatment(Dumpable):
convert_list: t.List[str] = Factory(list)
convert_string: t.List[str] = Factory(list)
convert_dict: t.List[t.Dict[str, str]] = Factory(list)
normalize_complex_lists: bool = False
prune_invalid_date: t.List[str] = Factory(list)

def apply(self, data: DictOrList) -> DictOrList:
Expand All @@ -28,7 +30,7 @@ def apply_record(self, data: Record) -> Record:
local_ignores = []
if self.ignore_complex_lists:
for k, v in data.items():
if isinstance(v, list) and v and isinstance(v[0], dict):
if self.is_list_of_dicts(v):
# Skip ignoring special-encoded items.
if v[0] and list(v[0].keys())[0].startswith("$"):
continue
Expand All @@ -39,6 +41,12 @@ def apply_record(self, data: Record) -> Record:
if ignore_name in data:
del data[ignore_name]

# Apply normalization for lists of objects.
if self.normalize_complex_lists:
for _, v in data.items():
if self.is_list_of_dicts(v):
ListOfVaryingObjectsNormalizer(v).apply()

# Converge certain items to `list` even when defined differently.
for to_list_name in self.convert_list:
if to_list_name in data and not isinstance(data[to_list_name], list):
Expand Down Expand Up @@ -66,3 +74,58 @@ def apply_record(self, data: Record) -> Record:
del data[key]

return data

@staticmethod
def is_list_of_dicts(v: t.Any) -> bool:
return isinstance(v, list) and bool(v) and isinstance(v[0], dict)


@define
class NormalizerRule:
    """
    Describe a single normalization rule: a field name plus the converter for its values.
    """

    # Key of the field the rule applies to.
    name: str
    # Callable invoked with the field's current value; its result replaces that value.
    converter: t.Callable


@define
class ListOfVaryingObjectsNormalizer:
    """
    CrateDB cannot store lists of varying objects, so try to normalize them.

    A field that occurs with more than one value type across the objects of the
    list is converged to a single type, biased towards strings and floats.
    The objects in `data` are modified in place. Normalization is best-effort:
    `None` values and values that cannot be converted are left untouched.
    """

    data: Collection

    def apply(self):
        """Collect per-field type statistics, derive rules from them, and apply the rules."""
        self.apply_rules(self.get_rules(self.type_stats()))

    def apply_rules(self, rules: t.List[NormalizerRule]) -> None:
        """
        Rewrite the fields named by `rules` in every object of `data`, in place.

        Items that are not dictionaries are skipped, because callers only
        verify that the first element of the list is a dictionary.
        """
        for item in self.data:
            if not isinstance(item, dict):
                continue
            for rule in rules:
                name = rule.name
                if name in item:
                    item[name] = rule.converter(item[name])

    def get_rules(self, statistics: t.Dict[str, t.List[str]]) -> t.List[NormalizerRule]:
        """
        Create one rule per field that was observed with more than one distinct type.

        `statistics` maps field names to the type names observed for them, as
        returned by `type_stats`. Fields whose values all share one type need
        no conversion, so no rule is created for them.
        """
        return [
            NormalizerRule(name=name, converter=self.get_best_converter(types))
            for name, types in statistics.items()
            if len(set(types)) > 1
        ]

    def type_stats(self) -> t.Dict[str, t.List[str]]:
        """
        Return, per field name, the type names of all values found in `data`.

        One entry is recorded per occurrence, so the lists may contain
        duplicates. Items that are not dictionaries are ignored.
        """
        types: t.Dict[str, t.List[str]] = {}
        for item in self.data:
            if not isinstance(item, dict):
                continue
            for key, value in item.items():
                types.setdefault(key, []).append(type(value).__name__)
        return types

    @staticmethod
    def get_best_converter(types: t.List[str]) -> t.Callable:
        """
        Select the converter to converge values of the given type names.

        - If any value is a string, convert to `str`.
        - Otherwise, if both `float` and `int` occur, convert to `float`.
        - Otherwise, return a pass-through converter.

        When other types are mixed in, the returned converter leaves `None`
        as-is, and the float converter also leaves values it cannot convert
        as-is. This avoids `float(None)` raising `TypeError` and `str(None)`
        turning a null into the text "None".
        """

        def to_str(value: t.Any) -> t.Any:
            return value if value is None else builtins.str(value)

        def to_float(value: t.Any) -> t.Any:
            try:
                return builtins.float(value)
            except (TypeError, ValueError):
                return value

        kinds = set(types)
        if "str" in kinds:
            # Hand out the plain builtin unless nulls need to be protected.
            return to_str if "NoneType" in kinds else builtins.str
        if "float" in kinds and "int" in kinds:
            # Hand out the plain builtin when only numbers are involved.
            return builtins.float if kinds <= {"float", "int"} else to_float
        return lambda x: x
1 change: 1 addition & 0 deletions tests/transform/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

RESET_TABLES = [
"from.dynamodb",
"from.generic",
]


Expand Down
41 changes: 41 additions & 0 deletions tests/transform/test_zyp_generic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import pytest

from commons_codec.model import SQLOperation
from zyp.model.treatment import Treatment


@pytest.mark.integration
def test_normalize_list_of_objects(caplog, cratedb):
    """
    Write a record containing lists of varying objects to CrateDB after
    applying the `normalize_complex_lists` treatment, and verify what is read back.
    """

    # Input: lists of objects whose `abc` field varies in type.
    source_record = {
        "_list_float_int": [{"abc": 42.42}, {"abc": 42}],
        "_list_float_none": [{"id": 1, "abc": 42.42}, {"id": 2, "abc": None}],
        "_list_int_str": [{"abc": 123}, {"abc": "123"}],
    }

    # Expectation: int+float converge to float, int+str converge to str,
    # and the `None` field is absent from the record read back.
    expected_record = {
        "_list_float_int": [{"abc": 42.42}, {"abc": 42.0}],
        "_list_float_none": [{"id": 1, "abc": 42.42}, {"id": 2}],
        "_list_int_str": [{"abc": "123"}, {"abc": "123"}],
    }

    # SQL operations for creating the table and inserting the record.
    create_table = SQLOperation('CREATE TABLE "from".generic (data OBJECT(DYNAMIC))', None)
    insert_record = SQLOperation('INSERT INTO "from".generic (data) VALUES (:data)', {"data": source_record})

    # Run the treatment on the statement parameters.
    insert_parameters = insert_record.parameters
    Treatment(normalize_complex_lists=True).apply(insert_parameters)

    # Create the table, then insert the treated record.
    cratedb.database.run_sql(create_table.statement)
    cratedb.database.run_sql(insert_record.statement, insert_parameters)

    # Refresh the table, then compare the stored record with the expectation.
    assert cratedb.database.refresh_table("from.generic") is True

    rows = cratedb.database.run_sql('SELECT * FROM "from".generic;', records=True)  # noqa: S608
    assert rows[0]["data"] == expected_record
23 changes: 23 additions & 0 deletions tests/zyp/test_treatment.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,29 @@ def test_treatment_ignore_fields():
assert transformation.apply([{"data": [{"abc": 123}]}]) == [{"data": [{}]}]


def test_treatment_normalize_complex_lists_success():
    """
    Lists of objects with mixed int/float or int/str values are normalized.
    """
    treatment = Treatment(normalize_complex_lists=True)

    # int + float converge to float.
    float_and_int = [{"data": [{"abc": 123.42}, {"abc": 123}]}]
    assert treatment.apply(float_and_int) == [{"data": [{"abc": 123.42}, {"abc": 123.0}]}]

    # int + str converge to str.
    int_and_str = [{"data": [{"abc": 123}, {"abc": "123"}]}]
    assert treatment.apply(int_and_str) == [{"data": [{"abc": "123"}, {"abc": "123"}]}]


def test_treatment_normalize_complex_lists_passthrough():
    """
    Input for which no normalization rule applies is returned unchanged.
    """
    treatment = Treatment(normalize_complex_lists=True)

    # float + None: the expected output equals the input.
    float_and_none = [{"data": [{"abc": 123.42}, {"abc": None}]}]
    expected = [{"data": [{"abc": 123.42}, {"abc": None}]}]
    assert treatment.apply(float_and_none) == expected


def test_treatment_convert_string():
"""
Verify treating nested data to convert values into strings works.
Expand Down

0 comments on commit 62a3bbb

Please sign in to comment.