Skip to content

Commit

Permalink
Zyp Treatments: A slightly tailored transformation subsystem
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Sep 7, 2024
1 parent 7171fa8 commit d40cd82
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 12 deletions.
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## Unreleased
- Added Zyp Treatments, a slightly tailored transformation subsystem

## 2024/09/02 v0.0.14
- Replace poor man's relation name quoting with implementation
Expand Down Expand Up @@ -41,7 +42,7 @@
column. This allows defining primary keys on the sink table.

## 2024/08/14 v0.0.4
- Added `BucketTransformation`, a minimal transformation engine
- Added Zyp Transformations, a minimal transformation engine
based on JSON Pointer (RFC 6901).
- Added documentation using Sphinx and Read the Docs

Expand Down
4 changes: 4 additions & 0 deletions src/zyp/model/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@

from zyp.util.data import no_privates_no_nulls_no_empties

# A single record: a dictionary with string keys and arbitrary values.
Record = t.Dict[str, t.Any]
# A collection: a list of records.
Collection = t.List[Record]
# Either a single record or a collection of records.
DictOrList = t.Union[Record, Collection]


@define
class Metadata:
Expand Down
4 changes: 0 additions & 4 deletions src/zyp/model/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@
logger = logging.getLogger(__name__)


Record = t.Dict[str, t.Any]
Collection = t.List[Record]
DictOrList = t.Union[Record, Collection]
# Alias for a dictionary with string keys and arbitrary values.
# NOTE(review): presumably the template handed to `transon` -- confirm against its users.
TransonTemplate = t.Dict[str, t.Any]

# Union of the supported transformer object types: a parsed JMESPath expression,
# a jq program, or a transon transformer.
MokshaTransformer = t.Union[jmespath.parser.ParsedResult, jq._Program, transon.Transformer]


Expand Down
7 changes: 5 additions & 2 deletions src/zyp/model/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

from attrs import define

from zyp.model.base import Dumpable, Metadata, SchemaDefinition
from zyp.model.bucket import BucketTransformation, Collection, DictOrList
from zyp.model.base import Collection, DictOrList, Dumpable, Metadata, SchemaDefinition
from zyp.model.bucket import BucketTransformation
from zyp.model.moksha import MokshaTransformation
from zyp.model.treatment import Treatment


@define(frozen=True)
Expand Down Expand Up @@ -35,4 +36,6 @@ def apply(self, data: DictOrList) -> Collection:
collection_out.append(item)
if self.post:
collection_out = t.cast(Collection, self.post.apply(collection_out))
if self.treatment:
collection_out = t.cast(Collection, self.treatment.apply(collection_out))
return collection_out
3 changes: 2 additions & 1 deletion src/zyp/model/moksha.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from attr import Factory
from attrs import define

from zyp.model.bucket import ConverterBase, DictOrList, MokshaTransformer, TransonTemplate
from zyp.model.base import DictOrList
from zyp.model.bucket import ConverterBase, MokshaTransformer, TransonTemplate
from zyp.util.expression import compile_expression


Expand Down
68 changes: 68 additions & 0 deletions src/zyp/model/treatment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import typing as t

from attr import Factory
from attrs import define

from zyp.model.base import Collection, DictOrList, Dumpable, Record


