Skip to content

Commit

Permalink
fix(mappers): Map record field to a JSON object type (#1885)
Browse files (browse the repository at this point in the history)
  • Loading branch information
edgarrmondragon authored Sep 28, 2023
1 parent 1d8143d commit fe7d7d6
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 23 deletions.
39 changes: 16 additions & 23 deletions singer_sdk/mapper.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -12,6 +12,7 @@
import logging
import typing as t

import singer_sdk.typing as th
from singer_sdk.exceptions import MapExpressionError, StreamMapConfigError
from singer_sdk.helpers import _simpleeval as simpleeval
from singer_sdk.helpers._catalog import get_selected_schema
Expand All @@ -21,15 +22,6 @@
flatten_schema,
get_flattening_options,
)
from singer_sdk.typing import (
CustomType,
IntegerType,
JSONTypeHelper,
NumberType,
PropertiesList,
Property,
StringType,
)

if t.TYPE_CHECKING:
import sys
Expand Down Expand Up @@ -349,8 +341,8 @@ def _eval(
def _eval_type(
self,
expr: str,
default: JSONTypeHelper | None = None,
) -> JSONTypeHelper:
default: th.JSONTypeHelper | None = None,
) -> th.JSONTypeHelper:
"""Evaluate an expression's type.
Args:
Expand All @@ -367,21 +359,22 @@ def _eval_type(
msg = "Expression should be str, not None"
raise ValueError(msg)

default = default or StringType()
default = default or th.StringType()

# If a field is set to "record", then it should be an "object" in the schema
if expr == "record":
return th.CustomType(self.raw_schema)

if expr.startswith("float("):
return NumberType()
return th.NumberType()

if expr.startswith("int("):
return IntegerType()
return th.IntegerType()

if expr.startswith("str("):
return StringType()

if expr[0] == "'" and expr[-1] == "'":
return StringType()
return th.StringType()

return default
return th.StringType() if expr[0] == "'" and expr[-1] == "'" else default

def _init_functions_and_schema( # noqa: PLR0912, PLR0915, C901
self,
Expand Down Expand Up @@ -442,7 +435,7 @@ def _init_functions_and_schema( # noqa: PLR0912, PLR0915, C901
transformed_schema = copy.copy(self.raw_schema)
if not include_by_default:
# Start with only the defined (or transformed) key properties
transformed_schema = PropertiesList().to_dict()
transformed_schema = th.PropertiesList().to_dict()

if "properties" not in transformed_schema:
transformed_schema["properties"] = {}
Expand All @@ -460,7 +453,7 @@ def _init_functions_and_schema( # noqa: PLR0912, PLR0915, C901
raise StreamMapConfigError(msg)
transformed_schema["properties"].pop(prop_key, None)
elif isinstance(prop_def, str):
default_type: JSONTypeHelper = StringType() # Fallback to string
default_type: th.JSONTypeHelper = th.StringType() # Fallback to string
existing_schema: dict = (
# Use transformed schema if available
transformed_schema["properties"].get(prop_key, {})
Expand All @@ -469,10 +462,10 @@ def _init_functions_and_schema( # noqa: PLR0912, PLR0915, C901
)
if existing_schema:
# Set default type if property exists already in JSON Schema
default_type = CustomType(existing_schema)
default_type = th.CustomType(existing_schema)

transformed_schema["properties"].update(
Property(
th.Property(
prop_key,
self._eval_type(prop_def, default=default_type),
).to_dict(),
Expand Down
12 changes: 12 additions & 0 deletions tests/core/test_mapper.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -624,6 +624,18 @@ def _clear_schema_cache() -> None:
"non_pk_passthrough.jsonl",
id="non_pk_passthrough",
),
pytest.param(
{
"mystream": {
"_data": "record",
"__else__": None,
},
},
False,
0,
"record_to_column.jsonl",
id="record_to_column",
),
],
)
def test_mapped_stream(
Expand Down
6 changes: 6 additions & 0 deletions tests/snapshots/mapped_stream/record_to_column.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "mystream", "schema": {"type": "object", "properties": {"_data": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}, "some_numbers": {"items": {"type": ["number"]}, "type": ["array", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}}}, "key_properties": []}
{"type": "RECORD", "stream": "mystream", "record": {"_data": {"email": "alice@example.com", "count": 21, "user": {"id": 1, "sub": {"num": 1}, "some_numbers": [3.14, 2.718]}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"_data": {"email": "bob@example.com", "count": 13, "user": {"id": 2, "sub": {"num": 2}, "some_numbers": [10.32, 1.618]}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "RECORD", "stream": "mystream", "record": {"_data": {"email": "charlie@example.com", "count": 19, "user": {"id": 3, "sub": {"num": 3}, "some_numbers": [1.414, 1.732]}}}, "time_extracted": "2022-01-01T00:00:00+00:00"}
{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}

0 comments on commit fe7d7d6

Please sign in to comment.