diff --git a/singer_sdk/mapper.py b/singer_sdk/mapper.py index cb9fb35b0..f705ac5da 100644 --- a/singer_sdk/mapper.py +++ b/singer_sdk/mapper.py @@ -12,6 +12,7 @@ import logging import typing as t +import singer_sdk.typing as th from singer_sdk.exceptions import MapExpressionError, StreamMapConfigError from singer_sdk.helpers import _simpleeval as simpleeval from singer_sdk.helpers._catalog import get_selected_schema @@ -21,15 +22,6 @@ flatten_schema, get_flattening_options, ) -from singer_sdk.typing import ( - CustomType, - IntegerType, - JSONTypeHelper, - NumberType, - PropertiesList, - Property, - StringType, -) if t.TYPE_CHECKING: import sys @@ -349,8 +341,8 @@ def _eval( def _eval_type( self, expr: str, - default: JSONTypeHelper | None = None, - ) -> JSONTypeHelper: + default: th.JSONTypeHelper | None = None, + ) -> th.JSONTypeHelper: """Evaluate an expression's type. Args: @@ -367,21 +359,22 @@ def _eval_type( msg = "Expression should be str, not None" raise ValueError(msg) - default = default or StringType() + default = default or th.StringType() + + # If a field is set to "record", then it should be an "object" in the schema + if expr == "record": + return th.CustomType(self.raw_schema) if expr.startswith("float("): - return NumberType() + return th.NumberType() if expr.startswith("int("): - return IntegerType() + return th.IntegerType() if expr.startswith("str("): - return StringType() - - if expr[0] == "'" and expr[-1] == "'": - return StringType() + return th.StringType() - return default + return th.StringType() if expr[0] == "'" and expr[-1] == "'" else default def _init_functions_and_schema( # noqa: PLR0912, PLR0915, C901 self, @@ -442,7 +435,7 @@ def _init_functions_and_schema( # noqa: PLR0912, PLR0915, C901 transformed_schema = copy.copy(self.raw_schema) if not include_by_default: # Start with only the defined (or transformed) key properties - transformed_schema = PropertiesList().to_dict() + transformed_schema = th.PropertiesList().to_dict() if "properties" not in transformed_schema: transformed_schema["properties"] = {} @@ -460,7 +453,7 @@ def _init_functions_and_schema( # noqa: PLR0912, PLR0915, C901 raise StreamMapConfigError(msg) transformed_schema["properties"].pop(prop_key, None) elif isinstance(prop_def, str): - default_type: JSONTypeHelper = StringType() # Fallback to string + default_type: th.JSONTypeHelper = th.StringType() # Fallback to string existing_schema: dict = ( # Use transformed schema if available transformed_schema["properties"].get(prop_key, {}) @@ -469,10 +462,10 @@ def _init_functions_and_schema( # noqa: PLR0912, PLR0915, C901 ) if existing_schema: # Set default type if property exists already in JSON Schema - default_type = CustomType(existing_schema) + default_type = th.CustomType(existing_schema) transformed_schema["properties"].update( - Property( + th.Property( prop_key, self._eval_type(prop_def, default=default_type), ).to_dict(), diff --git a/tests/core/test_mapper.py b/tests/core/test_mapper.py index 1cc214810..5905e50c2 100644 --- a/tests/core/test_mapper.py +++ b/tests/core/test_mapper.py @@ -624,6 +624,18 @@ def _clear_schema_cache() -> None: "non_pk_passthrough.jsonl", id="non_pk_passthrough", ), + pytest.param( + { + "mystream": { + "_data": "record", + "__else__": None, + }, + }, + False, + 0, + "record_to_column.jsonl", + id="record_to_column", + ), ], ) def test_mapped_stream( diff --git a/tests/snapshots/mapped_stream/record_to_column.jsonl b/tests/snapshots/mapped_stream/record_to_column.jsonl new file mode 100644 index 000000000..8fc3efb21 --- /dev/null +++ b/tests/snapshots/mapped_stream/record_to_column.jsonl @@ -0,0 +1,6 @@ +{"type": "STATE", "value": {}} +{"type": "SCHEMA", "stream": "mystream", "schema": {"type": "object", "properties": {"_data": {"properties": {"email": {"type": ["string", "null"]}, "count": {"type": ["integer", "null"]}, "user": {"properties": {"id": {"type": ["integer", "null"]}, "sub": {"properties": {"num": {"type": ["integer", "null"]}}, "type": ["object", "null"]}, "some_numbers": {"items": {"type": ["number"]}, "type": ["array", "null"]}}, "type": ["object", "null"]}}, "type": ["object", "null"]}}}, "key_properties": []} +{"type": "RECORD", "stream": "mystream", "record": {"_data": {"email": "alice@example.com", "count": 21, "user": {"id": 1, "sub": {"num": 1}, "some_numbers": [3.14, 2.718]}}}, "time_extracted": "2022-01-01T00:00:00+00:00"} +{"type": "RECORD", "stream": "mystream", "record": {"_data": {"email": "bob@example.com", "count": 13, "user": {"id": 2, "sub": {"num": 2}, "some_numbers": [10.32, 1.618]}}}, "time_extracted": "2022-01-01T00:00:00+00:00"} +{"type": "RECORD", "stream": "mystream", "record": {"_data": {"email": "charlie@example.com", "count": 19, "user": {"id": 3, "sub": {"num": 3}, "some_numbers": [1.414, 1.732]}}}, "time_extracted": "2022-01-01T00:00:00+00:00"} +{"type": "STATE", "value": {"bookmarks": {"mystream": {}}}}