@define
class Treatment(Dumpable):
    """
    A set of field-level cleanup rules, applied recursively to nested data.

    Every dictionary found in the input, at any nesting depth, is treated as
    a record and run through all configured rules. Rules match on field names
    only, independently of where the record sits in the document.
    """

    # When true, drop fields whose value is a non-empty list starting with a
    # dictionary, unless the first key of that dictionary starts with `$`.
    ignore_complex_lists: bool = False
    # Names of fields to remove from every record.
    ignore_field: t.List[str] = Factory(list)
    # Names of fields whose value is wrapped into a list, unless it already is one.
    convert_list: t.List[str] = Factory(list)
    # Names of fields whose value is converted using `str()`, unless it already is a string.
    convert_string: t.List[str] = Factory(list)
    # Rules of shape `{"name": <field>, "wrapper_name": <key>}`: a value that is not
    # a dictionary is wrapped into `{<key>: <value>}`.
    convert_dict: t.List[t.Dict[str, str]] = Factory(list)
    # Names of fields to remove when the value is not a dictionary, or when it is
    # a dictionary whose `date` entry is a string.
    prune_invalid_date: t.List[str] = Factory(list)

    def apply(self, data: DictOrList) -> DictOrList:
        """
        Recursively treat a record or a collection of records.

        Dictionaries are treated by `apply_record`, after which their values
        are processed recursively. Lists are processed item by item. Any other
        value is returned unchanged.

        The input is not modified: each record is treated on a shallow copy,
        and the result is assembled from newly created containers.

        :param data: A record, a list of records, or a nested value.
        :return: The treated data.
        """
        if isinstance(data, dict):
            # `apply_record` works in place, and only adds, replaces, or removes
            # top-level keys. A shallow copy is thus sufficient to protect the caller's data.
            treated = self.apply_record(dict(data))
            return {key: self.apply(value) for key, value in treated.items()}
        if isinstance(data, list):
            return t.cast(list, [self.apply(item) for item in data])
        return data

    def apply_record(self, data: Record) -> Record:
        """
        Apply all rules to the top level of a single record, in place.

        Rules run in this order: ignores, `convert_list`, `convert_string`,
        `convert_dict`, `prune_invalid_date`. Nested values are not visited here;
        recursion is the job of `apply`.

        :param data: The record to treat. It is modified in place.
        :return: The same dictionary object that was passed in.
        :raises KeyError: When a `convert_dict` rule lacks `name` or `wrapper_name`.
        """
        # Optionally ignore lists of complex objects. Collect the names first and
        # delete afterwards, because a dictionary must not change size while iterating it.
        local_ignores = []
        if self.ignore_complex_lists:
            for name, value in data.items():
                if not (isinstance(value, list) and value and isinstance(value[0], dict)):
                    continue
                first_item = value[0]
                # Skip ignoring special-encoded items, i.e. the first key starts with `$`.
                if first_item and next(iter(first_item)).startswith("$"):
                    continue
                local_ignores.append(name)

        # Apply global and computed ignores.
        for name in self.ignore_field + local_ignores:
            data.pop(name, None)

        # Converge certain items to `list` even when defined differently.
        for name in self.convert_list:
            if name in data and not isinstance(data[name], list):
                data[name] = [data[name]]

        # Converge certain items to `str` even when defined differently.
        for name in self.convert_string:
            if name in data and not isinstance(data[name], str):
                data[name] = str(data[name])

        # Converge certain items to `dict` even when defined differently.
        for rule in self.convert_dict:
            name = rule["name"]
            wrapper_name = rule["wrapper_name"]
            if name in data and not isinstance(data[name], dict):
                data[name] = {wrapper_name: data[name]}

        # Prune invalid date representations: anything that is not a dictionary,
        # and dictionaries carrying a string-typed `date` entry.
        for name in self.prune_invalid_date:
            if name not in data:
                continue
            value = data[name]
            if not isinstance(value, dict) or isinstance(value.get("date"), str):
                del data[name]

        return data
21 changes: 17 additions & 4 deletions tests/zyp/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,31 @@ def test_collection_transformation_serialize():
}
dict_result = transformation.to_dict()
assert dict_result == transformation_dict
return

yaml_result = transformation.to_yaml()
assert yaml.full_load(yaml_result) == transformation_dict
CollectionTransformation.from_yaml(yaml_result)
transformation_second = CollectionTransformation.from_yaml(yaml_result)
assert isinstance(transformation_second, CollectionTransformation)


def test_collection_transformation_load_and_apply():
def test_collection_transformation_regular_load_and_apply():
    """
    Verify a rule-based transformation can be loaded from YAML and applied.
    """
    recipe_path = Path("tests/zyp/transformation-collection.yaml")
    transformation = CollectionTransformation.from_yaml(recipe_path.read_text())
    data_in = deepcopy(ComplexRecipe.data_in)
    assert transformation.apply(data_in) == ComplexRecipe.data_out


def test_collection_transformation_treatment_load_and_apply():
    """
    Verify a collection transformation with a treatment section can be loaded from YAML and applied.
    """
    recipe_path = Path("tests/zyp/transformation-collection-treatment.yaml")
    transformation = CollectionTransformation.from_yaml(recipe_path.read_text())
    expected = {
        "message-source": "system-3000",
        "message-type": "eai-warehouse",
    }
    assert transformation.apply(deepcopy(ComplexRecipe.data_in)) == expected
103 changes: 103 additions & 0 deletions tests/zyp/test_treatment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from zyp.model.treatment import Treatment

