From 90e9b3dbc3257b4142877f6f0462399ca7da1dcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Thu, 24 Oct 2024 18:26:18 -0600 Subject: [PATCH] feat(target): SQL target developers can now more easily override the mapping from JSON schema to SQL column type --- singer_sdk/connectors/sql.py | 189 ++++++++++++++++++++++++++++++- singer_sdk/typing.py | 4 + tests/core/test_connector_sql.py | 142 ++++++++++++++++++++++- tests/core/test_sql_typing.py | 3 +- tests/core/test_typing.py | 1 + 5 files changed, 333 insertions(+), 6 deletions(-) diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index 98e6ea7d6..7fd6d4b82 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -25,6 +25,11 @@ else: from warnings import deprecated +if sys.version_info < (3, 10): + from typing_extensions import TypeAlias +else: + from typing import TypeAlias # noqa: ICN003 + if t.TYPE_CHECKING: from sqlalchemy.engine import Engine from sqlalchemy.engine.reflection import Inspector @@ -192,6 +197,175 @@ def boolean_to_jsonschema(self, column_type: sa.types.Boolean) -> dict: # noqa: return th.BooleanType.type_dict # type: ignore[no-any-return] +JSONtoSQLHandler: TypeAlias = t.Union[ + t.Type[sa.types.TypeEngine], + t.Callable[[dict], sa.types.TypeEngine], +] + + +class JSONSchemaToSQL: + """A configurable mapper for converting JSON Schema types to SQLAlchemy types.""" + + def __init__(self) -> None: + """Initialize the mapper with default type mappings.""" + # Default type mappings + self._type_mapping: dict[str, JSONtoSQLHandler] = { + "string": self._handle_string_type, + "integer": sa.types.INTEGER, + "number": sa.types.DECIMAL, + "boolean": sa.types.BOOLEAN, + "object": sa.types.VARCHAR, + "array": sa.types.VARCHAR, + } + + # Format handlers for string types + self._format_handlers: dict[str, JSONtoSQLHandler] = { + # Default date-like formats + "date-time": sa.types.DATETIME, + "time": sa.types.TIME, + "date": sa.types.DATE, + # Common string formats with sensible defaults + "uuid": sa.types.UUID, + "email": lambda _: sa.types.VARCHAR(254), # RFC 5321 + "uri": lambda _: sa.types.VARCHAR(2083), # Common browser limit + "hostname": lambda _: sa.types.VARCHAR(253), # RFC 1035 + "ipv4": lambda _: sa.types.VARCHAR(15), + "ipv6": lambda _: sa.types.VARCHAR(45), + } + + def _invoke_handler( # noqa: PLR6301 + self, + handler: JSONtoSQLHandler, + schema: dict, + ) -> sa.types.TypeEngine: + """Invoke a handler, handling both type classes and callables. + + Args: + handler: The handler to invoke. + schema: The schema to pass to callable handlers. + + Returns: + The resulting SQLAlchemy type. + """ + if isinstance(handler, type): + return handler() # type: ignore[no-any-return] + return handler(schema) + + def register_type_handler(self, json_type: str, handler: JSONtoSQLHandler) -> None: + """Register a custom type handler. + + Args: + json_type: The JSON Schema type to handle. + handler: Either a SQLAlchemy type class or a callable that takes a schema + dict and returns a SQLAlchemy type instance. + """ + self._type_mapping[json_type] = handler + + def register_format_handler( + self, + format_name: str, + handler: JSONtoSQLHandler, + ) -> None: + """Register a custom format handler. + + Args: + format_name: The format string (e.g., "date-time", "email", "custom-format"). + handler: Either a SQLAlchemy type class or a callable that takes a schema + dict and returns a SQLAlchemy type instance. + """ # noqa: E501 + self._format_handlers[format_name] = handler + + def _get_type_from_schema(self, schema: dict) -> sa.types.TypeEngine | None: + """Try to get a SQL type from a single schema object. + + Args: + schema: The JSON Schema object. + + Returns: + SQL type if one can be determined, None otherwise. + """ + # Check if this is a string with format first + if schema.get("type") == "string" and "format" in schema: + format_type = self._handle_format(schema) + if format_type is not None: + return format_type + + # Then check regular types + if "type" in schema: + schema_type = schema["type"] + if isinstance(schema_type, (list, tuple)): + # For type arrays, try each type + for t in schema_type: + if handler := self._type_mapping.get(t): + return self._invoke_handler(handler, schema) + elif schema_type in self._type_mapping: + handler = self._type_mapping[schema_type] + return self._invoke_handler(handler, schema) + + return None + + def _handle_format(self, schema: dict) -> sa.types.TypeEngine | None: + """Handle format-specific type conversion. + + Args: + schema: The JSON Schema object. + + Returns: + The format-specific SQL type if applicable, None otherwise. + """ + if "format" not in schema: + return None + + format_string: str = schema["format"] + + if handler := self._format_handlers.get(format_string): + return self._invoke_handler(handler, schema) + + return None + + def _handle_string_type(self, schema: dict) -> sa.types.TypeEngine: + """Handle string type conversion with special cases for formats. + + Args: + schema: The JSON Schema object. + + Returns: + Appropriate SQLAlchemy type. + """ + # Check for format-specific handling first + if format_type := self._handle_format(schema): + return format_type + + # Default string handling + max_length: int | None = schema.get("maxLength") + return sa.types.VARCHAR(max_length) + + def to_sql_type(self, schema: dict) -> sa.types.TypeEngine: + """Convert a JSON Schema type definition to a SQLAlchemy type. + + Args: + schema: The JSON Schema object. + + Returns: + The corresponding SQLAlchemy type. + """ + if sql_type := self._get_type_from_schema(schema): + return sql_type + + # Handle anyOf + if "anyOf" in schema: + for subschema in schema["anyOf"]: + # Skip null types in anyOf + if subschema.get("type") == "null": + continue + + if sql_type := self._get_type_from_schema(subschema): + return sql_type + + # Fallback + return sa.types.VARCHAR() + + class SQLConnector: # noqa: PLR0904 """Base class for SQLAlchemy-based connectors. @@ -255,6 +429,16 @@ def sql_to_jsonschema(self) -> SQLToJSONSchema: """ return SQLToJSONSchema() + @functools.cached_property + def jsonschema_to_sql(self) -> JSONSchemaToSQL: + """The JSON-to-SQL type mapper object for this SQL connector. + + Override this property to provide a custom mapping for your SQL dialect. + + .. versionadded:: 0.42.0 + """ + return JSONSchemaToSQL() + @contextmanager def _connect(self) -> t.Iterator[sa.engine.Connection]: with self._engine.connect().execution_options(stream_results=True) as conn: @@ -418,8 +602,7 @@ def to_jsonschema_type( msg = f"Unexpected type received: '{type(sql_type).__name__}'" raise ValueError(msg) - @staticmethod - def to_sql_type(jsonschema_type: dict) -> sa.types.TypeEngine: + def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine: """Return a JSON Schema representation of the provided type. By default will call `typing.to_sql_type()`. @@ -435,7 +618,7 @@ def to_sql_type(jsonschema_type: dict) -> sa.types.TypeEngine: Returns: The SQLAlchemy type representation of the data type. """ - return th.to_sql_type(jsonschema_type) + return self.jsonschema_to_sql.to_sql_type(jsonschema_type) @staticmethod def get_fully_qualified_name( diff --git a/singer_sdk/typing.py b/singer_sdk/typing.py index a21447a49..d1fe530d6 100644 --- a/singer_sdk/typing.py +++ b/singer_sdk/typing.py @@ -1205,6 +1205,10 @@ def _jsonschema_type_check(jsonschema_type: dict, type_check: tuple[str]) -> boo ) +@deprecated( + "Use `JSONSchemaToSQL` instead.", + category=DeprecationWarning, +) def to_sql_type( # noqa: PLR0911, C901 jsonschema_type: dict, ) -> sa.types.TypeEngine: diff --git a/tests/core/test_connector_sql.py b/tests/core/test_connector_sql.py index 94bce926e..3c86f3471 100644 --- a/tests/core/test_connector_sql.py +++ b/tests/core/test_connector_sql.py @@ -11,7 +11,11 @@ from samples.sample_duckdb import DuckDBConnector from singer_sdk.connectors import SQLConnector -from singer_sdk.connectors.sql import FullyQualifiedName, SQLToJSONSchema +from singer_sdk.connectors.sql import ( + FullyQualifiedName, + JSONSchemaToSQL, + SQLToJSONSchema, +) from singer_sdk.exceptions import ConfigValidationError if t.TYPE_CHECKING: @@ -445,7 +449,7 @@ def test_sql_to_json_schema_map( assert m.to_jsonschema(sql_type) == expected_jsonschema_type -def test_custom_type(): +def test_custom_type_to_jsonschema(): class MyMap(SQLToJSONSchema): @SQLToJSONSchema.to_jsonschema.register def custom_number_to_jsonschema(self, column_type: sa.types.NUMERIC) -> dict: @@ -470,3 +474,137 @@ def my_type_to_jsonschema(self, column_type) -> dict: # noqa: ARG002 "multipleOf": 0.01, } assert m.to_jsonschema(sa.types.BOOLEAN()) == {"type": ["boolean"]} + + +class TestJSONSchemaToSQL: + @pytest.fixture + def json_schema_to_sql(self) -> JSONSchemaToSQL: + return JSONSchemaToSQL() + + def test_register_jsonschema_type_handler( + self, + json_schema_to_sql: JSONSchemaToSQL, + ): + json_schema_to_sql.register_type_handler("my-type", sa.types.LargeBinary) + result = json_schema_to_sql.to_sql_type({"type": "my-type"}) + assert isinstance(result, sa.types.LargeBinary) + + def test_register_jsonschema_format_handler( + self, + json_schema_to_sql: JSONSchemaToSQL, + ): + json_schema_to_sql.register_format_handler("my-format", sa.types.LargeBinary) + result = json_schema_to_sql.to_sql_type( + { + "type": "string", + "format": "my-format", + } + ) + assert isinstance(result, sa.types.LargeBinary) + + def test_string(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"type": ["string", "null"]} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.VARCHAR) + assert result.length is None + + def test_string_max_length(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"type": ["string", "null"], "maxLength": 10} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance( + json_schema_to_sql.to_sql_type(jsonschema_type), + sa.types.VARCHAR, + ) + assert result.length == 10 + + def test_integer(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"type": ["integer", "null"]} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.INTEGER) + + def test_number(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"type": ["number", "null"]} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.DECIMAL) + + def test_boolean(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"type": ["boolean", "null"]} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.BOOLEAN) + + def test_object(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"type": "object", "properties": {}} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.VARCHAR) + + def test_array(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"type": "array"} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.VARCHAR) + + def test_array_items(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"type": "array", "items": {"type": "string"}} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.VARCHAR) + + def test_date(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"format": "date", "type": ["string", "null"]} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.DATE) + + def test_time(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"format": "time", "type": ["string", "null"]} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.TIME) + + def test_uuid(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"format": "uuid", "type": ["string", "null"]} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.UUID) + + def test_datetime(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"format": "date-time", "type": ["string", "null"]} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.DATETIME) + + def test_anyof_datetime(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = { + "anyOf": [ + {"type": "string", "format": "date-time"}, + {"type": "null"}, + ], + } + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.DATETIME) + + def test_anyof_integer(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = { + "anyOf": [ + {"type": "integer"}, + {"type": "null"}, + ], + } + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.INTEGER) + + def test_complex(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = { + "type": [ + "array", + "object", + "boolean", + "null", + ] + } + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.VARCHAR) + + def test_unknown_type(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"cannot": "compute"} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.VARCHAR) + + def test_unknown_format(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"type": "string", "format": "unknown"} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.VARCHAR) diff --git a/tests/core/test_sql_typing.py b/tests/core/test_sql_typing.py index 5662d7d0d..8b6ff1a96 100644 --- a/tests/core/test_sql_typing.py +++ b/tests/core/test_sql_typing.py @@ -49,7 +49,8 @@ def test_convert_jsonschema_type_to_sql_type( jsonschema_type: dict, sql_type: sa.types.TypeEngine, ): - result = th.to_sql_type(jsonschema_type) + with pytest.warns(DeprecationWarning): + result = th.to_sql_type(jsonschema_type) assert isinstance(result, sql_type.__class__) assert str(result) == str(sql_type) diff --git a/tests/core/test_typing.py b/tests/core/test_typing.py index 219c9a587..c6919a81d 100644 --- a/tests/core/test_typing.py +++ b/tests/core/test_typing.py @@ -325,6 +325,7 @@ def test_conform_primitives(): assert _conform_primitive_property(1, {"type": ["boolean"]}) is True +@pytest.mark.filterwarnings("ignore:Use `JSONSchemaToSQL` instead.:DeprecationWarning") @pytest.mark.parametrize( "jsonschema_type,expected", [