# Input fixture: a nested record carrying one field per rule exercised by `test_treatment_all`.
RECORD_IN = {
    "data": {
        # Fields expected to be removed.
        "ignore_complex_list": [{}],
        "ignore_field": 123,
        "invalid_date_scalar": 123,
        "invalid_date_nested": {"date": "123"},
        # Fields expected to be converted.
        "to_string": 123,
        "to_list": 123,
        "to_dict": 123,
    },
}

# Expected outcome of treating `RECORD_IN`: only the converted fields remain.
RECORD_OUT = {
    "data": {
        "to_string": "123",
        "to_list": [123],
        "to_dict": {"id": 123},
    },
}


def test_treatment_all():
    """
    Verify treating nested data with all rule types enabled at once.
    """
    # Imported locally to keep this test self-contained.
    from copy import deepcopy

    transformation = Treatment(
        ignore_complex_lists=True,
        ignore_field=["ignore_field"],
        prune_invalid_date=["invalid_date_scalar", "invalid_date_nested"],
        convert_dict=[{"name": "to_dict", "wrapper_name": "id"}],
        convert_list=["to_list"],
        convert_string=["to_string"],
    )
    # Hand over a copy: treating data may modify records in place, and `RECORD_IN`
    # is a module-level fixture that must stay pristine for other uses.
    assert transformation.apply(deepcopy(RECORD_IN)) == RECORD_OUT


def test_treatment_noop():
    """
    A treatment without any rules yields data equal to its input.
    """
    treatment = Treatment()
    outcome = treatment.apply([{"data": {"abc": 123}}])
    assert outcome == [{"data": {"abc": 123}}]


def test_treatment_ignore_complex_lists_basic():
    """
    The "ignore_complex_lists" directive removes fields holding lists of dictionaries.
    """
    treatment = Treatment(ignore_complex_lists=True)
    outcome = treatment.apply([{"data": [{"abc": 123}]}])
    assert outcome == [{}]


def test_treatment_ignore_complex_lists_with_specials():
    """
    The "ignore_complex_lists" directive keeps special-encoded fields, whose first key starts with `$`.
    """
    treatment = Treatment(ignore_complex_lists=True)
    outcome = treatment.apply([{"data": [{"abc": 123}], "stamps": [{"$date": 123}]}])
    expected = [
        {"stamps": [{"$date": 123}]},
    ]
    assert outcome == expected


def test_treatment_ignore_fields():
    """
    Fields listed in "ignore_field" are removed, also from nested records.
    """
    treatment = Treatment(ignore_field=["abc"])
    outcome = treatment.apply([{"data": [{"abc": 123}]}])
    assert outcome == [{"data": [{}]}]


def test_treatment_convert_string():
    """
    Fields listed in "convert_string" are converted to strings within nested records.
    """
    treatment = Treatment(convert_string=["abc"])
    outcome = treatment.apply([{"data": [{"abc": 123}]}])
    assert outcome == [{"data": [{"abc": "123"}]}]


def test_treatment_convert_list():
    """
    Fields listed in "convert_list" are wrapped into lists within nested records.
    """
    treatment = Treatment(convert_list=["abc"])
    outcome = treatment.apply([{"data": [{"abc": 123}]}])
    assert outcome == [{"data": [{"abc": [123]}]}]


def test_treatment_convert_dict():
    """
    Fields matched by a "convert_dict" rule are wrapped into dictionaries within nested records.
    """
    rule = {"name": "abc", "wrapper_name": "id"}
    treatment = Treatment(convert_dict=[rule])
    outcome = treatment.apply([{"data": [{"abc": 123}]}])
    assert outcome == [{"data": [{"abc": {"id": 123}}]}]


def test_treatment_prune_invalid_date():
    """
    Fields listed in "prune_invalid_date" are removed when they hold no valid date representation.
    """
    treatment = Treatment(prune_invalid_date=["date"])

    # A scalar value is pruned right away.
    scalar_outcome = treatment.apply([{"data": [{"date": 123}]}])
    assert scalar_outcome == [{"data": [{}]}]

    # A dictionary value is kept, but its inner scalar `date` field gets pruned.
    nested_outcome = treatment.apply([{"data": [{"date": {"date": 123}}]}])
    assert nested_outcome == [{"data": [{"date": {}}]}]
5 changes: 5 additions & 0 deletions tests/zyp/transformation-collection-treatment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
meta:
  version: 1
  type: zyp-collection
treatment:
  ignore_complex_lists: true

0 comments on commit d40cd82

Please sign in to comment.